天天看點

multiprocess子產品使用程序池時往Pool對象傳入的函數不巧當導緻程序運作速度跟單線程速度一樣

python 中

multiprocessing.pool

子產品裡有

Pool

類用于程序池管理,

Pool

執行個體中一些比較重要的方法:

  • apply_async(func, args, kwargs, callback)
    • 往程序池異步送出任務,即任務送出到程序池後可以不用等任務結果可以繼續往下運作;與其對立的是任務送出後,送出任務的程序會被阻塞直到任務被執行完傳回結果
    • 參數func是必須的,args用于提供給 func的參數,args類型為

      元組

      , kwargs 也是提供給 func函數的參數, callback 任務執行完後立刻執行的回調函數,callback函數禁止執行任何阻塞操作
  • map_async(func, iterable, chunksize, callback)
    • 參數 chunksize 、 callback 是可選的
    • 将可疊代對象iterable的每一個元素作為參數傳遞給func并異步地執行函數,傳回結果是

      AsyncResult

      類的執行個體
    • 如果提供 callback 參數,當傳回結果變得可用時立即執行 callback 函數
  • close( )
    • 關閉程序池防止進一步操作,關閉後将不再接受任務
  • join( )
    • 等待所有工作程序退出,join( ) 隻能在close( )方法或terminate( )方法調用後再調用

往程序池裡扔任務裡調用

apply_async()

方法,該方法第一個參數傳遞的是函數名,如果傳入一個函數,函數将不會并發地執行;比如有個無參函數名字為

doMatch

,但往

apply_async()

方法傳遞的第一個參數如果是

doMatch()

,那麼送出到程序池的任務将不會并發執行

如果往

apply_async()

方法傳遞的是函數而不是函數名,送出的任務将不會被并發執行,并且傳回結果也是錯誤的

以下是未正确送出任務到程序池的代碼

import ctypes
import datetime
import random

import time

from multiprocessing import Queue
from multiprocessing.pool import Pool

from multiprocessing import Value

def doMatch():
    s = "0123456789abcdefghijklmnopqrstuvwxyz"
    index = random.randint(0, len(s) - 4)
    time.sleep(1)
    return s[index:index + 2]


if __name__ == '__main__':
    param = "0123456789abcdefghijklmnopqrstuvwxyz"
    val = Value(ctypes.c_uint32, 1)
    index = 1
    queue = Queue()

    # 測試單程序執行 doMatch() 函數30次需要的時間
    start = datetime.datetime.now()
    for i in range(30):
        print("single process:\t{}".format(doMatch()))
    finish = datetime.datetime.now()
    print("\n單線程執行花費的時間為:\t{}".format(finish - start))

    # 測試9個程序的程序池執行 doMatch() 函數30次需要的時間
    reslist = []
    pool1 = Pool(9)
    start = datetime.datetime.now()
    for i in range(30):
        res = pool1.apply_async(doMatch())
        # e = entry(param)
        # res = pool1.apply_async(e.exec, callback=e.display)
        reslist.append(res)
    pool1.close()
    pool1.join()
    finish = datetime.datetime.now()
    # print("result = \n{}".format(reslist))
    for res in reslist:
        print(res)

    print("9個程序的程序池執行花費的時間為:\t{}".format(finish - start))

    # 測試3組3個程序的程序池執行 doMatch() 函數30次需要的時間
    pool2 = Pool(3)
    pool3 = Pool(3)
    pool4 = Pool(3)
    pools = [pool2, pool3, pool4]

    index = 1

    reslist = []
    start = datetime.datetime.now()
    for j in range(len(pools)):
        for i in range(10):
            # e = entry(param)
            # pools[j].apply_async(e.exec)
            asyncresult = pools[j].apply_async(doMatch())
            reslist.append(asyncresult)

        pools[j].close()
    for j in range(len(pools)):
        pools[j].join()
    finish = datetime.datetime.now()
    print("3個3個程序的程序池執行花費的時間為:\t{}".format(finish - start))
    for result in reslist:
        print(result)           

運作結果:

multiprocess子產品使用程式池時往Pool對象傳入的函數不巧當導緻程式運作速度跟單線程速度一樣

修改傳遞給

apply_async()

的func參數為函數名再執行的結果:

multiprocess子產品使用程式池時往Pool對象傳入的函數不巧當導緻程式運作速度跟單線程速度一樣
multiprocess子產品使用程式池時往Pool對象傳入的函數不巧當導緻程式運作速度跟單線程速度一樣
multiprocess子產品使用程式池時往Pool對象傳入的函數不巧當導緻程式運作速度跟單線程速度一樣

總結 :

往程序池裡送出任務時需要送出要執行的函數名,不能直接向函數調用那樣直接送出這個函數及參數;

如果送出的任務需要額外帶參數, apply_async() 的第二個參數用于接收送出任務所需要的參數,是以可以傳遞兩個參數給 apply_async() 方法,第二個參數的類型要求為元組;

apply_async() 方法傳回的結果為 AsyncResult 的執行個體,可以使用 get() 方法擷取異步執行的結果