天天看點

Python網絡程式設計(程序池、程序間的通信)

線程池的原理:

       線程池是預先建立線程的一種技術。線程池在還沒有任務到來之前,

       建立一定數量的線程,放入空閑隊列中。這些線程都是處于睡眠狀态,

       即均為啟動,不消耗CPU,而隻是占用較小的記憶體空間。當請求到來之後,

       緩沖池給這次請求配置設定一個空閑線程,把請求傳入此線程中運作,進行處理。

       當預先建立的線程都處于運作狀态,即預制線程不夠,線程池可以自由建立一定數量的新線程,

       用于處理更多的請求。當系統比較閑的時候,也可以通過移除一部分一直處于停用狀态的線程。

程序間的通信原理:

         OS提供了溝通的媒介供程序之間“對話”用。既然要溝通,如同人類社會的溝通一樣,

         溝通要付出時間和金錢,計算機中也一樣,必然有溝通需要付出的成本。

         出于所解決問題的特性,OS提供了多種溝通的方式,每種方式的溝通成本也不盡相同,

         使用成本和溝通效率也有所不同。我們經常聽到的 管道、消息隊列、共享記憶體都是OS提供的供程序之間對話的方式。

Process(target, name, args, kwargs) name

       給

程序

預設

Process-1

,Process-2..... 

 p.name

檢視程序名

args:

       以

元組

的形式

給target函數傳參 kwargs: 字典 給對應鍵的值傳參 程序對象

的其他

常用屬性方法 p.name  p.start()   p.join()     p.pid: 擷取

建立程序的

pid

p.is_alive():

        判斷程序是處于alive狀态

p.daemon

:

        預設為Flase 如果

設定為True 主程序結束

殺死所有子程序

daemon屬性

一定要在start()前設定 設定

daemon

為True

一般

不需要加join()

daemon不是真正意義上的守護程序

守護程序 不受終端控制 背景自動運作  生命周期長 多程序copy一個檔案拆分為兩個進行儲存
 import os 
from multiprocessing import Process 
from time import sleep

#擷取檔案的大小
size = os.path.getsize("./timg.jpeg")  # 擷取檔案的位元組數
# f = open("timg.jpeg",'rb')
#複制前半部分
def copy1(img):
    f = open(img,'rb')  # 二進制讀取要複制的檔案
    n = size // 2
    fw = open('1.jpeg','wb')  # 二進制建立檔案

    while True:
        if n < 1024:  # 判斷檔案大小是否大于1024位元組 如果小于則直接讀取寫入
            data = f.read(n)
            fw.write(data)
            break
        data = f.read(1024)  # 否則每次循環讀取1024位元組并寫入
        fw.write(data)
        n -= 1024
    f.close()
    fw.close()

#複制後半部分
def copy2(img):
    f = open(img,'rb')  # 讀取檔案必須要每次讀取 如果在父程序中打開檔案流對像 
                        # 子程序會通同時調用一個檔案流對像 由于檔案流對象特性會記錄遊标
                        # 如若先執行後半部複制這前半部會導緻讀取不到資料
    fw = open('2.jpeg','wb')
    f.seek(size // 2,0)
    while True:
        data = f.read(1024)
        if not data:
            break 
        fw.write(data)
    fw.close()
    f.close()

p1 = Process(target = copy1,args = ('timg.jpeg',))  # 建立子程序并讓子程序分别同時複制
p2 = Process(target = copy2,args = ('timg.jpeg',))
p1.start()
p2.start()
p1.join()
p2.join()


                
os.path.getsize('./1.txt'):     讀取檔案大小 注:

    1.如果

多個子程序拷貝同一個 父程序的對象

則多個

子程序 使用的是同一個對象

(如檔案隊形,套接字,隊列,管道。。。)

    2.如果在

建立子程序後單獨建立的對象

,則多個

子程序各不相同 建立子自定義程序類

    1.編寫類

繼承Process

    2.在自定義類中

加載父類__init__以擷取父類屬性

      同時可以

自定義新的屬性

    3

.重寫run方法

在調用start時自動執行該方法

示例:

from multiprocessing import Process 
import time 

class ClockProcess(Process):
    def __init__(self,value):
        #調用父類init
        super().__init__()
        self.value = value 
    #重寫run方法
    def run(self):
        for i in range(5):
            time.sleep(self.value)
            print("The time is {}".format(time.ctime()))

p = ClockProcess(2)
#自動執行run
p.start()

p.join()
           
程序的缺點:

    程序在

建立和銷毀

的過程中

消耗

資源

相對

較多 程序池技術: 産生原因:

        如果有大量的任務需要多程序完成,而調用周期比較短且需要頻繁建立

此時可能産生大量程序頻繁建立銷毀的情況  消耗計算機資源較大

使用方法:

        1.

建立程序池 在池内放入适當數量的程序

2.

将事件封裝成函數

放入

程序池

3.事件不斷運作,

直到所有放入程序池事件運作完成

4.

關閉程序池 回收程序 from multiprocessing import pool pool(Process)

       功能:

建立程序池對象

       參數:程序數量

       傳回值:程序池對象

    pool = pool() pool.apply_async(fun, args, kwds)(異步執行) 将事件放入程序池内

       參數:

fun

:要執行的

函數 args

:以

形式為fun

傳參 kwds

       傳回值:

          傳回一個事件對象,通過p.

get()

函數可以擷取fun的傳回值

pool.close():

        功能:

,無法再加入新的事件,并等待已有事件結束執行

pool.join() 回收程序池 pool.apply(fun, args, kwds)

(同步執行)

       功能:将事件放入程序池内

          fun:要執行的函數

  args:以元組形式為fun傳參

  kwds:以字典形式為fun傳參

       沒有傳回值

示例:
 from multiprocessing import Pool 
from time import sleep,ctime 

def worker(msg):
    sleep(2)
    print(msg)
    return ctime()

#建立程序池對象
pool = Pool(processes = 4)

result = []
for i in range(10):
    msg = "hello %d"%i 
    #将事件放入程序池
    r = pool.apply_async(func = worker,args = (msg,))
    result.append(r)
    
    #同步執行
    # pool.apply(func = worker,args = (msg,))

#關閉程序池
pool.close()
#回收
pool.join()

#擷取事件函數傳回值
for i in result:
    print(i.get())
                
pool.map(func, iter)

要執行的

事件放入程序池

參數:

func

  要執行的

iter 可疊代對象
 from multiprocessing import Pool
import time 

def fun(n):
    time.sleep(1)
    print("執行 pool map事件",n)
    return n ** 2 

pool = Pool(4)

#在程序池放入6個事件
r = pool.map(fun,range(6))  # map高階函數 fun和iter執行6次
print("傳回值清單:",r)

pool.close()
pool.join()                
程序間的通信(IPC) 由于 空間獨立 資源無法共享,

    此時在

程序間通訊

需要專門的通訊方法 通信方法:        管道、消息隊列、共享記憶體        信号、信号量、套接字        管道通信:

           在記憶體中

開辟一塊記憶體空間 形成管道結構 多個程序使用同一個管道

,即可通過

對管道 讀寫操作進行通訊  multiprocessing --> Pipe    fd1,fd2 = Pipe(duplex=True) 建立管道

           預設表示

雙向管道

   如果設定為

False

則為

單向管道

           倆個管道對象的,分别表示管道的兩端

   如果是

雙向管道則均可讀寫

   如果是

fd1隻讀 fd2隻寫  fd.recv()

        功能:從管道

讀取資訊

傳回值:讀取到的内容

   當

管道為空則阻塞 fd.send(data) 向管道寫入内容

參數:要寫入的内容

管道滿時會阻塞

寫入

幾乎所有

Python所有資料類型 隊列通信:

   在

記憶體

開辟隊列結構空間

,多個程序可見,

多個程序操作同一個隊列

對象可以

實作消息存取工作

   在取出時

必須

按照存入

順序取出

先進先出

 q = Queue(maxsize=0)

     功能:

建立隊列對象

     參數:

maxsize

預設表示根據系統配置設定空間

儲存消息

如果

傳入一個正整數 表示

最多

存放多少條消息

     傳回值:隊列對象

q.put(data,[block,timeout])

      功能:向隊列

存入消息

      參數:

          data:存入消息(

支援Python資料類型

  block:預設

True

表示當隊

滿時阻塞

         設定為

非阻塞

  timeout:當

block為True

是表示

逾時檢測 data = q.get([block,timeout])

       功能:取出消息

       參數:

            block:設定為

當隊列為

空時阻塞

           設定為

    timeout:

         當

 q.full() 判斷隊列是否為滿  q.empty()

  判斷隊列

是否為空 q.qsize()

隊列中

消息的數量  q.close()  關閉

隊列

        共享記憶體通信:

    在

記憶體中開辟一段空間存儲資料

對多個程序可見,

每次寫入

共享記憶體中的内容

都會覆寫之前内容

    對記憶體的

讀操作不會改變記憶體中的内容 form multiprocessing import Value,Array shm = Value(ctype,obj) 共享記憶體

共享

空間 ctype:字元串  要轉換的c語言的資料類型

    obj:共享記憶體的

初始資料

傳回值:傳回共享記憶體對象

shm.value: 表示共享 的值
 from  multiprocessing import Process,Value 
import time 
import random  

#建立共享記憶體
money = Value('i',6000)

#存錢
def deposite():
    for i in range(100):
        time.sleep(0.05)
        #對value的修改就是對共享記憶體的修改
        money.value += random.randint(1,200)
#花銷
def withdraw():
    for i in range(100):
        time.sleep(0.04)
        #對value的修改就是對共享記憶體的修改
        money.value -= random.randint(1,200)

d = Process(target = deposite)
w = Process(target = withdraw)

d.start()
w.start()

d.join()
w.join()
print(money.value)                
shm = Array(ctype,obj) 開辟共享 ctype:要轉換的資料類型

    obj:

        要存入共享内容的的資料(

結構化資料 清單、字元串 表示要存入得内容

要求

資料結構内 類型相同 整數

開辟幾個單元的空間

傳回值:

傳回

共享記憶體對象 

 from multiprocessing import Process,Array
import time 

#建立共享記憶體
shm = Array('c',b"hello") #字元類型要求是bytes

#開辟5個整形單元的共享記憶體空間
# shm = Array('i',5)

def fun():
    for i in shm:
        print(i)
    shm[0] = b"H"

p = Process(target = fun)
p.start()
p.join()

print(shm.value) #從首位址列印字元串
# for i in shm:
#     print(i)

                
三種程序間通信差別: 管道通信:                           消息隊列:                          共享記憶體: 開辟空間:

     記憶體                                      記憶體                                    記憶體

讀寫方式:

兩端讀寫                               先進先出                        每次覆寫上次内容

                  單向/雙向 

效率: 

          一般                                     一般                                     較快

應用:

   多用于父子程序                        應用靈活廣泛                   複雜,需要同步互斥

Python網絡程式設計(程式池、程式間的通信)