将要执行异步任务脚本 tasks.py:
from celery import Celery
from celery import group
# host='10.32.21.52', port=6379, db=3
app = Celery('tasks', backend = 'redis://10.32.21.52:6379/14', broker='redis://10.32.21.52:6379/15')
@app.task
def add(x, y):
return x + y
@app.task
def tsum(ite):
return sum(ite)
@app.task(trail=True)
def A(how_many):
return group(B.s(i) for i in range(how_many))()
@app.task(trail=True)
def B(i):
return pow2.delay(i)
@app.task(trail=True)
def pow2(i):
return i ** 2
其中:app中对celery进行配置;详细的配置可以参考文档:
http://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html#configuration启动任务:
celery -A tasks worker --loglevel=info
启动后有如图所示的输出,显示服务器及配置信息和任务运行日志;
通过脚本调用任务:
from tasks import add
from tasks import tsum
from tasks import A
from celery import chord
result = add.delay(4, 4)
~tsum.map([range(10), range(100)])
~add.starmap([item for item in zip(range(10), range(10))])
# chord(add.s(i, i) for i in xrange(100))(tsum.s()).get()
chord((add.s(i, i) for i in xrange(100)), tsum.s())()
add.chunks(zip(xrange(100), xrange(100)), 10)()
print(result.backend)
print result.get()
result = A.delay(10)
写的任务太简单了,比单机还慢,只能上抓取任务试试了~
celery通过装饰器把各个任务抽象为消息发送给第三方的插件调度,如:redis;然后把任务分配给各个启动了celery的服务的机器去执行任务,有空研究一下源码再继续分享。