VeighNa量化社区
你的开源社区量化交易平台
Member
avatar
加入于:
帖子: 141
声望: 57
from datetime import timedelta

from vnpy.trader.object import BarData, TickData, OrderData, TradeData
from vnpy.trader.constant import (Exchange,Interval, Direction, Offset,OrderType)
# Length of a single bar, in seconds, keyed by bar interval.
INTERVAL2SECONDS = {
    Interval.MINUTE: 60,    # 1 minute
    Interval.HOUR: 3600,    # 60 minutes
    Interval.DAILY: 86400,  # 24 hours
}
class CtaTemplate:
    """Strategy template that reports gaps between consecutive bars.

    The instance remembers the timestamp of the last bar passed to
    ``check_bar_lost`` and keeps a running total of bars found missing.
    """

    def __init__(
        self,
        cta_engine: "CtaEngine",
        strategy_name: str,
        vt_symbol: str,
        setting: dict,
    ):
        # Timestamp of the most recent bar seen by check_bar_lost;
        # None until the first bar arrives.
        self.last_datetime = None
        # Running total of bars reported missing so far.
        self.check_data_count = 0

    # -----------------------------------------------------------------------
    def check_bar_lost(self, bar: BarData, interval: Interval = Interval.MINUTE):
        """Report bars missing between the previous bar and ``bar``.

        The gap between ``bar.datetime`` and the previously seen bar is
        compared with the length of one ``interval``. When at least one whole
        bar is missing, the missing range and counts are printed and
        ``self.check_data_count`` is increased by the number of missing bars.
        The first call only records the timestamp.

        Args:
            bar: The newly received bar; ``bar.datetime`` and
                ``bar.vt_symbol`` are read.
            interval: Bar period used to size the expected gap. Must be a key
                of ``INTERVAL2SECONDS``.

        Raises:
            KeyError: If ``interval`` is not a key of ``INTERVAL2SECONDS``
                (only raised once a previous bar has been recorded).

        NOTE(review): no trading calendar is consulted here, so gaps caused
        by session breaks or market closures are reported as missing bars
        as well.
        """
        previous = self.last_datetime
        if previous is not None:
            step_seconds = INTERVAL2SECONDS[interval]
            # total_seconds() covers the whole delta; the .seconds attribute
            # is only the sub-day component and wraps for gaps >= 1 day or
            # for negative (out-of-order) deltas.
            gap_seconds = (bar.datetime - previous).total_seconds()
            # N intervals between two received bars means N - 1 bars are
            # missing in between.
            missing = int(gap_seconds // step_seconds) - 1
            if missing > 0:
                step = timedelta(seconds=step_seconds)
                self.check_data_count += missing
                first_missing = previous + step
                last_missing = bar.datetime - step
                print(f"{bar.vt_symbol}缺失bar周期:{interval.value},缺失时间段:{first_missing}至{last_missing},缺失量:{missing},总缺失量:{self.check_data_count}")
        self.last_datetime = bar.datetime
Member
avatar
加入于:
帖子: 141
声望: 57

然后在策略的 on_bar 里面调用 check_bar_lost:

    def on_bar(self, bar: BarData) -> None:
        """
        Handle a newly pushed minute bar.
        """
        # Hand the bar to the strategy's bar generator first.
        self.bar_generator.update_bar(bar)
        # Then run the missing-bar check; no interval is passed, so the
        # method's default interval applies.
        self.check_bar_lost(bar)
© 2015-2022 上海韦纳软件科技有限公司
备案服务号:沪ICP备18006526号

沪公网安备 31011502017034号

【用户协议】
【隐私政策】
【免责条款】