1. tick数据模拟器
tick数据模拟器可以从数据库读取历史tick数据,以一定的速度将其按时间顺序放入消息队列中,以此实现历史行情回放。
它可以用来复盘或者调试策略,加快策略的开发进度。
1.1 功能:
- 可以启动、暂停、恢复和停止回放tick数据。
- 速度可调,根据需要决定合适的速度
- 模拟的接口选,默认CTP
- 时间范围可变
1.2 tick数据来源
tick数据模拟器回放的tick数据不是凭空生造的,它是真实的历史行情。通常先用vnpy中的DataRecorder录制您感兴趣的tick数据,它会把tick数据记录到vnpy数据库中,
然后就可以利用tick数据模拟器在休市时间进行回放了。
2. tick数据模拟器的实现
"""
tick数据模拟器
作者:hxxjava
时间:2022-12-10
功能:从数据库读取历史tick数据,以一定的速度将其按时间顺序放入消息队列中,以此实现历史行情回放。
"""
from typing import List,Dict
from datetime import datetime,timedelta
from time import sleep
from vnpy.event.engine import Event,EventEngine
from vnpy.trader.engine import MainEngine,BaseEngine
from vnpy.trader.constant import InstrumentStatus
from vnpy.trader.object import TickData,StatusData
from vnpy.trader.event import EVENT_TICK,EVENT_STATUS
from vnpy.trader.utility import extract_vt_symbol,CHINA_TZ
from vnpy.trader.database import get_database
from vnpy.trader.app import BaseApp
from threading import Thread
EVENT_TICKSIM_START = "eTickSimStart."
EVENT_TICKSIM_PAUSE = "eTickSimPause."
EVENT_TICKSIM_STOP = "eTickSimStop."
EVENT_TICKSIM_FINISHED = "eTickSimFinished."
TICK_SIM_ENGINE_NAME = "tick_sim"
class TickSimEngine(BaseEngine):
""" tick数据模拟器 """
def __init__(self, main_engine: MainEngine, event_engine: EventEngine) -> None:
""" constructor """
super(TickSimEngine, self).__init__(main_engine, event_engine, TICK_SIM_ENGINE_NAME)
self.ticks:List[TickData] = []
self.db = get_database()
self.speed = 1.0
self._active = False
self._pause = False
self.gateway_name = "CTP"
self.register_event()
print(f"tick数据模拟器已经安装。")
def register_event(self) -> None:
""" """
# self.event_engine.register(EVENT_TICK,self.process_tick_event)
self.event_engine.register(EVENT_TICKSIM_START,self.process_ticksim_start_event)
self.event_engine.register(EVENT_TICKSIM_PAUSE,self.process_ticksim_pause_event)
self.event_engine.register(EVENT_TICKSIM_STOP,self.process_ticksim_stop_event)
self.event_engine.register(EVENT_TICKSIM_FINISHED,self.process_ticksim_finished)
def load_ticks(self) -> None:
""" """
symbol,exchange = extract_vt_symbol(self.vt_symbol)
self.ticks = self.db.load_tick_data(symbol=symbol,exchange=exchange,start=self.start_time,end=self.end_time)
print(f"TickSimulator load {len(self.ticks)} ticks")
def process_tick_event(self,event:Event) -> None:
""" """
tick:Dict = event.data
print(f"recieve:{tick}")
def process_ticksim_start_event(self,event:Event) -> None:
""" """
data:Dict = event.data
print(f"I got EVENT_TICKSIM_START:{data}")
self.start_simulator(**data)
def process_ticksim_pause_event(self,event:Event) -> None:
""" """
self._pause = not self._pause
print(f"I got EVENT_TICKSIM_PAUSE:{data}")
def process_ticksim_stop_event(self,event:Event) -> None:
""" """
self._active = True
print(f"I got EVENT_TICKSIM_STOP:{data}")
def process_ticksim_finished(self,event:Event) -> None:
""" """
# 送出收盘交易状态
tick:TickData = event.data
next_second:datetime = tick.datetime + timedelta(seconds=1)
next_second.replace(second=0,microsecond=0)
status = StatusData(
gateway_name=self.gateway_name,
symbol=tick.symbol,
exchange=tick.exchange,
settlement_group_id="100",
instrument_status=InstrumentStatus.CLOSE,
trading_segment_sn = 100,
enter_time = next_second.strftime("%H:%M"),
)
self.event_engine.put(Event(EVENT_STATUS,data=status))
self.stop()
print(f"finish_time:{tick.datetime}")
def start(self) -> None:
""" """
self._active = True
self.thread = Thread(target=self.run)
self.thread.start()
def stop(self) -> None:
""" """
self._active = False
self._pause = False
self.speed = 1.0
self.ticks.clear()
def run(self) -> None:
""" 仿真线程函数 """
while self._active:
# 遍历发送
if not self._pause:
if self.ticks:
tick = self.ticks.pop(0)
tick.gateway_name = self.gateway_name
event = Event(EVENT_TICK,tick)
# print(f"EVENT_TICK event data = {tick}")
self.event_engine.put(event)
if len(self.ticks) == 0:
print(f"all ticks are send out !")
self.event_engine.put(Event(EVENT_TICKSIM_FINISHED,data=tick))
sleep(0.5/self.speed)
print(f"TickSimulator is stoped !")
def start_simulator(self,vt_symbol:str,start_time:datetime,end_time:datetime,speed:float=1.0,gateway_name:str='CTP') -> None:
""" 启动tick模拟器 """
self.vt_symbol = vt_symbol
self.start_time = start_time
self.end_time = end_time
self.speed = speed
self.gateway_name = gateway_name
self.load_ticks()
self.start()
3. tick数据模拟器使用
这里给出最简单的使用方法,具体您可以发挥想象力。
3.1 加载tick数据模器到MainEgine
""" """
event_engine = EventEngine()
main_engine = MainEngine(event_engine=event_engine)
main_engine.add_engine(TickSimEngine)
3.2 启动回放方法1
tick_sim_engine:TickSimEngine = main_engine.get_engine(TICK_SIM_ENGINE_NAME)
end_time = datetime.now().replace(tzinfo=CHINA_TZ)
start_time = datetime(2022,12,9,22,50,0).replace(tzinfo=CHINA_TZ)
tick_sim_engine.start_simulator(
vt_symbol='p2301.DCE', # 回放合约
start_time=start_time, # 开始时间
end_time=end_time, # 结束时间
speed=10.0 # 回放速度
)
3.3 启动回放方法2
data = {
"vt_symbol":'p2301.DCE',
"start_time":datetime(2022,12,9,22,50,0).replace(tzinfo=CHINA_TZ),
"end_time": datetime.now().replace(tzinfo=CHINA_TZ),
"speed":10,
"gateway_name":"CTP"
} # 意义同上
event_engine.put(Event(EVENT_TICKSIM_START,data))
3.4 暂停回放方法
event_engine.put(Event(EVENT_TICKSIM_PAUSE))
3.5 停止回放方法
event_engine.put(Event(EVENT_TICKSIM_STOP))
注释:
无论3.2~3.5中的代码在哪里,event_engine必须和3.1加载tick数据模器时是同一个消息引擎。