天天看点

Celery的第四个demo--启动定时任务

一. 安装并启动redis

pip install redis
           

二. 在随便一个合适的路径新建一个文件夹名为fourth_celerydemo,文件夹下新建如下四个文件

celery.py

# 拒绝隐式引入,因为celery.py名字和celery的报名冲突
from __future__ import absolute_import
from celery import Celery

# app是Celery的实例,third_celerydemo.tasks这个模块
app = Celery("fourth_celerydemo", include=["third_celerydemo.tasks"])
#使用config_from_object来加载存放在celeryconfig.py里面的关于celery的配置
app.config_from_object("fourth_celerydemo.celeryconfig")

if __name__ = "__main__":
    app.start()
           

celeryconfig.py

# coding = utf-8

from kombu import Queue
from datetime import timedelta

# 指定消息中间件,又称为消息代理
BROKER_URL = 'redis://localhost:6379/1'
# 任务处理完保存状态信息和结果,以供查询
CELERY_RESULT_BACKEND = 'redis/localhost:6379/0'
# 客户端和消费者之间传输数据需要序列化和反序列化
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24
CELERY_ACCEPT_CONTENT = ["json"]

CELERY_QUEUES = ( # 定义任务队列
	Queue('default', routing_key='task.#'),  # 路由键以"task."开头的消息都进default队列
    Queue('web_tasks', routing_key='web.#'), # 路由键以"web."开头的消息都进web_tasks队列
)

CELERY_DEFAULT_EXCHANGE = 'tasks'  # 默认的交换机的名字为tasks
CELERY_DEFAULT_EXCHANGE_TYPE = 'topic' # 默认的交换类型是topic
CELERY_DEFAULT_ROUTING_KEY = 'task.default'  # 默认的路由键是task.default, 这个路由键符合上面的default队列

CELERY_ROUTES = {
    'fourth_celerydemo.tasks.add': { # tasks.add的消息会进入web_tasks队列
        'queue': 'web_tasks',
        'routing_key':'web.add',
    }
}

CELERYBEAT_SCHEDULE = {
    'add': {
        'task': 'fourth_celerydemo.tasks.add',
        'schedule':timedelta(second=2),
        'args':(1,25)
    }
}
           

tasks.py

from __future__ import absolute_import
from fourth_celerydemo import app

@app.task
def add(x,y):
	return x + y
           

__init__.py

这个文件里面什么都不用写,只是存在这个

__init__

文件的作用是将这个文件夹为一个python模块

在一个终端启动Beat程序:

celery beat -A fourth_celerydemo
           

在另一个终端启动worker进程:

在这个fourth_celerydemo文件夹的上一层目录执行如下命令:

celery -A fourth_celerydemo worker -Q web_tasks -l info  //启动worker,worker就是消费者。 -l info表示的是log的级别, -Q web_tasks表示worker只会执行web_tasks队列里面的任务,这样就可以合理安排消费者数量,让web_tasks里面的任务优先级更高
           

之后就会看到每两秒都会自动执行一次tasks.add

注:Beat和celery一起启动的命令是:

celery -B -A fourth_celerydemo worker -l info
           

https://github.com/Ran-oops/celery_learning/tree/master/fourth_celerydemo