VeighNa量化社区
你的开源社区量化交易平台
Member
avatar
加入于:
帖子: 420
声望: 180

首先说明:这不只是DemoStrategy的问题,其他策略也一样存在

我只是拿DemoStrategy的例子来说明问题。

通常一个策略需要在on_init()函数中调用self.load_bar()

例如:

陈老师在CTA策略实战进阶中的课程里讲的DemoStrategy策略:

class DemoStrategy(CtaTemplate):
    """ 一个演示策略 """
    author = "hxx"

    fast_window = 10
    slow_window = 20

    fast_ma0 = 0
    fast_ma1 = 0
    slow_ma0 = 0
    slow_ma1 = 0

    parameters = [
        "fast_window",
        "slow_window"
    ]

    variables = [
        "fast_ma0",
        "fast_ma1",
        "slow_ma0",
        "slow_ma1",
    ]

    def __init__(
        self,
        cta_engine: Any,
        strategy_name: str,
        vt_symbol: str,
        setting: dict 
    ):
        """构造函数"""
        super().__init__(cta_engine,strategy_name,vt_symbol,setting)

        self.bg = NewBarGenerator(
            on_bar=self.on_bar,
            window=7,
            on_window_bar=on_7min_bar,
            interval=Interval.Minute)

        self.am = NewArrayManager()


    def on_init(self):
        """"""
        self.write_log("策略初始化")

        self.load_bar(10)       # 加载10日的7分钟K线

    def on_start(self):
        """策略启动"""
        self.write_log("策略启动")

    def on_stop(self):
        """ 策略停止 """
        self.write_log(" 策略停止 ")

    def on_tick(self,tick:TickData):
        """ Tick更新 """
        self.bg.update_tick(tick) 

    def on_bar(self, bar: BarData):
        """K线更新"""
        self.bg.update_bar(bar)

    def on_7min_bar(self, bar: BarData):
        """K线更新"""
        am = self.am
        am.update_bar(bar)
        if not am.inited:
            return

        """ 计算均线 """
        fast_ma = am.sma(self.fast_window,True)
        self.fast_ma0 = fast_ma[-1]
        self.fast_ma1 = fast_ma[-2]

        slow_ma = am.sma(self.slow_window,True)
        self.slow_ma0 = slow_ma[-1]
        self.slow_ma1 = slow_ma[-2]

        """ 定义金叉和死叉 """

        cross_over = (self.fast_ma0>= self.fast_ma1 and
                      self.slow_ma0<self.slow_ma1)  

        cross_below = (self.slow_ma0>self.slow_ma1 and 
                      self.slow_ma0<=self.slow_ma1)

        if cross_over:
            price = bar.close_price + 5

            if not self.pos:
                self.buy(price,1)
            elif self.pos < 0:
                self.cover(price,1)
                self.buy(price,1)
        elif cross_below:
            price = bar.close_price - 5

            if not self.pos:
                self.short(price,1)
            elif self.pos>0:
                self.sell(price,1)
                self.short(price,1)

        # 更新图形界面 
        self.put_event()

self.load_bar()是CtaTemplate策略模版的方法:

    def load_bar(
        self,
        days: int,
        interval: Interval = Interval.MINUTE,
        callback: Callable = None,
        use_database: bool = False
    ):
        """
        Load historical bar data for initializing strategy.
        """
        if not callback:
            callback = self.on_bar

        self.cta_engine.load_bar(
            self.vt_symbol,
            days,
            interval,
            callback,
            use_database
        )

CtaTemplate.load_bar()又调用vnpy.app.cta_strategy.engine里的CtaEngine的load_bar()

CtaEngine的load_bar()的代码是这样的:

    def load_bar(
        self,
        vt_symbol: str,
        days: int,
        interval: Interval,
        callback: Callable[[BarData], None],
        use_database: bool
    ):
        """"""
        symbol, exchange = extract_vt_symbol(vt_symbol)
        end = datetime.now(get_localzone())
        start = end - timedelta(days)
        bars = []

        # Pass gateway and RQData if use_database set to True
        if not use_database:
            # Query bars from gateway if available
            contract = self.main_engine.get_contract(vt_symbol)

            if contract and contract.history_data:
                req = HistoryRequest(
                    symbol=symbol,
                    exchange=exchange,
                    interval=interval,
                    start=start,
                    end=end
                )
                bars = self.main_engine.query_history(req, contract.gateway_name)

            # Try to query bars from RQData, if not found, load from database.
            else:
                bars = self.query_bar_from_rq(symbol, exchange, interval, start, end)

        if not bars:
            bars = database_manager.load_bar_data(
                symbol=symbol,
                exchange=exchange,
                interval=interval,
                start=start,
                end=end,
            )

        for bar in bars:
            callback(bar)

可能的问题:

CtaEngine.load_bar()对历史数据的处理逻辑是:
1 首先从rqdatac获取历史分钟数据,利用DemoStrategyd策略的on_bar()合成出7分钟K线,然后调用on_7min_bar()。这样self.am是一个默认缓冲为100个FIFO数组队列,利用它执行策略的买卖信号计算和下单。它的end是当前时间,start是10天前的时间。策略初始化后,从行情接口可以不断地收到tick数据,然后执行DemoStrategyd策略on_tick()-->on_bar()-->on_7min_bar()
2 如果rqdatac没有读取到历史分钟数据,那么就从本地数据库中读取。问题来了:

1) 如果本地数据库中有start-end之间的数据,可是最后的数据时间与当前时间有空档,例如最后保存的有30分钟空档,而初始化后行情接口虽然也是按照on_tick()-->on_bar()-->on_7min_bar()的过程不断地更新self.am,可是我们知道它所管理的缓冲K线可能是不连续的,这样会导致计算的错误!
2) 如果本地数据库中有start-end之间的数据,可是这些数据本身内部可能好几段历史返分钟数据,本身中间就有空档,导致self.am所管理的缓冲K线可能是不连续的,这样会导致计算的错误!
3) 本地数据可能是从rqdatac读取保存的,有谁的动作那么快,这边保存了历史数据,那边立刻启动策略,立刻初始化?所以当rqdatac没有历史数据,转而从本地数据库读取,这本身就会造成历史K线与新合成处理的数据空档。

建议:在实盘环境下,CtaEngine.load_bar()不要保留从本地数据库读取历史数据

理由如下:

1 实盘策略不要从数据库读取数据的选择,因为容易造成无法补救的K线数据空档;
2 它发生在rqdatac无法读取的情况下,自然无法再从rqdata补齐数据;
3 而行情接口通常不提供历史分钟K线数据功能;
4 可以没有数据,没有当然不会计算买卖点,也就不会交易。
5 如果简单使用有空档K线数据,发出了错误的买卖信号,进行了错误的交易,这是不应该的!

比如:本地数据库最后一条数据是2天前的1分钟K线数据,而当前的rqdatac是不通的,可是行情接口是没有问题的,那么合成出来的新K线与历史K线之间会有多大的跳空,这个是无法想象的,依据这样的有空档的新、旧K数据来计算、交易,谁知道会出什么问题?

Member
avatar
加入于:
帖子: 17
声望: 0

我用的tushare, tushare的分钟数据 ,小时数据要单独购买。 整体算下来和米筐的差不多一样贵了。 我其实策略就是日线的, 并不需要 分钟级别数据, 我在想怎么绕开去购买分钟级别数据。 而且使用load_bar 日线级别的时候 好多合约是加载不了22根线, 那就是一个月都没有交易信号产生的机会了。 5日 10日线可能还有机会。 比这个级别 周期大一点 就困难了。

© 2015-2022 上海韦纳软件科技有限公司
备案服务号:沪ICP备18006526号

沪公网安备 31011502017034号

【用户协议】
【隐私政策】
【免责条款】