VeighNa量化社区
你的开源社区量化交易平台
Member
avatar
加入于:
帖子: 402
声望: 144

1. tick数据模拟器

tick数据模拟器可以从数据库读取历史tick数据,以一定的速度将其按时间顺序放入消息队列中,以此实现历史行情回放。
它可以用来复盘或者调试策略,加快策略的开发进度。

1.1 功能:

  • 可以启动、暂停、恢复和停止回放tick数据。
  • 速度可调,根据需要决定合适的速度
  • 模拟的接口选,默认CTP
  • 时间范围可变

1.2 tick数据来源

tick数据模拟器回放的tick数据不是凭空生造的,它是真实的历史行情。通常先用vnpy中的DataRecorder录制您感兴趣的tick数据,它会把tick数据记录到vnpy数据库中,
然后就可以利用tick数据模拟器在休市时间进行回放了。

2. tick数据模拟器的实现

"""
tick数据模拟器
作者:hxxjava
时间:2022-12-10
功能:从数据库读取历史tick数据,以一定的速度将其按时间顺序放入消息队列中,以此实现历史行情回放。
"""
from typing import List,Dict
from datetime import datetime,timedelta
from time import sleep 
from vnpy.event.engine import Event,EventEngine
from vnpy.trader.engine import MainEngine,BaseEngine
from vnpy.trader.constant import InstrumentStatus
from vnpy.trader.object import TickData,StatusData
from vnpy.trader.event import EVENT_TICK,EVENT_STATUS
from vnpy.trader.utility import extract_vt_symbol,CHINA_TZ

from vnpy.trader.database import get_database

from vnpy.trader.app import BaseApp
from threading import Thread

EVENT_TICKSIM_START = "eTickSimStart."
EVENT_TICKSIM_PAUSE = "eTickSimPause."
EVENT_TICKSIM_STOP = "eTickSimStop."

EVENT_TICKSIM_FINISHED = "eTickSimFinished."

TICK_SIM_ENGINE_NAME = "tick_sim"

class TickSimEngine(BaseEngine):
    """ tick数据模拟器 """
    def __init__(self, main_engine: MainEngine, event_engine: EventEngine) -> None:
        """ constructor """
        super(TickSimEngine, self).__init__(main_engine, event_engine, TICK_SIM_ENGINE_NAME)

        self.ticks:List[TickData] = []

        self.db = get_database()
        self.speed = 1.0
        self._active = False
        self._pause = False
        self.gateway_name = "CTP"

        self.register_event()

        print(f"tick数据模拟器已经安装。")

    def register_event(self) -> None:
        """ """
        # self.event_engine.register(EVENT_TICK,self.process_tick_event)
        self.event_engine.register(EVENT_TICKSIM_START,self.process_ticksim_start_event) 
        self.event_engine.register(EVENT_TICKSIM_PAUSE,self.process_ticksim_pause_event) 
        self.event_engine.register(EVENT_TICKSIM_STOP,self.process_ticksim_stop_event) 

        self.event_engine.register(EVENT_TICKSIM_FINISHED,self.process_ticksim_finished)     

    def load_ticks(self) -> None:
        """  """
        symbol,exchange = extract_vt_symbol(self.vt_symbol)
        self.ticks = self.db.load_tick_data(symbol=symbol,exchange=exchange,start=self.start_time,end=self.end_time)
        print(f"TickSimulator load {len(self.ticks)} ticks")

    def process_tick_event(self,event:Event) -> None:
        """ """
        tick:Dict = event.data
        print(f"recieve:{tick}")

    def process_ticksim_start_event(self,event:Event) -> None:
        """ """
        data:Dict = event.data
        print(f"I got EVENT_TICKSIM_START:{data}")
        self.start_simulator(**data)

    def process_ticksim_pause_event(self,event:Event) -> None:
        """ """
        self._pause = not self._pause
        print(f"I got EVENT_TICKSIM_PAUSE:{data}")

    def process_ticksim_stop_event(self,event:Event) -> None:
        """ """
        self._active = True
        print(f"I got EVENT_TICKSIM_STOP:{data}")

    def process_ticksim_finished(self,event:Event) -> None:
        """ """
        # 送出收盘交易状态
        tick:TickData = event.data

        next_second:datetime = tick.datetime + timedelta(seconds=1)
        next_second.replace(second=0,microsecond=0)
        status = StatusData(
            gateway_name=self.gateway_name,
            symbol=tick.symbol,
            exchange=tick.exchange,
            settlement_group_id="100",
            instrument_status=InstrumentStatus.CLOSE,
            trading_segment_sn = 100,
            enter_time = next_second.strftime("%H:%M"),
        )
        self.event_engine.put(Event(EVENT_STATUS,data=status))

        self.stop()

        print(f"finish_time:{tick.datetime}")

    def start(self) -> None:
        """ """
        self._active = True
        self.thread = Thread(target=self.run)
        self.thread.start()

    def stop(self) -> None:
        """ """
        self._active = False
        self._pause = False
        self.speed = 1.0
        self.ticks.clear()

    def run(self) -> None:
        """ 仿真线程函数 """
        while self._active:
            # 遍历发送
            if not self._pause:
                if self.ticks:
                    tick = self.ticks.pop(0)
                    tick.gateway_name = self.gateway_name

                    event = Event(EVENT_TICK,tick)

                    # print(f"EVENT_TICK event data = {tick}")
                    self.event_engine.put(event)

                    if len(self.ticks) == 0:
                        print(f"all ticks are send out !")
                        self.event_engine.put(Event(EVENT_TICKSIM_FINISHED,data=tick))

            sleep(0.5/self.speed)

        print(f"TickSimulator is stoped !")

    def start_simulator(self,vt_symbol:str,start_time:datetime,end_time:datetime,speed:float=1.0,gateway_name:str='CTP') -> None:
        """ 启动tick模拟器 """
        self.vt_symbol = vt_symbol
        self.start_time = start_time
        self.end_time = end_time
        self.speed = speed
        self.gateway_name = gateway_name
        self.load_ticks()
        self.start()

3. tick数据模拟器使用

这里给出最简单的使用方法,具体您可以发挥想象力。

3.1 加载tick数据模器到MainEgine

    """ """
    event_engine = EventEngine()
    main_engine = MainEngine(event_engine=event_engine)

    main_engine.add_engine(TickSimEngine)

3.2 启动回放方法1

    tick_sim_engine:TickSimEngine = main_engine.get_engine(TICK_SIM_ENGINE_NAME)
    end_time = datetime.now().replace(tzinfo=CHINA_TZ)
    start_time = datetime(2022,12,9,22,50,0).replace(tzinfo=CHINA_TZ)
    tick_sim_engine.start_simulator(
             vt_symbol='p2301.DCE',  # 回放合约
             start_time=start_time,      # 开始时间
             end_time=end_time,        # 结束时间
             speed=10.0                      # 回放速度
    )

3.3 启动回放方法2

    data = {
        "vt_symbol":'p2301.DCE',                   
        "start_time":datetime(2022,12,9,22,50,0).replace(tzinfo=CHINA_TZ),
        "end_time": datetime.now().replace(tzinfo=CHINA_TZ),
        "speed":10,
        "gateway_name":"CTP"
    }  # 意义同上
    event_engine.put(Event(EVENT_TICKSIM_START,data))

3.4 暂停回放方法

    event_engine.put(Event(EVENT_TICKSIM_PAUSE))

3.5 停止回放方法

    event_engine.put(Event(EVENT_TICKSIM_STOP))

注释:

无论3.2~3.5中的代码在哪里,event_engine必须和3.1加载tick数据模器时是同一个消息引擎。

Member
avatar
加入于:
帖子: 733
声望: 39

感谢分享!

Member
avatar
加入于:
帖子: 3
声望: 0

大神,问下 是否可以再回测中, 进行subscribe的方式订阅合约?。我的逻辑上是开盘后计算找到合适的合约在做买入卖出的计算

Member
avatar
加入于:
帖子: 402
声望: 144

leo-2a6111b0fda7498e wrote:

大神,问下 是否可以再回测中, 进行subscribe的方式订阅合约?。我的逻辑上是开盘后计算找到合适的合约在做买入卖出的计算

不可以,因为回测中是不连接CTP网关接口的,它的数据全部来自历史数据,此时是无法大约数据的。

© 2015-2022 上海韦纳软件科技有限公司
备案服务号:沪ICP备18006526号

沪公网安备 31011502017034号

【用户协议】
【隐私政策】
【免责条款】