天天看點

Celery-4.1 使用者指南: Configuration and defaults (配置和預設值)

這篇文檔描述了可用的配置選項。

如果你使用預設的加載器,你必須建立 celeryconfig.py 子產品并且保證它在python路徑中。

配置檔案示例

以下是配置示例,你可以從這個開始。它包括運作一個基本Celery應用的所有基礎設定。

## Broker settings.
broker_url = 'amqp://guest:guest@localhost:5672//'

# List of modules to import when the Celery worker starts.
imports = ('myapp.tasks',)

## Using the database to store task state and results.
result_backend = 'db+sqlite:///results.db'

task_annotations = {'tasks.add': {'rate_limit': '10/s'}}
      

新的小寫設定

4.0 版本引入了新的小寫設定名稱和機構環境。

與以前版本的不同,除了設定項名稱變為小寫字母外,還有一個字首的重命名,例如 celerybeat_ 變為 beat_,celeryd_ 變為 worker,以及很多頂級 celery_ 設定重命名成了 task_ 字首。

Celery 仍然能讀取老的配置檔案,是以并不倉促遷移到新的設定格式。

Setting name Replace with
CELERY_ACCEPT_CONTENT accept_content
CELERY_ENABLE_UTC enable_utc
CELERY_IMPORTS imports
CELERY_INCLUDE include
CELERY_TIMEZONE timezone
CELERYBEAT_MAX_LOOP_INTERVAL beat_max_loop_interval
CELERYBEAT_SCHEDULE beat_schedule
CELERYBEAT_SCHEDULER beat_scheduler
CELERYBEAT_SCHEDULE_FILENAME beat_schedule_filename
CELERYBEAT_SYNC_EVERY beat_sync_every
BROKER_URL broker_url
BROKER_TRANSPORT broker_transport
BROKER_TRANSPORT_OPTIONS broker_transport_options
BROKER_CONNECTION_TIMEOUT broker_connection_timeout
BROKER_CONNECTION_RETRY broker_connection_retry
BROKER_CONNECTION_MAX_RETRIES broker_connection_max_retries
BROKER_FAILOVER_STRATEGY broker_failover_strategy
BROKER_HEARTBEAT broker_heartbeat
BROKER_LOGIN_METHOD broker_login_method
BROKER_POOL_LIMIT broker_pool_limit
BROKER_USE_SSL broker_use_ssl
CELERY_CACHE_BACKEND cache_backend
CELERY_CACHE_BACKEND_OPTIONS cache_backend_options
CASSANDRA_COLUMN_FAMILY cassandra_table
CASSANDRA_ENTRY_TTL cassandra_entry_ttl
CASSANDRA_KEYSPACE cassandra_keyspace
CASSANDRA_PORT cassandra_port
CASSANDRA_READ_CONSISTENCY cassandra_read_consistency
CASSANDRA_SERVERS cassandra_servers
CASSANDRA_WRITE_CONSISTENCY cassandra_write_consistency
CELERY_COUCHBASE_BACKEND_SETTINGS couchbase_backend_settings
CELERY_MONGODB_BACKEND_SETTINGS mongodb_backend_settings
CELERY_EVENT_QUEUE_EXPIRES event_queue_expires
CELERY_EVENT_QUEUE_TTL event_queue_ttl
CELERY_EVENT_QUEUE_PREFIX event_queue_prefix
CELERY_EVENT_SERIALIZER event_serializer
CELERY_REDIS_DB redis_db
CELERY_REDIS_HOST redis_host
CELERY_REDIS_MAX_CONNECTIONS redis_max_connections
CELERY_REDIS_PASSWORD redis_password
CELERY_REDIS_PORT redis_port
CELERY_RESULT_BACKEND result_backend
CELERY_MAX_CACHED_RESULTS result_cache_max
CELERY_MESSAGE_COMPRESSION result_compression
CELERY_RESULT_EXCHANGE result_exchange
CELERY_RESULT_EXCHANGE_TYPE result_exchange_type
CELERY_TASK_RESULT_EXPIRES result_expires
CELERY_RESULT_PERSISTENT result_persistent
CELERY_RESULT_SERIALIZER result_serializer
CELERY_RESULT_DBURI Use result_backend instead.
CELERY_RESULT_ENGINE_OPTIONS database_engine_options
[…]_DB_SHORT_LIVED_SESSIONS database_short_lived_sessions
CELERY_RESULT_DB_TABLE_NAMES database_db_names
CELERY_SECURITY_CERTIFICATE security_certificate
CELERY_SECURITY_CERT_STORE security_cert_store
CELERY_SECURITY_KEY security_key
CELERY_TASK_ACKS_LATE task_acks_late
CELERY_TASK_ALWAYS_EAGER task_always_eager
CELERY_TASK_ANNOTATIONS task_annotations
CELERY_TASK_COMPRESSION task_compression
CELERY_TASK_CREATE_MISSING_QUEUES task_create_missing_queues
CELERY_TASK_DEFAULT_DELIVERY_MODE task_default_delivery_mode
CELERY_TASK_DEFAULT_EXCHANGE task_default_exchange
CELERY_TASK_DEFAULT_EXCHANGE_TYPE task_default_exchange_type
CELERY_TASK_DEFAULT_QUEUE task_default_queue
CELERY_TASK_DEFAULT_RATE_LIMIT task_default_rate_limit
CELERY_TASK_DEFAULT_ROUTING_KEY task_default_routing_key
CELERY_TASK_EAGER_PROPAGATES task_eager_propagates
CELERY_TASK_IGNORE_RESULT task_ignore_result
CELERY_TASK_PUBLISH_RETRY task_publish_retry
CELERY_TASK_PUBLISH_RETRY_POLICY task_publish_retry_policy
CELERY_TASK_QUEUES task_queues
CELERY_TASK_ROUTES task_routes
CELERY_TASK_SEND_SENT_EVENT task_send_sent_event
CELERY_TASK_SERIALIZER task_serializer
CELERYD_TASK_SOFT_TIME_LIMIT task_soft_time_limit
CELERYD_TASK_TIME_LIMIT task_time_limit
CELERY_TRACK_STARTED task_track_started
CELERYD_AGENT worker_agent
CELERYD_AUTOSCALER worker_autoscaler
CELERYD_CONCURRENCY worker_concurrency
CELERYD_CONSUMER worker_consumer
CELERY_WORKER_DIRECT worker_direct
CELERY_DISABLE_RATE_LIMITS worker_disable_rate_limits
CELERY_ENABLE_REMOTE_CONTROL worker_enable_remote_control
CELERYD_HIJACK_ROOT_LOGGER worker_hijack_root_logger
CELERYD_LOG_COLOR worker_log_color
CELERYD_LOG_FORMAT worker_log_format
CELERYD_WORKER_LOST_WAIT worker_lost_wait
CELERYD_MAX_TASKS_PER_CHILD worker_max_tasks_per_child
CELERYD_POOL worker_pool
CELERYD_POOL_PUTLOCKS worker_pool_putlocks
CELERYD_POOL_RESTARTS worker_pool_restarts
CELERYD_PREFETCH_MULTIPLIER worker_prefetch_multiplier
CELERYD_REDIRECT_STDOUTS worker_redirect_stdouts
CELERYD_REDIRECT_STDOUTS_LEVEL worker_redirect_stdouts_level
CELERYD_SEND_EVENTS worker_send_task_events
CELERYD_STATE_DB worker_state_db
CELERYD_TASK_LOG_FORMAT worker_task_log_format
CELERYD_TIMER worker_timer
CELERYD_TIMER_PRECISION worker_timer_precision

配置訓示

通用設定

  • accept_content

    預設值: {‘json’} (set, list, or tuple).

    允許的内容類型/序列化器的白名單

如果接收到一個消息,其内容類型不再上述清單中,它将會被丢棄并抛出一個錯誤。

預設情況下,任意内容類型都是啟用的,包括pickle以及yaml,是以確定不受信任的第三方不能通路你的消息中間件。檢視安全這一節擷取更多資訊。

示例:

# using serializer name
accept_content = ['json']

# or the actual content-type (MIME)
accept_content = ['application/json']
      

時間與日期設定

  • enable_utc

    2.5 版本新特性。

    預設值:從 3.0 版本開始預設啟用

  一旦啟用,消息中的日期和時間将會轉化成 UTC 時區。

  注意2.5版本以下的工作單元将會認為所有消息都使用的本地時區,是以隻有在所有的工作單元都更新了的情況下再啟用這個特性。

  • timezone

    2.5版本新特性

    預設值: “UTC”

  設定Celery使用一個自定義的時區。這個時區值可以是pytz庫支援的任意時區。

  如果沒有設定,UTC時區将被使用。為了向後相容,還提供了一個 enable_utc設定,如果他設定成假,将使用系統本地時區。

任務設定

  • task_annotations

    這個設定可以用來在配置檔案中重寫任意任務屬性。這個設定可以是一個字典,擷取一個annotation對象的清單,這個清單對任務進行過濾,對比對的任務名稱起作用,并傳回待更改屬性的一個映射。

以下将更改 tasks.add 任務的 rate_limit 屬性:

task_annotations = {‘tasks.add’: {‘rate_limit’: ‘10/s’}}       

或者對所有的任務更改:

task_annotations = {‘*’: {‘rate_limit’: ‘10/s’}} 
      

你還可以更改方法,例如 on_failure 處理函數:

def my_on_failure(self, exc, task_id, args, kwargs, einfo):
  print(‘Oh no! Task failed: {0!r}’.format(exc))

task_annotations = {‘*’: {‘on_failure’: my_on_failure}} 
      

如果你需要更靈活的控制,那麼你可以使用對象而不是字典來選擇任務來進行注解:

class MyAnnotate(object):

    def annotate(self, task):
        if task.name.startswith('tasks.'):
            return {'rate_limit': '10/s'}

task_annotations = (MyAnnotate(), {other,})
      
  • task_compression

    預設值: None

    任務消息的預設壓縮算法。可以是 gzip、bzip2(如果可用),或者任意在 Kombu 壓縮模式系統資料庫中注冊的自定義壓縮算法。

   預設發送未壓縮的消息。

  • task_protocol

    預設值:2(從4.0版本開始)

    設定預設的任務消息協定版本。支援的協定:1 和 2

   協定 2 在 3.1.24 以及 4.x+ 被支援

  • task_serializer

    預設值:“json”(從4.0版本開始,更早:pickle)

    一個表示使用的預設序列化方法的字元串。可以是 json(預設)、pickle、 yaml、msgpack,或者任意在 kombu.serialization.registry 中注冊過的自定義序列化方法。

  另見:

    Serializers

  • task_publish_retry

    2.2版本新特性

    預設值:啟用

   決定當連接配接丢失或者其他連接配接錯誤時任務消息的釋出是否會重試,檢視 task_publish_retry_policy。

  • task_publish_retry_policy

    預設值:檢視 Message Sending Retry。

   定義當連接配接丢失或者其他連接配接錯誤時任務消息的釋出重試政策。

任務執行設定

  • task_always_eager

    預設值:禁用

    如果設定成 True,所有的任務都将在本地執行知道任務傳回。apply_async() 以及Task.delay()将傳回一個 EagerResult 執行個體,模拟AsyncResult執行個體的API和行為,除了這個結果是已經計算過的之外。

   也就是說,任務将會在本地執行而不是發送到隊列。

  • task_eager_propagates

    如果設定成 True,本地執行的任務(使用 task.apply(),或者 task_always_eager 被啟用)将傳遞異常。

   這與使用 apply() 帶 throw=True 參數有同樣的效果。

  • task_remote_tracebacks

    如果啟用了,當重新抛出任務錯誤時,任務結果将會包括工作單元的堆棧資訊。

   它需要 tblib 庫,可以通過 pip 安裝:

    $ pip install celery[tblib] 
      

  

 檢視 Bundles 擷取關于組合多個擴充需求的資訊。

  • task_ignore_result

    是否存儲任務傳回值(tombstones)。如果你隻是想在發生錯誤的時候記錄傳回值,可以設定:task_store_errors_even_if_ignored

  • task_store_errors_even_if_ignored

    如果設定了,即使 Task.ignore_result 啟用了,工作單元也會愛結果後端中存儲所有的任務錯誤。

  • task_track_started

    如果設定成真,當任務被工作單元執行時,任務将報告它的狀态為started。預設值是假,因為通常行為是不做這種粒度級别的彙報。任務會處于

    pending、finished 或者 waiting to be retried。當有長時間任務,并且需要知道目前正在運作什麼任務時,有一個

    started狀态将會很有用。

  • task_time_limit

    預設值:沒有時間限制

    任務的硬時間限制,以秒為機關。如果這個時間限制被超過,處理任務的工作單元程序将會被殺死并使用一個新的替代。

  • task_soft_time_limit

    任務的軟時間限制,以秒為機關

  當這個時間限制超過後,SoftTimeLimitExceeded異常将會被抛出。例如,任務可以捕獲這個異常在硬時間限制到達之前對環境進行清理:

from celery.exceptions import SoftTimeLimitExceeded

@app.task
def mytask():
    try:
        return do_work()
    except SoftTimeLimitExceeded:
        cleanup_in_a_hurry()
      
  • task_acks_late

    延遲确認意味着任務消息将在任務執行完成之後再進行确認,而不是剛開始時(預設行為)。

    FAQ: Shoud I use retry or acks_late

  • task_reject_on_worker_lost

    即使 task_acks_late 被啟用,當處理任務的工作單元異常退出或者收到信号而退出時工作單元将會确認任務消息。

   将這個設定成真可以讓消息重新入隊,是以任務将會被再執行,在同一個工作單元或者另外一個工作單元。

   告警:

    啟用這個可能導緻消息循環;確定你知道你在做什麼

  • task_default_rate_limit

    預設值:沒有速率限制

    任務的全局預設速率限制

   當任務沒有一個自定義的速率限制時,這個值将被使用

   另見:

    

worker_disable_rate_limits 設定可以禁用所有的速率限制

任務結果後端設定

  • result_backend

    預設值:預設不啟用結果後端

    用來存儲結果的後端。可以是下列之一:

    1. rpc

      以 AMQP 消息形式發送結果。檢視 RPC 後端設定

    2. database

      使用一個 SQLAlchemy 支援的結構化資料庫。檢視資料庫後端設定

    3. redis

      使用 Redis 存儲結果。檢視 Redis 後端設定

    4. cache

      使用 Memcached 存儲結果。檢視 Cache 後端設定

    5. cassandra

      使用 Cassandra 存儲結果。檢視 Cassandra 後端設定

    6. elasticsearch

      使用 Elasticsearch 存儲結果。檢視 Elasticsearch 後端設定

    7. ironcache

      使用 IronCache 存儲結果。檢視 IronCache 後端設定

    8. couchbase

      使用 Couchbase 存儲結果。檢視 Couchbase 後端設定

    9. couchdb

      使用 CouchDB 存儲結果。檢視 CouchDB 後端設定

    10. filesystem

      使用共享檔案夾存儲結果。檢視 File-system 後端設定

    11. consul

      使用 Consul K/V 存儲結果。檢視 Consul K/V 後端設定

  • result_serializer

    預設值:從4.0版本開始使用 json(更早:pickle)

    檢視 Serializers 擷取支援的序列化格式的資訊。

  • result_compression

    預設值:無壓縮

    結果值得可選壓縮方法。支援 task_seralizer 設定相同的選項。

  • result_expires

    預設值:1天後過期

    存儲的結果被删除的時間(秒數,或者一個 timedelta 對象)

   (有一個内建的周期性任務将删除過期的任務結果(celery.backend_cleanup),前提是 celery beat 已經被啟用。這個任務每天上午4點運作。

   值 None 或者 0 意思是結果永不删除(取決于後端聲明))

   注意:

    目前這個特性隻支援 AMQP, database, cache, Redis 這些存儲後端。當使用 database 存儲後端,celery beat必須執行使得過期結果被删除。

  • result_cache_max

    預設值:預設禁用

    啟用結果的用戶端緩存。

   對于老的 amqp 後端,存儲結果一旦被消費它将不再可用,此時這個特性将起到作用。

   這是老的結果被删除之前總的結果緩存的數量。值 0 或者 None 意味着沒有限制,并且值 -1 将禁用緩存。

Database 後端設定

Database URL 示例

使用一個資料庫存儲後端,你必須配置 result_backend 設定為一個連接配接的URL,并且帶 db+ 字首:

result_backend = 'db+scheme://user:password@host:port/dbname'
      
# sqlite (filename)
result_backend = 'db+sqlite:///results.sqlite'

# mysql
result_backend = 'db+mysql://scott:tiger@localhost/foo'

# postgresql
result_backend = 'db+postgresql://scott:tiger@localhost/mydatabase'

# oracle
result_backend = 'db+oracle://scott:[email protected]:1521/sidname'
      

檢視 Supported Databases 擷取支援的資料庫的一個表,檢視 Connection String 擷取相關的連接配接字元串(這是 db+ 字首後帶的URI的一部分)

  • database_engine_options

    預設值:{} (空映射)

    你可以使用 sqlalchmey_engine_options 設定聲明額外的 SQLAchemy 資料庫引擎選項:

# echo enables verbose logging from SQLAlchemy.
app.conf.database_engine_options = {'echo': True}
      
  • database_short_lived_sessions

    預設禁用短會話。如果啟用了,他們會急劇的降低性能,特别是對于處理很多任務的系統。當工作單元的流量很低,緩存的資料庫連接配接會由于空閑而變為無用,進而會導緻工作單元出錯,這種情況下這個選項是有用的。例如:間歇性的錯誤如(OperationalError)(2006,

    ‘MySQL server has gone away’)通過啟用短會話能解決。這個選項隻影響資料庫後端。

  • database_table_names

    預設值:{} (空映射)

    當 SQLAlchemy 設定成結果後端, Celery 自動建立兩個表來存儲任務的中繼資料。這個設定允許你自定義表名稱:

# use custom table names for the database result backend.
database_table_names = {
    'task': 'myapp_taskmeta',
    'group': 'myapp_groupmeta',
}
      

RPC 後端設定

  • result_persistent

    預設值:預設被禁用(瞬态消息)

    如果設定成 True,結果消息将被持久化。這意味着消息中間件重新開機後消息不會丢失。

  配置示例:

result_backend = 'rpc://'
result_persistent = False
      

Cache 後端設定

注意:

  緩存後端支援 pylibmc 和 python-memcached 庫。後者隻有在 pylibmc 沒有安裝時才會被使用。

使用一個 Memcached 伺服器:

result_backend = 'cache+memcached://127.0.0.1:11211/'
      

使用多個 Memcached 伺服器:

result_backend = """
    cache+memcached://172.19.26.240:11211;172.19.26.242:11211/
""".strip()
      

“memory” 後端隻在記憶體中存儲緩存:

result_backend = 'cache'
cache_backend = 'memory'
      
  • cache_backend_options

    你可以使用 cache_backend_options 設定 pylibmc 選項:

cache_backend_options = {
    'binary': True,
    'behaviors': {'tcp_nodelay': True},
}
      
  • cache_backend

    這個設定不再使用了,因為現在可以直接在 result_backend 中設定後端存儲。

Redis 後端設定

配置後端 URL

注意:

  Redis 後端需要 Redis 庫。

可以使用 pip 安裝這個包:

$ pip install celery[redis]
      

檢視 Bundles 擷取組合多個擴充需求的資訊

後端需要 result_backend 設定成一個 Redis URL:

result_backend = 'redis://:password@host:port/db'
      

例如:

result_backend = 'redis://localhost/0'
      

等同于:

result_backend = 'redis://'
      

URL 的字段如下定義:

1. password

  連接配接資料庫的密碼

2. host

  Redis 伺服器的主機名或者IP位址(例如:localhost)

3. port

  Redis 伺服器的端口。預設是 6379

4. db

  使用的資料庫編号。預設是0。db 可以包含一個可選的斜杠

  • redis_backend_us_ssl

    Redis後端支援 SSL。這個選項的合法值與 broker_use_ssl 相同

  • redis_max_connections

    預設值:無顯示

    Redis 連接配接池的最大可用連接配接數,這些連接配接用來發送和接收結果

  • redis_socket_connect_timeout

    5.0.1版本新特性

    預設值:None

從存儲後端連接配接到Redis伺服器的連接配接的Socket逾時時間(以秒為機關,int/float)

  • redis_socket_timeout

    預設值:120秒

    對 Redis 伺服器的讀寫操作的 Socket 逾時時間(以秒為機關,int/float),由存儲後端使用

Cassandra 後端設定

  Cassandra 後端驅動 cassandra-driver。

使用 pip 安裝:

$ pip install celery[cassandra]
      

檢視 Bundles 擷取關于組合擴充需求的資訊。

後端需要配置下列配置指令

  • cassandra_servers

    預設值: [] (空清單)

    Cassandra 伺服器清單。例如:

cassandra_servers = ['localhost']
      
  • cassandra_port

    預設值:9042.

    連接配接到Cassandra伺服器的端口

  • cassandra_keyspace

    預設值: None.

    存儲結果的 key-space。例如:

cassandra_keyspace = 'tasks_keyspace'
      
  • cassandra_table

    存儲結果的表(列族)。例如:

cassandra_table = 'tasks'
      
  • cassandra_read_consistency

    使用的讀一緻性。值可以是 ONE, TWO, THREE, QUORUM, ALL, LOCAL_QUORUM, EACH_QUORUM, LOCAL_ONE

  • cassandra_write_consistency

    使用的寫一緻性。值可以是 ONE, TWO, THREE, QUORUM, ALL, LOCAL_QUORUM, EACH_QUORUM, LOCAL_ONE

  • cassandra_entry_ttl

    狀态項的 Time-to-live。添加過後一段時間他們将會過期并且被删除。值 None (預設) 意味着他們永不過期

  • cassandra_auth_provider

    使用的 cassandra.auth 子產品中的 AuthProvider。 值可以是 PlainTextAuthProvider 或者 SaslAuthProvider

  • cassandra_auth_kwargs

    預設值: {} (空映射)

    傳遞給 authentication provider 的命名參數。例如:

cassandra_auth_kwargs = {
    username: 'cassandra',
    password: 'cassandra'
}
      

配置示例:

cassandra_servers = ['localhost']
cassandra_keyspace = 'celery'
cassandra_table = 'tasks'
cassandra_read_consistency = 'ONE'
cassandra_write_consistency = 'ONE'
cassandra_entry_ttl = 86400
      

Elasticsearch 後端設定

使用 Elasticsearch 作為結果後端,你隻需要将result_backend設定成正确的 URL。

result_backend = 'elasticsearch://example.com:9200/index_name/doc_type'
      
  • elasticsearch_retry_on_timeout

    預設值: False

    逾時後是否應該觸發在另一個節點重試?

  • elasticsearch_max_retries

    預設值: 3

    異常被傳遞前的最大重試次數

  • elasticsearch_timeout

    預設值: 10.0 秒

    elasticsearch 使用的全局逾時時間

Riak 後端設定

Riak 後端需要 riak 庫

使用 pip 進行安裝:

$ pip install celery[riak]
      

檢視 Bundles 擷取組合多擴充需求的資訊。

後端需要result_backend設定成一個 Riak URL:

result_backend = 'riak://host:port/bucket'
      
result_backend = 'riak://localhost/celery
      
result_backend = 'riak://'
      

URL 的字段定義如下:

1. host

  Riak 伺服器的主機名或者IP位址(例如 localhost)

2. port

  使用 protobuf 協定的Riak 伺服器端口,預設是 8087

3. bucket

  使用的Bucket名稱。預設是 celery。bucket 名稱需要是一個隻包含ASCII字元的字元串。

另外,這個後端可以使用如下配置指令進行配置:

  • riak_backend_settings

    這是一個支援如下鍵的映射:

    1. host

      Riak 伺服器的主機名或者IP位址(例如 localhost)

    2. port

      Riak 伺服器端口。預設是 8087

    3. bucket

      使用的Bucket名稱。預設是 celery。bucket 名稱需要是一個隻包含ASCII字元的字元串。

    4. protocol

      連接配接到 Riak 伺服器使用的協定。這不可以通過 result_backend 配置

AWS DynamoDB 後端設定

  Dynamodb 後端需要 boto3 庫

$ pip install celery[dynamodb]
      

後端需要 result_backend 設定成一個 DynamoDB URL:

result_backend = 'dynamodb://aws_access_key_id:aws_secret_access_key@region:port/table?read=n&write=m'
      

例如,聲明 AWS 區域以及表名稱:

result_backend = 'dynamodb://@us-east-1/celery_results
      

或者從環境中擷取 AWS 配置參數,使用預設表名稱(celery)以及聲明讀寫吞吐量:

result_backend = 'dynamodb://@/?read=5&write=5'
      

或者在本地使用 DynamoDB 的可下載下傳版本:

result_backend = 'dynamodb://@localhost:8000
      

URL 中的字段如下定義:

  1. aws_access_key_id & aws_secret_access_key

    通路 AWS API 資源的認證資訊。這可以通過 boto3 從不同的源擷取到

  2. region

    AWS 區域,例如: us-east-1 或者本地版本的 localhost。檢視 boto3 庫文檔擷取更多的資訊。

  3. 如果你使用的本地版本,這是本地DynamoDB示例監聽的端口。如果你沒有把區域設定成 localhost,這個設定選項将無效
  4. table

    使用的表名。預設是 celery。檢視 DynamoDB 命名規則擷取允許的字元以及表名長度的資訊。

  5. read & write

    所建立的 DynamoBD 表的讀寫能力單元。預設的讀寫值都是 1。更多的細節可以從 Provisioned Throughput documentation 中擷取到。

IronCache 後端設定

IronCache 後端需要 iron_celery 庫:

$ pip install iron_celery
      

IronCache 通過在 result_backend 中配置的 URL 進行聲明,例如:

result_backend = 'ironcache://project_id:token@'
      

或者更改緩存名稱:

ironcache:://project_id:token@/awesomecache
      

更多的資訊,檢視 https://github.com/iron-io/iron_celery

Couchbase 後端設定

Couchbase 後端需要 couchbase 庫

$ pip install celery[couchbase]
      

檢視 Bundle 擷取組合多擴充需求的步驟。

後端可以通過 result_backend 設定成一個 Couchbase URL:

result_backend = 'couchbase://username:password@host:port/bucket'
      

   預設值:{} (空映射)

  這是一個支援如下鍵的映射:

  1. Couchbase 伺服器的主機名。預設是 localhost
  2. port

    Couchbase 伺服器監聽的端口。預設是 8091

  3. bucket

    Couchbase 伺服器預設寫入的桶。預設是default

  4. username

    Couchbase 伺服器認證的使用者名(可選)

  5. password

    Couchbase 伺服器認證的密碼(可選)

CouchDB 後端設定

CouchDB 後端需要 pycouchdb 庫:

使用 pip 安裝這個包:

$ pip install celery[couchdb]
      

檢視 Bundles 擷取更多關于組合多擴充需求的資訊

後端可以通過 result_backend 配置成一個 CouchDB URL:

result_backend = 'couchdb://username:password@host:port/container'
      

URL 由以下部分組成:

  1. username
  2. host
  3. container

    CouchDB 伺服器寫入的預設容器。預設是 default

File-system 後端設定

後端可以通過一個檔案 URL 配置,例如:

CELERY_RESULT_BACKEND = 'file:///var/celery/results'
      

配置的目錄需要被共享,并且所有使用該後端的伺服器都可寫。

如果你在單獨的一個系統上使用 Celery,你不需要任何進一步的配置就可以簡單的使用這個後端。對于大型的叢集,你可以使用 NFS、GlusterFS、CIFS、HDFS(使用FUSE),或者其他檔案系統。

Consul K/V 存儲後端設定

Consul 後端可以通過 URL 配置:

CELERY_RESULT_BACKEND = ‘consul://localhost:8500/’
      

後端将在 Consul K/V 存儲中作為單獨鍵存儲結果

後端使用Consul 中的 TTLs 支援結果的自動過期

消息路由

  • task_queues

    預設值: None (預設隊列的配置)

    多數使用者不願聲明這個配置,而是使用 automatic routing facilites。

如果你真的需要配置進階路由,這個設定應該是一個 kombu.Queue 對象的清單,工作單元可以從中消費。

注意工作單元可以通過 -Q 選項覆寫這個設定,或者這個清單中的單獨隊列可以通過 -X 選項進行排除。

檢視 Basics 擷取更多的資訊。

預設值是 celery 隊列的一個隊列/消息交換器/綁定的鍵,消息互動類型是direct。

檢視 task_routes

  • task_routes

    一個路由器的清單,或者單個路路由,用來路由任務到相應的隊列。當決定一個任務的最終目的,路由器将按聲明順序進行輪詢。

一個路由器可以通過如下方式聲明:

  1. 函數,簽名格式為

    (name, args, kwargs, options, task=None, **kwargs)

  2. 字元串,提供到路由函數的路徑
  3. 字典,包含路由聲明,它将會轉化成一個

    celery.routes.MapRoute

    執行個體
  4. 一個

    (pattern, route)

    元組的清單,它将會轉化成一個

    celery.routes.MapRoute

    執行個體
task_routes = {
    'celery.ping': 'default',
    'mytasks.add': 'cpu-bound',
    'feed.tasks.*': 'feeds',                           # <-- glob pattern
    re.compile(r'(image|video)\.tasks\..*'): 'media',  # <-- regex
    'video.encode': {
        'queue': 'video',
        'exchange': 'media'
        'routing_key': 'media.video.encode',
    },
}

task_routes = ('myapp.tasks.route_task', {'celery.ping': 'default})
其中,myapp.tasks.route_task 可以是:

def route_task(self, name, args, kwargs, options, task=None, **kw):
        if task == 'celery.ping':
            return {'queue': 'default'}
      

route_task 可以傳回一個字元串或者一個字典。一個字元串表示 task_queues 中的一個隊列名,而字典表示一個自定義的路由。

當發送消息,路由被按順序詢問。第一個傳回非 None 值得路由将被使用。消息選項此時将與找到的路由設定合并,其中路由器的設定要優先。

例如: apply_async() 有這些參數:

Task.apply_async(immediate=False, exchange='video',
                 routing_key='video.compress')
      

并且有一個路由器傳回:

{'immediate': True, 'exchange': 'urgent'}
      

那麼最終的消息選項将是:

immediate=True, exchange='urgent', routing_key='video.compress'
      

(以及Task類中定義的任意預設消息選項)

當進行合并時,task_routes 中定義的值會優先于 task_queues 中定義的值。

對于如下設定:

task_queues = {
    'cpubound': {
        'exchange': 'cpubound',
        'routing_key': 'cpubound',
    },
}

task_routes = {
    'tasks.add': {
        'queue': 'cpubound',
        'routing_key': 'tasks.add',
        'serializer': 'json',
    },
}
      

tasks.add 的最終路由選項将變為:

{'exchange': 'cpubound',
 'routing_key': 'tasks.add',
 'serializer': 'json'}
      

檢視路由器擷取更多的示例。

  • task_queue_ha_policy

    消息中間件: RabbitMQ

    預設值:None

    這将設定一個隊列的HA政策,并且值可以是一個字元串(通常是 all)

task_queue_ha_policy = 'all'
      

使用 all 将複制隊列到所有的目前節點,或者你指定一個節點的清單:

task_queue_ha_policy = ['rabbit@host1', 'rabbit@host2']      

使用一個清單将隐示設定 x-ha-policy為‘nodes,x-ha-policy-params` 為給定的節點清單

檢視 http://www.rabbitmq.com/ha.html 擷取更多的資訊

  • task_queue_max_priority

    預設值: None

    檢視 RabbitMQ Message Priorities

  • worker_direct

    預設值: 禁用

這個選項使得每個工作單元又一個專門的隊列,是以任務可以路由到指定的工作單元。

每個工作單元的隊列名稱是基于工作單元主機名和一個 .dq字尾自動産生的,使用 C.dq 消息互動器。

例如:節點名稱為 [email protected] 的工作單元的隊列名稱為:

[email protected]
      

此時,你可以通過指定主機名為路由鍵并且使用 C.dq 消息互動器來将任務路由到指定的節點。

task_routes = {
    'tasks.add': {'exchange': 'C.dq', 'routing_key': '[email protected]'}
}
      
  • task_create_missing_queues

    預設值:啟用

    如果啟用(預設),任何聲明的未在 task_queues 中未定義的隊列都将自動被建立。檢視 Automaci routing。

  • task_default_queue

    預設值: celery

    如果消息沒有聲明路由或者自定義的隊列,apply_async 預設使用的隊列名稱。

這個隊列必須在 task_queues 中。如果 task_queues 沒有聲明,那麼他将自動建立一個隊列項,而這個設定值就作為隊列的名稱。

另見:

修改預設隊列的名稱

  • task_default_exchange

    預設值:”celery”

    當 task_queues 設定中指定鍵沒有聲明自定義的消息互動器,那麼這個預設的消息互動器将被使用。

  • task_default_exchange_type

    預設值:”direct”

    當 task_queues 設定中指定鍵沒有聲明自定義的消息互動器類型,那麼這個預設的消息互動器類型将被使用。

  • task_default_routing_key

    預設值:”celery”

    當 task_queues 設定中指定鍵沒有聲明自定義的路由鍵,那麼這個預設的路由鍵将被使用。

  • task_default_delivery_mode

    預設值:”presistent”

  可以是瞬态的(消息不寫硬碟),或者持久的(寫硬碟)

消息中間件設定

  • broker_url

    預設值:”amqp://”

    預設的消息中間件URL。這必須是一個如下形式的URL:

transport://userid:password@hostname:port/virtual_host
      

其中隻有模式部分是必須的,其餘部分都是可選的,預設會設定為對應傳輸中間件的預設值。

傳輸部分是使用的消息中間件的實作,預設是 amqp,(如果安裝了librabbitmq會使用這個庫,否則使用pyamqp)。還有其他可用的選擇,包括 redis://、 sqs://、 qpid://。

模式部分可以是你自己的傳輸中間件實作的全限定路徑:

broker_url = 'proj.transports.MyTransport://localhost'
      

可以配置多個消息中間件,使用相同的傳輸協定也行。消息中間件可以通過當個字元串聲明,不同的消息中間件URL之間用冒号分隔:

broker_url = 'transport://userid:password@hostname:port//;transport://userid:password@hostname:port//'
      

或者作為一個清單:

broker_url = [
    'transport://userid:password@localhost:port//',
    'transport://userid:password@hostname:port//'
]
      

這些消息中間件将被用于broker_failover_strategy

檢視Kombu 文檔中的 URLs 章節擷取更多的資訊。

  • broker_read_url / broker_write_url

    預設值:broker_url的設定值

    這些設定可以配置而不用 broker_url 的設定,可以為消息中間件聲明不同的連接配接參數,用來消費和生成消息。

broker_read_url = 'amqp://user:[email protected]:56721'
broker_write_url = 'amqp://user:[email protected]:56722'
      

所有選項都可以聲明成一個清單,作為故障恢複的可選值,檢視 broker_url 擷取更多的資訊

  • broker_failover_strategy

    預設值:“round-robin”

    消息中間件連接配接對象的預設故障恢複政策。如果提供了,将映射到 kombu.connection.failover_strategies 中的一個鍵,或者引用任何方法,從給定的清單中産生一個項。

# Random failover strategy
def random_failover_strategy(servers):
    it = list(servers)  # don't modify callers list
    shuffle = random.shuffle
    for _ in repeat(None):
        shuffle(it)
        yield it[0]

broker_failover_strategy = random_failover_strategy
      
  • broker_heartbeat

    支援的傳輸層協定:pyamqp

    預設值:120.0(與伺服器協商)

  注意:這個值隻被工作單元使用,用戶端此時不使用心跳。

  因為單純使用 TCP/IP 并不總是及時探測到連接配接丢失,是以 AMQP 定義了心跳,用戶端和消息中間件用來檢測連接配接是否關閉。

  心跳會被監控,如果心跳值是 10 秒,那麼檢測心跳的時間間隔是 10 除以broker_heartbeat_checkrate (預設情況下,這個值是心跳值的兩倍,是以對于10秒心跳,心跳每隔5秒檢測一次)

  • broker_heartbeat_checkrate

    預設值:2.0

工作單元會間隔監控消息中間件沒有丢失過多的心跳。這個檢測的速率是用 broker_heartbeat 值除以這個設定值得到的,是以如果心跳是 10.0 并且這個設定值是預設的2.0,那麼這個監控将每隔5秒鐘執行一次(心跳發送速率的兩倍)

  • broker_use_ssl

    支援的傳輸層協定: pyamqp, redis

    預設值: 禁用

在消息中間件連接配接上使用SSL

這個選項的合法值依據使用的傳輸協定的不同而不同

  • pyamqp

    如果設定成True,連接配接将依據預設的SSL設定啟用SSL。如果設定成一個字典,将依據給定的政策配置SSL連接配接。使用的格式是 python 的 ssl.wrap_socket() 選項。

注意SSL套接字一般會在消息中間件的一個單獨的端口上服務。

以下示例提供了用戶端證書,并且使用一個自定義的認證授權來驗證伺服器證書:

import ssl

broker_use_ssl = {
  'keyfile': '/var/ssl/private/worker-key.pem',
  'certfile': '/var/ssl/amqp-server-cert.pem',
  'ca_certs': '/var/ssl/myca.pem',
  'cert_reqs': ssl.CERT_REQUIRED
}
      

告警:

  使用 broker_use_ssl=True 時請小心。可能你的預設配置根本不會驗證伺服器證書。請閱讀python的 ssl module security considerations。

  • redis

    設定必須是一個字典,包括如下鍵:

ssl_cert_reqs (required): one of the SSLContext.verify_mode values: 
ssl.CERT_NONE
ssl.CERT_OPTIONAL
ssl.CERT_REQUIRED
ssl_ca_certs (optional): path to the CA certificate
ssl_certfile (optional): path to the client certificate
ssl_keyfile (optional): path to the client key
      
  • broker_pool_limit

    2.3版本新特性

    預設值:10

    連接配接池中可以打開最大連接配接數。

    從2.5版本開始連接配接池被預設啟用,預設限制是10個連接配接。這個數值可以依據使用一個連接配接的 threads/green-threads

(eventlet/gevent) 數量進行更改。例如:運作 eventlet 啟動 1000 個

greenlets,他們使用一個連接配接到消息中間件,如果發生競态條件,那麼你應該開始增加這個限制。

    如果設定成None或者0,連接配接池将會被禁用,并且每次使用連接配接都會重建立立連接配接并關閉。

  • broker_connection_timeout

    預設值:4.0

    放棄與AMQP伺服器建立連接配接之前預設等待的逾時時間。當使用 gevent 時該設定被禁用。

  • broker_connection_retry

    如果與 AMQP 消息中間件的連接配接斷開,将自動重建立立連接配接

   每次重試中間等待的時間會遞增,并且在 broker_connection_max_retries 未達到之前會一隻重試

  • broker_connection_max_retries

    預設值:100

    放棄與 AMQP 伺服器重建立立連接配接之前的最大重試次數

   如果設定成 0 或者 None,将一直重試

  • broker_login_method

    預設值:AMQPLAIN

    設定自定義的 amqp 登陸方法

  • broker_transport_options

    2.2 版本新特性

    預設值:{} (空映射)

   傳遞給底層傳輸中間件的一個附加選項的字典

   設定可見逾時時間的示例如下(Redis 與 SQS 傳輸中間件支援):

  broker_transport_options = {‘visibility_timeout’: 18000} # 5 hours 
      

工作單元

  • imports

    預設值:[] (空清單)

    當工作單元啟動時導入的一系列子產品

   這用來聲明要導入的子產品,但是它還可用來導入信号處理函數和附加的遠端控制指令,等等。

   這些子產品将會以原來聲明的順序導入

  • include

    預設值:[] (空清單)

    語義上與 imports 相同,但是可以作為将不同導入分類的一種手段

這個設定中的子產品是在 imports 設定中的子產品導入之後才導入

  • worker_concurrency

    預設值:CPU核數

    執行任務的并發工作單元 process/threads/green 數量

   如果你大部分操作是I/O操作,你可以設定更多的程序(線程),但是大部分情況下都是以CPU數作為定界,嘗試讓這個值接近你機器的CPU核數。如果沒有設定,目前機器的 CPU核數将會被使用

  • worker_prefetch_multiplier

    預設值:4

    工作單元一次預擷取多少個消息是這個設定值乘以并發程序的數量。預設值是 4(每個程序4條消息)。但是,預設設定通常是好的選擇 -

    如果你有長時間任務等待在隊列中,并且你必須啟動工作單元,注意第一個工作單元初始時将收到4倍的消息量。是以任務可能在工作單元間不會平均分布

   禁用這個選項,隻要将 worker_prefetch_multiplier 設定成 1。設定成 0 将允許工作單元持續消費它想要的盡可能多的消息。

   更詳細的資訊,請閱讀 Prefetch Limits

    帶 ETA/countdown 的任務不會受 prefetch 限制的影響

  • worker_lost_wait

    預設值:10.0 秒

    有些情況下,工作單元可能在沒有适當清理的情況下就被殺死,并且工作單元可能在終止前已經釋出了一個結果。這個值聲明了在抛出 WorkerLostError 異常之前我們會在丢失的結果值上等待多久

  • worker_max_tasks_per_child

    一個工作單元程序在被一個新的程序替代之前可以執行的最大任務數

  • worker_max_memory_per_child

    預設值:沒有限制。類型:int(kilobytes)

    一個工作單元程序在被一個新的程序替代之前可以消耗的最大預留記憶體(機關KB)。如果單獨一個任務就導緻工作單元超過這個限制,目前的任務會執行完成,并且之後這個程序将會被更新替代。

    示例:

  worker_max_memory_per_child = 12000  # 12MB
      
  • worker_disable_rate_limits

    預設值:禁用(啟用速率限制)

    即使任務顯示設定了速率,仍然禁用所有速率限制

  • worker_state_db

    存儲工作單元狀态的檔案名稱(如取消的任務)。可以是相對或者絕對路徑,但是注意字尾.db 可能會被添加到檔案名後(依賴于python 的版本)

   也可以通過celery worker –statedb 參數設定

  • worker_timer_precision

    預設值:1.0秒

    設定重新檢測排程器之前ETA排程器可以休息的最大秒數

   設定成1意味着排程器精度将為1秒。如果你需要毫秒精度,你可以設定成 0.1

  • worker_enable_remote_control

    預設值:預設啟用

    聲明工作單元的遠端控制是否啟用

事件

  • worker_send_task_events

    發送任務相關的事件,使得任務可以使用類似flower 的工作監控到。為工作單元的 -E 參數設定預設值

  • task_send_sent_event

    預設值:預設禁用

   如果啟用,對于每個任務都将有一個 task-sent 事件被發送,是以任務在被消費前就能被追蹤。

  • event_queue_ttl

    支援的傳輸中間件: amqp

    預設值:5.0 秒

    發送到一個監控用戶端事件隊列的消息的過期時間(x-message-ttl),以秒為機關(int/float)。

   例如:如果這個值設定為10,被遞送到這個隊列的消息将會在10秒後被删除

  • event_queue_expires

    預設值:60.0 秒

    一個監控用戶端事件隊列被删除前的過期時間(x-expires)。

  • event_queue_prefix

    預設值: “celeryev”.

    事件接收隊列名稱的字首

  • event_serializer

    預設值: “json”.

    當發送事件消息時使用的消息序列化格式

遠端控制指令

  • control_queue_ttl

    預設值: 300.0

  • control_queue_expires

    預設值: 10.0

日志

  • worker_hijack_root_logger

    預設值: 預設啟用 (hijack root logger).

   預設情況下,任意前面配置的根日志器的處理函數都将被移除。如果你想自定義日志處理函數,那麼你可以通過設定 worker_hijack_root_logger = False 來禁用這個行為。

日志可以通過連接配接到 celery.signals.setup_logging 進行定制化

  • worker_log_color

    預設值: 如果應用執行個體日志輸出到一個終端,這個将啟用

    啟用/禁用Celery 應用日志輸出的顔色

  • worker_log_format

    預設值:

    [%(asctime)s: %(levelname)s/%(processName)s] %(message)s  日志資訊的格式
          

  檢視python 日志子產品擷取更多關于日志的資訊

  • worker_task_log_format
[%(asctime)s: %(levelname)s/%(processName)s]
    [%(task_name)s(%(task_id)s)] %(message)s
      

任務中記錄日志使用的格式。檢視python 日志子產品擷取更多關于日志的資訊

  • worker_redirect_stdouts

    預設值: 預設啟用

    如果啟用來,标準輸出和标準錯誤輸出将重定向到目前日志器

工作單元和 beat 将使用到

  • worker_redirect_stdouts_level

    預設值:WARNING

    标準輸出和标準錯誤輸出的日志級别。可以是DEBUG, INFO, WARNING, ERROR, or CRITICAL

安全

  • security_key

    預設值: None.

    2.5 版本新特性

包含私鑰的檔案的相對或者絕對路徑,私鑰用來在使用消息簽名時對消息進行簽名。

  • security_certificate

    預設值:None.

  包含X.509認證的檔案的相對或者絕對路徑,認證用來在使用消息簽名時對消息進行簽名。

  • security_cert_store

  包含用來進行消息簽名的X.509認證的目錄。可以使用檔案名模式比對(例如:/etc/certs/*.pem)

自定義元件類 (進階)

  • worker_pool

    預設值:”prefork” (celery.concurrency.prefork:TaskPool).

    工作單元使用的池類的名稱

  • Eventlet/Gevent

    永遠不要使用這個選項來選擇用eventlet 還是 gevent。你必須對工作單元使用-P選項,確定應急更新檔不會應用過遲,導緻出現奇怪的現象。

  • worker_pool_restarts

   如果啟用,工作單元池可以使用 pool_restart 遠端控制指令進行重新開機

  • worker_autoscaler

    預設值: “celery.worker.autoscale:Autoscaler”.

使用的自動擴充類的名稱

  • worker_consumer

    預設值:”celery.worker.consumer:Consumer”.

    工作單元使用的消費類的名稱

  • worker_timer

    預設值:”kombu.async.hub.timer:Timer”.

    工作單元使用的 ETA 排程器類的名稱。預設值是被池具體實作設定。

Beat 設定 (celery beat)

  • beat_schedule

    預設值: {} (空映射)

    beat排程的周期性任務。檢視Entries

  • beat_scheduler

    預設值:”celery.beat:PersistentScheduler”.

    預設的排程器類。如果同時使用django-celery-beat擴充,可以設定成 “django_celery_beat.schedulers:DatabaseScheduler”

也可以通過celery beat 的 -S 參數進行設定

  • beat_schedule_filename

    預設值: “celerybeat-schedule”.

    存儲周期性任務最後運作時間的檔案的名稱,這個檔案被PersistentScheduler使用。可以是相對或者絕對路徑,但是注意字尾.db可能添加到檔案名後(依賴于python版本)

   也可以通過 celery beat 的 –schedule 參數進行設定

  beat_sync_every

  預設值:0.

  另一個資料庫同步發起前可以執行的周期性任務的數量。值0(預設)表示基于時間同步 - 預設是3分鐘,由scheduler.sync_every确定。如果設定成1,beat将在每個任務消息發送後發起同步。

  beat_max_loop_interval

預設值: 0.

轉自:https://blog.csdn.net/libing_thinking/article/details/78812472