python 多種定時任務實作方法和舉例,包括sleep、Timer、schedule、APScheduler等,python有很多定時任務架構,包括調用同步方法和異步方法,主要整理一下,除此之外,還有很多其他的實作,例如 celery 等等。
while True: + sleep()
threading.Timer定時器
排程子產品schedule
任務架構APScheduler
1、while循環中使用sleep
缺點:不容易控制,而且是個阻塞函數
def timer(n):
'''''
每n秒執行一次
'''
while True:
print(time.strftime('%Y-%m-%d %X',time.localtime()))
yourTask() # 此處為要執行的任務
time.sleep(n)
2、schedule子產品
優點:可以管理和排程多個任務,可以進行控制
缺點:阻塞式函數
import schedule
import time
import datetime
def job1():
print('Job1:每隔10秒執行一次的任務,每次執行2秒')
print('Job1-startTime:%s' %(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
time.sleep(2)
print('Job1-endTime:%s' % (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
print('------------------------------------------------------------------------')
def job2():
print('Job2:每隔30秒執行一次,每次執行5秒')
print('Job2-startTime:%s' % (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
time.sleep(5)
print('Job2-endTime:%s' % (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
print('------------------------------------------------------------------------')
def job3():
print('Job3:每隔1分鐘執行一次,每次執行10秒')
print('Job3-startTime:%s' % (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
time.sleep(10)
print('Job3-endTime:%s' % (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
print('------------------------------------------------------------------------')
def job4():
print('Job4:每天下午17:49執行一次,每次執行20秒')
print('Job4-startTime:%s' % (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
time.sleep(20)
print('Job4-endTime:%s' % (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
print('------------------------------------------------------------------------')
def job5():
print('Job5:每隔5秒到10秒執行一次,每次執行3秒')
print('Job5-startTime:%s' % (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
time.sleep(3)
print('Job5-endTime:%s' % (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
print('------------------------------------------------------------------------')
if __name__ == '__main__':
schedule.every(10).seconds.do(job1)
schedule.every(30).seconds.do(job2)
schedule.every(1).minutes.do(job3)
schedule.every().day.at('17:49').do(job4)
schedule.every(5).to(10).seconds.do(job5)
while True:
schedule.run_pending()
3、Threading子產品中的Timer
優點:非阻塞
缺點:不易管理多個任務
from threading import Timer
import datetime
# 每隔兩秒執行一次任務
def printHello():
print('TimeNow:%s' % (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
t = Timer(2, printHello)
t.start()
if __name__ == "__main__":
printHello()
4、sched子產品
sched子產品實作了一個時間排程程式,該程式可以通過單線程執行來處理按照時間尺度進行排程的時間。
通過調用scheduler.enter(delay,priority,func,args)函數,可以将一個任務添加到任務隊列裡面,當指定的時間到了,就會執行任務(func函數)。
delay:任務的間隔時間。
priority:如果幾個任務被排程到相同的時間執行,将按照priority的增序執行這幾個任務。
func:要執行的任務函數
args:func的參數
import time, sched
import datetime
s = sched.scheduler(time.time, time.sleep)
def print_time(a='default'):
print('Now Time:',datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),a)
def print_some_times():
print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
s.enter(10,1,print_time)
s.enter(5,2,print_time,argument=('positional',))
s.run()
print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
print_some_times()
執行結果為:
2018-09-20 16:25:03
Now Time: 2018-09-20 16:25:08 positional
Now Time: 2018-09-20 16:25:13 default
2018-09-20 16:25:13
Process finished with exit code 0
按順序執行任務:
import time, sched
import datetime
s = sched.scheduler(time.time, time.sleep)
def event_fun1():
print("func1 Time:", datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
def perform1(inc):
s.enter(inc, 0, perform1, (inc,))
event_fun1()
def event_fun2():
print("func2 Time:", datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
def perform2(inc):
s.enter(inc, 0, perform2, (inc,))
event_fun2()
def mymain(func, inc=2):
if func == "1":
s.enter(0, 0, perform1, (10,))# 每隔10秒執行一次perform1
if func == "2":
s.enter(0, 0, perform2, (20,))# 每隔20秒執行一次perform2
if __name__ == '__main__':
mymain('1')
mymain('2')
s.run()
執行結果為:
func1 Time: 2018-09-20 16:30:28
func2 Time: 2018-09-20 16:30:28
func1 Time: 2018-09-20 16:30:38
func2 Time: 2018-09-20 16:30:48
func1 Time: 2018-09-20 16:30:48
func1 Time: 2018-09-20 16:30:58
func2 Time: 2018-09-20 16:31:08
func1 Time: 2018-09-20 16:31:08
func1 Time: 2018-09-20 16:31:18
func2 Time: 2018-09-20 16:31:28
func1 Time: 2018-09-20 16:31:28
func1 Time: 2018-09-20 16:31:38
s.run()會阻塞目前線程的執行
可以用
t=threading.Thread(target=s.run)
t.start()
也可以用s.cancal(action)來取消sched中的某個action
5、定時架構APScheduler
APSScheduler是python的一個定時任務架構,它提供了基于日期date、固定時間間隔interval、以及linux上的crontab類型的定時任務。該礦機不僅可以添加、删除定時任務,還可以将任務存儲到資料庫中、實作任務的持久化。
APScheduler是一個 Python 定時任務架構,使用起來十分友善。提供了基于日期、固定時間間隔以及 crontab 類型的任務,并且可以持久化任務、并以 daemon 方式運作應用。
使用 APScheduler 需要安裝
pip install apscheduler
APScheduler有四種元件:
triggers(觸發器):觸發器包含排程邏輯,每一個作業有它自己的觸發器,用于決定接下來哪一個作業會運作,除了他們自己初始化配置外,觸發器完全是無狀态的。
job stores(作業存儲):用來存儲被排程的作業,預設的作業存儲器是簡單地把作業任務儲存在記憶體中,其它作業存儲器可以将任務作業儲存到各種資料庫中,支援MongoDB、Redis、SQLAlchemy存儲方式。當對作業任務進行持久化存儲的時候,作業的資料将被序列化,重新讀取作業時在反序列化。
executors(執行器):執行器用來執行定時任務,隻是将需要執行的任務放在新的線程或者線程池中運作。當作業任務完成時,執行器将會通知排程器。對于執行器,預設情況下選擇ThreadPoolExecutor就可以了,但是如果涉及到一下特殊任務如比較消耗CPU的任務則可以選擇ProcessPoolExecutor,當然根據根據實際需求可以同時使用兩種執行器。
schedulers(排程器):排程器是将其它部分聯系在一起,一般在應用程式中隻有一個排程器,應用開發者不會直接操作觸發器、任務存儲以及執行器,相反排程器提供了處理的接口。通過排程器完成任務的存儲以及執行器的配置操作,如可以添加。修改、移除任務作業。
APScheduler提供了七種排程器:
BlockingScheduler:适合于隻在程序中運作單個任務的情況,通常在排程器是你唯一要運作的東西時使用。
BackgroundScheduler: 适合于要求任何在程式背景運作的情況,當希望排程器在應用背景執行時使用。
AsyncIOScheduler:适合于使用asyncio異步架構的情況
GeventScheduler: 适合于使用gevent架構的情況
TornadoScheduler: 适合于使用Tornado架構的應用
TwistedScheduler: 适合使用Twisted架構的應用
QtScheduler: 适合使用QT的情況
APScheduler提供了四種存儲方式:
MemoryJobStore
sqlalchemy
mongodb
redis
APScheduler提供了三種任務觸發器:
data:固定日期觸發器:任務隻運作一次,運作完畢自動清除;若錯過指定運作時間,任務不會被建立
interval:時間間隔觸發器
cron:cron風格的任務觸發
示例1、
import time
from apscheduler.schedulers.blocking import BlockingScheduler
def job():
print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
if __name__ == '__main__':
# 該示例代碼生成了一個BlockingScheduler排程器,使用了預設的任務存儲MemoryJobStore,以及預設的執行器ThreadPoolExecutor,并且最大線程數為10。
# BlockingScheduler:在程序中運作單個任務,排程器是唯一運作的東西
scheduler = BlockingScheduler()
# 采用阻塞的方式
# 采用固定時間間隔(interval)的方式,每隔5秒鐘執行一次
scheduler.add_job(job, 'interval', seconds=5)
scheduler.start()
示例2、
import time
from apscheduler.schedulers.blocking import BlockingScheduler
def job():
print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
if __name__ == '__main__':
# BlockingScheduler:在程序中運作單個任務,排程器是唯一運作的東西
scheduler = BlockingScheduler()
# 采用阻塞的方式
# 采用date的方式,在特定時間隻執行一次
scheduler.add_job(job, 'date', run_date='2018-09-21 15:30:00')
scheduler.start()
示例3、
import time
from apscheduler.schedulers.background import BackgroundScheduler
def job():
print('job:', time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
if __name__ == '__main__':
# BackgroundScheduler: 适合于要求任何在程式背景運作的情況,當希望排程器在應用背景執行時使用。
scheduler = BackgroundScheduler()
# 采用非阻塞的方式
# 采用固定時間間隔(interval)的方式,每隔3秒鐘執行一次
scheduler.add_job(job, 'interval', seconds=3)
# 這是一個獨立的線程
scheduler.start()
# 其他任務是獨立的線程
while True:
print('main-start:', time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
time.sleep(2)
print('main-end:', time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
運作結果為:
main-start: 2018-09-21 15:54:28
main-end: 2018-09-21 15:54:30
main-start: 2018-09-21 15:54:30
job: 2018-09-21 15:54:31
main-end: 2018-09-21 15:54:32
main-start: 2018-09-21 15:54:32
main-end: 2018-09-21 15:54:34
main-start: 2018-09-21 15:54:34
job: 2018-09-21 15:54:34
main-end: 2018-09-21 15:54:36
main-start: 2018-09-21 15:54:36
示例4、
import time
from apscheduler.schedulers.background import BackgroundScheduler
def job():
print('job:', time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
if __name__ == '__main__':
# BackgroundScheduler: 适合于要求任何在程式背景運作的情況,當希望排程器在應用背景執行時使用。
scheduler = BackgroundScheduler()
# 采用非阻塞的方式
# 采用date的方式,在特定時間裡執行一次
scheduler.add_job(job, 'date', run_date='2018-09-21 15:53:00')
# 這是一個獨立的線程
scheduler.start()
# 其他任務是獨立的線程
while True:
print('main-start:', time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
time.sleep(2)
print('main-end:', time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
運作結果為:
main-start: 2018-09-21 15:52:57
main-end: 2018-09-21 15:52:59
main-start: 2018-09-21 15:52:59
job: 2018-09-21 15:53:00
main-end: 2018-09-21 15:53:01
示例5、
import time
from apscheduler.schedulers.background import BackgroundScheduler
def job():
print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
if __name__ == '__main__':
# BackgroundScheduler: 适合于要求任何在程式背景運作的情況,當希望排程器在應用背景執行時使用
scheduler = BackgroundScheduler()
# 采用非阻塞的方式
# 采用corn的方式
scheduler.add_job(job, 'cron', day_of_week='fri', second='*/5')
'''
year (int|str) – 4-digit year
month (int|str) – month (1-12)
day (int|str) – day of the (1-31)
week (int|str) – ISO week (1-53)
day_of_week (int|str) – number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun)
hour (int|str) – hour (0-23)
minute (int|str) – minute (0-59)
econd (int|str) – second (0-59)
start_date (datetime|str) – earliest possible date/time to trigger on (inclusive)
end_date (datetime|str) – latest possible date/time to trigger on (inclusive)
timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations (defaults to scheduler timezone)
* any Fire on every value
*/a any Fire every a values, starting from the minimum
a-b any Fire on any value within the a-b range (a must be smaller than b)
a-b/c any Fire every c values within the a-b range
xth y day Fire on the x -th occurrence of weekday y within the month
last x day Fire on the last occurrence of weekday x within the month
last day Fire on the last day within the month
x,y,z any Fire on any matching expression; can combine any number of any of the above expressions
'''
scheduler.start()
# 其他任務是獨立的線程
while True:
print('main-start:', time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
time.sleep(2)
print('main-end:', time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
運作結果:
main-start: 2018-09-21 16:02:55
main-end: 2018-09-21 16:02:57
main-start: 2018-09-21 16:02:57
main-end: 2018-09-21 16:02:59
main-start: 2018-09-21 16:02:59
2018-09-21 16:03:00
main-end: 2018-09-21 16:03:01
main-start: 2018-09-21 16:03:01
main-end: 2018-09-21 16:03:03
main-start: 2018-09-21 16:03:03
2018-09-21 16:03:05
main-end: 2018-09-21 16:03:05
main-start: 2018-09-21 16:03:05
main-end: 2018-09-21 16:03:07
main-start: 2018-09-21 16:03:07
main-end: 2018-09-21 16:03:09
main-start: 2018-09-21 16:03:09
2018-09-21 16:03:10
示例6、
import time
from apscheduler.schedulers.background import BackgroundScheduler
def job():
print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
if __name__ == '__main__':
# BackgroundScheduler: 适合于要求任何在程式背景運作的情況,當希望排程器在應用背景執行時使用
scheduler = BackgroundScheduler()
# 采用阻塞的方式
# 采用corn的方式
scheduler.add_job(job, 'cron', day_of_week='fri', second='*/5')
'''
year (int|str) – 4-digit year
month (int|str) – month (1-12)
day (int|str) – day of the (1-31)
week (int|str) – ISO week (1-53)
day_of_week (int|str) – number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun)
hour (int|str) – hour (0-23)
minute (int|str) – minute (0-59)
econd (int|str) – second (0-59)
start_date (datetime|str) – earliest possible date/time to trigger on (inclusive)
end_date (datetime|str) – latest possible date/time to trigger on (inclusive)
timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations (defaults to scheduler timezone)
* any Fire on every value
*/a any Fire every a values, starting from the minimum
a-b any Fire on any value within the a-b range (a must be smaller than b)
a-b/c any Fire every c values within the a-b range
xth y day Fire on the x -th occurrence of weekday y within the month
last x day Fire on the last occurrence of weekday x within the month
last day Fire on the last day within the month
x,y,z any Fire on any matching expression; can combine any number of any of the above expressions
'''
scheduler.start()
示例7、
import time
from pymongo import MongoClient
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
def job():
print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
if __name__ == '__main__':
# mongodb存儲job
scheduler = BlockingScheduler()
client = MongoClient(host='127.0.0.1', port=27017)
store = MongoDBJobStore(collection='job', database='test', client=client)
scheduler.add_jobstore(store)
scheduler.add_job(job, 'interval', second=5)
scheduler.start()
APScheduler提供了許多不同的方式來配置排程器,你可以使用一個配置字典或者作為參數關鍵字的方式傳入。你也可以先建立排程器,再配置和添加作業,這樣你可以在不同的環境中得到更大的靈活性。
下面來看一個簡單的 BlockingScheduler 例子:
from apscheduler.schedulers.blocking import BlockingScheduler
from datetime import datetime
def job():
print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
# 定義BlockingScheduler
sched = BlockingScheduler()
sched.add_job(job, 'interval', seconds=5)
sched.start()
上述代碼建立了一個 BlockingScheduler,并使用預設記憶體存儲和預設執行器。(預設選項分别是 MemoryJobStore 和 ThreadPoolExecutor,其中線程池的最大線程數為10)。配置完成後使用 start() 方法來啟動。
如果想要顯式設定 job store(使用mongo存儲)和 executor 可以這樣寫:
from datetime import datetime
from pymongo import MongoClient
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
# MongoDB 參數
host = '127.0.0.1'
port = 27017
client = MongoClient(host, port)
# 輸出時間
def job():
print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
# 存儲方式
jobstores = {
'mongo': MongoDBJobStore(collection='job', database='test', client=client),
'default': MemoryJobStore()
}
executors = {
'default': ThreadPoolExecutor(10),
'processpool': ProcessPoolExecutor(3)
}
job_defaults = {
'coalesce': False,
'max_instances': 3
}
scheduler = BlockingScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults)
scheduler.add_job(job, 'interval', seconds=5, jobstore='mongo')
scheduler.start()
trigger 規則
date
最基本的一種排程,作業隻會執行一次。它的參數如下:
run_date (datetime|str) – the date/time to run the job at
timezone (datetime.tzinfo|str) – time zone for run_date if it doesn’t have one already
from datetime import date
from apscheduler.schedulers.blocking import BlockingScheduler
sched = BlockingScheduler()
def my_job(text):
print(text)
# The job will be executed on November 6th, 2009
sched.add_job(my_job, 'date', run_date=date(2009, 11, 6), args=['text'])
sched.add_job(my_job, 'date', run_date=datetime(2009, 11, 6, 16, 30, 5), args=['text'])
sched.add_job(my_job, 'date', run_date='2009-11-06 16:30:05', args=['text'])
# The 'date' trigger and datetime.now() as run_date are implicit
sched.add_job(my_job, args=['text'])
sched.start()
cron
year (int|str) – 4-digit year
month (int|str) – month (1-12)
day (int|str) – day of the (1-31)
week (int|str) – ISO week (1-53)
day_of_week (int|str) – number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun)
hour (int|str) – hour (0-23)
minute (int|str) – minute (0-59)
second (int|str) – second (0-59)
start_date (datetime|str) – earliest possible date/time to trigger on (inclusive)
end_date (datetime|str) – latest possible date/time to trigger on (inclusive)
timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations (defaults to scheduler timezone)
from apscheduler.schedulers.blocking import BlockingScheduler
def job_function():
print("Hello World")
# BlockingScheduler
sched = BlockingScheduler()
# Schedules job_function to be run on the third Friday
# of June, July, August, November and December at 00:00, 01:00, 02:00 and 03:00
sched.add_job(job_function, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')
# Runs from Monday to Friday at 5:30 (am) until 2014-05-30 00:00:00
sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2014-05-30')
sched.start()
interval
weeks (int) – number of weeks to wait
days (int) – number of days to wait
hours (int) – number of hours to wait
minutes (int) – number of minutes to wait
seconds (int) – number of seconds to wait
start_date (datetime|str) – starting point for the interval calculation
end_date (datetime|str) – latest possible date/time to trigger on
timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations
from datetime import datetime
from apscheduler.schedulers.blocking import BlockingScheduler
def job_function():
print("Hello World")
# BlockingScheduler
sched = BlockingScheduler()
# Schedule job_function to be called every two hours
sched.add_job(job_function, 'interval', hours=2)
# The same as before, but starts on 2010-10-10 at 9:30 and stops on 2014-06-15 at 11:00
sched.add_job(job_function, 'interval', hours=2, start_date='2010-10-10 09:30:00', end_date='2014-06-15 11:00:00')
sched.start()
根據項目需求需要實作三個定時任務:
1>定時更新微信token,需要2小時更新一次;
2>商品定時上線;
3>定時檢測背景服務是否存活;
Python實作定點與定時任務方式比較多,每個方式都有自己應用場景;下面來快速介紹Python中常用的定時任務實作方式:
1>循環+sleep;
2>線程子產品中Timer類;
3>schedule子產品;
4>定時架構:APScheduler
在開始之前先設定一個任務(這樣不用依賴外部環境):
1:定時或者定點監測CPU與記憶體使用率;
2:将時間,CPU,記憶體使用情況儲存到日志檔案;
先來實作系統監測功能:
準備工作:安裝psutil:pip install psutil
功能實作
#psutil:擷取系統資訊子產品,可以擷取CPU,記憶體,磁盤等的使用情況
import psutil
import time
import datetime
#logfile:監測資訊寫入檔案
def MonitorSystem(logfile = None):
#擷取cpu使用情況
cpuper = psutil.cpu_percent()
#擷取記憶體使用情況:系統記憶體大小,使用記憶體,有效記憶體,記憶體使用率
mem = psutil.virtual_memory()
#記憶體使用率
memper = mem.percent
#擷取目前時間
now = datetime.datetime.now()
ts = now.strftime('%Y-%m-%d %H:%M:%S')
line = f'{ts} cpu:{cpuper}%, mem:{memper}%'
print(line)
if logfile:
logfile.write(line)
代碼運作結果:
2019-03-21 14:23:41 cpu:0.6%, mem:77.2%
接下來我們要實作定時監測,比如3s監測一下系統資源使用情況。
最簡單使用方式:sleep
這種方式最簡單,直接使用while+sleep就可以實作:
def loopMonitor():
while True:
MonitorSystem()
#2s檢查一次
time.sleep(3)
loopMonitor()
輸出結果:
2019-03-21 14:28:42 cpu:1.5%, mem:77.6%
2019-03-21 14:28:45 cpu:1.6%, mem:77.6%
2019-03-21 14:28:48 cpu:1.4%, mem:77.6%
2019-03-21 14:28:51 cpu:1.4%, mem:77.6%
2019-03-21 14:28:54 cpu:1.3%, mem:77.6%
這種方式存在問題:隻能處理單個定時任務。
又來了新任務:需要每秒監測網絡收發位元組,代碼實作如下:
def MonitorNetWork(logfile = None):
#擷取網絡收資訊
netinfo = psutil.net_io_counters()
#擷取目前時間
now = datetime.datetime.now()
ts = now.strftime('%Y-%m-%d %H:%M:%S')
line = f'{ts} bytessent={netinfo.bytes_sent}, bytesrecv={netinfo.bytes_recv}'
print(line)
if logfile:
logfile.write(line)
MonitorNetWork()
代碼執行結果:
2019-03-21 14:47:21 bytessent=169752183, bytesrecv=1107900973
如果我們同時在while循環中監測兩個任務會有等待問題,不能每秒監測網絡情況。
Timer實作方式
timer最基本了解就是定時器,我們可以啟動多個定時任務,這些定時器任務是異步執行,是以不存在等待順序執行問題。
先來看Timer的基本使用:
主要方法:
Timer方法 說明
Timer(interval, function, args=None, kwargs=None) 建立定時器
cancel() 取消定時器
start() 使用線程方式執行
join(self, timeout=None) 等待線程執行結束
定時器隻能執行一次,如果需要重複執行,需要重新添加任務;
我們先來看基本使用:
from threading import Timer
#記錄目前時間
print(datetime.datetime.now())
#3S執行一次
sTimer = Timer(3, MonitorSystem)
#1S執行一次
nTimer = Timer(1, MonitorNetWork)
#使用線程方式執行
sTimer.start()
nTimer.start()
#等待結束
sTimer.join()
nTimer.join()
#記錄結束時間
print(datetime.datetime.now())
輸出結果:
2019-03-21 15:13:36.739798
2019-03-21 15:13:37 bytessent=171337324, bytesrecv=1109002349
2019-03-21 15:13:39 cpu:1.4%, mem:93.2%
2019-03-21 15:13:39.745187
可以看到,花費時間為3S,但是我們想要做的是每秒監控網絡狀态;如何處理。
Timer隻能執行一次,是以執行完成之後需要再次添加任務,我們對代碼進行修改:
from threading import Timer
import psutil
import time
import datetime
def MonitorSystem(logfile = None):
cpuper = psutil.cpu_percent()
mem = psutil.virtual_memory()
memper = mem.percent
now = datetime.datetime.now()
ts = now.strftime('%Y-%m-%d %H:%M:%S')
line = f'{ts} cpu:{cpuper}%, mem:{memper}%'
print(line)
if logfile:
logfile.write(line)
#啟動定時器任務,每三秒執行一次
Timer(3, MonitorSystem).start()
def MonitorNetWork(logfile = None):
netinfo = psutil.net_io_counters()
now = datetime.datetime.now()
ts = now.strftime('%Y-%m-%d %H:%M:%S')
line = f'{ts} bytessent={netinfo.bytes_sent}, bytesrecv={netinfo.bytes_recv}'
print(line)
if logfile:
logfile.write(line)
#啟動定時器任務,每秒執行一次
Timer(1, MonitorNetWork).start()
MonitorSystem()
MonitorNetWork()
執行結果:
2019-03-21 15:18:21 cpu:1.5%, mem:93.2%
2019-03-21 15:18:21 bytessent=171376522, bytesrecv=1109124678
2019-03-21 15:18:22 bytessent=171382215, bytesrecv=1109128294
2019-03-21 15:18:23 bytessent=171384278, bytesrecv=1109129702
2019-03-21 15:18:24 cpu:1.9%, mem:93.2%
2019-03-21 15:18:24 bytessent=171386341, bytesrecv=1109131110
2019-03-21 15:18:25 bytessent=171388527, bytesrecv=1109132600
2019-03-21 15:18:26 bytessent=171390590, bytesrecv=1109134008
從時間中可以看到,這兩個任務可以同時進行不存在等待問題。
Timer的實質是使用線程方式去執行任務,每次執行完後會銷毀,是以不必擔心資源問題。
排程子產品:schedule
schedule是一個第三方輕量級的任務排程子產品,可以按照秒,分,小時,日期或者自定義事件執行時間;
安裝方式:
pip install schedule
我們來看一個例子:
import datetime
import schedule
import time
def func():
now = datetime.datetime.now()
ts = now.strftime('%Y-%m-%d %H:%M:%S')
print('do func time :',ts)
def func2():
now = datetime.datetime.now()
ts = now.strftime('%Y-%m-%d %H:%M:%S')
print('do func2 time:',ts)
def tasklist():
#清空任務
schedule.clear()
#建立一個按秒間隔執行任務
schedule.every(1).seconds.do(func)
#建立一個按2秒間隔執行任務
schedule.every(2).seconds.do(func2)
#執行10S
for i in range(10):
schedule.run_pending()
time.sleep(1)
tasklist()
執行結果:
do func time : 2019-03-22 08:51:38
do func2 time: 2019-03-22 08:51:39
do func time : 2019-03-22 08:51:39
do func time : 2019-03-22 08:51:40
do func2 time: 2019-03-22 08:51:41
do func time : 2019-03-22 08:51:41
do func time : 2019-03-22 08:51:42
do func2 time: 2019-03-22 08:51:43
do func time : 2019-03-22 08:51:43
do func time : 2019-03-22 08:51:44
do func2 time: 2019-03-22 08:51:45
do func time : 2019-03-22 08:51:45
do func time : 2019-03-22 08:51:46
執行過程分析:
1>因為在jupyter下執行,是以先将schedule任務清空;
2>按時間間在schedule中隔添加任務;
3>這裡按照秒間隔添加func,按照兩秒間隔添加func2;
4>schedule添加任務後,需要查詢任務并執行任務;
5>為了防止占用資源,每秒查詢到點任務,然後順序執行;
第5個順序執行怎麼了解,我們修改func函數,裡面添加time.sleep(2)
然後隻執行func工作,輸出結果:
do func time : 2019-03-22 09:00:59
do func time : 2019-03-22 09:01:02
do func time : 2019-03-22 09:01:05
可以看到時間間隔為3S,為什麼不是1S?
因為這個按照順序執行,func休眠2S,循環任務查詢休眠1S,是以會存在這個問題。
在我們使用這種方式執行任務需要注意這種阻塞現象。
我們看下schedule子產品常用使用方法:
#schedule.every(1)建立Job, seconds.do(func)按秒間隔查詢并執行
schedule.every(1).seconds.do(func)
#添加任務按分執行
schedule.every(1).minutes.do(func)
#添加任務按天執行
schedule.every(1).days.do(func)
#添加任務按周執行
schedule.every().weeks.do(func)
#添加任務每周1執行,執行時間為下周一這一時刻時間
schedule.every().monday.do(func)
#每周1,1點15開始執行
schedule.every().monday.at("12:00").do(job)
這種方式局限性:如果工作任務回非常耗時就會影響其他任務執行。我們可以考慮使用并發機制配置這個子產品使用。
任務架構APScheduler
APScheduler是Python的一個定時任務架構,用于執行周期或者定時任務,
可以基于日期、時間間隔,及類似于Linux上的定時任務crontab類型的定時任務;
該該架構不僅可以添加、删除定時任務,還可以将任務存儲到資料庫中,實作任務的持久化,使用起來非常友善。
安裝方式:pip install apscheduler
apscheduler元件及簡單說明:
1>triggers(觸發器):觸發器包含排程邏輯,每一個作業有它自己的觸發器
2>job stores(作業存儲):用來存儲被排程的作業,預設的作業存儲器是簡單地把作業任務儲存在記憶體中,支援存儲到MongoDB,Redis資料庫中
3> executors(執行器):執行器用來執行定時任務,隻是将需要執行的任務放在新的線程或者線程池中運作
4>schedulers(排程器):排程器是将其它部分聯系在一起,對使用者提供接口,進行任務添加,設定,删除。
來看一個簡單例子:
import time
from apscheduler.schedulers.blocking import BlockingScheduler
def func():
now = datetime.datetime.now()
ts = now.strftime('%Y-%m-%d %H:%M:%S')
print('do func time :',ts)
def func2():
#耗時2S
now = datetime.datetime.now()
ts = now.strftime('%Y-%m-%d %H:%M:%S')
print('do func2 time:',ts)
time.sleep(2)
def dojob():
#建立排程器:BlockingScheduler
scheduler = BlockingScheduler()
#添加任務,時間間隔2S
scheduler.add_job(func, 'interval', seconds=2, id='test_job1')
#添加任務,時間間隔5S
scheduler.add_job(func2, 'interval', seconds=3, id='test_job2')
scheduler.start()
dojob()
輸出結果:
do func time : 2019-03-22 10:32:20
do func2 time: 2019-03-22 10:32:21
do func time : 2019-03-22 10:32:22
do func time : 2019-03-22 10:32:24
do func2 time: 2019-03-22 10:32:24
do func time : 2019-03-22 10:32:26
輸出結果中可以看到:任務就算是有延時,也不會影響其他任務執行。
APScheduler架構提供豐富接口去實作定時任務,可以去參考官方文檔去檢視使用方式。
最後選擇:
簡單總結上面四種定時定點任務實作:
1:循環+sleep方式适合簡答測試,
2:timer可以實作定時任務,但是對定點任務來說,需要檢查目前時間點;
3:schedule可以定點定時執行,但是需要在循環中檢測任務,而且存在阻塞;
4:APScheduler架構更加強大,可以直接在裡面添加定點與定時任務;
綜合考慮,決定使用APScheduler架構,實作簡單,隻需要直接建立任務,并将添加到排程器中即可。