VeighNa量化社区
你的开源社区量化交易平台
Member
avatar
加入于:
帖子: 32
声望: 2

最近在做日线和周线合成的时候,遇到不少坑,特意做了一下改进,直接上代码:

def the_end_window_datetime(dt: datetime,
                            window: int = 1,
                            interval: Interval = Interval.MINUTE,
                            delta_tuple: tuple = None) -> datetime:
    """获取当前周期的最后时间"""
    if interval.MINUTE == interval:
        nsecs = dt.minute * 60 + dt.second + dt.microsecond * 1e-6
        window_sec = 60 * window
    elif interval.HOUR == interval:
        nsecs = dt.hour * 3600 + dt.minute * 60 + dt.second + dt.microsecond * 1e-6
        window_sec = 3600 * window
    elif interval.DAILY == interval:
        nsecs = dt.day * 86400 + dt.hour * 3600 + dt.minute * 60 + dt.second + dt.microsecond * 1e-6
        window_sec = 86400 * window
    elif interval.WEEKLY == interval:
        nsecs = dt.weekday() * 86400 + dt.hour * 3600 + dt.minute * 60 + dt.second + dt.microsecond * 1e-6
        window_sec = 604800 * window
    shift = 0
    if delta_tuple:
        days, hours, minutes, seconds = delta_tuple
        shift = days * 86400 + hours * 3600 + minutes * 60 + seconds
    delta = math.ceil((nsecs - shift) / window_sec) * window_sec + shift - nsecs
    return dt + timedelta(seconds = delta)
#----------------------------------------------------------------------
# 自定义 BarGenerator
#----------------------------------------------------------------------
class AzBarGenerator(BarGenerator):
    """
    一般周期生成器,由于无法保证 window 结束时间的确定性,因此合并前检测
    如:国内期货上午 10:15 - 10:30 休市,因此就无法保证 30min 周期的结束时间有数据
        所以只能通过下一个数据是否跨越了 window 的结束时间来判定
    """

    def __init__(self,
                 on_bar: Callable,
                 window: int = 1,
                 on_window_bar: Callable = None,
                 interval: Interval = Interval.MINUTE):
        super(AzBarGenerator, self).__init__(
            on_bar, window, on_window_bar, interval)

    # 调整为结束时间戳,方便后续处理
    def update_tick(self, tick: TickData) -> None:
        """
        Update new tick data into generator.
        """
        new_minute = False
        # Filter tick data with 0 last price
        if not tick.last_price:
            return

        if not self.bar:
            new_minute = True
        elif tick.datetime > self.bar.datetime:
            # 不属于同一个 window 了,生成 bar
            self.on_bar(self.bar)
            new_minute = True

        if new_minute:
            self.bar = BarData(
                symbol       = tick.symbol,
                exchange     = tick.exchange,
                interval     = Interval.MINUTE,
                # 标记此 window 的结束时间戳
                datetime     = the_end_window_datetime(tick.datetime),
                gateway_name = tick.gateway_name,
                open_price   = tick.last_price,
                high_price   = tick.last_price,
                low_price    = tick.last_price,
                close_price  = tick.last_price,
                open_interest = tick.open_interest)
        else:
            self.bar.high_price = max(self.bar.high_price, tick.last_price)
            self.bar.low_price = min(self.bar.low_price, tick.last_price)
            self.bar.close_price = tick.last_price
            self.bar.open_interest = tick.open_interest
            self.bar.datetime = tick.datetime

        if self.last_tick:
            volume_change = tick.volume - self.last_tick.volume
            self.bar.volume += max(volume_change, 0)

        self.last_tick = tick

    def _update_window_bar(self, bar: BarData) -> None:
        """更新 window bar 数据"""
        if not self.window_bar:
            dt = self.normalize_window_bar_datetime(bar.datetime)
            self.window_bar  = BarData(
                symbol       = bar.symbol,
                exchange     = bar.exchange,
                datetime     = dt,
                gateway_name = bar.gateway_name,
                open_price   = bar.open_price,
                high_price   = bar.high_price,
                low_price    = bar.low_price)
            # Otherwise, update high/low price into window bar
        else:
            self.window_bar.high_price = max(
                self.window_bar.high_price, bar.high_price)
            self.window_bar.low_price = min(
                self.window_bar.low_price, bar.low_price)

        # Update close price/volume into window bar
        self.window_bar.close_price = bar.close_price
        self.window_bar.volume += int(bar.volume)
        self.window_bar.open_interest = bar.open_interest

    def update_bar(self, bar: BarData) -> None:
        """update bar into generator"""
        # Check if window bar completed
        if self.check_window_bar_finished(bar):
            self.on_window_bar(self.window_bar)
            self.window_bar = None
        # 更新 window bar
        self._update_window_bar(bar)
        # Cache last bar object
        self.last_bar = bar

    def check_window_bar_finished(self, bar: BarData):
        """检查 window bar 是否完成"""
        if self.window_bar and bar.datetime > self.window_bar.datetime:
            return True
        else:
            return False

    def normalize_window_bar_datetime(self, dt: datetime):
        """标准化 window bar datetime 数据"""
        return the_end_window_datetime(dt, self.window, self.interval)
Member
avatar
加入于:
帖子: 32
声望: 2

接下来是日周期的,比较简单:

class AzDBarGenerator(AzBarGenerator):
    """日线生成器,收盘时间总是能确定的,因此合并后检测"""

    def __init__(self,
                 on_bar: Callable,
                 window: int = 1,
                 on_window_bar: Callable = None,
                 close_time: tuple = (0, 15, 0, 0)):
        super(AzDBarGenerator, self).__init__(
            on_bar, window, on_window_bar, Interval.DAILY)
        # 每日的收盘时间
        self._close_time = close_time

    def update_bar(self, bar: BarData) -> None:
        """update bar into generator"""
        # 更新 window bar
        self._update_window_bar(bar)
        # Check if window bar completed
        if self.check_window_bar_finished(bar):
            self.on_window_bar(self.window_bar)
            self.window_bar = None

    def check_window_bar_finished(self, bar: BarData):
        """检查一个 window bar 是否结束"""
        if bar.datetime.hour == self._close_time[1] and \
           bar.datetime.minute == self._close_time[2] and \
           bar.datetime.second == self._close_time[3]:
            # 检测到收盘信息,更新 window bar 的收盘时间
            # 由于日线涉及到夜盘跨天、跨周等复杂情况,为了方便,收盘还需要更新时间戳
            self.window_bar.datetime = bar.datetime
            return True
        return False

    def normalize_window_bar_datetime(self, dt: datetime):
        """标准化 window bar datetime 数据"""
        return the_end_window_datetime(
            dt, self.window, self.interval, self._close_time)

然后是周周期的:

class AzWBarGenerator(AzBarGenerator):
    """周线生成器,由于假期的存在, window 结束时间不确定,因此先检测后合并"""

    def __init__(self,
                 on_bar: Callable,
                 window: int = 1,
                 on_window_bar: Callable = None,
                 # 截止时间为周五收盘
                 close_time: tuple = (4, 15, 0, 0)):
        super(AzWBarGenerator, self).__init__(
            on_bar, window, on_window_bar, Interval.WEEKLY)
        self._close_time = close_time
        self._end_window_time = None

    def check_window_bar_finished(self, bar: BarData):
        """检查一个 window bar 是否结束"""
        if self.window_bar and \
           bar.datetime.hour == self._close_time[1] and \
           bar.datetime.minute == self._close_time[2] and \
           bar.datetime.second == self._close_time[3]:
            # 检测到收盘信息,更新 window bar 的收盘时间
            # 由于周线涉及到放假等特殊情况,并不能确保周五收盘
            # 为了方便, 在收盘时都要更新时间戳
            self.window_bar.datetime = bar.datetime
        if self._end_window_time and bar.datetime > self._end_window_time:
            return True
        return False

    def normalize_window_bar_datetime(self, dt: datetime):
        """标准化 window bar datetime 数据"""
        end_window = the_end_window_datetime(
            dt, self.window, self.interval, self._close_time)
        self._end_window_time = end_window
        return end_window

之后在策略里面这样使用:

        self.bg15 = AzBarGenerator(self.on_bar, 15, self.on_15m_bar)
        self.bg_d = AzDBarGenerator(self.on_bar, 1, self.on_daily_bar)
        self.bg_w = AzWBarGenerator(self.on_bar, 1, self.on_weekly_bar)

然后就可以在 on_daily_bar 和 on_weekly_bar 中写逻辑了。
下面是测试数据:

I99.DCE 指数数据,通过 15 分钟 k 线数据合成
INFO  day: 2020-06-15 15:00:00 762.5, 774.0, 743.5, 749.0
INFO  day: 2020-06-16 15:00:00 749.5, 772.5, 749.5, 768.0
INFO  day: 2020-06-17 15:00:00 769.0, 773.5, 750.5, 753.0
INFO  day: 2020-06-18 15:00:00 753.5, 762.0, 745.0, 751.5
INFO  day: 2020-06-19 15:00:00 756.0, 764.0, 751.0, 754.0
INFO  week: 2020-06-19 15:00:00 762.5, 774.0, 743.5, 754.0
INFO  day: 2020-06-22 15:00:00 754.5, 757.5, 736.5, 739.0
INFO  day: 2020-06-23 15:00:00 739.0, 747.0, 734.5, 742.5
INFO  day: 2020-06-24 15:00:00 746.0, 757.0, 742.0, 755.5
INFO  week: 2020-06-24 15:00:00 754.5, 757.5, 734.5, 755.5
INFO  day: 2020-06-29 15:00:00 750.0, 751.0, 718.0, 721.0
INFO  day: 2020-06-30 15:00:00 723.0, 736.0, 722.5, 728.5
INFO  day: 2020-07-01 15:00:00 728.0, 734.0, 715.0, 726.0
INFO  day: 2020-07-02 15:00:00 723.0, 731.0, 719.5, 722.5
INFO  day: 2020-07-03 15:00:00 725.0, 731.5, 722.0, 729.5
INFO  week: 2020-07-03 15:00:00 750.0, 751.0, 715.0, 729.5

总算和文华的数据对上了,中途有放假数据也是一致的了。

Member
avatar
加入于:
帖子: 35
声望: 1

mark!

Member
avatar
加入于:
帖子: 6
声望: 0

先收下

Member
avatar
加入于:
帖子: 42
声望: 4

AzBargenerator的代码中

if not self.bar:
    new_minute = True
elif tick.datetime > self.bar.datetime:
    # 不属于同一个 window 了,生成 bar
    self.on_bar(self.bar)
    new_minute = True

elif的判断有问题吧,按这个逻辑,每隔半秒(现在交易所0.5s推送一笔成交)就会生成一根1分钟线。你测试时能跟文华K线对上?

Member
avatar
加入于:
帖子: 42
声望: 4

上面的判断没有问题,是我看错了。因为你之前已经对self.bar.datetime做了预置处理。但是下面的代码最后一行应该是有问题的

else:
            self.bar.high_price = max(self.bar.high_price, tick.last_price)
            self.bar.low_price = min(self.bar.low_price, tick.last_price)
            self.bar.close_price = tick.last_price
            self.bar.open_interest = tick.open_interest
            self.bar.datetime = tick.datetime

这一行代码不应该有,否则K线的生成间隔就都乱了。

Member
avatar
加入于:
帖子: 16
声望: 2

远山 wrote:

上面的判断没有问题,是我看错了。因为你之前已经对self.bar.datetime做了预置处理。但是下面的代码最后一行应该是有问题的

else:
            self.bar.high_price = max(self.bar.high_price, tick.last_price)
            self.bar.low_price = min(self.bar.low_price, tick.last_price)
            self.bar.close_price = tick.last_price
            self.bar.open_interest = tick.open_interest
            self.bar.datetime = tick.datetime

这一行代码不应该有,否则K线的生成间隔就都乱了。

好像是的,去掉这一行就完美了,整体没什么问题了。
谢谢楼主,学习了。

© 2015-2022 上海韦纳软件科技有限公司
备案服务号:沪ICP备18006526号

沪公网安备 31011502017034号

【用户协议】
【隐私政策】
【免责条款】