vnpy社区新人,一个弱弱的问题。
课程和demo中展示的,很多都是tick或bar生成时驱动交易信号的判断和触发。请问直接用系统时间驱动,是否可行?
简单使用时间驱动,如在某个特定时点(有可能是在开盘前,所以没有quotes中的时间戳,不能用tick驱动)提前下单,等待成交。
这样虽然在后向测试中可能会出问题,但实盘交易时可以更方便。
为了减少尝试成本,请了解的大神告知,谢谢!
vnpy社区新人,一个弱弱的问题。
课程和demo中展示的,很多都是tick或bar生成时驱动交易信号的判断和触发。请问直接用系统时间驱动,是否可行?
简单使用时间驱动,如在某个特定时点(有可能是在开盘前,所以没有quotes中的时间戳,不能用tick驱动)提前下单,等待成交。
这样虽然在后向测试中可能会出问题,但实盘交易时可以更方便。
为了减少尝试成本,请了解的大神告知,谢谢!
不清楚具体策略逻辑了,可以自己用虚拟账户试试看
参照tick驱动,注册了 self.event_engine.register(EVENT_TIME,self.process_systime_event) # test
在仿照以下程序时,遇到问题:
def process_tick_event(self, event: Event):
""""""
tick = event.data
strategies = self.symbol_strategy_map[tick.vt_symbol]
if not strategies:
return
self.check_stop_order(tick)
for strategy in strategies:
if strategy.inited:
self.call_strategy_func(strategy, strategy.on_tick, tick)
tick驱动时,传进来的数据有vt_symbol,可据此判断所有strategies中执行。 但时间事件中,无合约,怎么让它在所有策略中执行呢
def process_systime_event(self, event: Event):
""""""
systime = event.data
strategies = self.symbol_strategy_map[tick.vt_symbol] ?????
if not strategies:
return
for strategy in strategies:
if strategy.inited:
self.call_strategy_func(strategy, strategy.on_time, tick)
问题已解决,谢谢各位的反馈和帮助。
为了供有类似要求的朋友参考,把我这边的修改罗列在下面:
1。cta_strategy文件夹下面engin文件修改:
1)多import一个EVENT_TIMER 事件
from vnpy.trader.event import (
EVENT_TICK,
EVENT_ORDER,
EVENT_TRADE,
EVENT_POSITION,
EVENT_TIMER
)
2)注册事件时,增加一个self.event_engine.register(EVENT_TIMER,self.process_time_event)
def register_event(self):
""""""
self.event_engine.register(EVENT_TICK, self.process_tick_event)
self.event_engine.register(EVENT_ORDER, self.process_order_event)
self.event_engine.register(EVENT_TRADE, self.process_trade_event)
self.event_engine.register(EVENT_POSITION, self.process_position_event)
self.event_engine.register(EVENT_TIMER,self.process_time_event) # test
3)参照process_tick_event 写一个process_time_event(self, event: Event).。由于每秒中都会调用这个过程,担心占用资源太多,所以在系统中定义了一个timecount的变量,这里设置每5秒真正调用一次。
def process_time_event(self, event: Event):
""""""
if self.timecount < 5:
self.timecount = self.timecount + 1
else:
self.timecount = 0
for k,v in self.symbol_strategy_map.items():
strategies = v
for strategy in strategies:
if strategy.inited:
#self.write_log("valid strategy time driven!!")
self.call_strategy_func(strategy, strategy.on_time)
2. 上面一个过程中实际调用on_time函数,所以要做一些处理
2.1 cta_strategy文件夹中template文件中,参考on_tick函数,定义on_time函数
@virtual
def on_time(self):
"""
Callback of new time data update.
"""
pass
2.2 自编策略文件中定义 on_time(self)函数即可
def on_time(self):
"""
Callback of new time data update.
"""
本人使用这个on_time函数,主要是实盘时用,后向测试可能出问题。
再一次感谢大家的帮助!