python 中
multiprocessing.pool
子產品裡有
Pool
類用于程序池管理,
Pool
執行個體中一些比較重要的方法:
- apply_async(func, args, kwargs, callback)
- 往程序池異步送出任務,即任務送出到程序池後可以不用等任務結果可以繼續往下運作;與其對立的是任務送出後,送出任務的程序會被阻塞直到任務被執行完傳回結果
- 參數func是必須的,args用于提供給 func的參數,args類型為
, kwargs 也是提供給 func函數的參數, callback 任務執行完後立刻執行的回調函數,callback函數禁止執行任何阻塞操作元組
- map_async(func, iterable, chunksize, callback)
- 參數 chunksize 、 callback 是可選的
- 将可疊代對象iterable的每一個元素作為參數傳遞給func并異步地執行函數,傳回結果是
類的執行個體AsyncResult
- 如果提供 callback 參數,當傳回結果變得可用時立即執行 callback 函數
- close( )
- 關閉程序池防止進一步操作,關閉後将不再接受任務
- join( )
- 等待所有工作程序退出,join( ) 隻能在close( )方法或terminate( )方法調用後再調用
往程序池裡扔任務裡調用
apply_async()
方法,該方法第一個參數傳遞的是函數名,如果傳入一個函數,函數将不會并發地執行;比如有個無參函數名字為
doMatch
,但往
apply_async()
方法傳遞的第一個參數如果是
doMatch()
,那麼送出到程序池的任務将不會并發執行
如果往
apply_async()
方法傳遞的是函數而不是函數名,送出的任務将不會被并發執行,并且傳回結果也是錯誤的
以下是未正确送出任務到程序池的代碼
import ctypes
import datetime
import random
import time
from multiprocessing import Queue
from multiprocessing.pool import Pool
from multiprocessing import Value
def doMatch():
s = "0123456789abcdefghijklmnopqrstuvwxyz"
index = random.randint(0, len(s) - 4)
time.sleep(1)
return s[index:index + 2]
if __name__ == '__main__':
param = "0123456789abcdefghijklmnopqrstuvwxyz"
val = Value(ctypes.c_uint32, 1)
index = 1
queue = Queue()
# 測試單程序執行 doMatch() 函數30次需要的時間
start = datetime.datetime.now()
for i in range(30):
print("single process:\t{}".format(doMatch()))
finish = datetime.datetime.now()
print("\n單線程執行花費的時間為:\t{}".format(finish - start))
# 測試9個程序的程序池執行 doMatch() 函數30次需要的時間
reslist = []
pool1 = Pool(9)
start = datetime.datetime.now()
for i in range(30):
res = pool1.apply_async(doMatch())
# e = entry(param)
# res = pool1.apply_async(e.exec, callback=e.display)
reslist.append(res)
pool1.close()
pool1.join()
finish = datetime.datetime.now()
# print("result = \n{}".format(reslist))
for res in reslist:
print(res)
print("9個程序的程序池執行花費的時間為:\t{}".format(finish - start))
# 測試3組3個程序的程序池執行 doMatch() 函數30次需要的時間
pool2 = Pool(3)
pool3 = Pool(3)
pool4 = Pool(3)
pools = [pool2, pool3, pool4]
index = 1
reslist = []
start = datetime.datetime.now()
for j in range(len(pools)):
for i in range(10):
# e = entry(param)
# pools[j].apply_async(e.exec)
asyncresult = pools[j].apply_async(doMatch())
reslist.append(asyncresult)
pools[j].close()
for j in range(len(pools)):
pools[j].join()
finish = datetime.datetime.now()
print("3個3個程序的程序池執行花費的時間為:\t{}".format(finish - start))
for result in reslist:
print(result)
運作結果:
修改傳遞給
apply_async()
的func參數為函數名再執行的結果:
總結 :
往程序池裡送出任務時需要送出要執行的函數名,不能直接向函數調用那樣直接送出這個函數及參數;
如果送出的任務需要額外帶參數, apply_async() 的第二個參數用于接收送出任務所需要的參數,是以可以傳遞兩個參數給 apply_async() 方法,第二個參數的類型要求為元組;
apply_async() 方法傳回的結果為 AsyncResult 的執行個體,可以使用 get() 方法擷取異步執行的結果