天天看點

【Multiprocessing】Python多程序記憶體共享資料隊列SMQueue

0x00 前言

自從先前研究了下Python的多程序計算(原文連結)之後,

深深地感受到多程序處理的美好,并決定運用到模型訓練時,

作為 feed data 的資料處理子產品使用,實作工具類 SharedMemoryQueue。

但是衆所周知,python的程序間通訊比較複雜,更别提大量程序的時候……想想就頭疼……

但是python本身就是萬金油,為啥我非要用python的機制呢,用python調用C不就好啦!

簡要記錄一下解決方案(我覺得聰明的小夥伴看到這個import,就已經懂了一半了):

from ctypes import c_int, c_float, cast, POINTER
from multiprocessing.sharedctypes import Array, Value
                

0x01 基本思路

在C語言中,聲明變量時是可以開一塊記憶體區域的,

并且,在知道這塊記憶體指針的情況下可以通過指針調用這一塊資料。

然後對于目前需求中這種 “生産者—消費者” 的模式:

1)生産者為多個,即多個程序分别生産資料,誰生産好了誰就寫一塊;

2)消費者為一個,即一個程序每次需求資料,從已生産資料中拿一塊;

那麼就很明了了,隊列這種資料結構就可以解決這個問題。

舉個例子說明一下具體流程(即生産者—消費者模式):

對于【A(即預設每塊資料占用記憶體量) x 10 (即隊列最大長度)】的記憶體,

開多個程序生成資料(生産者),對于每個程序,生成之後把不足預設占用記憶體的部分padding(補零),

然後寫入隊列(這裡需要寫入鎖,展現為

with write_lock

),将寫入指針後移一個 A 的偏移位址,

當偏移位址超過預設最大長度(此處為

10

)的時候,回到前面第一塊記憶體的位置(取模

write_idx % queue_size

操作),

特别的,當

write_idx - read_idx

等于預設隊列最大長度即隊列已滿的時候進入等待狀态,

對于讀取資料的程序(消費者),直接向隊列讀取目前讀取(此處為了防止多個消費者也加了讀取鎖),

如果資料的使用不需要padding,讀取完畢後記得把先前padding的部分去掉,并将讀取指針後移一個 A 的偏移位址。

0x02 Source Code

Code 思路來源于 @lihongwei / @lhw446

個人進行了Feature的增加與修改,包括但不限于: 冷啟動 與 逾時重置的處理,

主要修改在基于該隊列的HVDprocessor,就放在下篇裡再寫吧~

# coding=utf8
# ========================================================
#   Copyright (C) 2017-2018 All rights reserved.
# 
#   filename : shared_memory_queue.py
#   author   : lihongwei / [email protected]
#   update   : chendian / [email protected]
#   date     : 2017-11-28
#   desc     : a shared memory queue for data processor
# ========================================================

import time
import numpy as np
import multiprocessing
from ctypes import c_int, c_float, cast, POINTER
from multiprocessing.sharedctypes import Array, Value


class SMQueue(object):
    ''' a shared memory queue for data processor '''

    # pylint: disable=protected-access
    def __init__(self, queue_size, f_data_size, i_data_size):
        queue_size += 1  # plus 1 is for the one consumer space
        f_cdatasets = Array('f', np.zeros((queue_size * f_data_size), dtype=np.float32))
        i_cdatasets = Array('i', np.zeros((queue_size * i_data_size), dtype=np.int32))
        self.f_cbuffer = f_cdatasets._obj._wrapper
        self.i_cbuffer = i_cdatasets._obj._wrapper
        self.read_idx = Value('i', 0)
        self.write_idx = Value('i', 0)
        self.queue_size = queue_size
        self.f_data_size = f_data_size
        self.i_data_size = i_data_size
        self.read_lock = multiprocessing.Lock()
        self.write_lock = multiprocessing.Lock()

    def get(self, time_gap=0.1, time_out=1234, cold_boot=False):
        ''' get f_data, i_data from queue '''
        with self.read_lock:
            time_cnt = 0
            while self.read_idx.value == self.write_idx.value:
                time_cnt += 1
                if time_cnt >= time_out and not cold_boot:
                    return None, None
                time.sleep(time_gap)
            index = self.read_idx.value % self.queue_size
            f_buffer_ptr = cast(self.f_cbuffer.get_address() + index * self.f_data_size * 4,
                                POINTER(c_float))
            i_buffer_ptr = cast(self.i_cbuffer.get_address() + index * self.i_data_size * 4,
                                POINTER(c_int))
            f_data = np.ctypeslib.as_array(f_buffer_ptr, shape=(self.f_data_size, ))
            i_data = np.ctypeslib.as_array(i_buffer_ptr, shape=(self.i_data_size, ))
            self.read_idx.value += 1
            return f_data, i_data

    def put(self, f_data, i_data, time_gap=0.1):
        ''' put f_data and i_data to queue '''
        with self.write_lock:
            # only use queue_size-1 space
            while self.write_idx.value - self.read_idx.value == self.queue_size - 1:
                time.sleep(time_gap)
            index = self.write_idx.value % self.queue_size
            f_buffer_ptr = cast(self.f_cbuffer.get_address() + index * self.f_data_size * 4,
                                POINTER(c_float))
            i_buffer_ptr = cast(self.i_cbuffer.get_address() + index * self.i_data_size * 4,
                                POINTER(c_int))
            o_f_data = np.ctypeslib.as_array(f_buffer_ptr, shape=(self.f_data_size,))
            o_i_data = np.ctypeslib.as_array(i_buffer_ptr, shape=(self.i_data_size,))
            o_f_data[:] = f_data
            o_i_data[:] = i_data
            self.write_idx.value += 1

    def push(self, f_data, i_data, time_gap=0.1):
        self.put(f_data, i_data, time_gap)

    def reset(self):
        self.read_idx = Value('i', 0)
        self.write_idx = Value('i', 0)

    def queue_info(self):
        return "{}/{}".format(self.write_idx.value - self.read_idx.value, self.queue_size)