天天看点

Python利用APScheduler实现定时任务框架

APScheduler是一款功能非常强大的定时任务框架。利用APScheduler框架我们可以很方便实现一个基于Python的定时任务系统

1,安装

pip install apscheduler
           

2,简单调用(一次性调用任务)

from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
def aps_test():
    print (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), '你好')

scheduler = BlockingScheduler()
scheduler.add_job(func=aps_test, next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=2))
scheduler.start()
           

启动后两秒,执行任务aps_test()

3,在flask中使用APScheduler(也是一次调用性任务)

from flask import Flask,jsonify
from flask_apscheduler import APScheduler
import datetime
import random

app = Flask(__name__)
queue = []

class Config(object):
    JOBS = [
        {
            'id': 'jobjob',
            # 'func': 'jobs:job1',
            'func': '__main__:job1',
            'args': (1, 2),
            'next_run_time':datetime.datetime.now() + datetime.timedelta(seconds=10)
        }
    ]
    SCHEDULER_API_ENABLED = True

def job1(a, b):
    global queue
    while True:
        if len(queue) > 10:
            queue = []
        else:
            ranInfo = random.randint(1, 7)  # 随机1-7
            queue.append(ranInfo)

@app.route("/getImgInfo")
def getImgInfo():
    global queue
    return jsonify({'status': 0, 'errmsg': queue})

if __name__ == '__main__':
    app.config.from_object(Config())
    scheduler = APScheduler()
    # it is also possible to enable the API directly
    # scheduler.api_enabled = True
    scheduler.init_app(app)
    scheduler.start()
    app.run()
           

说明:使用定时调度任务是为了解决一个问题。因为后台有个死循环程序,相当于上面的job1,我要获得该程序中的queue信息。不使用调度任务,web程序就启动不起来。我想过使用消息队列,把web程序和死循环程序拆成两个程序。一个向消息队列中发消息,另一个向消息队列中取消息。但那样太麻烦了,还有许多这样那样的问题。

现在使用定时调度任务就能完美解决了,上面的程序是在启动后10秒这行job1任务。

4,参考博客地址

https://www.cnblogs.com/huchong/p/9088611.html

https://blog.csdn.net/qq_37634812/article/details/79208782