1. The built-in BarGenerator drops some bars when producing n-minute bars.

Last week I upgraded to vnpy 2.9.0 and wrote a strategy that uses 30-minute bars.

        self.dir_bg = BarGenerator(on_bar = self.on_bar,window = 30,
                                    on_window_bar = self.on_30m_bar,interval = Interval.MINUTE)

This creates a 30-minute bar generator.
The strategy's on_30m_bar() looks like this; for now it only prints the bar:

    def on_30m_bar(self, bar: BarData):
        """
        Called when a bar of the direction period (30 minutes) is received.
        """
        print(f"{self.strategy_name}收到30分钟周期K线{bar}")

The result was a disaster (in the output, 收到30分钟周期K线 means "received 30-minute bar"):

GsjyDemo2收到30分钟周期K线BarData(gateway_name='RQ', symbol='rb2205', exchange=<Exchange.SHFE: 'SHFE'>, datetime=datetime.datetime(2022, 2, 23, 21, 0, tzinfo=<DstTzInfo 'Asia/Shanghai' CST+8:00:00 STD>), interval=None, volume=225506.0, turnover=10790057010.0, open_interest=1921172.0, open_price=4788.0, high_price=4808.0, low_price=4762.0, close_price=4777.0)
GsjyDemo2收到30分钟周期K线BarData(gateway_name='RQ', symbol='rb2205', exchange=<Exchange.SHFE: 'SHFE'>, datetime=datetime.datetime(2022, 2, 23, 21, 30, tzinfo=<DstTzInfo 'Asia/Shanghai' CST+8:00:00 STD>), interval=None, volume=179987.0, turnover=8570390020.0, open_interest=1903437.0, open_price=4778.0, high_price=4778.0, low_price=4751.0, close_price=4760.0)
GsjyDemo2收到30分钟周期K线BarData(gateway_name='RQ', symbol='rb2205', exchange=<Exchange.SHFE: 'SHFE'>, datetime=datetime.datetime(2022, 2, 23, 22, 0, tzinfo=<DstTzInfo 'Asia/Shanghai' CST+8:00:00 STD>), interval=None, volume=99381.0, turnover=4723786710.0, open_interest=1905948.0, open_price=4760.0, high_price=4766.0, low_price=4743.0, close_price=4746.0)
GsjyDemo2收到30分钟周期K线BarData(gateway_name='RQ', symbol='rb2205', exchange=<Exchange.SHFE: 'SHFE'>, datetime=datetime.datetime(2022, 2, 23, 22, 30, tzinfo=<DstTzInfo 'Asia/Shanghai' CST+8:00:00 STD>), interval=None, volume=83782.0, turnover=3985661600.0, open_interest=1904511.0, open_price=4745.0, high_price=4767.0, low_price=4744.0, close_price=4763.0)
GsjyDemo2收到30分钟周期K线BarData(gateway_name='RQ', symbol='rb2205', exchange=<Exchange.SHFE: 'SHFE'>, datetime=datetime.datetime(2022, 2, 24, 9, 0, tzinfo=<DstTzInfo 'Asia/Shanghai' CST+8:00:00 STD>), interval=None, volume=99253.0, turnover=4714720470.0, open_interest=1916969.0, open_price=4763.0, high_price=4766.0, low_price=4738.0, close_price=4748.0)
GsjyDemo2收到30分钟周期K线BarData(gateway_name='RQ', symbol='rb2205', exchange=<Exchange.SHFE: 'SHFE'>, datetime=datetime.datetime(2022, 2, 24, 9, 30, tzinfo=<DstTzInfo 'Asia/Shanghai' CST+8:00:00 STD>), interval=None, volume=85886.0, turnover=4076563050.0, open_interest=1930796.0, open_price=4750.0, high_price=4760.0, low_price=4735.0, close_price=4736.0)
GsjyDemo2收到30分钟周期K线BarData(gateway_name='RQ', symbol='rb2205', exchange=<Exchange.SHFE: 'SHFE'>, datetime=datetime.datetime(2022, 2, 24, 10, 0, tzinfo=<DstTzInfo 'Asia/Shanghai' CST+8:00:00 STD>), interval=None, volume=491666.0, turnover=23080991050.0, open_interest=1982231.0, open_price=4736.0, high_price=4741.0, low_price=4660.0, close_price=4665.0)
GsjyDemo2收到30分钟周期K线BarData(gateway_name='RQ', symbol='rb2205', exchange=<Exchange.SHFE: 'SHFE'>, datetime=datetime.datetime(2022, 2, 24, 11, 0, tzinfo=<DstTzInfo 'Asia/Shanghai' CST+8:00:00 STD>), interval=None, volume=279000.0, turnover=13059711390.0, open_interest=2005223.0, open_price=4666.0, high_price=4713.0, low_price=4654.0, close_price=4676.0)
GsjyDemo2收到30分钟周期K线BarData(gateway_name='RQ', symbol='rb2205', exchange=<Exchange.SHFE: 'SHFE'>, datetime=datetime.datetime(2022, 2, 24, 13, 30, tzinfo=<DstTzInfo 'Asia/Shanghai' CST+8:00:00 STD>), interval=None, volume=271595.0, turnover=12701729900.0, open_interest=2021599.0, open_price=4664.0, high_price=4709.0, low_price=4648.0, close_price=4673.0)
GsjyDemo2收到30分钟周期K线BarData(gateway_name='RQ', symbol='rb2205', exchange=<Exchange.SHFE: 'SHFE'>, datetime=datetime.datetime(2022, 2, 24, 14, 0, tzinfo=<DstTzInfo 'Asia/Shanghai' CST+8:00:00 STD>), interval=None, volume=266018.0, turnover=12358486960.0, open_interest=2082998.0, open_price=4673.0, high_price=4674.0, low_price=4622.0, close_price=4623.0)
GsjyDemo2收到30分钟周期K线BarData(gateway_name='RQ', symbol='rb2205', exchange=<Exchange.SHFE: 'SHFE'>, datetime=datetime.datetime(2022, 2, 24, 14, 30, tzinfo=<DstTzInfo 'Asia/Shanghai' CST+8:00:00 STD>), interval=None, volume=175873.0, turnover=8162467860.0, open_interest=2081475.0, open_price=4624.0, high_price=4654.0, low_price=4624.0, close_price=4637.0)
GsjyDemo2收到30分钟周期K线BarData(gateway_name='RQ', symbol='rb2205', exchange=<Exchange.SHFE: 'SHFE'>, datetime=datetime.datetime(2022, 2, 24, 21, 0, tzinfo=<DstTzInfo 'Asia/Shanghai' CST+8:00:00 STD>), interval=None, volume=312119.0, turnover=14487421790.0, open_interest=2043795.0, open_price=4635.0, high_price=4664.0, low_price=4613.0, close_price=4655.0)
GsjyDemo2收到30分钟周期K线BarData(gateway_name='RQ', symbol='rb2205', exchange=<Exchange.SHFE: 'SHFE'>, datetime=datetime.datetime(2022, 2, 24, 21, 30, tzinfo=<DstTzInfo 'Asia/Shanghai' CST+8:00:00 STD>), interval=None, volume=106717.0, turnover=4970359120.0, open_interest=2025130.0, open_price=4656.0, high_price=4666.0, low_price=4648.0, close_price=4655.0)
GsjyDemo2收到30分钟周期K线BarData(gateway_name='RQ', symbol='rb2205', exchange=<Exchange.SHFE: 'SHFE'>, datetime=datetime.datetime(2022, 2, 24, 22, 0, tzinfo=<DstTzInfo 'Asia/Shanghai' CST+8:00:00 STD>), interval=None, volume=60255.0, turnover=2807077210.0, open_interest=2011503.0, open_price=4657.0, high_price=4665.0, low_price=4652.0, close_price=4659.0)
GsjyDemo2收到30分钟周期K线BarData(gateway_name='RQ', symbol='rb2205', exchange=<Exchange.SHFE: 'SHFE'>, datetime=datetime.datetime(2022, 2, 24, 22, 30, tzinfo=<DstTzInfo 'Asia/Shanghai' CST+8:00:00 STD>), interval=None, volume=184047.0, turnover=8524883500.0, open_interest=1989335.0, open_price=4660.0, high_price=4661.0, low_price=4608.0, close_price=4614.0)

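The bug: every day, the 1-minute bars from 10:00 to 11:00 are merged into a single 30-minute bar! That is wrong however you look at it: 10:00-11:00 contains 45 minutes of trading data (10:15-10:30 is a trading break), and 45 minutes of data cannot belong in one 30-minute bar.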
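The 45-minute figure can be checked with a few lines of arithmetic. This is a minimal sketch using only the standard library; the helper name trading_minutes is made up for illustration and is not part of vnpy.

```python
from datetime import datetime, timedelta


def trading_minutes(start, end, breaks):
    """Return the start times of the 1-minute bars in [start, end) outside the breaks."""
    minutes = []
    t = start
    while t < end:
        # Keep the minute only if it does not fall inside any break interval
        if not any(b_start <= t < b_end for b_start, b_end in breaks):
            minutes.append(t)
        t += timedelta(minutes=1)
    return minutes


day = datetime(2022, 2, 24)
# The 10:15-10:30 trading break mentioned in the post
morning_break = (day.replace(hour=10, minute=15), day.replace(hour=10, minute=30))
bars = trading_minutes(day.replace(hour=10), day.replace(hour=11), [morning_break])
print(len(bars))  # 45
```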

2. Where is the bug?

I tracked down the bug in BarGenerator:

    def update_bar_minute_window(self, bar: BarData) -> None:
        """"""
        # If not inited, create window bar object
        if not self.window_bar:
            dt = bar.datetime.replace(second=0, microsecond=0)
            self.window_bar = BarData(
                symbol=bar.symbol,
                exchange=bar.exchange,
                datetime=dt,
                gateway_name=bar.gateway_name,
                open_price=bar.open_price,
                high_price=bar.high_price,
                low_price=bar.low_price
            )
        # Otherwise, update high/low price into window bar
        else:
            self.window_bar.high_price = max(
                self.window_bar.high_price,
                bar.high_price
            )
            self.window_bar.low_price = min(
                self.window_bar.low_price,
                bar.low_price
            )

        # Update close price/volume/turnover into window bar
        self.window_bar.close_price = bar.close_price
        self.window_bar.volume += bar.volume
        self.window_bar.turnover += bar.turnover
        self.window_bar.open_interest = bar.open_interest

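        # Check if window bar completed

        # This is the bug: the check uses (minute of the current 1-minute bar + 1) modulo 30
        # to decide whether a 30-minute bar has finished, pushes the finished bar,
        # and then starts the next 30-minute bar.
        # But 10:15-10:30 is a trading break, so the 1-minute bar for 10:29 never arrives
        # and the condition is first met at 10:59.
        # As a result this "30-minute" bar actually contains 45 minutes of trading data. Wrong!

        if not (bar.datetime.minute + 1) % self.window:
            self.on_window_bar(self.window_bar)
            self.window_bar = None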
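The effect of that check can be reproduced without vnpy. The sketch below applies the same modulo rule to plain timestamps; the function names are hypothetical, and the morning session times (9:00-10:15 and 10:30-11:30) are an assumption chosen to match the log output above.

```python
from datetime import datetime, timedelta


def minute_range(start, end):
    """Yield one timestamp per minute in [start, end)."""
    t = start
    while t < end:
        yield t
        t += timedelta(minutes=1)


def windows_by_modulo(bar_times, window):
    """Group timestamps the way the original check does:
    a window is closed when (minute + 1) % window == 0."""
    finished, current = [], []
    for t in bar_times:
        current.append(t)
        if not (t.minute + 1) % window:
            finished.append(current)
            current = []
    return finished


day = datetime(2022, 2, 24)
# Assumed morning session with the 10:15-10:30 break left out
times = list(minute_range(day.replace(hour=9), day.replace(hour=10, minute=15)))
times += list(minute_range(day.replace(hour=10, minute=30), day.replace(hour=11, minute=30)))

for w in windows_by_modulo(times, 30):
    print(w[0].strftime("%H:%M"), len(w))
# 09:00 30
# 09:30 30
# 10:00 45
# 11:00 30
```

The window that opens at 10:00 only closes at 10:59, so it collects 45 one-minute bars, which matches the oversized 10:00 bar in the log.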

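3. How to fix it?

With the problem analyzed, I won't walk through the changes one by one; here is the complete modified BarGenerator code.
BarGenerator lives in vnpy.trader.utility; copy the code below over the original and you are done.
I have tested it: the 30-minute bars it produces are identical to those produced by Wenhua 6 (文华6).
To see what was changed, search for # hxxjava to find the modified places.

3.1 Modified code

Add this import near the top of vnpy/trader/utility.py: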

from datetime import timedelta

The modified BarGenerator is as follows:

class BarGenerator:
    """
    For:
    1. generating 1 minute bar data from tick data
    2. generating x minute bar/x hour bar data from 1 minute data

    Notice:
    1. for x minute bar, x must be able to divide 60: 2, 3, 5, 6, 10, 15, 20, 30
    2. for x hour bar, x can be any number
    """

    def __init__(
        self,
        on_bar: Callable,
        window: int = 0,
        on_window_bar: Callable = None,
        interval: Interval = Interval.MINUTE,
        daily_close_time: str = "15:00"
    ):
        """Constructor"""
        self.bar: BarData = None
        self.on_bar: Callable = on_bar

        self.interval: Interval = interval
        self.interval_count: int = 0

        self.hour_bar: BarData = None

        self.window: int = window
        self.count_for_window: int = 0   # hxxjava add
        self.window_bar: BarData = None
        self.on_window_bar: Callable = on_window_bar

        self.last_tick: TickData = None
        self.daily_close_time = daily_close_time

    def update_tick(self, tick: TickData) -> None:
        """
        Update new tick data into generator.
        """
        new_minute = False

        # Filter tick data with 0 last price
        if not tick.last_price:
            return

        # Filter tick data with older timestamp
        if self.last_tick and tick.datetime < self.last_tick.datetime:
            return

        if not self.bar:
            new_minute = True
        elif (
            (self.bar.datetime.minute != tick.datetime.minute)
            or (self.bar.datetime.hour != tick.datetime.hour)
        ):
            self.bar.datetime = self.bar.datetime.replace(
                second=0, microsecond=0
            )
            self.on_bar(self.bar)

            new_minute = True

        if new_minute:
            self.bar = BarData(
                symbol=tick.symbol,
                exchange=tick.exchange,
                interval=Interval.MINUTE,
                datetime=tick.datetime,
                gateway_name=tick.gateway_name,
                open_price=tick.last_price,
                high_price=tick.last_price,
                low_price=tick.last_price,
                close_price=tick.last_price,
                open_interest=tick.open_interest
            )
        else:
            self.bar.high_price = max(self.bar.high_price, tick.last_price)
            if tick.high_price > self.last_tick.high_price:
                self.bar.high_price = max(self.bar.high_price, tick.high_price)

            self.bar.low_price = min(self.bar.low_price, tick.last_price)
            if tick.low_price < self.last_tick.low_price:
                self.bar.low_price = min(self.bar.low_price, tick.low_price)

            self.bar.close_price = tick.last_price
            self.bar.open_interest = tick.open_interest
            self.bar.datetime = tick.datetime

        if self.last_tick:
            volume_change = tick.volume - self.last_tick.volume
            self.bar.volume += max(volume_change, 0)

            turnover_change = tick.turnover - self.last_tick.turnover
            self.bar.turnover += max(turnover_change, 0)

        self.last_tick = tick

    def update_bar(self, bar: BarData) -> None:
        """
        Update 1 minute bar into generator
        """
        if self.interval == Interval.MINUTE:
            self.update_bar_minute_window(bar)
        else:
            self.update_bar_hour_window(bar)

    def update_bar_minute_window(self, bar: BarData) -> None:
        """"""
        # If not inited, create window bar object
        if not self.window_bar:
            dt = bar.datetime.replace(second=0, microsecond=0)
            self.window_bar = BarData(
                symbol=bar.symbol,
                exchange=bar.exchange,
                datetime=dt,
                gateway_name=bar.gateway_name,
                open_price=bar.open_price,
                high_price=bar.high_price,
                low_price=bar.low_price
            )
        # Otherwise, update high/low price into window bar
        else:
            self.window_bar.high_price = max(
                self.window_bar.high_price,
                bar.high_price
            )
            self.window_bar.low_price = min(
                self.window_bar.low_price,
                bar.low_price
            )

        # Update close price/volume/turnover into window bar
        self.window_bar.close_price = bar.close_price
        self.window_bar.volume += bar.volume
        self.window_bar.turnover += bar.turnover
        self.window_bar.open_interest = bar.open_interest

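        # Check if window bar completed
        # if not (bar.datetime.minute + 1) % self.window:
        #     self.on_window_bar(self.window_bar)
        #     self.window_bar = None

        # hxxjava add start
        # Count the 1-minute bars received instead of looking at the clock minute,
        # so that intraday breaks do not stretch a window.
        h, m = self.daily_close_time.split(':')
        today_close_time = bar.datetime.replace(hour=int(h), minute=int(m), second=0, microsecond=0)
        # True when this is the last 1-minute bar before the daily close
        enter_next_day = bar.datetime + timedelta(minutes=1) == today_close_time
        # Push when the window holds n bars, or at the daily close even if it is short
        if self.count_for_window + 1 == self.window or enter_next_day:
            self.on_window_bar(self.window_bar)
            self.window_bar = None

        # Restart the count at the daily close; otherwise advance it modulo the window size
        if enter_next_day:
            self.count_for_window = 0
        else:
            self.count_for_window += 1
            self.count_for_window %= self.window

        # hxxjava add end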

    def update_bar_hour_window(self, bar: BarData) -> None:
        """"""
        # If not inited, create window bar object
        if not self.hour_bar:
            dt = bar.datetime.replace(minute=0, second=0, microsecond=0)
            self.hour_bar = BarData(
                symbol=bar.symbol,
                exchange=bar.exchange,
                datetime=dt,
                gateway_name=bar.gateway_name,
                open_price=bar.open_price,
                high_price=bar.high_price,
                low_price=bar.low_price,
                close_price=bar.close_price,
                volume=bar.volume,
                turnover=bar.turnover,
                open_interest=bar.open_interest
            )
            return

        finished_bar = None

        # If minute is 59, update minute bar into window bar and push
        if bar.datetime.minute == 59:
            self.hour_bar.high_price = max(
                self.hour_bar.high_price,
                bar.high_price
            )
            self.hour_bar.low_price = min(
                self.hour_bar.low_price,
                bar.low_price
            )

            self.hour_bar.close_price = bar.close_price
            self.hour_bar.volume += bar.volume
            self.hour_bar.turnover += bar.turnover
            self.hour_bar.open_interest = bar.open_interest

            finished_bar = self.hour_bar
            self.hour_bar = None

        # If minute bar of new hour, then push existing window bar
        elif bar.datetime.hour != self.hour_bar.datetime.hour:
            finished_bar = self.hour_bar

            dt = bar.datetime.replace(minute=0, second=0, microsecond=0)
            self.hour_bar = BarData(
                symbol=bar.symbol,
                exchange=bar.exchange,
                datetime=dt,
                gateway_name=bar.gateway_name,
                open_price=bar.open_price,
                high_price=bar.high_price,
                low_price=bar.low_price,
                close_price=bar.close_price,
                volume=bar.volume,
                turnover=bar.turnover,
                open_interest=bar.open_interest
            )
        # Otherwise only update minute bar
        else:
            self.hour_bar.high_price = max(
                self.hour_bar.high_price,
                bar.high_price
            )
            self.hour_bar.low_price = min(
                self.hour_bar.low_price,
                bar.low_price
            )

            self.hour_bar.close_price = bar.close_price
            self.hour_bar.volume += bar.volume
            self.hour_bar.turnover += bar.turnover
            self.hour_bar.open_interest = bar.open_interest

        # Push finished window bar
        if finished_bar:
            self.on_hour_bar(finished_bar)

    def on_hour_bar(self, bar: BarData) -> None:
        """"""
        if self.window == 1:
            self.on_window_bar(bar)
        else:
            if not self.window_bar:
                self.window_bar = BarData(
                    symbol=bar.symbol,
                    exchange=bar.exchange,
                    datetime=bar.datetime,
                    gateway_name=bar.gateway_name,
                    open_price=bar.open_price,
                    high_price=bar.high_price,
                    low_price=bar.low_price
                )
            else:
                self.window_bar.high_price = max(
                    self.window_bar.high_price,
                    bar.high_price
                )
                self.window_bar.low_price = min(
                    self.window_bar.low_price,
                    bar.low_price
                )

            self.window_bar.close_price = bar.close_price
            self.window_bar.volume += bar.volume
            self.window_bar.turnover += bar.turnover
            self.window_bar.open_interest = bar.open_interest

            self.interval_count += 1
            if not self.interval_count % self.window:
                self.interval_count = 0
                self.on_window_bar(self.window_bar)
                self.window_bar = None

    def generate(self) -> Optional[BarData]:
        """
        Generate the bar data and call callback immediately.
        """
        bar = self.bar

        if self.bar:
            bar.datetime = bar.datetime.replace(second=0, microsecond=0)
            self.on_bar(bar)

        self.bar = None
        return bar

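3.2 Notes on the changes

My principle for the change is that n-minute bars are aligned within the trading day, that is:

  • bars are built continuously across intraday trading breaks;
  • as soon as n minutes of 1-minute bars have accumulated, a bar is completed and pushed immediately;
  • at the daily close, however, a bar is completed and pushed immediately whether or not it has reached n minutes.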
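The three rules can be tried out on plain timestamps. The following is a rough sketch that mirrors the counting logic of the modified update_bar_minute_window() without depending on vnpy; the function names are hypothetical, and the day session times (9:00-10:15, 10:30-11:30, 13:30-15:00) are an assumption for illustration.

```python
from datetime import datetime, timedelta


def minute_range(start, end):
    """Yield one timestamp per minute in [start, end)."""
    t = start
    while t < end:
        yield t
        t += timedelta(minutes=1)


def windows_by_count(bar_times, window, daily_close_time="15:00"):
    """Group timestamps by counting bars; flush a short window at the daily close."""
    h, m = daily_close_time.split(":")
    finished, current, count = [], [], 0
    for t in bar_times:
        current.append(t)
        close = t.replace(hour=int(h), minute=int(m), second=0, microsecond=0)
        last_bar_of_day = t + timedelta(minutes=1) == close
        # Close the window when it holds `window` bars or the market closes
        if count + 1 == window or last_bar_of_day:
            finished.append(current)
            current = []
        count = 0 if last_bar_of_day else (count + 1) % window
    return finished


day = datetime(2022, 2, 24)
# Assumed day session, with the breaks left out
sessions = [((9, 0), (10, 15)), ((10, 30), (11, 30)), ((13, 30), (15, 0))]
times = []
for (h1, m1), (h2, m2) in sessions:
    times += list(minute_range(day.replace(hour=h1, minute=m1), day.replace(hour=h2, minute=m2)))

for w in windows_by_count(times, 30):
    print(w[0].strftime("%H:%M"), len(w))
# 09:00 30
# 09:30 30
# 10:00 30
# 10:45 30
# 11:15 30
# 13:45 30
# 14:15 30
# 14:45 15
```

Every window holds exactly 30 one-minute bars even when it spans a break, and only the last window of the day is shorter because it is flushed at the close.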

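3.3 Usage

Note that the BarGenerator constructor has a new daily_close_time parameter, a string that defaults to "15:00".

  • For domestic (Chinese) stocks, futures, or options, most instruments close at 15:00, so the parameter can simply be left out. For example:
self.bg30m = BarGenerator(on_bar = self.on_bar,window = 30,on_window_bar = self.on_30m_bar,interval = Interval.MINUTE)  # closes at 15:00 by default

However, some instruments, such as treasury bond products, do not close at 15:00, and for those the parameter must be passed explicitly. When writing a trading strategy, you can expose a string parameter holding the close time and pass it to this argument when creating the instance. It is slightly more work, but still convenient enough.

For example:

  • For an instrument that closes at 16:00 every day, create the 30-minute bar generator like this: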
self.bg30m = BarGenerator(on_bar = self.on_bar,window = 30,
                                    on_window_bar = self.on_30m_bar,
                                    interval = Interval.MINUTE,
                                    daily_close_time= "16:00" )
  • For London gold, whose daily close is 5:00 Beijing time, create the 30-minute bar generator like this:
self.bg30m = BarGenerator(on_bar = self.on_bar,window = 30,
                                    on_window_bar = self.on_30m_bar,
                                    interval = Interval.MINUTE,
                                    daily_close_time= "5:00" )
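To see how a given daily_close_time string is interpreted, the close check from the modified code can be isolated in a small function. This is only a sketch; is_last_bar_of_day is a hypothetical helper, not a vnpy API, and it assumes the 1-minute bar timestamps have zero seconds.

```python
from datetime import datetime, timedelta


def is_last_bar_of_day(bar_dt, daily_close_time="15:00"):
    """Return True if the 1-minute bar starting at bar_dt ends exactly at the close."""
    h, m = daily_close_time.split(":")
    close = bar_dt.replace(hour=int(h), minute=int(m), second=0, microsecond=0)
    return bar_dt + timedelta(minutes=1) == close


print(is_last_bar_of_day(datetime(2022, 2, 24, 14, 59)))           # True
print(is_last_bar_of_day(datetime(2022, 2, 24, 15, 59), "16:00"))  # True
print(is_last_bar_of_day(datetime(2022, 2, 25, 4, 59), "5:00"))    # True
print(is_last_bar_of_day(datetime(2022, 2, 24, 14, 59), "16:00"))  # False
```

Because the hour and minute go through int(), "5:00" and "05:00" behave the same. One thing to watch: the close time is placed on the same calendar date as the bar, so a close of exactly "0:00" would never match the 23:59 bar with this comparison.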