天天看點

Celery的第四個demo--啟動定時任務

一. 安裝并啟動redis

pip install redis
           

二. 在随便一個合适的路徑建立一個檔案夾名為fourth_celerydemo,檔案夾下建立如下四個檔案

celery.py

# 拒絕隐式引入,因為celery.py名字和celery的報名沖突
from __future__ import absolute_import
from celery import Celery

# app是Celery的執行個體,third_celerydemo.tasks這個子產品
app = Celery("fourth_celerydemo", include=["third_celerydemo.tasks"])
#使用config_from_object來加載存放在celeryconfig.py裡面的關于celery的配置
app.config_from_object("fourth_celerydemo.celeryconfig")

if __name__ = "__main__":
    app.start()
           

celeryconfig.py

# coding = utf-8

from kombu import Queue
from datetime import timedelta

# 指定消息中間件,又稱為消息代理
BROKER_URL = 'redis://localhost:6379/1'
# 任務處理完儲存狀态資訊和結果,以供查詢
CELERY_RESULT_BACKEND = 'redis/localhost:6379/0'
# 用戶端和消費者之間傳輸資料需要序列化和反序列化
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24
CELERY_ACCEPT_CONTENT = ["json"]

CELERY_QUEUES = ( # 定義任務隊列
	Queue('default', routing_key='task.#'),  # 路由鍵以"task."開頭的消息都進default隊列
    Queue('web_tasks', routing_key='web.#'), # 路由鍵以"web."開頭的消息都進web_tasks隊列
)

CELERY_DEFAULT_EXCHANGE = 'tasks'  # 預設的交換機的名字為tasks
CELERY_DEFAULT_EXCHANGE_TYPE = 'topic' # 預設的交換類型是topic
CELERY_DEFAULT_ROUTING_KEY = 'task.default'  # 預設的路由鍵是task.default, 這個路由鍵符合上面的default隊列

CELERY_ROUTES = {
    'fourth_celerydemo.tasks.add': { # tasks.add的消息會進入web_tasks隊列
        'queue': 'web_tasks',
        'routing_key':'web.add',
    }
}

CELERYBEAT_SCHEDULE = {
    'add': {
        'task': 'fourth_celerydemo.tasks.add',
        'schedule':timedelta(second=2),
        'args':(1,25)
    }
}
           

tasks.py

from __future__ import absolute_import
from fourth_celerydemo import app

@app.task
def add(x,y):
	return x + y
           

__init__.py

這個檔案裡面什麼都不用寫,隻是存在這個

__init__

檔案的作用是将這個檔案夾為一個python子產品

在一個終端啟動Beat程式:

celery beat -A fourth_celerydemo
           

在另一個終端啟動worker程序:

在這個fourth_celerydemo檔案夾的上一層目錄執行如下指令:

celery -A fourth_celerydemo worker -Q web_tasks -l info  //啟動worker,worker就是消費者。 -l info表示的是log的級别, -Q web_tasks表示worker隻會執行web_tasks隊列裡面的任務,這樣就可以合理安排消費者數量,讓web_tasks裡面的任務優先級更高
           

之後就會看到每兩秒都會自動執行一次tasks.add

注:Beat和celery一起啟動的指令是:

celery -B -A fourth_celerydemo worker -l info
           

https://github.com/Ran-oops/celery_learning/tree/master/fourth_celerydemo