之前为了确保bars里面的数据都是最新的用的是bars里面的datetime都相等再推送,这样写不同交易时间的标的就不能写到一起组合交易了。问题是bars里面的数据是逐条更新的,里面bar.datetime是像{"rb2010":2020-08-07 09:00,"rb2101":2020-08-07 09:00},{"rb2010":2020-08-07 10:00,"rb2101":2020-08-07 09:00},{"rb2010":2020-08-07 10:00,"rb2101":2020-08-07 10:00}这样推送的,不能在{"rb2010":2020-08-07 10:00,"rb2101":2020-08-07 09:00}这样的情况下推送bars,rb2101的datetime不是最新的计算会出错。那么怎么才能确保bars里面的数据都是最新的再统一推送呢?今天发现可以用bar.vt_symbol来判定bars里面的数据是否一轮推送完成,代码如下:
def __init__(self, strategy_engine: StrategyEngine, strategy_name: str,vt_symbols: List[str], setting: dict):
"""
全局变量要写到init函数里面
"""
super().__init__(strategy_engine, strategy_name, vt_symbols, setting)
self.bars_data = {}
self.minute_data = {}
self.bar_generator = {}
self.array_manager = {}
for vt_symbol in self.vt_symbols:
self.bar_generator[vt_symbol] = BarGenerator(self.on_bar, self.operation_circle, self.on_x_minute_bar, Interval.MINUTE)
self.array_manager[vt_symbol] = ArrayManager(self.buffer_size)
#------------------------------------------------------------------------------------
def on_tick(self, tick: TickData):
"""
收到所有tick数据推送
"""
self.bar_generator[tick.vt_symbol].update_tick(tick)
#------------------------------------------------------------------------------------
def on_bar(self, bar: BarData):
"""
收到所有bar分钟数据推送
"""
#实盘on_bars数据由on_bar传参,回测数据直接从on_bars传参
self.bars_data.update({bar.vt_symbol: bar})
#bar.vt_symbol等于bars_data最后的key,所有bar推送完成,推送bars_data到on_bars
if bar.vt_symbol == list(self.bars_data.keys())[-1]:
self.on_bars(self.bars_data)
#------------------------------------------------------------------------------------
def on_bars(self, bars: Dict[str, BarData]):
"""
收到bars分钟数据推送
"""
for bar in bars.values():
self.bar_generator[bar.vt_symbol].update_bar(bar)
#------------------------------------------------------------------------------------
def on_x_minute_bar(self, bar: BarData):
"""
收到所有X分钟bar数据推送
"""
self.minute_data.update({bar.vt_symbol: bar})
#bar.vt_symbol等于minute_data最后的key,所有bar推送完成,推送到on_x_minute_bars
if bar.vt_symbol == list(self.minute_data.keys())[-1]:
self.on_x_minute_bars(self.minute_data)
#------------------------------------------------------------------------------------
def on_x_minute_bars(self, bars: Dict[str, BarData]):
"""
收到X分钟bars数据推送
"""
for bar in bars.values():
self.array_manager[bar.vt_symbol].update_bar(bar)
if not self.array_manager[bar.vt_symbol].inited:
return