APScheduler是一款功能非常强大的定时任务框架。利用APScheduler框架我们可以很方便实现一个基于Python的定时任务系统
1,安装
pip install apscheduler
2,简单调用(一次性调用任务)
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
def aps_test():
print (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), '你好')
scheduler = BlockingScheduler()
scheduler.add_job(func=aps_test, next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=2))
scheduler.start()
启动后两秒,执行任务aps_test()
3,在flask中使用APScheduler(也是一次调用性任务)
from flask import Flask,jsonify
from flask_apscheduler import APScheduler
import datetime
import random
app = Flask(__name__)
queue = []
class Config(object):
JOBS = [
{
'id': 'jobjob',
# 'func': 'jobs:job1',
'func': '__main__:job1',
'args': (1, 2),
'next_run_time':datetime.datetime.now() + datetime.timedelta(seconds=10)
}
]
SCHEDULER_API_ENABLED = True
def job1(a, b):
global queue
while True:
if len(queue) > 10:
queue = []
else:
ranInfo = random.randint(1, 7) # 随机1-7
queue.append(ranInfo)
@app.route("/getImgInfo")
def getImgInfo():
global queue
return jsonify({'status': 0, 'errmsg': queue})
if __name__ == '__main__':
app.config.from_object(Config())
scheduler = APScheduler()
# it is also possible to enable the API directly
# scheduler.api_enabled = True
scheduler.init_app(app)
scheduler.start()
app.run()
说明:使用定时调度任务是为了解决一个问题。因为后台有个死循环程序,相当于上面的job1,我要获得该程序中的queue信息。不使用调度任务,web程序就启动不起来。我想过使用消息队列,把web程序和死循环程序拆成两个程序。一个向消息队列中发消息,另一个向消息队列中取消息。但那样太麻烦了,还有许多这样那样的问题。
现在使用定时调度任务就能完美解决了,上面的程序是在启动后10秒这行job1任务。
4,参考博客地址
https://www.cnblogs.com/huchong/p/9088611.html
https://blog.csdn.net/qq_37634812/article/details/79208782