Celery是一個簡單、靈活且可靠的,處理大量消息的分布式系統,專注于實時處理的異步任務隊列,同時也支援任務排程。Celery 是調用其Worker 元件來完成具體任務處理。本文我們講解worker的啟動過程。
目錄
[源碼解析] 并行分布式架構 Celery 之 worker 啟動 (1)
0x00 摘要
0x01 Celery的架構
0x02 示例代碼
0x03 邏輯概述
0x04 Celery應用
4.1 添加子command
4.2 入口點
4.3 緩存屬性cached_property
0x05 Celery 指令
0x06 worker 子指令
0x07 Worker application
0xFF 參考
Celery是一個簡單、靈活且可靠的,處理大量消息的分布式系統,專注于實時處理的異步任務隊列,同時也支援任務排程。Celery 是調用其Worker 元件來完成具體任務處理。
是以我們本文就來講解 worker 的啟動過程。
前面我們用幾篇文章分析了 Kombu,為 Celery 的分析打下了基礎。
[源碼分析] 消息隊列 Kombu 之 mailbox
[源碼分析] 消息隊列 Kombu 之 Hub
[源碼分析] 消息隊列 Kombu 之 Consumer
[源碼分析] 消息隊列 Kombu 之 Producer
[源碼分析] 消息隊列 Kombu 之 啟動過程
[源碼解析] 消息隊列 Kombu 之 基本架構
以及
源碼解析 并行分布式架構 Celery 之架構 (2)
[源碼解析] 并行分布式架構 Celery 之架構 (2)
下面我們再回顧下 Celery 的結構。Celery的架構圖如下所示:
其實網上難以找到調試Celery worker的辦法。我們可以去其源碼看看,發現如下:
是以我們可以模仿來進行,使用如下啟動worker,進行調試。
當啟動一個worker的時候,這個worker會與broker建立連結(tcp長連結),然後如果有資料傳輸,則會建立相應的channel, 這個連接配接可以有多個channel。然後,worker就會去borker的隊列裡面取相應的task來進行消費了,這也是典型的消費者生産者模式。
這個worker主要是有四部分組成的,task_pool, consumer, scheduler, mediator。其中,task_pool主要是用來存放的是一些worker,當啟動了一個worker,并且提供并發參數的時候,會将一些worker放在這裡面。
celery預設的并發方式是prefork,也就是多程序的方式,這裡隻是celery對multiprocessing pool進行了輕量的改造,然後給了一個新的名字叫做prefork,這個pool與多程序的程序池的差別就是這個task_pool隻是存放一些運作的worker。
consumer也就是消費者,主要是從broker那裡接受一些message,然後将message轉化為<code>celery.worker.request.Request</code> 的一個執行個體。
Celery 在适當的時候,會把這個請求包裝進Task中,Task就是用裝飾器app_celery.task()裝飾的函數所生成的類,是以可以在自定義的任務函數中使用這個請求參數,擷取一些關鍵的資訊。此時,已經了解了task_pool和consumer。
接下來,這個worker具有兩套資料結構,這兩套資料結構是并行運作的,他們分别是 'ET時刻表' 、就緒隊列。
就緒隊列:那些 立刻就需要運作的task, 這些task到達worker的時候會被放到這個就緒隊列中等待consumer執行。
我們下面看看如何啟動Celery。
程式首先會來到Celery類,這是Celery的應用。
可以看到主要就是:各種類名稱,TLS, 初始化之後的各種signal。
位置在:celery/app/base.py,其定義如下:
對于我們的示例代碼,入口是:
celery/bin/celery.py 會進行添加 子command,我們可以看出來。
這些 Commnd 是可以在指令行作為子指令直接使用的。
每一個都是command。我們以worker為例,具體如下:
然後會引入Celery 指令入口點 Celery。
Celery 中,大量的成員變量是被cached_property修飾的。
使用 cached_property修飾過的函數,就變成是對象的屬性,該對象第一次引用該屬性時,會調用函數,對象第二次引用該屬性時就直接從詞典中取了,即 Caches the return value of the get method on first call。
很多知名Python項目都自己實作過 cached_property,比如Werkzeug,Django。
因為太有用,是以 Python 3.8 給 functools 子產品添加了 cached_property 類,這樣就有了官方的實作。
Celery 的代碼舉例如下:
是以,最終,Celery的内容應該是這樣的:
具體部分成員變量舉例如下圖:
Celery的指令總入口為celery方法,具體在:celery/bin/celery.py。
代碼縮減版如下:
在方法中,會周遊celery.commands,拓展param,具體如下。這些 commands 就是之前剛剛提到的子Command:
Work子指令是 Command 總指令的一員,也是我們直接在指令行加入 worker 參數時候,調用到的子指令。
worker 子指令繼承了click.BaseCommand,為。
定義在celery/bin/worker.py。
是以如下代碼間接調用到 worker 指令:
定義如下:
此時流程如下圖,可以看到,從 Celery 應用就進入到了具體的 worker 指令:
此時在該函數中會執行個體化app的Worker,Worker application 就是 worker 的執行個體。此時的app就是前面定義的Celery類的執行個體app。
定義在:celery/app/base.py。
此時subclass_with_self利用了Python的type動态生成類執行個體的屬性。
此時就已經從 worker 指令 得到了一個celery.apps.worker:Worker的執行個體,然後調用該執行個體的start方法,此時首先分析一下Worker類的執行個體化的過程。
我們先回顧下:
我們的執行從 worker_main 這個程式入口,來到了 Celery 應用。然後進入了 Celery Command,然後又進入到了 Worker 子Command,具體如下圖。
下面就會正式進入 worker,Celery 把 worker 的正式邏輯成為 work as a program。
我們在下文将接下來繼續看後續 work as a program 的啟動過程。
Celery 源碼學習(二)多程序模型
celery原理初探
celery源碼分析-wroker初始化分析(上)
celery源碼分析-worker初始化分析(下)
celery worker初始化--DAG實作
python celery多worker、多隊列、定時任務
celery 詳細教程-- Worker篇
使用Celery
Celery 源碼解析一:Worker 啟動流程概述
Celery 源碼解析二:Worker 的執行引擎
Celery 源碼解析三:Task 對象的實作
Celery 源碼解析四:定時任務的實作
Celery 源碼解析五:遠端控制管理
Celery 源碼解析六:Events 的實作
Celery 源碼解析七:Worker 之間的互動
Celery 源碼解析八:State 和 Result