昨日内容回顧
python中啟動子程序
并發程式設計
并發 :多段程式看起來是同時運作的
ftp 網盤
不支援并發
socketserver 多程序 并發
異步
兩個程序 分别做不同的事情
建立新程序
join :阻塞 直到 子程序結束
守護程序 daemon :子(守護)程序随着主程序代碼的結束而結束
程序之間資料隔離
使用類來開啟一個程序 :自定義類 繼承Process 重寫run方法 傳參數需要重寫init
屬性 pid name
方法 terminate is_alive
作業講解:
socket聊天并發執行個體,使用原生socket的TCP協定,實作一個聊天的并發執行個體
先來一個簡單的,單線程
server.py
import socket
sk = socket.socket()
sk.bind(('127.0.0.1',9000))
sk.listen()
conn,addr = sk.accept()
msg = conn.recv(1024).decode('utf-8')
conn.send((msg+'sb').encode('utf-8'))
print(msg)
conn.close()
client.py
import socket
sk = socket.socket()
sk.connect(('127.0.0.1',9000))
inp = input('>>>').encode('utf-8')
sk.send(inp)
msg = sk.recv(1024).decode('utf-8')
print(msg)
sk.close()
先執行server.py,再執行client.py
輸出:
>>>111
111sb
如果要多次會話呢?加一個while循環
import socket
sk = socket.socket()
sk.bind(('127.0.0.1',9000))
sk.listen()
conn,addr = sk.accept()
while True:
msg = conn.recv(1024).decode('utf-8')
conn.send((msg+'sb').encode('utf-8'))
print(msg)
conn.close()
sk.close()
import socket
sk = socket.socket()
sk.connect(('127.0.0.1',9000))
while True:
inp = input('>>>').encode('utf-8')
sk.send(inp)
msg = sk.recv(1024).decode('utf-8')
print(msg)
sk.close()
但是這樣,隻能一個用戶端和伺服器連接配接,再來一個用戶端,就卡住了。
sk.accept()是阻塞的,那麼,如何改成異步呢?
将server裡面的recv和send放到一個函數裡面,使用多線程調用
import socket
from multiprocessing import Process
def chat(conn): # 聊天
while True:
msg = conn.recv(1024).decode('utf-8')
print(msg)
conn.send((msg + '_sb').encode('utf-8'))
conn.close()
if __name__ == '__main__':
sk = socket.socket()
sk.bind(('127.0.0.1',9000))
sk.listen()
while True:
conn,addr = sk.accept()
Process(target=chat,args=[conn,]).start() # 異步
sk.close()
多開幾個用戶端,也沒問題
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIyZuBnLyIzM1cTNyMDNx0CM5EDNwQzM1ETMxUDM4EDMy0CM5ATM0MTMvwVNwgTMwIzLcBTOwEDNzEzLcd2bsJ2Lc12bj5ycn9Gbi52YugTMwIzcldWYtl2Lc9CX6MHc0RHaiojIsJye.png)
這就有點像接線員一樣,接到客戶打的電話,轉給相應的人處理
這樣,就有多個子程序,同時執行,實作多個用戶端同時連接配接
程序同步(multiprocess.Lock、Semaphore、Event)
鎖 —— multiprocess.Lock
通過剛剛的學習,我們千方百計實作了程式的異步,讓多個任務可以同時在幾個程序中并發處理,他們之間的運作沒有順序,一旦開啟也不受我們控制。盡管并發程式設計讓我們能更加充分的利用IO資源,但是也給我們帶來了新的問題。
當多個程序使用同一份資料資源的時候,就會引發資料安全或順序混亂問題。
多程序搶占輸出資源
import os
import time
import random
from multiprocessing import Process
def work(n):
print('%s: %s is running' %(n,os.getpid()))
time.sleep(random.random())
print('%s:%s is done' %(n,os.getpid()))
if __name__ == '__main__':
for i in range(3):
p=Process(target=work,args=(i,))
p.start()
執行輸出:
1: 16548 is running
0: 1448 is running
2: 1096 is running
2:1096 is done
0:1448 is done
1:16548 is done
看輸出結果,都是亂的。
如果想有序的執行,先run,再done,怎麼辦?
需要用到鎖
import os
import time
import random
from multiprocessing import Lock
from multiprocessing import Process
def work(n,lock):
lock.acquire() #取得鎖
print('%s: %s is running' %(n,os.getpid()))
time.sleep(random.random())
print('%s:%s is done' %(n,os.getpid()))
lock.release() #釋放鎖
if __name__ == '__main__':
lock = Lock() #建立鎖
for i in range(5):
p=Process(target=work,args=(i,lock))
p.start()
0: 17468 is running
0:17468 is done
2: 16688 is running
2:16688 is done
1: 15984 is running
1:15984 is done
3: 15828 is running
3:15828 is done
4: 18156 is running
4:18156 is done
從結果上來看,結果就比較整齊了。先run,再done。結果是無序的
#建立鎖
mutex = threading.Lock()
#鎖定
mutex.acquire([timeout])#timeout是逾時時間
#釋放
mutex.release()
其中,鎖定方法acquire可以有一個逾時時間的可選參數timeout。如果設定了timeout,則在逾時後通過傳回值可以判斷是否得到了鎖,進而可以進行一些其他的處理。
acquire參數
鎖的理論
acquire 表示拿鎖
最先拿到鑰匙的,它會做幾件事情:
1.用鑰匙開門
2.把鑰匙帶進門裡
3.把門反鎖
看下圖
理論上來講,程序一般是異步的
但是加了鎖之後,就變成同步了
那麼誰先拿到鑰匙呢?滿足以下2個條件:
1.作業系統先響應的程序
2.當時沒有時間片輪詢,剛好就是它
lock.acquire()和lock.release()之間的代碼,表示被鎖住了。
看join效果
import os
import time
import random
from multiprocessing import Process
def work(n):
print('%s: %s is running' %(n,os.getpid()))
time.sleep(random.random())
print('%s:%s is done' %(n,os.getpid()))
if __name__ == '__main__':
for i in range(5):
p=Process(target=work,args=(i,))
p.start()
p.join()
0: 17348 is running
0:17348 is done
1: 18164 is running
1:18164 is done
2: 18160 is running
2:18160 is done
3: 17652 is running
3:17652 is done
4: 8340 is running
4:8340 is done
如果鎖加在程序的開始和結束,參考work函數,它是同步的
join也是同步的,但是,他們之間的差別在于
鎖執行時是無序的,join是有序的
由并發變成了串行,犧牲了運作效率,但避免了競争。
上面這種情況雖然使用加鎖的形式實作了順序的執行,但是程式又重新變成串行了,這樣确實會浪費了時間,卻保證了資料的安全。
總結:
同步控制:
隻要用到了鎖 鎖之間的代碼就會變成同步的
鎖 :控制一段代碼 同一時間 隻能被一個程序執行
搶票的例子:
比如12306,大家都經曆過
模拟資料庫,建立一個檔案ticket,内容如下:
{"count":1}
注意一定要用雙引号,不然json無法識别
主程式代碼如下:
import json
from multiprocessing import Process
def check_ticket(i):
with open('ticket') as f:
ticket_count = json.load(f)
print('person%s'%i,ticket_count['count'])
if __name__ == '__main__':
for i in range(5):
Process(target=check_ticket,args=(i,)).start()
person1 1
person0 1
person3 1
person2 1
person4 1
開始買票,買票的時候,可能有網絡延時
收到請求之後,從資料庫中讀取資料
當你發票還有餘票時,把票減少這件事情記錄下來
中間會經曆網絡延時
import json
import time
import random
from multiprocessing import Process
def buy_ticket(i): # 購票
with open('ticket') as f: # 讀取檔案
tick_count = json.load(f) # 反序列化
time.sleep(random.random()) # 讀取延時
if tick_count['count'] > 0: # 當餘票小于0時
print('person%s購票成功'%i)
tick_count['count'] -= 1 # 票數減1
else:
print('餘票不足,person%s購票失敗'%i)
time.sleep(random.random()) # 寫入延時
with open('ticket','w') as f:
json.dump(tick_count,f) # 寫入檔案
if __name__ == '__main__':
for i in range(5):
Process(target=buy_ticket,args=(i,)).start()
模拟這2次延時
是因為伺服器和資料庫不在一台機器上面,它們之間互動資料,必然有延時
上面的代碼隻模拟了讀取延時和寫入延時,沒有模拟請求延時
person3購票成功
person0購票成功
person4購票成功
person1購票成功
person2購票成功
從輸出結果上來看,庫存隻有1張票,但是5個人都購票成功了。這顯然是不合理的!
這就造成了資料不安全
怎麼解決呢?加鎖
import json
import time
import random
from multiprocessing import Process,Lock
def buy_ticket(i,lock): # 購票
lock.acquire() #取得鎖
with open('ticket') as f: # 讀取檔案
tick_count = json.load(f) # 反序列化
time.sleep(random.random()) # 讀取延時
if tick_count['count'] > 0: # 當餘票小于0時
print('person%s購票成功'%i)
tick_count['count'] -= 1 # 票數減1
else:
print('餘票不足,person%s購票失敗'%i)
time.sleep(random.random()) # 寫入延時
with open('ticket','w') as f:
json.dump(tick_count,f) # 寫入檔案
lock.release() #釋放鎖
if __name__ == '__main__':
lock = Lock() #建立鎖
for i in range(5): # 模拟5個使用者搶票
Process(target=buy_ticket,args=(i,lock)).start()
将檔案ticket的count數字改為1
{"count": 1}
執行程式輸出:
餘票不足,person1購票失敗
餘票不足,person2購票失敗
餘票不足,person3購票失敗
從結果上來看,是正确的。保證了資料的安全性。
查資料,不涉及資料安全,因為沒有修改。
#加鎖可以保證多個程序修改同一塊資料時,同一時間隻能有一個任務可以進行修改,即串行的修改,沒錯,速度是慢了,但犧牲了速度卻保證了資料安全。
雖然可以用檔案共享資料實作程序間通信,但問題是:
1.效率低(共享資料基于檔案,而檔案是硬碟上的資料)
2.需要自己加鎖處理
#是以我們最好找尋一種解決方案能夠兼顧:1、效率高(多個程序共享一塊記憶體的資料)2、幫我們處理好鎖問題。這就是mutiprocessing子產品為我們提供的基于消息的IPC通信機制:隊列和管道。
隊列和管道都是将資料存放于記憶體中
隊列又是基于(管道+鎖)實作的,可以讓我們從複雜的鎖問題中解脫出來,
我們應該盡量避免使用共享資料,盡可能使用消息傳遞和隊列,避免處理複雜的同步和鎖問題,而且在程序數目增多時,往往可以獲得更好的可獲展性。
信号量 —— multiprocess.Semaphore(了解)
互斥鎖同時隻允許一個線程更改資料,而信号量Semaphore是同時允許一定數量的線程更改資料 。
假設商場裡有4個迷你唱吧,是以同時可以進去4個人,如果來了第五個人就要在外面等待,等到有人出來才能再進去玩。
實作:
信号量同步基于内部計數器,每調用一次acquire(),計數器減1;每調用一次release(),計數器加1.當計數器為0時,acquire()調用被阻塞。這是迪科斯徹(Dijkstra)信号量概念P()和V()的Python實作。信号量同步機制适用于通路像伺服器這樣的有限資源。
信号量與程序池的概念很像,但是要區分開,信号量涉及到加鎖的概念
信号量介紹Semaphore
上面講的鎖一個,現在鎖變成一串了,數量由你來控制
下面有一個小KTV,隻能容納4個人,第5個人,就沒有鑰匙了
先不用信号量
#KTV 4個人
import time
import random
from multiprocessing import Process,Semaphore
def ktv(i):
print('person %s 進來唱歌了'%i)
time.sleep(random.randint(1,5))
print('person %s 從ktv出去了'%i)
if __name__ == '__main__':
for i in range(6): # 模拟6個人
Process(target=ktv,args=(i,)).start()
person 0 進來唱歌了
person 1 進來唱歌了
person 2 進來唱歌了
person 3 進來唱歌了
person 4 進來唱歌了
person 5 進來唱歌了
person 3 從ktv出去了
person 4 從ktv出去了
person 2 從ktv出去了
person 1 從ktv出去了
person 0 從ktv出去了
person 5 從ktv出去了
結果是有問題的,6個人,都可以進去
使用信号量來實作
#KTV 4個人
import time
import random
from multiprocessing import Process,Semaphore
def ktv(i,sem):
sem.acquire() #取得鎖
print('person %s 進來唱歌了'%i)
time.sleep(random.randint(1,5))
print('person %s 從ktv出去了'%i)
sem.release() #釋放鎖
if __name__ == '__main__':
sem = Semaphore(4) #初始化信号量,數量為4
for i in range(6): # 模拟6個人
Process(target=ktv,args=(i,sem)).start()
在同一時間,最多有4個人進去
acquire()是一個阻塞行為
信号量和鎖有點類似
那麼它們之間的差別在于:
信号量,相當于計數器
它是鎖+計數器
調用acquire() 計數器-1
當計數器到 0 時,再調用 acquire() 就會阻塞,直到其他線程來調用release()
調用release() 計數器+1
事件 —— multiprocess.Event(了解)
python線程的事件用于主線程控制其他線程的執行,事件主要提供了三個方法 set、wait、clear。
事件處理的機制:全局定義了一個“Flag”,如果“Flag”值為 False,那麼當程式執行 event.wait 方法時就會阻塞,如果“Flag”值為True,那麼event.wait 方法時便不再阻塞。
clear:将“Flag”設定為False
set:将“Flag”設定為True
事件介紹
著名的計算機模型:紅綠燈
車 比方是一個程序,wait() 等紅燈
根據狀态變化,wait遇到true信号,就非阻塞
遇到False,就阻塞
交通燈 也是有一個程序 紅燈->False 綠燈->True
這裡沒有黃燈
事件有幾個方法:
wait的方法 根據一個狀态來決定自己是否要阻塞
狀态相關的方法
set 将狀态改為True
clear 将狀态改為False
is_set 判斷目前的狀态是否為True
先來看幾行代碼
from multiprocessing import Event
e = Event() #建立一個事件的對象
print(e.is_set()) # 在事件的創世之初,狀态為False
執行輸出: False
在看一個列子
from multiprocessing import Event
e = Event() #建立一個事件的對象
print(e.is_set()) # 在事件的創世之初,狀态為False
e.wait()
print('1')
執行輸出: False,然後程式一緻卡着
為啥呢?因為狀态值為 False,那麼當程式執行 event.wait 方法時就會阻塞
再來
from multiprocessing import Event
e = Event() #建立一個事件的對象
print(e.is_set()) # 在事件的創世之初,狀态為False
e.set() # 将狀态設定為True
e.wait()
print(e.is_set()) # 檢視狀态
print('1')
False
True
1
很快就輸出了,說明沒有阻塞
wait是否阻塞,取決于目前的狀态
模拟紅綠燈
import time
def traffic_light():
while True:
print('紅燈亮')
time.sleep(2)
print('綠燈亮')
time.sleep(2)
break
traffic_light()
紅燈亮
綠燈亮
為了好看一點,加點顔色
import time
def traffic_light():
while True:
print('\033[1;31m紅燈亮\033[0m')
time.sleep(2)
print('\033[1;32m綠燈亮\033[0m')
time.sleep(2)
break
traffic_light()
輸出挺專業的哈
下面來建立車
import time
from multiprocessing import Process,Event
def traffic_light():
while True:
print('\033[1;31m紅燈亮\033[0m')
time.sleep(2)
print('\033[1;32m綠燈亮\033[0m')
time.sleep(2)
break
def car(i):
print('car%s通過路口'%i)
if __name__ == '__main__':
for i in range(1,6): # 建立5輛車
Process(target=car,args=(i,)).start()
car1通過路口
car2通過路口
car3通過路口
car5通過路口
car4通過路口
現在還沒有紅綠燈,加一個紅綠燈程序
import time
from multiprocessing import Process,Event
def traffic_light():
while True:
print('\033[1;31m紅燈亮\033[0m')
time.sleep(2)
print('\033[1;32m綠燈亮\033[0m')
time.sleep(2)
break
def car(i):
print('car%s通過路口'%i)
if __name__ == '__main__':
Process(target=traffic_light).start()
for i in range(1,6):
Process(target=car,args=(i,)).start()
發現5輛車都闖紅燈了...
加一個事件:
import time
import random
from multiprocessing import Process,Event
def traffic_light(e,count): # 交通燈
while True:
while True:
# 事件在建立的時候,e的狀态是False,相當于程式中的紅燈
print('\033[1;31m紅燈亮\033[0m')
time.sleep(2) # 紅燈亮2秒
# 這裡e.is_set()是False,是以not e.is_set()就是True
if not e.is_set():e.set() # 判斷為True時,變綠燈
print('\033[1;32m綠燈亮\033[0m')
time.sleep(2) # 綠燈亮2秒
# 這裡e.is_set()是True
if e.is_set():e.clear() # 判斷為True時,将狀态設定為False
count += 1 # 自增1
if count == 5: # 判斷為5時,跳出内層循環
break
break #跳出内層循環
def car(i,e): # 汽車,感覺狀态的變化
if not e.is_set(): # 目前這個事件狀态是False
print('car%s正在等待'%i) # 這輛車正在等待通過路口
e.wait() # 阻塞,直到有一個e.set行為。正在等紅燈
print('car%s通過路口'%i) # 等待狀态為True,才能通過
if __name__ == '__main__':
e = Event() # 建立事件,預設狀态為False
Process(target=traffic_light,args=(e,0)).start() # 啟動紅燈程序
for i in range(1,6): # 模拟5個人
time.sleep(random.randrange(0,5,2)) # 大于等于0且小于5之間的奇數
Process(target=car,args=(i,e)).start() # 啟動交通燈程序
從結果上來看,挺完美的。
每輛車獨立占用一個程序,交通燈也是一個程序,之前學的程序是隔離的,那麼car程序為什麼能感覺到?
是因為程序之間用了socket網絡通信
内部有一套通訊機制
注意:e.is_set() 狀态不是固定的,每隔2秒,會變化一次。
控制信号燈的子程序、事件對象e、car子程序,都依賴于狀态
wait隻判斷True和False
也就是紅燈停,綠燈行
等待2秒之後,就切換一個狀态
相當于執行了下面的代碼:
flag = False
while True:
if flag is True:
flag = False
else:
flag = True
time.sleep(2)
程序間通信——隊列和管道(multiprocess.Queue、multiprocess.Pipe)
程序間通信
IPC(Inter-Process Communication)
IPC說的就是程序之間的通信,它隻是縮寫而已
隊列
概念介紹
之前學的queue和現在要提到的queue之間的差別
import queue
它能維護一個先進先出的秩序,它不能進行IPC
from multiprocessing import Queue,Process
能維護一個先進先出的秩序,也能進行IPC
建立共享的程序隊列,Queue是多程序安全的隊列,可以使用Queue實作多程序之間的資料傳遞。
遵循先進先出原則
Queue([maxsize])
建立共享的程序隊列。
參數 :maxsize是隊列中允許的最大項數。如果省略此參數,則無大小限制。
底層隊列使用管道和鎖定實作。
Queue([maxsize])
建立共享的程序隊列。maxsize是隊列中允許的最大項數。如果省略此參數,則無大小限制。底層隊列使用管道和鎖定實作。另外,還需要運作支援線程以便隊列中的資料傳輸到底層管道中。
Queue的執行個體q具有以下方法:
q.get( [ block [ ,timeout ] ] )
傳回q中的一個項目。如果q為空,此方法将阻塞,直到隊列中有項目可用為止。block用于控制阻塞行為,預設為True. 如果設定為False,将引發Queue.Empty異常(定義在Queue子產品中)。timeout是可選逾時時間,用在阻塞模式中。如果在制定的時間間隔内沒有項目變為可用,将引發Queue.Empty異常。
q.get_nowait( )
同q.get(False)方法。
q.put(item [, block [,timeout ] ] )
将item放入隊列。如果隊列已滿,此方法将阻塞至有空間可用為止。block控制阻塞行為,預設為True。如果設定為False,将引發Queue.Empty異常(定義在Queue庫子產品中)。timeout指定在阻塞模式中等待可用空間的時間長短。逾時後将引發Queue.Full異常。
q.qsize()
傳回隊列中目前項目的正确數量。此函數的結果并不可靠,因為在傳回結果和在稍後程式中使用結果之間,隊列中可能添加或删除了項目。在某些系統上,此方法可能引發NotImplementedError異常。
q.empty()
如果調用此方法時 q為空,傳回True。如果其他程序或線程正在往隊列中添加項目,結果是不可靠的。也就是說,在傳回和使用結果之間,隊列中可能已經加入新的項目。
q.full()
如果q已滿,傳回為True. 由于線程的存在,結果也可能是不可靠的(參考q.empty()方法)。。
方法介紹
q.close()
關閉隊列,防止隊列中加入更多資料。調用此方法時,背景線程将繼續寫入那些已入隊列但尚未寫入的資料,但将在此方法完成時馬上關閉。如果q被垃圾收集,将自動調用此方法。關閉隊列不會在隊列使用者中生成任何類型的資料結束信号或異常。例如,如果某個使用者正被阻塞在get()操作上,關閉生産者中的隊列不會導緻get()方法傳回錯誤。
q.cancel_join_thread()
不會再程序退出時自動連接配接背景線程。這可以防止join_thread()方法阻塞。
q.join_thread()
連接配接隊列的背景線程。此方法用于在調用q.close()方法後,等待所有隊列項被消耗。預設情況下,此方法由不是q的原始建立者的所有程序調用。調用q.cancel_join_thread()方法可以禁止這種行為。
其他方法(了解)
簡單用法:
from multiprocessing import Process,Queue
q = Queue() #建立共享的程序隊列
q.put(1) # 将一個值放入隊列
q.put(2)
q.put('aaa')
print(q.get()) # 傳回q中的一個項目
執行輸出:1
因為1先進去,是以它先出來。
from multiprocessing import Process,Queue
def wahaha(q):
print(q.get())
if __name__ == '__main__':
q = Queue() #建立共享的程序隊列
Process(target=wahaha,args=(q,)).start()
q.put(1)
因為主程式先執行
雙向通信
既能取值,也能增加值
import time
from multiprocessing import Process,Queue
def wahaha(q):
print(q.get()) # 取值
q.put('aaa') # 增加一個aaa
if __name__ == '__main__':
q = Queue() #建立共享的程序隊列
Process(target=wahaha,args=(q,)).start()
q.put(1)
time.sleep(0.5) # 等待0.5秒,讓子程式執行完
print(q.get()) # 取值
aaa
'''
multiprocessing子產品支援程序間通信的兩種主要形式:管道和隊列
都是基于消息傳遞實作的,但是隊列接口
'''
from multiprocessing import Queue
q=Queue(3)
#put ,get ,put_nowait,get_nowait,full,empty
q.put(3)
q.put(3)
q.put(3)
# q.put(3) # 如果隊列已經滿了,程式就會停在這裡,等待資料被别人取走,再将資料放入隊列。
# 如果隊列中的資料一直不被取走,程式就會永遠停在這裡。
try:
q.put_nowait(3) # 可以使用put_nowait,如果隊列滿了不會阻塞,但是會因為隊列滿了而報錯。
except: # 是以我們可以用一個try語句來處理這個錯誤。這樣程式不會一直阻塞下去,但是會丢掉這個消息。
print('隊列已經滿了')
# 是以,我們再放入資料之前,可以先看一下隊列的狀态,如果已經滿了,就不繼續put了。
print(q.full()) #滿了
print(q.get())
print(q.get())
print(q.get())
# print(q.get()) # 同put方法一樣,如果隊列已經空了,那麼繼續取就會出現阻塞。
try:
q.get_nowait(3) # 可以使用get_nowait,如果隊列滿了不會阻塞,但是會因為沒取到值而報錯。
except: # 是以我們可以用一個try語句來處理這個錯誤。這樣程式不會一直阻塞下去。
print('隊列已經空了')
print(q.empty()) #空了
單看隊列用法
上面這個例子還沒有加入程序通信,隻是先來看看隊列為我們提供的方法,以及這些方法的使用和現象。
import time
from multiprocessing import Process, Queue
def f(q):
q.put([time.asctime(), 'from Eva', 'hello']) #調用主函數中p程序傳遞過來的程序參數 put函數為向隊列中添加一條資料。
if __name__ == '__main__':
q = Queue() #建立一個Queue對象
p = Process(target=f, args=(q,)) #建立一個程序
p.start()
print(q.get())
p.join()
子程序發送資料給父程序
上面是一個queue的簡單應用,使用隊列q對象調用get函數來取得隊列中最先進入的資料。 接下來看一個稍微複雜一些的例子:
import os
import time
import multiprocessing
# 向queue中輸入資料的函數
def inputQ(queue):
info = str(os.getpid()) + '(put):' + str(time.asctime())
queue.put(info)
# 向queue中輸出資料的函數
def outputQ(queue):
info = queue.get()
print ('%s%s\033[32m%s\033[0m'%(str(os.getpid()), '(get):',info))
# Main
if __name__ == '__main__':
multiprocessing.freeze_support()
record1 = [] # store input processes
record2 = [] # store output processes
queue = multiprocessing.Queue(3)
# 輸入程序
for i in range(10):
process = multiprocessing.Process(target=inputQ,args=(queue,))
process.start()
record1.append(process)
# 輸出程序
for i in range(10):
process = multiprocessing.Process(target=outputQ,args=(queue,))
process.start()
record2.append(process)
for p in record1:
p.join()
for p in record2:
p.join()
批量生産資料放入隊列再批量擷取結果 x
生産者消費者模型
在并發程式設計中使用生産者和消費者模式能夠解決絕大多數并發問題。該模式通過平衡生産線程和消費線程的工作能力來提高程式的整體處理資料的速度。
為什麼要使用生産者和消費者模式
線上程世界裡,生産者就是生産資料的線程,消費者就是消費資料的線程。在多線程開發當中,如果生産者處理速度很快,而消費者處理速度很慢,那麼生産者就必須等待消費者處理完,才能繼續生産資料。同樣的道理,如果消費者的處理能力大于生産者,那麼消費者就必須等待生産者。為了解決這個問題于是引入了生産者和消費者模式。
什麼是生産者消費者模式
生産者消費者模式是通過一個容器來解決生産者和消費者的強耦合問題。生産者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,是以生産者生産完資料之後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生産者要資料,而是直接從阻塞隊列裡取,阻塞隊列就相當于一個緩沖區,平衡了生産者和消費者的處理能力。
消費者 消費資料 吃包子
生産者 産生資料的人 做包子
假如生産了10個包子,但是來了15個人,供不應求了。
這就産生了供銷沖突,那怎麼解決呢?
增加做包子的人
或者采用同步模式 :做一個包子 賣一包子
再舉一個例子:
生産資料 在淘寶買東西 --- 産生消費者行為資料
消費資料 阿裡巴巴 --- 即時性要求非常高 必須要快速把使用者生産的資料消費完
籠屜,隻能放100個包子
如果蒸包子的數量過多,沒人買了,那麼就需要減少蒸包子的人
如果蒸包子的根據買包子的人,來生産包子,就比較完美了
queue隊列就是籠屜
基于隊列實作生産者消費者模型
舉例
import time
import random
from multiprocessing import Process,Queue
def producer(q): #生産者
for i in range(5): # 生産5個包子
time.sleep(random.random()) # 模拟生産包子時間
q.put('包子%s'%i)
def consumer(q): # 消費者
for i in range(5): # 5個消費者
print(q.get()) # 買一個包子
time.sleep(random.random()) # 模拟吃包子時間
if __name__ == '__main__':
q = Queue() #建立共享的程序隊列
p1 = Process(target=producer,args=(q,))
p2 = Process(target=consumer, args=(q,))
p1.start() # 啟動程序
p2.start()
包子0
包子1
包子2
包子3
包子4
增加顔色,顯示更明顯
import time
import random
from multiprocessing import Process,Queue
def producer(q): #生産者
for i in range(1,6): # 生産5個包子
time.sleep(random.random()) # 模拟生産包子時間
print('\033[1;31m生産了包子%s\033[0m' % i)
def consumer(q): # 消費者
for i in range(1,6): # 5個消費者
print('\033[1;32m消費了包子%s\033[0m' % i)
time.sleep(random.uniform(1,2)) # 模拟吃包子時間
if __name__ == '__main__':
q = Queue() #建立共享的程序隊列
p1 = Process(target=producer,args=(q,))
p2 = Process(target=consumer, args=(q,))
p1.start() # 啟動程序
p2.start()
發現供需不平衡
需要改變供需平衡
import time
import random
from multiprocessing import Process,Queue
def producer(q): #生産者
for i in range(1,11): # 生産5個包子
time.sleep(random.random()) # 模拟生産包子時間
q.put('包子%s'%i) # 生産包子,将包子放入隊列,比如包子1
print('\033[1;31m生産了包子%s\033[0m' % i)
def consumer(q): # 消費者
for i in range(1,6): # 5個消費者
food = q.get() # 買一個包子
print('\033[1;32m消費了包子%s\033[0m' % food)
time.sleep(random.uniform(1,2)) # 模拟吃包子時間
if __name__ == '__main__':
q = Queue() #建立共享的程序隊列
p1 = Process(target=producer,args=(q,))
p2 = Process(target=consumer, args=(q,))
p3 = Process(target=consumer, args=[q]) # 增加一個消費者程序
p1.start() # 啟動程序
p2.start()
p3.start()
上面是通過消費者和包子數量對等,解決供需平衡問題的。
其他例子:
from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
while True:
res=q.get()
time.sleep(random.randint(1,3))
print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))
def producer(q):
for i in range(10):
time.sleep(random.randint(1,3))
res='包子%s' %i
q.put(res)
print('\033[44m%s 生産了 %s\033[0m' %(os.getpid(),res))
if __name__ == '__main__':
q=Queue()
#生産者們:即廚師們
p1=Process(target=producer,args=(q,))
#消費者們:即吃貨們
c1=Process(target=consumer,args=(q,))
#開始
p1.start()
c1.start()
print('主')
此時的問題是主程序永遠不會結束,原因是:生産者p在生産完後就結束了,但是消費者c在取空了q之後,則一直處于死循環中且卡在q.get()這一步。
解決方式無非是讓生産者在生産完畢後,往隊列中再發一個結束信号,這樣消費者在接收到結束信号後就可以break出死循環。
from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
while True:
res=q.get()
if res is None:break #收到結束信号則結束
time.sleep(random.randint(1,3))
print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))
def producer(q):
for i in range(10):
time.sleep(random.randint(1,3))
res='包子%s' %i
q.put(res)
print('\033[44m%s 生産了 %s\033[0m' %(os.getpid(),res))
q.put(None) #發送結束信号
if __name__ == '__main__':
q=Queue()
#生産者們:即廚師們
p1=Process(target=producer,args=(q,))
#消費者們:即吃貨們
c1=Process(target=consumer,args=(q,))
#開始
p1.start()
c1.start()
print('主')
改良版——生産者消費者模型
注意:結束信号None,不一定要由生産者發,主程序裡同樣可以發,但主程序需要等生産者結束後才應該發送該信号
from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
while True:
res=q.get()
if res is None:break #收到結束信号則結束
time.sleep(random.randint(1,3))
print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))
def producer(q):
for i in range(2):
time.sleep(random.randint(1,3))
res='包子%s' %i
q.put(res)
print('\033[44m%s 生産了 %s\033[0m' %(os.getpid(),res))
if __name__ == '__main__':
q=Queue()
#生産者們:即廚師們
p1=Process(target=producer,args=(q,))
#消費者們:即吃貨們
c1=Process(target=consumer,args=(q,))
#開始
p1.start()
c1.start()
p1.join()
q.put(None) #發送結束信号
print('主')
主程序在生産者生産完畢後發送結束信号None
但上述解決方式,在有多個生産者和多個消費者時,我們則需要用一個很low的方式去解決
from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
while True:
res=q.get()
if res is None:break #收到結束信号則結束
time.sleep(random.randint(1,3))
print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))
def producer(name,q):
for i in range(2):
time.sleep(random.randint(1,3))
res='%s%s' %(name,i)
q.put(res)
print('\033[44m%s 生産了 %s\033[0m' %(os.getpid(),res))
if __name__ == '__main__':
q=Queue()
#生産者們:即廚師們
p1=Process(target=producer,args=('包子',q))
p2=Process(target=producer,args=('骨頭',q))
p3=Process(target=producer,args=('泔水',q))
#消費者們:即吃貨們
c1=Process(target=consumer,args=(q,))
c2=Process(target=consumer,args=(q,))
#開始
p1.start()
p2.start()
p3.start()
c1.start()
p1.join() #必須保證生産者全部生産完畢,才應該發送結束信号
p2.join()
p3.join()
q.put(None) #有幾個消費者就應該發送幾次結束信号None
q.put(None) #發送結束信号
print('主')
多個消費者的例子:有幾個消費者就需要發送幾次結束信号
明日默寫:
搶票
import json
import time
import random
from multiprocessing import Process,Lock
def check_ticket(i):
with open('ticket') as f:
ticket_count = json.load(f)
print('person%s查詢目前餘票 :'%i,ticket_count['count'])
def buy_ticket(i,lock):
check_ticket(i)
lock.acquire()
with open('ticket') as f:
ticket_count = json.load(f)
time.sleep(random.random())
if ticket_count['count'] > 0:
print('person%s購票成功'%i)
ticket_count['count'] -= 1
else:
print('餘票不足,person%s購票失敗'%i)
time.sleep(random.random())
with open('ticket','w') as f:
json.dump(ticket_count,f)
if __name__ == '__main__':
lock = Lock()
for i in range(10):
Process(target=buy_ticket,args=[i,lock]).start()
紅綠燈
import time
import random
from multiprocessing import Process,Event
def traffic_light(e,count): # 交通燈
while True:
while True:
# 事件在建立的時候,e的狀态是False,相當于程式中的紅燈
print('\033[1;31m紅燈亮\033[0m')
time.sleep(2) # 紅燈亮2秒
# 這裡e.is_set()是False,是以not e.is_set()就是True
if not e.is_set():e.set() # 判斷為True時,變綠燈
print('\033[1;32m綠燈亮\033[0m')
time.sleep(2) # 綠燈亮2秒
# 這裡e.is_set()是True
if e.is_set():e.clear() # 判斷為True時,将狀态設定為False
count += 1 # 自增1
if count == 5: # 判斷為5時,跳出内層循環
break
break #跳出内層循環
def car(i,e): # 汽車,感覺狀态的變化
if not e.is_set(): # 目前這個事件狀态是False
print('car%s正在等待'%i) # 這輛車正在等待通過路口
e.wait() # 阻塞,直到有一個e.set行為。正在等紅燈
print('car%s通過路口'%i) # 等待狀态為True,才能通過
if __name__ == '__main__':
e = Event() # 建立事件,預設狀态為False
Process(target=traffic_light,args=(e,0)).start() # 啟動紅燈程序
for i in range(1,6): # 模拟5個人
time.sleep(random.randrange(0,5,2)) # 大于等于0且小于5之間的奇數
Process(target=car,args=(i,e)).start() # 啟動交通燈程序