VNPY源碼學習系列文章:
一、源碼
"""
注冊EVENT_TICK、EVENT_CONTRACT。當有EVENT_TICK的時候,調用process_tick_event函數(其内部調用record_tick函數,或者把tick交給BarGenerator),
将task put到self.queue;當有EVENT_CONTRACT的時候,調用process_contract_event函數,訂閱已在記錄清單中的合約。
通過run函數,從self.queue獲得task(Tick、Bar),調用database_manager的方法儲存資料
"""
from threading import Thread
from queue import Queue, Empty
from copy import copy
from vnpy.event import Event, EventEngine
from vnpy.trader.engine import BaseEngine, MainEngine
from vnpy.trader.object import (
SubscribeRequest,
TickData,
BarData,
ContractData
)
from vnpy.trader.event import EVENT_TICK, EVENT_CONTRACT
from vnpy.trader.utility import load_json, save_json, BarGenerator
from vnpy.trader.database import database_manager
# Passed by RecorderEngine as the third argument to BaseEngine.__init__
# (presumably the engine's name -- confirm against BaseEngine).
APP_NAME = "DataRecorder"
# Event type of the log events that RecorderEngine.write_log puts on the event engine.
EVENT_RECORDER_LOG = "eRecorderLog"
# Event type of the events that RecorderEngine.put_event uses to publish the
# sorted tick/bar recording lists.
EVENT_RECORDER_UPDATE = "eRecorderUpdate"
class RecorderEngine(BaseEngine):
    """
    Engine that records tick and bar data through ``database_manager``.

    Tick events arriving from the event engine are turned into
    ``("tick", data)`` / ``("bar", data)`` tasks on ``self.queue``; a worker
    thread running :meth:`run` takes the tasks off the queue and hands them to
    ``database_manager`` for saving.
    """

    # File name handed to load_json / save_json for the recording lists.
    setting_filename = "data_recorder_setting.json"

    def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
        """
        Set up the task queue and worker thread, load the saved recording
        lists, register event handlers, start the worker and publish the
        initial recording lists.
        """
        super().__init__(main_engine, event_engine, APP_NAME)

        # Tasks waiting to be written to the database by the worker thread.
        self.queue = Queue()
        self.thread = Thread(target=self.run)
        self.active = False

        # vt_symbol -> {"symbol", "exchange", "gateway_name"} for each recording.
        self.tick_recordings = {}
        self.bar_recordings = {}
        # vt_symbol -> BarGenerator, created lazily by get_bar_generator.
        self.bar_generators = {}

        self.load_setting()
        self.register_event()
        self.start()
        self.put_event()

    def load_setting(self):
        """Load the tick and bar recording lists from the setting file."""
        setting = load_json(self.setting_filename)
        self.tick_recordings = setting.get("tick", {})
        self.bar_recordings = setting.get("bar", {})

    def save_setting(self):
        """Write the current tick and bar recording lists to the setting file."""
        setting = {
            "tick": self.tick_recordings,
            "bar": self.bar_recordings
        }
        save_json(self.setting_filename, setting)

    def run(self):
        """
        Worker loop: take tasks off ``self.queue`` and save them with
        ``database_manager`` until ``self.active`` becomes False.
        """
        while self.active:
            # Only the blocking get() can time out with Empty, so only it is
            # guarded; the 1 second timeout lets the loop re-check self.active.
            try:
                task = self.queue.get(timeout=1)
            except Empty:
                continue

            task_type, data = task
            if task_type == "tick":
                database_manager.save_tick_data([data])
            elif task_type == "bar":
                database_manager.save_bar_data([data])

    def close(self):
        """Stop the worker loop and wait for the worker thread to finish."""
        self.active = False

        # Thread.isAlive() was removed in Python 3.9; is_alive() is the
        # supported spelling on every Python 3 version.
        if self.thread.is_alive():
            self.thread.join()

    def start(self):
        """Mark the engine active and start the worker thread."""
        self.active = True
        self.thread.start()

    def add_bar_recording(self, vt_symbol: str):
        """
        Add ``vt_symbol`` to the bar recording list.

        Logs and returns early if the symbol is already recorded or its
        contract cannot be found. Otherwise stores the contract details in
        ``bar_recordings``, subscribes to the contract, saves the settings
        (both recording dicts) and publishes the updated lists through
        :meth:`put_event`.
        """
        if vt_symbol in self.bar_recordings:
            self.write_log(f"已在K線記錄清單中:{vt_symbol}")
            return

        contract = self.main_engine.get_contract(vt_symbol)
        if not contract:
            self.write_log(f"找不到合約:{vt_symbol}")
            return

        self.bar_recordings[vt_symbol] = {
            "symbol": contract.symbol,
            "exchange": contract.exchange.value,
            "gateway_name": contract.gateway_name
        }

        self.subscribe(contract)
        self.save_setting()
        self.put_event()

        self.write_log(f"添加K線記錄成功:{vt_symbol}")

    def add_tick_recording(self, vt_symbol: str):
        """
        Add ``vt_symbol`` to the tick recording list.

        Logs and returns early if the symbol is already recorded or its
        contract cannot be found. Otherwise stores the contract details in
        ``tick_recordings``, subscribes to the contract, saves the settings
        and publishes the updated lists through :meth:`put_event`.
        """
        if vt_symbol in self.tick_recordings:
            self.write_log(f"已在Tick記錄清單中:{vt_symbol}")
            return

        contract = self.main_engine.get_contract(vt_symbol)
        if not contract:
            self.write_log(f"找不到合約:{vt_symbol}")
            return

        self.tick_recordings[vt_symbol] = {
            "symbol": contract.symbol,
            "exchange": contract.exchange.value,
            "gateway_name": contract.gateway_name
        }

        self.subscribe(contract)
        self.save_setting()
        self.put_event()

        self.write_log(f"添加Tick記錄成功:{vt_symbol}")

    def remove_bar_recording(self, vt_symbol: str):
        """
        Remove ``vt_symbol`` from the bar recording list, save the settings
        and publish the updated lists. Logs and returns if it is not listed.
        """
        if vt_symbol not in self.bar_recordings:
            self.write_log(f"不在K線記錄清單中:{vt_symbol}")
            return

        self.bar_recordings.pop(vt_symbol)
        self.save_setting()
        # Puts an EVENT_RECORDER_UPDATE event on the event engine.
        self.put_event()

        self.write_log(f"移除K線記錄成功:{vt_symbol}")

    def remove_tick_recording(self, vt_symbol: str):
        """
        Remove ``vt_symbol`` from the tick recording list, save the settings
        and publish the updated lists. Logs and returns if it is not listed.
        """
        if vt_symbol not in self.tick_recordings:
            self.write_log(f"不在Tick記錄清單中:{vt_symbol}")
            return

        self.tick_recordings.pop(vt_symbol)
        self.save_setting()
        self.put_event()

        self.write_log(f"移除Tick記錄成功:{vt_symbol}")

    def register_event(self):
        """Register the tick and contract handlers with the event engine."""
        self.event_engine.register(EVENT_TICK, self.process_tick_event)
        self.event_engine.register(EVENT_CONTRACT, self.process_contract_event)

    def process_tick_event(self, event: Event):
        """
        Handle a tick event.

        A tick whose symbol is in ``tick_recordings`` is queued for saving via
        :meth:`record_tick`; a tick whose symbol is in ``bar_recordings`` is
        fed to that symbol's bar generator.
        """
        tick = event.data

        if tick.vt_symbol in self.tick_recordings:
            self.record_tick(tick)

        if tick.vt_symbol in self.bar_recordings:
            bg = self.get_bar_generator(tick.vt_symbol)
            bg.update_tick(tick)

    def process_contract_event(self, event: Event):
        """Subscribe to a newly reported contract if it is in either recording list."""
        contract = event.data
        vt_symbol = contract.vt_symbol

        if (vt_symbol in self.tick_recordings or vt_symbol in self.bar_recordings):
            self.subscribe(contract)

    def write_log(self, msg: str):
        """Put an EVENT_RECORDER_LOG event carrying ``msg`` on the event engine."""
        event = Event(
            EVENT_RECORDER_LOG,
            msg
        )
        self.event_engine.put(event)

    def put_event(self):
        """
        Put an EVENT_RECORDER_UPDATE event on the event engine whose data is
        the sorted lists of tick and bar symbols currently being recorded.
        """
        data = {
            "tick": sorted(self.tick_recordings),
            "bar": sorted(self.bar_recordings)
        }
        event = Event(
            EVENT_RECORDER_UPDATE,
            data
        )
        self.event_engine.put(event)

    def record_tick(self, tick: TickData):
        """
        Queue a copy of ``tick`` for saving.

        The task goes onto this engine's own ``self.queue``, which is consumed
        by the worker thread in :meth:`run`, not onto the event engine's queue
        that :meth:`put_event` uses.
        """
        task = ("tick", copy(tick))
        self.queue.put(task)

    def record_bar(self, bar: BarData):
        """Queue a copy of ``bar`` for saving by the worker thread."""
        task = ("bar", copy(bar))
        self.queue.put(task)

    def get_bar_generator(self, vt_symbol: str):
        """
        Return the bar generator cached for ``vt_symbol``, creating and
        caching one on first use.
        """
        bg = self.bar_generators.get(vt_symbol, None)

        if not bg:
            # record_bar is handed over as the callback; presumably
            # BarGenerator calls it with each finished bar -- confirm there.
            bg = BarGenerator(self.record_bar)
            self.bar_generators[vt_symbol] = bg

        return bg

    def subscribe(self, contract: ContractData):
        """Send a subscribe request for ``contract`` through its own gateway."""
        req = SubscribeRequest(
            symbol=contract.symbol,
            exchange=contract.exchange
        )
        self.main_engine.subscribe(req, contract.gateway_name)
二、database_manager
在DataRecorder模組中有下面的代碼
from vnpy.trader.database import database_manager
if task_type == "tick":
database_manager.save_tick_data([data])
elif task_type == "bar":
database_manager.save_bar_data([data])
可是在vnpy.trader.database下面沒有找到這個database_manager,
只在__init__.py中找到下面這兩句。
from vnpy.trader.database.database import BaseDatabaseManager
database_manager: "BaseDatabaseManager" = init(settings=settings)
原來它調用的是initialize中的init方法,傳回的是BaseDatabaseManager對象
它的save_tick_data方法是在C:\vnstudio\Lib\site-packages\vnpy\trader\database\database.py檔案中BaseDatabaseManager這個類定義的(這裡隻是抽象方法的聲明,方法體為pass,實際的儲存邏輯由具體的子類實作)。
然後在上面的import中,将這個類import了進來。
# Abstract declaration only: the body is `pass`, so the actual saving logic
# has to come from a concrete subclass that overrides this method.
# `datas` is annotated as a sequence of TickData objects.
@abstractmethod
def save_tick_data(
self,
datas: Sequence["TickData"],
):
pass
參考:https://www.vnpy.com/forum/topic/805-databaseyuan-ma-yue-du-bi-ji-+pei-zhi-jiao-cheng