天天看點

APScheduler定時任務架構一.APScheduler簡介二.APScheduler基本使用

文章目錄

  • 一.APScheduler簡介
  • 二.APScheduler基本使用
    • 1.安裝
    • 2.基本概念
    • 3.選擇正确的排程器、任務存儲器、執行器和觸發器
    • 4.觸發器詳解
      • (1)date
      • (2)interval
      • (3)cron
    • 5.配置排程器
    • 6.啟動排程器
    • 7.添加任務
    • 8.删除任務
    • 9.暫停和恢複工作
    • 10.擷取預定任務清單
    • 11.修改任務
    • 12.關閉排程器
    • 13.暫停/恢複任務處理
    • 13.限制同時執行的任務執行個體數
    • 14.丢失任務的執行與合并
    • 15.排程器事件
    • 16.故障排除

一.APScheduler簡介

Advanced Python Scheduler (APScheduler) 是一個 Python 庫,可讓您安排 Python 代碼稍後執行,可以隻執行一次,也可以定期執行。您可以随意添加新工作或删除舊工作。如果您将任務存儲在資料庫中,它們也将在排程器重新啟動後幸存下來并保持其狀态。當排程器重新啟動時,它将運作它在離線時應該運作的所有任務。

除此之外,APScheduler 可以用作跨平台、特定于應用程式的平台特定排程器的替代品,例如 cron 守護程式或 Windows 任務排程器。但是請注意,APScheduler 本身不是守護程式或服務,也不附帶任何指令行工具。它主要用于在現有應用程式中運作。也就是說,APScheduler 确實為您提供了一些建構塊來建構排程器服務或運作專用排程器程序。

APScheduler 具有三個可以使用的内置排程系統:

  • Cron 式排程(具有可選的開始/結束時間)
  • 基于間隔的執行(以均勻間隔運作任務,具有可選的開始/結束時間)
  • 一次性延遲執行(在設定的日期/時間運作一次任務)

您可以混合搭配排程系統和以您喜歡的任何方式存儲任務的後端。支援的用于存儲任務的後端包括:

Memory,SQLAlchemy (任何由SQLAlchemy支援的RDBMS都可以工作),MongoDB,Redis,RethinkDB,ZooKeeper

APScheduler還內建了一些常見的Python架構,比如:

asyncio,gevent,Tornado,Twisted,Qt (使用PyQt, PySide2或PySide)

有第三方解決方案可以将 APScheduler 與其他架構內建:

Django,Flask

二.APScheduler基本使用

1.安裝

pip安裝:

pip install apscheduler
           

源碼安裝(https://pypi.python.org/pypi/APScheduler/):

python setup.py install
           

2.基本概念

APScheduler 有四種元件:

觸發器(triggers):觸發器包含排程邏輯,描述一個任務何時被觸發,按日期或按時間間隔或按 cronjob 表達式三種方式觸發。每個任務都有它自己的觸發器,用于确定下一次運作任務的時間,除了初始配置之外,觸發器是完全無狀态的。

任務存儲器(job stores):任務存儲器包含預定的任務,指定了任務被存放的位置,預設情況下任務儲存在記憶體,也可将任務儲存在各種資料庫中,當任務的資料在儲存到持久任務存儲時,它會被序列化,并在從它加載回來時會進行反序列化。任務存儲(預設情況除外)不會将任務資料儲存在記憶體中,而是充當在後端儲存、加載、更新和搜尋任務的中間人。任務存儲絕不能在排程器之間共享。

執行器(executors):執行器負責處理任務的運作,通常将指定的任務(調用函數)送出到線程池或程序池中運作,當任務完成時,執行器通知排程器,然後排程器發出适當的事件。

排程器(schedulers):任務排程器,屬于控制角色,通過它配置任務存儲器、執行器和觸發器,添加、修改和删除任務。排程器協調觸發器、任務存儲器、執行器的運作,通常隻有一個排程器運作在應用程式中,開發人員通常不需要直接處理任務存儲器、執行器或觸發器。相反,排程器提供了适當的接口來處理所有這些。任務存儲和執行器的配置是通過排程器完成的。

3.選擇正确的排程器、任務存儲器、執行器和觸發器

選擇排程器:對排程器的選擇主要取決于你的程式設計環境以及你将使用 APScheduler 的目的

BlockingScheduler:當排程器是程序中唯一運作的東西時使用。

BackgroundScheduler:當不使用以下任何架構時使用,并且希望排程器在應用程式的背景運作。

AsyncIOScheduler:當應用程式使用 asyncio 子產品時使用

GeventScheduler: 當應用程式使用 gevent時使用

TornadoScheduler:當正在建構 Tornado 應用程式時使用

TwistedScheduler:當正在建構 Twisted 應用程式時使用

QtScheduler: 當正在建構 Qt 應用程式時使用

選擇任務存儲器:你就需要确定是否需要任務持久化。如果你總是在應用程式開始時重新建立任務,那麼你可以使用預設值 ( MemoryJobStore)。但是,如果你需要在排程器重新啟動或應用程式崩潰時保持你的任務,那麼就要選擇持久化的任務儲存器。但是,如果你是自由選擇,則推薦使用SQLAlchemyJobStore并搭配PostgreSQL作為背景資料庫,因為它可以提供強大的資料完整性保護功能。

選擇執行器:這個同樣要看你的實際需求,預設的ThreadPoolExecutor線程池執行器方案可以滿足大部分需求。如果你的工作負載涉及 CPU 密集型操作,則應考慮ProcessPoolExecutor程序池執行器方案,以此改為使用多個 CPU 核心來充分利用多核算力。你甚至可以同時使用兩者,将ProcessPoolExecutor程序池執行器添加為輔助執行器。

選擇觸發器:配置一個任務,就需要設定一個任務觸發器。觸發器可以設定運作任務時計算日期/時間的邏輯。APScheduler 帶有三種内置觸發器類型:

date: 當你想在某個時間點隻運作一次任務時使用

interval: 當你想以固定的時間間隔運作任務時使用

cron:當你想在一天中的特定時間定期運作任務時使用

也可以将多個觸發器組合成一個觸發器,該觸發器可以設定同時滿足所有觸發器條件而觸發,或者滿足一項即觸發。有關更多資訊,請參閱的文檔:combining triggers

4.觸發器詳解

(1)date

作用:在給定的日期時間觸發一次。如果run_date留白,則使用目前時間。

date類:

from datetime import date
from apscheduler.schedulers.blocking import BlockingScheduler

sched = BlockingScheduler()

def my_job(text):
    print(text)

# 設定執行時間
sched.add_job(my_job, 'date', run_date=date(2021, 8, 30), args=['test'])

sched.start()
           

其中run_date參數可以是date類型、datetime類型或文本類型。

datetime類:

from datetime import datetime
from apscheduler.schedulers.blocking import BlockingScheduler

sched = BlockingScheduler()

def my_job(text):
    print(text)

# 設定執行時間
sched.add_job(my_job, 'date', run_date=datetime(2021, 8, 30, 17, 10, 0), args=['test'])

sched.start()
           

運作結果:

APScheduler定時任務架構一.APScheduler簡介二.APScheduler基本使用

文本類:

from apscheduler.schedulers.blocking import BlockingScheduler

sched = BlockingScheduler()


def my_job(text):
    print(text)


# 設定執行時間
sched.add_job(my_job, 'date', run_date='2021-08-30 17:27:00', args=['test'])

sched.start()
           

運作結果:

APScheduler定時任務架構一.APScheduler簡介二.APScheduler基本使用

未指定時間,則立即執行:

sched.add_job(my_job, args=['test'])
           

(2)interval

作用:當你想以固定的時間間隔運作任務時使用

class apscheduler.triggers.interval.IntervalTrigger(weeks=0, days=0, hours=0, minutes=0, seconds=0, start_date=None, end_date=None, timezone=None, jitter=None)
           

在指定的時間間隔内觸發,如果指定,則從start_date開始,否則datetime.now() + 間隔。

參數:

周( int ) – 等待的周數

days ( int ) – 等待的天數

小時( int ) – 等待的小時數

分鐘( int ) – 等待的分鐘數

seconds ( int ) – 等待的秒數

start_date ( datetime|str ) – 間隔計算的起點

end_date ( datetime|str ) – 要觸發的最晚可能日期/時間

timezone ( datetime.tzinfo|str ) – 用于日期/時間計算的時區

jitter ( int|None ) –jitter最多延遲作業執行幾秒鐘

from datetime import datetime

from apscheduler.schedulers.blocking import BlockingScheduler


def job_func():
    print("Hello World")


sched = BlockingScheduler()

# 設定周期開始時間start_date和結束時間end_date,及每10s觸發
sched.add_job(job_func, 'interval', start_date="2021-08-30 17:48:20", end_date="2021-08-30 17:49:00", seconds=10)

sched.start()
           

運作結果:

APScheduler定時任務架構一.APScheduler簡介二.APScheduler基本使用

也能通過scheduled_job()裝飾器實作:

from apscheduler.schedulers.blocking import BlockingScheduler

sched = BlockingScheduler()


@sched.scheduled_job('interval', start_date="2021-08-30 17:48:20", end_date="2021-08-30 17:49:00", seconds=10)
def job_func():
    print("Hello World")

sched.start()
           

該jitter選項使你能夠将随機元件添加到執行時間。如果你有多個伺服器并且不希望它們在完全相同的時刻運作作業,或者如果你想防止具有類似選項的多個作業始終同時運作,這可能很有用:

# 在120s内随機選擇一個額外延遲
sched.add_job(job_func, 'interval', hours=1, jitter=120)
           

(3)cron

作用:當你想在一天中的特定時間定期運作任務時使用

classapscheduler.triggers.cron.CronTrigger(year=None, month=None, day=None, week=None, day_of_week=None, hour=None, minute=None, second=None, start_date=None, end_date=None, timezone=None, jitter=None)
           

當目前時間比對所有指定的時間限制時觸發,類似于 UNIX cron 排程器的工作方式。

參數:

year (int|str) – 4 位數字年份

month (int|str) – 月 (1-12)

day (int|str) – 月中的第幾天 (1-31)

week (int|str) – ISO 周 (1-53)

day_of_week (int|str) – 工作日的編号或名稱(0-6 或 mon,tue,wed,thu,fri,sat,sun)

hour (int|str) – 小時 (0-23)

minute (int|str) – 分鐘 (0-59)

second (int|str) – 秒 (0-59)

start_date (datetime|str) – 最早可能觸發的日期/時間(包括)

end_date (datetime|str) – 要觸發的最晚可能日期/時間(包括)

timezone (datetime.tzinfo|str) – 用于日期/時間計算的時區(預設為排程器時區)

jitter (int|None) – 最多延遲作業執行幾秒鐘

介紹:

這是 APScheduler 中最強大的内置觸發器。你可以在每個字段上指定多種不同的表達式,在确定下一次執行時間時,它會在每個字段中找到滿足條件的最早時間。這種行為類似于大多數類 UNIX 作業系統中的“Cron”實用程式。

你還可以分别通過start_date和 end_date參數指定 cron 樣式計劃的開始日期和結束日期。它們可以作為日期/日期時間對象或文本(采用 ISO 8601格式)給出。

與 crontab 表達式不同,你可以省略不需要的字段。當省略時間參數時,在顯式指定參數之前的參數會被設定為*,之後的參數會被設定為最小值,week 和day_of_week的最小值為* 。例如,某任務在每年每個月的第一天每小時 20 分鐘執行。下面的代碼示例應該進一步說明這種行為。

day=1, minute=20
等同于
year='*', month='*', day=1, week='*', day_of_week='*', hour='*', minute=20, second=0
           

表達式類型:

下表列出了從年份到第二個字段中使用的所有可用表達式。可以在單個字段中給出多個表達式,用逗号分隔。

表達式 參數類型 描述
* any 比對字段所有取值
*/a any 比對字段每遞增 a 後的值, 從字段最小值開始,包括最小值,比如小時(hour)的 */5,則比對0,5,10,15,20
a-b any 比對字段 a 到 b 之間的取值,a 必須小于 b,包括 a 與 b,比如1-5,則比對1,2,3,4,5
a-b/c any 比對 a 到 b 之間每遞增 c 後的值,包括 a,不一定包括 b,比如1-20/5,則比對1,6,11,16
xth y day 比對 y 在當月的第 x 次,比如 3rd fri 指當月的第三個周五
last x day 比對 x 在當月的最後一次,比如 last fri 指當月的最後一個周五
last day 比對當月的最後一天
x,y,z any 組合表達式,可以組合确定值或上方的表達式
在month和day_of_week領域接受英文簡寫月和星期名(jan-dec和mon-sun分别)。
from apscheduler.schedulers.blocking import BlockingScheduler

def job_func():
    print("Hello World")

sched = BlockingScheduler()

# 任務會在6月、7月、8月、11月和12月的第三個周五,00:00、01:00、02:00和03:00觸發
sched.add_job(job_func, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')

sched.start()
           

可以使用start_date和end_date來限制計劃運作的總時間:

from apscheduler.schedulers.blocking import BlockingScheduler

def job_func():
    print("Hello World")

sched = BlockingScheduler()

# 在2021-09-02 00:00:00前,每周一到每周五 10:35運作
sched.add_job(job_func, 'cron', day_of_week='mon-fri', hour=10, minute=35, end_date='2021-09-02')
sched.start()
           

可通過 scheduled_job() 裝飾器實作:

from apscheduler.schedulers.blocking import BlockingScheduler

sched = BlockingScheduler()
@sched.scheduled_job('cron', day_of_week='mon-fri', hour=10, minute=35, end_date='2021-09-02')
def job_func():
    print("Hello World")

sched.start()
           

運作結果:

APScheduler定時任務架構一.APScheduler簡介二.APScheduler基本使用

使用标準 crontab 表達式安排任務:

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger

sched = BlockingScheduler()

def job_func():
    print("Hello World")

# from_crontab(cls, expr, timezone=None)
# 其中expr: minute, hour, day of month, month, day of week
# 在9月到11月間前15天内的每個星期的10:48觸發
sched.add_job(job_func, CronTrigger.from_crontab('48 10 1-15 sep-nov *'))
sched.start()
           

運作結果:

APScheduler定時任務架構一.APScheduler簡介二.APScheduler基本使用

也可添加jitter參數:

sched.add_job(job_func, 'cron', hour='*', jitter=120)
           

夏令時問題:

cron 觸發器使用所謂的“挂鐘”時間。是以,如果所選時區遵守 DST(夏令時),你應該注意它可能會在進入或離開 DST 時導緻 cron 觸發器出現意外行為。從标準時間切換到夏令時時,時鐘會向前移動一小時或半小時,具體取決于時區。同樣,當切換回标準時間時,時鐘會向後移動一小時或半小時。這将導緻一些時間段要麼根本不存在,要麼重複。如果你的日程安排要在這些時間段之一執行作業,則它的執行頻率可能比預期的要高或低。這不是一個錯誤。如果你希望避免這種情況,請使用不遵守 DST 的時區,可以使用UTC時間,或提前預知并規劃好執行的問題。

# 在Europe/Helsinki時區, 在三月最後一個周一就不會觸發;在十月最後一個周一會觸發兩次
sched.add_job(job_function, 'cron', hour=3, minute=30)
           

5.配置排程器

APScheduler 提供了許多不同的方式來配置排程器。你可以選擇直接傳字典,也可以将選項作為關鍵字參數傳入。你還可以先執行個體化排程器,然後添加任務并配置排程器。通過這種方式,你可以在任何環境中獲得最大的靈活性。

可以在BaseScheduler該類的 API 參考中找到排程器級别配置選項的完整清單 。排程器子類可能還有其他選項,這些選項記錄在它們各自的 API 參考中。單個任務存儲和執行器的配置選項同樣可以在它們的 API 參考頁面上找到。

假設你想使用預設任務存儲和預設執行器在你的應用程式中運作 BackgroundScheduler:

from apscheduler.schedulers.background import BackgroundScheduler


scheduler = BackgroundScheduler()

#在此或在計劃程式初始化之前初始化應用程式的其餘部分,因為是非阻塞的背景排程器,是以程式會繼續向下執行
           

這将為你提供一個 BackgroundScheduler,其有一個名稱為default的MemoryJobStore(記憶體任務儲存器)和一個名稱是default且預設最大線程是10的ThreadPoolExecutor(線程池執行器)。

現在,如果你希望擁有兩個使用兩個執行器的任務存儲器,并且你還希望調整任務的預設值并設定不同的時區。可參考下面的三個例子,它們是完全等價的:

一個名為“mongo”的 MongoDBJobStore

一個名為“default”的 SQLAlchemyJobStore(使用 SQLite)

一個名為“default”的 ThreadPoolExecutor,工作線程數為 20個

一個名為“processpool”的 ProcessPoolExecutor,工作程序數為 5個

UTC 作為排程器的時區

預設情況下為新任務關閉合并模式

新任務的預設最大執行個體數限制為 3個

方法一:

from pytz import utc

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor


jobstores = {
    'mongo': MongoDBJobStore(),
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
    'default': ThreadPoolExecutor(20),
    'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
    'coalesce': False,
    'max_instances': 3
}
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
           

方法二:

from apscheduler.schedulers.background import BackgroundScheduler


# The "apscheduler." prefix is hard coded
scheduler = BackgroundScheduler({
    'apscheduler.jobstores.mongo': {
         'type': 'mongodb'
    },
    'apscheduler.jobstores.default': {
        'type': 'sqlalchemy',
        'url': 'sqlite:///jobs.sqlite'
    },
    'apscheduler.executors.default': {
        'class': 'apscheduler.executors.pool:ThreadPoolExecutor',
        'max_workers': '20'
    },
    'apscheduler.executors.processpool': {
        'type': 'processpool',
        'max_workers': '5'
    },
    'apscheduler.job_defaults.coalesce': 'false',
    'apscheduler.job_defaults.max_instances': '3',
    'apscheduler.timezone': 'UTC',
})
           

方法三:

from pytz import utc

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ProcessPoolExecutor


jobstores = {
    'mongo': {'type': 'mongodb'},
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
    'default': {'type': 'threadpool', 'max_workers': 20},
    'processpool': ProcessPoolExecutor(max_workers=5)
}
job_defaults = {
    'coalesce': False,
    'max_instances': 3
}
scheduler = BackgroundScheduler()

# 在這裡做些别的事情,可能會增加任務等

scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
           

6.啟動排程器

隻需調用start()排程器即可啟動排程器。對于除BlockingScheduler之外的排程器,此調用将立即傳回,你可以繼續應用程式的初始化過程,如排程器添加任務。

對于 BlockingScheduler,你隻需要start()在完成任何初始化步驟後調用即可,因程式則會阻塞在start()位置,故要運作的代碼必須寫在start()之前。 。

注意:排程器啟動後,您将無法再更改其設定。

7.添加任務

添加任務的方法如下(兩種):

(1)通過調用add_job()

(2)通過裝飾器scheduled_job()

第一種方法是最常見的方法。第二種方法主要是為了友善聲明在程式運作時不會更改的任務。

第一種方法add_job()方法會傳回一個 apscheduler.job.Job執行個體,你可以稍後使用它來修改或删除任務。

在任何時候你都可以在排程安排任務。但是如果添加任務時排程器尚未運作,則任務将被暫定排程,并且僅在排程器啟動時才計算其第一次運作時間。

需要注意的是,如果你使用序列化任務的執行器或任務存儲器,它将對你的任務增加一些要求:

a.目标可調用對象必須可全局通路

b.可調用對象的任何參數都必須是可序列化的

在内置任務存儲中,隻有 MemoryJobStore 不會序列化任務 。在内置執行器中,隻有 ProcessPoolExecutor 會序列化任務。

注意:

a. 如果你在程式初始化期間在持久任務存儲器中安排任務,則必須為任務定義一個具體的ID 并使用replace_existing=True ,否則每次程式重新啟動時你都會獲得任務的新副本,也就表示任務的狀态不會儲存。

b.要立即運作任務,可以在添加任務時省略trigger參數。

8.删除任務

當你從排程器中删除任務時,它會從其關聯的任務存儲器中删除,并且不會再被執行。有兩種方法可以實作這一點:

(1)通過remove_job()使用任務的 ID 和任務存儲器别名調用

(2)在通過add_job()建立的任務執行個體上調用remove()方法

第二種方式更友善,但前提必須在建立任務執行個體時,執行個體被儲存在變量中。對于通過scheduled_job()建立的任務,隻能選擇第一種方式。

如果任務的排程結束(即它的觸發器不會産生任何進一步的運作時間),它會被自動删除。

from apscheduler.schedulers.blocking import BlockingScheduler


sched = BlockingScheduler()


def job_func1():
    print("Hello")


def job_func2():
    print("World")


job = sched.add_job(job_func1, 'cron', hour=11, minute=59)
job.remove()
sched.add_job(job_func2, 'cron', hour=11, minute=59)

sched.start()
           

運作結果:

APScheduler定時任務架構一.APScheduler簡介二.APScheduler基本使用

同樣,通過任務的具體ID:

from apscheduler.schedulers.blocking import BlockingScheduler


sched = BlockingScheduler()


def job_func1():
    print("Hello")


def job_func2():
    print("World")


sched.add_job(job_func1, 'cron', hour=11, minute=56, id='my_job_id')
sched.remove_job('my_job_id')
sched.add_job(job_func2, 'cron', hour=11, minute=56)
sched.start()
           

運作結果:

APScheduler定時任務架構一.APScheduler簡介二.APScheduler基本使用

9.暫停和恢複工作

你可以通過Job執行個體或排程器本身輕松暫停和恢複任務。當任務暫停時,它的下一次運作時間将被清除,并且在任務恢複之前不會為其計算進一步的運作時間。

暫停任務,請使用以下任一方法:

apscheduler.job.Job.pause()

apscheduler.schedulers.base.BaseScheduler.pause_job()

恢複任務:

apscheduler.job.Job.resume()

apscheduler.schedulers.base.BaseScheduler.resume_job()

10.擷取預定任務清單

通過get_jobs()就可以獲得一個可修改的任務清單。get_jobs()第二個參數可以指定任務儲存器名稱,那麼就會獲得對應任務儲存器的任務清單。

為友善起見,print_jobs()可以快速列印格式化的任務清單,包含觸發器,下次運作時間等資訊。

from apscheduler.schedulers.blocking import BlockingScheduler


sched = BlockingScheduler()


def job_func1():
    print("Hello")


def job_func2():
    print("World")


job = sched.add_job(job_func1, 'cron', hour=14, minute=17)

sched.add_job(job_func2, 'cron', hour=14, minute=17)
sched.print_jobs()
sched.start()
           
APScheduler定時任務架構一.APScheduler簡介二.APScheduler基本使用

11.修改任務

通過apscheduler.job.Job.modify()或modify_job(),你可以修改任務當中除了id的任何屬性。

job.modify(max_instances=6, name='Alternate name')
           

如果你想重新安排任務——即更改其觸發器,你可以使用 apscheduler.job.Job.reschedule()或 reschedule_job()。這些方法為任務構造一個新的觸發器,并根據新的觸發器重新計算其下一次運作時間。

scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')
           

12.關閉排程器

要關閉排程器:

scheduler.shutdown()
           

預設情況下,排程器關閉其任務存儲器和執行器,并等待所有目前正在執行的任務完成。如果你不想等待,你可以這樣做:

scheduler.shutdown(wait=False)
           

這仍将關閉任務存儲器和執行器,但不會等待任何正在運作的任務完成。

13.暫停/恢複任務處理

可以暫停正在執行任務的處理:

scheduler.pause()
           

恢複任務:

scheduler.resume()
           

也可以在排程器啟動時,預設所有任務設為暫停狀态:

scheduler.start(paused=True)
           

13.限制同時執行的任務執行個體數

預設情況下,每個任務隻允許同時運作一個執行個體。這意味着,如果任務即将運作,但前一次運作尚未完成,則将最新運作視為丢失。為了避免這種情況的發生,通過在添加任務時使用max_instances關鍵字參數,可以設定排程器允許并發運作的特定任務的最大執行個體數。

14.丢失任務的執行與合并

有時,任務會由于一些問題沒有被執行。最常見的情況就是,在資料庫裡的任務到了該執行的時間,但排程器被關閉了,那麼這個任務就成了“啞彈任務”。錯過執行時間後,排程器才打開了。這時,排程器會檢查每個任務的misfire_grace_time參數int值,即啞彈上限,來确定是否還執行啞彈任務(這個參數可以全局設定的或者是為每個任務單獨設定)。此時,一個啞彈任務,就可能會被連續執行多次。

但這就可能導緻一個問題,有些啞彈任務實際上并不需要被執行多次。coalescing合并參數就能把一個多次的啞彈任務揉成一個一次的啞彈任務。也就是說,coalescing為True能把多個排隊執行的同一個啞彈任務,變成一個,而不會觸發啞彈事件。

注意:如果任務的執行由于池中沒有可用的線程或程序而延遲,則執行器可能會由于運作太晚(與其最初指定的運作時間相比)而跳過它。如果這可能發生在你的應用程式中,你可能需要增加執行器中的線程/程序數,或者将misfire_grace_time 設定調整為更高的值。

15.排程器事件

可以将事件偵聽器附加到排程器。排程器事件在某些情況下被觸發,并且可能在其中包含有關該特定事件詳細資訊的附加資訊,比如目前運作次數等。通過add_listener()為其提供适當的mask參數,或将不同的常量組合在一起,可以隻偵聽特定類型的事件 。回調對象會有一個參數就是觸發的事件。

有關events可用事件及其屬性的詳細資訊,請參閱子產品的文檔。

def my_listener(event):
    if event.exception:
        print('The job crashed :(')
    else:
        print('The job worked :)')
# 當任務執行完或任務出錯時,調用my_listener
scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
           

事件類型:

常量 描述 事件類
EVENT_SCHEDULER_STARTED 排程器已啟動 SchedulerEvent
EVENT_SCHEDULER_SHUTDOWN 排程器被關 SchedulerEvent
EVENT_SCHEDULER_PAUSED 排程器中的任務處理已暫停 SchedulerEvent
EVENT_SCHEDULER_RESUMED 排程器中的任務處理已恢複 SchedulerEvent
EVENT_EXECUTOR_ADDED 一個執行器被添加到排程器 SchedulerEvent
EVENT_EXECUTOR_REMOVED 一個執行器被移除到排程器中 SchedulerEvent
EVENT_JOBSTORE_ADDED 任務存儲已添加到排程器 SchedulerEvent
EVENT_JOBSTORE_REMOVED 任務存儲已從排程器中删除 SchedulerEvent
EVENT_ALL_JOBS_REMOVED 所有任務都已從所有任務存儲或一個特定任務存儲中删除 SchedulerEvent
EVENT_JOB_ADDED 任務已添加到任務存儲 JobEvent
EVENT_JOB_REMOVED 任務已從任務存儲中删除 JobEvent
EVENT_JOB_MODIFIED 從排程器外部修改了任務 JobEvent
EVENT_JOB_SUBMITTED 任務已送出給其執行器以運作 JobSubmissionEvent
EVENT_JOB_MAX_INSTANCES 送出給其執行器的任務未被執行器接受,因為該任務已達到其最大并發執行執行個體數 JobSubmissionEvent
EVENT_JOB_EXECUTED 一個任務執行成功 JobExecutionEvent
EVENT_JOB_ERROR 任務在執行期間引發異常 JobExecutionEvent
EVENT_JOB_MISSED 錯過了任務的執行 JobExecutionEvent
EVENT_ALL 包含所有事件類型的全能掩碼 N/A

16.故障排除

如果排程器沒有按預期工作,将記錄apscheduler器的日志記錄級别提高到該DEBUG級别會有所幫助 。

如果您還沒有首先啟用日志記錄,您可以這樣做:

import logging

logging.basicConfig()
logging.getLogger('apscheduler').setLevel(logging.DEBUG)
           

這應該提供許多關于排程器内部發生的事情的有用資訊。

還要確定您檢查了常見問題部分,看看您的問題是否已經有了解決方案。

報告錯誤:

Github 提供了一個錯誤跟蹤器。

文章參考連結:

  APScheduler官網-使用者指南

  黑色小米粥-APScheduler官方文檔翻譯