天天看點

flask+APScheduler定時任務的使用APScheduler簡介使用結合flask使用(大衆點評資料同步執行個體)

APScheduler定時任務使用以及在flask中的調用

  • APScheduler簡介
    • 組成部分
      • 排程器
    • 安裝
      • 普通使用安裝
      • 結合flask使用安裝
  • 使用
    • 添加job
    • add_job參數詳解
      • interval 間隔時間(每隔一段時間執行)
      • date 定時排程(隻執行一次)
      • cron定時排程(某一定時時刻執行)
    • job的其他操作
  • 結合flask使用(大衆點評資料同步執行個體)
    • 在flask的config配置中添加:
    • 在main.py檔案建立flask執行個體并初始化APScheduler

任務需求:在flask中每天自動同步大衆點評資料到資料庫,這裡選擇了Flask-APScheduler

APScheduler簡介

APScheduler基于Quartz的一個Python定時任務架構,實作了Quartz的所有功能,使用起來十分友善。提供了基于日期、固定時間間隔以及crontab類型的任務,并且可以持久化任務。基于這些功能,我們可以很友善的實作一個python定時任務系統。

組成部分

APScheduler有四種組成部分:

  • 觸發器(trigger)包含排程邏輯,每一個作業有它自己的觸發器,用于決定接下來哪一個作業會運作。除了他們自己初始配置意外,觸發器完全是無狀态的。
  • 作業存儲(jobstore)存儲被排程的作業,預設的作業存儲是簡單地把作業儲存在記憶體中,其他的作業存儲是将作業儲存在資料庫中。一個作業的資料講在儲存在持久化作業存儲時被序列化,并在加載時被反序列化。排程器不能分享同一個作業存儲。
  • 執行器(executor)處理作業的運作,他們通常通過在作業中送出制定的可調用對象到一個線程或者進城池來進行。當作業完成時,執行器将會通知排程器。
  • 排程器(scheduler)是其他的組成部分。你通常在應用隻有一個排程器,應用的開發者通常不會直接處理作業存儲、排程器和觸發器,相反,排程器提供了處理這些的合适的接口。配置作業存儲和執行器可以在排程器中完成,例如添加、修改和移除作業。

排程器

APScheduler提供了多種排程器,可以根據具體需求來選擇合适的排程器,常用的排程器有:

  • BlockingScheduler:适合于隻在程序中運作單個任務的情況,通常在排程器是你唯一要運作的東西時使用。
  • BackgroundScheduler: 适合于要求任何在程式背景運作的情況,當希望排程器在應用背景執行時使用。
  • AsyncIOScheduler:适合于使用asyncio架構的情況
  • GeventScheduler: 适合于使用gevent架構的情況
  • TornadoScheduler: 适合于使用Tornado架構的應用
  • TwistedScheduler: 适合使用Twisted架構的應用
  • QtScheduler: 适合使用QT的情況

安裝

普通使用安裝

pip install apscheduler
           

結合flask使用安裝

pip install Flask-APScheduler
           

使用

添加job

  1. work1使用add_job()方法添加,表示每隔5秒執行一次,第一次執行是程式運作5秒後
  2. work2使用裝飾器添加
import time
from apscheduler.schedulers.blocking import BlockingScheduler

sched = BlockingScheduler()


def work1():
    print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))


@sched.scheduled_job('interval', seconds=2)
def work2():
    print('work2222')

sched.add_job(work1, 'interval', seconds=5)
sched.start()
           

add_job參數詳解

id

# id代表該job唯一辨別,不可重複,之後可以用id查找job
sched.add_job(work1, 'interval', seconds=5, id='my_work1')
           

trigger

它管理着作業的排程方式。它可以為date, interval或者cron。對于不同的trigger,對應的參數也不同。

interval 間隔時間(每隔一段時間執行)

後面可以寫的參數:

  • weeks (int) – number of weeks to wait
  • days (int) – number of days to wait
  • hours (int) – number of hours to wait
  • minutes (int) – number of minutes to wait
  • seconds (int) – number of seconds to wait
  • start_date (datetime|str) – starting point for the interval

    calculation

  • end_date (datetime|str) – latest possible date/time to trigger on
  • timezone (datetime.tzinfo|str) – time zone to use for the date/time

    calculations

#表示每隔3天17時19分07秒執行一次任務
sched.add_job(my_job, 'interval',days=3,hours=17,minutes=19,seconds=7)
           

date 定時排程(隻執行一次)

  • run_date (datetime|str) – the date/time to run the job at -(任務開始的時間)
  • timezone (datetime.tzinfo|str) – time zone for run_date if it doesn’t have one already
# The job will be executed on November 6th, 2009
sched.add_job(func=work1, trigger='date', run_date=date(2009, 11, 6), args=['text'])
# The job will be executed on November 6th, 2009 at 16:30:05
sched.add_job(func=work1, trigger='date', run_date=datetime(2009, 11, 6, 16, 30, 5), args=['text'])
           

cron定時排程(某一定時時刻執行)

後面參數和interval大緻相同

# 表示2017年3月22日17時19分07秒執行該程式
sched.add_job(work1, 'cron', year=2017, month=3, day=22, hour=17, minute=19, second=7)
# 表示任務在6,7,8,11,12月份的第三個星期五的00:00,01:00,02:00,03:00 執行該程式
sched.add_job(work1, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')
# 表示從星期一到星期五5:30(AM)直到2014-05-30 00:00:00
sched.add_job(work1, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2014-05-30')
# 表示每5秒執行該程式一次,相當于interval 間隔排程中seconds = 5
sched.add_job(work1, 'cron', second='*/5')
           

args 參數,要可疊代對象

job的其他操作

# 移除所有,移除要放在start之前才有效
sched.remove_all_jobs()
# 根據id移除job
sched.remove_job('my_work1')
sched.pause_job('my_work1')  # 暫停
sched.resume_job('my_work1')  # 恢複
sched.get_job('my_work1')  # 擷取
sched.get_jobs()  # 擷取所有job清單
# 預設情況是排程器等所有job完成後關閉,設為False時直接關閉
sched.shutdown(wait=False)  
           

結合flask使用(大衆點評資料同步執行個體)

flask的項目結構是util目錄下有個dianpingUtil.py檔案,檔案裡有個函數dianPing()是要執行的定時任務

import hashlib
import time
import json
from http import client
from urllib.parse import urlencode
from model.ZLHModel import ZlhUserFinanceDianping


def dianPing(current_app):
    data_type = 1

    today_date = time.strftime("%Y-%m-%d", time.localtime())
    _where = (ZlhUserFinanceDianping.ufd_date == today_date) & (ZlhUserFinanceDianping.ufd_data_type == data_type)
    # 如果已存在則不通路
    user_finance_dianping_count = ZlhUserFinanceDianping.select().where(_where).count()
    if not user_finance_dianping_count:
        host = 'openapi.dianping.com'
        appsecret = current_app.config['DIANPING_APPSECRET']
        time_stamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        params = {'app_key': current_app.config['DIANPING_APPKEY'], 'timestamp': time_stamp,
                  'session': current_app.config['DIANPING_SESSION'], 'format': "json", 'v': "1",
                  'sign_method': "MD5",
                  'open_shop_uuid': current_app.config['DIANPING_OPEN_SHOP_UUID'], 'date_type': str(data_type)}
        signstr = sign(params, appsecret, 'MD5')
        params['sign'] = signstr
        encode_params = urlencode(params)
        request_url = "https://openapi.dianping.com/router/merchant/data/consumption"
        header = {"Content-type": "application/x-www-form-urlencoded ; charset=UTF-8"}
        conn = client.HTTPConnection(host)
        conn.request(method="POST", url=request_url, headers=header, body=encode_params)
        response = conn.getresponse()
        res = response.read()
        result = json.loads(res)
        if result.get('code') == 200:
            for data in result.get('data'):
                ZlhUserFinanceDianping.create(ufd_tg_consume_amount=data.get('tg_consume_amount') * 100,
                                              ufd_tg_consume_count=data.get('tg_consume_count'),
                                              ufd_mopay_consume_amount=data.get('mopay_consume_amount') * 100,
                                              ufd_mopay_consume_count=data.get('mopay_consume_count'),
                                              ufd_reservation_consume_amount=data.get(
                                                  'reservation_consume_amount') * 100,
                                              ufd_reservation_consume_count=data.get('reservation_consume_count'),
                                              ufd_platform=data.get('platform'),
                                              ufd_date=today_date, ufd_data_type=data_type,
                                              )
            current_app.logger.info('大衆點評完成同步')
        else:
            current_app.logger.info('大衆點評傳回結果錯誤' + result.get('msg'))
    else:
        current_app.logger.info('點評當日已儲存')


def sign(param, appsecret, signmethod):

    if signmethod !="MD5":
        return ''

    lists = []
    param_str = appsecret
    for item in param:
        lists.append(item)

    lists.sort()

    for key in lists:
        param_str = param_str + key + param[key]

    param_str += appsecret
    param_str = param_str.strip()

    return genMd5(param_str)


def genMd5(str):
    md5 = hashlib.md5()
    md5.update(str.encode("utf8"))
    md5.hexdigest()

    return md5.hexdigest()
           

在flask的config配置中添加:

SCHEDULER_API_ENABLED = True
# 添加需要的配置
# 大衆點評
DIANPING_APPSECRET = '***'
DIANPING_APPKEY = '***'
DIANPING_OPEN_SHOP_UUID = '***'
DIANPING_SESSION = '***'
DIANPING_REFRESH_SESSION = '**'
           

在main.py檔案建立flask執行個體并初始化APScheduler

這裡的重點是定時任務需要用到flask app上下文,是以直接将app作為參數傳過去即可

from flask import Flask
from flask_apscheduler import APScheduler
...  # 省略導入配置檔案

app = Flask(__name__, template_folder='templates', static_url_path='/static')
# 注冊APScheduler,添加任務
scheduler = APScheduler()
scheduler.init_app(app)
# 表示一天後開始執行,然後每天執行一次
scheduler.add_job(id='dianping', func='util.DianpingUtil:dianPing',
                  trigger='interval', day=1, args=[app, ])
scheduler.start()
app.config.from_object(app_config[‘dev’])  # 配置檔案
# 省略
...