天天看點

Celery-4.1 使用者指南: Extensions and Bootsteps

自定義消息消費者

你可能想要嵌入自定義的 Kombu 消費者來手動處理你的消息。

為了達到這個目的,celery 提供了一個

ConsumerStep

bootstep 類,你隻需要定義

get_consumers

方法,它必須傳回一個

kombu.Consumer

對象的清單,當連接配接建立時,這些對象将會啟動。

from celery import Celery
from celery import bootsteps
from kombu import Consumer, Exchange, Queue

my_queue = Queue('custom', Exchange('custom'), 'routing_key')

app = Celery(broker='amqp://')


class MyConsumerStep(bootsteps.ConsumerStep):

    def get_consumers(self, channel):
        return [Consumer(channel,
                         queues=[my_queue],
                         callbacks=[self.handle_message],
                         accept=['json'])]

    def handle_message(self, body, message):
        print('Received message: {0!r}'.format(body))
        message.ack()
app.steps['consumer'].add(MyConsumerStep)

def send_me_a_message(who, producer=None):
    with app.producer_or_acquire(producer) as producer:
        producer.publish(
            {'hello': who},
            serializer='json',
            exchange=my_queue.exchange,
            routing_key='routing_key',
            declare=[my_queue],
            retry=True,
        )

if __name__ == '__main__':
    send_me_a_message('world!')
           

注意:

Kombu Consumer 使用了兩種不同的消息回掉分發機制。第一種是接收一個回調函數清單,回調函數簽名是

(body, message)

,另一種接收一個

on_message

參數,一個簽名為

(message,)

的回調函數。後一種不是自動解碼和反序列化負載。

def get_consumers(self, channel):
    return [Consumer(channel, queues=[my_queue],
                     on_message=self.on_message)]


def on_message(self, message):
    payload = message.decode()
    print(
        'Received message: {0!r} {props!r} rawlen={s}'.format(
        payload, props=message.properties, s=len(message.body),
    ))
    message.ack()
           

Blueprints

Bootsteps 是一個給工作單元添加功能的技術。一個 bootstep 是一個自定義的類,它自定義了一些在工作單元的不同階段執行的操作。每個 bootstep 屬于一個 blueprint,并且工作單元目前定義了兩個 blueprints: Worker 和 Consumer。

圖A: Worker 和 Consumer 中的 Bootsteps。從底至上,Worker blueprint 中的第一步是 Timer,最後一步是啟動 Consumer blueprint,然後是建立與消息中間件的連接配接并且開始消費消息。

Celery-4.1 使用者指南: Extensions and Bootsteps

工作單元

工作單元是開啟的第一個blueprint,并且随着它啟動一些主要元件,如 event loop, processing pool, ETA任務的定時器以及其他定時事件。

當工作單元完全啟動,它将繼續啟動 Consumer blueprint,用來設定任務怎麼被執行、連接配接到消息中間件以及啟動消息消費者。

WorkController

是核心的工作單元實作,并且包含了一些你能在自定義的bootstep中使用的方法和屬性。

屬性

  • app

    目前 app 應用執行個體

  • hostname

    工作單元節點名稱 (例如: [email protected])

  • blueprint

    工作單元 Blueprint

  • hub

    消息循環對象(Hub)。你可以用來在事件循環中注冊回調函數。

這隻在啟用了異步IO的傳輸層(amqp, redis)上有支援,此時

worker.use_eventloop

屬性應該被設定。

你的工作單元 bootstep 必須需要 Hub bootstep 來使用它:

class WorkerStep(bootsteps.StartStopStep):
    requires = {'celery.worker.components:Hub'}
           
  • pool

    目前的

    process/eventlet/gevent/thread

    池。檢視

    celery.concurrency.base.BasePool

你的工作單元 bootstep 必須需要 Pool bootstep 來使用它:

class WorkerStep(bootsteps.StartStopStep):
    requires = {'celery.worker.components:Pool'}
           
  • timer

    排程函數的定時器。

你的工作單元 bootstep 必須需要 Timer bootstep 來使用它:

class WorkerStep(bootsteps.StartStopStep):
    requires = {'celery.worker.components:Timer'}
           
  • statedb

    資料庫

    <celery.worker.state.Persistent>

    用來在工作單元重新開機之間持久化狀态。

statedb

參數被啟用時它才被定義。

你的工作單元 bootstep 必須需要 State bootstep 來使用它:

class WorkerStep(bootsteps.StartStopStep):
    requires = {'celery.worker.components:Statedb'}
           
  • autoscacler

    Autoscaler

    用來自動擴充和收縮池中的程序數。

    autoscaler

    參數被啟用時它才被定義。

    你的工作單元 bootstep 必須需要 Autoscaler bootstep 來使用它:

class WorkerStep(bootsteps.StartStopStep):
    requires = ('celery.worker.autoscaler:Autoscaler',)
           
  • autoreloader

    Autoreloader

    用來在檔案系統發生改變時自動重新加載代碼。

    autoreloader

    參數被啟用時它才被定義。

    你的工作單元 bootstep 必須需要 Autoreloader bootstep 來使用它:

class WorkerStep(bootsteps.StartStopStep):
    requires = ('celery.worker.autoreloader:Autoreloader',)
           

工作單元bootstep示例

工作單元

bootstep

示例:

from celery import bootsteps

class ExampleWorkerStep(bootsteps.StartStopStep):
    requires = {'celery.worker.components:Pool'}

    def __init__(self, worker, **kwargs):
        print('Called when the WorkController instance is constructed')
        print('Arguments to WorkController: {0!r}'.format(kwargs))

    def create(self, worker):
        # this method can be used to delegate the action methods
        # to another object that implements ``start`` and ``stop``.
        return self

    def start(self, worker):
        print('Called when the worker is started.')

    def stop(self, worker):
        print('Called when the worker shuts down.')

    def terminate(self, worker):
        print('Called when the worker terminates')
           

每個方法都将

WorkController

執行個體作為第一個參數進行傳遞。

另一個示例使用定時器在規定的時間間隔進行喚醒:

from celery import bootsteps


class DeadlockDetection(bootsteps.StartStopStep):
    requires = {'celery.worker.components:Timer'}

    def __init__(self, worker, deadlock_timeout=):
        self.timeout = deadlock_timeout
        self.requests = []
        self.tref = None

    def start(self, worker):
        # run every 30 seconds.
        self.tref = worker.timer.call_repeatedly(
            , self.detect, (worker,), priority=,
        )

    def stop(self, worker):
        if self.tref:
            self.tref.cancel()
            self.tref = None

    def detect(self, worker):
        # update active requests
        for req in worker.active_requests:
            if req.time_start and time() - req.time_start > self.timeout:
                raise SystemExit()
           

消費者

消費者blueprint建立一個與消息中間件的連接配接,并且每次連接配接丢失時将重新開始。消費者bootsteps 包括工作單元心跳、遠端控制指令消費者,以及最重要的任務消費者。

當你建立自定義的消費者 bootsteps,你必須考慮到它必須能夠重新開機你的blueprint。一個附加的

shutdown

方法必須在每一個消費者 bootstep 中定義,這個方法在工作單元被關閉時調用。

屬性

  • app

    目前 app 應用執行個體

  • hostname

    工作單元節點名稱 (例如: [email protected])

  • blueprint

    工作單元 Blueprint

  • hub

    消息循環對象(Hub)。你可以用來在事件循環中注冊回調函數。

這隻在啟用了異步IO的傳輸層(amqp, redis)上有支援,此時

worker.use_eventloop

屬性應該被設定。

你的工作單元 bootstep 必須需要 Hub bootstep 來使用它:

class WorkerStep(bootsteps.StartStopStep):
    requires = {'celery.worker.components:Hub'}
           
  • connection

    目前的消息中間件連接配接

    kombu.Connection

一個消費者bootstep必須需要

Connection

bootstep來使用它:

class Step(bootsteps.StartStopStep):
    requires = {'celery.worker.consumer.connection:Connection'}
           
  • event_dispatcher

    一個

    app.events.Dispatcher

    對象可以用來發送事件。

一個消費者bootstep必須需要

Events

bootstep來使用它:

class Step(bootsteps.StartStopStep):
    requires = {'celery.worker.consumer.events:Events'}
           
  • gossip

    工作單元到工作單元的廣播通信(

    Gossip

    )

一個消費者bootstep必須需要

Gossip

bootstep來使用它:

class RatelimitStep(bootsteps.StartStopStep):
    """Rate limit tasks based on the number of workers in the
    cluster."""
    requires = {'celery.worker.consumer.gossip:Gossip'}

    def start(self, c):
        self.c = c
        self.c.gossip.on.node_join.add(self.on_cluster_size_change)
        self.c.gossip.on.node_leave.add(self.on_cluster_size_change)
        self.c.gossip.on.node_lost.add(self.on_node_lost)
        self.tasks = [
            self.app.tasks['proj.tasks.add']
            self.app.tasks['proj.tasks.mul']
        ]
        self.last_size = None

    def on_cluster_size_change(self, worker):
        cluster_size = len(list(self.c.gossip.state.alive_workers()))
        if cluster_size != self.last_size:
            for task in self.tasks:
                task.rate_limit =  / cluster_size
            self.c.reset_rate_limits()
            self.last_size = cluster_size

    def on_node_lost(self, worker):
        # may have processed heartbeat too late, so wake up soon
        # in order to see if the worker recovered.
        self.c.timer.call_after(, self.on_cluster_size_change)
           

回調

- gossip.on.node_join

當一個新的節點加入到叢集中時調用,提供一個工作單元執行個體參數

  • gossip.on.node_leave

當一個新的節點離開到叢集中時調用(關閉時),提供一個工作單元執行個體參數

  • gossip.on.node_lost

    當叢集中工作單元的心跳丢失(心跳沒有及時收到或者處理),提供一個工作單元執行個體參數

  • pool

    目前 process/eventlet/gevent/thread 池

檢視

celery.concurrency.base.BasePool

  • timer

    定時器

    celery.utils.timer2.Schedule

    用來排程函數
  • heart

    負責發送工作單元事件心跳 (Heart)

消費者bootstep必須需要

Heartbeat

bootstep來使用它:

class Step(bootsteps.StartStopStep):
    requires = {'celery.worker.consumer.heart:Heart'}
           
  • task_consumer

    kombu.Consumer

    對象用來消費任務消息

消費者bootstep必須需要

Tasks

bootstep來使用它:

class Step(bootsteps.StartStopStep):
    requires = {'celery.worker.consumer.tasks:Tasks'}
           
  • strategies

    每個已經注冊的任務在這個映射中都有一項, 值用來執行一個進來的該類型的消息 (任務執行政策)。這個映射是在消費者啟動時由 Tasks bootstep 産生的:

for name, task in app.tasks.items():
    strategies[name] = task.start_strategy(app, consumer)
    task.__trace__ = celery.app.trace.build_tracer(
        name, task, loader, hostname
    )
           

消費者bootstep必須需要

Tasks

bootstep來使用它:

class Step(bootsteps.StartStopStep):
    requires = {'celery.worker.consumer.tasks:Tasks'}
           
  • task_buckets

    一個根據任務類型查找速率限制的

    defaultdict

    。這個字典中的項可以為

    None

    (沒有限制),或者

    TokenBucket

    執行個體,實作了 consume(tokens) 和 expected_time(tokens)。

TokenBucket

實作了 token bucket 算法,隻要遵循相同接口并且定義了者兩個方法的任何算法都可以被使用。

  • qos

    QoS

    對象可以用來修改任務通道目前的 prefetch 值:
# increment at next cycle
consumer.qos.increment_eventually()
# decrement at next cycle
consumer.qos.decrement_eventually()
consumer.qos.set()
           

方法

  • consumer.reset_rate_limits()

    為所有注冊的任務類型更新 task_buckets 映射

  • consumer.bucket_for_task(type, Bucket=TokenBucket)

    使用

    task.rate_limit

    屬性為一個任務建立速率限制bucket。
  • consumer.add_task_queue(name, exchange=None, exchange_type=None,

    routing_key=None, **options):

    添加新的被消費隊列。當連接配接重新開機這也存在

  • consumer.cancel_task_queue(name)

    停止從指定名稱的隊列消費消息。當連接配接重新開機這也存在

  • apply_eta_task(request)

    基于

    request.eta attribute

    屬性排程一個 ETA 任務。(Request)

安裝步驟

可以通過修改

app.steps['worker']

app.steps['consumer']

添加新的 bootstep:

>>> app = Celery()
>>> app.steps['worker'].add(MyWorkerStep)  # < add class, don't instantiate
>>> app.steps['consumer'].add(MyConsumerStep)

>>> app.steps['consumer'].update([StepA, StepB])

>>> app.steps['consumer']
{step:proj.StepB{()}, step:proj.MyConsumerStep{()}, step:proj.StepA{()}
           

因為執行順序是由結果圖(

Step.requires

)決定,是以在這裡步驟的順序不重要。

為了說明你這麼安裝 bootsteps 以及他麼如何工作,如下示例step列印一些無用的調試資訊。它可以作為工作單元bootstep和消費者bootstep被添加:

from celery import Celery
from celery import bootsteps

class InfoStep(bootsteps.Step):

    def __init__(self, parent, **kwargs):
        # here we can prepare the Worker/Consumer object
        # in any way we want, set attribute defaults, and so on.
        print('{0!r} is in init'.format(parent))

    def start(self, parent):
        # our step is started together with all other Worker/Consumer
        # bootsteps.
        print('{0!r} is starting'.format(parent))

    def stop(self, parent):
        # the Consumer calls stop every time the consumer is
        # restarted (i.e., connection is lost) and also at shutdown.
        # The Worker will call stop at shutdown only.
        print('{0!r} is stopping'.format(parent))

    def shutdown(self, parent):
        # shutdown is called by the Consumer at shutdown, it's not
        # called by Worker.
        print('{0!r} is shutting down'.format(parent))

    app = Celery(broker='amqp://')
    app.steps['worker'].add(InfoStep)
    app.steps['consumer'].add(InfoStep)
           

啟動安裝了這個步驟的工作單元将顯示如下日志:

<Worker: [email protected] (initializing)> is in init
<Consumer: [email protected] (initializing)> is in init
[-5- ::,: WARNING/MainProcess]
    <Worker: [email protected] (running)> is starting
[-5- ::,: WARNING/MainProcess]
    <Consumer: [email protected] (running)> is starting
<Consumer: [email protected] (closing)> is stopping
<Worker: [email protected] (closing)> is stopping
<Consumer: [email protected] (terminating)> is shutting down
           

工作單元初始化後,

print

語句将被重定向到日志子系統,是以

is starting

這一行打上了時間戳。你可以注意到在關閉時将不會出現這種現象,因為

stop

shutdown

方法在一個信号處理函數中被調用,并且在其中使用日志是不安全的。使用python日志子產品記錄日志不是可重入的:意味着你不能中斷這個函數之後又調用它。有一點重要的是

stop

shutdown

方法是可重入的。

啟動工作單元時使用

--loglevel=debug

選項将顯示給我們關于啟動過程的更詳細的資訊:

[-- ::,: DEBUG/MainProcess] | Worker: Preparing bootsteps.
[-- ::,: DEBUG/MainProcess] | Worker: Building graph...
<celery.apps.worker.Worker object at x101ad8410> is in init
[-- ::,: DEBUG/MainProcess] | Worker: New boot order:
    {Hub, Pool, Timer, StateDB, Autoscaler, InfoStep, Beat, Consumer}
[-- ::,: DEBUG/MainProcess] | Consumer: Preparing bootsteps.
[-- ::,: DEBUG/MainProcess] | Consumer: Building graph...
<celery.worker.consumer.Consumer object at x101c2d8d0> is in init
[-- ::,: DEBUG/MainProcess] | Consumer: New boot order:
    {Connection, Mingle, Events, Gossip, InfoStep, Agent,
     Heart, Control, Tasks, event loop}
[-- ::,: DEBUG/MainProcess] | Worker: Starting Hub
[-- ::,: DEBUG/MainProcess] ^-- substep ok
[-- ::,: DEBUG/MainProcess] | Worker: Starting Pool
[-- ::,: DEBUG/MainProcess] ^-- substep ok
[-- ::,: DEBUG/MainProcess] | Worker: Starting InfoStep
[-- ::,: WARNING/MainProcess]
    <celery.apps.worker.Worker object at x101ad8410> is starting
[-- ::,: DEBUG/MainProcess] ^-- substep ok
[-- ::,: DEBUG/MainProcess] | Worker: Starting Consumer
[-- ::,: DEBUG/MainProcess] | Consumer: Starting Connection
[-- ::,: INFO/MainProcess] Connected to amqp://[email protected]:5672//
[-- ::,: DEBUG/MainProcess] ^-- substep ok
[-- ::,: DEBUG/MainProcess] | Consumer: Starting Mingle
[-- ::,: INFO/MainProcess] mingle: searching for neighbors
[-- ::,: INFO/MainProcess] mingle: no one here
[-- ::,: DEBUG/MainProcess] ^-- substep ok
[-- ::,: DEBUG/MainProcess] | Consumer: Starting Events
[-- ::,: DEBUG/MainProcess] ^-- substep ok
[-- ::,: DEBUG/MainProcess] | Consumer: Starting Gossip
[-- ::,: DEBUG/MainProcess] ^-- substep ok
[-- ::,: DEBUG/MainProcess] | Consumer: Starting InfoStep
[-- ::,: WARNING/MainProcess]
    <celery.worker.consumer.Consumer object at x101c2d8d0> is starting
[-- ::,: DEBUG/MainProcess] ^-- substep ok
[-- ::,: DEBUG/MainProcess] | Consumer: Starting Heart
[-- ::,: DEBUG/MainProcess] ^-- substep ok
[-- ::,: DEBUG/MainProcess] | Consumer: Starting Control
[-- ::,: DEBUG/MainProcess] ^-- substep ok
[-- ::,: DEBUG/MainProcess] | Consumer: Starting Tasks
[-- ::,: DEBUG/MainProcess] basic.qos: prefetch_count->
[-- ::,: DEBUG/MainProcess] ^-- substep ok
[-- ::,: DEBUG/MainProcess] | Consumer: Starting event loop
[-- ::,: WARNING/MainProcess] [email protected] ready.
           

指令行程式

添加新的指令行選項

Command-specific 選項

通過修改應用執行個體的

user_options

屬性,你可以給 worker、beat和events 添加指令行選項。

Celery 使用 argparse 子產品來解析指令行參數,是以要添加自定義指令行參數,你需要聲明一個回調函數,參數為

argparse.ArgumentParser

執行個體,然後添加參數。請檢視

argparse

文檔擷取更多關于支援字段的資訊。

給 celery worker 添加一個自定義的選項的示例:

from celery import Celery

app = Celery(broker='amqp://')

def add_worker_arguments(parser):
    parser.add_argument(
        '--enable-my-option', action='store_true', default=False,
        help='Enable custom option.',
    ),
app.user_options['worker'].add(add_worker_arguments)
           

所有的bootsteps都将會收到這個參數作為

Bootstep.__init__

函數的關鍵字參數。

from celery import bootsteps

class MyBootstep(bootsteps.Step):

    def __init__(self, worker, enable_my_option=False, **options):
        if enable_my_option:
            party()

app.steps['worker'].add(MyBootstep)
           

預加載選項

celery 總指令支援預加載的概念。這些特殊的選項将傳遞給所有的子指令,并且在main解析步驟之外被解析。

一個預設的預加載選項的清單可以在 API 引用中找到:

celery.bin.base

你也可以添加新的預加載選項,例如聲明一個配置模闆:

from celery import Celery
from celery import signals
from celery.bin import Option

app = Celery()

def add_preload_options(parser):
    parser.add_argument(
        '-Z', '--template', default='default',
        help='Configuration template to use.',
    )
app.user_options['preload'].add(add_preload_options)

@signals.user_preload_options.connect
def on_preload_parsed(options, **kwargs):
    use_template(options['template'])
           

添加新的celery 子指令

新的子指令可以被添加進celery 總指令,隻要使用 setuptools entry-points 即可。

Entry-points 是一個特殊的元資訊,它可以添加到你的包 setup.py 程式,安裝後,使用 pkg_resources 子產品從系統中讀取。

celery 會識别 celery.commands entry-points 來安裝額外的子指令,entry-point的值必須指向一個合法的

celery.bin.base.Command

子類。很不幸的是,文檔有限,但是你可以

celery.bin

包中的各種指令中找到靈感。

下面是 Flower 監控擴充如何添加子指令

celery flower

的示例,通過在 setup.py 中添加一個 entry-point:

setup(
    name='flower',
    entry_points={
        'celery.commands': [
           'flower = flower.command:FlowerCommand',
        ],
    }
)
           

指令的定義是用等号分隔的兩部分組成,第一個部分是子明了的名稱(flower),第二部分是一個全限定的指令實作類的符号路徑:

子產品路徑和屬性名稱應該使用冒号分隔,如上所示。

在子產品 flower/command.py 中,指令類如下定義所示:

from celery.bin.base import Command


class FlowerCommand(Command):

    def add_arguments(self, parser):
        parser.add_argument(
            '--port', default=, type='int',
            help='Webserver port',
        ),
        parser.add_argument(
            '--debug', action='store_true',
        )

    def run(self, port=None, debug=False, **kwargs):
        print('Running our command')
           

工作單元 API

Hub - 工作單元異步消息循環

supported transports: amqp, redis

3.0版本新特性。

當使用amqp或者redis消息中間件時,工作單元使用異步I/O。最終的目的是所有的傳輸中間件都使用事件循環,但是這需要時間,是以其他的傳輸中間件仍然使用基于線程的解決方案。

  • hub.add(fd, callback, flags)
  • hub.add_reader(fd, callback, *args)

    添加回調函數,當fd可讀時調用

回調函數将保持注冊狀态直到使用 hub.remove(fd)顯示的移除,或者由于檔案描述符不在合法而被自動删除。

注意對于一個給定的檔案描述符一次隻能注冊一個回調函數,是以第二次調用

add

方法将自動移除掉前面為這個檔案描述符注冊的回調函數。

檔案描述符是一個類似檔案的對象,支援

fileno

方法,後者它也可以是檔案描述符數字(int)。

  • hub.add_writer(fd, callback, *args)

    添加回調函數,當fd可寫時被調用。檢視上述

    hub.add_reader()

  • hub.remove(fd)

    從循環中移除掉檔案描述符fd的所有回調函數。

定時器 - 排程事件

  • timer.call_after(secs, callback, args=(), kwargs=(),

    priority=0)

  • timer.call_repeatedly(secs, callback, args=(), kwargs=(),

    priority=0)

  • timer.call_at(eta, callback, args=(), kwargs=(),

    priority=0)