閱讀目錄
1.程序以及狀态
2.程序的建立-multiprocessing
3.程序、線程對比
4.程序間通信-Queue
5.程序的建立-程序池Pool
6.案例:檔案夾拷貝器(多程序版)
一.程序以及狀态(五狀态模型)
1. 程序
程式:例如xxx.py這是程式,是一個靜态的
程序:一個程式運作起來後,代碼+用到的資源 稱之為程序,它是作業系統配置設定資源的基本單元。
不僅可以通過線程完成多任務,程序也是可以的
2. 程序的狀态
工作中,任務數往往大于cpu的核數,即一定有一些任務正在執行,而另外一些任務在等待cpu進行執行,是以導緻了有了不同的狀态
- 就緒态:運作的條件都已經慢去,正在等在cpu執行
- 執行态:cpu正在執行其功能
- 等待态:等待某些條件滿足,例如一個程式sleep了,此時就處于等待态
二.程序的建立-multiprocessing
multiprocessing子產品就是跨平台版本的多程序子產品,提供了一個Process類來代表一個程序對象,這個對象可以了解為是一個獨立的程序,可以執行另外的事情
1. 2個while循環一起執行
# -*- coding:utf-8 -*-
from multiprocessing import Process
import time
def run_proc():
"""子程序要執行的代碼"""
while True:
print("----2----")
time.sleep(1)
if __name__=='__main__':
p = Process(target=run_proc)
p.start()
while True:
print("----1----")
time.sleep(1)
說明
- 建立子程序時,隻需要傳入一個執行函數和函數的參數,建立一個Process執行個體,用start()方法啟動
2. 程序pid
# -*- coding:utf-8 -*-
from multiprocessing import Process
import os
import time
def run_proc():
"""子程序要執行的代碼"""
print('子程序運作中,pid=%d...' % os.getpid()) # os.getpid擷取目前程序的程序号
print('子程序将要結束...')
if __name__ == '__main__':
print('父程序pid: %d' % os.getpid()) # os.getpid擷取目前程序的程序号
p = Process(target=run_proc)
p.start()
3. Process文法結構如下:
Process([group [, target [, name [, args [, kwargs]]]]])
- target:如果傳遞了函數的引用,可以任務這個子程序就執行這裡的代碼
- args:給target指定的函數傳遞的參數,以元組的方式傳遞
- kwargs:給target指定的函數傳遞命名參數
- name:給程序設定一個名字,可以不設定
- group:指定程序組,大多數情況下用不到
Process建立的執行個體對象的常用方法:
- start():啟動子程序執行個體(建立子程序)
- is_alive():判斷程序子程序是否還在活着
- join([timeout]):是否等待子程序執行結束,或等待多少秒
- terminate():不管任務是否完成,立即終止子程序
Process建立的執行個體對象的常用屬性:
- name:目前程序的别名,預設為Process-N,N為從1開始遞增的整數
- pid:目前程序的pid(程序号)
4. 給子程序指定的函數傳遞參數
# -*- coding:utf-8 -*-
from multiprocessing import Process
import os
from time import sleep
def run_proc(name, age, **kwargs):
for i in range(10):
print('子程序運作中,name= %s,age=%d ,pid=%d...' % (name, age, os.getpid()))
print(kwargs)
sleep(0.2)
if __name__=='__main__':
p = Process(target=run_proc, args=('test',18), kwargs={"m":20})
p.start()
sleep(1) # 1秒中之後,立即結束子程序
p.terminate()
p.join()
運作結果:
子程序運作中,name= test,age=18 ,pid=45097...
{'m': 20}
子程序運作中,name= test,age=18 ,pid=45097...
{'m': 20}
子程序運作中,name= test,age=18 ,pid=45097...
{'m': 20}
子程序運作中,name= test,age=18 ,pid=45097...
{'m': 20}
子程序運作中,name= test,age=18 ,pid=45097...
{'m': 20}
5. 程序間不同享全局變量
# -*- coding:utf-8 -*-
from multiprocessing import Process
import os
import time
nums = [11, 22]
def work1():
"""子程序要執行的代碼"""
print("in process1 pid=%d ,nums=%s" % (os.getpid(), nums))
for i in range(3):
nums.append(i)
time.sleep(1)
print("in process1 pid=%d ,nums=%s" % (os.getpid(), nums))
def work2():
"""子程序要執行的代碼"""
print("in process2 pid=%d ,nums=%s" % (os.getpid(), nums))
if __name__ == '__main__':
p1 = Process(target=work1)
p1.start()
p1.join()
p2 = Process(target=work2)
p2.start()
運作結果:
in process1 pid=11349 ,nums=[11, 22]
in process1 pid=11349 ,nums=[11, 22, 0]
in process1 pid=11349 ,nums=[11, 22, 0, 1]
in process1 pid=11349 ,nums=[11, 22, 0, 1, 2]
in process2 pid=11350 ,nums=[11, 22]
三.程序、線程對比
功能
- 程序,能夠完成多任務,比如 在一台電腦上能夠同時運作多個QQ
- 線程,能夠完成多任務,比如 一個QQ中的多個聊天視窗
定義的不同
- 程序是系統進行資源配置設定和排程的一個獨立機關.
- 線程是程序的一個實體,是CPU排程和分派的基本機關,它是比程序更小的能獨立運作的基本機關.線程自己基本上不擁有系統資源,隻擁有一點在運作中必不可少的資源(如程式計數器,一組寄存器和棧),但是它可與同屬一個程序的其他的線程共享程序所擁有的全部資源.
差別
- 一個程式至少有一個程序,一個程序至少有一個線程.
- 線程的劃分尺度小于程序(資源比程序少),使得多線程程式的并發性高。
- 程序在執行過程中擁有獨立的記憶體單元,而多個線程共享記憶體,進而極大地提高了程式的運作效率
- 線程不能夠獨立執行,必須依存在程序中
- 可以将程序了解為工廠中的一條流水線,而其中的線程就是這個流水線上的勞工
優缺點
線程和程序在使用上各有優缺點:線程執行開銷小,但不利于資源的管理和保護;而程序正相反。
四.程序間通信-Queue
Process之間有時需要通信,作業系統提供了很多機制來實作程序間的通信。
1. Queue的使用
可以使用multiprocessing子產品的Queue實作多程序之間的資料傳遞,Queue本身是一個消息列隊程式,首先用一個小執行個體來示範一下Queue的工作原理:
#coding=utf-8
from multiprocessing import Queue
q=Queue(3) #初始化一個Queue對象,最多可接收三條put消息
q.put("消息1")
q.put("消息2")
print(q.full()) #False
q.put("消息3")
print(q.full()) #True
#因為消息列隊已滿下面的try都會抛出異常,第一個try會等待2秒後再抛出異常,第二個Try會立刻抛出異常
try:
q.put("消息4",True,2)
except:
print("消息列隊已滿,現有消息數量:%s"%q.qsize())
try:
q.put_nowait("消息4")
except:
print("消息列隊已滿,現有消息數量:%s"%q.qsize())
#推薦的方式,先判斷消息列隊是否已滿,再寫入
if not q.full():
q.put_nowait("消息4")
#讀取消息時,先判斷消息列隊是否為空,再讀取
if not q.empty():
for i in range(q.qsize()):
print(q.get_nowait())
運作結果:
False
True
消息列隊已滿,現有消息數量:3
消息列隊已滿,現有消息數量:3
消息1
消息2
消息3
說明
初始化Queue()對象時(例如:q=Queue()),若括号中沒有指定最大可接收的消息數量,或數量為負值,那麼就代表可接受的消息數量沒有上限(直到記憶體的盡頭);
- Queue.qsize():傳回目前隊列包含的消息數量;
- Queue.empty():如果隊列為空,傳回True,反之False ;
- Queue.full():如果隊列滿了,傳回True,反之False;
- Queue.get([block[, timeout]]):擷取隊列中的一條消息,然後将其從列隊中移除,block預設值為True;
1)如果block使用預設值,且沒有設定timeout(機關秒),消息列隊如果為空,此時程式将被阻塞(停在讀取狀态),直到從消息列隊讀到消息為止,如果設定了timeout,則會等待timeout秒,若還沒讀取到任何消息,則抛出"Queue.Empty"異常;
2)如果block值為False,消息列隊如果為空,則會立刻抛出"Queue.Empty"異常;
- Queue.get_nowait():相當Queue.get(False);
- Queue.put(item,[block[, timeout]]):将item消息寫入隊列,block預設值為True;
1)如果block使用預設值,且沒有設定timeout(機關秒),消息列隊如果已經沒有空間可寫入,此時程式将被阻塞(停在寫入狀态),直到從消息列隊騰出空間為止,如果設定了timeout,則會等待timeout秒,若還沒空間,則抛出"Queue.Full"異常;
2)如果block值為False,消息列隊如果沒有空間可寫入,則會立刻抛出"Queue.Full"異常;
- Queue.put_nowait(item):相當Queue.put(item, False);
2. Queue執行個體
我們以Queue為例,在父程序中建立兩個子程序,一個往Queue裡寫資料,一個從Queue裡讀資料:
from multiprocessing import Process, Queue
import os, time, random
# 寫資料程序執行的代碼:
def write(q):
for value in ['A', 'B', 'C']:
print('Put %s to queue...' % value)
q.put(value)
time.sleep(random.random())
# 讀資料程序執行的代碼:
def read(q):
while True:
if not q.empty():
value = q.get(True)
print('Get %s from queue.' % value)
time.sleep(random.random())
else:
break
if __name__=='__main__':
# 父程序建立Queue,并傳給各個子程序:
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
# 啟動子程序pw,寫入:
pw.start()
# 等待pw結束:
pw.join()
# 啟動子程序pr,讀取:
pr.start()
pr.join()
# pr程序裡是死循環,無法等待其結束,隻能強行終止:
print('')
print('所有資料都寫入并且讀完')
運作結果:
五.程序池Pool
當需要建立的子程序數量不多時,可以直接利用multiprocessing中的Process動态成生多個程序,但如果是上百甚至上千個目标,手動的去建立程序的工作量巨大,此時就可以用到multiprocessing子產品提供的Pool方法。
初始化Pool時,可以指定一個最大程序數,當有新的請求送出到Pool中時,如果池還沒有滿,那麼就會建立一個新的程序用來執行該請求;但如果池中的程序數已經達到指定的最大值,那麼該請求就會等待,直到池中有程序結束,才會用之前的程序來執行新的任務,請看下面的執行個體:
# -*- coding:utf-8 -*-
from multiprocessing import Pool
import os, time, random
def worker(msg):
t_start = time.time()
print("%s開始執行,程序号為%d" % (msg,os.getpid()))
# random.random()随機生成0~1之間的浮點數
time.sleep(random.random()*2)
t_stop = time.time()
print(msg,"執行完畢,耗時%0.2f" % (t_stop-t_start))
po = Pool(3) # 定義一個程序池,最大程序數3
for i in range(0,10):
# Pool().apply_async(要調用的目标,(傳遞給目标的參數元祖,))
# 每次循環将會用空閑出來的子程序去調用目标
po.apply_async(worker,(i,))
print("----start----")
po.close() # 關閉程序池,關閉後po不再接收新的請求
po.join() # 等待po中所有子程序執行完成,必須放在close語句之後
print("-----end-----")
運作結果:
----start----
0開始執行,程序号為21466
1開始執行,程序号為21468
2開始執行,程序号為21467
0 執行完畢,耗時1.01
3開始執行,程序号為21466
2 執行完畢,耗時1.24
4開始執行,程序号為21467
3 執行完畢,耗時0.56
5開始執行,程序号為21466
1 執行完畢,耗時1.68
6開始執行,程序号為21468
4 執行完畢,耗時0.67
7開始執行,程序号為21467
5 執行完畢,耗時0.83
8開始執行,程序号為21466
6 執行完畢,耗時0.75
9開始執行,程序号為21468
7 執行完畢,耗時1.03
8 執行完畢,耗時1.05
9 執行完畢,耗時1.69
-----end-----
multiprocessing.Pool常用函數解析:
- apply_async(func[, args[, kwds]]) :使用非阻塞方式調用func(并行執行,堵塞方式必須等待上一個程序退出才能執行下一個程序),args為傳遞給func的參數清單,kwds為傳遞給func的關鍵字參數清單;
- close():關閉Pool,使其不再接受新的任務;
- terminate():不管任務是否完成,立即終止;
- join():主程序阻塞,等待子程序的退出, 必須在close或terminate之後使用;
程序池中的Queue
如果要使用Pool建立程序,就需要使用multiprocessing.Manager()中的Queue(),而不是multiprocessing.Queue(),否則會得到一條如下的錯誤資訊:
RuntimeError: Queue objects should only be shared between processes through inheritance.
下面的執行個體示範了程序池中的程序如何通信:
# -*- coding:utf-8 -*-
# 修改import中的Queue為Manager
from multiprocessing import Manager,Pool
import os,time,random
def reader(q):
print("reader啟動(%s),父程序為(%s)" % (os.getpid(), os.getppid()))
for i in range(q.qsize()):
print("reader從Queue擷取到消息:%s" % q.get(True))
def writer(q):
print("writer啟動(%s),父程序為(%s)" % (os.getpid(), os.getppid()))
for i in "itcast":
q.put(i)
if __name__=="__main__":
print("(%s) start" % os.getpid())
q = Manager().Queue() # 使用Manager中的Queue
po = Pool()
po.apply_async(writer, (q,))
time.sleep(1) # 先讓上面的任務向Queue存入資料,然後再讓下面的任務開始從中取資料
po.apply_async(reader, (q,))
po.close()
po.join()
print("(%s) End" % os.getpid())
運作結果:
(11095) start
writer啟動(11097),父程序為(11095)
reader啟動(11098),父程序為(11095)
reader從Queue擷取到消息:i
reader從Queue擷取到消息:t
reader從Queue擷取到消息:c
reader從Queue擷取到消息:a
reader從Queue擷取到消息:s
reader從Queue擷取到消息:t
(11095) End
六.應用:檔案夾copy器(多程序版)
import multiprocessing
import os
import time
import random
def copy_file(queue, file_name,source_folder_name, dest_folder_name):
"""copy檔案到指定的路徑"""
f_read = open(source_folder_name + "/" + file_name, "rb")
f_write = open(dest_folder_name + "/" + file_name, "wb")
while True:
time.sleep(random.random())
content = f_read.read(1024)
if content:
f_write.write(content)
else:
break
f_read.close()
f_write.close()
# 發送已經拷貝完畢的檔案名字
queue.put(file_name)
def main():
# 擷取要複制的檔案夾
source_folder_name = input("請輸入要複制檔案夾名字:")
# 整理目标檔案夾
dest_folder_name = source_folder_name + "[副本]"
# 建立目标檔案夾
try:
os.mkdir(dest_folder_name)
except:
pass # 如果檔案夾已經存在,那麼建立會失敗
# 擷取這個檔案夾中所有的普通檔案名
file_names = os.listdir(source_folder_name)
# 建立Queue
queue = multiprocessing.Manager().Queue()
# 建立程序池
pool = multiprocessing.Pool(3)
for file_name in file_names:
# 向程序池中添加任務
pool.apply_async(copy_file, args=(queue, file_name, source_folder_name, dest_folder_name))
# 主程序顯示進度
pool.close()
all_file_num = len(file_names)
while True:
file_name = queue.get()
if file_name in file_names:
file_names.remove(file_name)
copy_rate = (all_file_num-len(file_names))*100/all_file_num
print("\r%.2f...(%s)" % (copy_rate, file_name) + " "*50, end="")
if copy_rate >= 100:
break
print()
if __name__ == "__main__":
main()