天天看點

[904]python多程序之間共享資料

一、Python multiprocessing 跨程序對象共享

在mp庫當中,跨程序對象共享有三種方式,

第一種僅适用于原生機器類型,即python.ctypes當中的類型,這種在mp庫的文檔當中稱為shared memory 方式,即通過共享記憶體共享對象;

另外一種稱之為server process , 即有一個伺服器程序負責維護所有的對象,而其他程序連接配接到該程序,通過代理對象操作伺服器程序當中的對象;

最後一種在mp文檔當中沒有單獨提出,但是在其 中多次提到,而且是mp庫當中最重要的一種共享方式,稱為inheritance ,即繼承,對象在 父程序當中建立,然後在父程序是通過multiprocessing.Process建立子程序之後,子程序自動繼承了父程序當中的對象,并且子程序對這 些對象的操作都是反映到了同一個對象。

這三者共享方式各有特色,在這裡進行一些簡單的比較。

首先是共享方式所應對的對象類型,看這個表:

共享方式 支援的類型
Shared memory ctypes當中的類型,通過RawValue,RawArray等包裝類提供
Inheritance 系統核心對象,以及基于這些對象實作的對象。包括Pipe, Queue, JoinableQueue, 同步對象(Semaphore, Lock, RLock, Condition, Event等等)
Server process 所有對象,可能需要自己手工提供代理對象(Proxy)

這個表總結了三種不同的共享方式所支援的類型,下面一個個展開讨論。

其中最單純簡單的就是shared memory這種方式,隻有ctypes當中的資料類型可以通過這種方式共享。由于mp庫本身缺少命名的機制,即在一個程序當中建立的對象,無法在另外一 個程序當中通過名字來引用,是以,這種共享方式依賴于繼承,對象應該由父程序建立,然後由子程序引用。關于這種機制的例子,可以參見Python文檔 當中的例子 Synchronization types like locks, conditions and queues,參考其中的test_sharedvalues函數。

然後是繼承方式。首先關于繼承方式需要有說明,繼承本質上并不是一種對象共享的機制,對象共享隻是其副作用。子程序從父程序繼承來的對象并不一定是 共享的。繼承本質上是父程序fork出的子程序自動繼承父程序的記憶體狀态和對象描述符。是以,實際上子程序複制 了一份 父程序的對象,隻不過,當這個對象包裝了一些系統核心對象的描述符的時候,拷貝這個對象(及其包裝的描述符)實作了對象的共享。是以,在上面的表當中,隻 有系統核心對象,和基于這些對象實作的對象,才能夠通過繼承來共享。通過繼承共享的對象在linux平台上沒有任何限制,但是在Windows上面由于沒 有fork的實作,是以有一些額外的限制條件 ,是以,在Windows上面,繼承方式是幾乎無法用的。

最後就是Server Process這種方式。這種方式可以支援的類型比另外兩種都多,因為其模型是這樣的:

[904]python多程式之間共享資料

在這個模型當中,有一個manager程序,負責管理實際的對象。真正的對象也是在manager程序的記憶體空間當中。所有需要通路該對象的程序都 需要先連接配接到該管理程序,然後擷取到對象的一個代理對象(Proxy object),通常情況下,這個代理對象提供了實際對象的公共函 數 的代理,将函數參數進行pickle,然後通過連接配接傳送到管理程序當中,管理程序将參數unpickle之後,轉發給相應的實際對象 的函數,傳回值(或者異常)同樣經過管理程序pickle之後,通過連接配接傳回到客戶程序,再由proxy對象進行unpickle,傳回給調用者或者抛出 異常。

很明顯,這個模型是一個典型的RPC(遠端過程調用)的模型。因為每個客戶程序實際上都是在通路manager程序當中的對象,是以完全可以通過這 個實作對象共享。

manager和proxy之間的連接配接可以是基于socket的網絡連接配接,也可以是unix pipe。如果是使用基于socket的連接配接方式,在使用proxy之前,需要調用manager對象的connect函數與遠端的manager程序建 立連接配接。由于manager程序會打開端口接收該連接配接,是以必要的身份驗證是需要的,否則任何人都可以連上manager弄亂你的共享對象。mp庫通過 authkey的方式來進行身份驗證。

在實作當中,manager程序通過multiprocessing.Manager類或者BaseManager的子類實作。 BaseManager提供了函數register注冊一個函數來擷取共享對象的proxy。這個函數會被客戶程序調用,然後在manager程序當中執 行。這個函數可以傳回一個共享的對象(對所有的調用傳回同一個對象),或者可以為每一個調用建立一個新的對象,通過前者就可以實作多個程序共享一個對象。 關于這個的用法可以參考Python文檔 當中的例子“Demonstration of how to create and use customized managers and proxies”。

典型的導出一個共享對象的代碼是:

ObjectType object_
class ObjectManager(multiprocessing.managers.BaseManager): pass
ObjectManager.register("object", lambda: object_)
           

注意上面介紹proxy對象的時候,我提到的“公共函數”四個字。每個proxy對象隻會導出實際對象的公共函數。這裡面有兩個含義,一個是“公 共”,即所有非下劃線開頭的成員,另一個是“函數”,即所有callable的成員。這就帶來一些限制,一是無法導出屬性,二是無法導出一些公共的特殊函 數,例如

__get__

,

__next__

等等。對于這個mp庫有一套處理,即自定義proxy對象。首先是BaseManager的register可以提供一個 proxy_type作為第三個參數,這個參數指定了哪些成員需要被導出。詳細的使用方法可以參見文檔當中的第一個例子。

另外manager還有一些細節的問題需要注意。由于Proxy對象不是線程安全的,是以如果需要在一個多線程程式當中使用proxy,mp庫會為 每個線程建立一個proxy對象,而每個proxy對象都會對server process建立一個連接配接,而manager那邊對于每個連接配接都建立一個單獨的線程來為其服務。這樣帶來的問題就是,如果客戶程序有很多線程,很容易會 導緻manager程序的fd數目達到ulimit的限制,即使沒有達到限制,也會因為manager程序當中有太多線程而嚴重影響manager的性 能。解決方案可以是一個程序内cache,隻有一個單獨的線程可以建立proxy對象通路共享對象,其餘線程隻能通路該程序當中的cache。

一旦manager因為達到ulimit限制或者其他異常,manager會直接退出,遺憾的是,這時候已經建立的proxy會試圖重新連接配接 manager – 但是它已經不存在了。這個會導緻客戶程序hang在對proxy的函數調用上,這個時候,目前除了殺掉程序沒有找到别的辦法。

另外proxy使用socket的方式比較tricky,是以和内置的socket庫有很多沖突,比如 socket.setdefaulttimeout(Python Issue 6056 )。在setdefaulttimeout調用了之後,程序當中所有通過socket子產品建立的socket都是被設定為unblock模式的,但是mp 庫并不知道這一點,而且它總是假設socket都是block模式的,于是,一旦調用了setdefaulttimeout,所有對于proxy的函數調 用都會抛出OSError,錯誤代碼為11,錯誤原因是非常有誤導性的“Resource temporarily unavailable”,實際上就是EAGAIN。這個錯誤可以通過我提供的一個patch 來補救(這個patch當中還包含其他的一些修複,是以請自行檢視并修改該patch)。

由于以上的一些原因,server process模式作為一個對象的共享模式,能夠提供最為靈活的共享方式,但是也有最多的問題。這個在使用過程當中就靠自己去衡量了。目前我們的系統對于 資料可靠性方面要求不高,丢失資料是可以接受的,但是也隻用這種模式來維護統計值,不敢用來維護更多的東西。

二、Python多程序寫入同一檔案

最近用python的正規表達式處理了一些文本資料,需要把結果寫到檔案裡面,但是由于檔案比較大,是以運作起來花費的時間很長。但是打開任務管理器發現CPU隻占用了25%,上網找了一下原因發現是由于一個叫GIL的存在,使得Python在同一時間隻能運作一個線程,是以隻占用了一個CPU,由于我的電腦是4核的,是以CPU使用率就是25%了。

既然多線程沒有什麼用處,那就可以使用多程序來處理,畢竟多程序是可以不受GIL影響的。Python提供了一個multiprocessing的多程序庫,但是多程序也有一些問題,比如,如果程序都需要寫入同一個檔案,那麼就會出現多個程序争用資源的問題,如果不解決,那就會使檔案的内容順序雜亂。這就需要涉及到鎖了,但是加鎖一般會造成程式的執行速度下降,而且如果程序在多處需要向檔案輸出,也不好把這些代碼整個都鎖起來,如果都鎖起來,那跟單程序還有什麼差別。有一個解決辦法就是把向檔案的輸出都整合到一塊去,在這一塊集中加個鎖,這樣問題就不大了。不過還有一種更加優雅的解決方式:使用multiprocessing庫的回調函數功能。

具體思路跟把檔案輸出集中在一起也差不多,就是把程序需要寫入檔案的内容作為傳回值傳回給惠和的回調函數,使用回調函數向檔案中寫入内容。這樣做在windows下面還有一個好處,在windows環境下,python的多程序沒有像linux環境下的多程序一樣,linux環境下的multiprocessing庫是基于fork函數,父程序fork了一個子程序之後會把自己的資源,比如檔案句柄都傳遞給子程序。但是在windows環境下沒有fork函數,是以如果你在父程序裡打開了一個檔案,在子程序中寫入,會出現

ValueError: I/O operation on closed file

這樣的錯誤,而且在windows環境下最好加入

if __name__ == '__main__'

這樣的判斷,以避免一些可能出現的RuntimeError或者死鎖。

下面是代碼:

# -*- coding:utf-8 -*-
from multiprocessing import Pool
import time,datetime


def mycallback(x):
    list1.append(x)

def sayHi(num):
    time.sleep(num)
    return num


if __name__ == '__main__':
    starttime = datetime.datetime.now()
    pool = Pool(4)
    list1 = []
    for i in range(4):
        pool.apply_async(sayHi, (i,), callback=mycallback)
    pool.close()
    pool.join()
    print(list1)
    endtime = datetime.datetime.now()
    print('times:',(endtime - starttime).seconds)
           

三、Python 程序之間共享資料(全局變量)

程序之間共享資料(數值型):

# -*- coding:utf-8 -*-
import multiprocessing


def func(num):
    num.value = 10.78  # 子程序改變數值的值,主程序跟着改變


if __name__ == "__main__":
    # d表示數值,主程序與子程序共享這個value。(主程序與子程序都是用的同一個value)
    num = multiprocessing.Value("d", 10.0)
    print(num.value)
    p = multiprocessing.Process(target=func, args=(num,))
    p.start()
    p.join()
    print(num.value)
           

程序之間共享資料(數組型):

# -*- coding:utf-8 -*-
import multiprocessing


def func(num):
    num[2] = 9999  # 子程序改變數組,主程序跟着改變


if __name__ == "__main__":
    num = multiprocessing.Array("i", [1, 2, 3, 4, 5])  # 主程序與子程序共享這個數組
    print(num[:])
    p = multiprocessing.Process(target=func, args=(num,))
    p.start()
    p.join()
    print(num[:])
           

程序之間共享資料(dict,list):

# -*- coding:utf-8 -*-
import multiprocessing


def func(mydict, mylist):
    mydict["index1"] = "aaaaaa"  # 子程序改變dict,主程序跟着改變
    mydict["index2"] = "bbbbbb"
    mylist.append(11)  # 子程序改變List,主程序跟着改變
    mylist.append(22)
    mylist.append(33)


if __name__ == "__main__":
    with multiprocessing.Manager() as MG:  # 重命名
        mydict = multiprocessing.Manager().dict()  # 主程序與子程序共享這個字典
        mylist = multiprocessing.Manager().list(range(5))  # 主程序與子程序共享這個List
        p = multiprocessing.Process(target=func, args=(mydict, mylist))
        p.start()
        p.join()
        print(mylist)
        print(mydict)
           

來源:https://www.cnblogs.com/xiaxuexiaoab/p/8558519.html

https://docs.python.org/3/library/multiprocessing.html