
Python multiprocessing: Understanding the logic behind `chunksize`

What factors determine an optimal chunksize argument to methods like multiprocessing.Pool.map()? The .map() method seems to use an arbitrary heuristic for its default chunksize (explained below); what motivates that choice, and is there a more thoughtful approach based on some particular situation/setup?

Example - say that I am:

Passing an iterable to .map() that has ~15 million elements;

Working on a machine with 24 cores and using the default processes = os.cpu_count() within multiprocessing.Pool().

My naive thinking is to give each of the 24 workers an equally-sized chunk, i.e. 15_000_000 / 24 == 625_000. Large chunks should reduce turnover/overhead while fully utilizing all workers. But it seems that this is missing some potential downsides of giving large batches to each worker. Is this an incomplete picture, and what am I missing?

Part of my question stems from the default logic for if chunksize=None: both .map() and .starmap() call .map_async(), which looks like this:

    def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
                   error_callback=None):
        # ... (materialize `iterable` to list if it's an iterator)
        if chunksize is None:
            chunksize, extra = divmod(len(iterable), len(self._pool) * 4)  # ????
            if extra:
                chunksize += 1
        if len(iterable) == 0:
            chunksize = 0

What is the logic behind divmod(len(iterable), len(self._pool) * 4)? This implies that the chunksize will be closer to 15_000_000 / (24 * 4) == 156_250. What is the intention of multiplying len(self._pool) by 4?

This makes the resulting chunksize a factor of 4 smaller than my "naive logic" from above, which consists of just dividing the length of the iterable by the number of workers in pool._pool.

Lastly, there is also this snippet from the Python docs on .imap() that drives my curiosity further:

The chunksize argument is the same as the one used by the map() method. For very long iterables using a large value for chunksize can make the job complete much faster than using the default value of 1.

Solution

Short Answer

Pool's chunksize-algorithm is a heuristic. It provides a simple solution for all imaginable problem scenarios you are trying to stuff into Pool's methods. As a consequence, it cannot be optimized for any specific scenario.

The algorithm arbitrarily divides the iterable into approximately four times more chunks than the naive approach. More chunks mean more overhead, but increased scheduling flexibility. As this answer will show, this leads to a higher worker-utilization on average, but without the guarantee of a shorter total computation time in every case.

"Nice to know," you might think, "but how does knowing this help me with my concrete multiprocessing problems?" Well, it doesn't. The more honest short answer is: "there is no short answer", "multiprocessing is complex" and "it depends". An observed symptom can have different roots, even for similar scenarios.

This answer tries to provide you with basic concepts that help you get a clearer picture of Pool's scheduling black box. It also tries to give you some basic tools for recognizing and avoiding potential cliffs as far as they are related to chunksize.

Table of Contents

Part I

1. Definitions

2. Parallelization Goals

3. Parallelization Scenarios

4. Risks of Chunksize > 1

5. Pool's Chunksize-Algorithm

6. Quantifying Algorithm Efficiency

6.1 Models

6.2 Parallel Schedule

6.3 Efficiencies

6.3.1 Absolute Distribution Efficiency (ADE)

6.3.2 Relative Distribution Efficiency (RDE)

7. Naive vs. Pool's Chunksize-Algorithm

8. Reality Check

9. Conclusion

It is necessary to clarify some important terms first.

1. Definitions

Chunk

A chunk here is a share of the iterable-argument specified in a pool-method call. How the chunksize gets calculated and what effects this can have is the topic of this answer.

Task

A task's physical representation in a worker-process, in terms of data, can be seen in the figure below.

[Figure: an exemplary pool.map() call and the task-handling code inside multiprocessing.pool.worker]

The figure shows an exemplary call to pool.map(), displayed along a line of code taken from the multiprocessing.pool.worker function, where a task read from the inqueue gets unpacked. worker is the underlying main-function in the MainThread of a pool-worker-process. The func-argument specified in the pool-method will only match the func-variable inside the worker-function for single-call methods like apply_async and for imap with chunksize=1. For the rest of the pool-methods with a chunksize-parameter, the processing-function func will be a mapper-function (mapstar or starmapstar). This function maps the user-specified func-parameter on every element of the transmitted chunk of the iterable (--> "map-tasks"). The time this takes defines a task also as a unit of work.

Taskel

While the usage of the word "task" for the whole processing of one chunk is matched by code within multiprocessing.pool, there is no indication how a single call to the user-specified func, with one element of the chunk as argument(s), should be referred to. To avoid confusion emerging from naming conflicts (think of the maxtasksperchild-parameter for Pool's __init__-method), this answer will refer to the single units of work within a task as taskel.

A taskel (from task + element) is the smallest unit of work within a task. It is the single execution of the function specified with the func-parameter of a Pool-method, called with arguments obtained from a single element of the transmitted chunk. A task consists of chunksize taskels.

Parallelization Overhead (PO)

PO consists of Python-internal overhead and the overhead of inter-process communication (IPC). The per-task overhead within Python comes with the code needed for packaging and unpacking the tasks and their results. The IPC-overhead comes with the necessary synchronization of threads and the copying of data between different address spaces (two copy steps needed: parent -> queue -> child). The amount of IPC-overhead is OS-, hardware- and data-size dependent, which makes generalizations about its impact difficult.
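As a rough, machine-dependent illustration of that weight (a minimal sketch, not Pool's internals): mapping a trivial function with chunksize=1 forces one dispatch per taskel, while a larger chunksize amortizes the packaging and IPC cost over many taskels. Absolute timings will differ per OS and hardware.

    import time
    from multiprocessing import Pool

    def noop(x):
        # A taskel with near-zero computation time, so PO dominates.
        return x

    if __name__ == '__main__':
        data = range(100_000)
        with Pool(4) as pool:
            for cs in (1, 1000):
                start = time.perf_counter()
                pool.map(noop, data, chunksize=cs)
                print(f"chunksize={cs}: {time.perf_counter() - start:.2f} s")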

2. Parallelization Goals

When using multiprocessing, our overall goal (obviously) is to minimize the total processing time for all tasks. To reach this overall goal, our technical goal needs to be optimizing the utilization of hardware resources.

Some important sub-goals for achieving the technical goal are:

minimizing parallelization overhead (most famously, but not alone: IPC)

high utilization across all cpu-cores

keeping memory usage limited to prevent the OS from excessive paging (thrashing)

At first, the tasks need to be computationally heavy (intensive) enough to earn back the PO we have to pay for parallelization. The relevance of PO decreases with increasing absolute computation time per taskel. Or, to put it the other way around, the bigger the absolute computation time per taskel for your problem, the less relevant is the need for reducing PO. If your computation takes hours per taskel, the IPC overhead will be negligible in comparison. The primary concern here is to prevent idling worker processes after all tasks have been distributed. Keeping all cores loaded means we are parallelizing as much as possible.

3. Parallelization Scenarios

What factors determine an optimal chunksize argument to methods like multiprocessing.Pool.map()?

The major factor in question is how much computation time may vary across our single taskels. To name it, the choice of an optimal chunksize is determined by the ...

Coefficient of Variation (CV) for computation times per taskel.

The two extreme scenarios on a scale, following from the extent of this variation, are:

All taskels need exactly the same computation time.

A taskel could take seconds or days to finish.

For better memorability, I will refer to these scenarios as:

Dense Scenario

Wide Scenario

Dense Scenario

In a Dense Scenario it would be desirable to distribute all taskels at once, to keep necessary IPC and context switching at a minimum. This means we want to create only as many chunks as there are worker processes. As already stated above, the weight of PO increases with shorter computation times per taskel.

For maximal throughput, we also want all worker processes busy until all tasks are processed (no idling workers). For this goal, the distributed chunks should be of equal size or close to it.

Wide Scenario

The prime example for a Wide Scenario would be an optimization problem, where results either converge quickly or computation can take hours, if not days. Usually it is not predictable what mixture of "light taskels" and "heavy taskels" a task will contain in such a case, hence it's not advisable to distribute too many taskels in a task-batch at once. Distributing fewer taskels at once than possible means increasing scheduling flexibility. This is needed here to reach our sub-goal of high utilization of all cores.

If Pool's methods were, by default, totally optimized for the Dense Scenario, they would increasingly create suboptimal timings for every problem located closer to the Wide Scenario.

4. Risks of Chunksize > 1

Consider this simplified pseudo-code example of a Wide Scenario-iterable, which we want to pass into a pool-method:

    good_luck_iterable = [60, 60, 86400, 60, 86400, 60, 60, 86400]

Instead of the actual values, we pretend to see the needed computation time in seconds, for simplicity only 1 minute or 1 day.

We assume the pool has four worker processes (on four cores) and chunksize is set to 2. Because the order will be kept, the chunks sent to the workers will be these:

    [(60, 60), (86400, 60), (86400, 60), (60, 86400)]

Since we have enough workers and the computation time is high enough, we can say that every worker process will get a chunk to work on in the first place. (This does not have to be the case for fast completing tasks.) Further we can say that the whole processing will take about 86400+60 seconds, because that's the highest total computation time for a chunk in this artificial scenario and we distribute chunks only once.

Now consider this iterable, which has only one element switching its position compared to the previous iterable:

    bad_luck_iterable = [60, 60, 86400, 86400, 60, 60, 60, 86400]

...and the corresponding chunks:

    [(60, 60), (86400, 86400), (60, 60), (60, 86400)]

Just bad luck with the sorting of our iterable nearly doubled (86400+86400) our total processing time! The worker getting the vicious (86400, 86400)-chunk is blocking the second heavy taskel in its task from getting distributed to one of the idling workers already finished with their (60, 60)-chunks. We obviously would not risk such an unpleasant outcome if we set chunksize=1.

This is the risk of bigger chunksizes. With higher chunksizes we trade scheduling flexibility for less overhead and in cases like above, that's a bad deal.
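To make this back-of-the-envelope reasoning reproducible, here is a deliberately simplified scheduling sketch (a minimal model, not Pool's actual dispatch code): chunks are handed out in order, each to the next free worker, and the resulting makespan is returned.

    import heapq

    def makespan(durations, n_workers=4, chunksize=2):
        # Total wall-time under a simplified "next free worker takes the next chunk" model.
        chunks = [durations[i:i + chunksize]
                  for i in range(0, len(durations), chunksize)]
        workers = [0] * n_workers           # time at which each worker becomes free
        heapq.heapify(workers)
        for chunk in chunks:                # chunks are distributed in original order
            start = heapq.heappop(workers)
            heapq.heappush(workers, start + sum(chunk))
        return max(workers)

    good_luck_iterable = [60, 60, 86400, 60, 86400, 60, 60, 86400]
    bad_luck_iterable = [60, 60, 86400, 86400, 60, 60, 60, 86400]

    print(makespan(good_luck_iterable))              # 86460  (~1 day)
    print(makespan(bad_luck_iterable))               # 172800 (~2 days)
    print(makespan(bad_luck_iterable, chunksize=1))  # 86520  (~1 day)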

As we will see in chapter 6. Quantifying Algorithm Efficiency, bigger chunksizes can also lead to suboptimal results for Dense Scenarios.

5. Pool's Chunksize-Algorithm

Below you will find a slightly modified version of the algorithm inside the source code. As you can see, I cut off the lower part and wrapped it into a function for calculating the chunksize argument externally. I also replaced 4 with a factor parameter and outsourced the len() calls.

    # mp_utils.py

    def calc_chunksize(n_workers, len_iterable, factor=4):
        """Calculate chunksize argument for Pool-methods.

        Resembles source-code within `multiprocessing.pool.Pool._map_async`.
        """
        chunksize, extra = divmod(len_iterable, n_workers * factor)
        if extra:
            chunksize += 1
        return chunksize

To ensure we are all on the same page, here's what divmod does:

divmod(x, y) is a builtin function which returns (x//y, x%y).

x // y is the floor division, returning the down rounded quotient from x / y, while

x % y is the modulo operation returning the remainder from x / y.

Hence e.g. divmod(10, 3) returns (3, 1).

Now when you look at chunksize, extra = divmod(len_iterable, n_workers * 4), you will notice that n_workers * 4 is the divisor y in x // y. Multiplication by 4, without further adjustment through if extra: chunksize += 1 later on, leads to an initial chunksize at least four times smaller (for len_iterable >= n_workers * 4) than it would be otherwise.

For viewing the effect of multiplication by 4 on the intermediate chunksize result consider this function:

    def compare_chunksizes(len_iterable, n_workers=4):
        """Calculate naive chunksize, Pool's stage-1 chunksize and the chunksize
        for Pool's complete algorithm. Return chunksizes and the real factors by
        which naive chunksizes are bigger.
        """
        cs_naive = len_iterable // n_workers or 1  # naive approach
        cs_pool1 = len_iterable // (n_workers * 4) or 1  # incomplete pool algo.
        cs_pool2 = calc_chunksize(n_workers, len_iterable)

        real_factor_pool1 = cs_naive / cs_pool1
        real_factor_pool2 = cs_naive / cs_pool2

        return cs_naive, cs_pool1, cs_pool2, real_factor_pool1, real_factor_pool2

The function above calculates the naive chunksize (cs_naive) and the first-step chunksize of Pool's chunksize-algorithm (cs_pool1), as well as the chunksize for the complete Pool-algorithm (cs_pool2). Further it calculates the real factors rf_pool1 = cs_naive / cs_pool1 and rf_pool2 = cs_naive / cs_pool2, which tell us how many times the naively calculated chunksizes are bigger than Pool's internal version(s).
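To make these factors concrete, here is a small usage sketch, assuming calc_chunksize and compare_chunksizes from above are defined; the commented values follow from those definitions:

    for n in (16, 28, 500):
        print(n, compare_chunksizes(n))
    # 16  (4, 1, 1, 4.0, 4.0)
    # 28  (7, 1, 2, 7.0, 3.5)
    # 500 (125, 31, 32, ~4.03, 3.90625)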

Below you see two figures created with output from this function. The left figure just shows the chunksizes for n_workers=4 up until an iterable length of 500. The right figure shows the values for rf_pool1. For iterable length 16, the real factor becomes >= 4 (for len_iterable >= n_workers * 4) and its maximum value is 7 for iterable lengths 28-31. That's a massive deviation from the original factor 4 the algorithm converges to for longer iterables. 'Longer' here is relative and depends on the number of specified workers.

[Figure: chunksizes (left) and real factor rf_pool1 (right) for n_workers=4 and iterable lengths up to 500]

Remember chunksize cs_pool1 still lacks the extra-adjustment with the remainder from divmod contained in cs_pool2 from the complete algorithm.

The algorithm goes on with:

    if extra:
        chunksize += 1

Now in cases where there is a remainder (an extra from the divmod-operation), increasing the chunksize by 1 obviously cannot work out for every task. After all, if it would, there would not be a remainder to begin with.

As you can see in the figures below, the "extra-treatment" has the effect that the real factor rf_pool2 now converges towards 4 from below and the deviation is somewhat smoother. Standard deviation for n_workers=4 and len_iterable=500 drops from 0.5233 for rf_pool1 to 0.4115 for rf_pool2.

[Figure: real factor rf_pool2 for n_workers=4, converging towards 4 from below]

Eventually, increasing chunksize by 1 has the effect, that the last task transmitted only has a size of len_iterable % chunksize or chunksize.

The more interesting and, as we will see later, more consequential effect of the extra-treatment, however, can be observed for the number of generated chunks (n_chunks).

For long enough iterables, Pool's complete chunksize-algorithm (n_pool2 in the figure below) will stabilize the number of chunks at n_chunks == n_workers * 4.

In contrast, the naive algorithm (after an initial burp) keeps alternating between n_chunks == n_workers and n_chunks == n_workers + 1 as the length of the iterable grows.

[Figure: number of generated chunks n_chunks for Pool's algorithm vs. the naive algorithm]

Below you will find two enhanced info-functions for Pool's and the naive chunksize-algorithm. The output of these functions will be needed in the next chapter.

    # mp_utils.py
    from collections import namedtuple

    Chunkinfo = namedtuple(
        'Chunkinfo', ['n_workers', 'len_iterable', 'n_chunks',
                      'chunksize', 'last_chunk']
    )

    def calc_chunksize_info(n_workers, len_iterable, factor=4):
        """Calculate chunksize numbers."""
        chunksize, extra = divmod(len_iterable, n_workers * factor)
        if extra:
            chunksize += 1
        # `+ (len_iterable % chunksize > 0)` exploits that `True == 1`
        n_chunks = len_iterable // chunksize + (len_iterable % chunksize > 0)
        # exploit `0 == False`
        last_chunk = len_iterable % chunksize or chunksize

        return Chunkinfo(n_workers, len_iterable, n_chunks, chunksize, last_chunk)

Don't be confused by the probably unexpected look of calc_naive_chunksize_info. The extra from divmod is not used for calculating the chunksize.

    def calc_naive_chunksize_info(n_workers, len_iterable):
        """Calculate naive chunksize numbers."""
        chunksize, extra = divmod(len_iterable, n_workers)
        if chunksize == 0:
            chunksize = 1
            n_chunks = extra
            last_chunk = chunksize
        else:
            n_chunks = len_iterable // chunksize + (len_iterable % chunksize > 0)
            last_chunk = len_iterable % chunksize or chunksize

        return Chunkinfo(n_workers, len_iterable, n_chunks, chunksize, last_chunk)
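A short usage sketch of both info-functions; the commented values follow directly from the definitions above:

    print(calc_chunksize_info(n_workers=4, len_iterable=500))
    # Chunkinfo(n_workers=4, len_iterable=500, n_chunks=16, chunksize=32, last_chunk=20)
    print(calc_naive_chunksize_info(n_workers=4, len_iterable=500))
    # Chunkinfo(n_workers=4, len_iterable=500, n_chunks=4, chunksize=125, last_chunk=125)
    print(calc_naive_chunksize_info(n_workers=4, len_iterable=1465).n_chunks)
    # 5  -> the naive n_chunks alternating between n_workers and n_workers + 1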

6. Quantifying Algorithm Efficiency

Now, after we have seen how the output of Pool's chunksize-algorithm looks different compared to output from the naive algorithm...

How to tell if Pool's approach actually improves something?

And what exactly could this something be?

As shown in the previous chapter, for longer iterables (a bigger number of taskels), Pool's chunksize-algorithm approximately divides the iterable into four times more chunks than the naive method. Smaller chunks mean more tasks and more tasks mean more Parallelization Overhead (PO), a cost which must be weighed against the benefit of increased scheduling-flexibility (recall "Risks of Chunksize>1").

For rather obvious reasons, Pool's basic chunksize-algorithm cannot weigh scheduling-flexibility against PO for us. IPC-overhead is OS-, hardware- and data-size dependent. The algorithm cannot know on what hardware we run our code, nor does it have a clue how long a taskel will take to finish. It's a heuristic providing basic functionality for all possible scenarios. This means it cannot be optimized for any scenario in particular. As mentioned before, PO also becomes increasingly less of a concern with increasing computation times per taskel (negative correlation).

When you recall the Parallelization Goals from chapter 2, one bullet-point was:

high utilization across all cpu-cores

The previously mentioned something which Pool's chunksize-algorithm can try to improve is the minimization of idling worker-processes, or in other words, the utilization of cpu-cores.

A repeating question on SO regarding multiprocessing.Pool is asked by people wondering about unused cores / idling worker-processes in situations where you would expect all worker-processes busy. While this can have many reasons, idling worker-processes towards the end of a computation are an observation we can often make, even with Dense Scenarios (equal computation times per taskel) in cases where the number of workers is not a divisor of the number of chunks (n_chunks % n_workers > 0).

The question now is:

How can we practically translate our understanding of chunksizes into something which enables us to explain observed worker-utilization, or even compare the efficiency of different algorithms in that regard?

6.1 Models

For gaining deeper insights here, we need a form of abstraction of parallel computations which simplifies the overly complex reality down to a manageable degree of complexity, while preserving significance within defined boundaries. Such an abstraction is called a model. An implementation of such a "Parallelization Model" (PM) generates worker-mapped meta-data (timestamps) as real computations would, if the data were to be collected. The model-generated meta-data allows predicting metrics of parallel computations under certain constraints.

[Figure: overview of the Parallelization Model (PM) and its sub-models]

One of two sub-models within the here defined PM is the Distribution Model (DM). The DM explains how atomic units of work (taskels) are distributed over parallel workers and time, when no other factors than the respective chunksize-algorithm, the number of workers, the input-iterable (number of taskels) and their computation duration are considered. This means any form of overhead is not included.

For obtaining a complete PM, the DM is extended with an Overhead Model (OM), representing various forms of Parallelization Overhead (PO). Such a model needs to be calibrated for each node individually (hardware-, OS-dependencies). How many forms of overhead are represented in an OM is left open, so multiple OMs with varying degrees of complexity can exist. Which level of accuracy the implemented OM needs is determined by the overall weight of PO for the specific computation. Shorter taskels lead to a higher weight of PO, which in turn requires a more precise OM if we were attempting to predict Parallelization Efficiencies (PE).

6.2 Parallel Schedule (PS)

The Parallel Schedule is a two-dimensional representation of the parallel computation, where the x-axis represents time and the y-axis represents a pool of parallel workers. The number of workers and the total computation time mark the extent of a rectangle, into which smaller rectangles are drawn. These smaller rectangles represent atomic units of work (taskels).

Below you find the visualization of a PS drawn with data from the DM of Pool's chunksize-algorithm for the Dense Scenario.

[Figure: Parallel Schedule drawn from the DM of Pool's chunksize-algorithm for the Dense Scenario]

The x-axis is sectioned into equal units of time, where each unit stands for the computation time a taskel requires.

The y-axis is divided into the number of worker-processes the pool uses.

A taskel here is displayed as the smallest cyan-colored rectangle, put into a timeline (a schedule) of an anonymized worker-process.

A task is one or multiple taskels in a worker-timeline continuously highlighted with the same hue.

Idling time units are represented through red colored tiles.

The Parallel Schedule is partitioned into sections. The last section is the tail-section.

The names for the composed parts can be seen in the picture below.

[Figure: named sections of a Parallel Schedule]

In a complete PM including an OM, the Idling Share is not limited to the tail, but also comprises space between tasks and even between taskels.

6.3 Efficiencies

Note:

Since earlier versions of this answer, "Parallelization Efficiency (PE)" has been renamed to "Distribution Efficiency (DE)".

PE now refers to overhead-including efficiency.

The Models introduced above allow quantifying the rate of worker-utilization. We can distinguish:

Distribution Efficiency (DE) - calculated with help of a DM (or a simplified method for the Dense Scenario).

Parallelization Efficiency (PE) - either calculated with help of a calibrated PM (prediction) or calculated from meta-data of real computations.

It's important to note, that calculated efficiencies do not automatically correlate with faster overall computation for a given parallelization problem. Worker-utilization in this context only distinguishes between a worker having a started, yet unfinished taskel and a worker not having such an "open" taskel. That means, possible idling during the time span of a taskel is not registered.

All above mentioned efficiencies are basically obtained by calculating the quotient of the division Busy Share / Parallel Schedule. The difference between DE and PE comes with the Busy Share occupying a smaller portion of the overall Parallel Schedule for the overhead-extended PM.

This answer will further only discuss a simple method to calculate DE for the Dense Scenario. This is sufficiently adequate to compare different chunksize-algorithms, since...

... the DM is the part of the PM, which changes with different chunksize-algorithms employed.

... the Dense Scenario with equal computation durations per taskel depicts a "stable state", for which these time spans drop out of the equation. Any other scenario would just lead to random results since the ordering of taskels would matter.

6.3.1 Absolute Distribution Efficiency (ADE)

This basic efficiency can be calculated in general by dividing the Busy Share through the whole potential of the Parallel Schedule:

Absolute Distribution Efficiency (ADE) = Busy Share / Parallel Schedule

For the Dense Scenario, the simplified calculation-code looks like this:

    # mp_utils.py

    def calc_ade(n_workers, len_iterable, n_chunks, chunksize, last_chunk):
        """Calculate Absolute Distribution Efficiency (ADE).

        `len_iterable` is not used, but contained to keep a consistent signature
        with `calc_rde`.
        """
        if n_workers == 1:
            return 1

        potential = (
            ((n_chunks // n_workers + (n_chunks % n_workers > 1)) * chunksize)
            + (n_chunks % n_workers == 1) * last_chunk
        ) * n_workers

        n_full_chunks = n_chunks - (chunksize > last_chunk)
        taskels_in_regular_chunks = n_full_chunks * chunksize
        real = taskels_in_regular_chunks + (chunksize > last_chunk) * last_chunk
        ade = real / potential

        return ade

If there is no Idling Share, Busy Share will be equal to Parallel Schedule, hence we get an ADE of 100%. In our simplified model, this is a scenario where all available processes will be busy through the whole time needed for processing all tasks. In other words, the whole job gets effectively parallelized to 100 percent.

But why do I keep referring to DE as absolute DE here?

To comprehend that, we have to consider a possible case for the chunksize (cs) which ensures maximal scheduling flexibility (also, the number of Highlanders there can be. Coincidence?):

___________________________________~ ONE ~___________________________________

If we, for example, have four worker-processes and 37 taskels, there will be idling workers even with chunksize=1, just because n_workers=4 is not a divisor of 37. The remainder of dividing 37 / 4 is 1. This single remaining taskel will have to be processed by a sole worker, while the remaining three are idling.

Likewise, there will still be one idling worker with 39 taskels, as you can see pictured below.

[Figure: Parallel Schedules for 39 taskels on 4 workers, chunksize=1 vs. chunksize=3]

When you compare the upper Parallel Schedule for chunksize=1 with the below version for chunksize=3, you will notice that the upper Parallel Schedule is smaller, the timeline on the x-axis shorter. It should become obvious now, how bigger chunksizes unexpectedly also can lead to increased overall computation times, even for Dense Scenarios.
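The two schedules just compared can also be expressed in numbers with calc_ade from above (a usage sketch; the chunk-numbers follow from 39 taskels with chunksize 1 and 3 respectively):

    print(calc_ade(n_workers=4, len_iterable=39, n_chunks=39, chunksize=1, last_chunk=1))
    # 0.975   -> ADE 97.5 % for chunksize=1
    print(calc_ade(n_workers=4, len_iterable=39, n_chunks=13, chunksize=3, last_chunk=3))
    # 0.8125  -> ADE 81.25 % for chunksize=3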

But why not just use the length of the x-axis for efficiency calculations?

Because the overhead is not contained in this model. It will be different for both chunksizes, hence the x-axis is not really directly comparable. The overhead can still lead to a longer total computation time like shown in case 2 from the figure below.

[Figure: overhead can still lead to a longer total computation time (case 2)]

6.3.2 Relative Distribution Efficiency (RDE)

The ADE value does not contain the information if a better distribution of taskels is possible with chunksize set to 1. Better here still means a smaller Idling Share.

To get a DE value adjusted for the maximum possible DE, we have to divide the considered ADE through the ADE we get for chunksize=1.

Relative Distribution Efficiency (RDE) = ADE_cs_x / ADE_cs_1

Here is how this looks in code:

    # mp_utils.py

    def calc_rde(n_workers, len_iterable, n_chunks, chunksize, last_chunk):
        """Calculate Relative Distribution Efficiency (RDE)."""
        ade_cs1 = calc_ade(n_workers, len_iterable,
                           n_chunks=len_iterable, chunksize=1, last_chunk=1)
        ade = calc_ade(n_workers, len_iterable, n_chunks, chunksize, last_chunk)
        rde = ade / ade_cs1

        return rde
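A brief usage sketch (assuming calc_ade and calc_rde from above); the second call uses the Chunkinfo numbers for 500 taskels from chapter 5 and hints at the convergence discussed below:

    print(calc_rde(n_workers=4, len_iterable=39, n_chunks=13, chunksize=3, last_chunk=3))
    # ~0.833     -> RDE ~83.3 % for the 39-taskel example above
    print(calc_rde(n_workers=4, len_iterable=500, n_chunks=16, chunksize=32, last_chunk=20))
    # 0.9765625  -> RDE ~97.7 % for a longer iterable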

RDE, how defined here, in essence is a tale about the tail of a Parallel Schedule. RDE is influenced by the maximum effective chunksize contained in the tail. (This tail can be of x-axis length chunksize or last_chunk.)

This has the consequence, that RDE naturally converges to 100% (even) for all sorts of "tail-looks" like shown in the figure below.

[Figure: RDE converging to 100% for various tail shapes]

A low RDE ...

is a strong hint for optimization potential.

naturally gets less likely for longer iterables, because the relative tail-portion of the overall Parallel Schedule shrinks.

You can find Part II of this answer below.