天天看點

vnpy 查詢持倉量_VNPY源碼(四)DataRecorder

VNPY源碼學習系列文章:

一、源碼

"""

注冊EVENT_TICK、EVENT_CONTRACT,當有EVENT_TICK的時候,調用process_contract_event函數(其實就是record_tick函數),

将task put到queue

通過run函數,從self.queue獲得task(Tick、Bar),調用database_manager的方法儲存資料

"""

from threading import Thread

from queue import Queue, Empty

from copy import copy

from vnpy.event import Event, EventEngine

from vnpy.trader.engine import BaseEngine, MainEngine

from vnpy.trader.object import (

SubscribeRequest,

TickData,

BarData,

ContractData

)

from vnpy.trader.event import EVENT_TICK, EVENT_CONTRACT

from vnpy.trader.utility import load_json, save_json, BarGenerator

from vnpy.trader.database import database_manager

APP_NAME = "DataRecorder"

EVENT_RECORDER_LOG = "eRecorderLog"

EVENT_RECORDER_UPDATE = "eRecorderUpdate"

class RecorderEngine(BaseEngine):

""""""

setting_filename = "data_recorder_setting.json"

def __init__(self, main_engine: MainEngine, event_engine: EventEngine):

""""""

super().__init__(main_engine, event_engine, APP_NAME)

self.queue = Queue()

self.thread = Thread(target=self.run)

self.active = False

self.tick_recordings = {}

self.bar_recordings = {}

self.bar_generators = {}

self.load_setting()

self.register_event()

self.start()

self.put_event()

def load_setting(self):

""""""

setting = load_json(self.setting_filename)

self.tick_recordings = setting.get("tick", {})

self.bar_recordings = setting.get("bar", {})

def save_setting(self):

""""""

setting = {

"tick": self.tick_recordings,

"bar": self.bar_recordings

}

save_json(self.setting_filename, setting)

def run(self):

"""

調用database_manager的方法儲存資料

"""

while self.active:

try:

task = self.queue.get(timeout=1)

task_type, data = task

if task_type == "tick":

database_manager.save_tick_data([data])

elif task_type == "bar":

database_manager.save_bar_data([data])

except Empty:

continue

def close(self):

""""""

self.active = False

if self.thread.isAlive():

self.thread.join()

def start(self):

""""""

self.active = True

self.thread.start()

def add_bar_recording(self, vt_symbol: str):

"""

将symbol資料寫入bar_recordings["symbol"]這個字典,訂閱合約,調用save_setting進行儲存(裡面有tick_recordings,bar_recordings字典),

然後調用put_event(實質為調用Eventengine的self._queue.put(event)),将事件放入隊列。

"""

if vt_symbol in self.bar_recordings:

self.write_log(f"已在K線記錄清單中:{vt_symbol}")

return

contract = self.main_engine.get_contract(vt_symbol)

if not contract:

self.write_log(f"找不到合約:{vt_symbol}")

return

self.bar_recordings[vt_symbol] = {

"symbol": contract.symbol,

"exchange": contract.exchange.value,

"gateway_name": contract.gateway_name

}

self.subscribe(contract)

self.save_setting()

self.put_event()

self.write_log(f"添加K線記錄成功:{vt_symbol}")

def add_tick_recording(self, vt_symbol: str):

""""""

if vt_symbol in self.tick_recordings:

self.write_log(f"已在Tick記錄清單中:{vt_symbol}")

return

contract = self.main_engine.get_contract(vt_symbol)

if not contract:

self.write_log(f"找不到合約:{vt_symbol}")

return

self.tick_recordings[vt_symbol] = {

"symbol": contract.symbol,

"exchange": contract.exchange.value,

"gateway_name": contract.gateway_name

}

self.subscribe(contract)

self.save_setting()

self.put_event()

self.write_log(f"添加Tick記錄成功:{vt_symbol}")

def remove_bar_recording(self, vt_symbol: str):

""""""

if vt_symbol not in self.bar_recordings:

self.write_log(f"不在K線記錄清單中:{vt_symbol}")

return

self.bar_recordings.pop(vt_symbol)

self.save_setting()

#調用下面的put_event方法,将EVENT_RECORDER_UPDATE放入隊列

self.put_event()

self.write_log(f"移除K線記錄成功:{vt_symbol}")

def remove_tick_recording(self, vt_symbol: str):

""""""

if vt_symbol not in self.tick_recordings:

self.write_log(f"不在Tick記錄清單中:{vt_symbol}")

return

self.tick_recordings.pop(vt_symbol)

self.save_setting()

self.put_event()

self.write_log(f"移除Tick記錄成功:{vt_symbol}")

def register_event(self):

""""""

self.event_engine.register(EVENT_TICK, self.process_tick_event)

self.event_engine.register(EVENT_CONTRACT, self.process_contract_event)

def process_tick_event(self, event: Event):

"""

調用下面的record_tick方法(其實就是将task put到queue)

"""

tick = event.data

if tick.vt_symbol in self.tick_recordings:

self.record_tick(tick)

if tick.vt_symbol in self.bar_recordings:

bg = self.get_bar_generator(tick.vt_symbol)

bg.update_tick(tick)

def process_contract_event(self, event: Event):

""""""

contract = event.data

vt_symbol = contract.vt_symbol

if (vt_symbol in self.tick_recordings or vt_symbol in self.bar_recordings):

self.subscribe(contract)

def write_log(self, msg: str):

""""""

event = Event(

EVENT_RECORDER_LOG,

msg

)

self.event_engine.put(event)

def put_event(self):

"""

調用event_engine的put方法

"""

tick_symbols = list(self.tick_recordings.keys())

tick_symbols.sort()

bar_symbols = list(self.bar_recordings.keys())

bar_symbols.sort()

data = {

"tick": tick_symbols,

"bar": bar_symbols

}

event = Event(

EVENT_RECORDER_UPDATE,

data

)

self.event_engine.put(event)

def record_tick(self, tick: TickData):

"""

将task put到queue, 不過這裡為什麼要用self.queue,不是調用put_event?

"""

task = ("tick", copy(tick))

self.queue.put(task)

def record_bar(self, bar: BarData):

""""""

task = ("bar", copy(bar))

self.queue.put(task)

def get_bar_generator(self, vt_symbol: str):

"""

從bar_generators這個字典通過symbol取出bg的執行個體

"""

bg = self.bar_generators.get(vt_symbol, None)

if not bg:

bg = BarGenerator(self.record_bar)

self.bar_generators[vt_symbol] = bg

return bg

def subscribe(self, contract: ContractData):

""""""

req = SubscribeRequest(

symbol=contract.symbol,

exchange=contract.exchange

)

self.main_engine.subscribe(req, contract.gateway_name)

二、database_manager

在DataRecorder子產品有下面的代碼

from vnpy.trader.database import database_manager

if task_type == "tick":

database_manager.save_tick_data([data])

elif task_type == "bar":

database_manager.save_bar_data([data])

可是在vnpy.trader.database下面沒有找到這個database_manager,

隻在init.py找到下面這句。

from vnpy.trader.database.database import BaseDatabaseManager

database_manager: "BaseDatabaseManager" = init(settings=settings)

原來它調用的是initialize中的init方法,傳回的是BaseDatabaseManager對象

它的save_tick_data方法是在C:\vnstudio\Lib\site-packages\vnpy\~rader\database\database.py檔案中BaseDatabaseManager這個類定義的。

然後在上面的import中,将這個類import了進來。

@abstractmethod

def save_tick_data(

self,

datas: Sequence["TickData"],

):

pass

參考:https://www.vnpy.com/forum/topic/805-databaseyuan-ma-yue-du-bi-ji-+pei-zhi-jiao-cheng