天天看點

python定時任務_Python3-定時任務四種實作方式

老貓最近做一個小程式開發任務,主要負責背景部分開發;根據項目需求老貓需要實作三個定時任務:

1>定時更新微信token,需要2小時更新一次;

2>商品定時上線;

3>定時檢測背景服務是否存活;

老貓使用Python去實作這三個任務,這裡需要使用定時相關知識點;

Python實作定點與定時任務方式比較多,老貓找到下面四中實作方式,每個方式都有自己應用場景;下面老貓來快速介紹Python中常用的定時任務實作方式:

1>循環+sleep;

2>線程子產品中Timer類;

3>schedule子產品;

4>定時架構:APScheduler

在開始之前先設定一個任務(這樣不用依賴外部環境):

1:定時或者定點監測CPU與記憶體使用率;

2:将時間,CPU,記憶體使用情況儲存到日志檔案;

先來實作系統監測功能:

準備工作:安裝psutil:pip install psutil

功能實作

#psutil:擷取系統資訊子產品,可以擷取CPU,記憶體,磁盤等的使用情況

import psutil

import time

import datetime

#logfile:監測資訊寫入檔案

def MonitorSystem(logfile = None):

#擷取cpu使用情況

cpuper = psutil.cpu_percent()

#擷取記憶體使用情況:系統記憶體大小,使用記憶體,有效記憶體,記憶體使用率

mem = psutil.virtual_memory()

#記憶體使用率

memper = mem.percent

#擷取目前時間

now = datetime.datetime.now()

ts = now.strftime('%Y-%m-%d %H:%M:%S')

line = f'{ts} cpu:{cpuper}%, mem:{memper}%'

print(line)

if logfile:

logfile.write(line)

代碼運作結果:

2019-03-21 14:23:41 cpu:0.6%, mem:77.2%

接下來我們要實作定時監測,比如3s監測一下系統資源使用情況。

最簡單使用方式:sleep

這種方式最簡單,直接使用while+sleep就可以實作:

def loopMonitor():

while True:

MonitorSystem()

#2s檢查一次

time.sleep(3)

loopMonitor()

輸出結果:

2019-03-21 14:28:42 cpu:1.5%, mem:77.6%

2019-03-21 14:28:45 cpu:1.6%, mem:77.6%

2019-03-21 14:28:48 cpu:1.4%, mem:77.6%

2019-03-21 14:28:51 cpu:1.4%, mem:77.6%

2019-03-21 14:28:54 cpu:1.3%, mem:77.6%

這種方式存在問題:隻能處理單個定時任務。

又來了新任務:需要每秒監測網絡收發位元組,代碼實作如下:

def MonitorNetWork(logfile = None):

#擷取網絡收資訊

netinfo = psutil.net_io_counters()

#擷取目前時間

now = datetime.datetime.now()

ts = now.strftime('%Y-%m-%d %H:%M:%S')

line = f'{ts} bytessent={netinfo.bytes_sent}, bytesrecv={netinfo.bytes_recv}'

print(line)

if logfile:

logfile.write(line)

MonitorNetWork()

代碼執行結果:

2019-03-21 14:47:21 bytessent=169752183, bytesrecv=1107900973

如果我們同時在while循環中監測兩個任務會有等待問題,不能每秒監測網絡情況。

Timer實作方式

timer最基本了解就是定時器,我們可以啟動多個定時任務,這些定時器任務是異步執行,是以不存在等待順序執行問題。

先來看Timer的基本使用:

導入:from threading import Timer

主要方法:

Timer方法

說明

Timer(interval, function, args=None, kwargs=None)

建立定時器

cancel()

取消定時器

start()

使用線程方式執行

join(self, timeout=None)

等待線程執行結束

定時器隻能執行一次,如果需要重複執行,需要重新添加任務;

我們先來看基本使用:

from threading import Timer

#記錄目前時間

print(datetime.datetime.now())

#3S執行一次

sTimer = Timer(3, MonitorSystem)

#1S執行一次

nTimer = Timer(1, MonitorNetWork)

#使用線程方式執行

sTimer.start()

nTimer.start()

#等待結束

sTimer.join()

nTimer.join()

#記錄結束時間

print(datetime.datetime.now())

輸出結果:

2019-03-21 15:13:36.739798

2019-03-21 15:13:37 bytessent=171337324, bytesrecv=1109002349

2019-03-21 15:13:39 cpu:1.4%, mem:93.2%

2019-03-21 15:13:39.745187

可以看到,花費時間為3S,但是我們想要做的是每秒監控網絡狀态;如何處理。

Timer隻能執行一次,是以執行完成之後需要再次添加任務,我們對代碼進行修改:

from threading import Timer

import psutil

import time

import datetime

def MonitorSystem(logfile = None):

cpuper = psutil.cpu_percent()

mem = psutil.virtual_memory()

memper = mem.percent

now = datetime.datetime.now()

ts = now.strftime('%Y-%m-%d %H:%M:%S')

line = f'{ts} cpu:{cpuper}%, mem:{memper}%'

print(line)

if logfile:

logfile.write(line)

#啟動定時器任務,每三秒執行一次

Timer(3, MonitorSystem).start()

def MonitorNetWork(logfile = None):

netinfo = psutil.net_io_counters()

now = datetime.datetime.now()

ts = now.strftime('%Y-%m-%d %H:%M:%S')

line = f'{ts} bytessent={netinfo.bytes_sent}, bytesrecv={netinfo.bytes_recv}'

print(line)

if logfile:

logfile.write(line)

#啟動定時器任務,每秒執行一次

Timer(1, MonitorNetWork).start()

MonitorSystem()

MonitorNetWork()

執行結果:

2019-03-21 15:18:21 cpu:1.5%, mem:93.2%

2019-03-21 15:18:21 bytessent=171376522, bytesrecv=1109124678

2019-03-21 15:18:22 bytessent=171382215, bytesrecv=1109128294

2019-03-21 15:18:23 bytessent=171384278, bytesrecv=1109129702

2019-03-21 15:18:24 cpu:1.9%, mem:93.2%

2019-03-21 15:18:24 bytessent=171386341, bytesrecv=1109131110

2019-03-21 15:18:25 bytessent=171388527, bytesrecv=1109132600

2019-03-21 15:18:26 bytessent=171390590, bytesrecv=1109134008

從時間中可以看到,這兩個任務可以同時進行不存在等待問題。

Timer的實質是使用線程方式去執行任務,每次執行完後會銷毀,是以不必擔心資源問題。

排程子產品:schedule

schedule是一個第三方輕量級的任務排程子產品,可以按照秒,分,小時,日期或者自定義事件執行時間;

安裝方式:

pip install schedule

我們來看一個例子:

import datetime

import schedule

import time

def func():

now = datetime.datetime.now()

ts = now.strftime('%Y-%m-%d %H:%M:%S')

print('do func time :',ts)

def func2():

now = datetime.datetime.now()

ts = now.strftime('%Y-%m-%d %H:%M:%S')

print('do func2 time:',ts)

def tasklist():

#清空任務

schedule.clear()

#建立一個按秒間隔執行任務

schedule.every(1).seconds.do(func)

#建立一個按2秒間隔執行任務

schedule.every(2).seconds.do(func2)

#執行10S

for i in range(10):

schedule.run_pending()

time.sleep(1)

tasklist()

執行結果:

do func time : 2019-03-22 08:51:38

do func2 time: 2019-03-22 08:51:39

do func time : 2019-03-22 08:51:39

do func time : 2019-03-22 08:51:40

do func2 time: 2019-03-22 08:51:41

do func time : 2019-03-22 08:51:41

do func time : 2019-03-22 08:51:42

do func2 time: 2019-03-22 08:51:43

do func time : 2019-03-22 08:51:43

do func time : 2019-03-22 08:51:44

do func2 time: 2019-03-22 08:51:45

do func time : 2019-03-22 08:51:45

do func time : 2019-03-22 08:51:46

執行過程分析:

>1>因為老貓在jupyter下執行,是以先将schedule任務清空;

>2>按時間間在schedule中隔添加任務;

>3>老貓這裡按照秒間隔添加func,按照兩秒間隔添加func2;

>4>schedule添加任務後,需要查詢任務并執行任務;

>5>為了防止占用資源,每秒查詢到點任務,然後順序執行;

第5個順序執行怎麼了解,我們修改func函數,裡面添加time.sleep(2)

然後隻執行func工作,輸出結果:

do func time : 2019-03-22 09:00:59

do func time : 2019-03-22 09:01:02

do func time : 2019-03-22 09:01:05

可以看到時間間隔為3S,為什麼不是1S?

因為這個按照順序執行,func休眠2S,循環任務查詢休眠1S,是以會存在這個問題。

在我們使用這種方式執行任務需要注意這種阻塞現象。

我們看下schedule子產品常用使用方法:

#schedule.every(1)建立Job, seconds.do(func)按秒間隔查詢并執行

schedule.every(1).seconds.do(func)

#添加任務按分執行

schedule.every(1).minutes.do(func)

#添加任務按天執行

schedule.every(1).days.do(func)

#添加任務按周執行

schedule.every().weeks.do(func)

#添加任務每周1執行,執行時間為下周一這一時刻時間

schedule.every().monday.do(func)

#每周1,1點15開始執行

schedule.every().monday.at("12:00").do(job)

這種方式局限性:如果工作任務回非常耗時就會影響其他任務執行。我們可以考慮使用并發機制配置這個子產品使用。

任務架構APScheduler

APScheduler是Python的一個定時任務架構,用于執行周期或者定時任務,

可以基于日期、時間間隔,及類似于Linux上的定時任務crontab類型的定時任務;

該該架構不僅可以添加、删除定時任務,還可以将任務存儲到資料庫中,實作任務的持久化,使用起來非常友善。

安裝方式:pip install apscheduler

apscheduler元件及簡單說明:

1>triggers(觸發器):觸發器包含排程邏輯,每一個作業有它自己的觸發器

2>job stores(作業存儲):用來存儲被排程的作業,預設的作業存儲器是簡單地把作業任務儲存在記憶體中,支援存儲到MongoDB,Redis資料庫中

3> executors(執行器):執行器用來執行定時任務,隻是将需要執行的任務放在新的線程或者線程池中運作

4>schedulers(排程器):排程器是将其它部分聯系在一起,對使用者提供接口,進行任務添加,設定,删除。

來看一個簡單例子:

import time

from apscheduler.schedulers.blocking import BlockingScheduler

def func():

now = datetime.datetime.now()

ts = now.strftime('%Y-%m-%d %H:%M:%S')

print('do func time :',ts)

def func2():

#耗時2S

now = datetime.datetime.now()

ts = now.strftime('%Y-%m-%d %H:%M:%S')

print('do func2 time:',ts)

time.sleep(2)

def dojob():

#建立排程器:BlockingScheduler

scheduler = BlockingScheduler()

#添加任務,時間間隔2S

scheduler.add_job(func, 'interval', seconds=2, id='test_job1')

#添加任務,時間間隔5S

scheduler.add_job(func2, 'interval', seconds=3, id='test_job2')

scheduler.start()

dojob()

輸出結果:

do func time : 2019-03-22 10:32:20

do func2 time: 2019-03-22 10:32:21

do func time : 2019-03-22 10:32:22

do func time : 2019-03-22 10:32:24

do func2 time: 2019-03-22 10:32:24

do func time : 2019-03-22 10:32:26

輸出結果中可以看到:任務就算是有延時,也不會影響其他任務執行。

APScheduler架構提供豐富接口去實作定時任務,可以去參考官方文檔去檢視使用方式。

最後選擇:

老貓簡單總結上面四種定時定點任務實作:

1:循環+sleep方式适合簡答測試,

2:timer可以實作定時任務,但是對定點任務來說,需要檢查目前時間點;

3:schedule可以定點定時執行,但是需要在循環中檢測任務,而且存在阻塞;

4:APScheduler架構更加強大,可以直接在裡面添加定點與定時任務;

綜合考慮,老貓決定使用APScheduler架構,實作簡單,隻需要直接建立任務,并将添加到排程器中即可。