一、并發和并行
1、多任務
多任務的概念
簡單的說,就事作業系統可以同時運作多個任務
CPU與多任務的關系:
單核CPU可不可以多任務?
也可以執行多任務,由于CPU執行代碼都是順序執行的,那麼單核CPU是怎麼執行多任務的呢?
答案就是作業系統輪流讓各個任務交替執行,任務1執行0.01秒,切換到任務2,任務2執行0.01秒,在切換到任務3,執行0.01秒.......這樣反複下去,表面上看,每個任務都是交替執行的,但是,由于CPU的執行速度實在是太快了,我們感覺就像所有任務都在同時執行一樣。
真正的并行執行多任務隻能在多核CPU上實作,但是由于任務數量遠遠多于CPU的核心數量,是以,作業系統也會自動把多任務輪流排程到每個核心上執行。
2、并發和并行
并發: 指的是任務數多于CPU核數,通過作業系統的各種任務排程算法,實作用多任務“一起”執行(實際上總有一些任務不在執行,因為切換任務的速度相當快,看上去一起在執行而已)
并行: 指的是任務數小于等于CPU核數,即任務真的在一起執行的
并發
并行
3、同步和異步
同步(同步協調): 是指線程在通路某一資源時,獲得了資源的傳回結果之後才會執行其他操作,(先做某件事,再做某件事)
異步: 與同步相對,是指線程再通路某一資源時,無論是否取得傳回結果,都進行下一步操作;當有了資源傳回結果時,系統自會通知線程。
二、線程
問題:
目前有兩件事情,做事情1需要5秒,做事情2需要6秒
def func1():
for i in range(5):
print("-----正再做事情1------")
time.sleep(1)
def func2():
for i in range(6):
print("-----正再做事情2------")
time.sleep(1)
單任務
先做事情一
在做事情二
多任務
兩個事情同時做
怎麼才能同時做呢?
多線程執行
1、Threadting子產品介紹
Python的_thread子產品是比較底層的子產品,Python的threading子產品是怼thread子產品做了一層包裝的,可以更加友善的使用
建立多線程:threading.Thread(target=func1)
參數target指定線程執行的任務(函數)
Thread類提供了以下方法
run() :用以表示線程活動的方法
start() :啟動線程活動
join(timeout) :等待子線程的方法,預設等待子線程執行結束
lsAlive() :傳回線程是否活動的
getName() :傳回線程名
setName() :設定線程名
threading提供的以下方法
threading.current_thread() :傳回目前執行的線程
threading.enumerate() :傳回正在運作的所有線程(list)
threading.active_count() :傳回正在運作的線程數量
2、多線線程實作多任務
利用threading子產品實作
import threading
import time
def fun_1():
for i in range(5):
print("線程{}:{}".format(threading.current_thread(), i))
time.sleep(1)
def fun_2():
for i in range(6):
print("線程{}:{}".format(threading.current_thread(), i))
time.sleep(1)
def main():
# 建立線程1 執行函數1
t1 = threading.Thread(target=fun_1, name="th_1")
# 建立線程2 執行函數2
t2 = threading.Thread(target=fun_2, name="th_2")
s_time = time.time()
# fun_1()
# fun_2()
# 設定名字
t1.setName("TH_1")
print(t1.getName())
# 開始
t1.start()
t2.start()
# 檢視線程集合,和線程數
print(threading.enumerate())
print(threading.active_count())
# 等待子線程執行完畢
t1.join()
t2.join()
e_time = time.time()
print("時間:", e_time - s_time)
if __name__ == '__main__':
main()
重寫run方法實作多線程
線程啟動start方法就是運作了run方法
通過繼承Thread類,重寫run方法,來創見線程
建立線程時不需要在傳任務函數
如果任務函數需要參數,需要重寫init方法,并傳回父類init方法
import threading
import time
import requests
# 計算時間裝飾器
def add_time(fun):
def inner(*args):
start_time = time.time()
fun(*args)
end_time = time.time()
print("---------------線程:{},執行時間:{}-----------------".format(threading.current_thread(), end_time - start_time))
return end_time - start_time
return inner
# 重寫run方法建立多線程
class RequestThread(threading.Thread):
def __init__(self, url):
self.url = url
super().__init__()
@add_time
def run(self):
for i in range(100):
res = requests.get(self.url).status_code
print("線程{}得執行結果為:{}".format(threading.current_thread(), res))
@add_time
def main():
t_list = [] # 線程清單
# 建立線程
for i in range(10):
t = RequestThread("http://httpbin.org/post")
t_list.append(t)
# 開啟線程
for th in t_list:
th.start()
# 等待線程結束
for th in t_list:
th.join()
if __name__ == '__main__':
total_time = main()
print("每個接口平均時間:", total_time / 1000)
3、多線程-共享全局變量
問題:1000000次的bug
兩個線程完成對全局變量的2百萬次修改
一個線程修改1百萬次
# 最後結果
# 線程2修改完a = 1269517
# 線程1修改完a = 1302318
# 最後修改完a = 1302318
可見增加了兩百萬次,a隻有一百多萬
因為多線程對全局變量的修改時不安全的
比如線程1擷取了a的值為10w,此時切換到線程2
線程2修改到20w去換到線程1
此時線程1對a的修改不是從20w開始,而是10w,是以會導緻a被重新覆寫了,導緻全局變量不安全的情況發生
4、同步&互斥鎖
上面的bug如何解決?
控制線程的執行,避免同時擷取資料
互斥鎖
線程同步能夠保證保證多個線程安全通路競争資源,最簡單的同步機制是引入互斥鎖
互斥鎖為資源設定一個狀态:鎖定/非鎖定
某個線程要更改共享資料時,先将其鎖定,此時資源的狀态為"鎖定",其他線程不能更改直到該線程釋放了資源,将資源狀态變成“非鎖定”,其它線程才可以去擷取鎖,然後再次鎖定該資源
互斥鎖保證了每次隻有一個線程進行寫入操作,進而保證了多線程情況下資料的正确性
threading子產品中定義了Lock類,可以友善的處理鎖定
建立鎖:lock = threading.Lock()
鎖定 : lock.acquire()
釋放鎖 :lock.release()
注意
如果這個鎖之前沒有上鎖的,那麼acquire不會阻塞
如果在調用acpuire對這個鎖上鎖之前,它被其他線程上了鎖,那麼此時acpuire會阻塞,直到這個鎖被其他線程解鎖為止
上鎖之後多線程執行肯定會變慢
上鎖,釋放鎖需要時間
上了鎖的代碼塊等于同步執行,必須執行完鎖内的代碼塊,并釋放了鎖,其他線程才有機會擷取鎖去執行。等于多線程鎖内的代碼是同步執行的,在隻考慮鎖内代碼塊的執行效率,甚至比單線程還差
案例
import threading
# 全局變量,利用多線程
# 對a進行+1 ,兩千萬次,一個線程加已一千萬次
import time
a = 0
def fun_1(lock):
global a
for i in range(1000000):
# 修改前上鎖
lock.acquire()
a += 1
# 修改完釋放鎖
lock.release()
print("線程1修改完a={}".format(a))
def fun_2(lock):
global a
for i in range(1000000):
# 修改前上鎖
lock.acquire()
a += 1
# 修改完釋放鎖
lock.release()
print("線程2修改完a={}".format(a))
def main():
# 建立鎖
lock = threading.Lock()
# 建立兩個線程,把鎖傳進任務上鎖
s_time = time.time()
t1 = threading.Thread(target=fun_1, args=(lock,))
t2 = threading.Thread(target=fun_2, args=(lock,))
t1.start()
t2.start()
# 等待子線程執行完畢
t1.join()
t2.join()
e_time = time.time()
print("最後修改完a={}".format(a))
print("最後修改完得時間:{}".format(e_time - s_time))
#
# 鎖後時間:
if __name__ == '__main__':
main()
"""
上鎖前:
線程2修改完a = 1269517
線程1修改完a = 1302318
最後修改完a = 1302318
時間:0.15441060066223145
上鎖後:
線程2修改完a=1976899
線程1修改完a=2000000
最後修改完a=2000000
時間:1.1480515003204346
"""
5、死鎖
如果多線程存在多把鎖
線程1擷取了鎖1,還要在擷取了鎖2才能執行
線程2擷取了鎖2,還要在擷取鎖1才能執行
此時,任務1就等着任務2去釋放了鎖2,才有機會執行
任務2就等着任務1釋放了鎖1,才有機會執行
最後導緻互相等對方釋放鎖,導緻死鎖
6、GIL全局解釋器鎖(擴充)
控制線程切換運作得鎖-解釋器預設得鎖
利用一個時間得門檻值去切換線程
或者遇到IO阻塞得時候去切換
多線程之間快速切換執行大大縮短時間就是因為任務在等待,但是CPU不會等待線程,它會去執行别得任務,多線程并發就是大大縮短了等待得時間
注意:
Python語言和GIL沒有半毛錢關系,僅僅是由于曆史原因在Cpython解釋器,難以移除GIL。
GIL:全局解釋器鎖,每個線程在執行得過程都需要先擷取GIL,保證同一時刻隻有一個線程可以執行代碼。
Python使用多線程隻能隻用一個CPU是因為,最一開始設計Python解釋器的人沒有想到多核的情況,後面代碼設計越多越多,導緻很難去修改了,一直到現在。
有人曾經删除過了GIL鎖,但是效果非常差,甚至不如單核;---參考文檔
Python使用多程序是可以利用多核CPU資源的
7、單線程和多線程
單線程和多線程到底誰更快
IO密集型多線程會比單線程快很多,充分省去了IO等待的時間
CPU密集型,計算非常多的這種任務,對CPU消耗非常大的,單線程理論上比多線程要快一點,但是計算不大的這種,單線程和多線程消耗的時間其實差不多的
三、列隊
Python的Queue子產品中提供了同步的,線程安全的隊列類,這些隊列都實作了鎖源語,能夠在多線程中直接使用,可以使用隊列來實作線程間的同步
FIFO :(先入先出)隊列Queue
LIFO :(後入先出)隊列LifoQueue
優先級隊列:PriorityQueue
1、Queue隊列的方法
from queue import Queue
q = Queue(3) # 建立隊列
# 操作方法就可以了
Queue.qsize() : 傳回目前隊列包含的消息數量
Queue.empty() : 隊列為空傳回True,反之False
Queue.full() : 隊列滿了傳回True,反之False
Queue.get(self,block=True,timeout=None) : 擷取隊列中的值
timeout (block=True) :隊列内為空,get值的等待時間,時間内等不到報錯,預設一直等
block : True:隊列為空等待擷取值,False:隊列為空報錯停止執行線程
Queue.put(self,block=True,timeout=None) : 寫入隊列
timeout(block=True) :隊列滿了寫入隊列值的等待時間,等待時間後還是滿的報錯,預設一直等待
block : True:隊列滿了等待寫入值,False:隊列滿了報錯停止執行線程
Queue.get_nowait() : 相當于Queue.get(False)
Queue.put_nowait() : 相當于Queue.put(False)
Queue.task_done() : 在完成一項工作之後,使用該方法,可以向隊列發送一個信号,表示該任務執行完畢
Queue.join() 實際上意味着等待隊列種多有的任務執行完之後,在往下執行,否者一直等待
任務執行完畢,意味着着收到了Queue.task_done()這個信号
如果用了join方法,沒有發送信号會一直等待
put了多少個資料,在get使用之後,就的發送多少個Queue.task_done() 使用完畢的信号,不然也會一直等待
如果發送的Queue.task_done()信号,比put的個數多,會報錯
2、LifoQueue隊列的方法
from queue import LifoQueue
q = LifoQueue(3) # 建立隊列
# 調用操作方法就可以了
繼承的Queue,和Queue的方法LifoQueue都有
唯一不同的就是先進入的後出來
内部重寫了Queue的幾個私有方法實作的先入後出,并沒有擴充新方法
3、PriorityQueue隊列的方法
from queue import PriorityQueue
q = PriorityQueue(3) # 建立隊列
# 調用操作方法就可以了
# 不同點,生産值的時候,需要傳元組
q.put((1,333)
繼承的Queue,和Queue的方法PriorityQueue都有
唯一不同的就是生産值的時候,需要傳元組
元組第一個元素,是優先級數值
get值的時候,優先get出數值最小的值
get來出來的也是元組
内部重寫了Queue的幾個私有方法實作的優先級,并沒有擴充新方法
4、生産者消費者模式實作
為什麼要使用生産者和消費模式
線上程世界裡,生産者是就是生産資料的線程;消費者就是消費資料的線程
如果生産者處理速度很快,而消費者處理速度很慢,那麼消費者必須等待消費者處理完,才能繼續生産資料,同理傳回來消費者消費的快,消費者必須等待生産
為了解決這個問題于是引入了生産和消費者模式
什麼是生産者和消費者模式
生産者消費者模式是通過一個容器來解決生産者和消費者的強偶問題
生産者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊
是以生産者生産完資料之後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生産者要資料,而是直接從阻塞隊列中去,阻塞隊列就相當于一個緩沖區,平衡了生産者和消費者的處理能力
小案例
# 1、用一個隊列來存儲商品
# 2、建立一個專門生産商品的線程類,當商品數量少于50時,開始生産商品,每次生産200個商品,沒生産完一輪 暫停1秒
# 3、建立一個專門消費商品的線程類,當商品數量大于10時就開始消費,,循環消費,每次消費3個。當商品執行個體少于10的時候,暫停2秒
# 4、建立一個線程生産商品,5個線程消費商品
import queue
import time
import threading
q = queue.Queue(250)
class ProGoods(threading.Thread):
"""生存商品"""
def run(self):
while True:
if q.qsize() < 50:
for j in range(200):
q.put("商品{}".format(str(j)))
time.sleep(1)
class ConGoods(threading.Thread):
"""消費商品"""
def run(self):
while True:
if q.qsize() > 10:
for j in range(3):
q.get()
print(q.qsize())
else:
time.sleep(2)
def main():
# 建立一個線程生産商品
pro_thread = ProGoods()
# 5個線程消費商品
con_thread_list = []
for i in range(5):
con_thread = ConGoods()
con_thread_list.append(con_thread)
# 開啟生存
pro_thread.start()
# 開啟消費
for con_thread in con_thread_list:
con_thread.start()
# 等待子線程
pro_thread.join()
for con_thread in con_thread_list:
con_thread.join()
print("最後剩餘商品:{}".format(q.qsize()))
if __name__ == '__main__':
main()
四、程序
1、程序介紹
什麼是程序
程式:例如xxx.py這是程式,是一個靜态的
程序:一個程式運作起來,代碼+用到的資源稱之為程序,它是作業系統配置設定資源的基本機關
不僅可以通過線程來完成多任務,程序也是可以的
程序的狀态
工作中,任務數往往大于CPU的核數,即一定有一些任務在等待CPU執行,是以有了不同的狀态
就緒狀态:運作的條件都已經滿足了,正在等待CPU執行
執行狀态:CPU正在執行其功能
等待狀态:等待某些條件滿足,例如一個程式sleep了,此時就處于等待狀态
2、程序、線程對比
功能
程序,能夠完成多任務,比如在一台電腦上能夠同時運作多個軟體
線程,能夠完成多任務,比如一個QQ中的多個聊天視窗
定義不同
程序是系統進行資源配置設定和排程的一個獨立機關
線程是程序的一個實體,是CPU排程和分派的基本機關,它是比程序更小的能獨立運作的基本機關,線程自己基本上不能擁有系統資源,隻擁有一點在運作中必不可少的資源,但是它可與同屬于一個程序的其他線程共享程序所擁有的全部資源
差別
一個程式至少擁有一個程序,一個程序至少擁有一個線程
線程的劃分适度小于程序(資源比程序小),使用多程序成功的并發性高
程序在執行過程中擁有獨立的記憶體單元,而多個線程共享記憶體,進而極大的提高了程式的運作效率
線程不能獨立執行,必須依存在程序中
可以将程序了解為一個工廠的流水線,而其中的線程就是這個流水線上的勞工
優缺點
線程個程序在使用上各有優缺點:線程執行開銷小,但不利于資源的管理和保護,而程序正相反
3、multiprocessing子產品
Process(group=None, target=None, name=None, args=(), kwargs={})
target: 給子程序傳遞執行的任務代碼
args:給target指定的函數傳遞參數,元組方式
kwargs:給target指定的函數傳遞參數,字段方式
name:給程序設定一個名字
group:指定程序組,大多數情況下用不到
Process執行個體的常用方法
start() :啟動子程序執行個體
is_alive() :判斷子程序是否還在活着
jion(timeout) :是否等待子程序執行結束,或者等待多少
terminate() :不管任務是否完成,立即終止子程序
Process執行個體對象常用的屬性
name :目前程序的名字
pid :目前程序的pid(程序号)
主程序中可以程序p.pid擷取
子程序中用os.getpid()擷取
4、程序之間的通信 -multiprocessing.Queue
程序 multiprocessing.Queue 和線程 queue.Queue的差別
方法一摸一樣
queue.Queue :是程序内多線程之間非阻塞隊列
multiprocessing.Queue :是跨程序通信隊列
1、程序中的Queue的使用
可以使用multiprocessing子產品中的Queue實作多程序之間的資料傳遞,Queue本身是一個消息隊列程式,首先用一個執行個體來示範一下Queue的工作原理:
在父程序中建立兩個子程序,
主往Queue中寫資料,兩個子程序在Queue中讀資料
注意程序之間的Queue要當作參數傳給任務函數(不共享全局變量)
import os
import time
import requests
from multiprocessing import Process, Queue
a = 0
def work1(q):
global a
while True:
a += 1
try:
url = q.get_nowait()
print("程序{},a ={},url ={}:".format(os.getpid(), a, url))
requests.get(url)
except queue.Empty:
break
def work2(q):
global a
while True:
a += 1
try:
url = q.get_nowait()
print("程序{},a ={},url ={}:".format(os.getpid(), a, url))
requests.get(url)
except queue.Empty:
break
if __name__ == '__main__':
q = Queue(100)
for i in range(10):
q.put("http://127.0.0.1:5000")
# 将隊列對象參數傳進任務函數
p1 = Process(target=work1, args=(q,))
p2 = Process(target=work2, args=(q,))
# 開啟
st = time.time()
p1.start()
p2.start()
# print(p1.name)
# print(p1.pid)
# 等待
p1.join()
p2.join()
et = time.time()
print("時間:", et - st)
5、程序池Pool
當需要建立的子程序數量過多的時候,我們可以利用程序池來建立
初始化Pool的時候,可以指定一個最大程序數,目前新的請求送出到Pool中中時,如果池中還沒有滿,那麼就會建立一個新的程序用來執行該請求,但是如果池中的程序數已經達到最大值,那麼該請求就會等待,直到池中有程序結束,才會用之前的程序來執行新的任務
Pool常用的方法
apply_async(func,args=(),kwds={},callback=None,error_callback=None): 使用非阻塞的方式調用func(并行執行,堵塞方式必須等待上一個程序退出才能執行下一個程序)
close() : 關閉程序池
terminate():不管子程序是否結束,立即終止
join() :主程序阻塞,等待子程序結束,必須在close或terminate之後使用
6、程序池中的隊列
multiprocessing子產品中Manager來建立隊列
q = Manager().Queue()
案例
import os
import queue
import time
import requests
from multiprocessing import Pool, Manager
a = 0
def work(q):
global a
while True:
a += 1
try:
url = q.get_nowait()
print("程序{},a ={},url ={}:".format(os.getpid(), a, url))
requests.get(url)
except queue.Empty:
break
if __name__ == '__main__':
# 程序池隊列
q = Manager().Queue(100)
for i in range(100):
q.put("http://127.0.0.1:5000")
# 程序池
p = Pool(5)
# 開啟5個程序消費
st = time.time()
for i in range(5):
p.apply_async(func=work, args=(q,))
# 關閉程序池
p.close()
# 等待子程序結束,必須在關閉程序池後使用
# 如果不等待,主程序結束,子程序也會跟着結束
p.join()
et = time.time()
print("時間:", et - st)
五 、協程
1、利用yield機制來實作多任務
内容回顧
什麼是生成器
生成器證明定義
案例
"""
生成器
生成器表達式
在函數中使用yield關鍵字:生成器
協程是通過生成器實作的多任務,在任務間不停的切換
"""
import time
def work1():
for i in range(10):
time.sleep(0.1)
print("---work1----{}".format(i))
yield
def work2():
for i in range(10):
time.sleep(0.1)
print("---work2----{}".format(i))
yield
# 建立兩個生成器對象,實作多任務
g1 = work1()
g2 = work2()
while True:
try:
next(g1)
next(g2)
except StopIteration:
break
# 協程:微線程
"""
協程的本質上是單任務
協程依賴于線程
協程相對與線程來說,占用的資源更少,幾乎不要占用什麼資源
"""
2、什麼是協程
協程是Python中另外一種實作多任務的方式,隻不過比線程更小的占用執行單元,為啥說它是一個執行單元?因為它自帶CPU上下文,這樣隻要在合适gr的時機,我們可以把一個協程切換到另一個協程,隻要這個過程中儲存或恢複CPU上下文那麼程式還是可以運作的
通俗的了解:在一個線程中的某個函數,可以在任何地方儲存目前函數的一些臨時變量資訊,然後切換到另外一個函數中執行,注意不是通過調用函數的方式做到的,并且切換的次數一級什麼時候在切換到原來的函數都由開發自己确定
3、協程和線程的差異
線程:在實作多任務時,切換從系統層面遠不止儲存和恢複CPU上下文這麼簡單,作業系統為了程式運作的高效性,每個線程都有自己緩存Chche等資料,作業系統還會幫你做這些資料的恢複操作,是以線程的切換比較耗性能
協程:協程的切換隻是單純的操作CPU的上下文,是以一秒切換個上百萬次都系統都抗的住
4、greenlet子產品
為了更好使用協程來完成多任務,Python中的greenlet子產品對其封裝,進而使得切換任務變得更加簡單
安裝方式使用
安裝: pip install greenlet
使用:建立多任務的greenlet對象
切換:switch()方法傳參和任務切換
import greenlet
def work1():
for i in range(10):
print("---work1----{}".format(i))
# 标記切換g2任務
g2.switch()
def work2():
for i in range(10):
print("---work2----{}".format(i))
# 标記切換g1任務
g1.switch()
# 建立兩個greenlet對象
g1 = greenlet.greenlet(work1)
g2 = greenlet.greenlet(work2)
# 切進任務1,有參數可以傳參數 *args, **kwargs
g1.switch()
5、gevent子產品
安裝:
pip install gevent
gevent.spawn(*args, **kwargs)
建立并開啟協程
第一個參數傳任務函數
其他參數依次傳入即可
gevent.sleep()
切換的标志
gevent裡等待方法
gevent().join()
線程等待協程的方法
import gevent
"""
協程:gevent,
對greenlet的再次封裝
協程存在于線程之中,線程預設不會等等待協程執行的
"""
def work1(a):
for i in range(10):
print("---work{}----{}".format(a, i))
# 切換的标志
gevent.sleep(0.001)
def work2(a):
for i in range(10):
print("---work{}----{}".format(a, i))
# 切換的标志
gevent.sleep(0.001)
# 建立兩個gevent對象
# 參數*args, **kwargs
# 預設就已經開啟執行了
g1 = gevent.spawn(work1, a=1)
g2 = gevent.spawn(work2, a=2)
# 線程等待協程執行完
g1.join()
g2.join()
gevent切換還是要主動用自己的等待标志才會切換,還不夠強大
gevent有一個更新檔可以智能的切換,在IO阻塞的時候自動切換
導入更新檔:from gevent import monkey
線程中調用:monkey.patch_all()
此時隻要有耗時操作就會自動切換
注意: 一個程序内調用一次monkey.patch_all()方法即可
多程序内每個子程序内調用,不能在主程序中調用
多線程主線程内調用,不能每個子線程都調用
from gevent import monkey
# 放在導包之前,會改系統的環境
# 不然會有警告
monkey.patch_all()
import time
import requests
import gevent
def work1(a):
for i in range(10):
res = requests.get("http://www.baidu.com").status_code
print("---work{}----{}的結果:{}".format(a, i, res))
def work2(a):
for i in range(10):
res = requests.get("http://www.baidu.com").status_code
print("---work{}----{}的結果:{}".format(a, i, res))
# 建立兩個gevent對象
# 參數*args, **kwargs
# 預設就已經開啟執行了
st = time.time()
g1 = gevent.spawn(work1, a=1)
g2 = gevent.spawn(work2, a=2)
# 線程等待協程執行完
g1.join()
g2.join()
# work1(1)
# work2(2)
et = time.time()
print("時間:", et - st)
# 兩個協程:時間: 0.4506347179412842
# 單線程 : 時間: 0.7228543758392334
協程的隊列
線程、程序、程序池的隊列都可以共享使用
六、總結
1、綜合練習
開啟多個程序,每個程序開啟多線程,每個線程開啟多個協程消費
import warnings
import time
import requests
import threading
import gevent
from gevent import monkey
from multiprocessing import Pool, Manager
warnings.filterwarnings("ignore")
# 10000個請求,開啟2個程序,每個程序中實作3個線程,每個線程中實作5個協程去處理 計算時間
def consume_request(q):
"""
取隊列的url請求
"""
a = 0
while q.qsize() > 0:
url = q.get()
requests.get(url)
a += 1
print("協程完成消費次數:",a)
def gevents(q):
"""
每個線程建立5個協程
"""
gev_list = []
for i in range(5):
gev = gevent.spawn(consume_request, q)
gev_list.append(gev)
for gev in gev_list:
gev.join()
def threads(pro_q):
"""
建立三個線程
"""
# 每個子程序調用一次協程更新檔
monkey.patch_all()
th_list = []
for i in range(3):
th = threading.Thread(target=gevents, args=(pro_q,))
th_list.append(th)
for th in th_list:
th.start()
for th in th_list:
th.join()
def create_process():
"""建立兩個程序"""
pro_q = Manager().Queue()
for i in range(1000):
pro_q.put("http://127.0.0.1:5000")
s_time = time.time()
pool = Pool(2)
for i in range(2):
pool.apply_async(func=threads, args=(pro_q,))
pool.close()
pool.join()
e_time = time.time()
print("耗時:", e_time - s_time)
if __name__ == '__main__':
create_process()
2、簡單總結
程序是資源配置設定的資源
線程是作業系統排程的機關
程序切換需要的資源最大,效率最低
線程切換需要的資源一般,效率一般(當然是了在不考慮GIL鎖的情況下)
協程切換任務資源很小,效率高
對程序、多線程根據CPU核數不一樣可能是并行,但是協程是在一個線程中,是以是并發
Python中的線程由于GIL鎖的存在,并不能實作并行