VeighNa量化社区
你的开源社区量化交易平台
Member
加入于:
帖子: 158
声望: 71

创建一个集中的k线生成引擎,可以统一的生成不同品种和长度的k线;各个策略采用注册订阅的模式,接收k线更新和推送。

使用背景,在使用vntrade cta策略交易模块时候,其每个策略都有各自实例化的K线生成器BarGenerator,来根据传入的tick或者历史回放的bar,来合成需要的k线bar,在on_minute_bar方法中进行条件发单或者平仓。对于中低频的分钟级策略,一个策略大部分时候,甚至90%时间主要的事情就是接收tick,然后合成1分钟bar,然后合成15分钟或者其他自定义长度bar。

对于一个品种只有一两个策略倒是没有什么问题,如果一个品种对应的策略比较多,比如针对螺纹钢的策略有5-6个,那么每个策略自己来花时间做同样的K线合成实在有浪费算力,尤其是python本身效率不高情况。从项目角度来看,批量的功能重复的事情,可以通过集中k线生成器来实现。集中合成bar,使用方法回调传给策略对应的方法on_bar或者on_minute_bar。

本来想创建一个新的针对Bar的Event事件,使用Event事件引擎来传送合成好的bar到订阅的策略里面;Event是标志位只有简单名称str一条,而每个bar,至少有vt_symbol, window 和interval三个标志位来区分推送给。写着写着脱离。

主要有两个类,BarFactoryEngine,就是一个前台引擎,提供针对策略K线调用方法注册,接收tick,和历史回放的bar等。

其中注册订阅方法,策略注册需要的品种vt_symbol, K线时长,on_minute_bar方法。

BarFactoryEngine带有symbol_barProdutors {品种: bar_productor实例} 字典存储对应不同品种和对应bar_productor实例。

BarProductor,就是负责和vntrader自带BarGenerator交互;被隐藏在引擎后面。有两个字典。interval_bg是{K线长度:对应BarGenerator} 字典,存储真正的生成器;interval_functions_dict是{间隔 - [注册方法]}字典存储不同间隔和需要通知方法队列。

代码在后面,使用的时候,可以在cta_engine 初始化时候,创建BarFactoryEngine;策略使用很简单,就是在替换掉原来self.bg,换成回调方法组成即可,这里一分钟方法on_bar也要显示的注册。

cta_engine.barEngine.register_bar_function(vt_symbol, 1, self.on_bar)
cta_engine.barEngine.register_bar_function(vt_symbol,10,self.on_window_bar)

另外,因为BarGenerator生成的bar,并不带有K线信息,不得不加上在BarGenerator创建bar时候加上下面两个属性,作为字典key值来找到对应调用方法组;这里可以用装饰器来实现,不过对原本BarGenerator修改都是必须的。

self.window_bar.interval =self.interval
self.window_bar.window = self.window
# Check if window bar completed

这里也支持历史推送,倒是间接的解决重复历史推送回放的问题,只要一次推送,所有这个品种的策略都会更新。 在cta_engine 的load_bar方法中,直接调用,会自动推送到所有涉及的策略,当然,对于原来的策略初始化逻辑也要修改。

for bar in bars:
    # callback(bar)
    self.barEngine.update_bar(bar)

最后,这个只是一个很初级的代码,没有考虑到多线程,数据同步等情况。如果真的使用,还有大面积更新,纯属抛砖引玉。

比如继承BarGenerator,实现一个barGenerator生成多个不同长度K线功能等等。

from vnpy.event import Event, EventEngine
from vnpy.trader.engine import BaseEngine, MainEngine
from vnpy.trader.constant import Interval
from vnpy.trader.object import (
    TickData,
    BarData
)
from vnpy.trader.event import EVENT_TICK
from vnpy.trader.utility import  BarGenerator
from typing import Callable
APP_NAME = "BarFactory"
class BarProductor:
    """
    BarProductor有个base_bg bargenertor默认提供1分钟bar,其他分钟长度bargenertor使用这个base_bg的一分钟bar合成
    有{间隔 - generator}字典保存不同间隔和对应的bargenerator
    有{间隔 - [注册方法]} 字典存储不同间隔和需要通知方法队列。
    """
    def __init__(self,vt_symbol):
        self.vt_symbol = vt_symbol
        self.base_bg = BarGenerator(self.on_bar,1,self._process)
        self.interval_bg = {}
        self.interval_functions_dict = {}
        self.interval_bg[(1,"1m")] = self.base_bg
        self.interval_functions_dict[(1,"1m")] = []
    def register_function(self, interval:tuple, function) -> None:
        function_list = self.get_function_list(interval)
        if function not in function_list:
            function_list.append(function)
    def get_function_list(self,interval):
        function_list = self.interval_functions_dict.get(interval,None)
        if not function_list:
            self.get_bg(interval)
            self.interval_functions_dict[interval] = []
        return self.interval_functions_dict[interval]
    def get_bg(self,interval:tuple):
        bg = self.interval_bg.get(interval, None)
        if not bg:
            self.interval_bg[interval] = BarGenerator(self.on_bar, interval[0], self._process, interval[1])
        return self.interval_bg[interval]
    def on_tick(self, tick: TickData):
        """
        Callback of new tick data update.
        """
        self.base_bg.update_tick(tick)
    def on_bar(self, bar: BarData):
        """
        Callback of new bar data update.
        """
        [bg.update_bar(bar) for bg in self.interval_bg.values()]
    def _process(self,bar: BarData):
        function_list = self.get_function_list((bar.window,bar.interval))
        [function(bar) for function in function_list]
class BarFactoryEngine(BaseEngine):
    """
    Bar批量生成器,策略注册需要的bar和对应on_bar或on_minute_bar方法;BarFactoryEngine去生成bar,并推送给对应方法。
    BarFactoryEngine带有{品种-bar_productor实例} 字典存储对应不同品种和对应bar_productor实例,
    """
    def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
        """"""
        super().__init__(main_engine, event_engine, APP_NAME)
        self.symbol_barProdutors = {}
    def register_bar_function(
            self,
            vt_symbol,
            window: int = 0,
            on_window_bar: Callable = None,
            interval: Interval = Interval.MINUTE
    ):
        """"""
        bar_productor = self.get_symbol_barProdutors(vt_symbol)
        bar_productor.register_function((window,interval),on_window_bar)
    def get_symbol_barProdutors(self, vt_symbol: str):
        bp = self.symbol_barProdutors.get(vt_symbol, None)
        if not bp:
            bp = BarProductor(vt_symbol)
            self.symbol_barProdutors[vt_symbol] = bp
        return bp
    def register_event(self):
        """"""
        self.event_engine.register(EVENT_TICK, self.process_tick_event)
    def process_tick_event(self, event: Event):
        """"""
        tick = event.data
        for tick.vt_symbol in self.symbol_barProdutors:
            self.symbol_barProdutors[tick.vt_symbol].on_tick(tick)
    def update_bar(self, bar: BarData):
        """"""
        for bar.vt_symbol in self.symbol_barProdutors:
            self.symbol_barProdutors[bar.vt_symbol].on_bar(bar)
Member
加入于:
帖子: 158
声望: 71

刚刚又想到,比如策略的删除,还涉及到取消注册的问题,看来还是改动还是很伤筋动骨的。

Administrator
avatar
加入于:
帖子: 4500
声望: 320

感谢分享,精华送上!

最早的vn.py其实只有交易接口和事件驱动引擎,策略应用都是用户直接在驱动引擎层面开发实现的。

后来大家反馈比较复杂,才逐渐提供了标准化的CTA策略模块、价差交易模块这些组件。

© 2015-2022 上海韦纳软件科技有限公司
备案服务号:沪ICP备18006526号

沪公网安备 31011502017034号

【用户协议】
【隐私政策】
【免责条款】