天天看點

[源碼解析] 并行分布式架構 Celery 之 worker 啟動 (1)

Celery是一個簡單、靈活且可靠的,處理大量消息的分布式系統,專注于實時處理的異步任務隊列,同時也支援任務排程。Celery 是調用其Worker 元件來完成具體任務處理。本文我們講解worker的啟動過程。

目錄

[源碼解析] 并行分布式架構 Celery 之 worker 啟動 (1)

0x00 摘要

0x01 Celery的架構

0x02 示例代碼

0x03 邏輯概述

0x04 Celery應用

4.1 添加子command

4.2 入口點

4.3 緩存屬性cached_property

0x05 Celery 指令

0x06 worker 子指令

0x07 Worker application

0xFF 參考

Celery是一個簡單、靈活且可靠的,處理大量消息的分布式系統,專注于實時處理的異步任務隊列,同時也支援任務排程。Celery 是調用其Worker 元件來完成具體任務處理。

是以我們本文就來講解 worker 的啟動過程。

前面我們用幾篇文章分析了 Kombu,為 Celery 的分析打下了基礎。

[源碼分析] 消息隊列 Kombu 之 mailbox

[源碼分析] 消息隊列 Kombu 之 Hub

[源碼分析] 消息隊列 Kombu 之 Consumer

[源碼分析] 消息隊列 Kombu 之 Producer

[源碼分析] 消息隊列 Kombu 之 啟動過程

[源碼解析] 消息隊列 Kombu 之 基本架構

以及

源碼解析 并行分布式架構 Celery 之架構 (2)

[源碼解析] 并行分布式架構 Celery 之架構 (2)

下面我們再回顧下 Celery 的結構。Celery的架構圖如下所示:

其實網上難以找到調試Celery worker的辦法。我們可以去其源碼看看,發現如下:

是以我們可以模仿來進行,使用如下啟動worker,進行調試。

當啟動一個worker的時候,這個worker會與broker建立連結(tcp長連結),然後如果有資料傳輸,則會建立相應的channel, 這個連接配接可以有多個channel。然後,worker就會去borker的隊列裡面取相應的task來進行消費了,這也是典型的消費者生産者模式。

這個worker主要是有四部分組成的,task_pool, consumer, scheduler, mediator。其中,task_pool主要是用來存放的是一些worker,當啟動了一個worker,并且提供并發參數的時候,會将一些worker放在這裡面。

celery預設的并發方式是prefork,也就是多程序的方式,這裡隻是celery對multiprocessing pool進行了輕量的改造,然後給了一個新的名字叫做prefork,這個pool與多程序的程序池的差別就是這個task_pool隻是存放一些運作的worker。

consumer也就是消費者,主要是從broker那裡接受一些message,然後将message轉化為<code>celery.worker.request.Request</code> 的一個執行個體。

Celery 在适當的時候,會把這個請求包裝進Task中,Task就是用裝飾器app_celery.task()裝飾的函數所生成的類,是以可以在自定義的任務函數中使用這個請求參數,擷取一些關鍵的資訊。此時,已經了解了task_pool和consumer。

接下來,這個worker具有兩套資料結構,這兩套資料結構是并行運作的,他們分别是 'ET時刻表' 、就緒隊列。

就緒隊列:那些 立刻就需要運作的task, 這些task到達worker的時候會被放到這個就緒隊列中等待consumer執行。

我們下面看看如何啟動Celery。

程式首先會來到Celery類,這是Celery的應用。

可以看到主要就是:各種類名稱,TLS, 初始化之後的各種signal。

位置在:celery/app/base.py,其定義如下:

對于我們的示例代碼,入口是:

celery/bin/celery.py 會進行添加 子command,我們可以看出來。

這些 Commnd 是可以在指令行作為子指令直接使用的。

每一個都是command。我們以worker為例,具體如下:

然後會引入Celery 指令入口點 Celery。

Celery 中,大量的成員變量是被cached_property修飾的。

使用 cached_property修飾過的函數,就變成是對象的屬性,該對象第一次引用該屬性時,會調用函數,對象第二次引用該屬性時就直接從詞典中取了,即 Caches the return value of the get method on first call。

很多知名Python項目都自己實作過 cached_property,比如Werkzeug,Django。

因為太有用,是以 Python 3.8 給 functools 子產品添加了 cached_property 類,這樣就有了官方的實作。

Celery 的代碼舉例如下:

是以,最終,Celery的内容應該是這樣的:

具體部分成員變量舉例如下圖:

Celery的指令總入口為celery方法,具體在:celery/bin/celery.py。

代碼縮減版如下:

在方法中,會周遊celery.commands,拓展param,具體如下。這些 commands 就是之前剛剛提到的子Command:

Work子指令是 Command 總指令的一員,也是我們直接在指令行加入 worker 參數時候,調用到的子指令。

worker 子指令繼承了click.BaseCommand,為。

定義在celery/bin/worker.py。

是以如下代碼間接調用到 worker 指令:

定義如下:

此時流程如下圖,可以看到,從 Celery 應用就進入到了具體的 worker 指令:

此時在該函數中會執行個體化app的Worker,Worker application 就是 worker 的執行個體。此時的app就是前面定義的Celery類的執行個體app。

定義在:celery/app/base.py。

此時subclass_with_self利用了Python的type動态生成類執行個體的屬性。

此時就已經從 worker 指令 得到了一個celery.apps.worker:Worker的執行個體,然後調用該執行個體的start方法,此時首先分析一下Worker類的執行個體化的過程。

我們先回顧下:

我們的執行從 worker_main 這個程式入口,來到了 Celery 應用。然後進入了 Celery Command,然後又進入到了 Worker 子Command,具體如下圖。

下面就會正式進入 worker,Celery 把 worker 的正式邏輯成為 work as a program。

我們在下文将接下來繼續看後續 work as a program 的啟動過程。

Celery 源碼學習(二)多程序模型

celery原理初探

celery源碼分析-wroker初始化分析(上)

celery源碼分析-worker初始化分析(下)

celery worker初始化--DAG實作

python celery多worker、多隊列、定時任務

celery 詳細教程-- Worker篇

使用Celery

Celery 源碼解析一:Worker 啟動流程概述

Celery 源碼解析二:Worker 的執行引擎

Celery 源碼解析三:Task 對象的實作

Celery 源碼解析四:定時任務的實作

Celery 源碼解析五:遠端控制管理

Celery 源碼解析六:Events 的實作

Celery 源碼解析七:Worker 之間的互動

Celery 源碼解析八:State 和 Result