VeighNa量化社区
你的开源社区量化交易平台

置顶主题

CTA回测:增加对账户净值/净值回撤/每日盈亏图表的交叉显示

如题,为了方便CTA回测显示,效果如下:

description

description

涉及代码修改:
vnpy/app/cta_strategy/ui/widget.py
将原文件中的BacktesterChart类用以下代码替代即可

class BacktesterChart(pg.GraphicsWindow):
""""""

def __init__(self):
    """"""
    super().__init__(title="Backtester Chart")

    self.dates = {}

    self.init_ui()

def init_ui(self):
    """"""
    pg.setConfigOptions(antialias=True)

    # Create plot widgets
    self.balance_plot = self.addPlot(
        title="账户净值",
        axisItems={"bottom": DateAxis(self.dates, orientation="bottom")}
    )
    self.region_size = [0,100]
    self.nextRow()

    self.drawdown_plot = self.addPlot(
        title="净值回撤",
        axisItems={"bottom": DateAxis(self.dates, orientation="bottom")}
    )

    self.nextRow()

    self.pnl_plot = self.addPlot(
        title="每日盈亏",
        axisItems={"bottom": DateAxis(self.dates, orientation="bottom")}
    )
    self.nextRow()

    self.distribution_plot = self.addPlot(title="盈亏分布")

    # Add curves and bars on plot widgets
    self.balance_curve = self.balance_plot.plot(
        pen=pg.mkPen("#ffc107", width=3)
    )

    dd_color = "#303f9f"
    self.drawdown_curve = self.drawdown_plot.plot(
        fillLevel=-0.3, brush=dd_color, pen=dd_color
    )

    profit_color = 'r'
    loss_color = 'g'
    self.profit_pnl_bar = pg.BarGraphItem(
        x=[], height=[], width=0.3, brush=profit_color, pen=profit_color
    )
    self.loss_pnl_bar = pg.BarGraphItem(
        x=[], height=[], width=0.3, brush=loss_color, pen=loss_color
    )
    self.pnl_plot.addItem(self.profit_pnl_bar)
    self.pnl_plot.addItem(self.loss_pnl_bar)

    distribution_color = "#6d4c41"
    self.distribution_curve = self.distribution_plot.plot(
        fillLevel=-0.3, brush=distribution_color, pen=distribution_color
    )

def clear_data(self):
    """"""
    self.balance_plot.replot()
    self.drawdown_plot.replot()
    self.pnl_plot.replot()
    self.balance_curve.setData([], [])
    self.drawdown_curve.setData([], [])
    self.profit_pnl_bar.setOpts(x=[], height=[])
    self.loss_pnl_bar.setOpts(x=[], height=[])
    self.distribution_curve.setData([], [])

def set_data(self, df):
    """"""
    if df is None:
        return

    count = len(df)

    self.dates.clear()
    for n, date in enumerate(df.index):
        self.dates[n] = date

    self.region_size = [df.shape[0]-99, df.shape[0]-1]
    # 设置交叉显示
    self.region = pg.LinearRegionItem(self.region_size)  # 创建区域,可用于同步显示另一个图像
    self.region.setZValue(1)
    self.balance_plot.addItem(self.region, ignoreBounds=True)
    self.region.sigRegionChanged.connect(self.drawdown_plot_update)
    self.region.sigRegionChanged.connect(self.pnl_plot_update)

    self.drawdown_plot.sigXRangeChanged.connect(self.updateRegion_by_drawdown_plot)
    self.pnl_plot.sigXRangeChanged.connect(self.updateRegion_by_pnl_plot)

    # Set data for curve of balance and drawdown
    self.balance_curve.setData(df["balance"])
    self.drawdown_curve.setData(df["drawdown"])

    # Set data for daily pnl bar
    profit_pnl_x = []
    profit_pnl_height = []
    loss_pnl_x = []
    loss_pnl_height = []

    for count, pnl in enumerate(df["net_pnl"]):
        if pnl >= 0:
            profit_pnl_height.append(pnl)
            profit_pnl_x.append(count)
        else:
            loss_pnl_height.append(pnl)
            loss_pnl_x.append(count)

    self.profit_pnl_bar.setOpts(x=profit_pnl_x, height=profit_pnl_height)
    self.loss_pnl_bar.setOpts(x=loss_pnl_x, height=loss_pnl_height)

    # Set data for pnl distribution
    hist, x = np.histogram(df["net_pnl"], bins="auto")
    x = x[:-1]
    self.distribution_curve.setData(x, hist)

def drawdown_plot_update(self):
    self.region.setZValue(1)
    minX, maxX = self.region.getRegion()
    self.drawdown_plot.setXRange(minX, maxX, padding=0)

def pnl_plot_update(self):
    self.region.setZValue(1)
    minX, maxX = self.region.getRegion()
    self.pnl_plot.setXRange(minX, maxX, padding=0)

def updateRegion_by_drawdown_plot(self):
    self.region.setRegion(self.drawdown_plot.getViewBox().viewRange()[0])

def updateRegion_by_pnl_plot(self):
    self.region.setRegion(self.pnl_plot.getViewBox().viewRange()[0])


聊聊期权量化 - 1 - 期权策略投研中的难点

发布于VeighNa社区公众号【vnpy-community】
 
原文作者: 陈晓优 | 发布时间:2024-05-18

 

系列文章计划

 

VeighNa Elite版中针对期权量化策略交易的OptionStrategy模块发布上线已经超过半年,同时【VeighNa全实战进阶系列 - 精研期权价差策略】课程也已在两个月前更新完毕。

但可能由于期权量化策略在互联网上的公开资料相对较少,目前在VeighNa社区中依然有许多对OptionStrategy感兴趣的用户苦于不知道如何上手。

因此我们策划了《聊聊期权量化》这个系列文章,希望能够帮助大家初探期权量化策略开发、投研和实盘的整体流程,计划中的主题包括:

  1. 期权策略投研中的难点
  2. 搭建本地化期权数据库
  3. 期权策略的开发和回测
  4. 实盘期权策略交易运维

 

来点学习动力

 

先来贴一下【精研期权价差策略】课程中给出的趋势动态调仓复合价差策略AdvancedSpreadStrategy,在中金所沪深300指数期权(IO)品种上的回测绩效:

description

description

从回测图表和绩效数据中,不难看出AdvancedSpreadStrategy期权策略的一些特点:

  • 资金曲线的整体形状更偏均值回归类策略

    • 大部分交易日呈现稳定的小幅盈利,偶尔出现一次性的显著回撤,和趋势跟踪类的CTA策略曲线形成鲜明的对比;
    • 统计数据上,总计727个交易日,超过半数(388个交易日)实现了盈利;
  • 3年回测中策略的整体盈利表现较为稳定

    • 年化收益率15.05%,差不多是最大回撤(-6.86%)的2.2倍;
    • 回测的Sharpe Ratio为1.55,表现出来就是较为平滑的资金曲线;
  • 策略调仓频率较低对交易成本不那么敏感

    • 日均调仓笔数仅为0.71(每天交易次数不到1笔),也就意味着平均上基本是日线级别的中低频调仓;
    • 总盈利金额是45.5W,差不多是滑点(14.9W)的3倍,即使滑点有所上升也不容易造成亏损;
    • 上述滑点体现了整体交易成本,其中包括交易时的Bid-Ask盘口(即滑点本身),以及固定金额的期权交易手续费(用每张合约费率除以合约乘数)。

 

梳理技术难点

 

看到这里可能有些同学已经迫不及待想要上手开始,但是在那之前要给大家先泼个凉水:相较于面向期货的CTA趋势跟踪策略,期权量化策略在开发和回测中的技术难度要大的多得多。

对于绝大多数量化策略来说,无论是CTA趋势策略、价差套利策略,还是本文探讨的期权量化策略,其核心都在于解决以下四个关键问题:

  1. 开仓时机:什么时候值得建仓?
  2. 标的选择:买卖选择什么合约?
  3. 仓位风险:买卖合约的具体数量?
  4. 平仓时机:何时将已开仓位平掉?

而期权量化策略在解决上述四个关键问题时面临更多挑战,三个显著的难点包括:

  • 逐日回放历史数据的回测模式

    • 期权市场每日新旧合约的交替导致可交易合约范围的持续变动。为此,必须建立并维护一个详尽的期权历史合约信息数据库,以确保每个交易日的合约范围得到准确确认。
    • 在期权策略回测过程中,由于无法预先确定具体要交易的合约,因此必须采用全量数据回放的模式。回测中的每个交易日,回测引擎都需要同时读取加载上百个合约的历史数据。
  • 多合约历史数据截面对齐回放

    • 为了保证和实盘的一致性,数据必须以截面方式推送。在加载所有合约的行情数据后,首先需对时间戳进行对齐,然后按时间顺序排序。
    • 期权策略不仅依赖于期权数据,还可能需要标的物数据,甚至某些复杂的期权策略还需涉及其他线性合约的数据。
  • 策略中支持动态选择交易标的

    • 策略相关的数据容器中,需要维护期权品种上不同月份和不同行权价合约之间整体关系,并管理期权合约的基本信息,如到期日、行权价、类型等。
    • 需要在策略中提供方便的数据更新机制和查询定位函数,通过基于标的物价格的相对坐标体系,用户可以灵活选择特定到期月份和虚值档位的期权进行交易。

针对上述期权策略开发中的挑战,VeighNa Elite版中引入了OptionStrategy期权量化策略模块,该模块与CtaStrategy(针对CTA策略)和PortfolioStrategy(针对组合策略)具有相似的功能,专注于期权策略的历史回测和实盘交易。OptionStrategy模块通过提供高效的策略开发、回测和优化工具,帮助用户将策略无缝应用于实盘交易中。这一过程能够显著减少交易期权时对交易员主观判断的依赖,同时提升策略执行的效率和一致性。

值得一提的是,VeighNa开源社区版中的OptionMaster模块则是专注于期权波动率交易。该模块为交易员提供了实时波动率曲面监控和期权波动率算法执行的交易功能,尽管如此,核心的交易决策仍然依赖于交易员的主观判断。

以下表格列出了两个期权模块的对比区别:

description

你对于期权量化策略的开发有什么疑问,或者希望在后续文章中看到的内容?欢迎在评论区留言告诉我们!!!

 

免责声明

文章中的信息或观点仅供参考,作者不对其准确性或完整性做出任何保证。读者应以其独立判断做出投资决策,作者不对因使用本报告的内容而引致的损失承担任何责任。

 



盘前获取合约码及其基本信息后写入本地sqliste数据库

class MyTdApi(TdApi):
    """
    CTP的交易API
    """

    if os.path.exists('D:/global/contract.db'): os.remove('D:/global/contract.db')

    column = "(ExchangeID,InstrumentName,ProductClass,DeliveryYear,DeliveryMonth,MaxMarketOrderVolume,MinMarketOrderVolume,MaxLimitOrderVolume,MinLimitOrderVolume,VolumeMultiple,PriceTick,CreateDate,OpenDate,ExpireDate,StartDelivDate,EndDelivDate,InstLifePhase,IsTrading,PositionType,PositionDateType,LongMarginRatio,ShortMarginRatio,MaxMarginSideAlgorithm,StrikePrice,OptionsType,UnderlyingMultiple,CombinationType,InstrumentID,ExchangeInstID,ProductID,UnderlyingInstrID)"

    values = "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"

    conn = sqlite3.connect('D:/global/contract.db') # 如果没有 contract.db 则创建
    cursor = conn.cursor()

    # 创建表 如果还没有则创建
    for t in ["contract", "CFFEX", "SHFE", "DCE", "CZCE", "INE", "GFEX", "CFFEX_O", "SHFE_O", "DCE_O", "CZCE_O", "INE_O", "GFEX_O"]:

        cursor.execute(f'CREATE TABLE IF NOT EXISTS {t} (ExchangeID text,InstrumentName text,ProductClass text,DeliveryYear integer,DeliveryMonth integer,MaxMarketOrderVolume integer,MinMarketOrderVolume integer,MaxLimitOrderVolume integer,MinLimitOrderVolume integer,VolumeMultiple integer,PriceTick real,CreateDate text,OpenDate text,ExpireDate text,StartDelivDate text,EndDelivDate text,InstLifePhase text,IsTrading integer,PositionType text,PositionDateType text,LongMarginRatio real,ShortMarginRatio real,MaxMarginSideAlgorithm text,StrikePrice real,OptionsType text,UnderlyingMultiple real,CombinationType text,InstrumentID text,ExchangeInstID text,ProductID text,UnderlyingInstrID text)')
        conn.commit()

    cursor.close()
    conn.close()

    def onRspQryInstrument(self, data: dict, error: dict, reqid: int, last: bool) -> None:
        """合约查询回报"""

        conn = sqlite3.connect('D:/global/contract.db')
        cursor = conn.cursor()

        product: Product = PRODUCT_CTP2VT.get(data["ProductClass"], None)
        if product:

            ExchangeID_flag, option_flag = "", ""
            value_data = []

            for k, v in data.items():
                if "reserve" not in k:
                    if k == "ExchangeID": 
                        ExchangeID_flag = v

                    if k == 'OptionsType':
                        if v == "1": 
                            v = 'call'
                            option_flag = 'call'
                        if v == "2": 
                            v = 'put'
                            option_flag = 'put'
                    value_data.append(v)

            value_data = tuple(value_data) # 拟插入的单条数据

            # ["contract", "CFFEX", "SHFE", "DCE", "CZCE", "INE", "GFEX", "CFFEX_O", "SHFE_O", "DCE_O", "CZCE_O", "INE_O", "GFEX_O"]
            cursor.execute(f'INSERT INTO contract {self.column} VALUES {self.values}', value_data) # 汇总表
            conn.commit()

            if ExchangeID_flag == "CFFEX":
                if option_flag: # 期权
                    cursor.execute(f'INSERT INTO CFFEX_O {self.column} VALUES {self.values}', value_data) # 汇总表
                    conn.commit()
                else: # 期货
                    cursor.execute(f'INSERT INTO CFFEX {self.column} VALUES {self.values}', value_data) # 汇总表
                    conn.commit()

            if ExchangeID_flag == "SHFE":
                if option_flag: # 期权
                    cursor.execute(f'INSERT INTO SHFE_O {self.column} VALUES {self.values}', value_data) # 汇总表
                    conn.commit()
                else: # 期货
                    cursor.execute(f'INSERT INTO SHFE {self.column} VALUES {self.values}', value_data) # 汇总表
                    conn.commit()

            if ExchangeID_flag == "DCE":
                if option_flag: # 期权
                    cursor.execute(f'INSERT INTO DCE_O {self.column} VALUES {self.values}', value_data) # 汇总表
                    conn.commit()
                else: # 期货
                    cursor.execute(f'INSERT INTO DCE {self.column} VALUES {self.values}', value_data) # 汇总表
                    conn.commit()

            if ExchangeID_flag == "CZCE":
                if option_flag: # 期权
                    value_data = list(value_data)
                    value_data[-2] = value_data[-2][:-1] # 移除郑商所期权产品名称带有的C/P后缀
                    value_data = tuple(value_data)
                    cursor.execute(f'INSERT INTO CZCE_O {self.column} VALUES {self.values}', value_data) # 汇总表
                    conn.commit()
                else: # 期货
                    cursor.execute(f'INSERT INTO CZCE {self.column} VALUES {self.values}', value_data) # 汇总表
                    conn.commit()

            if ExchangeID_flag == "INE":
                if option_flag: # 期权
                    cursor.execute(f'INSERT INTO INE_O {self.column} VALUES {self.values}', value_data) # 汇总表
                    conn.commit()
                else: # 期货
                    cursor.execute(f'INSERT INTO INE {self.column} VALUES {self.values}', value_data) # 汇总表
                    conn.commit()

            if ExchangeID_flag == "GFEX":
                if option_flag: # 期权
                    cursor.execute(f'INSERT INTO GFEX_O {self.column} VALUES {self.values}', value_data) # 汇总表
                    conn.commit()
                else: # 期货
                    cursor.execute(f'INSERT INTO GFEX {self.column} VALUES {self.values}', value_data) # 汇总表
                    conn.commit()
        if last:
            self.contract_inited = True
            self.gateway.write_log("合约信息查询成功")

            print("合约信息查询成功并存入本地 contract.db")
            cursor.close()
            conn.close()


基于迅投研的量化数据自运维下载更新

发布于VeighNa社区公众号【vnpy-community】
 
原文作者:用Python的交易员 | 发布时间:2024-01-20
 

K线历史时序数据,不管对于期货CTA、股票多因子还是期权波动率等量化策略的开发来说,都是不可或缺的原材料。在大多数Quant的日常工作流程中,固定一块内容就是对每日增量数据的下载更新。

这块数据更新工作如果通过手动的方式执行各项步骤无疑会相当繁琐(而且容易误操作),所以本文中将会分享一套基于迅投研的量化数据自运维下载更新方案。

关于迅投研数据服务的详细介绍可以参考之前的这篇VeighNa发布v3.9.0 - 迅投研数据服务!》。

 

前期准备

 

试用申请

迅投研数据服务为VeighNa社区用户提供了14天免费试用,点击以下专属链接即可申请:

https://xuntou.net/#/signup?utm_source=vnpy

目前只需要提供手机号验证即可申请,试用账号中包括了股票、指数、期货、期权、基金等数据权限。

试用申请完成后,回到首页(https://xuntou.net/)登录账号,然后点击右上角的用户中心:

description

在弹出页面中底部即可找到【接口TOKEN】,点击【复制】按钮即可快速复制token内容,该token将用于后续VeighNa Trader中的数据服务配置:

description

同时在页面左下方【行情服务】的到期日,可以看到试用权限的到期时间。

 

模块安装

VeighNa Studio的3.9.0版中已经包含了迅投研相关的功能模块。使用其他Python环境的用户可以运行下述命令安装:

pip install xtquant vnpy_xt --index=https://pypi.vnpy.com

 

账号配置

启动VeighNa Trader,点击顶部菜单栏的【配置】按钮,在弹出的对话框中修改下述字段:

  • datafeed.name:xt
  • datafeed.username:token
  • datafeed.password:之前在用户中心复制的token内容

修改完成后点击对话框底部的【确定】按钮,重启VeighNa Trader即可生效。老用户也同样可以通过修改.vntrader目录下的vt_setting.json文件来进行配置。

 

数据更新

 

下载合约信息

迅投研数据服务的客户端xtquant在工作机制上和其他数据服务有一个较大的区别,即需要先从服务器下载所需的数据到本地缓存目录的数据文件,然后才能从数据文件中查询获取所需的数据。

合约更新任务的代码本身较为简单:

def update_history_data() -> None:
    """更新历史合约信息"""
    # 在子进程中加载xtquant
    from xtquant.xtdata import download_history_data

    # 初始化数据服务
    datafeed = get_datafeed()
    datafeed.init()

    # 下载历史合约信息
    download_history_data("", "historycontract")

    print("xtquant历史合约信息下载完成")

但对于合约信息数据,xtquant模块会在当前进程中首次初始化时读取,后续即使执行了下载更新操作,在当前进程的运行过程中也不会重载,必须等到下一次进程重启后才会更新。

因此对于历史合约信息更新函数update_history_data,需要使用multiprocessing模块在子进程中运行:

# 使用子进程更新历史合约信息
    process: Process = Process(target=update_history_data)
    process.start()
    process.join()      # 等待子进程执行完成

这样后续在主进程中获取合约信息时(等同于重启了新的进程),就是已经更新后的数据。

 

获取合约信息

为了实现自运维交易所数据更新,需要能够获取每日交易所新上市的合约代码以及之前已有存续的合约代码信息,这里需要用到xtquant.xtdata模块中所提供的get_stock_list_in_sector函数:

# 查询交易所历史合约代码
    active_symbols: list[str] = get_stock_list_in_sector(sector_name)
    expire_symbols: list[str] = get_stock_list_in_sector("过期" + sector_name)

    xt_symbols: list[str] = active_symbols + expire_symbols

函数中所传入的sector_name为迅投研定义的交易市场名称信息,一些比较常用的名称包括:

  • 中金所、上期所、大商所、郑商所、能源中心
  • 上证A股、深圳A股
  • 上证期权、深圳期权

同时对于每个市场,都包含了当前处于挂牌状态的合约,以及已经到期退市的合约(市场名称前需要加上【过期】前缀),需要分开查询后拼接成为该市场目前的整体合约代码列表。

 

数据增量过滤

对于本地已有K线数据的合约,每日仅需要下载更新增量的部分,而没有必要从头开始下载(浪费运行时间和数据流量)。这里借助于VeighNa数据库组件所提供的BarOverview汇总来查询当前本地已有的数据范围:

# 获取本地已有数据汇总
    data: list[BarOverview] = database.get_bar_overview()

    overviews: dict[str, BarOverview] = {}
    for o in data:
        vt_symbol: str = f"{o.symbol}.{o.exchange.value}"
        overviews[vt_symbol] = o

在遍历整个市场合约代码列表的过程中,可以先查询该合约的到期时间,然后只有在满足下列条件时才执行更新:

  • 本地从未下载过该合约的数据(合约刚上市或者首次运行下载)
  • 合约到期时间大于当前时间(尚未到期所以还有数据更新)

条件过滤代码如下:

# 查询合约信息
        data: dict = get_instrument_detail(xt_symbol, True)

        # 获取合约到期时间
        expiry: datetime = None
        if data["ExpireDate"]:
            expiry = datetime.strptime(data["ExpireDate"], "%Y%m%d")

        # 拆分迅投研代码
        symbol, xt_exchange = xt_symbol.split(".")

        # 生成本地代码
        exchange: Exchange = EXCHANGE_XT2VT[xt_exchange]
        vt_symbol: str = f"{symbol}.{exchange.value}"

        # 查询数据汇总
        overview: BarOverview = overviews.get(vt_symbol, None)

        # 如果已经到期,则跳过
        if overview and expiry and expiry < now:
            continue

 

下载数据入库

对于通过了前述过滤筛选的合约,则执行K线下载并将数据写入到数据库:

# 实现增量查询
        start: datetime = START_TIME
        if overview:
            start = overview.end

        # 执行数据查询和更新入库
        req: HistoryRequest = HistoryRequest(
            symbol=symbol,
            exchange=exchange,
            start=start,
            end=now,
            interval=interval
        )

        bars: list[BarData] = datafeed.query_bar_history(req)

        if bars:
            database.save_bar_data(bars)

            start_dt: datetime = bars[0].datetime
            end_dt: datetime = bars[-1].datetime
            msg: str = f"{vt_symbol}数据更新成功,{start_dt} - {end_dt}"
            print(msg)

以上针对每个市场的K线数据更新功能,整体封装在了update_bar_data函数中,用户可以根据需求传入所需的市场名称参数执行更新任务:

    # 这里只更新两个期货交易所的数据    update_bar_data("中金所")    update_bar_data("上期所")

 

完整代码

 

老规矩还是附上完整的程序代码:

from multiprocessing import Process
from datetime import datetime

from vnpy.trader.database import BarOverview
from vnpy.trader.datafeed import get_datafeed
from vnpy.trader.database import get_database
from vnpy.trader.object import BarData, HistoryRequest
from vnpy.trader.constant import Exchange, Interval


# 交易所映射关系
EXCHANGE_XT2VT = {
    "SH": Exchange.SSE,
    "SZ": Exchange.SZSE,
    "BJ": Exchange.BSE,
    "SF": Exchange.SHFE,
    "IF": Exchange.CFFEX,
    "INE": Exchange.INE,
    "DF": Exchange.DCE,
    "ZF": Exchange.CZCE,
    "GF": Exchange.GFEX
}

# 开始查询时间
START_TIME = datetime(2018, 1, 1)


def update_history_data() -> None:
    """更新历史合约信息"""
    # 在子进程中加载xtquant
    from xtquant.xtdata import download_history_data

    # 初始化数据服务
    datafeed = get_datafeed()
    datafeed.init()

    # 下载历史合约信息
    download_history_data("", "historycontract")

    print("xtquant历史合约信息下载完成")


def update_bar_data(
    sector_name: str,
    interval: Interval = Interval.MINUTE
) -> None:
    """更新K线数据"""
    # 在子进程中加载xtquant
    from xtquant.xtdata import (
        get_stock_list_in_sector,
        get_instrument_detail
    )

    # 初始化数据服务
    datafeed = get_datafeed()
    datafeed.init()

    # 连接数据库
    database = get_database()

    # 获取当前时间戳
    now: datetime = datetime.now()

    # 获取本地已有数据汇总
    data: list[BarOverview] = database.get_bar_overview()

    overviews: dict[str, BarOverview] = {}
    for o in data:
        vt_symbol: str = f"{o.symbol}.{o.exchange.value}"
        overviews[vt_symbol] = o

    # 查询交易所历史合约代码
    xt_symbols: list[str] = get_stock_list_in_sector(sector_name)

    # 遍历列表查询合约信息
    for xt_symbol in xt_symbols:
        # 查询合约信息
        data: dict = get_instrument_detail(xt_symbol, True)

        # 获取合约到期时间
        expiry: datetime = None
        if data["ExpireDate"]:
            expiry = datetime.strptime(data["ExpireDate"], "%Y%m%d")

        # 拆分迅投研代码
        symbol, xt_exchange = xt_symbol.split(".")

        # 生成本地代码
        exchange: Exchange = EXCHANGE_XT2VT[xt_exchange]
        vt_symbol: str = f"{symbol}.{exchange.value}"

        # 查询数据汇总
        overview: BarOverview = overviews.get(vt_symbol, None)

        # 如果已经到期,则跳过
        if overview and expiry and expiry < now:
            continue

        # 实现增量查询
        start: datetime = START_TIME
        if overview:
            start = overview.end

        # 执行数据查询和更新入库
        req: HistoryRequest = HistoryRequest(
            symbol=symbol,
            exchange=exchange,
            start=start,
            end=now,
            interval=interval
        )

        bars: list[BarData] = datafeed.query_bar_history(req)

        if bars:
            database.save_bar_data(bars)

            start_dt: datetime = bars[0].datetime
            end_dt: datetime = bars[-1].datetime
            msg: str = f"{vt_symbol}数据更新成功,{start_dt} - {end_dt}"
            print(msg)


if __name__ == "__main__":
    # 使用子进程更新历史合约信息
    process: Process = Process(target=update_history_data)
    process.start()
    process.join()      # 等待子进程执行完成

    # 更新历史数据
    update_bar_data("上期所")
    update_bar_data("过期上期所")

 



彻底解决tick数据的过滤问题

1. 问题的由来

  • 在文章 发现了vnpy的BarGenerator两个隐藏很深的错误 !中我就已经分析过tick数据对bar生成器的影响。
  • 当前vnpy系统对集合竞价tick与其他tick没有区分能力
  • 当前vnpy系统没有充分利用行情接口提供的状态信息,无法识别有效tick与无效tick,一股脑地发送到策略和应用中,导致bar合成的错误。

2. 问题的解决方法

在行情接口与策略和应用之间建起一个tick过滤器——TickFilter,对tick数据进行过滤。

tick数据过滤器的功能:

  1. 过滤重复tick,保证已经参与K线合成的tick不会再次被系统使用,每个网关对应一个ick数据过滤;
    要做到这一条,就必须做到对所有已经订阅过的合约的tick的缓存,否则你再次重启系统的时候是无法知道你收到第一个tick是否已经参与过之前bar的合成了。这样你可能重复使用该tick,这是错误的。
    为此我们需要将所有已经订阅过的合约的最新tick进行实时更新,并定期做持久化保存,且在每次系统启动的时候读取加载到系统中。
  2. 过滤无效tick,转发有效交易状态下的tick到系统中,不在有效交易状态下tick做丢弃处理,有效交易状态包括:集合竞价状态和连续竞价状态;
    CTP系统的行情接口中包含的实时更新的合约交易状态通知推送接口,OnRtnInstrumentStatus()。关于这个问题我已经在如何更有效地利用合约交易状态信息——交易状态信息管理器。一文中做了详细的介绍,再次就不赘述。总之合约交易状态通知可以让我识别一个tick是否是有些大tick。
  3. 识别集合竞价tick,为使用tick的应用或用户策略处理集合竞价tick提供支持。
    合约交易状态通知可以让我们知道那些tick是tick,同时可以可以让我们区分那个tick是集合竞价tick,那些是连续竞价tick。对有效tick进性分析利用于我们策略或者应用生成出正确的bar。
  4. 本文只对CtpGateway,CtaEngine、CtaTemplate进行了更改,其他网关系统的道理都是相同的。如果您觉得对您有启发,也可以按同样的方法修改。

3. 过滤无效tick数据的实现代码

声明:本文基于【CTP接口规范6.3.15_API接口说明】做出的修改。

3.1 相关数据类型定义

4.1 定义相关的常量和数据类
在vnpy\trader\constant.py中增加下面的合约交易状态InstrumentStatus常量类型定义:

class InstrumentStatus(Enum):
    """
    合约交易状态类型 hxxjava debug
    """
    BEFORE_TRADING = "开盘前"
    NO_TRADING = "非交易"
    CONTINOUS = "连续交易" 
    AUCTION_ORDERING = "集合竞价报单"
    AUCTION_BALANCE = "集合竞价价格平衡"
    AUCTION_MATCH = "集合竞价撮合"
    CLOSE = "收盘"


# 有效交易状态
VALID_TRADE_STATUSES = [
    InstrumentStatus.CONTINOUS,
    InstrumentStatus.AUCTION_ORDERING,
    InstrumentStatus.AUCTION_BALANCE,
    InstrumentStatus.AUCTION_MATCH
]

# 集合竞价交易状态
AUCTION_STATUS = [
    InstrumentStatus.AUCTION_ORDERING,
    InstrumentStatus.AUCTION_BALANCE,
    InstrumentStatus.AUCTION_MATCH
]


class StatusEnterReason(Enum):
    """
    品种进入交易状态原因类型 hxxjava debug
    """
    AUTOMATIC = "自动切换"
    MANUAL = "手动切换"
    FUSE = "熔断"

在vnpy\trader\object.py中增加下面的交易状态数据类StatusData:

@dataclass
class StatusData(BaseData):
    """
    hxxjava debug
    """
    symbol:str       
    exchange : Exchange    
    settlement_group_id : str = ""  
    instrument_status : InstrumentStatus = None   
    trading_segment_sn : int = None 
    enter_time : str = ""      
    enter_reason : str = ""  
    exchange_inst_id : str = ""     

    def __post_init__(self):
        """  """
        self.vt_symbol = f"{self.symbol}.{self.exchange.value}"

    def belongs_to(self,vt_symbol:str):
        symbol,exchange_str = vt_symbol.split(".")
        instrument = left_alphas(symbol).upper()
        return (self.symbol.upper() == instrument) and (self.exchange.value == exchange_str)

3.2 相关消息定义

在vnpy\trader\event.py中增加交易状态消息类型

EVENT_STATUS = "eStatus"                        # hxxjava debug
EVENT_ORIGIN_TICK = "eOriginTick."              # hxxjava debug
EVENT_AUCTION_TICK = "eAuctionTick."            # hxxjava debug

3.3 Gateway的修改

在vnpy\trader\gateway.py中合约状态接口,修改tick推送接口:

引用部分增加:

from .event import EVENT_ORIGIN_TICK,EVENT_STATUS         # hxxjava add
from .object import StatusData    # hxxjava add

修改class BaseGateway的on_tick()接口,增加on_status()接口:

    def on_tick(self, tick: TickData) -> None:
        """
        Tick event push.
        Tick event of a specific vt_symbol is also pushed.
        """
        self.on_event(EVENT_ORIGIN_TICK, tick)  # hxxjava add             
        # self.on_event(EVENT_TICK, tick)                   
        # self.on_event(EVENT_TICK + tick.vt_symbol, tick)  

    def on_status(self, status: StatusData) -> None:    # hxxjava debug
        """
        Instrument Status event push.
        """
        self.on_event(EVENT_STATUS, status)
        self.on_event(EVENT_STATUS + status.vt_symbol, status)

3.4 CtpGateway的修改

修改vnpy_cpt\ctp_gateway.py:

增加引用部分

from vnpy.trader.constant import InstrumentStatus,StatusEnterReason  # hxxjava debug
rom vnpy.trader.object import StatusData,     # hxxjava debug

增加几个映射字典:

# 品种状态进入原因映射  hxxjava debug
INSTRUMENTSTATUS_CTP2VT: Dict[str, InstrumentStatus] = {
    "0": InstrumentStatus.BEFORE_TRADING,
    "1": InstrumentStatus.NO_TRADING,
    "2": InstrumentStatus.CONTINOUS,
    "3": InstrumentStatus.AUCTION_ORDERING,
    "4": InstrumentStatus.AUCTION_BALANCE,
    "5": InstrumentStatus.AUCTION_MATCH,
    "6": InstrumentStatus.CLOSE,
    "7": InstrumentStatus.CLOSE
}


# 品种状态进入原因映射  hxxjava debug
ENTERREASON_CTP2VT: Dict[str, StatusEnterReason] = {
    "1": StatusEnterReason.AUTOMATIC,
    "2": StatusEnterReason.MANUAL,
    "3": StatusEnterReason.FUSE
}

为class CtpTdApi增加下面合约状态推送接口:

    def onRtnInstrumentStatus(self,data:dict):
        """ 
        当接收到合约品种状态信息 # hxxjava debug 
        """
        if data:
            # print(f"【data={data}】")
            status =  StatusData(
                symbol = data["InstrumentID"],
                exchange = EXCHANGE_CTP2VT[data["ExchangeID"]],
                settlement_group_id = data["SettlementGroupID"],
                instrument_status = INSTRUMENTSTATUS_CTP2VT[data["InstrumentStatus"]],
                trading_segment_sn = data["TradingSegmentSN"],
                enter_time = data["EnterTime"],
                enter_reason = ENTERREASON_CTP2VT[data["EnterReason"]],
                exchange_inst_id = data["ExchangeInstID"],
                gateway_name=self.gateway_name
            )
            # print(f"status={status}")
            self.gateway.on_status(status)

3.5 对CtaEngine的进行扩展

增加引用部分

from vnpy.trader.event import EVENT_AUCTION_TICK  # hxxjava add

增加一个对CtaEgine的扩展MyCtaEngine

class MyCtaEngine(CtaEngine):
    """  """

    condition_filename = "condition_order.json"     # 历史条件单存储文件


    def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
        """"""
        super().__init__(main_engine,event_engine)

        self.condition_orders:Dict[str,ConditionOrder] = {}         # strategy_name: ConditionOrder

        self.triggered_condition_orders:List[ConditionOrder] = []   # 已经触发点条件单,为流控设计

    def load_active_condtion_orders(self):
        """  """
        return {}

    def register_event(self):
        """"""
        super().register_event()
        self.event_engine.register(EVENT_AUCTION_TICK, self.process_auction_tick_event)

    def process_auction_tick_event(self,event:Event):
        """ 集合竞价消息处理 """

        tick:TickData = event.data
        strategies = self.symbol_strategy_map[tick.vt_symbol]
        if not strategies:
            return

        for strategy in strategies:
            if strategy.inited:
                # 执行策略的集合竞价消息处理
                self.call_strategy_func(strategy, strategy.on_auction_tick, tick)

    def process_tick_event(self,event:Event):
        """ 用tick的价格检查条件单 """
        super().process_tick_event(event)

        tick:TickData = event.data
        all_condition_orders = [order for order in self.condition_orders.values() \
            if order.vt_symbol == tick.vt_symbol and order.status == CondOrderStatus.WAITING]
        for order in all_condition_orders:
            # 检查条件单是否满足条件
            self.check_condition_order(order,tick)

    def check_condition_order(self,order:ConditionOrder,tick:TickData):
        """ 检查条件单是否满足条件 """       
        strategy = self.strategies.get(order.strategy_name,None)
        if not strategy or not strategy.trading:
            return False

        price = tick.last_price

        is_be = order.condition == Condition.BE and price >= order.price
        is_le = order.condition == Condition.LE and price <= order.price
        is_bt = order.condition == Condition.BT and price > order.price
        is_lt = order.condition == Condition.LT and price < order.price

        if is_be or is_le or is_bt or is_lt:
            # 满足触发条件
            if order.execute_price == ExecutePrice.MARKET:
                # 取市场价
                price = tick.last_price
            elif order.execute_price == ExecutePrice.EXTREME:
                # 取极限价
                price = tick.limit_up if order.direction == Direction.LONG else tick.limit_down
            else:
                # 取设定价
                price = order.price

            # 执行委托
            order_ids = strategy.send_order(
                    direction = order.direction,
                    offset=order.offset,
                    price=price,
                    volume=order.volume 
                )

            if order_ids:
                order.trigger_time = tick.datetime
                order.status = CondOrderStatus.TRIGGERED
                order.vt_orderids = order_ids

                self.call_strategy_func(strategy,strategy.on_condition_order,order)
                self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))

    def find_condition_order(self,vt_orderid:str):
        """ 根据委托单号查询所属条件单 """
        corder:ConditionOrder = None
        for order in self.condition_orders.values():
            if vt_orderid in order.vt_orderids:
                corder = order
                break

        return corder

    def process_trade_event(self, event: Event):
        """ 委托单推送处理 """
        super().process_trade_event(event)
        trade:TradeData = event.data
        vt_orderid = trade.vt_orderid

        corder = self.find_condition_order(vt_orderid)
        if corder:
            # 该成交单属于某个条件单
            strategy = self.strategies.get(corder.strategy_name,None)
            if strategy and strategy.trading:
                # 找到了该条件单属实策略实例且正在交易中

                # 累计条件单的成交量
                corder.traded += trade.volume
                # 推送该条件单给策略
                self.call_strategy_func(strategy,strategy.on_condition_order,corder)
                # 刷新条件单列表控件
                self.event_engine.put(Event(EVENT_CONDITION_ORDER,corder))

    def send_condition_order(self,order:ConditionOrder):
        """  """
        strategy = self.strategies.get(order.strategy_name,None)
        if not strategy or not strategy.trading:
            return False

        if order.cond_orderid not in self.condition_orders:
            self.condition_orders[order.cond_orderid] = order
            self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))
            return True

        return False

    def cancel_condition_order(self,cond_orderid:str):
        """  """
        order:ConditionOrder = self.condition_orders.get(cond_orderid,None)
        if not order:
            return False

        order.status = CondOrderStatus.CANCELLED
        self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))
        return True

    def cancel_all_condition_orders(self,strategy_name:str):
        """  """
        for order in self.condition_orders.values():
            if order.strategy_name == strategy_name and order.status == CondOrderStatus.WAITING:
                order.status = CondOrderStatus.CANCELLED
                self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))

        return True

把vnpy_ctastrategy目录下 的__init__.py中的CtaStrategyApp做如下修改

class CtaStrategyApp(BaseApp):
    """"""

    app_name = APP_NAME
    app_module = __module__
    app_path = Path(__file__).parent
    display_name = "CTA策略"
    # engine_class = CtaEngine
    engine_class = MyCtaEngine    # hxxjava add
    widget_name = "CtaManager"
    icon_name = str(app_path.joinpath("ui", "cta.ico"))

3.6 CtaTemplate的修改

修改vnpy_ctastrategy\CtaTemplate.py如下,为CtaTemplate增加on_auction_tick():

    @virtual
    def on_auction_tick(self, tick: TickData):
        """
        Callback of new tick data update.   # hxxjava add for auction tick
        """
        pass

3.7 为数据库增加最新Tick保存函数

3.7.1 修改vnpy\trader\database.py

为class BaseDatabase增加下面两个接口函数:

    @abstractmethod
    def save_last_tick(self, ticks: List[TickData]) -> bool:
        """
        Save last tick data into database.  # hxxjava add
        """
        pass

    @abstractmethod
    def load_last_tick(
        self,
        gateway_name : str,
        exchange: Exchange = None,
        symbol: str = None
    ) -> List[TickData]:
        """
        Load last tick data from database.  # hxxjava add
        """
        pass

3.7.2 修改vnpy_mysql\mysql_database.py

class MyDateTimeField(DateTimeField):
    def get_modifiers(self):
        return [6]

class DbLastTick(Model):    # hxxjava add
    """ 最新TICK数据表映射对象 """

    id = AutoField()

    gateway_name: str = CharField()

    symbol: str = CharField()
    exchange: str = CharField()
    datetime: datetime = MyDateTimeField()

    name: str = CharField()
    volume: float = FloatField()
    turnover: float = FloatField()
    open_interest: float = FloatField()
    last_price: float = FloatField()
    last_volume: float = FloatField()
    limit_up: float = FloatField()
    limit_down: float = FloatField()

    open_price: float = FloatField()
    high_price: float = FloatField()
    low_price: float = FloatField()
    pre_close: float = FloatField()

    bid_price_1: float = FloatField()
    bid_price_2: float = FloatField(null=True)
    bid_price_3: float = FloatField(null=True)
    bid_price_4: float = FloatField(null=True)
    bid_price_5: float = FloatField(null=True)

    ask_price_1: float = FloatField()
    ask_price_2: float = FloatField(null=True)
    ask_price_3: float = FloatField(null=True)
    ask_price_4: float = FloatField(null=True)
    ask_price_5: float = FloatField(null=True)

    bid_volume_1: float = FloatField()
    bid_volume_2: float = FloatField(null=True)
    bid_volume_3: float = FloatField(null=True)
    bid_volume_4: float = FloatField(null=True)
    bid_volume_5: float = FloatField(null=True)

    ask_volume_1: float = FloatField()
    ask_volume_2: float = FloatField(null=True)
    ask_volume_3: float = FloatField(null=True)
    ask_volume_4: float = FloatField(null=True)
    ask_volume_5: float = FloatField(null=True)

    localtime: datetime = DateTimeField(null=True)

    class Meta:
        database = db
        indexes = ((("gateway_name","symbol", "exchange", "datetime"), True),)

class MysqlDatabase的初始化做如下修改:

    def __init__(self) -> None:
        """"""
        self.db = db
        self.db.connect()
        self.db.create_tables([DbContractData, DbBarData, DbTickData, DbLastTick, DbBarOverview])   # hxxjava add DbLastTick,DbContractData

再为class MysqlDatabase添加下面两个函数:

    def save_last_tick(self, ticks: List[TickData]) -> bool:
        """
        Save last tick data into database.  # hxxjava add
        """
        vt_symbols = [t.vt_symbol for t in ticks]

        # 删除ticks列表中包含合约的旧的tick记录
        d: ModelDelete = DbLastTick.delete().where(
            (DbLastTick.symbol+'.'+DbLastTick.exchange in vt_symbols)
        )
        count = d.execute()
        # print(f"delete {count} last ticks")

        # 构造最新的ticks列表数据
        data = []
        for t in ticks:
            tick:TickData = deepcopy(t)     # hxxjava change
            tick.datetime = tick.datetime

            d = tick.__dict__
            d["exchange"] = d["exchange"].value
            d.pop("vt_symbol")
            data.append(d)
            # print(tick.symbol,tick.exchange,tick.datetime.strftime('%Y-%m-%d %H:%M:%S %f'))

        # 使用upsert操作将数据更新到数据库中
        with self.db.atomic():
            for c in chunked(data, 50):
                DbLastTick.insert_many(c).on_conflict_replace().execute()

        return True

    def load_last_tick(
        self,
        gateway_name : str,
        exchange: Exchange = None,
        symbol: str = None
    ) -> List[TickData]:
        """
        Load last tick data from database.  # hxxjava add
        """
        try:
            # 从DbLastTick查询符合条件的最新tick记录
            s: ModelSelect = (
                DbLastTick.select().where(
                    (DbLastTick.gateway_name == gateway_name)
                    & (exchange is None or DbLastTick.exchange == exchange.value)
                    & (symbol is None or DbLastTick.symbol == symbol)
                ).order_by(DbLastTick.gateway_name,DbLastTick.datetime)
            )

            # 利用最新tick记录构造ticks列表
            ticks: List[TickData] = []
            for db_tick in s:
                tick:TickData = TickData(
                    symbol=db_tick.symbol,
                    exchange=Exchange(db_tick.exchange),
                    datetime=to_china_tz(db_tick.datetime),
                    name=db_tick.name,
                    volume=db_tick.volume,
                    turnover=db_tick.turnover,
                    open_interest=db_tick.open_interest,
                    last_price=db_tick.last_price,
                    last_volume=db_tick.last_volume,
                    limit_up=db_tick.limit_up,
                    limit_down=db_tick.limit_down,
                    open_price=db_tick.open_price,
                    high_price=db_tick.high_price,
                    low_price=db_tick.low_price,
                    pre_close=db_tick.pre_close,
                    bid_price_1=db_tick.bid_price_1,
                    bid_price_2=db_tick.bid_price_2,
                    bid_price_3=db_tick.bid_price_3,
                    bid_price_4=db_tick.bid_price_4,
                    bid_price_5=db_tick.bid_price_5,
                    ask_price_1=db_tick.ask_price_1,
                    ask_price_2=db_tick.ask_price_2,
                    ask_price_3=db_tick.ask_price_3,
                    ask_price_4=db_tick.ask_price_4,
                    ask_price_5=db_tick.ask_price_5,
                    bid_volume_1=db_tick.bid_volume_1,
                    bid_volume_2=db_tick.bid_volume_2,
                    bid_volume_3=db_tick.bid_volume_3,
                    bid_volume_4=db_tick.bid_volume_4,
                    bid_volume_5=db_tick.bid_volume_5,
                    ask_volume_1=db_tick.ask_volume_1,
                    ask_volume_2=db_tick.ask_volume_2,
                    ask_volume_3=db_tick.ask_volume_3,
                    ask_volume_4=db_tick.ask_volume_4,
                    ask_volume_5=db_tick.ask_volume_5,
                    localtime=db_tick.localtime,
                    gateway_name=db_tick.gateway_name
                )
                ticks.append(tick)

            return ticks

        except:
            # 当DbLastTick表不存在的时候,会发生错误
            return []

3.8 tick数据过滤器的实现

在vnpy.usertools下创建tickfilter.py文件,其内容如下:

"""
本文件主要实现tick数据过滤器——TickFilter。

tick数据过滤器的功能:
1. 过滤重复tick,保证已经参与K线合成的tick不会再次被系统使用
2. 过滤无效tick,抛弃不在交易状态下的tick
3. 识别集合竞价tick,为使用tick的应用或用户策略处理集合竞价tick提供支持

作者:hxxjava
日期:2022-06-16
修改日期:              修改原因:  
"""
from typing import Dict,List,Tuple
from threading import Thread
from vnpy.event import Event,EVENT_TIMER,EventEngine
from vnpy.trader.constant import InstrumentStatus,VALID_TRADE_STATUSES
from vnpy.trader.object import TickData,StatusData
from vnpy.trader.event import (
    EVENT_ORIGIN_TICK,
    EVENT_AUCTION_TICK,
    EVENT_TICK,
    EVENT_STATUS
)
from vnpy.trader.database import get_database
from vnpy.trader.utility import extract_vt_symbol


def left_alphas(instr:str):
    """
    得到字符串左边的字符部分
    """
    ret_str = ''
    for s in instr:
        if s.isalpha():
            ret_str += s
        else:
            break
    return ret_str

def get_vt_instrument(vt_symbol:str):
    """
    从完整合约代码转换到完整品种代码
    """    
    symbol,exchange = extract_vt_symbol(vt_symbol)
    instrument = left_alphas(symbol)
    return f"{instrument}.{exchange.value}"


class TickFilter():
    """ tick数据过滤器 """
    CHECK_INTERVAL:int = 5  # 更新到数据库间隔

    def __init__(self,event_engine:EventEngine,gateway_name:str):
        """ tick数据过滤器初始化 """
        self.event_engine = event_engine
        self.gateway_name = gateway_name
        self.db = get_database()

        # 最新tick字典 {(gateway_name,vt_symbol),(update,tick)}
        self.last_ticks:Dict[Tuple[str,str],Tuple[bool,TickData]] = {}

        # 品种及合约状态字典 { vt_symbol : StatusData }
        self.statuses:Dict[str,StatusData] = {}
        self.second_cnt = 0

        self.load_last_ticks()
        self.register_event()

        # print(f"TickFilter {gateway_name}")

    def load_last_ticks(self):
        """ 
        加载属于网关名称为self.gateway_name的最新tick列表 
        """
        last_ticks:List[TickData] = self.db.load_last_tick(gateway_name=self.gateway_name)
        for tick in last_ticks:
            self.last_ticks[(tick.gateway_name,tick.vt_symbol)] = (False,tick)

        # print(f"load {len(last_ticks)} last ticks")

    def register_event(self):
        """ 注册消息 """
        self.event_engine.register(EVENT_ORIGIN_TICK,self.process_tick_event)
        self.event_engine.register(EVENT_STATUS,self.process_status_event)
        self.event_engine.register(EVENT_TIMER,self.check_last_ticks)        

    def process_tick_event(self,event:Event):
        """ 对原始tick进行过滤 """
        tick:TickData = event.data

        # 检查tick合约的经验状态是否位有效交易状态
        status:StatusData = self.statuses.get(tick.vt_symbol,None)
        if not status:
            vt_instrument = get_vt_instrument(tick.vt_symbol)
            status = self.statuses.get(vt_instrument,None)
            if not status:
                # 未收到交易状态,返回
                return

        if status.instrument_status not in VALID_TRADE_STATUSES:
            # 不在有效交易状态,返回
            return

        key = (tick.gateway_name,tick.vt_symbol)
        _,oldtick = self.last_ticks.get(key,(None,None))
        valid_tick = False
        if not oldtick:
            # 没有该合约的历史tick
            self.last_ticks[key] = (True,tick)
            valid_tick = True

        elif tick.datetime > oldtick.datetime:
            # 
            self.last_ticks[key] = (True,tick)
            valid_tick = True

        else:
            print(f"【特别tick = {tick}】")

        if valid_tick == True:
            # 如果是有效的tick
            if status.instrument_status != InstrumentStatus.CONTINOUS:
                # 发送集合竞价tic消息到系统中
                self.event_engine.put(Event(EVENT_AUCTION_TICK,tick))
                self.event_engine.put(Event(EVENT_AUCTION_TICK + tick.vt_symbol, tick))  

            else:
                # 发送连续竞价tic消息到系统中
                self.event_engine.put(Event(EVENT_TICK,tick))
                self.event_engine.put(Event(EVENT_TICK + tick.vt_symbol, tick))  

    def process_status_event(self, event: Event):  
        """ 交易状态通知消息处理 """
        status:StatusData = event.data
        self.statuses[status.vt_symbol] = status

        # print(f"【{status.gateway_name} {status}】")

    def check_last_ticks(self,event:Event) -> None:
        """ 原始tick过滤器 """
        self.second_cnt += 1
        if self.second_cnt % self.CHECK_INTERVAL == 0:
            # 如果到了定时间隔

            # 查询所有更新的tick
            changed_ticks = [] 

            for key,(update,tick) in self.last_ticks.items():
                if update:
                    changed_ticks.append(tick)
                    self.last_ticks[key] = (False,tick)

            if changed_ticks:
                # 如果存在更新的tick,保存到数据库
                t = Thread(target=self.db.save_last_tick,kwargs=({"ticks":changed_ticks}),daemon=True)
                t.start()
                # print(f"{self.second_cnt}: status count={len(self.statuses)} save {len(changed_ticks)} ticks")

3.9 把tick数据过滤器安装到主引擎MainEngine上去

修改vnpy\trader\engine.py

添加引用部分

from vnpy.usertools.tickfilter import TickFilter    # hxxjava add

修改MainEngine的

在MainEngine的初始化函数def init(self, event_engine: EventEngine = None)中增加如下内容:

self.tick_filters:Dict[str,TickFilter] = {} # hxxjava add

修改其add_gateway(),内容如下:

    def add_gateway(self, gateway_class: Type[BaseGateway], gateway_name: str = "") -> BaseGateway:
        """
        Add gateway.
        """
        # Use default name if gateway_name not passed
        if not gateway_name:
            gateway_name = gateway_class.default_name

        gateway = gateway_class(self.event_engine, gateway_name)
        self.gateways[gateway_name] = gateway

        # Add gateway supported exchanges into engine
        for exchange in gateway.exchanges:
            if exchange not in self.exchanges:
                self.exchanges.append(exchange)

        # add a tick data filter for the gateway    #  hxxjava add  
        if gateway_name not in self.tick_filters:
            self.tick_filters[gateway_name] = TickFilter(self.event_engine,gateway_name)

        return gateway

4. 经过上面的一系列修改,你获得了哪些好处?

  • 你的策略再也不会收到重复数据和垃圾数据
  • 此以后你的CTA策略中必须加入一个on_auction_tick()接口函数,用来接受每个交易日集合竞价所产生的tick。如何使用这个tick你有你的方法。
  • 在合成K线的时候你才可能构成正确的K线,比如BarGenerator对跨日tick时间戳的处理错误问题,在此也会迎刃而解。

4.1 现在来梳理下我们都干了哪些事情

  1. 在CtpGateway中引入了合约交易状态,这可以用来过滤无效数据,同时还能够识别集合竞价tick。
  2. 在database中增加了最新tick持久化保存,这为新的tick是否是重复的判断提供支持。
  3. 提供有效tick的分类,在CTA策略的模板中增加on_auction_tick()接口使得BarGenerator正确处1分钟bar的成交量和成交额成为可能。

4.2 非CTP网关使用者是否也可以这样做?

只要你能够从网关行情接口实时得到合约的交易状态推送,把网关的行情接口做出类似的修改,这套方法同样是可用的。tickfilter的代码可以不用修改直接使用。

5. 解决BarGenerator统计bar成交量和成交额错误的方法

5.1 这是对BarGenerator做出点修改,

  • 修改BarGenerator的初始化函数
    def __init__(
        self,
        on_bar: Callable,
        window: int = 0,
        on_window_bar: Callable = None,
        interval: Interval = Interval.MINUTE
    ):
        """ Constructor """      

        ... ...  # 其他代码省略

        self.auction_tick:TickData = None
        self.last_tick: TickData = None
  • 增加BarGenerator的集合竞价tick处理函数
    def update_auction_tick(self,tick:TickData):
        """ 更新集合竞价tick """
        self.auction_tick = tick
  • 修改BarGenerator的1分钟bar合成函数
    def update_tick(self, tick: TickData) -> None:
        """
        Update new tick data into generator.
        """
        new_minute = False

        if self.auction_tick:
            # 合约集合竞价tick到当前tick
            tick.high_price = max(tick.high_price,self.auction_tick.high_price)
            tick.low_price = min(tick.low_price,self.auction_tick.low_price)

            # 构造最新tick,以便把集合竞价的成交量和成交额合成到1分钟bar中
            self.last_tick = deepcopy(self.auction_tick)
            # 成交量和成交额每天从0开始单调递增
            self.last_tick.volume = 0.0   
            self.last_tick.turnover = 0.0

            # 用完集合竞价tick就丢弃
            self.auction_tick = None

      ... ...  # 其他代码省略

5.2 您的策略关于集合竞价tick更新的回调函数:

    def on_auction_tick(self, tick: TickData):
        """
        集合竞价tick处理
        """
        self.bg.update_auction_tick(tick)    # 假设self.bg是已经创建过的bar生成器

两点说明:

  1. 如果你在阅读本文的时候觉得有点一头雾水,可以搜索'hxxjava'字符串,将会显示大部分修改的代码,仔细揣摩下,就会知道我做了什么了!
  2. 另外本贴中还有一部分涉及到条件单的代码,如果出现错误,可以查找我的关于条件单的帖子比停止单更好用的条件单——ConditionOrder,这里就不再重复贴出那部分代码了。


开源版本稳健业绩评价指标-RAR,RCubed以及Robust_Sharpe

阅读过知乎文章【Elite量化策略实验室】RUMI策略2之后, 打算在VeighNa社区开源版本上续写一个增加RAR(Regressed Annual Return), R_Cubed以及Robust_Shape指标的功能.

先附上详细运行过程文件ipynb, 位于github地址

接下来介绍一下各个指标的计算方法.

第一, RAR根据各个时间段的累计收益, 对时间长度做回归, 再转换成年化收益.

RAR回归方程示例
description

Backtesting for optimized Regressed Annual Return parameters, RAR参数优化之后的回测曲线
description

第二, R_Cubed根据书本<海龟交易法则>中描述, 以RAR为分子, 前五最大回撤平均值 * 对应回撤的平均天数为分母, 整体再乘上365天/年得到.

但是在计算的结果中发现, 原版公式最后优化得到的策略曲线并不优秀, 反复打印查看代码逻辑之后并没有发现错误点.(也可能是笔者能力有限, 欢迎大牛前来指正). 优化得到结果如下

Backtesting for optimized original R-Cubed parameters, R-Cubed参数优化后的回测曲线
description

由于上述结果不佳, 对R_Cubed稍作修改, 仍以RAR为分子, 分母改成仅前五最大回撤的平均值. 修改后的R_Cubed比原版稍微正常一点.

Backtesting for simplified R-Cubed parameters, 简化后的R-Cubed最优回测曲线
description

最后一个指标, Robust_Sharpe以RAR作为分子, 年化收益标准差为分母计算得到.

Backtesting for optimized Robust Sharpe parameters, Robust Sharpe参数优化后的回测曲线
description

完整的引擎代码如下

from pandas import DataFrame
from statsmodels.api import OLS
import statsmodels.api as sm
from pandas import Series
import numpy as np
from datetime import date, datetime, timedelta
from typing import List, Dict

from vnpy_ctastrategy.backtesting import (BacktestingEngine, 
                                          CtaTemplate, 
                                          Interval, 
                                          BacktestingMode,
                                          OptimizationSetting,
                                          check_optimization_setting,
                                          run_bf_optimization,
                                          run_ga_optimization,
                                          get_target_value,
                                          partial)

class BacktestingEngineNewStatistics(BacktestingEngine):
    """"""
    def __init__(self):
        super().__init__()
        self.ddpercent_only: bool = True # 用于控制计算R_cubed参数时, 是否考虑回撤周期长度的布尔开关.

    def calculate_statistics(self, df: DataFrame = None, output=True) -> dict:
        self.output("开始计算策略统计指标")

        # Check DataFrame input exterior
        if df is None:
            df: DataFrame = self.daily_df

        # Init all statistics default value
        start_date: str = ""
        end_date: str = ""
        total_days: int = 0
        profit_days: int = 0
        loss_days: int = 0
        end_balance: float = 0
        max_drawdown: float = 0
        max_ddpercent: float = 0
        max_drawdown_duration: int = 0
        total_net_pnl: float = 0
        daily_net_pnl: float = 0
        total_commission: float = 0
        daily_commission: float = 0
        total_slippage: float = 0
        daily_slippage: float = 0
        total_turnover: float = 0
        daily_turnover: float = 0
        total_trade_count: int = 0
        daily_trade_count: int = 0
        total_return: float = 0
        annual_return: float = 0
        daily_return: float = 0
        return_std: float = 0
        sharpe_ratio: float = 0
        return_drawdown_ratio: float = 0
        regressed_annual_return: float = 0
        r_cubed: float = 0
        robust_sharpe_ratio: float = 0
        # Check if balance is always positive
        positive_balance: bool = False

        if df is not None:
            # Calculate balance related time series data
            df["balance"] = df["net_pnl"].cumsum() + self.capital
            # When balance falls below 0, set daily return to 0
            pre_balance: Series = df["balance"].shift(1)
            pre_balance.iloc[0] = self.capital
            x = df["balance"] / pre_balance
            x[x <= 0] = np.nan
            df["return"] = np.log(x).fillna(0)

            df["highlevel"] = (
                df["balance"].rolling(
                    min_periods=1, window=len(df), center=False).max()
            )
            df["drawdown"] = df["balance"] - df["highlevel"]
            df["ddpercent"] = df["drawdown"] / df["highlevel"] * 100

            # 添加部分 calculate regressed_annual_return
            # df["cumreturn"] = (df["balance"] / self.capital - 1) * 100
            df["cumreturn"] = df["return"].expanding(1).sum() * 100



            # All balance value needs to be positive
            positive_balance = (df["balance"] > 0).all()
            if not positive_balance:
                self.output("回测中出现爆仓(资金小于等于0),无法计算策略统计指标")

        # Calculate statistics value
        if positive_balance:
            # Calculate statistics value
            start_date = df.index[0]
            end_date = df.index[-1]

            total_days: int = len(df)
            profit_days: int = len(df[df["net_pnl"] > 0])
            loss_days: int = len(df[df["net_pnl"] < 0])

            end_balance = df["balance"].iloc[-1]
            max_drawdown = df["drawdown"].min()
            max_ddpercent = df["ddpercent"].min()
            max_drawdown_end = df["drawdown"].idxmin()

            if isinstance(max_drawdown_end, date):
                max_drawdown_start = df["balance"][:max_drawdown_end].idxmax()
                max_drawdown_duration: int = (max_drawdown_end - max_drawdown_start).days
            else:
                max_drawdown_duration: int = 0

            total_net_pnl: float = df["net_pnl"].sum()
            daily_net_pnl: float = total_net_pnl / total_days

            total_commission: float = df["commission"].sum()
            daily_commission: float = total_commission / total_days

            total_slippage: float = df["slippage"].sum()
            daily_slippage: float = total_slippage / total_days

            total_turnover: float = df["turnover"].sum()
            daily_turnover: float = total_turnover / total_days

            total_trade_count: int = df["trade_count"].sum()
            daily_trade_count: int = total_trade_count / total_days

            total_return: float = (end_balance / self.capital - 1) * 100
            annual_return: float = total_return / total_days * self.annual_days
            daily_return: float = df["return"].mean() * 100
            return_std: float = df["return"].std() * 100

            if return_std:
                daily_risk_free: float = self.risk_free / np.sqrt(self.annual_days)
                sharpe_ratio: float = (daily_return - daily_risk_free) / return_std * np.sqrt(self.annual_days)
            else:
                sharpe_ratio: float = 0

            if max_ddpercent:
                return_drawdown_ratio: float = -total_return / max_ddpercent
            else:
                return_drawdown_ratio = 0

            # 计算regressed_annual_return
            regressed_annual_return = calculate_regressed_annual_return(df, self.annual_days)
            df.to_csv("df.csv")
            # 计算各阶段回撤 calculate period dropdowns
            dropdowns = find_periodic_dropdowns(df)
            # 计算r-cubed指标
            r_cubed = calculate_r_cubed(dropdowns, regressed_annual_return, self.ddpercent_only)
            # 计算robust_sharpe_ratio
            robust_sharpe_ratio = regressed_annual_return / (return_std * np.sqrt(self.annual_days))


        # Output
        if output:
            self.output("-" * 30)
            self.output(f"首个交易日:\t{start_date}")
            self.output(f"最后交易日:\t{end_date}")

            self.output(f"总交易日:\t{total_days}")
            self.output(f"盈利交易日:\t{profit_days}")
            self.output(f"亏损交易日:\t{loss_days}")

            self.output(f"起始资金:\t{self.capital:,.2f}")
            self.output(f"结束资金:\t{end_balance:,.2f}")

            self.output(f"总收益率:\t{total_return:,.2f}%")
            self.output(f"年化收益:\t{annual_return:,.2f}%")
            self.output(f"最大回撤: \t{max_drawdown:,.2f}")
            self.output(f"百分比最大回撤: {max_ddpercent:,.2f}%")
            self.output(f"最长回撤天数: \t{max_drawdown_duration}")

            self.output(f"总盈亏:\t{total_net_pnl:,.2f}")
            self.output(f"总手续费:\t{total_commission:,.2f}")
            self.output(f"总滑点:\t{total_slippage:,.2f}")
            self.output(f"总成交金额:\t{total_turnover:,.2f}")
            self.output(f"总成交笔数:\t{total_trade_count}")

            self.output(f"日均盈亏:\t{daily_net_pnl:,.2f}")
            self.output(f"日均手续费:\t{daily_commission:,.2f}")
            self.output(f"日均滑点:\t{daily_slippage:,.2f}")
            self.output(f"日均成交金额:\t{daily_turnover:,.2f}")
            self.output(f"日均成交笔数:\t{daily_trade_count}")

            self.output(f"日均收益率:\t{daily_return:,.2f}%")
            self.output(f"收益标准差:\t{return_std:,.2f}%")
            self.output(f"Sharpe Ratio:\t{sharpe_ratio:,.2f}")
            self.output(f"收益回撤比:\t{return_drawdown_ratio:,.2f}")
            self.output(f"Regressed Annual Return: \t {regressed_annual_return:,.4f}")
            self.output(f"R-cubed Ratio: \t{r_cubed:,.4f}")
            self.output(f"Robust Sharpe Ratio: \t{robust_sharpe_ratio:,.4f}")

        statistics: dict = {
            "start_date": start_date,
            "end_date": end_date,
            "total_days": total_days,
            "profit_days": profit_days,
            "loss_days": loss_days,
            "capital": self.capital,
            "end_balance": end_balance,
            "max_drawdown": max_drawdown,
            "max_ddpercent": max_ddpercent,
            "max_drawdown_duration": max_drawdown_duration,
            "total_net_pnl": total_net_pnl,
            "daily_net_pnl": daily_net_pnl,
            "total_commission": total_commission,
            "daily_commission": daily_commission,
            "total_slippage": total_slippage,
            "daily_slippage": daily_slippage,
            "total_turnover": total_turnover,
            "daily_turnover": daily_turnover,
            "total_trade_count": total_trade_count,
            "daily_trade_count": daily_trade_count,
            "total_return": total_return,
            "annual_return": annual_return,
            "daily_return": daily_return,
            "return_std": return_std,
            "sharpe_ratio": sharpe_ratio,
            "return_drawdown_ratio": return_drawdown_ratio,
            "regressed_annual_return": regressed_annual_return,
            "r_cubed_ratio": r_cubed,
            "robust_sharpe_ratio": robust_sharpe_ratio,
        }
        # Filter potential error infinite value
        for key, value in statistics.items():
            if value in (np.inf, -np.inf):
                value = 0
            statistics[key] = np.nan_to_num(value)

        self.output("策略统计指标计算完成")

        return statistics

    def run_bf_optimization(
        self,
        optimization_setting: OptimizationSetting,
        output: bool = True,
        max_workers: int = None
    ) -> list:
        """"""
        if not check_optimization_setting(optimization_setting):
            return

        evaluate_func: callable = wrap_evaluate(self, optimization_setting.target_name)
        results: list = run_bf_optimization(
            evaluate_func,
            optimization_setting,
            get_target_value,
            max_workers=max_workers,
            output=self.output
        )

        if output:
            for result in results:
                msg: str = f"参数:{result[0]}, 目标:{result[1]}"
                self.output(msg)

        return results

    run_optimization = run_bf_optimization

    def run_ga_optimization(
        self,
        optimization_setting: OptimizationSetting,
        output: bool = True,
        max_workers: int = None
    ) -> list:
        """"""
        if not check_optimization_setting(optimization_setting):
            return

        evaluate_func: callable = wrap_evaluate(self, optimization_setting.target_name)
        results: list = run_ga_optimization(
            evaluate_func,
            optimization_setting,
            get_target_value,
            max_workers=max_workers,
            output=self.output
        )

        if output:
            for result in results:
                msg: str = f"参数:{result[0]}, 目标:{result[1]}"
                self.output(msg)

        return results


def calculate_regressed_annual_return(df: DataFrame, annual_days) -> float:
    """计算regressed_annual_return"""
    X = np.linspace(1, len(df), len(df))
    # 回归方程不需要添加常数项, 以y=ax为模型
    Y1 = df["cumreturn"].values
    model1 = OLS(Y1, X)
    result1 = model1.fit()
    # annualise fitted return 
    regressed_annual_return = result1.params[0] * annual_days
    return regressed_annual_return

def calculate_r_cubed(dropdowns: List[Dict], regressed_annual_return: float, ddpercent_only: bool) -> float:
    # 回测中有出现净值不断下跌的情况, 此时dropdowns会是个空值列表
    if dropdowns:
        dropdowns_sorted = sorted(dropdowns, key = lambda x:x["max_ddpercent"])
        if len(dropdowns_sorted)  > 4:
            top_dropdowns_sorted = dropdowns_sorted[:5]
        else:
            top_dropdowns_sorted = dropdowns_sorted
        top_max_dropdown_percents = [i["max_ddpercent"] for i in top_dropdowns_sorted]
        top_max_dropdown_lengths = [i["max_ddpercent_length"] for i in top_dropdowns_sorted]
        # 公式计算r-cubed
        # 由于计算dropdown_length使用的是自然日, 则年化需要用365/年的常数.
        average_top_dropdowns = abs(np.mean(top_max_dropdown_percents))
        average_top_dropdowns_length = np.mean(top_max_dropdown_lengths)

        if not ddpercent_only:
            # 计算RAR/top平均回撤/top回撤时间*365天年化
            r_cubed = regressed_annual_return * 365 / average_top_dropdowns / average_top_dropdowns_length
        else:
            # 只计算RAR/前N词平均回撤
            r_cubed = regressed_annual_return / average_top_dropdowns
    else: 
        r_cubed = 0.0

    return r_cubed

def find_periodic_dropdowns(df: DataFrame) -> List[Dict[str, float]]:
    dropdowns = []
    current_dropdown = None

    for index, row in df.iterrows():
        ddpercent = row['ddpercent']

        if not current_dropdown and ddpercent < 0:
            # 新建一个 current_dropdown
            current_dropdown = {
                'start': index,
                'max_ddpercent': ddpercent,
                'end': index,
                'max_ddpercent_length': 1
            }
        elif current_dropdown:
            # 如果balance新高, 意味着本次回撤期结束.
            if ddpercent == 0:
                current_dropdown['end'] = index
                dropdowns.append(current_dropdown)
                current_dropdown = None
            else:
                current_dropdown['end'] = index
                current_dropdown['max_ddpercent_length'] = (current_dropdown["end"] - current_dropdown["start"]).days
                if ddpercent < current_dropdown['max_ddpercent']:
                    current_dropdown['max_ddpercent'] = ddpercent

    return dropdowns

def new_evaluate(
    target_name: str,
    strategy_class: CtaTemplate,
    vt_symbol: str,
    interval: Interval,
    start: datetime,
    rate: float,
    slippage: float,
    size: float,
    pricetick: float,
    capital: int,
    end: datetime,
    mode: BacktestingMode,
    ddpercent_only: bool,
    setting: dict,

) -> tuple:
    """
    Function for running in multiprocessing.pool
    """
    engine: BacktestingEngineNewStatistics = BacktestingEngineNewStatistics()

    engine.set_parameters(
        vt_symbol=vt_symbol,
        interval=interval,
        start=start,
        rate=rate,
        slippage=slippage,
        size=size,
        pricetick=pricetick,
        capital=capital,
        end=end,
        mode=mode
    )
    engine.ddpercent_only = ddpercent_only

    engine.add_strategy(strategy_class, setting)
    engine.load_data()
    engine.run_backtesting()
    engine.calculate_result()
    statistics: dict = engine.calculate_statistics(output=False)

    target_value: float = statistics[target_name]
    return (str(setting), target_value, statistics)

def wrap_evaluate(engine: BacktestingEngine, target_name: str) -> callable:
    """
    Wrap evaluate function with given setting from backtesting engine.
    """
    func: callable = partial(
        new_evaluate,
        target_name,
        engine.strategy_class,
        engine.vt_symbol,
        engine.interval,
        engine.start,
        engine.rate,
        engine.slippage,
        engine.size,
        engine.pricetick,
        engine.capital,
        engine.end,
        engine.mode,
        engine.ddpercent_only
    )
    return func


用户策略怎么可以没有资金参数 ?—— CTA策略账户

1 几乎所有例子的仓位都是self.fixed_size=1

进阶课程里看过陈晓优老师讲的很多策略例子,都是讲如何买和如何卖,如何止盈和止损的,又是仿真、优化、实盘的,看到人是心潮澎湃!
于是看是着手仿照例子编写自己的策略,策略经过计算买卖信号后,总是要下单的,那我下多大仓位呢? 回过头这时候参考例子才发现,几乎所有的仓位都是self.fixed_size=1,就没有讲如何动态决定仓位的例子!
于是VNPY的QQ群里问前辈、先知,无人回答,再在论坛里问老师,终于回答了:”不建议动态仓位,这么重要的事情必须交给手工完成!“,这个答复让我有点懵——做量化交易你让为手动决定仓位???

2 是不建议动态仓位,还是无法动态仓位?

动态仓位需要什么条件:

  • 开仓价格(Price)
  • 可用资金(Money)
  • 合约乘数(Size)
  • 保证金(率)(Margin)
  • 手续费(率)(FeeRate)
  • 开仓的量 (N)
  • 开仓资金(KM)

假设是按手续费率(而不是按手收费),那么开仓资金为:

KM = Price*N*Size*Margin (1+ FeeRate)

那么必须符合添加KM < Money,进而推出

 N < Money/(Price*Size*Margin (1+ FeeRate))。

当然你不可能满仓干,也许还要一个最大开仓资金比例R,例如:

N = int(Money*R/(Price*Size*Margin (1+ FeeRate)))。

当R=40%表示用你账户里40%的资金,可以动态开仓的手数。这样不就可以动态开仓了吗?
当然实际开仓时的仓位计算可能比这复杂多了,比如你可以考虑交易合约的波动水平,需要考虑投资者愿意承担的风险水平等等,但不管怎么变化,策略动态开仓都必须要有如下这几个参数:

  • 开仓价格(Price)
  • 合约乘数(Size)
  • 可用资金(Money)
  • 保证金(率)(Margin)
  • 手续费(率)(FeeRate)

3 用户策略怎么可以与”钱“无关?

经过艰苦和漫长的代码研读和梳理,发现CTA策略交易中只有pos、trading和inited这些策略成员,没有与资金相关的东西。我们来看看这几个动态下单必须具备的参数是否提供了:

  • 开仓价格(Price):策略从合约的行情数据中提取
  • 合约乘数(Size):main_engine中可以使用vt_symbol作为参数提取,函数是main_engine.get_contract()
  • 可用资金(Money):目前有,可以通过 main_engine.get_account()函数提供 但是是当你运行多个策略的时候,这些策略是共用同一个账户的,无法直接使用。
  • 保证金(率)(Margin):目前有,但是这是各个交易所公布的统一的保证金(率),不是你开户的期货各种给你的保证金(率),如rb2010.SHFE的保证金是率8%,可是也许你的开户的期货公司给你的保证金率为却为12%或者15%,怎么用?
  • 手续费(率)(FeeRate):目前没有,而且这个费率回是因人而异的,这样是看你怎么和你的期货经纪人怎么谈判的,所以也没有办法用!

结论:

目前VNPY的CTA策略因为缺少上述几个关键参数,无法实现动态仓位交易。是不能也,而非不可以!

4 为CTA账户引入策略账户!

作为交易赚钱的CTA策略,怎么可以不与这些资金相关的参数打交道?因人而异的保证金和手续费不应该成为不提供这些参数的理由!
当多个策略在同时运行的时候,你的实际账户权益的消长,到底是哪个策略赚的,哪个策略赔的都无法说清楚,运行3天后就已经是一本糊涂账了,这怎么可以!

虽然有上面的困难,但是办法总比困难多!可以参考文华财经8.3或者库安9.0的办法(熟悉文华财经客户端的人应该都知道),它们的方法是用模组账户的方法来为每个用户模组创建一个虚拟的模组账户,很好地解决用户算法对资金、保证金和手续费等参数的设定!

策略账户的功能:

  1. 分配交易的初始资金,还可以做出金入金等虚拟操作,目的就是控制策略的交易规模。
  2. 设置各个合约实际使用的保证金(率)
  3. 设置各个合约可以使用的手续费(率),包括开仓、平仓和平今仓手续费(率)
  4. 可以记录策略产出的委托单
  5. 可以记录策略初始的成交单
  6. 可以为策略提供合约的当前可用资金、保证金(率)和手续费(率)
  7. 算策略的历史交易盈亏和当前交易的浮动盈亏
  8. 提供当前策略账户的权益和可用资金
  9. 提供策略自创建以来的所有历史委托单和成交单查询,解决目前CTA策略之知道最新活动委托单,当日成交单和未平仓的仓位,而不知道历史交易的情况的问题。

如何实现策略账户 ?

策略账户的已经基本上实现了,目前只在测试中,且看我一步一步慢慢为大家分享......

5. 最新进展

有兴趣的可以先看看 策略账户界面展示



CTP接口之保证金率手续费率查询

保证金率和手续费率都是程序化交易者非常关心的话题,保证金率涉及到开仓时账户资金的冻结和占用,撤单或平仓则会解冻和解占用。手续费率则是真正要付给期货公司的佣金,而且通常开仓,平昨和平今手续费还各有不同。
这里面各种各样的坑,比如有人反应期货公司有的按交易所保证金率收,有的按交易所两倍来收,这样同样的钱在有的期货公司够开2$,而在有的就只能开1手;手续费那更是收的五花八门,各种各样的都有。甚至还有人反应出现过实际收的和开户时说的不一样这种事。
其实这两个费率都是可以通过CTP API直接查询得到的,但这两个查询和其他查询(如查询合约,资金,结算单等)有点不一样,所以本篇单独讲一下。

一、 查询保证金率

官方文档中涉及到保证金率的接口好几个,新手开发看到经常一脸懵逼,不知道究竟哪个才是实际中用到的。这里我们先罗列一下相关的:

//请求查询合约保证金率
def ReqQryInstrumentMarginRate(self, 
    pQryInstrumentMarginRate: 'CThostFtdcQryInstrumentMarginRateField', 
    nRequestID: 'int') -> "int"
//查询合约的返回中会带每条合约的保证金率
def ReqQryInstrument(self, 
    pQryInstrument: 'CThostFtdcQryInstrumentField', 
    nRequestID: 'int') -> "int"
//请求查询交易所保证金率
def ReqQryExchangeMarginRate(self, 
    pQryExchangeMarginRate: 'CThostFtdcQryExchangeMarginRateField', 
    nRequestID: 'int') -> "int"
//请求查询交易所调整保证金率
def ReqQryExchangeMarginRateAdjust(self, 
    pQryExchangeMarginRateAdjust: 'CThostFtdcQryExchangeMarginRateAdjustField',     
    nRequestID: 'int') -> "int"

在实际交易中冻结和占用的计算其实是采用第一个查询函数ReqQryInstrumentMarginRate的返回结果。
而ReqQryInstrument中返回的保证金率通常是交易所保证金率,计算中并不用到。ReqQryExchangeMarginRate和ReqQryExchangeMarginRateAdjust查询得到的是交易所相关的保证金率,这也只在计算中间过程中会用到。
这里我们只讲实际计算中用到的ReqQryInstrumentMarginRate查询,其参数类型为CThostFtdcQryInstrumentMarginRateField,具体字段:

01必填字段

BrokerID //经纪公司代码
InvestorID //投资者代码
HedgeFlag //投机套保标志

前两者开户时可得到,投机套保标志一般投资者填投机就可以。注意这三个字段是一定要填的,不填的话返回值就为空。

02选填字段

InstrumentID //合约代码

如果InstrumentID填空,则返回客户当前持仓对应的合约保证金率,否则返回相应InstrumentID的保证金率。也就是说这个查询和其他查询不一样,如果什么都不填,是不会得到所有的合约保证金率的。
最后给出个查询保证金率的示例:

qryinfofield = api.CThostFtdcQryInstrumentMarginRateField()
qryinfofield.BrokerID='9999'
qryinfofield.InvestorID='000001'
qryinfofield.InstrumentID='rb2007'
qryinfofield.HedgeFlag = api.THOST_FTDC_HF_Speculation
tapi.ReqQryInstrumentMarginRate(qryinfofield,0)

二、 查询手续费率

查手续费的接口也有好几个,罗列如下:

//请求查询合约手续费率
def ReqQryInstrumentCommissionRate(self, 
    pQryInstrumentCommissionRate: 'CThostFtdcQryInstrumentCommissionRateField',     
    nRequestID: 'int') -> "int":
//请求查询报单手续费    
Def ReqQryInstrumentOrderCommRate(self, 
    pQryInstrumentOrderCommRate: 'CThostFtdcQryInstrumentOrderCommRateField',
     nRequestID: 'int') -> "int":    
// 请求查询期权或做市商合约手续费函数这里就不罗列了...

在期货交易中,实际计算用到的只有个前面两个函数,其中第一个查询的是合约的手续费率,第二个查询的是报单申报手续费(就是每报撤一次单子都会收一笔费用),目前来说这是中金所特有的。

也就是说中金所的手续费分为两部分,包括交易手续费和申报费两部分,前者可以通过第一个查询函数查到,后者通过第二个查询函数得到。
两个查询函数的参数类型虽然不一样,但是字段填写方法是一致的,这里我们以ReqQryInstrumentCommissionRate为例讲解,参数类型为CThostFtdcQryInstrumentCommissionRateField,具体字段:

01必填字段

BrokerID //经纪公司代码
InvestorID //投资者代码

02选填字段

InstrumentID //合约代码

如果InstrumentID填空,则返回客户当前持仓对应的合约手续费率,否则返回相应InstrumentID的合约手续费率。和上面查询保证金率的逻辑一样。

查询结果返回值还需要注意几点:

  1. 以开仓为例,查询手续费返回结果中有两项关于开仓的字段:OpenRatioByMoney(开仓手续费率)和OpenRatioByVolume(开仓手续费),这是两种计算手续费的方法。实际计算时的开仓手续费公式为:

手续费 = 成交数量*(成交价*合约乘数*OpenRatioByMoney+OpenRatioByVolume)

  1. 有时查询某合约(例如IF2009)的手续费时,得到的是其对应品种(例如IF)的手续费。这是因为该品种下所有合约的手续费都是一样的,所以就直接将手续费设到品种级别了。
  2. 交易所的手续费率这里查不到,大家可以去交易所官网上看,通常在交易数据/结算参数中。要注意的是大商所手续费中有个短线开平仓的概念,其实就是指今开今平。

最后给出个查询手续费率的示例:

qryinfofield = api.CThostFtdcQryInstrumentCommissionRateField()
qryinfofield.BrokerID='9999'
qryinfofield.InvestorID='000001'
qryinfofield.InstrumentID='rb2007'
tapi.ReqQryInstrumentCommissionRate(qryinfofield,0)


【VeighNa社区活动尊享卡】上线!!!社区活动来不了现场的小伙伴看过来!!!

发布于VeighNa社区公众号【vnpy-community】
 
原文作者:用Python的交易员 | 发布时间:2024-05-07
 

过去几个月里VeighNa社区活动的举办频率基本保持在了每月至少一场,这里贴一些之前活动场次的照片(可以模糊看出陈总的脸型在变得越来越圆!):

description

12月23日 - 上海

description

1月7日 - 上海

description

1月27日 - 重庆

description

3月2日 - 上海

description

3月16日 - 南京

 

24年计划的社区活动场次较多,每次活动很多小伙伴都打飞的过来,确实比较辛苦,为了方便外地小伙伴参会,决定上线【VeighNa社区活动尊享卡】,权益包括:

  • 任选报名参加12场社区活动(价值1188元,99元/场),包括过往活动视频回看;
  • 报名社区活动参加方式可选择线上直播观看和线下参会且会后提供当场次的永久回看视频
  • 社区活动线下参会专属前排位置,更好的现场交流和学习效果;
  • 专属小助手报名,无需再次填写报名信息。

尊享卡权益不设到期时间,方便大家根据自己的时间安排报名活动。

尊享卡原价999元,早鸟价899元(3月底结束),购买请扫描下方二维码:

description

注意:付款后请扫描二维码添加小助手,后续报名活动时请直接私信联系小助手即可。
 



2024年第6次社区活动 - 【套利价差策略实战】和【Scikit-Learn机器学习CTA信号挖掘】 - 5月25日(深圳)

发布于VeighNa社区公众号【vnpy-community】
 
原文作者:用Python的交易员 | 发布时间:2024-05-07
 

好长一段时间没来深圳了,所以本次的深圳社区活动计划将过去半年里大家感兴趣程度较高的两个主题一起分享

第一个主题是套利价差策略实战,价差交易(Spread Trading)也被称为套利交易,是一种利用相关的期货合约或期现品种之间的价差波动来获取利润的交易方法。在价差交易中,交易员不关注某个单一期货合约的价格向哪个方向变动,而是关注相关期货合约之间价格的差值是否在合理的区间范围。

价差交易由于其相对低风险且绩效稳健的特点,在专业交易员领域是一种极为常用的交易方法。结合均值回归类的价差交易策略和趋势跟踪类的CTA策略,可以构建Sharpe Ratio更加优秀的量化投资组合。本次活动中,我们将会分享VeighNa Elite版中价差交易相关功能的使用细节方法,以及一套价差网格策略开发的实践案例。

第二个主题是基于Scikit-Learn的机器学习CTA信号挖掘,机器学习(Machine Learning)各种算法在量化交易领域中的应用越发广泛,但由于目前互联网上的资料质量参差不齐,许多VeighNa社区的同学想要学习尝试但却不知道从何入手。

本次活动中,我们将会由浅入深介绍机器学习技术在CTA量化策略开发中的应用场景,并基于Scikit-Learn这款广受好评的机器学习算法库,给出一套具体的CTA策略信号挖掘实践案例:

 
KBins聚类特征分析

description

 
特征相关性热力图

description

 
向量化回测绩效图

description

活动仅提供线下参会(40人位置),感兴趣的同学请抓紧。活动地址将会通过微信群的方式发送,报名付款后请记得扫码加入社区活动微信群获取参会地址!

 
活动内容大纲:

  1. 价差交易策略开发入门

    a. 寻找合适的价差组合
    b. 价差时序数据建模分析
    c. 基于策略模板快速开发
    d. 价差网格策略案例分享

  2. Spread Trading价差交易模块

    a. 高自由度的价差配置
    b. 价差算法的交易执行
    c. Elite价差算法介绍:Taker、Maker、Exchange

  3. 机器学习在量化中的应用场景

    a. 当今CTA策略开发的核心难点
    b. 常见机器学习算法介绍
    c. 自动化因子挖掘和智能化信号生成

  4. 基于Scikit-Learn的实践案例

    a. 向量化时序特征计算准备
    b. 应用KBins聚类学习算法
    c. 对于无监督学习结果的有监督分析
    d. 构建完整策略进行事件驱动回测

  5. 闭门交流环节

 
时间:5月25日 14:00-17:00

地点:深圳(具体地址后续在微信群中通知)

报名费:99元(Elite会员免费参加)

报名方式:扫描下方二维码报名(报名后请扫码加入社区活动微信群获取参会地址)

description

 


新消息

统计

主题
9318
帖子
35229
已注册用户
47488
最新用户
在线用户
102
在线来宾用户
7801
© 2015-2022 上海韦纳软件科技有限公司
备案服务号:沪ICP备18006526号

沪公网安备 31011502017034号

【用户协议】
【隐私政策】
【免责条款】