一. 線程定時器Timer原理
Timer最基本的了解是定時器,可以啟動多個定時任務,這些定時器任務是異步執行,是以不存在等待順序執行順序。
定時器隻能執行一次,如果需要重複執行,需要重新添加任務。
導入子產品
from threading import Timer
timer = threading.Timer(interval, function, args=None, kwargs=None)
參數介紹
interval — 定時器間隔,間隔多少秒之後啟動定時器任務(機關:秒);
function — 線程函數;
args — 線程參數,可以傳遞元組類型資料,預設為空(預設參數);
kwargs — 線程參數,可以傳遞字典類型資料,預設為空(預設參數);
fromthreading import Timer
import time
def test1(name):
print('test1 {}'.format(name))
#每隔一秒會執行一次
timer2= Timer(1, test1, ('bob', ))
timer2.start()
def test2():
print('test2')
#前面的5是等待的時間,第三個參數是傳入test1的參數,數組形式
timer= Timer(5, test1, ('alex', ))
timer.start()
# timer.cancel()#取消執行
test2()
執行結果:
二. Schedule(排程器)
Schedule是一個第三方輕量級的任務排程子產品,可以按照秒,分,小時,日期或者自定義事件執行時間
方法格式功能
scheduler.enter(delay, priority, action, argument=(), kwargs={})
在 time 規定的時間後,執行 action 參數指定的函數,其中 argument 和 kwargs 負責為 action 指定的函數傳參,priority 參數執行要執行任務的等級,當同一時間點有多個任務需要執行時,等級越高( priority 值越小)的任務會優先執行。該函數會傳回一個 event,可用來取消該任務。
scheduler.cancel(event)
取消 event 任務。注意,如果 event 參數執行的任務不存在,則會引發 ValueError 錯誤。
scheduler.run(blocking=True)
運作所有需要排程的任務。如果調用該方法的 blocking 參數為 True,該方法将會阻塞線程,直到所有被排程的任務都執行完成。
示例如下:
import threading
import timefromsched import scheduler
def job1(job):
print('---------------執行任務:{}------------'.format(job))
def job2(job):
print('---------------執行任務:{}------------'.format(job))
def job3(job):
print('---------------執行任務:{}------------'.format(job))
def start_job(*jobs):
#建立任務排程對象
sche=scheduler()# 指定時間後執行job函數whileTrue:for job injobs:
sche.enter(job['interval_time'], i, job['function'], argument=(job['job'],))
#執行所有排程的任務
sche.run()
def main():whileTrue:
print('------------------main-------------------------')
time.sleep(2)
#定義為線程方法傳入的參數
my_list= [{'job': 'python', 'interval_time': 1, 'function': job1},{'job': 'C++', 'interval_time': 5, 'function': job2},{'job': 'Java', 'interval_time': 2, 'function': job3}]
#建立線程
thread= threading.Thread(target = start_job,args =my_list)
#啟動線程
thread.start()
main()
執行結果:
三. APScheduler(任務架構)
APScheduler是Python的一個定時任務架構,用于執行周琦或者定時任務
可以基于日期,時間間隔,及類似Linux上的定時任務crontab類型的定時任務
不僅可以添加,删除定時任務,還可以将任務存儲到資料庫中,實作任務的持久化,使用起來非常友善
1. APScheduler元件
(1). Job作業
Job作為APScheduler最小執行機關
建立Job時指定執行的函數,函數中所需參數,Job執行時的一些設定資訊
建構說明
id:指定作業的唯一ID
name:指定作業的名字
trigger:apscheduler定義的觸發器,用于确定Job的執行時間,根據設定的trigger規則,計算得到下次執行此job的
時間, 滿足時将會執行
executor:apscheduler定義的執行器,job建立時設定執行器的名字,根據字元串你名字到scheduler擷取到執行此
job的 執行器,執行job指定的函數
max_instances:執行此job的最大執行個體數,executor執行job時,根據job的id來計算執行次數,根據設定的最大執行個體數
來确定是否可執行
next_run_time:Job下次的執行時間,建立Job時可以指定一個時間[datetime],不指定的話則預設根據trigger擷取觸
發時間
misfire_grace_time:Job的延遲執行時間,例如Job的計劃執行時間是21:00:00,但因服務重新開機或其他原因導緻21:00:31才執行,如果設定此key為40,則該job會繼續執行,否則将會丢棄此job
coalesce:Job是否合并執行,是一個bool值。例如scheduler停止20s後重新開機啟動,而job的觸發器設定為5s執行
一次,是以此job錯過了4個執行時間,如果設定為是,則會合并到一次執行,否則會逐個執行
func:Job執行的函數
args:Job執行函數需要的位置參數
kwargs:Job執行函數需要的關鍵字參數
2. Scheduler工作流程圖
(1). Scheduler添加job流程
(2). Scheduler排程流程
執行個體代碼:
import time
import tracebackfromapscheduler.schedulers.background import BackgroundScheduler
def python(job):
print('job {}'.format(job))
def Java(job):
print('job {}'.format(job))
def C(job):
print('job {}'.format(job))classSchedule():
def __init__(self):
self.scheduler_jobs= [{'job_id': 'python', 'job_function': python,'job_type': 'interval', 'job_interval_time': 1},
{'job_id': 'Java', 'job_function': Java,'job_type': 'interval', 'job_interval_time': 10},
{'job_id': 'C', 'job_function': C,'job_type': 'interval', 'job_interval_time': 3}
]
self.scheduler=BackgroundScheduler()
def start_schedule(self):try:for scheduler_job inself.scheduler_jobs:if not self.is_exist_scheduler_job(scheduler_job['job_id']):
# 執行函數,執行類型,執行id,執行間隔時間,傳入參數
print('---------------開始啟動定時任務{}--------------'.format(scheduler_job['job_id']))
self.scheduler.add_job(scheduler_job['job_function'], scheduler_job['job_type'],
id=scheduler_job['job_id'], max_instances=1,
seconds=scheduler_job['job_interval_time'], args=(scheduler_job['job_id'], ))
self.scheduler.start()
except Exceptionase:
msg=traceback.format_exc()
print(msg)
def is_exist_scheduler_job(self, scheduler_job_id):'''是否存在定時任務'''
ifself.scheduler.get_job(scheduler_job_id):returnTruereturnFalse
def main(self):whileTrue:
print('-----------main------------------')
time.sleep(2)if __name__ == '__main__':
obj=Schedule()
obj.start_schedule()
obj.main()
執行結果: