介紹
multiprocessing
是一個使用類似于
threading
子產品的API支援生成程序的包。該
multiprocessing
軟體包提供本地和遠端并發,通過使用子程序而不是線程有效地支援 全局解釋器鎖。multiprocessing子產品充分利用給定機器上的
多個處理器
。它可以在
Unix和Windows
上運作。
該multiprocessing子產品還引入了threading子產品中沒有模拟的API 。一個主要的例子是該 Pool對象提供了一種友善的方法,可以跨多個輸入值并行化函數的執行,跨過程配置設定輸入資料(資料并行)。以下示例示範了在子產品中定義此類函數的常見做法,以便子程序可以成功導入該子產品。這個資料并行的基本例子使用Pool
from multiprocessing import Pool
def f(x):
return x*x
if __name__ == '__main__':
with Pool(5) as p:
print(p.map(f, [1, 2, 3]))
>>>[1, 4, 9]
#encoding:utf-8
# __author__ = 'donghao'
# __time__ = 2019/4/1 11:27
from multiprocessing import Pool
import time
import os
# 程序池
# 大量程序建立,使用pool的方法
def worker(msg):
start = time.time()
print('%s開始執行,程序号%d'%(msg,os.getpid()))
time.sleep(1)
end = time.time()
print('耗時%0.2f'%(end-start))
if __name__ == '__main__':
po = Pool(3)
for i in range(10):
po.apply_async(worker, (i,))
print('——tart____')
po.close() # 關閉程序池,關閉後不再接受新的請求
po.join() # 等待所有的子程序執行完成,必須放到close之後
apply_async(func[, args[, kwds[, callback]]]) 它是非阻塞,apply(func[, args[, kwds]])是阻塞的
close() 關閉pool,使其不在接受新的任務。
terminate() 結束工作程序,不在處理未完成的任務。
join() 主程序阻塞,等待子程序的退出, join方法要在close或terminate之後使用。
Process
在
multiprocessing
,通過建立
Process
對象然後調用其
start()
方法來生成程序。 Process 遵循API的
threading.Thread
from multiprocessing import Process
def f(name):
print('hello', name)
if __name__ == '__main__':
p = Process(target=f, args=('bob',))
p.start()
p.join()
顯示所涉及的各個程序ID
from multiprocessing import Process
import os
def info(title):
print(title)
print('module name:', __name__)
print('父程序:', os.getppid())
print('程序:', os.getpid())
def f(name):
info('函數 f')
print('我是', name)
if __name__ == '__main__':
info('main line')
p = Process(target=f, args=('魯班七号',))
p.start()
p.join()
>>>
main line
module name: __main__
父程序: 1668
程序: 1368
函數 f
module name: __mp_main__
父程序: 1368
程序: 4644
我是 魯班七号
multiprocessing 支援程序之間的兩種通信
隊列
這個Queue是近乎克隆的queue.Queue。例如:
from multiprocessing import Process, Queue
def f(q):
q.put(['魯班七号', '妲己', '後裔'])
if __name__ == '__main__':
q = Queue()
p = Process(target=f, args=(q,))
p.start()
print(q.get()) # prints ['魯班七号', '妲己', '後裔']
p.join()
隊列是線程和程序安全的。
管道
from multiprocessing import Process, Pipe
def f(conn):
conn.send(['魯班七号', '妲己', '後裔'])
conn.close()
if __name__ == '__main__':
parent_conn,child_conn = Pipe()
p = Process(target=f,args=(child_conn,))
p.start()
print(parent_conn.recv())
p.join()
parent_conn.close()
傳回的兩個連接配接對象Pipe()表示管道的兩端。每個連接配接對象都有
send()
和
recv()
方法(以及其他)。請注意,如果兩個程序(或線程)同時嘗試讀取或寫入管道的同一端,則管道中的資料可能會損壞。當然,同時使用管道的不同端的程序不存在損壞的風險
程序間的同步
multiprocessing
包含所有同步原語的等價物
threading
。例如,可以使用鎖來確定一次隻有一個程序列印到标準輸出:
from multiprocessing import Process, Lock
def f(l, i):
print('hello world', i)
if __name__ == '__main__':
lock = Lock()
for num in range(10):
Process(target=f, args=(lock, num)).start()
不使用來自不同程序的鎖輸出容易被混淆。
程序間共享狀态
在進行并發程式設計時,通常最好盡量避免使用共享狀态。使用多個程序時尤其如此。
但是,如果您确實需要使用某些共享資料,那麼 multiprocessing提供了幾種方法。
共享記憶體
可以使用
Value
或 将資料存儲在共享存儲器映射中
Array
。例如,以下代碼
from multiprocessing import Process, Value, Array
def f(n, a):
n.value = 3.1415927
for i in range(len(a)):
a[i] = -a[i]
if __name__ == '__main__':
num = Value('d', 0.0)
arr = Array('i', range(10))
p = Process(target=f, args=(num, arr))
p.start()
p.join()
print(num.value)
print(arr[:])
>>>
3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
伺服器程序
Manager()
控制器傳回的管理器對象控制一個伺服器程序,該程序儲存Python對象并允許其他程序使用代理操作它們
通過傳回的經理Manager()将支援類型
list,dict,Namespace,Lock, RLock,Semaphore,BoundedSemaphore, Condition,Event,Barrier, Queue,Value和Array
例如
from multiprocessing import Process, Manager
def f(d, l, kills):
d['name'] = '程咬金'
d['slogan'] = '真男人,必須要有強健的肌肉,身體和精神'
d['裝備'] = None
l.reverse()
kills.append('後裔')
if __name__ == '__main__':
with Manager() as manager:
d = manager.dict()
l = manager.list(range(10))
kills = manager.list(['達摩','魯班七号'])
p = Process(target=f, args=(d, l, kills))
p.start()
p.join()
print(d)
print(l)
print(kills)
>>>
{'name': '程咬金', 'slogan': '真男人,必須要有強健的肌肉,身體和精神', '裝備': None}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
['達摩', '魯班七号', '後裔']
伺服器程序管理器比使用共享記憶體對象更靈活,因為它們可以支援任意對象類型。此外,單個管理器可以通過網絡由不同計算機上的程序共享。但是,它們比使用共享記憶體慢。
daemon程式
# 不加daemon屬性
import multiprocessing
import time
def worker(interval):
print("work start:{0}".format(time.ctime()))
time.sleep(interval)
print("work end:{0}".format(time.ctime()))
if __name__ == "__main__":
p = multiprocessing.Process(target = worker, args = (3,))
p.start()
print("end!")
>>>
end!
work start:Mon Apr 1 16:08:40 2019
work end:Mon Apr 1 16:08:43 2019
#加上daemon屬性
import multiprocessing
import time
def worker(interval):
print("work start:{0}".format(time.ctime()))
time.sleep(interval)
print("work end:{0}".format(time.ctime()))
if __name__ == "__main__":
p = multiprocessing.Process(target = worker, args = (3,))
p.daemon = True
p.start()
print("end!")
>>>
end!
注:因子程序設定了daemon屬性,主程序結束,它們就随着結束了。
Event用來實作程序間同步通信。
import multiprocessing
import time
def wait_for_event(e):
print("wait_for_event: starting")
e.wait()
print("wairt_for_event: e.is_set()->" + str(e.is_set()))
def wait_for_event_timeout(e, t):
print("wait_for_event_timeout:starting")
e.wait(t)
print("wait_for_event_timeout:e.is_set->" + str(e.is_set()))
if __name__ == "__main__":
e = multiprocessing.Event()
w1 = multiprocessing.Process(name = "block",
target = wait_for_event,
args = (e,))
w2 = multiprocessing.Process(name = "non-block",
target = wait_for_event_timeout,
args = (e, 1))
w1.start()
w2.start()
time.sleep(5)
e.set()
print("main: event is set")
>>>
wait_for_event: starting
wait_for_event_timeout:starting
wait_for_event_timeout:e.is_set->False
main: event is set
wairt_for_event: e.is_set()->True
檔案拷貝器:
#encoding:utf-8
# __author__ = 'donghao'
# __time__ = 2019/4/1 14:14
from multiprocessing import pool,Manager,Queue
import os,time
def mycopy(old_file_name, new_file_name, filename, queue):
f = open(old_file_name+'/' + filename,'rb')
content = f.read()
f.close()
w = open(new_file_name+'/' + filename,'wb')
w.write(content)
w.close()
queue.put(filename)
def main():
old_file_name = input('請輸入檔案名稱')
path = os.listdir(old_file_name)
length = len(path)
po = pool.Pool(5)
queue = Manager().Queue()
try:
new_file_name = old_file_name+'[副本]'
os.mkdir(new_file_name)
except:
pass
for filename in path:
po.apply_async(mycopy,args=(old_file_name, new_file_name, filename, queue))
po.close()
copy_file_nums = 0
while True:
filename = queue.get()
copy_file_nums += 1
print('\r 拷貝進度: %0.2f %%'%(copy_file_nums*100/length),end='')
if copy_file_nums >= length:
break
print('\n檔案拷貝成功!')
po.join()
if __name__ == '__main__':
main()