文章目录
- 1、创建一个简单的定时任务
-
- 代码说明:
-
- 1.1、实例化一个调度器
- 1.2、指定一个触发器(triggers)
-
- 1.2.1 使用触发器 interval 后,可跟的参数有:
- 1.2.2 使用触发器 date 后,可跟的参数有:
- 1.2.2 使用触发器 cron 后,可跟的参数有:
- 2、调度器、作业存储器 、执行器 详细说明
-
- 2.1 开始任务前,一般先对调度器进行配置。
- 2.2 作业存储器(jobstores)说明
- 2.3 执行器配置
- 2.4、启动和关闭调度器
- 3、添加工作定时任务
-
- 3.1 添加任务的方法有两种:
- 3.2 删除任务
- 4、事件监听
1、创建一个简单的定时任务
建一个每隔多少时间运行一次的任务进行说明
from apscheduler.schedulers.blocking import BlockingScheduler
from datetime import datetime
def my_clock():
print(f"调用时间: {datetime.now()}")
sched = BlockingScheduler() # 实例化一个调度器
# 每隔2秒调用一次 my_clock()函数
sched.add_job(my_clock, trigger='interval', seconds=2)
print(f'开始时间:{datetime.now()}')
sched.start()
ps:函数调用是在start()开始2秒后才开始第一次调用的。
开始时间:2021-03-23 22:48:27.174595
调用时间: 2021-03-23 22:48:29.186497 # 两秒调用一次,即2秒后才开始第一次调用函数。
调用时间: 2021-03-23 22:48:31.190019
……
代码说明:
1.1、实例化一个调度器
sched = BlockingScheduler()
BlockingScheduler() :调度器是阻塞的,即同个线程内 scheduler.start() 之后的代码都不会被运行。
还有其它6个调度器,分别为:
1、BackgroundScheduler: 会开启一个守护线程,即不阻塞主线程的运行,当主线程运行结束后scheduler也自动停止。默认最大线程数为10个。
2、AsyncIOScheduler: 在协程asyncio中应用。
3、GeventScheduler: 在gevent 框架中使用。
4、TornadoScheduler: 在Tornado 框架中使用。
5、TwistedScheduler: 在Twisted 框架的应用。
6、QtScheduler: 适合使用 QT 的情况。
注:如果用 BackgroundScheduler 调度器来执行sqlalchemy的数据库操作,可能会报线程错误。比如:
sqlalchemy.exc.ProgrammingError: (sqlite3.ProgrammingError) SQLite objects created in a thread can only be used in that same thread. ……
此时得在 create_engine创建引擎时加入参数 connect_args={‘check_same_thread’: False}
如:
engine = create_engine('sqlite+pysqlite:///..\\name.db', future=True, connect_args={'check_same_thread': False})
1.2、指定一个触发器(triggers)
sched.add_job(my_clock, trigger='interval', seconds=2)
由 trigger 参数指定触发器,来指定调用函数的方式。
可选择的有:
1、interval:以固定的时间间隔执行。
2、date:在特定的时间日期执行。
3、cron:指定在某个时间点执行或循环执行。
1.2.1 使用触发器 interval 后,可跟的参数有:
weeks (int) – 间隔的周数
days (int) – 间隔的天数
hours (int) – 间隔的小时
minutes (int) –间隔的分钟
seconds (int) – 间隔的秒
start_date (datetime|str) – 开始时间,如果 start_date 为空,则默认是 datetime.now() + interval 作为起始时间。
end_date (datetime|str) – 结束时间
timezone (datetime.tzinfo|str) – 时区
jitter (int|None) – 波动参数,给每次触发添加一个范围内随机浮动秒数。一般适用于多服务器,避免同时运行造成服务拥堵
例:
from datetime import datetime
# 每两小时执行一次
sched.add_job(job_function, 'interval', hours=2)
# 从2021年3月24日22:19:00 到 2021年12月31日23:59:59 的时间内,每两小时执行一次。
sched.add_job(job_function, 'interval', hours=2, start_date='2021-3-24 22:19:00', end_date='2021-12-31 23:59:59')
# next_run_time 设置第一次调用的时间。
# 下面代码如果不设置next_run_time,则会在start()后两小时才开始第一次调用。
# 设置 next_run_time=datetime.datetime.now() 后,则马上开始第一次调用。
sched.add_job(main, 'interval', hours=2, next_run_time=datetime.datetime.now())
1.2.2 使用触发器 date 后,可跟的参数有:
run_date (datetime| str ) – 任务开始的时间,可以是datetime类型,也可以是字符串,如果 run_date 为空,则默认取当前时间。
timezone (datetime.tzinfo| str ) – 时区。
例:
# 在2021年3月24日 22:42:30 调用函数 my_clock。
sched.add_job(my_clock, trigger='date', run_date=datetime(2021, 3, 24, 22, 42, 30))
sched.add_job(my_clock, trigger='date', run_date="2021-3-24 22:42:30")
1.2.2 使用触发器 cron 后,可跟的参数有:
year ( int | str ) – 4 digit year - (表示四位数的年份,如 2008 年)
month ( int | str ) – month ( 1 - 12 ) - (表示取值范围为 1 - 12 月)
day ( int | str ) – day of the ( 1 - 31 ) - (表示取值范围为 1 - 31 日)
week ( int | str ) – ISO week ( 1 - 53 ) - (格里历 2006 年 12 月 31 日可以写成 2006 年 - W52 - 7 (扩展形式)或 2006W527 (紧凑形式))
day_of_week ( int | str ) – number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun) - (表示一周中的第几天,既可以用0-6表示也可以用其英语缩写表示,一周的开始时间总是周一)
hour ( int | str ) – hour ( 0 - 23 ) - (表示取值范围为 0 - 23 时)
minute ( int | str ) – minute ( 0 - 59 ) - (表示取值范围为 0 - 59 分)
second ( int | str ) – second ( 0 - 59 ) - (表示取值范围为 0 - 59 秒)
start_date (datetime| str ) – earliest possible date / time to trigger on (inclusive) - (表示开始时间)
end_date (datetime| str ) – latest possible date / time to trigger on (inclusive) - (表示结束时间)
timezone (datetime.tzinfo| str ) – time zone to use for the date / time calculations (defaults to scheduler timezone) - (表示时区取值)
参数格式:
序号 | 格式 | 可用参数 | 说明 |
---|---|---|---|
1 | * | 所有 | 表示每个单位的该参数。如 hour="*" 表示每小时,second="*" 表示每秒钟。 |
2 | */a | 所有 | 表示最小单位。如 second=’*/2’ 表示最小单位为2秒,即每隔2秒。 注:second=‘2’ 表示第二秒。 |
3 | a-b | 所有 | 表示 [a , b] 范围内包含a, b。 |
4 | a-b/c | 所有 | 表示 [a , b] 范围内每间隔c 。 |
5 | xth y | day参数 | 表示 每个月的第x个星期y。如 day=‘3rd fri’ 每个月的第3个星期5。 |
6 | last x | day参数 | 表示每个月最后一个星期x, 如 day=‘last sun’ 每个月最后一个星期天。 |
7 | last | day参数 | 表示每个月的最后一天。 |
8 | x,y,z | 所有 | 触发任何匹配表达式;可以组合任意数量的上述表达式。如 month=‘6-8,11-12’ 表示 6,7,8,11,12 月 |
2、调度器、作业存储器 、执行器 详细说明
2.1 开始任务前,一般先对调度器进行配置。
配置的内容有:
1、作业存储器(jobstores):默认的作业存储只是简单地将作业存储在内存中,但也可以存储到各种数据库中。当一个 job 保存到一个持久化地作业存储中时,其数据必须要被序列化(serialized),当它们被加载回来时再执行反序列化(deserialized)。非默认的作业存储不会将作业数据保存到内存中,相反,内存会作为后端存储介质在保存、加载、更新和搜索 job 过程中的中间人。作业存储不会在调度器(scheduler)之间共享。
2、执行器(executors):通常它们都是负责将 job 中指定的可调用的部分提交到线程或进程池。当 job 完成后,执行器会通知(notifies)调度器,由调度器随后发出(emits)一个恰当的事件(event)。
3、默认添加新任务时的配置(job_defaults)。
简单例子,对 BackgroundScheduler 进行配置:
from pytz import utc
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
# 作业存储器(jobstores)
jobstores = {
'mongo': MongoDBJobStore(),
'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
# 执行器(executors)
executors = {
'default': ThreadPoolExecutor(20),
'processpool': ProcessPoolExecutor(5)
}
# 默认添加新任务时的配置(job_defaults)
job_defaults = {
'coalesce': False,
'max_instances': 3
}
# 在实例化时配置。
# 此处使用了UTC时间,记录到数据库中会与中国时间相差8个小时。
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
# 先实例化,再配置。
# scheduler = BackgroundScheduler()
# scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
2.2 作业存储器(jobstores)说明
APScheduler 提供了许多不同的方法来配置 scheduler 。你可以使用一个配置字典,或者是直接将其作为 options 的关键字参数。你也可以 先实例化 scheduler ,随后再添加 job 和配置 scheduler 。
有以下几种可以选择:
1、MemoryJobStore:默认内存存储,没有序列化,jobs就存在内存里,重启程序,之前的job就没了。
2、SQLAlchemyJobStore:所有sqlalchemy支持的数据库都可以做为backend,增删改查操作转化为对应backend的sql语句。
3、MongoDBJobStore:用mongodb作任务储存。
4、RedisJobStore: 用redis作作任务储存。
5、RethinkDBJobStore: 用rethinkdb 作任务储存。
6、ZooKeeperJobStore:用ZooKeeper做任务储存。
下面再以SQLAlchemyJobStore例子设置为存储为Mysql的:
jobstores = {
'default': SQLAlchemyJobStore(
url="mysql+pymysql://user:[email protected]/dbname?charset=utf8",
table='table_name')
}
注:应该就是SQLAlchemy连接数据库的方式。
user:用户名。
passwd:密码。
host:地址。
dbname:数据库名。
charset:编码方式。
当以数据库来存储job时,如果程序终止重新启动后,默认会执行之前未执行的任务,这样会导致任务多次执行。当要避免任务重复执行,需在add_job时加入这两个参数
id=‘id_name’, replace_existing=True
例:
scheduler.add_job(my_clock, trigger='interval', seconds=3,
id='id_name', replace_existing=True)
2.3 执行器配置
Executor在scheduler中初始化,另外也可通过scheduler的add_executor动态添加Executor。
每个executor都会绑定一个alias,这个作为唯一标识绑定到Job,在实际执行时会根据Job绑定的executor找到实际的执行器对象,然后根据执行器对象执行Job
Executor的种类会根据不同的调度来选择,如果选择AsyncIO作为调度的库,那么选择AsyncIOExecutor。如果选择tornado作为调度的库,选择TornadoExecutor。如果选择启动进程作为调度,选择ThreadPoolExecutor或者ProcessPoolExecutor都可以。
Executor的选择需要根据实际的scheduler来选择不同的执行器
目前APScheduler支持的Executor:
1、AsyncIOExecutor
2、GeventExecutor
3、ThreadPoolExecutor
4、ProcessPoolExecutor
5、TornadoExecutor
6、TwistedExecutor
2.4、启动和关闭调度器
1、启动调度器
启动调度器只需调用调度器上的start()方法。
scheduler = BackgroundScheduler()
…………
scheduler.start()
除了BlockingScheduler,非阻塞调度器都会立即返回,可以继续运行之后的代码,比如添加任务等。
对于BlockingScheduler,程序则会阻塞在start()位置,所以,要运行的代码必须写在start()之前。
2、关闭调度器
scheduler.shutdown()
默认情况下,调度器会先把正在执行的任务处理完,再关闭任务储存器和执行器。但是,如果想直接关闭,可以添加参数:
scheduler.shutdown(wait=False)
上述方法不管有没有任务在执行,会强制关闭调度器。
3、添加工作定时任务
3.1 添加任务的方法有两种:
1、通过调用add_job()
2、通过装饰器scheduled_job()
import time
from apscheduler.schedulers.blocking import BlockingScheduler
sched = BlockingScheduler()
# 通过add_job()添加
sched.add_job(my_job, 'interval' , seconds = 5 )
# 通过 scheduled_job 装饰器添加
@sched.scheduled_job( 'interval' , seconds = 5 )
def my_job():
print time.strftime( '%Y-%m-%d %H:%M:%S' , time.localtime(time.time()))
sched.start()
第一种方法是常用的;第二种方法方便,但缺点就是运行时,不能修改任务。第一种add_job()方法会返回一个apscheduler.job.Job实例,这样就可以在运行时,修改或删除任务。
如果调度器还没有启动,此时添加任务,那么任务就处于一个暂存的状态。只有当调度器启动时,才会开始计算下次运行时间。
还有一点要注意,如果你的执行器或任务储存器是会序列化任务的,那么这些任务就必须符合:
1、回调函数必须全局可用
2、回调函数参数必须也是可以被序列化的
ps:内置任务储存器中,只有MemoryJobStore不会序列化任务;内置执行器中,只有ProcessPoolExecutor会序列化任务。
add_job() 参数说明:
func:Job执行的函数
trigger:apscheduler定义的触发器,用于确定Job的执行时间,根据设置的trigger规则,计算得到下次执行此job的时间, 满足时将会执行。
args:Job执行函数需要的位置参数
kwargs:Job执行函数需要的关键字参数
id:指定作业的唯一ID
name:指定作业的名字
misfire_grace_time:Job的延迟执行时间,例如Job的计划执行时间是21:00:00,但因服务重启或其他原因导致21:00:31才执行,如果设置此key为40, 则该job会继续执行,否则将会丢弃此job。
coalesce:Job是否合并执行,是一个bool值。例如scheduler停止20s后重启启动,而job的触发器设置为5s执行一次,因此此job错过了4个执行时间,如果设置为是,则会合并到一次执行,否则会逐个执行。
max_instances:执行此job的最大实例数,executor执行job时,根据job的id来计算执行次数,根据设置的最大实例数来确定是否可执行。
next_run_time:Job下次的执行时间,创建Job时可以指定一个时间[datetime],不指定的话则默认根据trigger获取触发时间。
executor:apscheduler定义的执行器,job创建时设置执行器的名字,根据字符串你名字到scheduler获取到执行此job的 执行器,执行job指定的函数。
replace_existing: bool值,避免出现重复多个任务。
ps:
1、默认情况下,每个 job 同时只会有一个实例在运行。这意味着如果一个 job 到达计划运行时间点时,前一个 job 尚未完成,那么这个 job 最近的一次运行计划将会 misfire(错过)。可以通过在添加 job 时指定 max_instances 关键字参数来设置具体 job 的最大实例数目,以便 scheduler 随后可以并发地执行它。
2、有时候 scheduler 无法在被调度的 job 的计划运行时间点去执行这个 job 。常见的原因是这个 job 是在持久化的 job store 中,恰好在其打算运行的时刻 scheduler 被关闭或重启了。这样,这个 job 就被定义为 misfired (错过)。scheduler 稍后会检查 job 每个被错过的执行时间的 misfire_grace_time 选项(可以单独给每个 job 设置或者给 scheduler 做全局设置),以此来确定这个执行操作是否要继续被触发。这可能到导致连续多次执行。
如果这个行为不符合你的实际需要,可以使用 coalescing 来回滚所有的被错过的执行操作为唯一的一个操作。如果对 job 启用了 coalescing ,那么即便 scheduler 在队列中看到这个 job 一个或多个执行计划,scheduler 都只会触发一次。
3.2 删除任务
如果想从调度器移除一个任务,就要从相应的任务储存器中移除它,有两种方式:
1、在通过add_job() 创建的任务实例上调用remove()方法。
2、调用remove_job(),参数为:任务ID,任务名称。
job = scheduler.add_job(myfunc, 'interval' , minutes = 2 )
job.remove()
#如果有多个任务序列的话可以给每个任务设置ID号,可以根据ID号选择清除对象,且remove放到start前才有效
sched.add_job(myfunc, 'interval' , minutes = 2 , id = 'my_job_id' )
sched.remove_job( 'my_job_id' )
其它的一些任务管理方法:
1、通过job实例管理。
job = scheduler.add_job(myfunc, 'interval', minutes=2) # 添加任务
job.remove() # 删除任务
job.pause() # 暂定任务
job.resume() # 恢复任务
job.shutdown() # 关闭调度
job.shutdown(wait=False) # 不等待正在运行的任务
2、通过调度器管理。
scheduler.print_jobs() # 可以快速打印格式化的任务列表,包含触发器,下次运行时间等信息。
scheduler.get_jobs() # 获取任务列表。
scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id') # 添加任务
scheduler.remove_job('my_job_id') # 删除任务
scheduler.pause_job('my_job_id') # 暂定任务
scheduler.resume_job('my_job_id') # 恢复任务
3、修改任务
通过 apscheduler.job.Job.modify() 或 modify_job(),你可以修改任务当中除了id的任何属性。
比如:
job.modify(max_instances=6, name='Alternate name')
如果想要重新调度任务(就是改变触发器),你能通过apscheduler.job.Job.reschedule()或reschedule_job()来实现。这些方法会重新创建触发器,并重新计算下次运行时间。
比如:
scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')
4、事件监听
可以为 scheduler 绑定事件监听器(event listen)。Scheduler 事件在某些情况下会被触发,而且它可能携带有关特定事件的细节信息。为 add_listener() 函数提供适当的掩码参数(mask argument)或者是将不同的常数组合到一起,可以监听特定类型的事件。可调用的 listener 可以通过 event object 作为参数而被调用。
用 add_listener(callback,mask)绑定监听事件,第一个参数是回调函数,mask是指定侦听事件类型,mask参数也可以是逻辑组合。回调函数是一个可以接收一个参数的函数。
def my_listener(event):
if event.exception:
print('The job crashed :(')
else:
print('The job worked :)')
print(
"job执行job:\ncode => {}\njob.id => {}\njobstore=>{}".format(
event.code,
event.job_id,
event.jobstore
))
scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
目前APScheduler定义的Event
Constant(事件类型) Description(描述) Event class(事件类)
EVENT_SCHEDULER_STARTED The scheduler was started SchedulerEvent
EVENT_SCHEDULER_SHUTDOWN The scheduler was shut down SchedulerEvent
EVENT_SCHEDULER_PAUSED Job processing in the scheduler was paused SchedulerEvent
EVENT_SCHEDULER_RESUMED Job processing in the scheduler was resumed SchedulerEvent
EVENT_EXECUTOR_ADDED An executor was added to the scheduler SchedulerEvent
EVENT_EXECUTOR_REMOVED An executor was removed to the scheduler SchedulerEvent
EVENT_JOBSTORE_ADDED A job store was added to the scheduler SchedulerEvent
EVENT_JOBSTORE_REMOVED A job store was removed from the scheduler SchedulerEvent
EVENT_ALL_JOBS_REMOVED All jobs were removed from either all job
stores or one particular job store SchedulerEvent
EVENT_JOB_ADDED A job was added to a job store JobEvent
EVENT_JOB_REMOVED A job was removed from a job store JobEvent
EVENT_JOB_MODIFIED A job was modified from outside the scheduler JobEvent
EVENT_JOB_SUBMITTED A job was submitted to its executor to be run JobSubmissionEvent
EVENT_JOB_MAX_INSTANCES A job being submitted to its executor was not
accepted by the executor because the job has
already reached its maximum concurrently
executing instances JobSubmissionEvent
EVENT_JOB_EXECUTED A job was executed successfully JobExecutionEvent
EVENT_JOB_ERROR A job raised an exception during execution JobExecutionEvent
EVENT_JOB_MISSED A job’s execution was missed JobExecutionEvent
EVENT_ALL A catch-all mask that includes every event type N/A