Celery的架構
- Celery包含如下元件:
- Celery Beat:任務排程器,Beat程序會讀取配置檔案的内容,周期性地将配置中到期需要執行的任務發送給任務隊列(一般用于定時任務使用)。
- Celery Worker:執行任務的消費者,通常會在多台伺服器運作多個消費者來提高執行效率。
- Broker:消息代理,或者叫作消息中間件,接受任務生産者發送過來的任務消息,存進隊列再按序分發給任務消費方(本方案使用redis)。
- Producer:調用了Celery提供的API、函數或者裝飾器而産生任務并交給任務隊列處理的都是任務生産者。
- Result Backend:任務處理完後儲存狀态資訊和結果,以供查詢。(本方案使用redis來存儲結果)
Celery的架構圖如圖所示。
根據celery架構圖将分布式設計方案如下。
- 任務釋出者:産品在背景添加股票完成之後,點選回測,然後分發任務都不同的機器。
- 任務排程:按照設定的時間調用delay方法。
- 消息代理:使用redis來存儲股票代碼。每次執行的股票代碼都是從redis讀取。
- 任務消費者:在不同的機器上開啟worker,執行排程的股票進行回測。
- 回測結果:存放在redis中,然後讀取出渲染在背景上展示出來。
需要用到
from kombu import Queue
from flask import Flask
部分代碼如下:
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_QUEUES = ( # 定義任務隊列
Queue("default", routing_key="distributed.#"),
Queue("tasks_A", routing_key="A.#"),
Queue("tasks_B", routing_key="B.#"),
)
CELERY_ROUTES = (
[
("web_management.web.trade.distributed.add", {"queue": "default"}),
("web_management.web.trade.distributed.taskA", {"queue": "tasks_A"}),
("web_management.web.trade.distributed.taskB", {"queue": "tasks_B"}),
],
)
CELERY_RESULT_SERIALIZER = "json"
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24
app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'redis://' + ppt.redis_ip + ':' + ppt.redis_port + '/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://' + ppt.redis_ip + ':' + ppt.redis_port + '/1'
app.config['CELERY_QUEUES'] = CELERY_QUEUES
app.config['CELERY_TIMEZONE'] = CELERY_TIMEZONE
app.config['CELERY_ROUTES'] = CELERY_ROUTES
app.config['CELERY_RESULT_SERIALIZER'] = CELERY_RESULT_SERIALIZER
app.config['CELERY_TASK_RESULT_EXPIRES'] = CELERY_TASK_RESULT_EXPIRES
celery = Celery('distributed', backend=app.config['CELERY_RESULT_BACKEND'], broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)
開啟work的方式
celery worker -A web_management.web.trade.distributed.celery -Q tasks_A --concurrency=30 -l info -Q後面為方法對應的隊列名稱 --concurrency= 後面 為開啟的worker數目,也可以使用 -c= 具體開啟是數目根據你的電腦CPU個數确定,小于等于cpu個數即可
備注
- celery 使用4.1.1版本
kombu 使用4.2.0版本
*先安裝kombu,然後安裝celery
- 可以解決如圖問題