天天看點

python multiprocessing 通信問題 -- Manager

由于程序之間不共享記憶體,是以程序之間的通信不能像線程之間直接引用,因而需要采取一些政策來完成程序之間的資料通信。

本文記錄使用 Manager 來完成程序間通信的方式。

首先描述需求:

    場景:頂層邏輯負責管理,我們定義為C,由C啟動A、B兩個程序聯合完成功能

    需求:A、B聯合工作過程中的資料通信

python multiprocessing 通信問題 -- Manager

解決思路:

    利用頂層C建立一個 Manager,由 Manager 提供資料池分發給A、B使用,進而完成兩個程序之間的通信。

#-*-encoding:utf-8-*-
from multiprocessing import Process, Manager
from time import sleep


def thread_a_main(sync_data_pool):  # A 程序主函數,存入100+的數
    for ix in range(100, 105):
        sleep(1)
        sync_data_pool.append(ix)


def thread_b_main(sync_data_pool):  # B 程序主函數,存入300+的數
    for ix in range(300, 309):
        sleep(0.6)
        sync_data_pool.append(ix)


def _test_case_000():  # 測試用例
    manager = Manager()  # multiprocessing 中的 Manager 是一個工廠方法,直接擷取一個 SyncManager 的執行個體
    sync_data_pool = manager.list()  # 利用 SyncManager 的執行個體來建立同步資料池
    Process(target=thread_a_main, args=(sync_data_pool, )).start()  # 建立并啟動 A 程序
    Process(target=thread_b_main, args=(sync_data_pool, )).start()  # 建立并啟動 B 程序
    for ix in range(6):  # C 程序(主程序)中實時的去檢視資料池中的資料
        sleep(1)
        print(sync_data_pool)


if '__main__' == __name__:  # 養成好習慣,将測試用例單獨列出
    _test_case_000()
           

輸出結果:

[]
[300, 100, 301]
[300, 100, 301, 101, 302, 303]
[300, 100, 301, 101, 302, 303, 102, 304]
[300, 100, 301, 101, 302, 303, 102, 304, 305, 103, 306]
[300, 100, 301, 101, 302, 303, 102, 304, 305, 103, 306, 104, 307, 308]
           

從結果來看, 我們的300+和100+的資料并行的被插入到同一資料池中了,也就是說,通信的目的達到了

隻是從目前的方式來看,接收方必須主動的去查詢資料池,類似于信箱的方式了

除了注釋中的内容外,還需要注意:

    1、manager 要在頂層建立

    2、同步資料池是通過Manager自身的工廠方法建立的,這裡 manager.list() 調用一次即産生一個新的資料池,而不是傳回同一個資料池執行個體,是以資料池的執行個體需要做好管理

    3、可以用的資料格式不僅list,可以選擇如下(參考multiprocessing.managers)

class SyncManager(BaseManager):
    '''
    Subclass of `BaseManager` which supports a number of shared object types.

    The types registered are those intended for the synchronization
    of threads, plus `dict`, `list` and `Namespace`.

    The `multiprocessing.Manager()` function creates started instances of
    this class.
    '''

SyncManager.register('Queue', Queue.Queue)
SyncManager.register('JoinableQueue', Queue.Queue)
SyncManager.register('Event', threading.Event, EventProxy)
SyncManager.register('Lock', threading.Lock, AcquirerProxy)
SyncManager.register('RLock', threading.RLock, AcquirerProxy)
SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy)
SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore,
                     AcquirerProxy)
SyncManager.register('Condition', threading.Condition, ConditionProxy)
SyncManager.register('Pool', Pool, PoolProxy)
SyncManager.register('list', list, ListProxy)
SyncManager.register('dict', dict, DictProxy)
SyncManager.register('Value', Value, ValueProxy)
SyncManager.register('Array', Array, ArrayProxy)
SyncManager.register('Namespace', Namespace, NamespaceProxy)

# types returned by methods of PoolProxy
SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
SyncManager.register('AsyncResult', create_method=False)
           

【2018/6/16】在kivy應用中使用了Manager,經過PyInstaller釋出的時候,遇到兩個問題,記錄于此。

1、PyCharm上運作正常,PyInstaller編譯後的程式在啟動時不斷的建立新視窗【已解決】

    問題原因:沒有添加 freeze_support

    解決思路:from multiprocessing import freeze_support, 在 main 的第一行添加 freeze_support() 解決該問題

    鳴謝:感謝 Filip Radović 提供的解決思路

    PO一下我當時提的ISSUE作為參考:https://github.com/kivy/kivy/issues/5807

2、PyCharm上運作正常,QPythonL加載後,Manager().start()遇到 EOFError

    原因分析:EOFError指的是 End of flow,就是說讀取到結尾了。而對于此場景,Manager().start()會向Pipe寫入内容,除非寫入失敗,寫入失敗意味着連接配接失敗,對否??(目前尚未證明/解決,純屬個人猜測,後續更新)