線程池是預先建立線程的一種技術。線程池在還沒有任務到來之前,
建立一定數量的線程,放入空閑隊列中。這些線程都是處于睡眠狀态,
即均為啟動,不消耗CPU,而隻是占用較小的記憶體空間。當請求到來之後,
緩沖池給這次請求配置設定一個空閑線程,把請求傳入此線程中運作,進行處理。
當預先建立的線程都處于運作狀态,即預制線程不夠,線程池可以自由建立一定數量的新線程,
用于處理更多的請求。當系統比較閑的時候,也可以通過移除一部分一直處于停用狀态的線程。
程序間的通信原理:OS提供了溝通的媒介供程序之間“對話”用。既然要溝通,如同人類社會的溝通一樣,
溝通要付出時間和金錢,計算機中也一樣,必然有溝通需要付出的成本。
出于所解決問題的特性,OS提供了多種溝通的方式,每種方式的溝通成本也不盡相同,
使用成本和溝通效率也有所不同。我們經常聽到的 管道、消息隊列、共享記憶體都是OS提供的供程序之間對話的方式。
Process(target, name, args, kwargs) name:
給
程序取
名字
預設為
Process-1,Process-2.....
p.name檢視程序名
args:以
元組的形式
給target函數傳參 kwargs: 字典 給對應鍵的值傳參 程序對象的其他
常用屬性方法 p.name p.start() p.join() p.pid: 擷取建立程序的
pid号
p.is_alive():判斷程序是處于alive狀态
p.daemon:
預設為Flase 如果
設定為True 主程序結束時
殺死所有子程序daemon屬性
一定要在start()前設定 設定daemon
為True一般
不需要加join()daemon不是真正意義上的守護程序
守護程序 不受終端控制 背景自動運作 生命周期長 多程序copy一個檔案拆分為兩個進行儲存 import os
from multiprocessing import Process
from time import sleep
#擷取檔案的大小
size = os.path.getsize("./timg.jpeg") # 擷取檔案的位元組數
# f = open("timg.jpeg",'rb')
#複制前半部分
def copy1(img):
f = open(img,'rb') # 二進制讀取要複制的檔案
n = size // 2
fw = open('1.jpeg','wb') # 二進制建立檔案
while True:
if n < 1024: # 判斷檔案大小是否大于1024位元組 如果小于則直接讀取寫入
data = f.read(n)
fw.write(data)
break
data = f.read(1024) # 否則每次循環讀取1024位元組并寫入
fw.write(data)
n -= 1024
f.close()
fw.close()
#複制後半部分
def copy2(img):
f = open(img,'rb') # 讀取檔案必須要每次讀取 如果在父程序中打開檔案流對像
# 子程序會通同時調用一個檔案流對像 由于檔案流對象特性會記錄遊标
# 如若先執行後半部複制這前半部會導緻讀取不到資料
fw = open('2.jpeg','wb')
f.seek(size // 2,0)
while True:
data = f.read(1024)
if not data:
break
fw.write(data)
fw.close()
f.close()
p1 = Process(target = copy1,args = ('timg.jpeg',)) # 建立子程序并讓子程序分别同時複制
p2 = Process(target = copy2,args = ('timg.jpeg',))
p1.start()
p2.start()
p1.join()
p2.join()
os.path.getsize('./1.txt'): 讀取檔案大小 注: 1.如果
多個子程序拷貝同一個 父程序的對象則多個
子程序 使用的是同一個對象(如檔案隊形,套接字,隊列,管道。。。)
2.如果在
建立子程序後單獨建立的對象,則多個
子程序各不相同 建立子自定義程序類1.編寫類
繼承Process2.在自定義類中
加載父類__init__以擷取父類屬性,
同時可以
自定義新的屬性3
.重寫run方法在調用start時自動執行該方法
示例:
from multiprocessing import Process
import time
class ClockProcess(Process):
def __init__(self,value):
#調用父類init
super().__init__()
self.value = value
#重寫run方法
def run(self):
for i in range(5):
time.sleep(self.value)
print("The time is {}".format(time.ctime()))
p = ClockProcess(2)
#自動執行run
p.start()
p.join()
程序的缺點: 程序在
建立和銷毀的過程中
消耗的
資源相對
較多 程序池技術: 産生原因:如果有大量的任務需要多程序完成,而調用周期比較短且需要頻繁建立
此時可能産生大量程序頻繁建立銷毀的情況 消耗計算機資源較大
使用方法:1.
建立程序池 在池内放入适當數量的程序2.
将事件封裝成函數。
放入到
程序池3.事件不斷運作,
直到所有放入程序池事件運作完成4.
關閉程序池 回收程序 from multiprocessing import pool pool(Process)功能:
建立程序池對象參數:程序數量
傳回值:程序池對象
pool = pool() pool.apply_async(fun, args, kwds)(異步執行) 将事件放入程序池内參數:
fun:要執行的
函數 args:以
形式為fun
傳參 kwds傳回值:
傳回一個事件對象,通過p.
get()函數可以擷取fun的傳回值
pool.close():功能:
,無法再加入新的事件,并等待已有事件結束執行
pool.join() 回收程序池 pool.apply(fun, args, kwds)(同步執行)
功能:将事件放入程序池内
fun:要執行的函數
args:以元組形式為fun傳參
kwds:以字典形式為fun傳參
沒有傳回值
示例: from multiprocessing import Pool
from time import sleep,ctime
def worker(msg):
sleep(2)
print(msg)
return ctime()
#建立程序池對象
pool = Pool(processes = 4)
result = []
for i in range(10):
msg = "hello %d"%i
#将事件放入程序池
r = pool.apply_async(func = worker,args = (msg,))
result.append(r)
#同步執行
# pool.apply(func = worker,args = (msg,))
#關閉程序池
pool.close()
#回收
pool.join()
#擷取事件函數傳回值
for i in result:
print(i.get())
pool.map(func, iter) 将 要執行的
事件放入程序池參數:
func要執行的
iter 可疊代對象 from multiprocessing import Pool
import time
def fun(n):
time.sleep(1)
print("執行 pool map事件",n)
return n ** 2
pool = Pool(4)
#在程序池放入6個事件
r = pool.map(fun,range(6)) # map高階函數 fun和iter執行6次
print("傳回值清單:",r)
pool.close()
pool.join()
程序間的通信(IPC) 由于 空間獨立 資源無法共享, 此時在
程序間通訊就
需要專門的通訊方法 通信方法: 管道、消息隊列、共享記憶體 信号、信号量、套接字 管道通信:在記憶體中
開辟一塊記憶體空間 形成管道結構 多個程序使用同一個管道,即可通過
對管道 讀寫操作進行通訊 multiprocessing --> Pipe fd1,fd2 = Pipe(duplex=True) 建立管道預設表示
雙向管道如果設定為
False則為
單向管道倆個管道對象的,分别表示管道的兩端
如果是
雙向管道則均可讀寫如果是
則
fd1隻讀 fd2隻寫 fd.recv()功能:從管道
讀取資訊傳回值:讀取到的内容
當
管道為空則阻塞 fd.send(data) 向管道寫入内容參數:要寫入的内容
管道滿時會阻塞 可以
寫入幾乎所有
Python所有資料類型 隊列通信:在
記憶體中
開辟隊列結構空間,多個程序可見,
多個程序操作同一個隊列對象可以
實作消息存取工作在取出時
必須按照存入
順序取出(
先進先出)
q = Queue(maxsize=0)功能:
建立隊列對象
參數:
maxsize預設表示根據系統配置設定空間
儲存消息如果
傳入一個正整數 表示最多
存放多少條消息傳回值:隊列對象
q.put(data,[block,timeout])功能:向隊列
存入消息參數:
data:存入消息(
支援Python資料類型block:預設
True表示當隊
滿時阻塞設定為
非阻塞timeout:當
block為True是表示
逾時檢測 data = q.get([block,timeout])功能:取出消息
參數:
block:設定為
當隊列為
空時阻塞設定為
timeout:
當
q.full() 判斷隊列是否為滿 q.empty()判斷隊列
是否為空 q.qsize()隊列中
消息的數量 q.close() 關閉隊列
共享記憶體通信:在
記憶體中開辟一段空間存儲資料對多個程序可見,
每次寫入共享記憶體中的内容
都會覆寫之前内容對記憶體的
讀操作不會改變記憶體中的内容 form multiprocessing import Value,Array shm = Value(ctype,obj) 共享記憶體共享
空間 ctype:字元串 要轉換的c語言的資料類型obj:共享記憶體的
初始資料傳回值:傳回共享記憶體對象
shm.value: 表示共享 的值 from multiprocessing import Process,Value
import time
import random
#建立共享記憶體
money = Value('i',6000)
#存錢
def deposite():
for i in range(100):
time.sleep(0.05)
#對value的修改就是對共享記憶體的修改
money.value += random.randint(1,200)
#花銷
def withdraw():
for i in range(100):
time.sleep(0.04)
#對value的修改就是對共享記憶體的修改
money.value -= random.randint(1,200)
d = Process(target = deposite)
w = Process(target = withdraw)
d.start()
w.start()
d.join()
w.join()
print(money.value)
shm = Array(ctype,obj) 開辟共享 ctype:要轉換的資料類型 obj:
要存入共享内容的的資料(
結構化資料 清單、字元串 表示要存入得内容要求
資料結構内 類型相同 整數要
開辟幾個單元的空間傳回值:
傳回共享記憶體對象
from multiprocessing import Process,Array
import time
#建立共享記憶體
shm = Array('c',b"hello") #字元類型要求是bytes
#開辟5個整形單元的共享記憶體空間
# shm = Array('i',5)
def fun():
for i in shm:
print(i)
shm[0] = b"H"
p = Process(target = fun)
p.start()
p.join()
print(shm.value) #從首位址列印字元串
# for i in shm:
# print(i)
三種程序間通信差別: 管道通信: 消息隊列: 共享記憶體: 開辟空間: 記憶體 記憶體 記憶體
讀寫方式:兩端讀寫 先進先出 每次覆寫上次内容
單向/雙向
效率:一般 一般 較快
應用:多用于父子程序 應用靈活廣泛 複雜,需要同步互斥
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiIn5GcukTM4cDMzUTO10yN3EDN0MjMzIzMxgDM4EDMy0yNxYDMzQTMvwFOwgTMwIzLcdTM2AzM0EzLcd2bsJ2Lc12bj5ycn9Gbi52YugTMwIzcldWYtl2Lc9CX6MHc0RHaiojIsJye.png)