定時任務在量化中的應用
在實盤量化程式中有時候我們需要定時的從資料接口擷取資料來供交易政策處理産生交易信号進而做出交易決策。比如基于15分鐘K線設計的交易政策需要每15分鐘向資料接口送出請求擷取最新的15分鐘k線資料交給交易政策處理,産生相應的交易信号最後做出相應的交易決策。
python子產品介紹
面對定時任務處理需求python給我們提供了一些非常友善的定時任務處理子產品,主要有time.sleep, threading.Timer, sched(python自帶), schedule和apscheduler五個子產品。
各子產品的優缺點如下:
優點 | 缺點 | |
time.sleep | Python自帶,使用簡單, 能執行固定間隔時間的任務 | 隻能執行固定間隔時間的任務,如果有定時任務就無法完成,比如每天早上六點半叫你起床。并且 sleep 是一個阻塞函數,也就是說 sleep 這一段時間,啥都不能做。 |
threading.Timer | 非阻塞函數,使用簡單,能執行固定間隔時間的任務 | 隻能執行固定間隔時間的任務,還是不能叫你起床 |
sched(python自帶) | Python自帶,使用簡單、能執行固定間隔時間的任務和定時任務 | sched 子產品不是循環的,一次排程被執行後就結束了,如果想再執行,就需要再次 enter,可以叫你起床但不能每天都叫 |
schedule | 能執行固定間隔時間的任務和定時任務,第三方輕量庫,可以每天叫你起床 | schedule方法是串行的,如果各個任務之間時間不沖突,那是沒問題的;如果時間有沖突的話,會串行的執行指令,如果想要并發執行多任務需借助多線程或者多程序,與apscheduler相比沒有表達式操作 |
apscheduler | 能執行固定間隔時間的任務和定時任務,可以每天叫你起床,功能強大,支援多個定時任務并發執行,有豐富的表達式操作可以靈活的對時間進行判斷,比如minute='*/15', 表示到能被15整除的分鐘數才執行定時任務 | 暫時還沒發現(可能存在的缺點應該是與其他四個子產品相比略顯重量級,涉及的相關參數較多使用相對複雜) |
示例代碼
在上述的量化應用場景當中我們一般使用apscheduler定時架構,以便在高效完成相關量化實盤程式的定時需求。比如你想在能被15整除的分鐘數才擷取k線資料,用其他的子產品就難以完成,但是使用apscheduler架構就能通過幾行非常簡短的代碼來實作該需求。
在此筆者以每10秒向tushare請求資料為例展示 apscheduler定時架構的相關用法。
代碼如下:
# -*- coding: utf-8 -*-
# python環境:python3.7
# apscheduler版本:3.6.1
from apscheduler.schedulers.blocking import BlockingScheduler
import tushare as ts
from queue import Queue,Empty
from threading import Thread
from datetime import datetime
class Demo:
def __init__(self,token):
'''
初始化函數
:param token:tushare pro的token
'''
self.token=token
# 建立隊列
self._quene = Queue()
# 建立資料處理線程
self._thread1 = Thread(target=self._run, args=(self._quene,))
# 建立請求資料線程
self._thread2 = Thread(target=self.get_data, args=(self._quene,))
def start(self):
'''
線程啟動函數
'''
self._thread1.start()
self._thread2.start()
def get_bar(self,_quene):
'''
向tushare pro擷取資料
'''
ts.set_token(self.token)
pro = ts.pro_api()
df_bar=pro.daily(ts_code='000001.SZ',
start_date='20180701',
end_date='20180718')
_quene.put(df_bar)
# apscheduler定時架構
def get_data(self,_quene):
'''
使用apscheduler定時架構建立定時任務
'''
# 建立排程器對象
scheduler = BlockingScheduler()
# 添加定時任務
scheduler.add_job(self.get_bar, 'cron', second='*/10', args=[_quene])
# 啟動排程器,背景監控定時任務,到點執行
scheduler.start()
def _run(self,_queue):
'''
從隊列中擷取資料,并做處理
'''
while True:
try:
data = _queue.get(block=True, timeout=1)
if data is not None:
self.process_data(data)
except Empty:
pass
def process_data(self,data):
'''
列印前三行資料和時間
'''
print(data.head(3))
print(datetime.now())
if __name__=='__main__':
# 老版不再維護tushare pro版:https://tushare.pro/
# 設定tushare pro版的登入token
token = '33261e14a0f45680d6afdc86b85bc0c4c80ba6b8dc199d7313a30838'
demo=Demo(token)
demo.start()
運作結果:
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLicmbw5SYhNmYxEzYwgDN4YzMwIzMyUWYlRDNzQmYkVGO0I2Yj9CX0JXZ252bj91Ztl2Lc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
掃碼添加作者微信
量化玩家
微信号:quant_player