天天看點

python 多種定時任務實作方法和舉例,包括sleep、Timer、schedule、APScheduler等

python 多種定時任務實作方法和舉例,包括sleep、Timer、schedule、APScheduler等,python有很多定時任務架構,包括調用同步方法和異步方法,主要整理一下,除此之外,還有很多其他的實作,例如 celery 等等。

while True: + sleep()

threading.Timer定時器

排程子產品schedule

任務架構APScheduler

1、while循環中使用sleep

缺點:不容易控制,而且是個阻塞函數

def timer(n):  
    ''''' 
    每n秒執行一次 
    '''  
    while True:    
        print(time.strftime('%Y-%m-%d %X',time.localtime()))    
        yourTask()  # 此處為要執行的任務    
        time.sleep(n)
           

2、schedule子產品

優點:可以管理和排程多個任務,可以進行控制

缺點:阻塞式函數

import schedule
import time
import datetime

def job1():
    print('Job1:每隔10秒執行一次的任務,每次執行2秒')
    print('Job1-startTime:%s' %(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    time.sleep(2)
    print('Job1-endTime:%s' % (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    print('------------------------------------------------------------------------')

def job2():
    print('Job2:每隔30秒執行一次,每次執行5秒')
    print('Job2-startTime:%s' % (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    time.sleep(5)
    print('Job2-endTime:%s' % (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    print('------------------------------------------------------------------------')


def job3():
    print('Job3:每隔1分鐘執行一次,每次執行10秒')
    print('Job3-startTime:%s' % (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    time.sleep(10)
    print('Job3-endTime:%s' % (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    print('------------------------------------------------------------------------')


def job4():
    print('Job4:每天下午17:49執行一次,每次執行20秒')
    print('Job4-startTime:%s' % (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    time.sleep(20)
    print('Job4-endTime:%s' % (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    print('------------------------------------------------------------------------')


def job5():
    print('Job5:每隔5秒到10秒執行一次,每次執行3秒')
    print('Job5-startTime:%s' % (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    time.sleep(3)
    print('Job5-endTime:%s' % (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    print('------------------------------------------------------------------------')


if __name__ == '__main__':
    schedule.every(10).seconds.do(job1)
    schedule.every(30).seconds.do(job2)
    schedule.every(1).minutes.do(job3)
    schedule.every().day.at('17:49').do(job4)
    schedule.every(5).to(10).seconds.do(job5)
    while True:
        schedule.run_pending()
           

3、Threading子產品中的Timer

優點:非阻塞

缺點:不易管理多個任務

from threading import Timer
import datetime
# 每隔兩秒執行一次任務
def printHello():
    print('TimeNow:%s' % (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    t = Timer(2, printHello)
    t.start()

if __name__ == "__main__":
    printHello()
           

4、sched子產品

sched子產品實作了一個時間排程程式,該程式可以通過單線程執行來處理按照時間尺度進行排程的時間。

通過調用scheduler.enter(delay,priority,func,args)函數,可以将一個任務添加到任務隊列裡面,當指定的時間到了,就會執行任務(func函數)。

delay:任務的間隔時間。

priority:如果幾個任務被排程到相同的時間執行,将按照priority的增序執行這幾個任務。

func:要執行的任務函數

args:func的參數

import time, sched
import datetime

s = sched.scheduler(time.time, time.sleep)

def print_time(a='default'):
    print('Now Time:',datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),a)

def print_some_times():
    print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
    s.enter(10,1,print_time)
    s.enter(5,2,print_time,argument=('positional',))
    s.run()
    print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
print_some_times()
           

執行結果為:

2018-09-20 16:25:03
Now Time: 2018-09-20 16:25:08 positional
Now Time: 2018-09-20 16:25:13 default
2018-09-20 16:25:13

Process finished with exit code 0
           

按順序執行任務:

import time, sched
import datetime

s = sched.scheduler(time.time, time.sleep)


def event_fun1():
    print("func1 Time:", datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))


def perform1(inc):
    s.enter(inc, 0, perform1, (inc,))
    event_fun1()


def event_fun2():
    print("func2 Time:", datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))


def perform2(inc):
    s.enter(inc, 0, perform2, (inc,))
    event_fun2()


def mymain(func, inc=2):
    if func == "1":
        s.enter(0, 0, perform1, (10,))# 每隔10秒執行一次perform1
    if func == "2":
        s.enter(0, 0, perform2, (20,))# 每隔20秒執行一次perform2
if __name__ == '__main__':
    mymain('1')
    mymain('2')
    s.run()
           

執行結果為:

func1 Time: 2018-09-20 16:30:28
func2 Time: 2018-09-20 16:30:28
func1 Time: 2018-09-20 16:30:38
func2 Time: 2018-09-20 16:30:48
func1 Time: 2018-09-20 16:30:48
func1 Time: 2018-09-20 16:30:58
func2 Time: 2018-09-20 16:31:08
func1 Time: 2018-09-20 16:31:08
func1 Time: 2018-09-20 16:31:18
func2 Time: 2018-09-20 16:31:28
func1 Time: 2018-09-20 16:31:28
func1 Time: 2018-09-20 16:31:38
           

s.run()會阻塞目前線程的執行

可以用

t=threading.Thread(target=s.run)

t.start()

也可以用s.cancal(action)來取消sched中的某個action

5、定時架構APScheduler

APSScheduler是python的一個定時任務架構,它提供了基于日期date、固定時間間隔interval、以及linux上的crontab類型的定時任務。該礦機不僅可以添加、删除定時任務,還可以将任務存儲到資料庫中、實作任務的持久化。

APScheduler是一個 Python 定時任務架構,使用起來十分友善。提供了基于日期、固定時間間隔以及 crontab 類型的任務,并且可以持久化任務、并以 daemon 方式運作應用。

使用 APScheduler 需要安裝

pip install apscheduler
           

APScheduler有四種元件:

triggers(觸發器):觸發器包含排程邏輯,每一個作業有它自己的觸發器,用于決定接下來哪一個作業會運作,除了他們自己初始化配置外,觸發器完全是無狀态的。

job stores(作業存儲):用來存儲被排程的作業,預設的作業存儲器是簡單地把作業任務儲存在記憶體中,其它作業存儲器可以将任務作業儲存到各種資料庫中,支援MongoDB、Redis、SQLAlchemy存儲方式。當對作業任務進行持久化存儲的時候,作業的資料将被序列化,重新讀取作業時在反序列化。

executors(執行器):執行器用來執行定時任務,隻是将需要執行的任務放在新的線程或者線程池中運作。當作業任務完成時,執行器将會通知排程器。對于執行器,預設情況下選擇ThreadPoolExecutor就可以了,但是如果涉及到一下特殊任務如比較消耗CPU的任務則可以選擇ProcessPoolExecutor,當然根據根據實際需求可以同時使用兩種執行器。

schedulers(排程器):排程器是将其它部分聯系在一起,一般在應用程式中隻有一個排程器,應用開發者不會直接操作觸發器、任務存儲以及執行器,相反排程器提供了處理的接口。通過排程器完成任務的存儲以及執行器的配置操作,如可以添加。修改、移除任務作業。

APScheduler提供了七種排程器:

BlockingScheduler:适合于隻在程序中運作單個任務的情況,通常在排程器是你唯一要運作的東西時使用。

BackgroundScheduler: 适合于要求任何在程式背景運作的情況,當希望排程器在應用背景執行時使用。

AsyncIOScheduler:适合于使用asyncio異步架構的情況

GeventScheduler: 适合于使用gevent架構的情況

TornadoScheduler: 适合于使用Tornado架構的應用

TwistedScheduler: 适合使用Twisted架構的應用

QtScheduler: 适合使用QT的情況

APScheduler提供了四種存儲方式:

MemoryJobStore

sqlalchemy

mongodb

redis

APScheduler提供了三種任務觸發器:

data:固定日期觸發器:任務隻運作一次,運作完畢自動清除;若錯過指定運作時間,任務不會被建立

interval:時間間隔觸發器

cron:cron風格的任務觸發

示例1、
import time
from apscheduler.schedulers.blocking import BlockingScheduler

def job():
    print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))


if __name__ == '__main__':
    # 該示例代碼生成了一個BlockingScheduler排程器,使用了預設的任務存儲MemoryJobStore,以及預設的執行器ThreadPoolExecutor,并且最大線程數為10。
    
    # BlockingScheduler:在程序中運作單個任務,排程器是唯一運作的東西
    scheduler = BlockingScheduler()
    # 采用阻塞的方式

    # 采用固定時間間隔(interval)的方式,每隔5秒鐘執行一次
    scheduler.add_job(job, 'interval', seconds=5)
    
    scheduler.start()

示例2、
import time
from apscheduler.schedulers.blocking import BlockingScheduler

def job():
    print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
    
if __name__ == '__main__':
    # BlockingScheduler:在程序中運作單個任務,排程器是唯一運作的東西
    scheduler = BlockingScheduler()
    # 采用阻塞的方式
    
    # 采用date的方式,在特定時間隻執行一次
    scheduler.add_job(job, 'date', run_date='2018-09-21 15:30:00')

    scheduler.start() 

示例3、
import time
from apscheduler.schedulers.background import BackgroundScheduler

def job():
    print('job:', time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))

if __name__ == '__main__':
    # BackgroundScheduler: 适合于要求任何在程式背景運作的情況,當希望排程器在應用背景執行時使用。
    scheduler = BackgroundScheduler()
    # 采用非阻塞的方式

    # 采用固定時間間隔(interval)的方式,每隔3秒鐘執行一次
    scheduler.add_job(job, 'interval', seconds=3)
    # 這是一個獨立的線程
    scheduler.start()
    
    # 其他任務是獨立的線程
    while True:
        print('main-start:', time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
        time.sleep(2)
        print('main-end:', time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
運作結果為:

main-start: 2018-09-21 15:54:28
main-end: 2018-09-21 15:54:30
main-start: 2018-09-21 15:54:30
job: 2018-09-21 15:54:31
main-end: 2018-09-21 15:54:32
main-start: 2018-09-21 15:54:32
main-end: 2018-09-21 15:54:34
main-start: 2018-09-21 15:54:34
job: 2018-09-21 15:54:34
main-end: 2018-09-21 15:54:36
main-start: 2018-09-21 15:54:36

示例4、
import time
from apscheduler.schedulers.background import BackgroundScheduler

def job():
    print('job:', time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))

if __name__ == '__main__':
    # BackgroundScheduler: 适合于要求任何在程式背景運作的情況,當希望排程器在應用背景執行時使用。
    scheduler = BackgroundScheduler()
    # 采用非阻塞的方式

    # 采用date的方式,在特定時間裡執行一次
    scheduler.add_job(job, 'date', run_date='2018-09-21 15:53:00')
    # 這是一個獨立的線程
    scheduler.start()

    # 其他任務是獨立的線程
    while True:
        print('main-start:', time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
        time.sleep(2)
        print('main-end:', time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
運作結果為:

main-start: 2018-09-21 15:52:57
main-end: 2018-09-21 15:52:59
main-start: 2018-09-21 15:52:59
job: 2018-09-21 15:53:00
main-end: 2018-09-21 15:53:01

示例5、
import time
from apscheduler.schedulers.background import BackgroundScheduler


def job():
    print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))


if __name__ == '__main__':
    # BackgroundScheduler: 适合于要求任何在程式背景運作的情況,當希望排程器在應用背景執行時使用
    scheduler = BackgroundScheduler()
    # 采用非阻塞的方式

    # 采用corn的方式
    scheduler.add_job(job, 'cron', day_of_week='fri', second='*/5')
    '''
    year (int|str) – 4-digit year
    month (int|str) – month (1-12)
    day (int|str) – day of the (1-31)
    week (int|str) – ISO week (1-53)
    day_of_week (int|str) – number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun)
    hour (int|str) – hour (0-23)
    minute (int|str) – minute (0-59)
    econd (int|str) – second (0-59)
            
    start_date (datetime|str) – earliest possible date/time to trigger on (inclusive)
    end_date (datetime|str) – latest possible date/time to trigger on (inclusive)
    timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations (defaults to scheduler timezone)
        
    *    any    Fire on every value
    */a    any    Fire every a values, starting from the minimum
    a-b    any    Fire on any value within the a-b range (a must be smaller than b)
    a-b/c    any    Fire every c values within the a-b range
    xth y    day    Fire on the x -th occurrence of weekday y within the month
    last x    day    Fire on the last occurrence of weekday x within the month
    last    day    Fire on the last day within the month
    x,y,z    any    Fire on any matching expression; can combine any number of any of the above expressions
    '''
    scheduler.start()
    
    # 其他任務是獨立的線程
    while True:
        print('main-start:', time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
        time.sleep(2)
        print('main-end:', time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
運作結果:

main-start: 2018-09-21 16:02:55
main-end: 2018-09-21 16:02:57
main-start: 2018-09-21 16:02:57
main-end: 2018-09-21 16:02:59
main-start: 2018-09-21 16:02:59
2018-09-21 16:03:00
main-end: 2018-09-21 16:03:01
main-start: 2018-09-21 16:03:01
main-end: 2018-09-21 16:03:03
main-start: 2018-09-21 16:03:03
2018-09-21 16:03:05
main-end: 2018-09-21 16:03:05
main-start: 2018-09-21 16:03:05
main-end: 2018-09-21 16:03:07
main-start: 2018-09-21 16:03:07
main-end: 2018-09-21 16:03:09
main-start: 2018-09-21 16:03:09
2018-09-21 16:03:10

示例6、
import time
from apscheduler.schedulers.background import BackgroundScheduler


def job():
    print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))


if __name__ == '__main__':
    # BackgroundScheduler: 适合于要求任何在程式背景運作的情況,當希望排程器在應用背景執行時使用
    scheduler = BackgroundScheduler()
    # 采用阻塞的方式

    # 采用corn的方式
    scheduler.add_job(job, 'cron', day_of_week='fri', second='*/5')
    '''
    year (int|str) – 4-digit year
    month (int|str) – month (1-12)
    day (int|str) – day of the (1-31)
    week (int|str) – ISO week (1-53)
    day_of_week (int|str) – number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun)
    hour (int|str) – hour (0-23)
    minute (int|str) – minute (0-59)
    econd (int|str) – second (0-59)
            
    start_date (datetime|str) – earliest possible date/time to trigger on (inclusive)
    end_date (datetime|str) – latest possible date/time to trigger on (inclusive)
    timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations (defaults to scheduler timezone)
        
    *    any    Fire on every value
    */a    any    Fire every a values, starting from the minimum
    a-b    any    Fire on any value within the a-b range (a must be smaller than b)
    a-b/c    any    Fire every c values within the a-b range
    xth y    day    Fire on the x -th occurrence of weekday y within the month
    last x    day    Fire on the last occurrence of weekday x within the month
    last    day    Fire on the last day within the month
    x,y,z    any    Fire on any matching expression; can combine any number of any of the above expressions
    '''

    scheduler.start()

示例7、
import time
from pymongo import MongoClient
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore

def job():
    print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
if __name__ == '__main__':
    # mongodb存儲job
    scheduler = BlockingScheduler()
    client = MongoClient(host='127.0.0.1', port=27017)
    store = MongoDBJobStore(collection='job', database='test', client=client)
    scheduler.add_jobstore(store)
    scheduler.add_job(job, 'interval', second=5)
    scheduler.start()
           

APScheduler提供了許多不同的方式來配置排程器,你可以使用一個配置字典或者作為參數關鍵字的方式傳入。你也可以先建立排程器,再配置和添加作業,這樣你可以在不同的環境中得到更大的靈活性。

下面來看一個簡單的 BlockingScheduler 例子:

from apscheduler.schedulers.blocking import BlockingScheduler
from datetime import datetime

def job():
    print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
# 定義BlockingScheduler
sched = BlockingScheduler()
sched.add_job(job, 'interval', seconds=5)
sched.start()
           

上述代碼建立了一個 BlockingScheduler,并使用預設記憶體存儲和預設執行器。(預設選項分别是 MemoryJobStore 和 ThreadPoolExecutor,其中線程池的最大線程數為10)。配置完成後使用 start() 方法來啟動。

如果想要顯式設定 job store(使用mongo存儲)和 executor 可以這樣寫:

from datetime import datetime
from pymongo import MongoClient
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
# MongoDB 參數
host = '127.0.0.1'
port = 27017
client = MongoClient(host, port)
# 輸出時間
def job():
    print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
# 存儲方式
jobstores = {
    'mongo': MongoDBJobStore(collection='job', database='test', client=client),
    'default': MemoryJobStore()
}
executors = {
    'default': ThreadPoolExecutor(10),
    'processpool': ProcessPoolExecutor(3)
}
job_defaults = {
    'coalesce': False,
    'max_instances': 3
}
scheduler = BlockingScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults)
scheduler.add_job(job, 'interval', seconds=5, jobstore='mongo')
scheduler.start()
           

trigger 規則

date

最基本的一種排程,作業隻會執行一次。它的參數如下:

run_date (datetime|str) – the date/time to run the job at

timezone (datetime.tzinfo|str) – time zone for run_date if it doesn’t have one already

from datetime import date
from apscheduler.schedulers.blocking import BlockingScheduler
sched = BlockingScheduler()
def my_job(text):
    print(text)
# The job will be executed on November 6th, 2009
sched.add_job(my_job, 'date', run_date=date(2009, 11, 6), args=['text'])
sched.add_job(my_job, 'date', run_date=datetime(2009, 11, 6, 16, 30, 5), args=['text'])
sched.add_job(my_job, 'date', run_date='2009-11-06 16:30:05', args=['text'])
# The 'date' trigger and datetime.now() as run_date are implicit
sched.add_job(my_job, args=['text'])
sched.start()
           

cron

year (int|str) – 4-digit year

month (int|str) – month (1-12)

day (int|str) – day of the (1-31)

week (int|str) – ISO week (1-53)

day_of_week (int|str) – number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun)

hour (int|str) – hour (0-23)

minute (int|str) – minute (0-59)

second (int|str) – second (0-59)

start_date (datetime|str) – earliest possible date/time to trigger on (inclusive)

end_date (datetime|str) – latest possible date/time to trigger on (inclusive)

timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations (defaults to scheduler timezone)

from apscheduler.schedulers.blocking import BlockingScheduler

def job_function():
    print("Hello World")
# BlockingScheduler
sched = BlockingScheduler()
# Schedules job_function to be run on the third Friday
# of June, July, August, November and December at 00:00, 01:00, 02:00 and 03:00
sched.add_job(job_function, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')
# Runs from Monday to Friday at 5:30 (am) until 2014-05-30 00:00:00
sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2014-05-30')
sched.start()
           

interval

weeks (int) – number of weeks to wait

days (int) – number of days to wait

hours (int) – number of hours to wait

minutes (int) – number of minutes to wait

seconds (int) – number of seconds to wait

start_date (datetime|str) – starting point for the interval calculation

end_date (datetime|str) – latest possible date/time to trigger on

timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations

from datetime import datetime
from apscheduler.schedulers.blocking import BlockingScheduler

def job_function():
    print("Hello World")
# BlockingScheduler
sched = BlockingScheduler()
# Schedule job_function to be called every two hours
sched.add_job(job_function, 'interval', hours=2)
# The same as before, but starts on 2010-10-10 at 9:30 and stops on 2014-06-15 at 11:00
sched.add_job(job_function, 'interval', hours=2, start_date='2010-10-10 09:30:00', end_date='2014-06-15 11:00:00')
sched.start()
           

根據項目需求需要實作三個定時任務:

1>定時更新微信token,需要2小時更新一次;

2>商品定時上線;

3>定時檢測背景服務是否存活;

Python實作定點與定時任務方式比較多,每個方式都有自己應用場景;下面來快速介紹Python中常用的定時任務實作方式:

1>循環+sleep;

2>線程子產品中Timer類;

3>schedule子產品;

4>定時架構:APScheduler

在開始之前先設定一個任務(這樣不用依賴外部環境):

1:定時或者定點監測CPU與記憶體使用率;

2:将時間,CPU,記憶體使用情況儲存到日志檔案;

先來實作系統監測功能:

準備工作:安裝psutil:pip install psutil

功能實作

#psutil:擷取系統資訊子產品,可以擷取CPU,記憶體,磁盤等的使用情況
import psutil
import time
import datetime
#logfile:監測資訊寫入檔案
def MonitorSystem(logfile = None):
    #擷取cpu使用情況
    cpuper = psutil.cpu_percent()
    #擷取記憶體使用情況:系統記憶體大小,使用記憶體,有效記憶體,記憶體使用率
    mem = psutil.virtual_memory()
    #記憶體使用率
    memper = mem.percent
    #擷取目前時間
    now = datetime.datetime.now()
    ts = now.strftime('%Y-%m-%d %H:%M:%S')
    line = f'{ts} cpu:{cpuper}%, mem:{memper}%'
    print(line)
    if logfile:
        logfile.write(line)
           

代碼運作結果:

2019-03-21 14:23:41 cpu:0.6%, mem:77.2%
           

接下來我們要實作定時監測,比如3s監測一下系統資源使用情況。

最簡單使用方式:sleep

這種方式最簡單,直接使用while+sleep就可以實作:

def loopMonitor():
    while True:
        MonitorSystem()
        #2s檢查一次
        time.sleep(3)
loopMonitor()
           

輸出結果:

2019-03-21 14:28:42 cpu:1.5%, mem:77.6%
2019-03-21 14:28:45 cpu:1.6%, mem:77.6%
2019-03-21 14:28:48 cpu:1.4%, mem:77.6%
2019-03-21 14:28:51 cpu:1.4%, mem:77.6%
2019-03-21 14:28:54 cpu:1.3%, mem:77.6%
           

這種方式存在問題:隻能處理單個定時任務。

又來了新任務:需要每秒監測網絡收發位元組,代碼實作如下:

def MonitorNetWork(logfile = None):
    #擷取網絡收資訊
    netinfo = psutil.net_io_counters()
    #擷取目前時間
    now = datetime.datetime.now()
    ts = now.strftime('%Y-%m-%d %H:%M:%S')
    line = f'{ts} bytessent={netinfo.bytes_sent}, bytesrecv={netinfo.bytes_recv}'
    print(line)
    if logfile:
        logfile.write(line)
MonitorNetWork()
           

代碼執行結果:

2019-03-21 14:47:21 bytessent=169752183, bytesrecv=1107900973

如果我們同時在while循環中監測兩個任務會有等待問題,不能每秒監測網絡情況。

Timer實作方式

timer最基本了解就是定時器,我們可以啟動多個定時任務,這些定時器任務是異步執行,是以不存在等待順序執行問題。

先來看Timer的基本使用:

主要方法:

Timer方法 說明

Timer(interval, function, args=None, kwargs=None) 建立定時器

cancel() 取消定時器

start() 使用線程方式執行

join(self, timeout=None) 等待線程執行結束

定時器隻能執行一次,如果需要重複執行,需要重新添加任務;

我們先來看基本使用:

from threading import Timer
#記錄目前時間
print(datetime.datetime.now())
#3S執行一次
sTimer = Timer(3, MonitorSystem)
#1S執行一次
nTimer = Timer(1, MonitorNetWork)
#使用線程方式執行
sTimer.start()
nTimer.start()
#等待結束
sTimer.join()
nTimer.join()
#記錄結束時間
print(datetime.datetime.now())
           

輸出結果:

2019-03-21 15:13:36.739798
2019-03-21 15:13:37 bytessent=171337324, bytesrecv=1109002349
2019-03-21 15:13:39 cpu:1.4%, mem:93.2%
2019-03-21 15:13:39.745187
           

可以看到,花費時間為3S,但是我們想要做的是每秒監控網絡狀态;如何處理。

Timer隻能執行一次,是以執行完成之後需要再次添加任務,我們對代碼進行修改:

from threading import Timer
import psutil
import time
import datetime
def MonitorSystem(logfile = None):
    cpuper = psutil.cpu_percent()
    mem = psutil.virtual_memory()
    memper = mem.percent
    now = datetime.datetime.now()
    ts = now.strftime('%Y-%m-%d %H:%M:%S')
    line = f'{ts} cpu:{cpuper}%, mem:{memper}%'
    print(line)
    if logfile:
        logfile.write(line)
    #啟動定時器任務,每三秒執行一次
    Timer(3, MonitorSystem).start()

def MonitorNetWork(logfile = None):
    netinfo = psutil.net_io_counters()
    now = datetime.datetime.now()
    ts = now.strftime('%Y-%m-%d %H:%M:%S')
    line = f'{ts} bytessent={netinfo.bytes_sent}, bytesrecv={netinfo.bytes_recv}'
    print(line)
    if logfile:
        logfile.write(line)
    #啟動定時器任務,每秒執行一次
    Timer(1, MonitorNetWork).start()
MonitorSystem()
MonitorNetWork()
           

執行結果:

2019-03-21 15:18:21 cpu:1.5%, mem:93.2%
2019-03-21 15:18:21 bytessent=171376522, bytesrecv=1109124678
2019-03-21 15:18:22 bytessent=171382215, bytesrecv=1109128294
2019-03-21 15:18:23 bytessent=171384278, bytesrecv=1109129702
2019-03-21 15:18:24 cpu:1.9%, mem:93.2%
2019-03-21 15:18:24 bytessent=171386341, bytesrecv=1109131110
2019-03-21 15:18:25 bytessent=171388527, bytesrecv=1109132600
2019-03-21 15:18:26 bytessent=171390590, bytesrecv=1109134008
           

從時間中可以看到,這兩個任務可以同時進行不存在等待問題。

Timer的實質是使用線程方式去執行任務,每次執行完後會銷毀,是以不必擔心資源問題。

排程子產品:schedule

schedule是一個第三方輕量級的任務排程子產品,可以按照秒,分,小時,日期或者自定義事件執行時間;

安裝方式:

pip install schedule
           

我們來看一個例子:

import datetime
import schedule
import time
def func():
    now = datetime.datetime.now()
    ts = now.strftime('%Y-%m-%d %H:%M:%S')
    print('do func  time :',ts)
def func2():
    now = datetime.datetime.now()
    ts = now.strftime('%Y-%m-%d %H:%M:%S')
    print('do func2 time:',ts)
def tasklist():
    #清空任務
    schedule.clear()
    #建立一個按秒間隔執行任務
    schedule.every(1).seconds.do(func)
    #建立一個按2秒間隔執行任務
    schedule.every(2).seconds.do(func2)
    #執行10S
    for i in range(10):
        schedule.run_pending()
        time.sleep(1)
tasklist()
           

執行結果:

do func  time : 2019-03-22 08:51:38
do func2 time: 2019-03-22 08:51:39
do func  time : 2019-03-22 08:51:39
do func  time : 2019-03-22 08:51:40
do func2 time: 2019-03-22 08:51:41
do func  time : 2019-03-22 08:51:41
do func  time : 2019-03-22 08:51:42
do func2 time: 2019-03-22 08:51:43
do func  time : 2019-03-22 08:51:43
do func  time : 2019-03-22 08:51:44
do func2 time: 2019-03-22 08:51:45
do func  time : 2019-03-22 08:51:45
do func  time : 2019-03-22 08:51:46
           

執行過程分析:

1>因為在jupyter下執行,是以先将schedule任務清空;

2>按時間間在schedule中隔添加任務;

3>這裡按照秒間隔添加func,按照兩秒間隔添加func2;

4>schedule添加任務後,需要查詢任務并執行任務;

5>為了防止占用資源,每秒查詢到點任務,然後順序執行;

第5個順序執行怎麼了解,我們修改func函數,裡面添加time.sleep(2)

然後隻執行func工作,輸出結果:

do func  time : 2019-03-22 09:00:59
do func  time : 2019-03-22 09:01:02
do func  time : 2019-03-22 09:01:05
           

可以看到時間間隔為3S,為什麼不是1S?

因為這個按照順序執行,func休眠2S,循環任務查詢休眠1S,是以會存在這個問題。

在我們使用這種方式執行任務需要注意這種阻塞現象。

我們看下schedule子產品常用使用方法:

#schedule.every(1)建立Job, seconds.do(func)按秒間隔查詢并執行
schedule.every(1).seconds.do(func)
#添加任務按分執行
schedule.every(1).minutes.do(func)
#添加任務按天執行
schedule.every(1).days.do(func)
#添加任務按周執行
schedule.every().weeks.do(func)
#添加任務每周1執行,執行時間為下周一這一時刻時間
schedule.every().monday.do(func)
#每周1,1點15開始執行
schedule.every().monday.at("12:00").do(job)
           

這種方式局限性:如果工作任務回非常耗時就會影響其他任務執行。我們可以考慮使用并發機制配置這個子產品使用。

任務架構APScheduler

APScheduler是Python的一個定時任務架構,用于執行周期或者定時任務,

可以基于日期、時間間隔,及類似于Linux上的定時任務crontab類型的定時任務;

該該架構不僅可以添加、删除定時任務,還可以将任務存儲到資料庫中,實作任務的持久化,使用起來非常友善。

安裝方式:pip install apscheduler

apscheduler元件及簡單說明:

1>triggers(觸發器):觸發器包含排程邏輯,每一個作業有它自己的觸發器

2>job stores(作業存儲):用來存儲被排程的作業,預設的作業存儲器是簡單地把作業任務儲存在記憶體中,支援存儲到MongoDB,Redis資料庫中

3> executors(執行器):執行器用來執行定時任務,隻是将需要執行的任務放在新的線程或者線程池中運作

4>schedulers(排程器):排程器是将其它部分聯系在一起,對使用者提供接口,進行任務添加,設定,删除。

來看一個簡單例子:

import time
from apscheduler.schedulers.blocking import BlockingScheduler
def func():
    now = datetime.datetime.now()
    ts = now.strftime('%Y-%m-%d %H:%M:%S')
    print('do func  time :',ts)
    
def func2():
    #耗時2S
    now = datetime.datetime.now()
    ts = now.strftime('%Y-%m-%d %H:%M:%S')
    print('do func2 time:',ts)
    time.sleep(2)
    
def dojob():
    #建立排程器:BlockingScheduler
    scheduler = BlockingScheduler()
    #添加任務,時間間隔2S
    scheduler.add_job(func, 'interval', seconds=2, id='test_job1')
    #添加任務,時間間隔5S
    scheduler.add_job(func2, 'interval', seconds=3, id='test_job2')
    scheduler.start()
dojob()
           

輸出結果:

do func  time : 2019-03-22 10:32:20
do func2 time: 2019-03-22 10:32:21
do func  time : 2019-03-22 10:32:22
do func  time : 2019-03-22 10:32:24
do func2 time: 2019-03-22 10:32:24
do func  time : 2019-03-22 10:32:26
           

輸出結果中可以看到:任務就算是有延時,也不會影響其他任務執行。

APScheduler架構提供豐富接口去實作定時任務,可以去參考官方文檔去檢視使用方式。

最後選擇:

簡單總結上面四種定時定點任務實作:

1:循環+sleep方式适合簡答測試,

2:timer可以實作定時任務,但是對定點任務來說,需要檢查目前時間點;

3:schedule可以定點定時執行,但是需要在循環中檢測任務,而且存在阻塞;

4:APScheduler架構更加強大,可以直接在裡面添加定點與定時任務;

綜合考慮,決定使用APScheduler架構,實作簡單,隻需要直接建立任務,并将添加到排程器中即可。