CTA策略模块

模块构成

CTA策略模块主要由7部分构成,如下图:

  • base:定义了CTA模块中用到的一些基础设置,如引擎类型(回测/实盘)、回测模式(K线/Tick)、本地停止单的定义以及停止单状态(等待中/已撤销/已触发)。

  • template:定义了CTA策略模板(包含信号生成和委托管理)、CTA信号(仅负责信号生成)、目标仓位算法(仅负责委托管理,适用于拆分巨型委托,降低冲击成本)。

  • strategies: 官方提供的cta策略示例,包含从最基础的双均线策略,到通道突破类型的布林带策略,到跨时间周期策略,再到把信号生成和委托管理独立开来的多信号策略。(用户自定义的策略也可以放在strategies文件夹内运行)

  • backesting:包含回测引擎和参数优化。其中回测引擎定义了数据载入、委托撮合机制、计算与统计相关盈利指标、结果绘图等函数。

  • converter:定义了针对上期所品种平今/平昨模式的委托转换模块;对于其他品种用户也可以通过可选参数lock切换至锁仓模式。

  • engine:定义了CTA策略实盘引擎,其中包括:RQData客户端初始化和数据载入、策略的初始化和启动、推送Tick订阅行情到策略中、挂撤单操作、策略的停止和移除等。

  • ui:基于PyQt5的GUI图形应用。

enter image title here

 

数据加载

在实盘中,RQData通过实时载入数据进行策略的初始化。该功能主要在CTA实盘引擎engine.py内实现。 下面介绍具体流程:

  • 在菜单栏点击“配置”,进入全局配置页面输入RQData账号密码;或者直接配置json文件,即在用户目录下.vntrader文件夹找到vt_setting.json,如图。

enter image title here

  • 初始化RQData客户端:从vt_setting.json中读取RQData的账户、密码到rq_client.init()函数进行初始化

    def init_rqdata(self):
        """
        Init RQData client.
        """
        username = SETTINGS["rqdata.username"]
        password = SETTINGS["rqdata.password"]
        if not username or not password:
            return

        import rqdatac
        
        self.rq_client = rqdatac
        self.rq_client.init(username, password,
                            ('rqdatad-pro.ricequant.com', 16011))
  • RQData载入实盘数据:输入vt_symbol后,首先会转换成符合RQData格式的rq_symbol,通过get_price()函数下载数据,并且插入到数据库中。

    def query_bar_from_rq(
        self, vt_symbol: str, interval: Interval, start: datetime, end: datetime
    ):
        """
        Query bar data from RQData.
        """
        symbol, exchange_str = vt_symbol.split(".")
        rq_symbol = to_rq_symbol(vt_symbol)
        if rq_symbol not in self.rq_symbols:
            return None
        
        end += timedelta(1)     # For querying night trading period data

        df = self.rq_client.get_price(
            rq_symbol,
            frequency=interval.value,
            fields=["open", "high", "low", "close", "volume"],
            start_date=start,
            end_date=end
        )

        data = []
        for ix, row in df.iterrows():
            bar = BarData(
                symbol=symbol,
                exchange=Exchange(exchange_str),
                interval=interval,
                datetime=row.name.to_pydatetime(),
                open_price=row["open"],
                high_price=row["high"],
                low_price=row["low"],
                close_price=row["close"],
                volume=row["volume"],
                gateway_name="RQ"
            )
            data.append(bar)

 

策略开发

CTA策略模板提供完整的信号生成和委托管理功能,用户可以基于该模板自行开发策略。新策略可以放在用户运行的文件内(推荐),如在c:\users\administrator.vntrader目录下创建strategies文件夹;可以放在根目录下vnpy\app\cta_strategy\strategies文件夹内。 注意:策略文件命名是以下划线模式,如boll_channel_strategy.py;而策略类命名采用的是驼峰式,如BollChannelStrategy。

下面通过BollChannelStrategy策略示例,来展示策略开发的具体步骤:

参数设置

定义策略参数并且初始化策略变量。策略参数为策略类的公有属性,用户可以通过创建新的实例来调用或者改变策略参数。

如针对rb1905品种,用户可以创建基于BollChannelStrategy的策略示例,如RB_BollChannelStrategy,boll_window可以由18改成30。

创建策略实例的方法有效地实现了一个策略跑多个品种,并且其策略参数可以通过品种的特征进行调整。

    boll_window = 18
    boll_dev = 3.4
    cci_window = 10
    atr_window = 30
    sl_multiplier = 5.2
    fixed_size = 1

    boll_up = 0
    boll_down = 0
    cci_value = 0
    atr_value = 0

    intra_trade_high = 0
    intra_trade_low = 0
    long_stop = 0
    short_stop = 0

类的初始化

初始化分3步:

  • 通过super( )的方法继承CTA策略模板,在__init__( )函数传入CTA引擎、策略名称、vt_symbol、参数设置。

  • 调用K线生成模块:通过时间切片来把Tick数据合成1分钟K线数据,然后更大的时间周期数据,如15分钟K线。

  • 调用K线时间序列管理模块:基于K线数据,如1分钟、15分钟,来生成相应的技术指标。

    def __init__(self, cta_engine, strategy_name, vt_symbol, setting):
        """"""
        super(BollChannelStrategy, self).__init__(
            cta_engine, strategy_name, vt_symbol, setting
        )

        self.bg = BarGenerator(self.on_bar, 15, self.on_15min_bar)
        self.am = ArrayManager()

策略的初始化、启动、停止

通过“CTA策略”组件的相关功能按钮实现。

注意:函数load_bar(10),代表策略初始化需要载入10个交易日的历史数据。该历史数据可以是Tick数据,也可以是K线数据。在策略初始化时候,会调用K线时间序列管理器计算并缓存相关的计算指标,但是并不触发交易。

    def on_init(self):
        """
        Callback when strategy is inited.
        """
        self.write_log("策略初始化")
        self.load_bar(10)

    def on_start(self):
        """
        Callback when strategy is started.
        """
        self.write_log("策略启动")

    def on_stop(self):
        """
        Callback when strategy is stopped.
        """
        self.write_log("策略停止")

Tick数据回报

策略订阅某品种合约行情,交易所会推送Tick数据到该策略上。

由于BollChannelStrategy是基于15分钟K线来生成交易信号的,故收到Tick数据后,需要用到K线生成模块里面的update_tick函数,通过时间切片的方法,聚合成1分钟K线数据,并且推送到on_bar函数。

    def on_tick(self, tick: TickData):
        """
        Callback of new tick data update.
        """
        self.bg.update_tick(tick)

K线数据回报

收到推送过来的1分钟K线数据后,通过K线生成模块里面的update_bar函数,以分钟切片的方法,合成15分钟K线数据,并且推送到on_15min_bar函数。

    def on_bar(self, bar: BarData):
        """
        Callback of new bar data update.
        """
        self.bg.update_bar(bar)

15分钟K线数据回报

负责CTA信号的生成,由3部分组成:

  • 清空未成交委托:为了防止之前下的单子在上一个15分钟没有成交,但是下一个15分钟可能已经调整了价格,就用cancel_all()方法立刻撤销之前未成交的所有委托,保证策略在当前这15分钟开始时的整个状态是清晰和唯一的。

  • 调用K线时间序列管理模块:基于最新的15分钟K线数据来计算相应计算指标,如布林带通道上下轨、CCI指标、ATR指标

  • 信号计算:通过持仓的判断以及结合CCI指标、布林带通道、ATR指标在通道突破点挂出停止单委托(buy/sell),同时设置离场点(short/cover)。

注意:CTA策略具有低胜率和高盈亏比的特定:在难以提升胜率的情况下,研究提高策略盈亏比有利于策略盈利水平的上升。

    def on_15min_bar(self, bar: BarData):
        """"""
        self.cancel_all()

        am = self.am
        am.update_bar(bar)
        if not am.inited:
            return

        self.boll_up, self.boll_down = am.boll(self.boll_window, self.boll_dev)
        self.cci_value = am.cci(self.cci_window)
        self.atr_value = am.atr(self.atr_window)

        if self.pos == 0:
            self.intra_trade_high = bar.high_price
            self.intra_trade_low = bar.low_price

            if self.cci_value > 0:
                self.buy(self.boll_up, self.fixed_size, True)
            elif self.cci_value < 0:
                self.short(self.boll_down, self.fixed_size, True)

        elif self.pos > 0:
            self.intra_trade_high = max(self.intra_trade_high, bar.high_price)
            self.intra_trade_low = bar.low_price

            self.long_stop = self.intra_trade_high - self.atr_value * self.sl_multiplier
            self.sell(self.long_stop, abs(self.pos), True)

        elif self.pos < 0:
            self.intra_trade_high = bar.high_price
            self.intra_trade_low = min(self.intra_trade_low, bar.low_price)

            self.short_stop = self.intra_trade_low + self.atr_value * self.sl_multiplier
            self.cover(self.short_stop, abs(self.pos), True)

        self.put_event()

委托回报、成交回报、停止单回报

在策略中可以直接pass,其具体逻辑应用交给回测/实盘引擎负责。

    def on_order(self, order: OrderData):
        """
        Callback of new order data update.
        """
        pass

    def on_trade(self, trade: TradeData):
        """
        Callback of new trade data update.
        """
        self.put_event()

    def on_stop_order(self, stop_order: StopOrder):
        """
        Callback of stop order update.
        """
        pass

 

回测研究

backtesting.py定义了回测引擎,下面主要介绍相关功能函数,以及回测引擎应用示例:

加载策略

把CTA策略逻辑,对应合约品种,以及参数设置(可在策略文件外修改)载入到回测引擎中。

    def add_strategy(self, strategy_class: type, setting: dict):
        """"""
        self.strategy_class = strategy_class
        self.strategy = strategy_class(
            self, strategy_class.__name__, self.vt_symbol, setting
        )

 

载入历史数据

负责载入对应品种的历史数据,大概有4个步骤:

  • 根据数据类型不同,分成K线模式和Tick模式;

  • 通过select().where()方法,有条件地从数据库中选取数据,其筛选标准包括:vt_symbol、 回测开始日期、回测结束日期、K线周期(K线模式下);

  • order_by(DbBarData.datetime)表示需要按照时间顺序载入数据;

  • 载入数据是以迭代方式进行的,数据最终存入self.history_data。

    def load_data(self):
        """"""
        self.output("开始加载历史数据")

        if self.mode == BacktestingMode.BAR:
            s = (
                DbBarData.select()
                .where(
                    (DbBarData.vt_symbol == self.vt_symbol) 
                    & (DbBarData.interval == self.interval) 
                    & (DbBarData.datetime >= self.start) 
                    & (DbBarData.datetime <= self.end)
                )
                .order_by(DbBarData.datetime)
            )
            self.history_data = [db_bar.to_bar() for db_bar in s]
        else:
            s = (
                DbTickData.select()
                .where(
                    (DbTickData.vt_symbol == self.vt_symbol) 
                    & (DbTickData.datetime >= self.start) 
                    & (DbTickData.datetime <= self.end)
                )
                .order_by(DbTickData.datetime)
            )
            self.history_data = [db_tick.to_tick() for db_tick in s]

        self.output(f"历史数据加载完成,数据量:{len(self.history_data)}")

 

撮合成交

载入CTA策略以及相关历史数据后,策略会根据最新的数据来计算相关指标。若符合条件会生成交易信号,发出具体委托(buy/sell/short/cover),并且在下一根K线成交。

根据委托类型的不同,回测引擎提供2种撮合成交机制来尽量模仿真实交易环节:

  • 限价单撮合成交:(以买入方向为例)先确定是否发生成交,成交标准为委托价>= 下一根K线的最低价;然后确定成交价格,成交价格为委托价与下一根K线开盘价的最小值。

  • 停止单撮合成交:(以买入方向为例)先确定是否发生成交,成交标准为委托价<= 下一根K线的最高价;然后确定成交价格,成交价格为委托价与下一根K线开盘价的最大值。

 

下面展示在引擎中限价单撮合成交的流程:

  • 确定会撮合成交的价格;

  • 遍历限价单字典中的所有限价单,推送委托进入未成交队列的更新状态;

  • 判断成交状态,若出现成交,推送成交数据和委托数据;

  • 从字典中删除已成交的限价单。

    def cross_limit_order(self):
        """
        Cross limit order with last bar/tick data.
        """
        if self.mode == BacktestingMode.BAR:
            long_cross_price = self.bar.low_price
            short_cross_price = self.bar.high_price
            long_best_price = self.bar.open_price
            short_best_price = self.bar.open_price
        else:
            long_cross_price = self.tick.ask_price_1
            short_cross_price = self.tick.bid_price_1
            long_best_price = long_cross_price
            short_best_price = short_cross_price

        for order in list(self.active_limit_orders.values()):
            # Push order update with status "not traded" (pending)
            if order.status == Status.SUBMITTING:
                order.status = Status.NOTTRADED
                self.strategy.on_order(order)

            # Check whether limit orders can be filled.
            long_cross = (
                order.direction == Direction.LONG 
                and order.price >= long_cross_price 
                and long_cross_price > 0
            )

            short_cross = (
                order.direction == Direction.SHORT 
                and order.price <= short_cross_price 
                and short_cross_price > 0
            )

            if not long_cross and not short_cross:
                continue

            # Push order udpate with status "all traded" (filled).
            order.traded = order.volume
            order.status = Status.ALLTRADED
            self.strategy.on_order(order)

            self.active_limit_orders.pop(order.vt_orderid)

            # Push trade update
            self.trade_count += 1

            if long_cross:
                trade_price = min(order.price, long_best_price)
                pos_change = order.volume
            else:
                trade_price = max(order.price, short_best_price)
                pos_change = -order.volume

            trade = TradeData(
                symbol=order.symbol,
                exchange=order.exchange,
                orderid=order.orderid,
                tradeid=str(self.trade_count),
                direction=order.direction,
                offset=order.offset,
                price=trade_price,
                volume=order.volume,
                time=self.datetime.strftime("%H:%M:%S"),
                gateway_name=self.gateway_name,
            )
            trade.datetime = self.datetime

            self.strategy.pos += pos_change
            self.strategy.on_trade(trade)

            self.trades[trade.vt_tradeid] = trade

 

计算策略盈亏情况

基于收盘价、当日持仓量、合约规模、滑点、手续费率等计算总盈亏与净盈亏,并且其计算结果以DataFrame格式输出,完成基于逐日盯市盈亏统计。

下面展示盈亏情况的计算过程

  • 浮动盈亏 = 持仓量 *(当日收盘价 - 昨日收盘价)* 合约规模

  • 实际盈亏 = 持仓变化量 * (当时收盘价 - 开仓成交价)* 合约规模

  • 总盈亏 = 浮动盈亏 + 实际盈亏

  • 净盈亏 = 总盈亏 - 总手续费 - 总滑点

    def calculate_pnl(
        self,
        pre_close: float,
        start_pos: float,
        size: int,
        rate: float,
        slippage: float,
    ):
        """"""
        self.pre_close = pre_close

        # Holding pnl is the pnl from holding position at day start
        self.start_pos = start_pos
        self.end_pos = start_pos
        self.holding_pnl = self.start_pos * \
            (self.close_price - self.pre_close) * size

        # Trading pnl is the pnl from new trade during the day
        self.trade_count = len(self.trades)

        for trade in self.trades:
            if trade.direction == Direction.LONG:
                pos_change = trade.volume
            else:
                pos_change = -trade.volume

            turnover = trade.price * trade.volume * size

            self.trading_pnl += pos_change * \
                (self.close_price - trade.price) * size
            self.end_pos += pos_change
            self.turnover += turnover
            self.commission += turnover * rate
            self.slippage += trade.volume * size * slippage

        # Net pnl takes account of commission and slippage cost
        self.total_pnl = self.trading_pnl + self.holding_pnl
        self.net_pnl = self.total_pnl - self.commission - self.slippage

 

计算策略统计指标

calculate_statistics函数是基于逐日盯市盈亏情况(DateFrame格式)来计算衍生指标,如最大回撤、年化收益、盈亏比、夏普比率等。

            df["balance"] = df["net_pnl"].cumsum() + self.capital
            df["return"] = np.log(df["balance"] / df["balance"].shift(1)).fillna(0)
            df["highlevel"] = (
                df["balance"].rolling(
                    min_periods=1, window=len(df), center=False).max()
            )
            df["drawdown"] = df["balance"] - df["highlevel"]
            df["ddpercent"] = df["drawdown"] / df["highlevel"] * 100

            # Calculate statistics value
            start_date = df.index[0]
            end_date = df.index[-1]

            total_days = len(df)
            profit_days = len(df[df["net_pnl"] > 0])
            loss_days = len(df[df["net_pnl"] < 0])

            end_balance = df["balance"].iloc[-1]
            max_drawdown = df["drawdown"].min()
            max_ddpercent = df["ddpercent"].min()

            total_net_pnl = df["net_pnl"].sum()
            daily_net_pnl = total_net_pnl / total_days

            total_commission = df["commission"].sum()
            daily_commission = total_commission / total_days

            total_slippage = df["slippage"].sum()
            daily_slippage = total_slippage / total_days

            total_turnover = df["turnover"].sum()
            daily_turnover = total_turnover / total_days

            total_trade_count = df["trade_count"].sum()
            daily_trade_count = total_trade_count / total_days

            total_return = (end_balance / self.capital - 1) * 100
            annual_return = total_return / total_days * 240
            daily_return = df["return"].mean() * 100
            return_std = df["return"].std() * 100

            if return_std:
                sharpe_ratio = daily_return / return_std * np.sqrt(240)
            else:
                sharpe_ratio = 0

 

统计指标绘图

通过matplotlib绘制4幅图:

  • 资金曲线图

  • 资金回撤图

  • 每日盈亏图

  • 每日盈亏分布图

    def show_chart(self, df: DataFrame = None):
        """"""
        if not df:
            df = self.daily_df
        
        if df is None:
            return

        plt.figure(figsize=(10, 16))

        balance_plot = plt.subplot(4, 1, 1)
        balance_plot.set_title("Balance")
        df["balance"].plot(legend=True)

        drawdown_plot = plt.subplot(4, 1, 2)
        drawdown_plot.set_title("Drawdown")
        drawdown_plot.fill_between(range(len(df)), df["drawdown"].values)

        pnl_plot = plt.subplot(4, 1, 3)
        pnl_plot.set_title("Daily Pnl")
        df["net_pnl"].plot(kind="bar", legend=False, grid=False, xticks=[])

        distribution_plot = plt.subplot(4, 1, 4)
        distribution_plot.set_title("Daily Pnl Distribution")
        df["net_pnl"].hist(bins=50)

        plt.show()

 

单策略回测示例

  • 导入回测引擎和CTA策略

  • 设置回测相关参数,如:品种、K线周期、回测开始和结束日期、手续费、滑点、合约规模、起始资金

  • 载入策略和数据到引擎中,运行回测。

  • 计算基于逐日统计盈利情况,计算统计指标,统计指标绘图。

from vnpy.app.cta_strategy.backtesting import BacktestingEngine
from vnpy.app.cta_strategy.strategies.boll_channel_strategy import (
    BollChannelStrategy,
)
from datetime import datetime

engine = BacktestingEngine()
engine.set_parameters(
    vt_symbol="IF88.CFFEX",
    interval="1m",
    start=datetime(2018, 1, 1),
    end=datetime(2019, 1, 1),
    rate=3.0/10000,
    slippage=0.2,
    size=300,
    pricetick=0.2,
    capital=1_000_000,
)

engine.add_strategy(AtrRsiStrategy, {})
engine.load_data()
engine.run_backtesting()
df = engine.calculate_result()
engine.calculate_statistics()
engine.show_chart()

 

投资组合回测示例

投资组合回测是基于单策略回测的,其关键是每个策略都对应着各自的BacktestingEngine对象,下面介绍具体流程:

  • 创建回测函数run_backtesting(),这样每添加一个策略就创建其BacktestingEngine对象。

from vnpy.app.cta_strategy.backtesting import BacktestingEngine, OptimizationSetting
from vnpy.app.cta_strategy.strategies.atr_rsi_strategy import AtrRsiStrategy
from vnpy.app.cta_strategy.strategies.boll_channel_strategy import BollChannelStrategy
from datetime import datetime

def run_backtesting(strategy_class, setting, vt_symbol, interval, start, end, rate, slippage, size, pricetick, capital):
    engine = BacktestingEngine()
    engine.set_parameters(
        vt_symbol=vt_symbol,
        interval=interval,
        start=start,
        end=end,
        rate=rate,
        slippage=slippage,
        size=size,
        pricetick=pricetick,
        capital=capital    
    )
    engine.add_strategy(strategy_class, setting)
    engine.load_data()
    engine.run_backtesting()
    df = engine.calculate_result()
    return df

 

  • 分别进行单策略回测,得到各自的DataFrame,(该DataFrame包含交易时间、今仓、昨仓、手续费、滑点、当日净盈亏、累计净盈亏等基本信息,但是不包括最大回撤,夏普比率等统计信息),然后把DataFrame相加并且去除空值后即得到投资组合的DataFrame。

df1 = run_backtesting(
    strategy_class=AtrRsiStrategy, 
    setting={}, 
    vt_symbol="IF88.CFFEX",
    interval="1m", 
    start=datetime(2019, 1, 1), 
    end=datetime(2019, 4, 30),
    rate=0.3/10000,
    slippage=0.2,
    size=300,
    pricetick=0.2,
    capital=1_000_000,
    )

df2 = run_backtesting(
    strategy_class=BollChannelStrategy, 
    setting={'fixed_size': 16}, 
    vt_symbol="RB88.SHFE",
    interval="1m", 
    start=datetime(2019, 1, 1), 
    end=datetime(2019, 4, 30),
    rate=1/10000,
    slippage=1,
    size=10,
    pricetick=1,
    capital=1_000_000,
    )

dfp = df1 + df2
dfp =dfp.dropna() 

 

  • 创建show_portafolio()函数,同样也是创建新的BacktestingEngine对象,对传入的DataFrame计算如夏普比率等统计指标,并且画图。故该函数不仅能显示单策略回测效果,也能展示投资组合回测效果。

def show_portafolio(df):
    engine = BacktestingEngine()
    engine.calculate_statistics(df)
    engine.show_chart(df)

show_portafolio(dfp)

 

参数优化

参数优化模块主要由3部分构成:

参数设置

  • 设置参数优化区间:如boll_window设置起始值为18,终止值为24,步进为2,这样就得到了[18, 20, 22, 24] 这4个待优化的参数了。

  • 设置优化目标字段:如夏普比率、盈亏比、总收益率等。

  • 随机生成参数对组合:使用迭代工具产生参数对组合,然后把参数对组合打包到一个个字典组成的列表中

class OptimizationSetting:
    """
    Setting for runnning optimization.
    """

    def __init__(self):
        """"""
        self.params = {}
        self.target_name = ""

    def add_parameter(
        self, name: str, start: float, end: float = None, step: float = None
    ):
        """"""
        if not end and not step:
            self.params[name] = [start]
            return

        if start >= end:
            print("参数优化起始点必须小于终止点")
            return

        if step <= 0:
            print("参数优化步进必须大于0")
            return

        value = start
        value_list = []

        while value <= end:
            value_list.append(value)
            value += step

        self.params[name] = value_list

    def set_target(self, target_name: str):
        """"""
        self.target_name = target_name

    def generate_setting(self):
        """"""
        keys = self.params.keys()
        values = self.params.values()
        products = list(product(*values))

        settings = []
        for p in products:
            setting = dict(zip(keys, p))
            settings.append(setting)

        return settings

 

参数对组合回测

多进程优化时,每个进程都会运行optimize函数,输出参数对组合以及目标优化字段的结果。其步骤如下:

  • 调用回测引擎

  • 输入回测相关设置

  • 输入参数对组合到策略中

  • 运行回测

  • 返回回测结果,包括:参数对组合、目标优化字段数值、策略统计指标

def optimize(
    target_name: str,
    strategy_class: CtaTemplate,
    setting: dict,
    vt_symbol: str,
    interval: Interval,
    start: datetime,
    rate: float,
    slippage: float,
    size: float,
    pricetick: float,
    capital: int,
    end: datetime,
    mode: BacktestingMode,
):
    """
    Function for running in multiprocessing.pool
    """
    engine = BacktestingEngine()
    engine.set_parameters(
        vt_symbol=vt_symbol,
        interval=interval,
        start=start,
        rate=rate,
        slippage=slippage,
        size=size,
        pricetick=pricetick,
        capital=capital,
        end=end,
        mode=mode
    )

    engine.add_strategy(strategy_class, setting)
    engine.load_data()
    engine.run_backtesting()
    engine.calculate_result()
    statistics = engine.calculate_statistics()

    target_value = statistics[target_name]
    return (str(setting), target_value, statistics)

 

多进程优化

  • 根据CPU的核数来创建进程:若CPU为4核,则创建4个进程

  • 在每个进程都调用apply_async( )的方法运行参数对组合回测,其回测结果添加到results中 (apply_async是异步非阻塞的,即不用等待当前进程执行完毕,随时根据系统调度来进行进程切换。)

  • pool.close()与pool.join()用于进程跑完任务后,去关闭进程。

  • 对results的内容通过目标优化字段标准进行排序,输出结果。

        pool = multiprocessing.Pool(multiprocessing.cpu_count())

        results = []
        for setting in settings:
            result = (pool.apply_async(optimize, (
                target_name,
                self.strategy_class,
                setting,
                self.vt_symbol,
                self.interval,
                self.start,
                self.rate,
                self.slippage,
                self.size,
                self.pricetick,
                self.capital,
                self.end,
                self.mode
            )))
            results.append(result)

        pool.close()
        pool.join()

        # Sort results and output
        result_values = [result.get() for result in results]
        result_values.sort(reverse=True, key=lambda result: result[1])

        for value in result_values:
            msg = f"参数:{value[0]}, 目标:{value[1]}"
            self.output(msg)

        return result_values

 

实盘运行

在实盘环境,用户可以基于编写好的CTA策略来创建新的实例,一键初始化、启动、停止策略。

创建策略实例

用户可以基于编写好的CTA策略来创建新的实例,策略实例的好处在于同一个策略可以同时去运行多个品种合约,并且每个实例的参数可以是不同的。 在创建实例的时候需要填写如图的实例名称、合约品种、参数设置。注意:实例名称不能重名;合约名称是vt_symbol的格式,如IF1905.CFFEX。

https://vnpy-community.oss-cn-shanghai.aliyuncs.com/forum_experience/yazhang/cta_strategy/add_strategy.png

创建策略流程如下:

  • 检查策略实例重名

  • 添加策略配置信息(strategy_name, vt_symbol, setting)到strategies字典上

  • 添加该策略要订阅行情的合约信息到symbol_strategy_map字典中;

  • 把策略配置信息保存到json文件内;

  • 在图形化界面更新状态信息。

    def add_strategy(
        self, class_name: str, strategy_name: str, vt_symbol: str, setting: dict
    ):
        """
        Add a new strategy.
        """
        if strategy_name in self.strategies:
            self.write_log(f"创建策略失败,存在重名{strategy_name}")
            return

        strategy_class = self.classes[class_name]

        strategy = strategy_class(self, strategy_name, vt_symbol, setting)
        self.strategies[strategy_name] = strategy

        # Add vt_symbol to strategy map.
        strategies = self.symbol_strategy_map[vt_symbol]
        strategies.append(strategy)

        # Update to setting file.
        self.update_strategy_setting(strategy_name, setting)

        self.put_strategy_event(strategy)

 

初始化策略

  • 调用策略类的on_init()回调函数,并且载入历史数据;

  • 恢复上次退出之前的策略状态;

  • 从.vntrader/cta_strategy_data.json内读取策略参数,最新的技术指标,以及持仓数量;

  • 调用接口的subcribe()函数订阅指定行情信息;

  • 策略初始化状态变成True,并且更新到日志上。

    def _init_strategy(self):
        """
        Init strategies in queue.
        """
        while not self.init_queue.empty():
            strategy_name = self.init_queue.get()
            strategy = self.strategies[strategy_name]

            if strategy.inited:
                self.write_log(f"{strategy_name}已经完成初始化,禁止重复操作")
                continue

            self.write_log(f"{strategy_name}开始执行初始化")

            # Call on_init function of strategy
            self.call_strategy_func(strategy, strategy.on_init)

            # Restore strategy data(variables)
            data = self.strategy_data.get(strategy_name, None)
            if data:
                for name in strategy.variables:
                    value = data.get(name, None)
                    if value:
                        setattr(strategy, name, value)

            # Subscribe market data
            contract = self.main_engine.get_contract(strategy.vt_symbol)
            if contract:
                req = SubscribeRequest(
                    symbol=contract.symbol, exchange=contract.exchange)
                self.main_engine.subscribe(req, contract.gateway_name)
            else:
                self.write_log(f"行情订阅失败,找不到合约{strategy.vt_symbol}", strategy)

            # Put event to update init completed status.
            strategy.inited = True
            self.put_strategy_event(strategy)
            self.write_log(f"{strategy_name}初始化完成")
        
        self.init_thread = None

 

启动策略

  • 检查策略初始化状态;

  • 检查策略启动状态,避免重复启动;

  • 调用策略类的on_start()函数启动策略;

  • 策略启动状态变成True,并且更新到图形化界面上。

    def start_strategy(self, strategy_name: str):
        """
        Start a strategy.
        """
        strategy = self.strategies[strategy_name]
        if not strategy.inited:
            self.write_log(f"策略{strategy.strategy_name}启动失败,请先初始化")
            return

        if strategy.trading:
            self.write_log(f"{strategy_name}已经启动,请勿重复操作")
            return

        self.call_strategy_func(strategy, strategy.on_start)
        strategy.trading = True

        self.put_strategy_event(strategy)

 

停止策略

  • 检查策略启动状态;

  • 调用策略类的on_stop()函数停止策略;

  • 更新策略启动状态为False;

  • 对所有为成交的委托(市价单/限价单/本地停止单)进行撤单操作;

  • 把策略参数,最新的技术指标,以及持仓数量保存到.vntrader/cta_strategy_data.json内;

  • 在图形化界面更新策略状态。

    def stop_strategy(self, strategy_name: str):
        """
        Stop a strategy.
        """
        strategy = self.strategies[strategy_name]
        if not strategy.trading:
            return

        # Call on_stop function of the strategy
        self.call_strategy_func(strategy, strategy.on_stop)

        # Change trading status of strategy to False
        strategy.trading = False

        # Cancel all orders of the strategy
        self.cancel_all(strategy)

        # Sync strategy variables to data file
        self.sync_strategy_data(strategy)

        # Update GUI
        self.put_strategy_event(strategy)

 

编辑策略

  • 重新配置策略参数字典setting;

  • 更新参数字典到策略中;

  • 在图像化界面更新策略状态。

    def edit_strategy(self, strategy_name: str, setting: dict):
        """
        Edit parameters of a strategy.
        """
        strategy = self.strategies[strategy_name]
        strategy.update_setting(setting)

        self.update_strategy_setting(strategy_name, setting)
        self.put_strategy_event(strategy)

 

移除策略

  • 检查策略状态,只有停止策略后从可以移除策略;

  • 从json文件移除策略配置信息(strategy_name, vt_symbol, setting);

  • 从symbol_strategy_map字典中移除该策略订阅的合约信息;

  • 从strategy_orderid_map字典移除活动委托记录;

  • 从strategies字典移除该策略的相关配置信息。

    def remove_strategy(self, strategy_name: str):
        """
        Remove a strategy.
        """
        strategy = self.strategies[strategy_name]
        if strategy.trading:
            self.write_log(f"策略{strategy.strategy_name}移除失败,请先停止")
            return

        # Remove setting
        self.remove_strategy_setting(strategy_name)

        # Remove from symbol strategy map
        strategies = self.symbol_strategy_map[strategy.vt_symbol]
        strategies.remove(strategy)

        # Remove from active orderid map
        if strategy_name in self.strategy_orderid_map:
            vt_orderids = self.strategy_orderid_map.pop(strategy_name)

            # Remove vt_orderid strategy map
            for vt_orderid in vt_orderids:
                if vt_orderid in self.orderid_strategy_map:
                    self.orderid_strategy_map.pop(vt_orderid)

        # Remove from strategies
        self.strategies.pop(strategy_name)

        return True

 

锁仓操作

用户在编写策略时,可以通过填写lock字段来让策略完成锁仓操作,即禁止平今,通过反向开仓来代替。

  • 在cta策略模板template中,可以看到如下具体委托函数都有lock字段,并且默认为False。

    def buy(self, price: float, volume: float, stop: bool = False, lock: bool = False):
        """
        Send buy order to open a long position.
        """
        return self.send_order(Direction.LONG, Offset.OPEN, price, volume, stop, lock)

    def sell(self, price: float, volume: float, stop: bool = False, lock: bool = False):
        """
        Send sell order to close a long position.
        """
        return self.send_order(Direction.SHORT, Offset.CLOSE, price, volume, stop, lock)

    def short(self, price: float, volume: float, stop: bool = False, lock: bool = False):
        """
        Send short order to open as short position.
        """
        return self.send_order(Direction.SHORT, Offset.OPEN, price, volume, stop, lock)

    def cover(self, price: float, volume: float, stop: bool = False, lock: bool = False):
        """
        Send cover order to close a short position.
        """
        return self.send_order(Direction.LONG, Offset.CLOSE, price, volume, stop, lock)

    def send_order(
        self,
        direction: Direction,
        offset: Offset,
        price: float,
        volume: float,
        stop: bool = False,
        lock: bool = False
    ):
        """
        Send a new order.
        """
        if self.trading:
            vt_orderids = self.cta_engine.send_order(
                self, direction, offset, price, volume, stop, lock
            )
            return vt_orderids
        else:
            return []

 

  • 设置lock=True后,cta实盘引擎send_order()函数发生响应,并且调用其最根本的委托函数send_server_order()去处理锁仓委托转换。首先是创建原始委托original_req,然后调用converter文件里面OffsetConverter类的convert_order_request来进行相关转换。

    def send_order(
        self,
        strategy: CtaTemplate,
        direction: Direction,
        offset: Offset,
        price: float,
        volume: float,
        stop: bool,
        lock: bool
    ):
        """
        """
        contract = self.main_engine.get_contract(strategy.vt_symbol)
        if not contract:
            self.write_log(f"委托失败,找不到合约:{strategy.vt_symbol}", strategy)
            return ""

        if stop:
            if contract.stop_supported:
                return self.send_server_stop_order(strategy, contract, direction, offset, price, volume, lock)
            else:
                return self.send_local_stop_order(strategy, direction, offset, price, volume, lock)
        else:
            return self.send_limit_order(strategy, contract, direction, offset, price, volume, lock)

    def send_limit_order(
        self,
        strategy: CtaTemplate,
        contract: ContractData,
        direction: Direction,
        offset: Offset,
        price: float,
        volume: float,
        lock: bool
    ):
        """
        Send a limit order to server.
        """
        return self.send_server_order(
            strategy,
            contract,
            direction,
            offset,
            price,
            volume,
            OrderType.LIMIT,
            lock
        )

    def send_server_order(
        self,
        strategy: CtaTemplate,
        contract: ContractData,
        direction: Direction,
        offset: Offset,
        price: float,
        volume: float,
        type: OrderType,
        lock: bool
    ):
        """
        Send a new order to server.
        """
        # Create request and send order.
        original_req = OrderRequest(
            symbol=contract.symbol,
            exchange=contract.exchange,
            direction=direction,
            offset=offset,
            type=type,
            price=price,
            volume=volume,
        )

        # Convert with offset converter
        req_list = self.offset_converter.convert_order_request(original_req, lock)

        # Send Orders
        vt_orderids = []

        for req in req_list:
            vt_orderid = self.main_engine.send_order(
                req, contract.gateway_name)
            vt_orderids.append(vt_orderid)

            self.offset_converter.update_order_request(req, vt_orderid)
            
            # Save relationship between orderid and strategy.
            self.orderid_strategy_map[vt_orderid] = strategy
            self.strategy_orderid_map[strategy.strategy_name].add(vt_orderid)

        return vt_orderids        

 

  • 在convert_order_request_lock()函数中,先计算今仓的量和昨可用量;然后进行判断:若有今仓,只能开仓(锁仓);无今仓时候,若平仓量小于等于昨可用,全部平昨,反之,先平昨,剩下的反向开仓。

    def convert_order_request_lock(self, req: OrderRequest):
        """"""
        if req.direction == Direction.LONG:
            td_volume = self.short_td
            yd_available = self.short_yd - self.short_yd_frozen
        else:
            td_volume = self.long_td
            yd_available = self.long_yd - self.long_yd_frozen

        # If there is td_volume, we can only lock position
        if td_volume:
            req_open = copy(req)
            req_open.offset = Offset.OPEN
            return [req_open]
        # If no td_volume, we close opposite yd position first
        # then open new position
        else:
            open_volume = max(0,  req.volume - yd_available)
            req_list = []

            if yd_available:
                req_yd = copy(req)
                if self.exchange == Exchange.SHFE:
                    req_yd.offset = Offset.CLOSEYESTERDAY
                else:
                    req_yd.offset = Offset.CLOSE
                req_list.append(req_yd)

            if open_volume:
                req_open = copy(req)
                req_open.offset = Offset.OPEN
                req_open.volume = open_volume
                req_list.append(req_open)

            return req_list