天天看點

python apscheduler執行_Python下定時任務架構APScheduler的使用

今天準備實作一個功能需要用到定時執行任務,是以就看到了Python的一個定時任務架構APScheduler,試了一下感覺還不錯。

1.APScheduler簡介:

APScheduler是Python的一個定時任務架構,可以很友善的滿足使用者定時執行或者周期執行任務的需求,它提供了基于日期date、固定時間間隔interval 、以及類似于Linux上的定時任務crontab類型的定時任務。并且該架構不僅可以添加、删除定時任務,還可以将任務存儲到資料庫中,實作任務的持久化,是以使用起來非常友善。

2.APScheduler安裝:

APScheduler的安裝相對來說也非常簡單,可以直接利用pip安裝,如果沒有pip可以下載下傳源碼,利用源碼安裝。

1).利用pip安裝:(推薦)# pip install apscheduler# python setup.py install

3.基本概念

APScheduler有四種元件及相關說明:

1) triggers(觸發器):觸發器包含排程邏輯,每一個作業有它自己的觸發器,用于決定接下來哪一個作業會運作,除了他們自己初始化配置外,觸發器完全是無狀态的。

2)job stores(作業存儲):用來存儲被排程的作業,預設的作業存儲器是簡單地把作業任務儲存在記憶體中,其它作業存儲器可以将任務作業儲存到各種資料庫中,支援MongoDB、Redis、SQLAlchemy存儲方式。當對作業任務進行持久化存儲的時候,作業的資料将被序列化,重新讀取作業時在反序列化。

3) executors(執行器):執行器用來執行定時任務,隻是将需要執行的任務放在新的線程或者線程池中運作。當作業任務完成時,執行器将會通知排程器。對于執行器,預設情況下選擇ThreadPoolExecutor就可以了,但是如果涉及到一下特殊任務如比較消耗CPU的任務則可以選擇ProcessPoolExecutor,當然根據根據實際需求可以同時使用兩種執行器。

4) schedulers(排程器):排程器是将其它部分聯系在一起,一般在應用程式中隻有一個排程器,應用開發者不會直接操作觸發器、任務存儲以及執行器,相反排程器提供了處理的接口。通過排程器完成任務的存儲以及執行器的配置操作,如可以添加。修改、移除任務作業。

APScheduler提供了多種排程器,可以根據具體需求來選擇合适的排程器,常用的排程器有:

BlockingScheduler:适合于隻在程序中運作單個任務的情況,通常在排程器是你唯一要運作的東西時使用。

BackgroundScheduler: 适合于要求任何在程式背景運作的情況,當希望排程器在應用背景執行時使用。

AsyncIOScheduler:适合于使用asyncio架構的情況

GeventScheduler: 适合于使用gevent架構的情況

TornadoScheduler: 适合于使用Tornado架構的應用

TwistedScheduler: 适合使用Twisted架構的應用

QtScheduler: 适合使用QT的情況

4.配置排程器

APScheduler提供了許多不同的方式來配置排程器,你可以使用一個配置字典或者作為參數關鍵字的方式傳入。你也可以先建立排程器,再配置和添加作業,這樣你可以在不同的環境中得到更大的靈活性。

1)下面一個簡單的示例:import time

from apscheduler.schedulers.blocking import BlockingScheduler

def test_job():

print time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))

scheduler = BlockingScheduler()

'''

#該示例代碼生成了一個BlockingScheduler排程器,使用了預設的預設的任務存儲MemoryJobStore,以及預設的執行器ThreadPoolExecutor,并且最大線程數為10。

'''

scheduler.add_job(test_job, 'interval', seconds=5, id='test_job')

'''

#該示例中的定時任務采用固定時間間隔(interval)的方式,每隔5秒鐘執行一次。

#并且還為該任務設定了一個任務id

'''

scheduler.start()

2)如果想執行一些複雜任務,如上邊所說的同時使用兩種執行器,或者使用多種任務存儲方式,并且需要根據具體情況對任務的一些預設參數進行調整。可以參考下面的方式。(http://apscheduler.readthedocs.io/en/latest/userguide.html)

第一種方式:from pytz import utc

from apscheduler.schedulers.background import BackgroundScheduler

from apscheduler.jobstores.mongodb import MongoDBJobStore

from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor

jobstores = {

'mongo': MongoDBJobStore(),

'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')

}

executors = {

'default': ThreadPoolExecutor(20),

'processpool': ProcessPoolExecutor(5)

}

job_defaults = {

'coalesce': False,

'max_instances': 3

}

scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)

第二種方式:from apscheduler.schedulers.background import BackgroundScheduler

# The "apscheduler." prefix is hard coded

scheduler = BackgroundScheduler({

'apscheduler.jobstores.mongo': {

'type': 'mongodb'

},

'apscheduler.jobstores.default': {

'type': 'sqlalchemy',

'url': 'sqlite:///jobs.sqlite'

},

'apscheduler.executors.default': {

'class': 'apscheduler.executors.pool:ThreadPoolExecutor',

'max_workers': '20'

},

'apscheduler.executors.processpool': {

'type': 'processpool',

'max_workers': '5'

},

'apscheduler.job_defaults.coalesce': 'false',

'apscheduler.job_defaults.max_instances': '3',

'apscheduler.timezone': 'UTC',

})

第三種方式:from pytz import utc

from apscheduler.schedulers.background import BackgroundScheduler

from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

from apscheduler.executors.pool import ProcessPoolExecutor

jobstores = {

'mongo': {'type': 'mongodb'},

'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')

}

executors = {

'default': {'type': 'threadpool', 'max_workers': 20},

'processpool': ProcessPoolExecutor(max_workers=5)

}

job_defaults = {

'coalesce': False,

'max_instances': 3

}

scheduler = BackgroundScheduler()

# .. do something else here, maybe add jobs etc.

scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)

5.對任務作業的基本操作:

1).添加作業有兩種方式:第一種可以直接調用add_job(),第二種使用scheduled_job()修飾器。

而add_job()是使用最多的,它可以傳回一個apscheduler.job.Job執行個體,因而可以對它進行修改或者删除,而使用修飾器添加的任務添加之後就不能進行修改。

例如使用add_job()添加作業:#!/usr/bin/env python

#-*- coding:UTF-8

import time

import datetime

from apscheduler.schedulers.blocking import BlockingScheduler

def job1(f):

print time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())), f

def job2(arg1, args2, f):

print f, args1, args2

def job3(**args):

print args

'''

APScheduler支援以下三種定時任務:

cron: crontab類型任務

interval: 固定時間間隔任務

date: 基于日期時間的一次性任務

'''

scheduler = BlockingScheduler()

#循環任務示例

scheduler.add_job(job1, 'interval', seconds=5, args=('循環',), id='test_job1')

#定時任務示例

scheduler.add_job(job1, 'cron', second='*/5', args=('定時',), id='test_job2')

#一次性任務示例

scheduler.add_job(job1, next_run_time=(datetime.datetime.now() + datetime.timedelta(seconds=10)), args=('一次',), id='test_job3')

'''

傳遞參數的方式有元組(tuple)、清單(list)、字典(dict)

注意:不過需要注意采用元組傳遞參數時後邊需要多加一個逗号

'''

#基于list

scheduler.add_job(job2, 'interval', seconds=5, args=['a','b','list'], id='test_job4')

#基于tuple

scheduler.add_job(job2, 'interval', seconds=5, args=('a','b','tuple',), id='test_job5')

#基于dict

scheduler.add_job(job3, 'interval', seconds=5, kwargs={'f':'dict', 'a':1,'b':2}, id='test_job7')

print scheduler.get_jobs()

scheduler.start()

#帶有參數的示例

scheduler.add_job(job2, 'interval', seconds=5, args=['a','b'], id='test_job4')

scheduler.add_job(job2, 'interval', seconds=5, args=('a','b',), id='test_job5')

scheduler.add_job(job3, 'interval', seconds=5, kwargs={'a':1,'b':2}, id='test_job6')

print scheduler.get_jobs()

scheduler.start()

或者使用scheduled_job()修飾器來添加作業:@sched.scheduled_job('cron', second='*/5' ,id='my_job_id',)

def test_task():

print("Hello world!")

2).獲得任務清單:

可以通過get_jobs方法來擷取目前的任務清單,也可以通過get_job()來根據job_id來獲得某個任務的資訊。并且apscheduler還提供了一個print_jobs()方法來列印格式化的任務清單。

例如:scheduler.add_job(my_job, 'interval', seconds=5, id='my_job_id' name='test_job')

print scheduler.get_job('my_job_id')

print scheduler.get_jobs()

3).修改任務:

修改任務任務的屬性可以使用apscheduler.job.Job.modify()或者modify_job()方法,可以修改除了id的其它任何屬性。

例如:job = scheduler.add_job(my_job, 'interval', seconds=5, id='my_job' name='test_job')

job.modify(max_instances=5, name='my_job')

4).删除任務:

删除排程器中的任務有可以用remove_job()根據job ID來删除指定任務或者使用remove(),如果使用remove()需要事先儲存在添加任務時傳回的執行個體對象,任務删除後就不會在執行。

注意:通過scheduled_job()添加的任務隻能使用remove_job()進行删除。

例如:job = scheduler.add_job(my_job, 'interval', seconds=5, id='my_job_id' name='test_job')

job.remove()

或者scheduler.add_job(my_job, 'interval', seconds=5, id='my_job_id' name='test_job')

scheduler.remove_job('my_job')

5).暫停與恢複任務:

暫停與恢複任務可以直接操作任務執行個體或者排程器來實作。當任務暫停時,它的運作時間會被重置,暫停期間不會計算時間。

暫停任務:apscheduler.job.Job.pause()

apscheduler.schedulers.base.BaseScheduler.pause_job()

恢複任務apscheduler.job.Job.resume()

apscheduler.schedulers.BaseScheduler.resume_job()

6).啟動排程器

可以使用start()方法啟動排程器,BlockingScheduler需要在初始化之後才能執行start(),對于其他的Scheduler,調用start()方法都會直接傳回,然後可以繼續執行後面的初始化操作。

例如:from apscheduler.schedulers.blocking import BlockingScheduler

def my_job():

print "Hello world!"

scheduler = BlockingScheduler()

scheduler.add_job(my_job, 'interval', seconds=5)

scheduler.start()

7).關閉排程器:

使用下邊方法關閉排程器:scheduler.shutdown()

預設情況下排程器會關閉它的任務存儲和執行器,并等待所有正在執行的任務完成,如果不想等待,可以進行如下操作:scheduler.shutdown(wait=False)

注意:

當出現No handlers could be found for logger “apscheduler.scheduler”次錯誤資訊時,說明沒有

logging子產品的logger存在,是以需要添加上,對應新增内容如下所示(僅供參考):import logging

logging.basicConfig(level=logging.DEBUG,

format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',

datafmt='%a, %d %b %Y %H:%M:%S',

filename='/var/log/aaa.txt',

filemode='a')