最近用到了python的定时框架,简单记录一下
安装apscheduler
pip install apscheduler
1.定时器的分类
apscheduler的定时调度器分很多种,视情况而使用,说一说我用过的
调度器 | 特点 |
---|---|
BackgroundScheduler | 不会阻塞主线程,在后台运行,但是主程序必须一直运行才会起作用,也就是说要在while中执行 |
BlockingScheduler特点 | 会阻塞主线程,用于单一程序调度,主程序不用一直运行,不用在while中执行 |
2.任务执行方式分类
执行任务的方式有两种
- 间隔性执行任务
- 定时执行任务也就是cron任务
任务执行方式 | 特点 |
---|---|
间隔性执行 | 间隔一段时间后会执行一次,不需要公式 |
cron | 在指定的时间执行,需要学习公式 |
3.实例
- 间隔性执行任务,调度器为BackgroundScheduler ,以下示例可以传参数
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2019/10/30 15:52
# @Author : Aries
# @File : schedulerDemo.py
# @Software: PyCharm
from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime
import time
def SpacedTask(arg):
print("The scheduler is start {} args is {}".format(datetime.now(), arg))
if __name__ == '__main__':
scheduler = BackgroundScheduler()
scheduler.add_job(func=SpacedTask, trigger='interval', seconds=2, args=(1,))
scheduler.start()
while True:
time.sleep(0.5)
- 由此可见,这种调度方式主程序必须一直运行才可以生效,否则无效
- func:函数名字(注意不要加括号)
- trigger:任务执行方式 "interval"间隔调度
- seconds:时间单位,也可以换成小时,分钟等
- args:参数,注意是元祖,最后必须加个逗号表示结束
程序运行结果如下
- cron调度,调度器为BlockingScheduler ,以下示例可以传参数
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2019/10/30 15:52
# @Author : Aries
# @File : schedulerDemo.py
# @Software: PyCharm
from apscheduler.schedulers.background import BlockingScheduler
from datetime import datetime
def SpacedTask(arg):
print("The scheduler is start {} args is {}".format(datetime.now(), arg))
if __name__ == '__main__':
scheduler = BlockingScheduler()
scheduler.add_job(func=SpacedTask, trigger='cron', hour=16, minute=26, args=(1,))
scheduler.start()
- 由此可见,这种调度方式主程序不需要一直运行,主程序会阻塞
- func:函数名字(注意不要加括号)
- trigger:任务执行方式 "cron"定时调度
- hour=16,minute=26:在每天的16.26执行,更多表达式自行查询
- args:参数,注意是元祖,最后必须加个逗号表示结束
4.调度器的配置
配置属性 | 特点 |
---|---|
jobstore | 配置作业存储方式 |
executors | 配置最大线程进程数量 |
job_defaults | 最大实例数和coalesce默认情况 |
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2019/10/30 15:52
# @Author : Aries
# @File : schedulerDemo.py
# @Software: PyCharm
from concurrent.futures import ThreadPoolExecutor
from apscheduler.executors.pool import ProcessPoolExecutor
from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime
import time
# 配置执行器,并设置线程数,进程数
executors = {
'default': ThreadPoolExecutor(20),
'processpool': ProcessPoolExecutor(5)
}
# 默认关闭 最大实例 3
job_defaults = {
'coalesce': False,
'max_instances': 2
}
def SpacedTask(arg):
print("The scheduler is start {} args is {}".format(datetime.now(), arg))
if __name__ == '__main__':
scheduler = BackgroundScheduler(executors=executors, job_defaults=job_defaults)
scheduler.add_job(func=SpacedTask, trigger='interval', seconds=1, args=(1,))
scheduler.start()
while True:
time.sleep(0.5)
- 由于这里是测试案例,就不配置存储方式了
- executors :default 最大线程数为20,processpool:最大进程为5
- job_defaults :coalesce:默认关闭,max_instances:最大实例数2(这里指的实例为多线程实例最大数量为2)
5.函数说明
函数 | 说明 |
---|---|
add_job | 添加定时任务 |
start | 调度器开始执行 |
shutdown | 结束定时任务 |
pause | 暂停任务 |
remove_job | 按照指定ID删除任务,需要在创建任务的时候指定ID |
get_jobs | 获取所有的任务列表 |
modify_job | 修改任务,通过ID修改任务内容(ID无法被修改) |
记录完结,未经允许侵权必究