天天看點

Python程序池multiprocessing.Pool的用法

一、multiprocessing子產品

multiprocessing

子產品提供了一個

Process

類來代表一個程序對象,multiprocessing子產品像線程一樣管理程序,這個是multiprocessing的核心,它與threading很相似,對多核CPU的使用率會比threading好的多

看一下Process類的構造方法:

__init__(self, group=None, target=None, name=None, args=(), kwargs={})      

參數說明: 

group:程序所屬組(基本不用) 

target:表示調用對象

args:表示調用對象的位置參數元組

name:别名 

kwargs:表示調用對象的字典

示例:

import multiprocessing


def do(n):             # 參數n由args=(1,)傳入
    name = multiprocessing.current_process().name        # 擷取目前程序的名字
    print(name, 'starting')
    print("worker ", n)
    return


if __name__ == '__main__':
    numList = []
    for i in range(5):
        p = multiprocessing.Process(target=do, args=(i,))      # (i,)中加入","表示元祖
        numList.append(p)
        print(numList)
        p.start()                 # 用start()方法啟動程序,執行do()方法
        p.join()                  # 等待子程序結束以後再繼續往下運作,通常用于程序間的同步
        print("Process end.")      

運作結果:

[<Process(Process-1, initial)>]
Process-1 starting
worker  0
Process end.
[<Process(Process-1, stopped)>, <Process(Process-2, initial)>]
Process-2 starting
worker  1
Process end.
[<Process(Process-1, stopped)>, <Process(Process-2, stopped)>, <Process(Process-3, initial)>]
Process-3 starting
worker  2
Process end.
[<Process(Process-1, stopped)>, <Process(Process-2, stopped)>, <Process(Process-3, stopped)>, <Process(Process-4, initial)>]
Process-4 starting
worker  3
Process end.
[<Process(Process-1, stopped)>, <Process(Process-2, stopped)>, <Process(Process-3, stopped)>, <Process(Process-4, stopped)>, <Process(Process-5, initial)>]
Process-5 starting
worker  4
Process end.      

通過列印numList可以看出目前程序結束後,再開始下一個程序

注意: 

在Windows上要想使用程序子產品,就必須把有關程序的代碼寫在目前.py檔案的if __name__ == ‘__main__’ :語句的下面,才能正常使用Windows下的程序子產品。Unix/Linux下則不需要

二、Pool類

 Pool類可以提供指定數量的程序供使用者調用,當有新的請求送出到Pool中時,如果池還沒有滿,就會建立一個新的程序來執行請求。如果池滿,請求就會告知先等待,直到池中有程序結束,才會建立新的程序來執行這些請求

下面介紹一下multiprocessing 子產品下的Pool類下的幾個方法:

1.apply()

函數原型:apply(func[, args=()[, kwds={}]])

該函數用于傳遞不定參數,同python中的apply函數一緻,主程序會被阻塞直到函數執行結束(不建議使用,并且3.x以後不再出現)

2.apply_async

函數原型:apply_async(func[, args=()[, kwds={}[, callback=None]]])

與apply用法一緻,但它是非阻塞的且支援結果傳回後進行回調

3.map()

函數原型:map(func, iterable[, chunksize=None])

Pool類中的map方法,與内置的map函數用法行為基本一緻,它會使程序阻塞直到結果傳回

注意:雖然第二個參數是一個疊代器,但在實際使用中,必須在整個隊列都就緒後,程式才會運作子程序

4.map_async()

函數原型:map_async(func, iterable[, chunksize[, callback]])

與map用法一緻,但是它是非阻塞的

5.close()

關閉程序池(pool),使其不再接受新的任務

6.terminal()

結束工作程序,不再處理未處理的任務

7.join()

主程序阻塞等待子程序的退出, join方法要在close或terminate之後使用

示例1--使用map()函數

import time
from multiprocessing import Pool


def run(fn):
    # fn: 函數參數是資料清單的一個元素
    time.sleep(1)
    print(fn * fn)


if __name__ == "__main__":
    testFL = [1, 2, 3, 4, 5, 6]
    print('shunxu:')  # 順序執行(也就是串行執行,單程序)
    s = time.time()
    for fn in testFL:
        run(fn)
    t1 = time.time()
    print("順序執行時間:", int(t1 - s))

    print('concurrent:')  # 建立多個程序,并行執行
    pool = Pool(3)  # 建立擁有3個程序數量的程序池
    # testFL:要處理的資料清單,run:處理testFL清單中資料的函數
    pool.map(run, testFL)
    pool.close()  # 關閉程序池,不再接受新的程序
    pool.join()  # 主程序阻塞等待子程序的退出
    t2 = time.time()
    print("并行執行時間:", int(t2 - t1))      
Python程式池multiprocessing.Pool的用法

1、map函數中testFL為可疊代對象--清單

2、當建立3個程序時,會一次列印出3個結果“1,4,9”,當當建立2個程序時,會一次列印出2個結果“1,4”,以此類推,當建立多餘6個程序時,會一次列印出所有結果

3、如果使用Pool(),不傳入參數,可以建立一個動态控制大小的程序池

Python程式池multiprocessing.Pool的用法
Python程式池multiprocessing.Pool的用法

從結果可以看出,并發執行的時間明顯比順序執行要快很多,但是程序是要耗資源的,是以平時工作中,程序數也不能開太大。 對Pool對象調用join()方法會等待所有子程序執行完畢,調用join()之前必須先調用close(),讓其不再接受新的Process了

示例2--使用map()_async函數

print('concurrent:')  # 建立多個程序,并行執行
    pool = Pool(3)  # 建立擁有3個程序數量的程序池
    # testFL:要處理的資料清單,run:處理testFL清單中資料的函數
    pool.map_async(run, testFL)
    pool.close()  # 關閉程序池,不再接受新的程序
    pool.join()  # 主程序阻塞等待子程序的退出
    t2 = time.time()
    print("并行執行時間:", int(t2 - t1))      
Python程式池multiprocessing.Pool的用法

從結果可以看出,map_async()和map()用時相同。目前還沒有看出兩者的差別,後面知道後再完善

示例3--使用apply()函數

print('concurrent:')  # 建立多個程序,并行執行
    pool = Pool(3)  # 建立擁有3個程序數量的程序池
    # testFL:要處理的資料清單,run:處理testFL清單中資料的函數
    for fn in testFL:
        pool.apply(run, (fn,))
    pool.close()  # 關閉程序池,不再接受新的程序
    pool.join()  # 主程序阻塞等待子程序的退出
    t2 = time.time()
    print("并行執行時間:", int(t2 - t1))      
Python程式池multiprocessing.Pool的用法

可見,使用apply()方法,并行執行和順序執行用時相同,經過試驗,程序數目增大也不會減少并行執行的時間

原因:以阻塞的形式産生程序任務,生成1個任務程序并等它執行完出池,第2個程序才會進池,主程序一直阻塞等待,每次隻執行1個程序任務

示例4--使用apply_async()函數

print('concurrent:')  # 建立多個程序,并行執行
    pool = Pool(3)  # 建立擁有3個程序數量的程序池
    # testFL:要處理的資料清單,run:處理testFL清單中資料的函數
    for fn in testFL:
        pool.apply_async(run, (fn,))
    pool.close()  # 關閉程序池,不再接受新的程序
    pool.join()  # 主程序阻塞等待子程序的退出
    t2 = time.time()
    print("并行執行時間:", int(t2 - t1))      
Python程式池multiprocessing.Pool的用法

可見,使用apply_async()方法,并行執行時間與使用map()、map_async()方法相同

注意:

map_async()和map()方法,第2個參數可以是清單也可以是元祖,如下圖:

Python程式池multiprocessing.Pool的用法

而使用apply()和apply_async()方法時,第2個參數隻能傳入元祖,傳入清單程序不會被執行,如下圖:

Python程式池multiprocessing.Pool的用法

三、apply_async()方法callback參數的用法

from multiprocessing import Pool
import time


def fun_01(i):
    time.sleep(2)
    print('start_time:', time.ctime())
    return i + 100


def fun_02(arg):
    print('end_time:', arg, time.ctime())


if __name__ == '__main__':
    pool = Pool(3)
    for i in range(4):
        pool.apply_async(func=fun_01, args=(i,), callback=fun_02)  # fun_02的入參為fun_01的傳回值
        # pool.apply_async(func=fun_01, args=(i,))
    pool.close()
    pool.join()
    print('done')      
start_time: Thu Nov 14 16:31:41 2019
end_time: 100 Thu Nov 14 16:31:41 2019
start_time: Thu Nov 14 16:31:41 2019
end_time: 101 Thu Nov 14 16:31:41 2019
start_time: Thu Nov 14 16:31:41 2019
end_time: 102 Thu Nov 14 16:31:41 2019
start_time: Thu Nov 14 16:31:43 2019
end_time: 103 Thu Nov 14 16:31:43 2019
done      

map_async()方法callback參數的用法與apply_async()相同

四、使用程序池并關注結果

import multiprocessing
import time


def func(msg):
    print('hello :', msg, time.ctime())
    time.sleep(2)
    print('end', time.ctime())
    return 'done' + msg


if __name__ == '__main__':
    pool = multiprocessing.Pool(2)
    result = []
    for i in range(3):
        msg = 'hello %s' % i
        result.append(pool.apply_async(func=func, args=(msg,)))

    pool.close()
    pool.join()

    for res in result:
        print('***:', res.get())             # get()函數得出每個傳回結果的值

    print('All end--')      
Python程式池multiprocessing.Pool的用法

五、多程序執行多個函數

使用apply_async()或者apply()方法,可以實作多程序執行多個方法

import multiprocessing
import time
import os


def Lee():
    print('\nRun task Lee--%s******ppid:%s' % (os.getpid(), os.getppid()), '~~~~', time.ctime())
    start = time.time()
    time.sleep(5)
    end = time.time()
    print('Task Lee,runs %0.2f seconds.' % (end - start), '~~~~', time.ctime())


def Marlon():
    print("\nRun task Marlon-%s******ppid:%s" % (os.getpid(), os.getppid()), '~~~~', time.ctime())
    start = time.time()
    time.sleep(10)
    end = time.time()
    print('Task Marlon runs %0.2f seconds.' % (end - start), '~~~~', time.ctime())


def Allen():
    print("\nRun task Allen-%s******ppid:%s" % (os.getpid(), os.getppid()), '~~~~', time.ctime())
    start = time.time()
    time.sleep(15)
    end = time.time()
    print('Task Allen runs %0.2f seconds.' % (end - start), '~~~~', time.ctime())


def Frank():
    print("\nRun task Frank-%s******ppid:%s" % (os.getpid(), os.getppid()), '~~~~', time.ctime())
    start = time.time()
    time.sleep(20)
    end = time.time()
    print('Task Frank runs %0.2f seconds.' % (end - start), '~~~~', time.ctime())


if __name__ == '__main__':
    func_list = [Lee, Marlon, Allen, Frank]
    print('parent process id %s' % os.getpid())

    pool = multiprocessing.Pool(4)
    for func in func_list:
        pool.apply_async(func)

    print('Waiting for all subprocesses done...')
    pool.close()
    pool.join()
    print('All subprocesses done.')      
parent process id 84172
Waiting for all subprocesses done...

Run task Lee--84868******ppid:84172 ~~~~ Thu Nov 14 17:44:14 2019

Run task Marlon-84252******ppid:84172 ~~~~ Thu Nov 14 17:44:14 2019

Run task Allen-85344******ppid:84172 ~~~~ Thu Nov 14 17:44:14 2019

Run task Frank-85116******ppid:84172 ~~~~ Thu Nov 14 17:44:14 2019
Task Lee,runs 5.00 seconds. ~~~~ Thu Nov 14 17:44:19 2019
Task Marlon runs 10.00 seconds. ~~~~ Thu Nov 14 17:44:24 2019
Task Allen runs 15.00 seconds. ~~~~ Thu Nov 14 17:44:29 2019
Task Frank runs 20.00 seconds. ~~~~ Thu Nov 14 17:44:34 2019
All subprocesses done.      

六、其他

1、擷取目前計算機的CPU數量

Python程式池multiprocessing.Pool的用法