创建一个集中的k线生成引擎,可以统一的生成不同品种和长度的k线;各个策略采用注册订阅的模式,接收k线更新和推送。
使用背景,在使用vntrade cta策略交易模块时候,其每个策略都有各自实例化的K线生成器BarGenerator,来根据传入的tick或者历史回放的bar,来合成需要的k线bar,在on_minute_bar方法中进行条件发单或者平仓。对于中低频的分钟级策略,一个策略大部分时候,甚至90%时间主要的事情就是接收tick,然后合成1分钟bar,然后合成15分钟或者其他自定义长度bar。
对于一个品种只有一两个策略倒是没有什么问题,如果一个品种对应的策略比较多,比如针对螺纹钢的策略有5-6个,那么每个策略自己来花时间做同样的K线合成实在有浪费算力,尤其是python本身效率不高情况。从项目角度来看,批量的功能重复的事情,可以通过集中k线生成器来实现。集中合成bar,使用方法回调传给策略对应的方法on_bar或者on_minute_bar。
本来想创建一个新的针对Bar的Event事件,使用Event事件引擎来传送合成好的bar到订阅的策略里面;Event是标志位只有简单名称str一条,而每个bar,至少有vt_symbol, window 和interval三个标志位来区分推送给。写着写着脱离。
主要有两个类,BarFactoryEngine,就是一个前台引擎,提供针对策略K线调用方法注册,接收tick,和历史回放的bar等。
其中注册订阅方法,策略注册需要的品种vt_symbol, K线时长,on_minute_bar方法。
BarFactoryEngine带有symbol_barProdutors {品种: bar_productor实例} 字典存储对应不同品种和对应bar_productor实例。
BarProductor,就是负责和vntrader自带BarGenerator交互;被隐藏在引擎后面。有两个字典。interval_bg是{K线长度:对应BarGenerator} 字典,存储真正的生成器;interval_functions_dict是{间隔 - [注册方法]}字典存储不同间隔和需要通知方法队列。
代码在后面,使用的时候,可以在cta_engine 初始化时候,创建BarFactoryEngine;策略使用很简单,就是在替换掉原来self.bg,换成回调方法组成即可,这里一分钟方法on_bar也要显示的注册。
cta_engine.barEngine.register_bar_function(vt_symbol, 1, self.on_bar)
cta_engine.barEngine.register_bar_function(vt_symbol,10,self.on_window_bar)
另外,因为BarGenerator生成的bar,并不带有K线信息,不得不加上在BarGenerator创建bar时候加上下面两个属性,作为字典key值来找到对应调用方法组;这里可以用装饰器来实现,不过对原本BarGenerator修改都是必须的。
self.window_bar.interval =self.interval
self.window_bar.window = self.window
# Check if window bar completed
这里也支持历史推送,倒是间接的解决重复历史推送回放的问题,只要一次推送,所有这个品种的策略都会更新。 在cta_engine 的load_bar方法中,直接调用,会自动推送到所有涉及的策略,当然,对于原来的策略初始化逻辑也要修改。
for bar in bars:
# callback(bar)
self.barEngine.update_bar(bar)
最后,这个只是一个很初级的代码,没有考虑到多线程,数据同步等情况。如果真的使用,还有大面积更新,纯属抛砖引玉。
比如继承BarGenerator,实现一个barGenerator生成多个不同长度K线功能等等。
from vnpy.event import Event, EventEngine
from vnpy.trader.engine import BaseEngine, MainEngine
from vnpy.trader.constant import Interval
from vnpy.trader.object import (
TickData,
BarData
)
from vnpy.trader.event import EVENT_TICK
from vnpy.trader.utility import BarGenerator
from typing import Callable
APP_NAME = "BarFactory"
class BarProductor:
"""
BarProductor有个base_bg bargenertor默认提供1分钟bar,其他分钟长度bargenertor使用这个base_bg的一分钟bar合成
有{间隔 - generator}字典保存不同间隔和对应的bargenerator
有{间隔 - [注册方法]} 字典存储不同间隔和需要通知方法队列。
"""
def __init__(self,vt_symbol):
self.vt_symbol = vt_symbol
self.base_bg = BarGenerator(self.on_bar,1,self._process)
self.interval_bg = {}
self.interval_functions_dict = {}
self.interval_bg[(1,"1m")] = self.base_bg
self.interval_functions_dict[(1,"1m")] = []
def register_function(self, interval:tuple, function) -> None:
function_list = self.get_function_list(interval)
if function not in function_list:
function_list.append(function)
def get_function_list(self,interval):
function_list = self.interval_functions_dict.get(interval,None)
if not function_list:
self.get_bg(interval)
self.interval_functions_dict[interval] = []
return self.interval_functions_dict[interval]
def get_bg(self,interval:tuple):
bg = self.interval_bg.get(interval, None)
if not bg:
self.interval_bg[interval] = BarGenerator(self.on_bar, interval[0], self._process, interval[1])
return self.interval_bg[interval]
def on_tick(self, tick: TickData):
"""
Callback of new tick data update.
"""
self.base_bg.update_tick(tick)
def on_bar(self, bar: BarData):
"""
Callback of new bar data update.
"""
[bg.update_bar(bar) for bg in self.interval_bg.values()]
def _process(self,bar: BarData):
function_list = self.get_function_list((bar.window,bar.interval))
[function(bar) for function in function_list]
class BarFactoryEngine(BaseEngine):
"""
Bar批量生成器,策略注册需要的bar和对应on_bar或on_minute_bar方法;BarFactoryEngine去生成bar,并推送给对应方法。
BarFactoryEngine带有{品种-bar_productor实例} 字典存储对应不同品种和对应bar_productor实例,
"""
def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
""""""
super().__init__(main_engine, event_engine, APP_NAME)
self.symbol_barProdutors = {}
def register_bar_function(
self,
vt_symbol,
window: int = 0,
on_window_bar: Callable = None,
interval: Interval = Interval.MINUTE
):
""""""
bar_productor = self.get_symbol_barProdutors(vt_symbol)
bar_productor.register_function((window,interval),on_window_bar)
def get_symbol_barProdutors(self, vt_symbol: str):
bp = self.symbol_barProdutors.get(vt_symbol, None)
if not bp:
bp = BarProductor(vt_symbol)
self.symbol_barProdutors[vt_symbol] = bp
return bp
def register_event(self):
""""""
self.event_engine.register(EVENT_TICK, self.process_tick_event)
def process_tick_event(self, event: Event):
""""""
tick = event.data
for tick.vt_symbol in self.symbol_barProdutors:
self.symbol_barProdutors[tick.vt_symbol].on_tick(tick)
def update_bar(self, bar: BarData):
""""""
for bar.vt_symbol in self.symbol_barProdutors:
self.symbol_barProdutors[bar.vt_symbol].on_bar(bar)