今日概要
1、線程
2、協程
3、socketserver
4、基于udp的socket(見第八節)
一、線程
1、threading子產品
第一種方法:執行個體化
import threading
import time
#第一種方法執行個體化
def sayhi(num):
print('running on num %s' %(num))
time.sleep(3)
if __name__ == '__main__':
t1 = threading.Thread(target=sayhi,args=(1,)) #執行個體化線程
t2 = threading.Thread(target=sayhi, args=(1,))
t1.start() #執行start方法
t2.start()
print(threading.active_count()) #統計線程數量
print(t1.getName())
print(t2.getName())
第二種方法類的繼承
import threading
import time
#第二種方法類的繼承
class MyThread(threading.Thread):
def __init__(self,num):
super().__init__()
self.num = num
def run(self):
print('running on num %s' % (self.num))
print('start time : %s'%(time.time()))
time.sleep(3)
print('end time : %s'%(time.time()))
if __name__ == '__main__':
t1 = MyThread(1)
t2 = MyThread(2)
t1.start()
t2.start()
2、在一個程序下開啟多個線程與在一個程序下開啟多個子程序的差別
誰的開啟速度快? 開啟線程的速度快
from threading import Thread
from multiprocessing import Process
def talk(name):
print ('%s is talking' %name)
if __name__ == '__main__':
t = Thread(target=talk,args=('dragon',))
t.start()
print ('主線程')
'''
輸出:
dragon is talking
主線程
'''
p = Process(target=talk,args=('dragon',))
p.start()
print('主線程')
'''
輸出:
主線程
dragon is talking
'''
看一看pid? 線程的pid和目前程序一樣,程序的pid各自獨立
from threading import Thread
from multiprocessing import Process
import os
def talk():
print ('%s is talking' %os.getpid())
if __name__ == '__main__':
t = Thread(target=talk)
t1 = Thread(target=talk)
t.start()
t1.start()
print('主線程pid: %s' %os.getpid())
'''
21592 is talking
21592 is talking
主線程pid: 21592
'''
p = Process(target=talk)
p1 = Process(target=talk)
p.start()
p1.start()
print('主線程pid: %s' % os.getpid())
'''
主線程pid: 21724
21460 is talking
20496 is talking
'''
from socket import *
from threading import Thread
def server(ip,port):
s = socket(AF_INET,SOCK_STREAM)
s.bind((ip,port))
s.listen(5)
while True: #連結循環變成多線程
conn,addr = s.accept()
t = Thread(target=talk,args=(conn,addr))
t.start()
def talk(conn,addr):
'''通信循環'''
try:
while True:
res = conn.recv(1024)
if not res :break
print('client %s:%s msg:%s' % (addr[0], addr[1], res))
conn.send(res.upper())
except Exception:
pass
finally:
conn.close()
if __name__ == '__main__':
server('127.0.0.1',9999)
多線程服務端
#!/usr/bin/python
# -*- coding:utf-8 -*-
from socket import *
c=socket(AF_INET,SOCK_STREAM)
c.connect(('127.0.0.1',9999))
while True:
msg=input('>>: ').strip()
if not msg:continue
c.send(msg.encode('utf-8'))
res=c.recv(1024)
print(res.decode('utf-8'))
用戶端
例子:
三個任務,一個接收使用者輸入,一個将使用者輸入的内容格式化成大寫,一個将格式化後的結果存入檔案
from threading import Thread
msg_list = []
format_list = []
def talk():
while True:
user_input = input('-> : ').strip()
if len(user_input) == 0 : continue
msg_list.append(user_input)
def format():
while True:
if msg_list:
res = msg_list.pop()
res = res.upper()
format_list.append(res)
def save():
while True:
if format_list:
res = format_list.pop()
with open('db.txt','a',encoding='utf-8') as f :
f.write('%s\n' %(res))
if __name__ == '__main__':
t1 = Thread(target=talk)
t2 = Thread(target=format)
t3 = Thread(target=save)
t1.start()
t2.start()
t3.start()
多線程例子
3、線程的join和setdaemon
from threading import Thread
import threading
import time
def talk():
time.sleep(1)
print ('%s is talk' %threading.current_thread().getName())
if __name__ == '__main__':
t = Thread(target=talk)
t.setDaemon(True) #設定守護線程的守護程序
t.start()
t.join()
print ('主線程')
print (t.is_alive())
4、線程的其他用法
thread執行個體對象的方法
isAlive(): 傳回線程是否活動的。
getName(): 傳回線程名。
setName(): 設定線程名。
threading子產品提供的一些方法:
threading.currentThread(): 傳回目前的線程變量。
threading.enumerate(): 傳回一個包含正在運作的線程的list。正在運作指線程啟動後、結束前,不包括啟動前和終止後的線程。
threading.activeCount(): 傳回正在運作的線程數量,與len(threading.enumerate())有相同的結果。
from threading import Thread
import threading
def work():
import time
time.sleep(3)
print(threading.current_thread().getName())
if __name__ == '__main__':
#在主程序下開啟線程
t=Thread(target=work)
t.start()
print(threading.current_thread().getName()) #主線程的名字
print(threading.current_thread()) #主線程
print(threading.enumerate()) #連同主線程在内有兩個運作的線程
print(threading.active_count())
print('主線程/主程序')
'''
MainThread
<_MainThread(MainThread, started 23516)>
[<_MainThread(MainThread, started 23516)>, <Thread(Thread-1, started 26208)>]
2
主線程/主程序
Thread-1
'''
5、python解釋器GIL
在Cpython解釋器中,同一個程序下開啟的多線程,同一時刻隻能有一個線程執行,無法利用多核優勢,如果不加GIL鎖,解釋器級别如垃圾回收,就會出現混亂,由于同一份資源多個線程間的競争
首先需要明确的一點是
GIL
并不是Python的特性,它是在實作Python解析器(CPython)時所引入的一個概念。就好比C++是一套語言(文法)标準,但是可以用不同的編譯器來編譯成可執行代碼。有名的編譯器例如GCC,INTEL C++,Visual C++等。Python也一樣,同樣一段代碼可以通過CPython,PyPy,Psyco等不同的Python執行環境來執行。像其中的JPython就沒有GIL。然而因為CPython是大部分環境下預設的Python執行環境。是以在很多人的概念裡CPython就是Python,也就想當然的把
GIL
歸結為Python語言的缺陷。是以這裡要先明确一點:GIL并不是Python的特性,Python完全可以不依賴于GIL
參考連接配接:http://www.dabeaz.com/python/UnderstandingGIL.pdf 這篇文章透徹的剖析了GIL對python多線程的影響
cpu到底是用來做計算的,還是用來做I/O的?
1. 多cpu,意味着可以有多個核并行完成計算,是以多核提升的是計算性能
2. 每個cpu一旦遇到I/O阻塞,仍然需要等待,是以多核對I/O操作沒什麼用處
一個勞工相當于cpu,此時計算相當于勞工在幹活,I/O阻塞相當于為勞工幹活提供所需原材料的過程,勞工幹活的過程中如果沒有原材料了,則勞工幹活的過程需要停止,直到等待原材料的到來。
如果你的工廠幹的大多數任務都要有準備原材料的過程(I/O密集型),那麼你有再多的勞工,意義也不大,還不如一個人,在等材料的過程中讓勞工去幹别的活,
反過來講,如果你的工廠原材料都齊全,那當然是勞工越多,效率越高
結論:
對計算來說,cpu越多越好,但是對于I/O來說,再多的cpu也沒用
當然對于一個程式來說,不會是純計算或者純I/O,我們隻能相對的去看一個程式到底是計算密集型還是I/O密集型,進而進一步分析python的多線程有無用武之地
分析:
我們有四個任務需要處理,處理方式肯定是要玩出并發的效果,解決方案可以是:
方案一:開啟四個程序
方案二:一個程序下,開啟四個線程
單核情況下,分析結果:
如果四個任務是計算密集型,沒有多核來并行計算,方案一徒增了建立程序的開銷,方案二勝
如果四個任務是I/O密集型,方案一建立程序的開銷大,且程序的切換速度遠不如線程,方案二勝
多核情況下,分析結果:
如果四個任務是計算密集型,多核意味着并行計算,在python中一個程序中同一時刻隻有一個線程執行用不上多核,方案一勝
如果四個任務是I/O密集型,再多的核也解決不了I/O問題,方案二勝
結論:現在的計算機基本上都是多核,python對于計算密集型的任務開多線程的效率并不能帶來多大性能上的提升,甚至不如串行(沒有大量切換),但是,對于IO密集型的任務效率還是有顯著提升的。
測試:計算密集型,多程序勝出
#計算密集型
from threading import Thread
from multiprocessing import Process
import time
def work():
res=0
for i in range(1000000):
res+=i
if __name__ == '__main__':
t_l=[]
start_time=time.time()
for i in range(100):
#t=Thread(target=work) #在我的機器上,8核cpu,多線程 7.210500001907349s
t=Process(target=work) #在我的機器上,8核cpu,多程序 3.642500162124634s
t_l.append(t)
t.start()
for i in t_l:
i.join()
stop_time=time.time()
print('run time is %s' %(stop_time-start_time))
print('主線程')
測試:io密集型,多線程勝出
from threading import Thread
from multiprocessing import Process
import time
import os
def work():
time.sleep(2) #模拟I/O操作,可以打開一個檔案來測試I/O,與sleep是一個效果
# print(os.getpid())
if __name__ == '__main__':
t_l=[]
start_time=time.time()
for i in range(500):
#t=Thread(target=work) #2.0440001487731934s 耗時
t=Process(target=work) #11.970999956130981s 耗時
t_l.append(t)
t.start()
for t in t_l:
t.join()
stop_time=time.time()
print ('run time is %s' %(stop_time-start_time))
多線程用于IO密集型,如socket,爬蟲,web
多程序用于計算密集型,如金融分析
6、同步鎖
import time
import threading
def addNum():
global num #在每個線程中都擷取這個全局變量
#num-=1
temp=num
time.sleep(0.1)
num =temp-1 # 對此公共變量進行-1操作
num = 100 #設定一個共享變量
thread_list = []
for i in range(100):
t = threading.Thread(target=addNum)
t.start()
thread_list.append(t)
for t in thread_list: #等待所有線程執行完畢
t.join()
print('Result: ', num)
以上是不加鎖的情況,傳回值會是99,同時或者num為100進行-1的動作
7、互斥鎖
不加鎖的情況下,會導緻同一份資源互相搶占
n = 100
def work():
global n
temp = n
time.sleep(0.001)
n = temp-1
if __name__ == '__main__':
t_l=[]
for i in range(100):
t=Thread(target=work)
t_l.append(t)
t.start()
for t in t_l:
t.join()
print(n)
'''
結果不為0
'''
加上互斥鎖,犧牲了性能,保護了資料安全
from threading import Thread,Lock
import time
n=100
def work():
with mutex1:
global n
temp=n
time.sleep(0.01)
n=temp-1
if __name__ == '__main__':
mutex1=Lock()
t_l=[]
for i in range(100):
t=Thread(target=work)
t_l.append(t)
t.start()
for t in t_l:
t.join()
print(n)
'''
結果為0
'''
8、GIL和LOCK的差別
鎖的目的是為了保護共享的資料,同一時間隻能有一個線程來修改共享的資料
然後,我們可以得出結論:保護不同的資料就應該加不同的鎖。
GIL 與Lock是兩把鎖,保護的資料不一樣,前者是解釋器級别的(當然保護的就是解釋器級别的資料,比如垃圾回收的資料),後者是保護使用者自己開發的應用程式的資料,很明顯GIL不負責這件事,隻能使用者自定義加鎖處理,即Lock
python的gil詳解:
因為Python解釋器幫你自動定期進行記憶體回收,你可以了解為python解釋器裡有一個獨立的線程,每過一段時間它起wake up做一次全局輪詢看看哪些記憶體資料是可以被清空的,此時你自己的程式 裡的線程和 py解釋器自己的線程是并發運作的,假設你的線程删除了一個變量,py解釋器的垃圾回收線程在清空這個變量的過程中的clearing時刻,可能一個其它線程正好又重新給這個還沒來及得清空的記憶體空間指派了,結果就有可能新指派的資料被删除了,為了解決類似的問題,python解釋器簡單粗暴的加了鎖,即當一個線程運作時,其它人都不能動,這樣就解決了上述的問題, 這可以說是Python早期版本的遺留問題。
9、死鎖和遞歸鎖
死鎖: 是指兩個或兩個以上的程序或線程在執行過程中,因争奪資源而造成的一種互相等待的現象,若無外力作用,它們都将無法推進下去。此時稱系統處于死鎖狀态或系統産生了死鎖,這些永遠在互相等待的程序稱為死鎖程序,如下就是死鎖
#!/usr/bin/python
# -*- coding:utf-8 -*-
from threading import Thread,Lock,RLock
import time
class MyThread(Thread):
def run(self):
self.f1()
self.f2()
def f1(self):
mutex.acquire()
print('\033[45m %s 拿到A鎖\033[0m' %(self.name))
mutex1.acquire()
print('\033[45m %s 拿到B鎖\033[0m' % (self.name))
mutex1.release()
mutex.release()
def f2(self):
mutex1.acquire()
time.sleep(1)
print('\033[45m %s 拿到B鎖\033[0m' %(self.name))
mutex.acquire()
print('\033[45m %s 拿到A鎖\033[0m' % (self.name))
mutex.release()
mutex1.release()
if __name__ == '__main__':
mutex = Lock() #定義鎖對象
mutex1 = Lock()
#mutex = mutex1 = RLock() #定義遞歸鎖對象,遞歸鎖跟計數器一個原理,隻有計數器為0的時候,其他線程才能進行加鎖
'''
Thread-1 拿到A鎖
Thread-1 拿到B鎖
Thread-2 拿到A鎖
Thread-1 拿到B鎖
卡主死鎖了,線程1拿到B鎖在等A鎖釋放,線程2拿到A鎖在等B鎖釋放,兩個鎖進入互相等待的情況,産生了死鎖
'''
遞歸鎖:
遞歸鎖,在Python中為了支援在同一線程中多次請求同一資源,python提供了可重入鎖RLock。
這個RLock内部維護着一個Lock和一個counter變量,counter記錄了acquire的次數,進而使得資源可以被多次require。直到一個線程所有的acquire都被release,其他的線程才能獲得資源。上面的例子如果使用RLock代替Lock,則不會發生死鎖:
mutex1=mutex2=threading.RLock() #一個線程拿到鎖,counter加1,該線程内又碰到加鎖的情況,則counter繼續加1,這期間所有其他線程都隻能等待,等待該線程釋放所有鎖,即counter遞減到0為止
10、信号量Semahpore
信号量類似一個令牌池
Semaphore管理一個内置的計數器,
每當調用acquire()時内置計數器-1;
調用release() 時内置計數器+1;
計數器不能小于0;當計數器為0時,acquire()将阻塞線程直到其他線程調用release()。
from threading import Thread,Semaphore
import time
def work(id):
with sem:
time.sleep(2)
print('%s say hello' %(id))
if __name__ == '__main__':
sem = Semaphore(5)
for i in range(20):
t = Thread(target=work,args=(i,))
t.start()
與程序池是完全不同的概念,程序池Pool(4),最大隻能産生4個程序,而且從頭到尾都隻是這四個程序,不會産生新的,而信号量是産生一堆線程/程序,每次隻能執行4個,其他的都在阻塞狀态
11、事件event
線程的一個關鍵特性是每個線程都是獨立運作且狀态不可預測。如果程式中的其 他線程需要通過判斷某個線程的狀态來确定自己下一步的操作,這時線程同步問題就 會變得非常棘手。為了解決這些問題,我們需要使用threading庫中的Event對象。 對象包含一個可由線程設定的信号标志,它允許線程等待某些事件的發生。在 初始情況下,Event對象中的信号标志被設定為假。如果有線程等待一個Event對象, 而這個Event對象的标志為假,那麼這個線程将會被一直阻塞直至該标志為真。一個線程如果将一個Event對象的信号标志設定為真,它将喚醒所有等待這個Event對象的線程。如果一個線程等待一個已經被設定為真的Event對象,那麼它将忽略這個事件, 繼續執行
參數介紹:
event.isSet():傳回event的狀态值;
event.wait():如果 event.isSet()==False将阻塞線程;
event.set(): 設定event的狀态值為True,所有阻塞池的線程激活進入就緒狀态, 等待作業系統排程;
event.clear():恢複event的狀态值為False。
例子:
#!/usr/bin/python
# -*- coding:utf-8 -*-
import threading
from threading import Thread,Event
import time
def conn_mysql(): #一堆執行程式線程
print('%s is waiting' %(threading.current_thread().getName()))
e.wait()
print('%s is starting' % (threading.current_thread().getName()))
time.sleep(2)
def check_mysql(): #一個檢查mysql狀态的線程
print('%s is checking' % (threading.current_thread().getName()))
time.sleep(5)
e.set()
if __name__ == '__main__':
e = Event()
t1 = Thread(target=conn_mysql)
t2 = Thread(target=conn_mysql)
t3 = Thread(target=conn_mysql)
t4 = Thread(target=check_mysql)
t1.start()
t2.start()
t3.start()
t4.start()
'''
輸出:
Thread-1 is waiting
Thread-2 is waiting
Thread-3 is waiting
Thread-4 is checking
Thread-1 is starting
Thread-2 is starting
Thread-3 is starting
'''
可以考慮一種應用場景(僅僅作為說明),例如,我們有多個線程從Redis隊列中讀取資料來處理,這些線程都要嘗試去連接配接Redis的服務,一般情況下,如果Redis連接配接不成功,在各個線程的代碼中,都會去嘗試重新連接配接。如果我們想要在啟動時確定Redis服務正常,才讓那些工作線程去連接配接Redis伺服器,那麼我們就可以采用threading.Event機制來協調各個工作線程的連接配接操作:主線程中會去嘗試連接配接Redis服務,如果正常的話,觸發事件,各工作線程會嘗試連接配接Redis服務。
import threading
import time
import logging
logging.basicConfig(level=logging.DEBUG, format='(%(threadName)-10s) %(message)s',)
def worker(event):
logging.debug('Waiting for redis ready...')
event.wait()
logging.debug('redis ready, and connect to redis server and do some work [%s]', time.ctime())
time.sleep(1)
def main():
readis_ready = threading.Event()
t1 = threading.Thread(target=worker, args=(readis_ready,), name='t1')
t1.start()
t2 = threading.Thread(target=worker, args=(readis_ready,), name='t2')
t2.start()
logging.debug('first of all, check redis server, make sure it is OK, and then trigger the redis ready event')
time.sleep(3) # simulate the check progress
readis_ready.set()
if __name__=="__main__":
main()
'''
輸出:
(t1 ) Waiting for redis ready...
(t2 ) Waiting for redis ready...
(MainThread) first of all, check redis server, make sure it is OK, and then trigger the redis ready event
(t1 ) redis ready, and connect to redis server and do some work [Mon Jul 3 19:17:51 2017]
(t2 ) redis ready, and connect to redis server and do some work [Mon Jul 3 19:17:51 2017]
'''
redis示例
threading.Event的wait方法還接受一個逾時參數,預設情況下如果事件一緻沒有發生,wait方法會一直阻塞下去,而加入這個逾時參數之後,如果阻塞時間超過這個參數設定的值之後,wait方法會傳回。對應于上面的應用場景,如果Redis伺服器一緻沒有啟動,我們希望子線程能夠列印一些日志來不斷地提醒我們目前沒有一個可以連接配接的Redis服務,我們就可以通過設定這個逾時參數來達成這樣的目的:
from threading import Event,Thread
import threading
import time
def conn_mysql():
count = 0
while not e.is_set():
print('%s 第 <%s> 次嘗試' % (threading.current_thread().getName(), count))
count += 1
e.wait(0.5)
print('%s ready to conn mysql' % threading.current_thread().getName())
time.sleep(1)
def check_mysql():
print('%s checking...' %threading.current_thread().getName())
time.sleep(4)
e.set()
if __name__ == '__main__':
e=Event()
c1=Thread(target=conn_mysql)
c2=Thread(target=conn_mysql)
c3=Thread(target=conn_mysql)
c4=Thread(target=check_mysql)
c1.start()
c2.start()
加上重試mysql
from threading import Thread,Event
import threading
import time,random
def conn_mysql():
while not event.is_set():
print('\033[42m%s 等待連接配接mysql。。。\033[0m' %threading.current_thread().getName())
event.wait(0.1)
print('\033[42mMysql初始化成功,%s開始連接配接。。。\033[0m' %threading.current_thread().getName())
def check_mysql():
print('\033[41m正在檢查mysql。。。\033[0m')
time.sleep(random.randint(1,3))
event.set()
time.sleep(random.randint(1,3))
if __name__ == '__main__':
event=Event()
t1=Thread(target=conn_mysql)
t2=Thread(target=conn_mysql)
t3=Thread(target=check_mysql)
t1.start()
t2.start()
t3.start()
修改之後的mysql
應用:連接配接池
12、condition(條件)
使得線程等待,隻有滿足某條件時,才釋放n個線程
import threading
import time
lock = threading.Condition()
def task(arg):
time.sleep(1)
lock.acquire()
lock.wait()
print('程序%s'%(arg))
lock.release()
for i in range(10):
t = threading.Thread(target=task,args=(i,))
t.start()
while True:
user_input = input('--->').strip()
if user_input == 'q':break
lock.acquire()
lock.notify(int(user_input))
lock.release()
def condition_func():
ret = False
inp = input('>>>')
if inp == '1':
ret = True
return ret
def run(n):
con.acquire()
con.wait_for(condition_func)
print("run the thread: %s" %n)
con.release()
if __name__ == '__main__':
con = threading.Condition()
for i in range(10):
t = threading.Thread(target=run, args=(i,))
t.start()
另外一種寫法
13、線程queue
queue隊列 :使用import queue,用法與程序Queue一樣
import queue
q = queue.Queue(3) #先進先出 --> 隊列
q.put('first')
q.put('second')
q.put((1,2,3,4))
print(q.get())
print(q.get())
print(q.get())
q = queue.LifoQueue() #先進後出 -> 堆棧
q.put('first')
q.put('second')
q.put((1,2,3,4))
print(q.get())
print(q.get())
print(q.get())
q = queue.PriorityQueue() #按照優先級輸出,數字越小優先級越高
q.put((10,'first'))
q.put((3,'second'))
q.put((5,(1,2,3,4)))
print(q.get())
print(q.get())
print(q.get())
二、協程
協程:是單線程下的并發,又稱微線程,纖程。英文名Coroutine。一句話說明什麼是線程:協程是一種使用者态的輕量級線程,即協程是由使用者程式自己控制排程的。
需要強調的是:
1. python的線程屬于核心級别的,即由作業系統控制排程(如單線程一旦遇到io就被迫交出cpu執行權限,切換其他線程運作)
2. 單線程内開啟協程,一旦遇到io,從應用程式級别(而非作業系統)控制切換
對比作業系統控制線程的切換,使用者在單線程内控制協程的切換,優點如下:
1. 協程的切換開銷更小,屬于程式級别的切換,作業系統完全感覺不到,因而更加輕量級
2. 單線程内就可以實作并發的效果,最大限度地利用cpu
要實作協程,關鍵在于使用者程式自己控制程式切換,切換之前必須由使用者程式自己儲存協程上一次調用時的狀态,如此,每次重新調用時,能夠從上次的位置繼續執行
(詳細的:協程擁有自己的寄存器上下文和棧。協程排程切換時,将寄存器上下文和棧儲存到其他地方,在切回來的時候,恢複先前儲存的寄存器上下文和棧)
之前已經學過一個在單線程下可以儲存程式的運作狀态,即yield:
1.yiled可以儲存狀态,yield的狀态儲存與作業系統的儲存線程狀态很像,但是yield是代碼級别控制的,更輕量級
2.send可以把一個函數的結果傳給另外一個函數,以此實作單線程内程式之間的切換
#!/usr/bin/python
# -*- coding:utf-8 -*-
import time
def consumer(item):
x =1
y =2
c = 3
d =4
a = 'sadsaddsdadasd'
pass
def producer(target,seq):
for item in seq:
target(item)
start_time = time.time()
producer(consumer,range(10000000))
stop_time = time.time()
print ('cost time is %s' %(stop_time-start_time)) #1.9440789222717285 s
def consumer():
x =1
y =2
c = 3
d =4
a = 'sadsaddsdadasd'
while True:
item = yield
def producer(target,seq):
for item in seq:
target.send(item)
g = consumer()
next(g)
start_time = time.time()
producer(g,range(10000000))
stop_time = time.time()
print ('cost time is %s' %(stop_time-start_time)) #1.7285699844360352
#用yield協程函數,執行的快的原因,不用yield需要不斷的開辟記憶體,垃圾回收, 而yield隻開辟一次記憶體位址,節省了這一塊的時間
'''
cost time is 1.9440789222717285
cost time is 1.7285699844360352
'''
缺點:
協程的本質是單線程下,無法利用多核,可以是一個程式開啟多個程序,每個程序内開啟多個線程,每個線程内開啟協程
協程指的是單個線程,因而一旦協程出現阻塞,将會阻塞整個線程
協程的定義(滿足1,2,3就可稱為協程):
-
- 必須在隻有一個單線程裡實作并發
- 修改共享資料不需加鎖
- 使用者程式裡自己儲存多個控制流的上下文棧
- 附加:一個協程遇到IO操作自動切換到其它協程(如何實作檢測IO,yield、greenlet都無法實作,就用到了gevent子產品(select機制))
yield切換在沒有io的情況下或者沒有重複開辟記憶體空間的操作,對效率沒有什麼提升,甚至更慢,為此,可以用greenlet來為大家示範這種切換
三、greenlet子產品
greenlet是一個用C實作的協程子產品,相比與python自帶的yield,它可以使你在任意函數之間随意切換,而不需把這個函數先聲明為generator
#!/usr/bin/python
# -*- coding:utf-8 -*-
from greenlet import greenlet
def test1():
print('test1,first')
gr2.switch()
print('test1,sencod')
gr2.switch()
def test2():
print('test2,first')
gr1.switch()
print('test2,sencod')
gr1=greenlet(test1)
gr2=greenlet(test2)
gr1.switch()
import time
from greenlet import greenlet
def eat(name):
print('%s eat food 1' %name)
gr2.switch('alex飛飛飛')
print('%s eat food 2' %name)
gr2.switch()
def play_phone(name):
print('%s play 1' %name)
gr1.switch()
print('%s play 2' %name)
gr1=greenlet(eat)
gr2=greenlet(play_phone)
gr1.switch(name='egon啦啦啦')#可以在第一次switch時傳入參數,以後都不需要
'''
egon啦啦啦 eat food 1
alex飛飛飛 play 1
egon啦啦啦 eat food 2
alex飛飛飛 play 2
'''
第一次傳入可以加參數
單純的切換(在沒有io的情況下或者沒有重複開辟記憶體空間的操作),反而會降低程式的執行速度
#順序執行
import time
def f1():
res=0
for i in range(10000000):
res+=i
def f2():
res=0
for i in range(10000000):
res*=i
start_time=time.time()
f1()
f2()
stop_time=time.time()
print('run time is: %s' %(stop_time-start_time)) #1.7395639419555664
#切換
from greenlet import greenlet
import time
def f1():
res=0
for i in range(10000000):
res+=i
gr2.switch()
def f2():
res=0
for i in range(10000000):
res*=i
gr1.switch()
gr1=greenlet(f1)
gr2=greenlet(f2)
start_time=time.time()
gr1.switch()
stop_time=time.time()
print('run time is: %s' %(stop_time-start_time)) #7.789067983627319
greenlet隻是提供了一種比generator更加便捷的切換方式,仍然是沒有解決遇到IO自動切換的問題
四、gevent子產品
Gevent 是一個第三方庫,可以輕松通過gevent實作并發同步或異步程式設計,在gevent中用到的主要模式是Greenlet, 它是以C擴充子產品形式接入Python的輕量級協程。 Greenlet全部運作在主程式作業系統程序的内部,但它們被協作式地排程。
g1=gevent.spawn()建立一個協程對象g1,
spawn括号内第一個參數是函數名,如eat,後面可以有多個參數,可以是位置實參或關鍵字實參,都是傳給函數eat的
#!/usr/bin/python
# -*- coding:utf-8 -*-
from gevent import monkey;monkey.patch_all() #這是gevent給其他子產品打更新檔,不然别的子產品使用gevent就變成串行
import gevent
import time
def eat(name):
print('%s is eat first' %(name))
time.sleep(4) #等飯來
#gevent.sleep(1)
print('%s is eat second' %(name))
def play(name):
print('%s is play 1' %(name))
time.sleep(3) #模拟網卡了
#gevent.sleep(2)
print('%s is play 2' %(name))
if __name__ == '__main__':
g1= gevent.spawn(eat,'alex')
g2= gevent.spawn(play,name='alex')
# 因而也需要join方法,程序或現場的jion方法隻能join一個,而gevent的join方法可以join多個
g1.join()
g2.join()
print ('主線程')
上例gevent.sleep模拟的是gevent可以識别的io阻塞,而time.sleep或其他的阻塞,gevent是不能直接識别的需要用下面一行代碼,打更新檔,就可以識别了
from gevent import monkey;monkey.patch_all()必須放到被打更新檔者的前面,如time,socket子產品之前
或者我們幹脆記憶成:要用gevent,需要将from gevent import monkey;monkey.patch_all()放到檔案的開頭
from gevent import monkey;monkey.patch_all()
import gevent
import time
def eat():
print('eat food 1')
time.sleep(2)
print('eat food 2')
def play_phone():
print('play phone 1')
time.sleep(1)
print('play phone 2')
g1=gevent.spawn(eat)
g2=gevent.spawn(play_phone)
gevent.joinall([g1,g2])
print('主')
用第三方的time
同步或者異步
import gevent
def task(pid):
"""
Some non-deterministic task
"""
gevent.sleep(0.5)
print('Task %s done' % pid)
def synchronous():
for i in range(1,10):
task(i)
def asynchronous():
threads = [gevent.spawn(task, i) for i in range(10)]
gevent.joinall(threads)
print('Synchronous:')
synchronous()
print('Asynchronous:')
asynchronous()
'''
Synchronous:
Task 1 done
Task 2 done
Task 3 done
Task 4 done
Task 5 done
Task 6 done
Task 7 done
Task 8 done
Task 9 done
Asynchronous:
Task 0 done
Task 1 done
Task 2 done
Task 3 done
Task 4 done
Task 5 done
Task 6 done
Task 7 done
Task 8 done
Task 9 done
'''
上面程式的重要部分是将task函數封裝到Greenlet内部線程的
gevent.spawn
。 初始化的greenlet清單存放在數組
threads
中,此數組被傳給
gevent.joinall
函數,後者阻塞目前流程,并執行所有給定的greenlet。執行流程隻會在 所有greenlet執行完後才會繼續向下走。
#gevent線程的一些用法
g1=gevent.spawn(func,1,,2,3,x=4,y=5)
g2=gevent.spawn(func2)
g1.join() #等待g1結束
g2.join() #等待g2結束
#或者上述兩步合作一步:gevent.joinall([g1,g2])
g1.value#拿到func1的傳回值
#!/usr/bin/python
# -*- coding:utf-8 -*-
from gevent import monkey;monkey.patch_all()
import gevent
import time
import requests
def get_page(url):
print ('get page %s' %url)
response = requests.get(url)
if response.status_code == 200:
print (response.text)
start_time = time.time()
# get_page('http://www.baidu.com')
# get_page('http://www.python.org')
# get_page('http://www.yahoo.com')
g1 = gevent.spawn(get_page,url='http://www.baidu.com')
g2 = gevent.spawn(get_page,url='http://www.python.org')
g3 = gevent.spawn(get_page,url='http://www.yahoo.com')
gevent.joinall([
g1,
g2,
g3
])
stop_time = time.time()
print ('run time is %s ' %(stop_time-start_time))
協程應用-爬蟲
通過gevent實作單線程下的socket并發(from gevent import monkey;monkey.patch_all()一定要放到導入socket子產品之前,否則gevent無法識别socket的阻塞)
from gevent import monkey;monkey.patch_all()
from socket import *
import gevent
#如果不想用money.patch_all()打更新檔,可以用gevent自帶的socket
# from gevent import socket
# s=socket.socket()
def server(server_ip,port):
s=socket(AF_INET,SOCK_STREAM)
s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
s.bind((server_ip,port))
s.listen(5)
while True:
conn,addr=s.accept()
gevent.spawn(talk,conn,addr)
def talk(conn,addr):
try:
while True:
res=conn.recv(1024)
print('client %s:%s msg: %s' %(addr[0],addr[1],res))
conn.send(res.upper())
except Exception as e:
print(e)
finally:
conn.close()
if __name__ == '__main__':
server('127.0.0.1',8080)
單線程實作并發服務端
from threading import Thread
from socket import *
import threading
def client(server_ip,port):
c=socket(AF_INET,SOCK_STREAM)
c.connect((server_ip,port))
count=0
while True:
c.send(('%s say hello %s' %(threading.current_thread().getName(),count)).encode('utf-8'))
msg=c.recv(1024)
print(msg.decode('utf-8'))
count+=1
if __name__ == '__main__':
for i in range(500):
t=Thread(target=client,args=('127.0.0.1',8080))
t.start()
多線程并發多個client
五、socketserver實作并發
基于tcp的套接字,關鍵就是兩個循環,一個連結循環,一個通信循環socketserver子產品中分兩大類:server類(解決連結問題)和request類(解決通信問題)
server類:
request類:
繼承類關系:
線程繼承關系
程序的繼承關系
run方法繼承關系
以下述代碼為例,分析socketserver源碼:
ftpserver=socketserver.ThreadingTCPServer(('127.0.0.1',8080),FtpServer)
ftpserver.serve_forever()
查找屬性的順序:ThreadingTCPServer->ThreadingMixIn->TCPServer->BaseServer
-
- 執行個體化得到ftpserver,先找類ThreadingTCPServer的__init__,在TCPServer中找到,進而執行server_bind,server_active
- 找ftpserver下的serve_forever,在BaseServer中找到,進而執行self._handle_request_noblock(),該方法同樣是在BaseServer中
- 執行self._handle_request_noblock()進而執行request, client_address = self.get_request()(就是TCPServer中的self.socket.accept()),然後執行self.process_request(request, client_address)
- 在ThreadingMixIn中找到process_request,開啟多線程應對并發,進而執行process_request_thread,執行self.finish_request(request, client_address)
- 上述四部分完成了連結循環,本部分開始進入處理通訊部分,在BaseServer中找到finish_request,觸發我們自己定義的類的執行個體化,去找__init__方法,而我們自己定義的類沒有該方法,則去它的父類也就是BaseRequestHandler中找....
源碼分析總結:
基于tcp的socketserver我們自己定義的類中的
-
- self.server即套接字對象
- self.request即一個連結
- self.client_address即用戶端位址
基于udp的socketserver我們自己定義的類中的
-
- self.request是一個元組(第一個元素是用戶端發來的資料,第二部分是服務端的udp套接字對象),如(b'adsf', <socket.socket fd=200, family=AddressFamily.AF_INET, type=SocketKind.SOCK_DGRAM, proto=0, laddr=('127.0.0.1', 8080)>)
- self.request 是在UDPServer類下的get_request函數
- data, client_addr = self.socket.recvfrom(self.max_packet_size)
#!/usr/bin/python
# -*- coding:utf-8 -*-
import socketserver
#MyHandler(conn, client_address, s)
class MyHandler(socketserver.BaseRequestHandler): #通訊循環
def handle(self):
while True:
res=self.request.recv(1024)
print('client %s msg:%s' %(self.client_address,res))
self.request.send(res.upper())
if __name__ == '__main__':
s=socketserver.ThreadingTCPServer(('127.0.0.1',8080),MyHandler)
s.serve_forever() #連結循環
基于udp的socketserver
#!/usr/bin/python
# -*- coding:utf-8 -*-
import socketserver
class MyUDPhandler(socketserver.BaseRequestHandler):
def handle(self):
client_msg,s=self.request
s.sendto(client_msg.upper(),self.client_address)
if __name__ == '__main__':
s=socketserver.ThreadingUDPServer(('127.0.0.1',8080),MyUDPhandler)
s.serve_forever()
socketserver 例子
from SocketServer import TCPServer, ThreadingMixIn, StreamRequestHandler
#定義支援多線程的服務類,注意是多繼承
class Server(ThreadingMixIn, TCPServer): pass
#定義請求處理類
class Handler(StreamRequestHandler):
def handle(self):
addr = self.request.getpeername()
print 'Got connection from ',addr
self.wfile.write('Thank you for connection')
server = Server(('', 1234), Handler) #執行個體化服務類
server.serve_forever() #開啟服務