天天看點

Python任務排程子產品APScheduler

介紹

官網文檔:http://apscheduler.readthedoc...

API:http://apscheduler.readthedoc...

APScheduler

是一個python的第三方庫,用來提供python的背景程式。包含四個元件,分别是:

  • triggers: 任務觸發器元件,提供任務觸發方式
  • job stores: 任務商店元件,提供任務儲存方式
  • executors: 任務排程元件,提供任務排程方式
  • schedulers: 任務排程元件,提供任務工作方式

安裝

pip 安裝

$ pip install apscheduler
           

源碼安裝

$ python setup.py install
           

簡單的執行個體

from apscheduler.schedulers.blocking import BlockingScheduler
import time

# 執行個體化一個排程器
scheduler = BlockingScheduler()
 
def job1():
    print "%s: 執行任務"  % time.asctime()

# 添加任務并設定觸發方式為3s一次
scheduler.add_job(job1, 'interval', seconds=3)

# 開始運作排程器
scheduler.start()
           

輸出:

$ python first.py
Fri Sep  8 20:41:55 2017: 執行任務
Fri Sep  8 20:41:58 2017: 執行任務
...
           

各元件功能

trigger元件

trigger

提供任務的觸發方式,共三種方式:

  • date:隻在某個時間點執行一次

    run_date(datetime|str)

scheduler.add_job(my_job, 'date', run_date=date(2017, 9, 8), args=[])
scheduler.add_job(my_job, 'date', run_date=datetime(2017, 9, 8, 21, 30, 5), args=[])
scheduler.add_job(my_job, 'date', run_date='2017-9-08 21:30:05', args=[])
# The 'date' trigger and datetime.now() as run_date are implicit
sched.add_job(my_job, args=[[])
           
  • interval: 每隔一段時間執行一次

    weeks=0 | days=0 | hours=0 | minutes=0 | seconds=0, start_date=None, end_date=None, timezone=None

scheduler.add_job(my_job, 'interval', hours=2)
scheduler.add_job(my_job, 'interval', hours=2, start_date='2017-9-8 21:30:00', end_date='2018-06-15 21:30:00)

@scheduler.scheduled_job('interval', id='my_job_id', hours=2)
def my_job():
    print("Hello World")
           
  • cron: 使用同linux下crontab的方式

    (year=None, month=None, day=None, week=None, day_of_week=None, hour=None, minute=None, second=None, start_date=None, end_date=None, timezone=None)

sched.add_job(my_job, 'cron', hour=3, minute=30)
sched.add_job(my_job, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2017-10-30')

@sched.scheduled_job('cron', id='my_job_id', day='last sun')
def some_decorated_task():
    print("I am printed at 00:00:00 on the last Sunday of every month!")
           

scheduler元件

scheduler

元件提供執行的方式,在不同的運用環境中選擇合适的方式

  • BlockingScheduler: 程序中隻運作排程器時的方式
from apscheduler.schedulers.blocking import BlockingScheduler
import time

scheduler = BlockingScheduler()
 
def job1():
    print "%s: 執行任務"  % time.asctime()

scheduler.add_job(job1, 'interval', seconds=3)
scheduler.start()
           
  • BackgroundScheduler: 不想使用任何架構時的方式
from apscheduler.schedulers.background import BackgroundScheduler
import time

scheduler = BackgroundScheduler()
 
def job1():
    print "%s: 執行任務"  % time.asctime()

scheduler.add_job(job1, 'interval', seconds=3)
scheduler.start()

while True:
    pass
           
  • AsyncIOScheduler: asyncio module的方式(Python3)
from apscheduler.schedulers.asyncio import AsyncIOScheduler
try:
    import asyncio
except ImportError:
    import trollius as asyncio
...
...
# while True:pass 
try:
    asyncio.get_event_loop().run_forever()
except (KeyboardInterrupt, SystemExit):
    pass
           
  • GeventScheduler: gevent方式
from apscheduler.schedulers.gevent import GeventScheduler

...
...

g = scheduler.start()
# while True:pass
try:
    g.join()
except (KeyboardInterrupt, SystemExit):
    pass
           
  • TornadoScheduler: Tornado方式
from tornado.ioloop import IOLoop
from apscheduler.schedulers.tornado import TornadoScheduler

...
...

# while True:pass
try:
    IOLoop.instance().start()
except (KeyboardInterrupt, SystemExit):
    pass
           
  • TwistedScheduler: Twisted方式
from twisted.internet import reactor
from apscheduler.schedulers.twisted import TwistedScheduler

...
...

# while True:pass
try:
    reactor.run()
except (KeyboardInterrupt, SystemExit):
    pass
           
  • QtScheduler: Qt方式

executors元件

executors

元件提供任務的排程方式

  • base
  • debug
  • gevent
  • pool

    (max_workers=10)

  • twisted

jobstore元件

jobstore

提供任務的各種持久化方式

  • base
  • memory
  • mongodb

    scheduler.add_jobstore('mongodb', collection='example_jobs')

  • redis

    scheduler.add_jobstore('redis', jobs_key='example.jobs', run_times_key='example.run_times')

  • rethinkdb

    scheduler.add_jobstore('rethinkdb', database='apscheduler_example')

  • sqlalchemy

    scheduler.add_jobstore('sqlalchemy', url=url)

  • zookeeper

    scheduler.add_jobstore('zookeeper', path='/example_jobs')

任務操作

添加任務

add_job

(如上)

如果使用了任務的存儲,開啟時最好添加

replace_existing=True

,否則每次開啟都會建立任務的副本

開啟後任務不會馬上啟動,可修改trigger參數

删除任務

remove_job

# 根據任務執行個體删除
job = scheduler.add_job(myfunc, 'interval', minutes=2)
job.remove()

# 根據任務id删除
scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
scheduler.remove_job('my_job_id')
           

任務的暫停

pause_job

和繼續

resume_job

job = scheduler.add_job(myfunc, 'interval', minutes=2)
# 根據任務執行個體
job.pause()
job.resume()

# 根據任務id暫停
scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
scheduler.pause_job('my_job_id')
scheduler.resume_job('my_job_id')
           

任務的修飾

modify

和重設

reschedule_job

修飾:

job.modify(max_instances=6, name='Alternate name')

重設:

scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')

排程器操作

  • 開啟

    scheduler.start()

  • 關閉

    scheduler.shotdown(wait=True | False)

  • 暫停

    scheduler.pause()

  • 繼續

    scheduler.resume()

  • 監聽 http://apscheduler.readthedoc...
def my_listener(event):
    if event.exception:
        print('The job crashed :(')
    else:
        print('The job worked :)')

scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
           

官方執行個體

from pytz import utc

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor


jobstores = {
    'mongo': MongoDBJobStore(),
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
    'default': ThreadPoolExecutor(20),
    'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
    'coalesce': False,
    'max_instances': 3
}
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)