策略原理

 

双均线策略作为最常见最基础的CTA策略,也就是常说的金叉死叉信号组合得到的策略,常用于捕捉一段大趋势。它的思想很简单,由一个短周期均线和一个长周期均线组成,短周期均线代表近期的走势,长周期均线则是较长时间的走势:

 

  • 当短周期均线从下往上突破长周期均线,也就意味着当前时间段具有上涨趋势,突破点也就是常说的金叉,意味着多头信号;
  • 当短周期均线从上向下突破短周期信号,则意味着当前时间段具有下降趋势,突破点也就是常说的死叉,意味着空头信号。

 

 

源码分析

 

下面就以vn.py项目中的双均线策略的源码为例,进行策略交易逻辑以及内部代码实现的解析。

 

1、创建策略实例

 

首先需要记住一点,所有vn.py框架中的CTA策略类(项目自带的或者用户开发的),都是基于CTA策略模板类(CtaTemplate)来实现的子类。策略类与模板类的关系如同抽象之于具体:照着汽车的设计图和技术规格,人类就能造出各种各样的汽车。

 

同理,CTA策略模板定义了一系列底层的交易函数和策略的逻辑范式,根据这种规则,我们可以快速实现出自己想要的策略。
 

class DoubleMaStrategy(CtaTemplate):
    author = "用Python的交易员"

    fast_window = 10
    slow_window = 20

    fast_ma0 = 0.0
    fast_ma1 = 0.0

    slow_ma0 = 0.0
    slow_ma1 = 0.0

    parameters = ["fast_window", "slow_window"]
    variables = ["fast_ma0", "fast_ma1", "slow_ma0", "slow_ma1"]

    def __init__(self, cta_engine, strategy_name, vt_symbol, setting):
    """"""
        super(DoubleMaStrategy, self).__init__(
            cta_engine, strategy_name, vt_symbol, setting
        )

        self.bg = BarGenerator(self.on_bar)
        self.am = ArrayManager()

 

首先我们需要设置策略的参数和变量,两者都从属于策略类,不同的是策略参数是固定的(由交易员从外部指定),而策略变量则在交易的过程中随着策略的状态变化,所以策略变量一开始只需要初始化为对应的基础类型,例如:整数设为0,浮点数设为0.0,而字符串则设为""。

 

策略参数列表parameters中,需要写入策略的参数名称字符串,基于该列表中的内容,策略引擎会自动从缓存的策略配置json文件中读取策略配置,图形界面则会自动提供用户在创建策略实例时配置策略参数的对话框。

 

策略变量列表variables中,则需要写入策略的变量名称字符串,基于其中的内容,图形界面会自动渲染显示(调用put_event函数时更新),策略引擎会在用户停止策略、收到成交回报时、调用sync_data函数时,将变量数据写入硬盘中的缓存json文件,用于程序重启后策略状态的恢复。

 

策略类的构造函数init,需要传递cta_engine、strategy_name、vt_symbol、setting四个参数,分别对应CTA引擎对象、策略名称字符串、标的代码字符串、设置信息字典。注意其中的CTA引擎,可以是实盘引擎或者回测引擎,这样就可以很方便的实现一套代码同时跑回测和实盘了。以上参数均由策略引擎在使用策略类创建策略实例时自动传入,用户本质上无需关心。

 

在构造函数中,我们还创建了一个BarGenerator实例,并传入了on_bar的1分钟K线回调函数,用于实现将tick数据(TickData)自动合成为分钟级别K线数据(BarData)。除此之外,ArrayManager实例则用于缓存BarGenerator合成出来的K线数据,将其转化为便于向量化计算的时间序列数据结构,并在内部支持使用talib来计算指标。
 

 

2、状态变量初始化
 

注意这里的策略状态变量初始化,并不是指上一步中创建策略实例时的初始化函数init中的逻辑。当用户在VN Trader的CTA策略模块界面上,点击【添加策略】按钮,并在弹出的窗口中设置好策略实例名称、合约代码、策略参数,实际上是完成了策略实例的创建。

 

此时策略实例中的变量状态,依旧是0或者""这样的原始数据。用户需要点击策略管理界面上的【初始化】按钮,来调用策略中的on_init函数,完成加载历史数据回放给策略初始化其中的变量状态的操作。
 

def on_init(self):
    """
    Callback when strategy is inited.
    """
    self.write_log("策略初始化")
    self.load_bar(10)

 

从上面的代码中可以看到,用户在调用这个on_init函数后,会在CTA策略管理界面的日志组件中输出信息“策略初始化“,随后调用父类CtaTemplate提供的load_bar函数用于加载历史数据,CTA策略引擎会负责将数据推送给策略完成变量状态的初始化计算。

 

注意这里我们load_bar时,传入的参数是10,对应也就是加载10天的1分钟K线数据数据。在回测时,10天指的是10个交易日,而在实盘时,10天则是指的是自然日,因此加载的天数宁可多一些也不要太少。load_bar函数的实现如下:

 

def load_bar(
    self,
    days: int,
    interval: Interval = Interval.MINUTE,
    callback: Callable = None,
):
    """
    Load historical bar data for initializing strategy.
    """
    if not callback:
        callback = self.on_bar       #设置回调函数
    self.cta_engine.load_bar(self.vt_symbol, days, interval, callback)

 

CtaTemplate在这里调用了CtaEngine的load_bar函数来完成历史数据的加载回放。查看CtaEngine中对于load_bar函数的实现后,我们可以看到历史数据加载的两种模式:首先尝试使用RQData API从远端服务器拉取,前提是需要配置好RQData账号,同时该合约的行情数据在RQData上可以找到(主要是国内期货),若获取失败则会尝试在本地数据库中进行查找(默认为位于.vntrader文件夹下的sqlite数据库)。
 

def load_bar(
    self, 
    vt_symbol: str, 
    days: int, 
    interval: Interval,
    callback: Callable[[BarData], None]
):
    """"""
    symbol, exchange = extract_vt_symbol(vt_symbol)
    end = datetime.now()
    start = end - timedelta(days)

    # Query bars from RQData by default, if not found, load from database.
    bars = self.query_bar_from_rq(symbol, exchange, interval, start, end)
    if not bars:
        bars = database_manager.load_bar_data(
              symbol=symbol,
              exchange=exchange,
              interval=interval,
              start=start,
              end=end,
        )

    for bar in bars:
        callback(bar)

 

从上述代码中可以看出,通过datetime模块获取当前时间作为end,然后减去10天的时间作为start进行查询。将得到的所有bar数据通过第一步load_bar中设定的回调函数on_bar进行调用,这样就实现了将加载的K线数据推送给CTA策略。

 
 

3、启动自动交易

 

完成策略变量的初始化之后,就可以启动策略的自动交易功能了。点击图形界面的【启动策略】按钮后,CTA引擎会自动调用策略中的on_start函数,同时将策略的trading控制变量设置为True,界面上的日志组件中就会出现相应的策略启动日志信息。

 

def on_start(self):

    """
    Callback when strategy is started.
    """
    self.write_log("策略启动")
    self.put_event()

 

注意这里必须调用put_event函数,来通知图形界面刷新策略状态相关的显示(变量),如果不调用则界面不会更新。

 
 

4、接收Tick推送

 

启动自动交易后,CTP接口会以每0.5秒一次的频率推送Tick数据,再由VN Trader内部的事件引擎分发推送到我们的策略中,策略中的Tick数据处理函数如下:

 

def on_tick(self, tick: TickData):
    """
    Callback of new tick data update.
    """
    self.bg.update_tick(tick)

 

因为是较为简单的双均线策略,交易逻辑都在K线时间周期上执行,所以在接收到Tick数据后,通过调用策略实例所属的bg对象(BarGenerator)的update_tick,来实现Tick自动合成1分钟K线数据:

 

def update_tick(self, tick: TickData):
    """
    Update new tick data into generator.
    """
    new_minute = False

    # Filter tick data with 0 last price
    if not tick.last_price:
        return

    if not self.bar:
        new_minute = True
    elif self.bar.datetime.minute != tick.datetime.minute:
        self.bar.datetime = self.bar.datetime.replace(
                second=0, microsecond=0
        )
        self.on_bar(self.bar)
        new_minute = True

    if new_minute:
        self.bar = BarData(
            symbol=tick.symbol,
            exchange=tick.exchange,
            interval=Interval.MINUTE,
            datetime=tick.datetime,
            gateway_name=tick.gateway_name,
            open_price=tick.last_price,
            high_price=tick.last_price,
            low_price=tick.last_price,
            close_price=tick.last_price,
            open_interest=tick.open_interest
        )
    else:
        self.bar.high_price = max(self.bar.high_price, tick.last_price)
        self.bar.low_price = min(self.bar.low_price, tick.last_price)
        self.bar.close_price = tick.last_price
        self.bar.open_interest = tick.open_interest
        self.bar.datetime = tick.datetime

    if self.last_tick:
        volume_change = tick.volume - self.last_tick.volume
        self.bar.volume += max(volume_change, 0)

    self.last_tick = tick

 

update_tick函数内部主要是通过检查当前的Tick数据与上一笔Tick数据是否是属于同一分钟,来判断是否有新的1分钟K线生成,如果没有就会继续进行累加更新当前K线的信息。

 

这里意味着只有当T+1分钟的第一个Tick接收到了之后,T分钟的Bar数据才会生成。在创建bg对象的时候,我们传入了on_bar作为K线合成完毕的回调函数,所以在当新的1分钟K线生成后,就会通过on_bar函数推送到策略中。
 
 

5、核心交易逻辑

 

每个策略中最至关重要的就是策略的核心交易逻辑:

 

  • 如果策略逻辑是基于Tick数据的,则在on_tick函数中实现相关的交易逻辑;
  • 如果策略逻辑是基于K线的,如这里我们的双均线策略,则在on_bar函数中实现相关的交易逻辑。

 

def on_bar(self, bar: BarData):
    """Callback of new bar data update."""
    am = self.am
    am.update_bar(bar)
    if not am.inited:
        return

    fast_ma = am.sma(self.fast_window, array=True)
    self.fast_ma0 = fast_ma[-1]
    self.fast_ma1 = fast_ma[-2]

    slow_ma = am.sma(self.slow_window, array=True)
    self.slow_ma0 = slow_ma[-1]
    self.slow_ma1 = slow_ma[-2]

    cross_over = self.fast_ma0 > self.slow_ma0 and self.fast_ma1 < self.slow_ma1
    cross_below = self.fast_ma0 < self.slow_ma0 and self.fast_ma1 > self.slow_ma1

    if cross_over:
        if self.pos == 0:
            self.buy(bar.close_price, 1)
        elif self.pos < 0:
          self.cover(bar.close_price, 1)
          self.buy(bar.close_price, 1)

    elif cross_below:
        if self.pos == 0:
            self.short(bar.close_price, 1)
        elif self.pos > 0:
            self.sell(bar.close_price, 1)
            self.short(bar.close_price, 1)

    self.put_event()

 

在接收到K线数据,即bar对象的推送后,我们需要将该bar数据放入am(ArrayManager)时间序列容器中进行更新,当有了至少100个bar数据后am对象才初始化完毕(inited变为True)。

 

这里需要注意,如果在初始化策略状态变量时,没有足够的历史数据来让am初始化完毕,则在自动交易启动后,需要至少收到100个的bar数据来填充am容器,直到am初始化完毕后,才会执行后面的交易逻辑代码。

 

之后调用封装在ArrayManager内部的talib库,用于计算最新窗口内的技术指标,对应我们双均线策略中的也就是10窗口的MA和20窗口的MA指标。

 

注意这里的am.sma实际上是对talib中的SMA函数的进一步封装,本质上是在计算bar数据的收盘价的算术平均:

 

    am = self.am
    am.update_bar(bar)
    if not am.inited:
        return

    fast_ma = am.sma(self.fast_window, array=True)
    self.fast_ma0 = fast_ma[-1]
    self.fast_ma1 = fast_ma[-2]

    slow_ma = am.sma(self.slow_window, array=True)
    self.slow_ma0 = slow_ma[-1]
    self.slow_ma1 = slow_ma[-2]

 

然后通过判断是否出现金叉死叉来决定是否触发交易逻辑:

 

    cross_over = self.fast_ma0 > self.slow_ma0 and self.fast_ma1 < self.slow_ma1
    cross_below = self.fast_ma0 < self.slow_ma0 and self.fast_ma1 > self.slow_ma1

 

  • 当出现了金叉:如果没有持仓时,则直接买入开仓;或者持有空头,则先平空再买入开仓。
  • 当出现了死叉:如果没有持仓时,则直接卖出开仓;或者是持有多头,则先平多再卖出开仓。

 

具体的委托指令已由CTA策略模板封装好了,在on_bar函数里面直接调用即可:

 

  • buy:买入开仓( Direction:多,Offset:开)
  • sell:卖出平仓( Direction:空,Offset:平)
  • short:卖出开仓( Direction:空,Offset:开)
  • cover:买入平仓( Direction:多,Offset:平)

 

此处需要注意,国内期货有开平仓的概念,例如买入操作要区分为买入开仓和买入平仓;但对于股票、外盘期货和绝大部分数字货币(OKEX除外)都是净持仓模式,没有开仓和平仓概念,所以只需使用买入(buy) 和卖出(sell) 这两个指令就可以了。

 

if cross_over:
    if self.pos == 0:
        self.buy(bar.close_price, 1)
    elif self.pos < 0:
        self.cover(bar.close_price, 1)
        self.buy(bar.close_price, 1)

elif cross_below:
    if self.pos == 0:
        self.short(bar.close_price, 1)
    elif self.pos > 0:
        self.sell(bar.close_price, 1)
        self.short(bar.close_price, 1)

self.put_event()

 
 

6、委托回报

 

on_order是委托回调函数,当我们发出一个交易委托后,这个委托每当有状态变化时,我们都会收到该委托最新的数据推送,这条数据就是委托回报。

 

其中比较重要信息的是status委托状态(包括:拒单、未成交、部分成交、完全成交、已撤单),我们可以基于委托状态实现更加细粒度的交易委托控制(算法交易)。

 

这里我们的双均线策略由于逻辑较为简单,所以在on_order中没有任何操作:

 

def on_order(self, order: OrderData):
    """
    Callback of new order data update.
    """
    pass

 

同样对于on_trader(成交回报函数)以及on_stop_order(停止单回报函数)也没有任何操作。

 

 

7、停止自动交易

 

当每日的交易时段结束后(国内期货一般是下午三点收盘后),需要点击CTA策略界面的【停止】按钮来停止策略的自动交易。
 

此时CTA策略引擎会将策略的交易状态变量trading设为False,撤销该策略之前发出的所有活动状态的委托,以及将策略variables列表中的参数写入到缓存json文件中,最后调用策略的on_stop回调函数执行用户定义的逻辑:

 

def on_stop(self):
    """
    Callback when strategy is stopped.
    """
    self.write_log("策略停止")
    self.put_event()

 
 

CTA交易流程梳理

 

最后,用我制作的这个思维导图,以双均线策略为例来梳理一下vn.py对于策略实现以及执行的流程:

 

description

 

《vn.py全实战进阶》课程全新上线,一共50节内容覆盖从策略设计开发、参数回测优化,到最终实盘自动交易的完整CTA量化业务流程,目前已经更新到第八集,详细内容请戳课程上线:《vn.py全实战进阶》!
 
了解更多知识,请关注vn.py社区公众号。
description