VeighNa量化社区
你的开源社区量化交易平台

置顶主题

彻底解决K线生成器的问题——一个日内对齐等交易时长的K线生成器

先厘清大思路,后面逐步完成。

1.搞量化交易,一个好用的K线生成器是最基本的要求!

vnpy系统自带了一个BarGenerator,它可以帮助我们生成1分钟,n分钟,n小时,日周期的K线,也叫bar。可是除了1分钟比较完美之外,有很多问题。它在读取历史数据、回测的时候多K线的处理和实盘却有不一样的效果。具体的问题我已经在解决vnpy 2.9.0版本的BarGenerator产生30分钟Bar的错误!这个帖子中做过尝试,但也不是很成功。因为系统的BarGenerator靠时间窗口与1分钟bar的时间分钟关系来决定是否该新建和结束一个bar,这个有问题。于是我改用对1分钟bar进行计数来决定是否该新建和结束一个bar,这也是有不可靠的问题,遇到行情比较清淡的时候,可能有的分钟就没有1分钟bar产生,这是完全有可能的!
K线几乎是绝大部分交易策略分析的基础,除非你从事的是极高频交易,否则你就得用它。可是如果你连生成一个稳健可靠的K线都不能够保证,那么运行在K线基础上的指标及由此产生的交易信号就无从谈起,K线错了,它们就是错误的,以此为依据做出点交易指令有可能是南辕北辙,所以必须解决它!

2.日内对齐等交易时长K线需要什么参数

2.1 日内对齐等交易时长K线是最常用的

K线不是交易所发布的,它有很多种产生机制。其对齐方式、表现形式多种多样。关于K线的分类本人在以往的帖子中做出过比较详细的说明,有兴趣的读者可以去我以往的帖子中查看,这里就不再赘述。
市面上的绝大部分软件如通达信、大智慧、文华财经等软件,除非用户特别设定,他们最常提供给用户的K线多是日内对齐等交易时长K线。常用是一定是有道理的,因为它们已经为广大用户和投资者所接受。

2.2 日内对齐等交易时长K线需要什么参数

1)什么是日内对齐等交易时长K线?
它具有这些特点:以每日开盘为起点,每根K线都包含相同交易时间的数据,跳过中间的休市时间,直至当前交易日的收盘,收盘不足n分钟也就是K线。实盘中,每日的第一个n分钟K线含集合竞价的那个tick数据。
2)为什么这种K线能够被普遍接受?
为它尽可能地保证一个交易日内的所有K线所表达的意义内容上是一致的,它们包含相等的交易时长。这非常重要,因为你把一个5分钟时长的K线与一个30分钟时长的K线放在一起谈论是没有意义的。但是如果为了保证K线在交易时长上的一致性,让n分钟K线跨日的话也是不太合理,因为这跨日,跨周末时间太长,这中间会发生什么意外事情,可能会产生出非常巨大的幅度大K线,掩盖了隔日跳空的行情变化,这对解读行情是不利的。当然n日的K线日跨日的,但是它是n个交易日的K线融合而成的,不过其融合的每个日K线也是对齐各自的日开盘的。
另外日内对齐等交易时长K线还有一个好处,那就是你以任何之前的时间为起点,在读取历史数据重新生成该日的n分钟K线的时候,得到的改日的K线是一致的。举个例子,如果我们的CTA策略在init()中常常是这么一句:

self.load_bar(20)  # 先加载20日历史1分钟数据

这么简单的一句,包含着很多你意识不到的变化——你今天运行策略和明天运行你的策略,其中的历史数据的范围发生了变化,也就是说加载数据的起点变了。如果我们合成的K线的对齐方式不采用日内对齐的话,而采用对齐加载时间起点的话,你今天、明天加载出来之前的某日的K线就可能完全是不同的。而采用日内对齐等交易时长的K线则不存在这个问题。

3)需要知道合约的交易时间段
既然要对齐每日开盘,还有跳过各个休市时间,还要知道收市时间,那么我们就知道生成这种K线必须知道其所表达合约或对象的交易时间段,交易时间段中包含了这些信息,不知道这些信息,BarGenerator就不知道如何生成这种bar。这是必须的!

3. 如何获取合约的交易时间段

3.1 vnpy系统和CTP接口找不到交易时间段信息

目前vnpy系统中的是没有合约的交易时间段的。到哪里获取合约的交易时间段的呢?
1) 它与合约相关,应该到保存合约的数据类ContractData中去找,没有找到。
2) 是否可以提供接口,从交易所获得,这个也是比较基础的数据。于是到CTP接口中(我使用的是CTP接口,您也许不一样) ,在最新版本的CTP接口文档中也没有找到任何与交易时间段相关的信息,绝望!

3.2 有两种方法可以得到交易时间段信息

  1. 米筐数据接口中有,其中有个得到市场中所有合约的函数all_instruments(),它的返回值中就包含交易时间段信息trading_hours,还有另外一些如get_trading_hours()也可以直接获得这些交易时间段信息,好!
  2. 实在没有的话,咱们手工创建一个文件,按照一定格式,把我们需要创建K线的合约准备好交易时间段信息,这也是可行的。

3.3 直接从米筐数据接口获取交易时间段信息的问题

  1. 你必须购买过米筐数,否则无从获得
  2. 直接使用从米筐数据接口获取交易时间段信息,会有效率问题。本人试过,运行get_trading_hours()常会用卡顿,其目前是在65000多个合约中为你寻找一个合约的交易时间段信息,在K线合成这种使用非常频繁的地方,效率是必须的!况且米筐对每个用户的流量也是有限制的,如果超过了也是会限流的!
  3. 对于有些米筐没有的品种难道我们就不能使用K线了吗?

解决方法:

  1. 基于以上这些原因,采用解耦具体数据提供商的方法会更好!把这些信息保存到一个文件或者数据库中,只要您能够办法获得这些信息,按照规定的格式存储,哪怕是手工输入也是可以的。
  2. 新的K线生成器只需要对规定格式的交易时间段信息进行处理,按照一定的规则就可以进行K线生成了!

3.4 从米筐获取交易时间段信息

3.4.1 扩展DataFeed

打开vnpy.trader.datafeed.py文件为Datafeed的基类BaseDatafeed扩展下面的接口

class BaseDatafeed(ABC):
    """
    Abstract datafeed class for connecting to different datafeed.
    """

    def init(self) -> bool:
        """
        Initialize datafeed service connection.
        """
        pass

    def update_all_trading_hours(self) -> bool:     # hxxjava add 
        """ 更新所有合约的交易时间段到trading_hours.json文件中 """
        pass

    def load_all_trading_hours(self) -> dict:       # hxxjava add 
        """ 从trading_hours.json文件中读取所有合约的交易时间段 """
        pass

    def query_bar_history(self, req: HistoryRequest) -> Optional[List[BarData]]:
        """
        Query history bar data.
        """
        pass

    def query_tick_history(self, req: HistoryRequest) -> Optional[List[TickData]]:
        """
        Query history tick data.
        """
        pass

其中的trading_hours.json文件我会在后面的文章中做详细的介绍。有了它我们才能展开其他的设计。

3.4.2 扩展RqdataDataFeed

在vnpy_rqdata\rqdata_datafeed.py中增加下面的代码

  • 引用部分增加:
from datetime import timedelta,date # hxxjava add
  • 在class RqdataDatafeed(BaseDatafeed)中增加下面的代码 :

    def update_all_trading_hours(self) -> bool:     # hxxjava add 
        """ 更新所有合约的交易时间段到trading_hours.json文件中 """

        if not self.inited:
            self.init()

        if not self.inited:
            return False

        ths_dict = load_json(self.trading_hours_file)

        # instruments = all_instruments(type=['Future','Stock','Index','Spot'])

        trade_hours = {}

        for stype in ['Future','Stock','Index','Fund','Spot']:
            instruments = all_instruments(type=[stype])
            # print(f"{stype} instruments count={len(instruments)}")

            for idx,inst in instruments.iterrows():
                # 获取每个最新发布的合约的建议时间段
                if ('trading_hours' not in inst) or not(isinstance(inst.trading_hours,str)):
                    # 跳过没有交易时间段或者交易时间段无效的合约
                    continue

                inst_name = inst.trading_code if stype == 'Future' else inst.order_book_id 
                inst_name = inst_name.upper() 
                if inst_name.find('.') < 0:
                    inst_name += '.' + inst.exchange

                if inst_name not in ths_dict:
                    str_trading_hours = inst.trading_hours

                    # 把'01-'或'31-'者替换成'00-'或'30-'
                    suffix_pair = [('1-','0-'),('6-','5-')]
                    for s1,s2 in suffix_pair:
                        str_trading_hours = str_trading_hours.replace(s1,s2)

                    # 如果原来没有,提取出来
                    trade_hours[inst_name] = {"name": inst.symbol,"trading_hours": str_trading_hours}

        # print(f"trade_hours old count {len(ths_dict)},append count={len(trade_hours)}")
        if trade_hours:
            ths_dict.update(trade_hours)
            save_json(self.trading_hours_file,ths_dict)

        return True

    def load_all_trading_hours(self) -> dict:       # hxxjava add 
        """ 从trading_hours.json文件中读取所有合约的交易时间段 """
        json_file = get_file_path(self.trading_hours_file)
        if not json_file.exists():
            return {}
        else:
            return load_json(self.trading_hours_file)

3.4.3 为main_engine安装一个可以获取交易时间段信息的接口

在vnpy\trader\engine.py中:

  • 该文件的引用部分:
from .datafeed import get_datafeed                  # hxxjava add
  • 为MainEngine类增加下面函数

    def get_trading_hours(self,vt_symbol:str) -> str:   # hxxjava add
        """ get vt_symbol's trading hours """
        ths = self.all_trading_hours.get(vt_symbol.upper(),"")       
        return ths["trading_hours"] if ths else ""

为什么在MainEngine类增加可以获取交易时间段信息的接口?

因为无论你运行vnpy中的哪个app,你都会启动main_engine,无需绕弯子就可以得到这些信息,而我们的用户策略中都包含各自策略的引擎,这样就方便获取交易时间段信息。

如CTA策略中包含cta_engine,而cta_engine它的成员就包含main_engine。那么在策略中执行类似下面的语句就可以获取您交易品种的交易时间段信息:

       trading_hours = self.cta_engine.main_engine.get_trading_hours(selt.vt_symbol)

如PortFolioStrategy策略中包含strategy_engine,而strategy_engine它的成员就包含main_engine。那么在策略中执行类似下面的语句就可以获取多个交易品种的交易时间段信息:

       trading_hours_list = [self.cta_engine.main_engine.get_trading_hours(vt_symbol) for vt_symbol in self.vt_symbols]

是不是很方便呢?

3.4.4 在系统的投研中执行更新所有品种(含期货、股票、指数和基金)的交易时间段

vnpy 3.0的启动界面中已经集成了一个叫“投研”的功能,其实它是jupyter lab,启动之后输入下面的代码:

# 测试update_all_trading_hours()函数和load_all_trading_hours()
from vnpy.trader.datafeed import get_datafeed

df = get_datafeed()
df.init()

df.update_all_trading_hours()   # 更新所有合约的交易时间段到本地文件中

ths = df.load_all_trading_hours() # 从本地文件中读取所有合约的交易时间段

当然您可以在vnpy的trader中主界面的菜单中增加一项,方便您在需要的时候执行下面语句。不过这个更新交易时间段的功能并不需要频繁执行,手动也就够了,记得就好。

3.4.5 你还可以手工打开trading_hours.json,直接输入

经过上面步骤3.4.4,您就在本地得到了一个trading_hours.json文件,该文件在您的用户目录下的.vntrader\中,其内容如下:

{
    "A0303.DCE": {
        "name": "豆一0303",
        "trading_hours": "21:00-23:00,09:00-10:15,10:30-11:30,13:30-15:00"
    },
    "A0305.DCE": {
        "name": "豆一0305",
        "trading_hours": "21:00-23:00,09:00-10:15,10:30-11:30,13:30-15:00"
    },
    "A0307.DCE": {
        "name": "豆一0307",
        "trading_hours": "21:00-23:00,09:00-10:15,10:30-11:30,13:30-15:00"
    },
    "A0309.DCE": {
        "name": "豆一0309",
        "trading_hours": "21:00-23:00,09:00-10:15,10:30-11:30,13:30-15:00"
    },
    "A0311.DCE": {
        "name": "豆一0311",
        "trading_hours": "21:00-23:00,09:00-10:15,10:30-11:30,13:30-15:00"
    },
    "A0401.DCE": {
        "name": "豆一0401",
        "trading_hours": "21:00-23:00,09:00-10:15,10:30-11:30,13:30-15:00"
    },
   ... ...
}

观察其格式,在你没有米筐数据接口或者这里没有的合约,您也可以手动输入合约交易时间段信息。

按照程序中算法,这个文件文件中一共包含约16500多个合约的交易时间段信息。可以覆盖国内金融市场几乎全部都产品,但是不包括金融二次衍生品期权。
为什么没有期权交易时间段信息,因为不需要。期权合约有其对应的标的物,从其名称和编号就可以解析出来。期权合约的交易时间段其和标的物的交易时间段是完全相同的,因此不需要保存到该文件中。



【Veighna量化策略实验室】RSJ高频波动率择时指标 - 3 - 信号组合

发布于vn.py社区公众号【vnpy-community】
 
原文作者:Bili | 发布时间:2021-09-07
 

description

 

前情回顾

 

在前面的文章当中,我们对RSJ指标有了初步的了解,并且在RSJ指标样本内外实验中也取得一个不错的回测结果。接下来,我们将继续根据陶勤英博士的财通证券研究所的研报中所指出的将RSJ指标与其他技术指标组合使用来进行进一步的实验研究观察回测后的效果。

 

测试原理

 

首先,我们根据文章选取了另外三个技术指标,分别是DMI指标、RSI指标和ROC指标分别与RSJ指标进行叠加使用进行回测。所以接下来我们需要了解各个指标的基本原理以及他们的信号逻辑:

 

DMI指标

 

基本原理:DMI指标基本原理是在于寻找股票价格涨跌过程中,股价藉以创新高价或新低价的功能,研判多空力量,进而寻求买卖双方的均衡点及股价在双方互动下波动的循环过程。与大多数技术指标相比DMI指标把每日的高低波动的幅度因素计算在内,从而更加准确的反应行情的走势及更好的预测行情未来的发展变化。

信号逻辑:趋向技术指标 DMI 包括多空指标 PDI、MDI 以及趋向指标 ADX。我们针对 DMI 指标建立相应的择时策略,即若昨日的 ADX 大于前一日的 ADX,则今日在收盘的时候看多;若昨日的 ADX、PDI、MDI 均小于前一日的相对应的指标值,则今日在收盘的时候看空(其中,ADX 值若不在[20,60]之内,则不进行交易)。

 

RSI指标

 

基本原理:RSI的原理简单来说是以数字计算的方法求出买卖双方的力量对比,譬如有100个人面对一-件商品, 如果50个人以上要买,竞相抬价,商品价格必涨。相反,如果50个人以上争着卖出,价格自然下跌。

信号逻辑:对于相对强弱指标 RSI,两种力量的对比决定了个股及大盘所处的状态:强势或弱势。从 RSI 值的变动范围来看:当 20 < RSI <50 时,弱势,卖出,空仓;当 50 < RSI <80 时,强势,买入,持仓。因此,若昨日的 RSI 值在(50,80]内,则今日在收盘的时候看多;若昨日的RSI 值在[20,50)内,则今日在收盘的时候看空。

 

ROC指标

 

基本原理:变动率指标(ROC),是以当日的收盘价和N天前的收盘价比较,通过计算股价某一段时间内收盘价变动的比例,应用价格的移动比较来测量价位动量,达到事先探测股价买卖供需力量的强弱,进而分析股价的趋势及其是否有转势的意愿,属于反趋势指标之一。

信号逻辑:ROC指标测量股价动量,可以用来监视常态性和极端性两种行情,对买卖股票提供强有力的参考。ROC 变大,代表未来市场仍会在短时间内上涨,反之则代表市场可能下跌。因此,若昨日的 ROC 大于前一日的 ROC,则今日在收盘的时候看多;反之,发出看空信号。

 

代码分析

 

接下来,我们将正式进入我们的实验阶段。为了避免文章内容的重复,我们仅以RSJ+DMI为例,为大家进行代码分析:

在本策略当中,我们需要使用到两种K线——5分钟K线和日K线,所以我们还需创建日K线生成器DailyBarGenerator:

class DailyBarGenerator:

    def __init__(self, on_daily_bar: Callable) -> None:
        """日K线合成器"""        
        self.on_daily_bar: Callable = on_daily_bar        
        self.daily_bar: BarData = None        
        self.last_bar: BarData = None

接下来构造update_bar函数用于更新K线到容器中,缓存K线本身的OHLCV数据:

 def update_bar(self, bar: BarData) -> None:        
     """更新分钟K线"""        
     if not self.daily_bar:            
         dt = bar.datetime.replace(                
             hour=0,                
             minute=0,                
             second=0,                
             microsecond=0
        )

        self.daily_bar = BarData(
            symbol=bar.symbol,
            exchange=bar.exchange,
            datetime=dt,
            gateway_name=bar.gateway_name,
            open_price=bar.open_price,
            high_price=bar.high_price,
            low_price=bar.low_price
        )        
    # Otherwise, update high/low price into window bar        
    else:            
        self.daily_bar.high_price = max(
            self.daily_bar.high_price,
            bar.high_price
        )            
        self.daily_bar.low_price = min(
            self.daily_bar.low_price,
            bar.low_price
        )

        # Update close price/volume/turnover into window bar   
        self.daily_bar.close_price = bar.close_price      
        self.daily_bar.volume += bar.volume  
        self.daily_bar.open_interest = bar.open_interest

        # Check if window bar completed       
        if (    
            self.last_bar       
            and self.last_bar.datetime.date() != bar.datetime.date()        
        ):            
            self.on_daily_bar(self.daily_bar)       
            self.daily_bar = None

        # Cache last bar object      
        self.last_bar = bar

交易信号执行:

5分钟K线用于计算RSJ指标,我们需要先使用BarGenerator将1分钟K线合成为5分钟K线,日K线用于计算DMI指标:

def __init__(self, cta_engine, strategy_name, vt_symbol, setting):  
    """"""        
    super().__init__(cta_engine, strategy_name, vt_symbol, setting)

    self.bg = BarGenerator(self.on_bar, 5, self.on_5min_bar)    
    self.am = NewArrayManager()

    self.daily_bg = DailyBarGenerator(self.on_daily_bar)   
    self.daily_am = NewArrayManager()

计算技术指标:

分别将RSJ、ADX、PDI以及MDI指标计算出来:

    # 计算技术指标        
    self.rsj_value = self.am.rsj(self.rsj_window)

    adx_array = self.daily_am.adx(self.dmi_window, array=True)   
    pdi_array = self.daily_am.plus_di(self.dmi_window, array=True)      
    mdi_array = self.daily_am.minus_di(self.dmi_window, array=True)

    adx_value = adx_array[-1]

判断交易信号:

当 RSJ 和 DMI指标同时发出看多信号,则在当日收盘的时候发出看多信号;若两个指标同时发出看空信号,则在当日收盘的时候发出看空信号;否则,不发出任何信号。

    # 判断交易信号      
    target_pos = 0

    if bar.datetime.time() == time(14, 50):  
        # 满足开仓交易的ADX要求           
        if 20 <= adx_value <= 60:   
            # 做多条件               
            if (                   
                self.rsj_value < 0 
                and adx_array[-1] > adx_array[-2]         
            ):                 
                target_pos = 1           
            # 做空条件           
            elif (             
                self.rsj_value > 0   
                and adx_array[-1] < adx_array[-2]     
                and pdi_array[-1] < pdi_array[-2] 
                and mdi_array[-1] < mdi_array[-2] 
            ):                   
                target_pos = -1

交易逻辑:

基于目标仓位和实际仓位,决定如何交易:

            trade_volume = target_pos - self.pos
            if trade_volume > 0:    
                 if self.pos >= 0:      
                     self.buy(bar.close_price + 5, trade_volume)    
                 else:     
                    self.cover(bar.close_price + 5, trade_volume)           
             elif trade_volume < 0: 
                if self.pos <= 0:   
                    self.short(bar.close_price - 5, abs(trade_volume))     
                else:       
                    self.sell(bar.close_price - 5, abs(trade_volume))

tips:

由于策略中涉及到多个技术指标,代码变得较为复杂。所以在这时将计算技术指标、判断交易信号和交易逻辑三个部分分开来写,会让我们的代码变得更加清晰从而增加代码的可读性。

 

回测结果

 

我们用 2017 年 7 月 23 日-2020 年 7 月 22 日的数据,对该日内指标进行回测。指标当日发出信号后,分别于当日收盘分别对上证 50、沪深 300、中证 500指数进行操作。

实验一:RSJ + DMI

其中 DMI 组合指标时间参数为14。

  • 本地代码:IF888.CFFEX、IH888.CFFEX、IC888.CFFEX
  • K线周期:1分钟
  • 开始日期:2017-7-23
  • 结束日期:2020-7-22
  • 手续费率:0.00003
  • 交易滑点:0.4
  • 合约乘数:300
  • 价格跳动:0.2
  • 回测资金:100W

description

资金曲线的形状和财通研报中的结果基本一致 ,可以认为比较正确的复现了策略逻辑:

description

实验二:RSJ + RSI

其中 RSI 指标的时间参数为 6。

  • 本地代码:IF888.CFFEX、IH888.CFFEX、IC888.CFFEX

  • K线周期:1分钟

  • 开始日期:2017-7-23

  • 结束日期:2020-7-22

  • 手续费率:0.00003

  • 交易滑点:0.4

  • 合约乘数:300

  • 价格跳动:0.2

  • 回测资金:100W

description

资金曲线的形状和财通研报中的结果基本一致 ,可以认为比较正确的复现了策略逻辑:

description

实验三:RSJ + ROC

其中 ROC 指标参数为 12。

  • 本地代码:IF888.CFFEX、IH888.CFFEX、IC888.CFFEX

  • K线周期:1分钟

  • 开始日期:2017-7-23

  • 结束日期:2020-7-22

  • 手续费率:0.00003

  • 交易滑点:0.4

  • 合约乘数:300

  • 价格跳动:0.2

  • 回测资金:100W

description

资金曲线的形状和财通研报中的结果基本一致 ,可以认为比较正确的复现了策略逻辑:

description

结论:

RSJ指标分别与DMI、RSI和ROC指标组合使用的回测效果整体上资金曲线都是呈上扬趋势,但是对于部分品种来说资金曲线不太稳键,还有较大的改进提升空间。

 

代码获取

 

本文中提到的三个策略代码较长,就不在这里贴出了。请关注本公众号(vnpy-community)后,回复内容【RSJ】获取源代码下载链接。
 



彻底解决tick数据的过滤问题

1. 问题的由来

  • 在文章 发现了vnpy的BarGenerator两个隐藏很深的错误 !中我就已经分析过tick数据对bar生成器的影响。
  • 当前vnpy系统对集合竞价tick与其他tick没有区分能力
  • 当前vnpy系统没有充分利用行情接口提供的状态信息,无法识别有效tick与无效tick,一股脑地发送到策略和应用中,导致bar合成的错误。

2. 问题的解决方法

在行情接口与策略和应用之间建起一个tick过滤器——TickFilter,对tick数据进行过滤。

tick数据过滤器的功能:

  1. 过滤重复tick,保证已经参与K线合成的tick不会再次被系统使用,每个网关对应一个ick数据过滤;
    要做到这一条,就必须做到对所有已经订阅过的合约的tick的缓存,否则你再次重启系统的时候是无法知道你收到第一个tick是否已经参与过之前bar的合成了。这样你可能重复使用该tick,这是错误的。
    为此我们需要将所有已经订阅过的合约的最新tick进行实时更新,并定期做持久化保存,且在每次系统启动的时候读取加载到系统中。
  2. 过滤无效tick,转发有效交易状态下的tick到系统中,不在有效交易状态下tick做丢弃处理,有效交易状态包括:集合竞价状态和连续竞价状态;
    CTP系统的行情接口中包含的实时更新的合约交易状态通知推送接口,OnRtnInstrumentStatus()。关于这个问题我已经在如何更有效地利用合约交易状态信息——交易状态信息管理器。一文中做了详细的介绍,再次就不赘述。总之合约交易状态通知可以让我识别一个tick是否是有些大tick。
  3. 识别集合竞价tick,为使用tick的应用或用户策略处理集合竞价tick提供支持。
    合约交易状态通知可以让我们知道那些tick是tick,同时可以可以让我们区分那个tick是集合竞价tick,那些是连续竞价tick。对有效tick进性分析利用于我们策略或者应用生成出正确的bar。
  4. 本文只对CtpGateway,CtaEngine、CtaTemplate进行了更改,其他网关系统的道理都是相同的。如果您觉得对您有启发,也可以按同样的方法修改。

3. 过滤无效tick数据的实现代码

声明:本文基于【CTP接口规范6.3.15_API接口说明】做出的修改。

3.1 相关数据类型定义

4.1 定义相关的常量和数据类
在vnpy\trader\constant.py中增加下面的合约交易状态InstrumentStatus常量类型定义:

class InstrumentStatus(Enum):
    """
    合约交易状态类型 hxxjava debug
    """
    BEFORE_TRADING = "开盘前"
    NO_TRADING = "非交易"
    CONTINOUS = "连续交易" 
    AUCTION_ORDERING = "集合竞价报单"
    AUCTION_BALANCE = "集合竞价价格平衡"
    AUCTION_MATCH = "集合竞价撮合"
    CLOSE = "收盘"


# 有效交易状态
VALID_TRADE_STATUSES = [
    InstrumentStatus.CONTINOUS,
    InstrumentStatus.AUCTION_ORDERING,
    InstrumentStatus.AUCTION_BALANCE,
    InstrumentStatus.AUCTION_MATCH
]

# 集合竞价交易状态
AUCTION_STATUS = [
    InstrumentStatus.AUCTION_ORDERING,
    InstrumentStatus.AUCTION_BALANCE,
    InstrumentStatus.AUCTION_MATCH
]


class StatusEnterReason(Enum):
    """
    品种进入交易状态原因类型 hxxjava debug
    """
    AUTOMATIC = "自动切换"
    MANUAL = "手动切换"
    FUSE = "熔断"

在vnpy\trader\object.py中增加下面的交易状态数据类StatusData:

@dataclass
class StatusData(BaseData):
    """
    hxxjava debug
    """
    symbol:str       
    exchange : Exchange    
    settlement_group_id : str = ""  
    instrument_status : InstrumentStatus = None   
    trading_segment_sn : int = None 
    enter_time : str = ""      
    enter_reason : str = ""  
    exchange_inst_id : str = ""     

    def __post_init__(self):
        """  """
        self.vt_symbol = f"{self.symbol}.{self.exchange.value}"

    def belongs_to(self,vt_symbol:str):
        symbol,exchange_str = vt_symbol.split(".")
        instrument = left_alphas(symbol).upper()
        return (self.symbol.upper() == instrument) and (self.exchange.value == exchange_str)

3.2 相关消息定义

在vnpy\trader\event.py中增加交易状态消息类型

EVENT_STATUS = "eStatus"                        # hxxjava debug
EVENT_ORIGIN_TICK = "eOriginTick."              # hxxjava debug
EVENT_AUCTION_TICK = "eAuctionTick."            # hxxjava debug

3.3 Gateway的修改

在vnpy\trader\gateway.py中合约状态接口,修改tick推送接口:

引用部分增加:

from .event import EVENT_ORIGIN_TICK,EVENT_STATUS         # hxxjava add
from .object import StatusData    # hxxjava add

修改class gateway的on_tick()接口,增加on_status()接口:

    def on_tick(self, tick: TickData) -> None:
        """
        Tick event push.
        Tick event of a specific vt_symbol is also pushed.
        """
        self.on_event(EVENT_ORIGIN_TICK, tick)  # hxxjava add             
        # self.on_event(EVENT_TICK, tick)                   
        # self.on_event(EVENT_TICK + tick.vt_symbol, tick)  

    def on_status(self, status: StatusData) -> None:    # hxxjava debug
        """
        Instrument Status event push.
        """
        self.on_event(EVENT_STATUS, status)
        self.on_event(EVENT_STATUS + status.vt_symbol, status)

3.4 CtpGateway的修改

修改vnpy_cpt\ctp_gateway.py:

增加引用部分

from vnpy.trader.constant import InstrumentStatus,StatusEnterReason  # hxxjava debug
rom vnpy.trader.object import StatusData,     # hxxjava debug

增加几个映射字典:

# 品种状态进入原因映射  hxxjava debug
INSTRUMENTSTATUS_CTP2VT: Dict[str, InstrumentStatus] = {
    "0": InstrumentStatus.BEFORE_TRADING,
    "1": InstrumentStatus.NO_TRADING,
    "2": InstrumentStatus.CONTINOUS,
    "3": InstrumentStatus.AUCTION_ORDERING,
    "4": InstrumentStatus.AUCTION_BALANCE,
    "5": InstrumentStatus.AUCTION_MATCH,
    "6": InstrumentStatus.CLOSE,
    "7": InstrumentStatus.CLOSE
}


# 品种状态进入原因映射  hxxjava debug
ENTERREASON_CTP2VT: Dict[str, StatusEnterReason] = {
    "1": StatusEnterReason.AUTOMATIC,
    "2": StatusEnterReason.MANUAL,
    "3": StatusEnterReason.FUSE
}

为class CtpTdApi增加下面合约状态推送接口:

    def onRtnInstrumentStatus(self,data:dict):
        """ 
        当接收到合约品种状态信息 # hxxjava debug 
        """
        if data:
            # print(f"【data={data}】")
            status =  StatusData(
                symbol = data["InstrumentID"],
                exchange = EXCHANGE_CTP2VT[data["ExchangeID"]],
                settlement_group_id = data["SettlementGroupID"],
                instrument_status = INSTRUMENTSTATUS_CTP2VT[data["InstrumentStatus"]],
                trading_segment_sn = data["TradingSegmentSN"],
                enter_time = data["EnterTime"],
                enter_reason = ENTERREASON_CTP2VT[data["EnterReason"]],
                exchange_inst_id = data["ExchangeInstID"],
                gateway_name=self.gateway_name
            )
            # print(f"status={status}")
            self.gateway.on_status(status)

3.5 对CtaEngine的进行扩展

增加引用部分

from vnpy.trader.event import EVENT_AUCTION_TICK  # hxxjava add

增加一个对CtaEgine的扩展MyCtaEngine

class MyCtaEngine(CtaEngine):
    """  """

    condition_filename = "condition_order.json"     # 历史条件单存储文件


    def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
        """"""
        super().__init__(main_engine,event_engine)

        self.condition_orders:Dict[str,ConditionOrder] = {}         # strategy_name: ConditionOrder

        self.triggered_condition_orders:List[ConditionOrder] = []   # 已经触发点条件单,为流控设计

    def load_active_condtion_orders(self):
        """  """
        return {}

    def register_event(self):
        """"""
        super().register_event()
        self.event_engine.register(EVENT_AUCTION_TICK, self.process_auction_tick_event)

    def process_auction_tick_event(self,event:Event):
        """ 集合竞价消息处理 """

        tick:TickData = event.data
        strategies = self.symbol_strategy_map[tick.vt_symbol]
        if not strategies:
            return

        for strategy in strategies:
            if strategy.inited:
                # 执行策略的集合竞价消息处理
                self.call_strategy_func(strategy, strategy.on_auction_tick, tick)

    def process_tick_event(self,event:Event):
        """ 用tick的价格检查条件单 """
        super().process_tick_event(event)

        tick:TickData = event.data
        all_condition_orders = [order for order in self.condition_orders.values() \
            if order.vt_symbol == tick.vt_symbol and order.status == CondOrderStatus.WAITING]
        for order in all_condition_orders:
            # 检查条件单是否满足条件
            self.check_condition_order(order,tick)

    def check_condition_order(self,order:ConditionOrder,tick:TickData):
        """ 检查条件单是否满足条件 """       
        strategy = self.strategies.get(order.strategy_name,None)
        if not strategy or not strategy.trading:
            return False

        price = tick.last_price

        is_be = order.condition == Condition.BE and price >= order.price
        is_le = order.condition == Condition.LE and price <= order.price
        is_bt = order.condition == Condition.BT and price > order.price
        is_lt = order.condition == Condition.LT and price < order.price

        if is_be or is_le or is_bt or is_lt:
            # 满足触发条件
            if order.execute_price == ExecutePrice.MARKET:
                # 取市场价
                price = tick.last_price
            elif order.execute_price == ExecutePrice.EXTREME:
                # 取极限价
                price = tick.limit_up if order.direction == Direction.LONG else tick.limit_down
            else:
                # 取设定价
                price = order.price

            # 执行委托
            order_ids = strategy.send_order(
                    direction = order.direction,
                    offset=order.offset,
                    price=price,
                    volume=order.volume 
                )

            if order_ids:
                order.trigger_time = tick.datetime
                order.status = CondOrderStatus.TRIGGERED
                order.vt_orderids = order_ids

                self.call_strategy_func(strategy,strategy.on_condition_order,order)
                self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))

    def find_condition_order(self,vt_orderid:str):
        """ 根据委托单号查询所属条件单 """
        corder:ConditionOrder = None
        for order in self.condition_orders.values():
            if vt_orderid in order.vt_orderids:
                corder = order
                break

        return corder

    def process_trade_event(self, event: Event):
        """ 委托单推送处理 """
        super().process_trade_event(event)
        trade:TradeData = event.data
        vt_orderid = trade.vt_orderid

        corder = self.find_condition_order(vt_orderid)
        if corder:
            # 该成交单属于某个条件单
            strategy = self.strategies.get(corder.strategy_name,None)
            if strategy and strategy.trading:
                # 找到了该条件单属实策略实例且正在交易中

                # 累计条件单的成交量
                corder.traded += trade.volume
                # 推送该条件单给策略
                self.call_strategy_func(strategy,strategy.on_condition_order,corder)
                # 刷新条件单列表控件
                self.event_engine.put(Event(EVENT_CONDITION_ORDER,corder))

    def send_condition_order(self,order:ConditionOrder):
        """  """
        strategy = self.strategies.get(order.strategy_name,None)
        if not strategy or not strategy.trading:
            return False

        if order.cond_orderid not in self.condition_orders:
            self.condition_orders[order.cond_orderid] = order
            self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))
            return True

        return False

    def cancel_condition_order(self,cond_orderid:str):
        """  """
        order:ConditionOrder = self.condition_orders.get(cond_orderid,None)
        if not order:
            return False

        order.status = CondOrderStatus.CANCELLED
        self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))
        return True

    def cancel_all_condition_orders(self,strategy_name:str):
        """  """
        for order in self.condition_orders.values():
            if order.strategy_name == strategy_name and order.status == CondOrderStatus.WAITING:
                order.status = CondOrderStatus.CANCELLED
                self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))

        return True

把vnpy_ctastrategy目录下 的__init__.py中的CtaStrategyApp做如下修改

class CtaStrategyApp(BaseApp):
    """"""

    app_name = APP_NAME
    app_module = __module__
    app_path = Path(__file__).parent
    display_name = "CTA策略"
    # engine_class = CtaEngine
    engine_class = MyCtaEngine    # hxxjava add
    widget_name = "CtaManager"
    icon_name = str(app_path.joinpath("ui", "cta.ico"))

3.6 CtaTemplate的修改

修改vnpy_ctastrategy\CtaTemplate.py如下,为CtaTemplate增加on_auction_tick():

    @virtual
    def on_auction_tick(self, tick: TickData):
        """
        Callback of new tick data update.   # hxxjava add for auction tick
        """
        pass

3.7 为数据库增加最新Tick保存函数

3.7.1 修改vnpy\trader\database.py

为class BaseDatabase增加下面两个接口函数:

    @abstractmethod
    def save_last_tick(self, ticks: List[TickData]) -> bool:
        """
        Save last tick data into database.  # hxxjava add
        """
        pass

    @abstractmethod
    def load_last_tick(
        self,
        gateway_name : str,
        exchange: Exchange = None,
        symbol: str = None
    ) -> List[TickData]:
        """
        Load last tick data from database.  # hxxjava add
        """
        pass

3.7.2 修改vnpy_mysql\mysql_database.py

class MyDateTimeField(DateTimeField):
    def get_modifiers(self):
        return [6]

class DbLastTick(Model):    # hxxjava add
    """ 最新TICK数据表映射对象 """

    id = AutoField()

    gateway_name: str = CharField()

    symbol: str = CharField()
    exchange: str = CharField()
    datetime: datetime = MyDateTimeField()

    name: str = CharField()
    volume: float = FloatField()
    turnover: float = FloatField()
    open_interest: float = FloatField()
    last_price: float = FloatField()
    last_volume: float = FloatField()
    limit_up: float = FloatField()
    limit_down: float = FloatField()

    open_price: float = FloatField()
    high_price: float = FloatField()
    low_price: float = FloatField()
    pre_close: float = FloatField()

    bid_price_1: float = FloatField()
    bid_price_2: float = FloatField(null=True)
    bid_price_3: float = FloatField(null=True)
    bid_price_4: float = FloatField(null=True)
    bid_price_5: float = FloatField(null=True)

    ask_price_1: float = FloatField()
    ask_price_2: float = FloatField(null=True)
    ask_price_3: float = FloatField(null=True)
    ask_price_4: float = FloatField(null=True)
    ask_price_5: float = FloatField(null=True)

    bid_volume_1: float = FloatField()
    bid_volume_2: float = FloatField(null=True)
    bid_volume_3: float = FloatField(null=True)
    bid_volume_4: float = FloatField(null=True)
    bid_volume_5: float = FloatField(null=True)

    ask_volume_1: float = FloatField()
    ask_volume_2: float = FloatField(null=True)
    ask_volume_3: float = FloatField(null=True)
    ask_volume_4: float = FloatField(null=True)
    ask_volume_5: float = FloatField(null=True)

    localtime: datetime = DateTimeField(null=True)

    class Meta:
        database = db
        indexes = ((("gateway_name","symbol", "exchange", "datetime"), True),)

class MysqlDatabase的初始化做如下修改:

    def __init__(self) -> None:
        """"""
        self.db = db
        self.db.connect()
        self.db.create_tables([DbContractData, DbBarData, DbTickData, DbLastTick, DbBarOverview])   # hxxjava add DbLastTick,DbContractData

再为class MysqlDatabase添加下面两个函数:

    def save_last_tick(self, ticks: List[TickData]) -> bool:
        """
        Save last tick data into database.  # hxxjava add
        """
        vt_symbols = [t.vt_symbol for t in ticks]

        # 删除ticks列表中包含合约的旧的tick记录
        d: ModelDelete = DbLastTick.delete().where(
            (DbLastTick.symbol+'.'+DbLastTick.exchange in vt_symbols)
        )
        count = d.execute()
        # print(f"delete {count} last ticks")

        # 构造最新的ticks列表数据
        data = []
        for t in ticks:
            tick:TickData = deepcopy(t)     # hxxjava change
            tick.datetime = tick.datetime

            d = tick.__dict__
            d["exchange"] = d["exchange"].value
            d.pop("vt_symbol")
            data.append(d)
            # print(tick.symbol,tick.exchange,tick.datetime.strftime('%Y-%m-%d %H:%M:%S %f'))

        # 使用upsert操作将数据更新到数据库中
        with self.db.atomic():
            for c in chunked(data, 50):
                DbLastTick.insert_many(c).on_conflict_replace().execute()

        return True

    def load_last_tick(
        self,
        gateway_name : str,
        exchange: Exchange = None,
        symbol: str = None
    ) -> List[TickData]:
        """
        Load last tick data from database.  # hxxjava add
        """
        try:
            # 从DbLastTick查询符合条件的最新tick记录
            s: ModelSelect = (
                DbLastTick.select().where(
                    (DbLastTick.gateway_name == gateway_name)
                    & (exchange is None or DbLastTick.exchange == exchange.value)
                    & (symbol is None or DbLastTick.symbol == symbol)
                ).order_by(DbLastTick.gateway_name,DbLastTick.datetime)
            )

            # 利用最新tick记录构造ticks列表
            ticks: List[TickData] = []
            for db_tick in s:
                tick:TickData = TickData(
                    symbol=db_tick.symbol,
                    exchange=Exchange(db_tick.exchange),
                    datetime=to_china_tz(db_tick.datetime),
                    name=db_tick.name,
                    volume=db_tick.volume,
                    turnover=db_tick.turnover,
                    open_interest=db_tick.open_interest,
                    last_price=db_tick.last_price,
                    last_volume=db_tick.last_volume,
                    limit_up=db_tick.limit_up,
                    limit_down=db_tick.limit_down,
                    open_price=db_tick.open_price,
                    high_price=db_tick.high_price,
                    low_price=db_tick.low_price,
                    pre_close=db_tick.pre_close,
                    bid_price_1=db_tick.bid_price_1,
                    bid_price_2=db_tick.bid_price_2,
                    bid_price_3=db_tick.bid_price_3,
                    bid_price_4=db_tick.bid_price_4,
                    bid_price_5=db_tick.bid_price_5,
                    ask_price_1=db_tick.ask_price_1,
                    ask_price_2=db_tick.ask_price_2,
                    ask_price_3=db_tick.ask_price_3,
                    ask_price_4=db_tick.ask_price_4,
                    ask_price_5=db_tick.ask_price_5,
                    bid_volume_1=db_tick.bid_volume_1,
                    bid_volume_2=db_tick.bid_volume_2,
                    bid_volume_3=db_tick.bid_volume_3,
                    bid_volume_4=db_tick.bid_volume_4,
                    bid_volume_5=db_tick.bid_volume_5,
                    ask_volume_1=db_tick.ask_volume_1,
                    ask_volume_2=db_tick.ask_volume_2,
                    ask_volume_3=db_tick.ask_volume_3,
                    ask_volume_4=db_tick.ask_volume_4,
                    ask_volume_5=db_tick.ask_volume_5,
                    localtime=db_tick.localtime,
                    gateway_name=db_tick.gateway_name
                )
                ticks.append(tick)

            return ticks

        except:
            # 当DbLastTick表不存在的时候,会发生错误
            return []

3.8 tick数据过滤器的实现

在vnpy.usertools下创建tickfilter.py文件,其内容如下:

"""
本文件主要实现tick数据过滤器——TickFilter。

tick数据过滤器的功能:
1. 过滤重复tick,保证已经参与K线合成的tick不会再次被系统使用
2. 过滤无效tick,抛弃不在交易状态下的tick
3. 识别集合竞价tick,为使用tick的应用或用户策略处理集合竞价tick提供支持

作者:hxxjava
日期:2022-06-16
修改日期:              修改原因:  
"""
from typing import Dict,List,Tuple
from threading import Thread
from vnpy.event import Event,EVENT_TIMER,EventEngine
from vnpy.trader.constant import InstrumentStatus,VALID_TRADE_STATUSES
from vnpy.trader.object import TickData,StatusData
from vnpy.trader.event import (
    EVENT_ORIGIN_TICK,
    EVENT_AUCTION_TICK,
    EVENT_TICK,
    EVENT_STATUS
)
from vnpy.trader.database import get_database
from vnpy.trader.utility import extract_vt_symbol


def left_alphas(instr:str):
    """
    得到字符串左边的字符部分
    """
    ret_str = ''
    for s in instr:
        if s.isalpha():
            ret_str += s
        else:
            break
    return ret_str

def get_vt_instrument(vt_symbol:str):
    """
    从完整合约代码转换到完整品种代码
    """    
    symbol,exchange = extract_vt_symbol(vt_symbol)
    instrument = left_alphas(symbol)
    return f"{instrument}.{exchange.value}"


class TickFilter():
    """ tick数据过滤器 """
    CHECK_INTERVAL:int = 5  # 更新到数据库间隔

    def __init__(self,event_engine:EventEngine,gateway_name:str):
        """ tick数据过滤器初始化 """
        self.event_engine = event_engine
        self.gateway_name = gateway_name
        self.db = get_database()

        # 最新tick字典 {(gateway_name,vt_symbol),(update,tick)}
        self.last_ticks:Dict[Tuple[str,str],Tuple[bool,TickData]] = {}

        # 品种及合约状态字典 { vt_symbol : StatusData }
        self.statuses:Dict[str,StatusData] = {}
        self.second_cnt = 0

        self.load_last_ticks()
        self.register_event()

        # print(f"TickFilter {gateway_name}")

    def load_last_ticks(self):
        """ 
        加载属于网关名称为self.gateway_name的最新tick列表 
        """
        last_ticks:List[TickData] = self.db.load_last_tick(gateway_name=self.gateway_name)
        for tick in last_ticks:
            self.last_ticks[(tick.gateway_name,tick.vt_symbol)] = (False,tick)

        # print(f"load {len(last_ticks)} last ticks")

    def register_event(self):
        """ 注册消息 """
        self.event_engine.register(EVENT_ORIGIN_TICK,self.process_tick_event)
        self.event_engine.register(EVENT_STATUS,self.process_status_event)
        self.event_engine.register(EVENT_TIMER,self.check_last_ticks)        

    def process_tick_event(self,event:Event):
        """ 对原始tick进行过滤 """
        tick:TickData = event.data

        # 检查tick合约的经验状态是否位有效交易状态
        status:StatusData = self.statuses.get(tick.vt_symbol,None)
        if not status:
            vt_instrument = get_vt_instrument(tick.vt_symbol)
            status = self.statuses.get(vt_instrument,None)
            if not status:
                # 未收到交易状态,返回
                return

        if status.instrument_status not in VALID_TRADE_STATUSES:
            # 不在有效交易状态,返回
            return

        key = (tick.gateway_name,tick.vt_symbol)
        _,oldtick = self.last_ticks.get(key,(None,None))
        valid_tick = False
        if not oldtick:
            # 没有该合约的历史tick
            self.last_ticks[key] = (True,tick)
            valid_tick = True

        elif tick.datetime > oldtick.datetime:
            # 
            self.last_ticks[key] = (True,tick)
            valid_tick = True

        else:
            print(f"【特别tick = {tick}】")

        if valid_tick == True:
            # 如果是有效的tick
            if status.instrument_status != InstrumentStatus.CONTINOUS:
                # 发送集合竞价tic消息到系统中
                self.event_engine.put(Event(EVENT_AUCTION_TICK,tick))
                self.event_engine.put(Event(EVENT_AUCTION_TICK + tick.vt_symbol, tick))  

            else:
                # 发送连续竞价tic消息到系统中
                self.event_engine.put(Event(EVENT_TICK,tick))
                self.event_engine.put(Event(EVENT_TICK + tick.vt_symbol, tick))  

    def process_status_event(self, event: Event):  
        """ 交易状态通知消息处理 """
        status:StatusData = event.data
        self.statuses[status.vt_symbol] = status

        # print(f"【{status.gateway_name} {status}】")

    def check_last_ticks(self,event:Event) -> None:
        """ 原始tick过滤器 """
        self.second_cnt += 1
        if self.second_cnt % self.CHECK_INTERVAL == 0:
            # 如果到了定时间隔

            # 查询所有更新的tick
            changed_ticks = [] 

            for key,(update,tick) in self.last_ticks.items():
                if update:
                    changed_ticks.append(tick)
                    self.last_ticks[key] = (False,tick)

            if changed_ticks:
                # 如果存在更新的tick,保存到数据库
                t = Thread(target=self.db.save_last_tick,kwargs=({"ticks":changed_ticks}),daemon=True)
                t.start()
                # print(f"{self.second_cnt}: status count={len(self.statuses)} save {len(changed_ticks)} ticks")

3.9 把tick数据过滤器安装到主引擎MainEngine上去

修改vnpy\trader\engine.py

添加引用部分

from vnpy.usertools.tickfilter import TickFilter    # hxxjava add

修改MainEngine的

在MainEngine的初始化函数def init(self, event_engine: EventEngine = None)中增加如下内容:

self.tick_filters:Dict[str,TickFilter] = {} # hxxjava add

修改其add_gateway(),内容如下:

    def add_gateway(self, gateway_class: Type[BaseGateway], gateway_name: str = "") -> BaseGateway:
        """
        Add gateway.
        """
        # Use default name if gateway_name not passed
        if not gateway_name:
            gateway_name = gateway_class.default_name

        gateway = gateway_class(self.event_engine, gateway_name)
        self.gateways[gateway_name] = gateway

        # Add gateway supported exchanges into engine
        for exchange in gateway.exchanges:
            if exchange not in self.exchanges:
                self.exchanges.append(exchange)

        # add a tick data filter for the gateway    #  hxxjava add  
        if gateway_name not in self.tick_filters:
            self.tick_filters[gateway_name] = TickFilter(self.event_engine,gateway_name)

        return gateway

4. 经过上面的一系列修改,你获得了哪些好处?

  • 你的策略再也不会收到重复数据和垃圾数据
  • 此以后你的CTA策略中必须加入一个on_auction_tick()接口函数,用来接受每个交易日集合竞价所产生的tick。如何使用这个tick你有你的方法。
  • 在合成K线的时候你才可能构成正确的K线,比如BarGenerator对跨日tick时间戳的处理错误问题,在此也会迎刃而解。

4.1 现在来梳理下我们都干了哪些事情

  1. 在CtpGateway中引入了合约交易状态,这可以用来过滤无效数据,同时还能够识别集合竞价tick。
  2. 在database中增加了最新tick持久化保存,这为新的tick是否是重复的判断提供支持。
  3. 提供有效tick的分类,在CTA策略的模板中增加on_auction_tick()接口使得BarGenerator正确处1分钟bar的成交量和成交额成为可能。

4.2 非CTP网关使用者是否也可以这样做?

只要你能够从网关行情接口实时得到合约的交易状态推送,把网关的行情接口做出类似的修改,这套方法同样是可用的。tickfilter的代码可以不用修改直接使用。

5. 解决BarGenerator统计bar成交量和成交额错误的方法

5.1 这是对BarGenerator做出点修改,

  • 修改BarGenerator的初始化函数
    def __init__(
        self,
        on_bar: Callable,
        window: int = 0,
        on_window_bar: Callable = None,
        interval: Interval = Interval.MINUTE
    ):
        """ Constructor """      

        ... ...  # 其他代码省略

        self.auction_tick:TickData = None
        self.last_tick: TickData = None
  • 增加BarGenerator的集合竞价tick处理函数
    def update_auction_tick(self,tick:TickData):
        """ 更新集合竞价tick """
        self.auction_tick = tick
  • 修改BarGenerator的1分钟bar合成函数
    def update_tick(self, tick: TickData) -> None:
        """
        Update new tick data into generator.
        """
        new_minute = False

        if self.auction_tick:
            # 合约集合竞价tick到当前tick
            tick.high_price = max(tick.high_price,self.auction_tick.high_price)
            tick.low_price = min(tick.low_price,self.auction_tick.low_price)

            # 构造最新tick,以便把集合竞价的成交量和成交额合成到1分钟bar中
            self.last_tick = deepcopy(self.auction_tick)
            # 成交量和成交额每天从0开始单调递增
            self.last_tick.volume = 0.0   
            self.last_tick.turnover = 0.0

            # 用完集合竞价tick就丢弃
            self.auction_tick = None

      ... ...  # 其他代码省略

5.2 您的策略关于集合竞价tick更新的回调函数:

    def on_auction_tick(self, tick: TickData):
        """
        集合竞价tick处理
        """
        self.bg.update_auction_tick(tick)    # 假设self.bg是已经创建过的bar生成器

两点说明:

  1. 如果你在阅读本文的时候觉得有点一头雾水,可以搜索'hxxjava'字符串,将会显示大部分修改的代码,仔细揣摩下,就会知道我做了什么了!
  2. 另外本贴中还有一部分涉及到条件单的代码,如果出现错误,可以查找我的关于条件单的帖子比停止单更好用的条件单——ConditionOrder,这里就不再重复贴出那部分代码了。


查询仓位,持仓均价,未成交委托单一个函数搞定

1.首先完善converter.py

class PositionHolding:
    """"""

    def __init__(self, contract: ContractData = None):
        """"""
        if contract:
            self.vt_symbol = contract.vt_symbol
            self.exchange = contract.exchange

        self.active_orders = {}
        self.order_id = ""
        self.long_pos = 0
        self.long_pnl = 0
        self.long_price = 0
        self.long_yd = 0
        self.long_td = 0
        self.short_pos = 0
        self.short_pnl = 0
        self.short_price = 0        
        self.short_yd = 0
        self.short_td = 0
        self.long_pos_frozen = 0
        self.long_yd_frozen = 0
        self.long_td_frozen = 0

        self.short_pos_frozen = 0
        self.short_yd_frozen = 0
        self.short_td_frozen = 0

    def update_position(self, position: PositionData):
        """"""
        if position.direction == Direction.LONG:
            self.long_pos = position.volume
            self.long_pnl = position.pnl
            self.long_yd = position.yd_volume
            self.long_td = self.long_pos - self.long_yd
            self.long_price = position.price
            self.long_pos_frozen = position.frozen
        else:
            self.short_pos = position.volume
            self.short_pnl = position.pnl            
            self.short_yd = position.yd_volume
            self.short_td = self.short_pos - self.short_yd
            self.short_price = position.price
            self.short_pos_frozen = position.frozen
    def update_order(self, order: OrderData):
        """"""
        #active_orders只记录未成交和部分成交委托单
        if order.status in [Status.NOTTRADED, Status.PARTTRADED]:
            self.active_orders[order.vt_orderid] = order
        else:
            if order.vt_orderid in self.active_orders:
                self.active_orders.pop(order.vt_orderid)

        self.calculate_frozen()

    def update_order_request(self, req: OrderRequest, vt_orderid: str):
        """"""
        #分离gateway_name和orderid
        gateway_name,*split_orderid = vt_orderid.split("_")
        if len(split_orderid) == 1:
            self.order_id = split_orderid[0]
        elif len(split_orderid) == 2:
            self.order_id = "_".join([split_orderid[0],split_orderid[1]])
        elif len(split_orderid) == 3:
            self.order_id = "_".join([split_orderid[0],split_orderid[1],split_orderid[2]])
        elif len(split_orderid) == 4:
            self.order_id = "_".join([split_orderid[0],split_orderid[1],split_orderid[2],split_orderid[3]])
        if self.order_id:
            order = req.create_order_data(self.order_id, gateway_name)
            self.update_order(order)

    def update_trade(self, trade: TradeData):
        """"""
        if trade.direction == Direction.LONG:
            if trade.offset == Offset.OPEN:
                self.long_td += trade.volume
            elif trade.offset == Offset.CLOSETODAY:
                self.short_td -= trade.volume
            elif trade.offset == Offset.CLOSEYESTERDAY:
                self.short_yd -= trade.volume
            elif trade.offset == Offset.CLOSE:
                if trade.exchange in [Exchange.SHFE, Exchange.INE]:
                    self.short_yd -= trade.volume
                else:
                    self.short_td -= trade.volume

                    if self.short_td < 0:
                        self.short_yd += self.short_td
                        self.short_td = 0
        else:
            if trade.offset == Offset.OPEN:
                self.short_td += trade.volume
            elif trade.offset == Offset.CLOSETODAY:
                self.long_td -= trade.volume
            elif trade.offset == Offset.CLOSEYESTERDAY:
                self.long_yd -= trade.volume
            elif trade.offset == Offset.CLOSE:
                if trade.exchange in [Exchange.SHFE, Exchange.INE]:
                    self.long_yd -= trade.volume
                else:
                    self.long_td -= trade.volume

                    if self.long_td < 0:
                        self.long_yd += self.long_td
                        self.long_td = 0

        self.long_pos = self.long_td + self.long_yd
        self.short_pos = self.short_td + self.short_yd

    def calculate_frozen(self):
        """"""
        self.long_pos_frozen = 0
        self.long_yd_frozen = 0
        self.long_td_frozen = 0

        self.short_pos_frozen = 0
        self.short_yd_frozen = 0
        self.short_td_frozen = 0

        for order in self.active_orders.values():
            # Ignore position open orders
            if order.offset == Offset.OPEN:
                continue

            frozen = order.volume - order.traded

            if order.direction == Direction.LONG:
                if order.offset == Offset.CLOSETODAY:
                    self.short_td_frozen += frozen
                elif order.offset == Offset.CLOSEYESTERDAY:
                    self.short_yd_frozen += frozen
                elif order.offset == Offset.CLOSE:
                    self.short_td_frozen += frozen

                    if self.short_td_frozen > self.short_td:
                        self.short_yd_frozen += (
                            self.short_td_frozen - self.short_td)
                        self.short_td_frozen = self.short_td
            elif order.direction == Direction.SHORT:
                if order.offset == Offset.CLOSETODAY:
                    self.long_td_frozen += frozen
                elif order.offset == Offset.CLOSEYESTERDAY:
                    self.long_yd_frozen += frozen
                elif order.offset == Offset.CLOSE:
                    self.long_td_frozen += frozen

                    if self.long_td_frozen > self.long_td:
                        self.long_yd_frozen += (
                            self.long_td_frozen - self.long_td)
                        self.long_td_frozen = self.long_td

            self.long_pos_frozen = self.long_td_frozen + self.long_yd_frozen
            self.short_pos_frozen = self.short_td_frozen + self.short_yd_frozen

    def convert_order_request_shfe(self, req: OrderRequest):
        """"""
        if req.offset == Offset.OPEN:
            return [req]

        if req.direction == Direction.LONG:
            pos_available = self.short_pos - self.short_pos_frozen
            td_available = self.short_td - self.short_td_frozen
        else:
            pos_available = self.long_pos - self.long_pos_frozen
            td_available = self.long_td - self.long_td_frozen

        if req.volume > pos_available:
            return []
        elif req.volume <= td_available:
            req_td = copy(req)
            req_td.offset = Offset.CLOSETODAY
            return [req_td]
        else:
            req_list = []

            if td_available > 0:
                req_td = copy(req)
                req_td.offset = Offset.CLOSETODAY
                req_td.volume = td_available
                req_list.append(req_td)

            req_yd = copy(req)
            req_yd.offset = Offset.CLOSEYESTERDAY
            req_yd.volume = req.volume - td_available
            req_list.append(req_yd)

            return req_list

    def convert_order_request_lock(self, req: OrderRequest):
        """"""
        if req.direction == Direction.LONG:
            td_volume = self.short_td
            yd_available = self.short_yd - self.short_yd_frozen
        else:
            td_volume = self.long_td
            yd_available = self.long_yd - self.long_yd_frozen

        # If there is td_volume, we can only lock position
        if td_volume:
            req_open = copy(req)
            req_open.offset = Offset.OPEN
            return [req_open]
        # If no td_volume, we close opposite yd position first
        # then open new position
        else:
            open_volume = max(0, req.volume - yd_available)
            req_list = []

            if yd_available:
                req_yd = copy(req)
                if self.exchange in [Exchange.SHFE, Exchange.INE]:
                    req_yd.offset = Offset.CLOSEYESTERDAY
                else:
                    req_yd.offset = Offset.CLOSE
                req_list.append(req_yd)

            if open_volume:
                req_open = copy(req)
                req_open.offset = Offset.OPEN
                req_open.volume = open_volume
                req_list.append(req_open)

            return req_list


给CTP增加备用行情服务器

单台行情服务器,有出现Tick丢失的情况,这个策略有一定的 影响,可以通过增加多台行情服务器的方法,解决问题

from collections import deque
from vnpy.gateway.ctp.ctp_gateway import *

class CtpGatewayDouble(CtpGateway):
    default_setting = {
        "用户名": "",
        "密码": "",
        "经纪商代码": "",
        "交易服务器": "",
        "主行情服务器": "",
        "次行情服务器": "",
        "产品名称": "",
        "授权编码": "",
        "产品信息": "",
    }

    def __init__(self, event_engine):
        super(CtpGateway, self).__init__(event_engine, "Double")
        self.td_api = CtpTdApi(self)
        # 主行情
        self.md_api = CtpMdApi(self)
        # 备用行情
        self.sub_md_api = CtpMdApi(self)
        # 缓存
        self.tick_buffer = deque(maxlen=100_000)

    def connect(self, setting: dict):
        """"""
        userid = setting["用户名"]
        password = setting["密码"]
        brokerid = setting["经纪商代码"]
        td_address = setting["交易服务器"]
        md_address = setting["主行情服务器"]
        sub_md_address = setting["次行情服务器"]
        appid = setting["产品名称"]
        auth_code = setting["授权编码"]
        product_info = setting["产品信息"]

        if (not td_address.startswith("tcp://")) and (
            not td_address.startswith("ssl://")
        ):
            td_address = "tcp://" + td_address

        if (not md_address.startswith("tcp://")) and (
            not md_address.startswith("ssl://")
        ):
            md_address = "tcp://" + md_address

        if (not sub_md_address.startswith("tcp://")) and (
            not sub_md_address.startswith("ssl://")
        ):
            sub_md_address = "tcp://" + sub_md_address

        self.td_api.connect(
            td_address, userid, password, brokerid, auth_code, appid, product_info
        )
        self.md_api.connect(md_address, userid, password, brokerid)
        self.sub_md_api.connect(sub_md_address, userid, password, brokerid)

        self.init_query()

    def subscribe(self, req: SubscribeRequest):
        """"""
        self.md_api.subscribe(req)
        self.sub_md_api.subscribe(req)

    def close(self):
        """"""
        self.td_api.close()
        self.md_api.close()
        self.sub_md_api.close()

    def on_tick(self, tick):
        tick_hash = hash(tick)
        if tick_hash not in self.tick_buffer:
            self.tick_buffer.append(tick_hash)
            super().on_tick(tick)


《vn.py 3.0.0源代码深入分析》

我学Python的目的很明确,就是量化交易。从一开始就有关注vn.py,但我学的是Python3,那时vn.py还处于版本1.x时期,所以只能望vn.py兴叹。
vn.py 2.0出来之后我并没有及时注意,等反应过来已经是2.0.7版。很兴奋,认真研究,并将心得写成《vn.py 2.0.7源代码深入分析》,分享在vn.py社区的经验分享板块。
出于对量化交易的爱好,出于对Python在量化交易中作用的认同,一定程度受vn.py强大功能的鼓舞,我与同事合写了《Python量化交易从入门到实战》一书,对vn.py的讨论是其中很重要的一部分内容。
后续又写了《vn.py 2.1.4源代码深入分析》和《vn.py 2.2.0源代码深入分析》两个文档,感谢各位老师的认可。
vn.py 3.0.0版发布于2022-03-23,这是我一直期待的一个版本,所以它刚一推出,我就立刻开始试用,并着手整理《vn.py 3.0.0源代码深入分析》。夜以继日,终于在前天完成。先发到了书籍的资源群中,接受了两天批评,现分享到此处。
写作本文档的一个主要目的是对vn.py的开源精神做出一点支持,希望本文档能够对大家学习使用vn.py有所帮助。

百度网盘链接:https://pan.baidu.com/s/1cl2MA9hNFhHlxfHM0gGe2A
提取码:s7u6



cta策略在不重启的情况下,重载策略文件的方法

刚接触PYTHON,经过测试可用,如有不妥欢迎指正。
新增加了刷新策略按钮,按下后刷新策略文件并重新加载。

description

修改以下文件:
1:添加按钮和方法
vnpy\app\cta_strategy\ui\widget.py
在add_button.clicked.connect(self.add_strategy) 下方添加以下两行

description

reload_button = QtWidgets.QPushButton("刷新策略")
reload_button.clicked.connect(self.reload_class)

在hbox1.addWidget(add_button)下方添加以下一行

description

hbox1.addWidget(reload_button)

在 def update_class_combo(self): 上方添加

description

    def reload_class(self):
        self.cta_engine.load_strategy_class()
        self.class_combo.clear()
        self.update_class_combo()

2:修改加载逻辑
vnpy\app\cta_strategy\engine.py
修改load_strategy_class方法

description

在path1 = Path(file).parent.joinpath("strategies")上方添加

        for loadClass in self.classes:
            del loadClass
        self.classes.clear()

修改load_strategy_class_from_module方法

description

在module = importlib.import_module(module_name)的下方添加

module = importlib.reload(module)

这样可以在不重启vntrader的情况下,修改策略文件,并重新加载,方便调试。



关于期货交易CTP接口的五档行情接$,和设置实现

就像2020年总结提到,最近想在中高频策略搞搞。对于高频交易,如果有深度行情当然是必须的,看到说上期所五档行情免费提供,而且我用其他行情软件可以查到,就想在VNPY保存到数据库先研究下。有点发现,写下来记录下。

首先看了下代码,VNPY的ctp_gateway已经支持五档行情接收,也不需要什么配置,但是没有发现五档行情数据,无论是界面端还是数据库。

然后研究下,因为ctp_gateway行情接收是采用继承接口被调用的模式,是由CTP API工作线程驱动;没法被直接debug,倒是可以直接插入print方法输出。发现是有返回值,但是是1.79769313486232e+308这样的溢出值,被一个VNPY专门写静态方法adjust_price给过滤掉了。顺便说下,1.9.2版本也实现ctp五档行情读取,不过要改改ctp_gateway代码,把上期所加入。

我想是不是VNPY封装的CTP接口太老呢,不支持呢,因为看Github历史记录是2年前更新的。就研究了下CTP-API,发现原来是由下面几个点要注意,稍微说下。

首先按照CTP-API接口文档说的支持的通讯模式有三种:对话通讯模式,私有通讯模式,广播通讯模式,

对话通讯模式
对话通讯模式是指由会员端主动发起的通讯请求。该请求被交易所端接收和处理,并给予响应。例如报单、查询等。这种通讯模式与普通的客户/服务器模式相同。CTP-API中的命名Req------或者ReqQry------这样都是发起API

私有通讯模式
私有通讯模式是指交易所端主动,向某个特定的会员发出的信息。例如成交回报等,一般是On-----这样名字,也是上面提到继承接口被CTP调用。

广播通讯模式(公有流)
广播通讯模式又称公有流,是指交易所端主动,向市场中的所有会员都发出相同的信息。例如公告、市场公共信息等。另外,广播模式是不能在公网搞得,ctp所说的广播就是针对在对应所有会员也就是期货公司的内网广播。

但是,在CTP-API文档里面很明确的说明,五档行情是使用组播模式,针对同一组的机器进行广播。

二代组播行情(下文简称二代行情):交易所以组播方式提供的实时五档行情。因为是组播,所以接收端必须在内部网络并且要加入组播组。

目前交易所不允许投资者直接连接交易所报盘网去接收组播行情,如果期货公司将行情转发出来给投资者使用,投资者便能享受到快速的组播行情。我和我期货公司沟通,必须要把机器放在期货公司委托机房才可以接收转发组播的五档行情。这边补充下,查了些文档,组播也可以在公网搞,但是因为UDP没有确认,容易掉包,CTP-API提供增量方法来保证针对这个情况加以弥补,但是很少期货公司提供公网组播,比较吃了不讨好,如果那个提供我就去开户了,哈。

就算机器放在托管机房了,还有一个地方要设置,就是设置订阅组播行情前置,CTP-API方法如下

static CThostFtdcMdApi CreateFtdcMdApi(const char pszFlowPath = "", const bool bIsUsingUdp=false, const bool bIsMulticast=false);

各类型行情字段组合如下:
  bIsUsingUdp bIsMulticast
TCP行情前置 false false
UDP行情前置 true false
组播行情前置 true true

那么说,五档行情时候必须参数bIsUsingUdp和bIsMulticast都为True才可以。

我看了下VNPY 2.1.8的ctp-api,vnctpmd.h头文件的接口是createFtdcMdApi(string pszFlowPath = ""),没有提供bIsUsingUdp和bIsMulticast参数录入。可能是vnpy封装的CTP-API的版本太老,因为6.3.15api中的好像最早版本的行情部分并不支持二代,必须自己拼接。

总结下,就是要拿到免费五档行情,需要机器放在托管机房,另外ctp行情订阅必须把bIsUsingUdp和bIsMulticast都为True,对于VNPY需要重新封装以下CTP-API,更新下ctp_gateway。

参考了这篇文章 https://zhuanlan.zhihu.com/p/103178845



解密并强化日内经典策略R-Breaker

R-Breaker是一种中高频的日内交易策略,这个策略也长期被Future Truth杂志评为最赚钱的策略之一。R-Breaker策略结合了趋势和反转两种交易方式,所以交易机会相对较多,比较适合日内1分钟K线或者5分钟K线级别的数据。

 
 

R-Breaker策略逻辑

 

R-Breaker的策略逻辑由以下4部分构成:

1)计算6个目标价位

根据昨日的开高低收价位计算出今日的6个目标价位,按照价格高低依次是:

  • 突破买入价(Bbreak)
  • 观察卖出价(Ssetup)
  • 反转卖出价(Senter)
  • 反转买入价(Benter)
  • 观察买入价(Bsetup)
  • 突破卖出价(Sbreak)

 

他们的计算方法如下:(其中a、b、c、d为策略参数)

  • 观察卖出价(Ssetup)= High + a * (Close – Low)
  • 观察买入(Bsetup)= Low – a * (High – Close)
  • 反转卖出价(Senter)= b / 2 * (High + Low) – c * Low
  • 反转买入价(Benter)= b / 2 * (High + Low) – c * High
  • 突破卖出价(Sbreak)= Ssetup - d * (Ssetup – Bsetup)
  • 突破买入价(Bbreak)= Bsetup + d * (Ssetup – Bsetup)

 

description

2)设计委托逻辑

趋势策略情况:

  • 若价格>突破买入价,开仓做多;
  • 若价格<突破卖出价,开仓做空;

 

反转策略情况:

  • 若日最高价>观察卖出价,然后下跌导致价格<反转卖出价,开仓做空或者反手(先平仓再反向开仓)做空;
  • 若日最低价<观察买入价,然后上涨导致价格>反转买入价,开仓做多或者反手(先平仓再反向开仓)做多;

 

3)设定相应的止盈止损。

 

4)日内策略要求收盘前平仓。

 

上面是原版R-Breaker策略逻辑,但是使用RQData从2010年至今(即2019年10月)的1分钟沪深300股指期货主力连续合约(IF88)测试,效果并不理想。

 
 

策略逻辑优化

 

实际上R-Breaker策略可以拆分成趋势策略和反转策略。下面分别对这对2种策略逻辑进行优化:

1)趋势策略:

  • 若当前x分钟的最高价>观察卖出价,认为它具有上升趋势,在突破买入价挂上买入开仓的停止单;
  • 若当前x分钟的最低价<观察买入价,认为它具有下跌趋势,在突破卖出价挂上买入开仓的停止单;
  • 开仓后,使用固定百分比移动止损离场;
  • 增加过滤条件:为防止横盘行情导致不断的开平仓,日内每次开仓买入开仓(卖出开仓)委托的价位都比上一次更高(更低);
  • 收盘前,必须平调所持有的仓位。

 

2)反转策略:

  • 若当前x分钟的最高价>观察卖出价,认为它已经到了当日阻力位,可能发生行情反转,在反转卖出价挂上卖出开仓的停止单;
  • 若当前x分钟的最低价>观察买入价,认为它已经到了当日支撑位,可能发生行情反转,在反转买入价挂上买入开仓的停止单;
  • 开仓后,使用固定百分比移动止损离场;
  • 收盘前,必须平调所持有的仓位。

 

其代码实现逻辑如下:

self.tend_high, self.tend_low = am.donchian(self.donchian_window)

if bar.datetime.time() < self.exit_time:

    if self.pos == 0:
        self.intra_trade_low = bar.low_price
        self.intra_trade_high = bar.high_price

        # Trend Condition
        if self.tend_high > self.sell_setup:
            long_entry = max(self.buy_break, self.day_high)
            self.buy(long_entry, self.fixed_size, stop=True)

            self.short(self.sell_enter, self.multiplier * self.fixed_size, stop=True)

        elif self.tend_low < self.buy_setup:
            short_entry = min(self.sell_break, self.day_low)
            self.short(short_entry, self.fixed_size, stop=True)

            self.buy(self.buy_enter, self.multiplier * self.fixed_size, stop=True)

    elif self.pos > 0:
        self.intra_trade_high = max(self.intra_trade_high, bar.high_price)
        long_stop = self.intra_trade_high * (1 - self.trailing_long / 100)
        self.sell(long_stop, abs(self.pos), stop=True)

    elif self.pos < 0:
        self.intra_trade_low = min(self.intra_trade_low, bar.low_price)
        short_stop = self.intra_trade_low * (1 + self.trailing_short / 100)
        self.cover(short_stop, abs(self.pos), stop=True)

# Close existing position
else:
    if self.pos > 0:
        self.sell(bar.close_price * 0.99, abs(self.pos))
    elif self.pos < 0:
        self.cover(bar.close_price * 1.01, abs(self.pos))

 
 

策略效果

 

同样使用10年的1分钟IF88数据进行回测。不过,在展示强化版R-Breaker策略效果前,先分别展示一下拆分后的趋势策略和反转策略。

1)趋势策略:

  • 趋势策略夏普比率1.96,日均成交2.6笔,资金曲线是整体上扬的;
  • 但是在2017~2018年的盘整阶段,具有较大并且持续时间较长的回撤;
  • 这凸显出趋势类策略自身无法规避的缺点:在趋势行情中盈利,意味着震荡行情必然亏损。

description

 

2)反转策略

  • 反转策略夏普比率0.75,日均成交0.4笔,资金曲线缓慢上扬;
  • 但是在2017~2018年的盘整阶段,资金曲线上扬最快,而且这个阶段是最平滑的;
  • 这凸显出反转类策略优点:尽管在趋势行情亏损,在震荡行情必然能盈利。

description

 

综合对比2种策略的日均成交笔数和资金曲线,我们可以知道:

  • 由于趋势策略日均交易笔数较多(2.6笔),它主要负责贡献R-Breaker策略的alpha;
  • 趋势策略的亏损也是主要导致R-Breaker策略亏损的原因,但这时候的亏损由反转策略的盈利来填补。

由于趋势策略和反转策略是互斥的,在某些方面呈现出此消彼长的特点。那么,根据投资组合理论,可以把反转策略看作是看跌期权,买入一定规模的看跌期权来对消非系统性风险,那么组合的收益会更加稳健,即夏普比率更高。

由于趋势策略和反转策略日均成交手数比是2.6:0.4,若它们都只委托1手的话,反转策略的对冲效果微乎其微。

为了方便演示,我们设置趋势策略每次交易1手;反转策略则是3手。然后合成R-Breaker策略。发现夏普比率提高到2,资金曲线整体上扬,而且没有较大且持续时间较长的回撤。

description

 
 

结论

 

R-Breaker策略成功之处在于它并不是纯粹的趋势类策略,它属于复合型策略,它的alpha由2部分构成:趋势策略alpha;反转策略alpha。

这类复合型策略可以看作是轻量级的投资组合,因为它的交易标的只有一个:沪深300股指期货的主力合约。

更复杂的话,可以交易多个标的,如在商品期货做虚拟钢厂套利(同时交易螺纹钢、铁矿石、焦炭),在IF股指期货上做日内CTA策略。考虑到市场容量不同,价差套利能分配更多的资金。这样在价差套利提供稳定收益率基础上,CTA策略能在行情好的时候贡献更多alpha(高盈亏比特征导致的)。

从上面例子可以看出,一个合理的投资组合,往往比单个策略具有更高的夏普比率。因为夏普比率=超额收益/风险。夏普比率高意味着资金曲线非常平滑;这也意味着我们可以有效控制使用杠杆的风险。

当某个投资组合策略夏普足够高,而且策略资金容量允许,交易成本能有效控制等情况下,就可以通过杠杆来提升组合收益了。例如向银行贷款或者发放债券,这时候交易团队是债务人角色,即在承担更大风险同时,追求更到收益。债权人享受利息收益(类无风险收益)。

向公众发产品是另一种增加杠杆的方式,但此时投资组合风险已经转移到了客户这方,交易团队可以享受着管理费收益(类无风险收益)。根据目标客户的不同:

  • 私募基金面向高净值客户,这些客户群体风险承受能力较高;并且私募监管比较放松,能使用的衍生品较多,有提升业绩的自由度。故私募基金除了管理费,更追求业绩提成。
  • 公募基金面向普通群众,他们风险承受能力较低;并且公募监管非常严格,投资约束非常多,提升业绩难度较大。但是公募牌照的稀缺性必然导致该行业是盈利的。如万亿级别的管理规模,其管理费的收益,也不是一般的自营交易公司或者私募基金比得上的。

 
 

附录

 

最后附上策略源代码:

from datetime import time
from vnpy.app.cta_strategy import (
    CtaTemplate,
    StopOrder,
    TickData,
    BarData,
    TradeData,
    OrderData,
    BarGenerator,
    ArrayManager
)
​
​
class RBreakStrategy(CtaTemplate):
    """"""
    author = "KeKe"
​
    setup_coef = 0.25
    break_coef = 0.2
    enter_coef_1 = 1.07
    enter_coef_2 = 0.07
    fixed_size = 1
    donchian_window = 30
​
    trailing_long = 0.4
    trailing_short = 0.4
    multiplier = 3
​
    buy_break = 0   # 突破买入价
    sell_setup = 0  # 观察卖出价
    sell_enter = 0  # 反转卖出价
    buy_enter = 0   # 反转买入价
    buy_setup = 0   # 观察买入价
    sell_break = 0  # 突破卖出价
​
    intra_trade_high = 0
    intra_trade_low = 0
​
    day_high = 0
    day_open = 0
    day_close = 0
    day_low = 0
    tend_high = 0
    tend_low = 0
​
    exit_time = time(hour=14, minute=55)
​
    parameters = ["setup_coef", "break_coef", "enter_coef_1", "enter_coef_2", "fixed_size", "donchian_window"]
    variables = ["buy_break", "sell_setup", "sell_enter", "buy_enter", "buy_setup", "sell_break"]
​
    def __init__(self, cta_engine, strategy_name, vt_symbol, setting):
        """"""
        super(RBreakStrategy, self).__init__(
            cta_engine, strategy_name, vt_symbol, setting
        )
​
        self.bg = BarGenerator(self.on_bar)
        self.am = ArrayManager()
        self.bars = []
​
    def on_init(self):
        """
        Callback when strategy is inited.
        """
        self.write_log("策略初始化")
        self.load_bar(10)
​
    def on_start(self):
        """
        Callback when strategy is started.
        """
        self.write_log("策略启动")
​
    def on_stop(self):
        """
        Callback when strategy is stopped.
        """
        self.write_log("策略停止")
​
    def on_tick(self, tick: TickData):
        """
        Callback of new tick data update.
        """
        self.bg.update_tick(tick)
​
    def on_bar(self, bar: BarData):
        """
        Callback of new bar data update.
        """
        self.cancel_all()
​
        am = self.am
        am.update_bar(bar)
        if not am.inited:
            return
​
        self.bars.append(bar)
        if len(self.bars) <= 2:
            return
        else:
            self.bars.pop(0)
        last_bar = self.bars[-2]
​
        # New Day
        if last_bar.datetime.date() != bar.datetime.date():
            if self.day_open:
​
                self.buy_setup = self.day_low - self.setup_coef * (self.day_high - self.day_close)  # 观察买入价
                self.sell_setup = self.day_high + self.setup_coef * (self.day_close - self.day_low)  # 观察卖出价
​
                self.buy_enter = (self.enter_coef_1 / 2) * (self.day_high + self.day_low) - self.enter_coef_2 * self.day_high  # 反转买入价
                self.sell_enter = (self.enter_coef_1 / 2) * (self.day_high + self.day_low) - self.enter_coef_2 * self.day_low  # 反转卖出价
​
                self.buy_break = self.buy_setup + self.break_coef * (self.sell_setup - self.buy_setup)  # 突破买入价
                self.sell_break = self.sell_setup - self.break_coef * (self.sell_setup - self.buy_setup)  # 突破卖出价
​
            self.day_open = bar.open_price
            self.day_high = bar.high_price
            self.day_close = bar.close_price
            self.day_low = bar.low_price
​
        # Today
        else:
            self.day_high = max(self.day_high, bar.high_price)
            self.day_low = min(self.day_low, bar.low_price)
            self.day_close = bar.close_price
​
        if not self.sell_setup:
            return
​
        self.tend_high, self.tend_low = am.donchian(self.donchian_window)
​
        if bar.datetime.time() < self.exit_time:
​
            if self.pos == 0:
                self.intra_trade_low = bar.low_price
                self.intra_trade_high = bar.high_price
​
                if self.tend_high > self.sell_setup:
                    long_entry = max(self.buy_break, self.day_high)
                    self.buy(long_entry, self.fixed_size, stop=True)
​
                    self.short(self.sell_enter, self.multiplier * self.fixed_size, stop=True)
​
                elif self.tend_low < self.buy_setup:
                    short_entry = min(self.sell_break, self.day_low)
                    self.short(short_entry, self.fixed_size, stop=True)
​
                    self.buy(self.buy_enter, self.multiplier * self.fixed_size, stop=True)
​
            elif self.pos > 0:
                self.intra_trade_high = max(self.intra_trade_high, bar.high_price)
                long_stop = self.intra_trade_high * (1 - self.trailing_long / 100)
                self.sell(long_stop, abs(self.pos), stop=True)
​
            elif self.pos < 0:
                self.intra_trade_low = min(self.intra_trade_low, bar.low_price)
                short_stop = self.intra_trade_low * (1 + self.trailing_short / 100)
                self.cover(short_stop, abs(self.pos), stop=True)
​
        # Close existing position
        else:
            if self.pos > 0:
                self.sell(bar.close_price * 0.99, abs(self.pos))
            elif self.pos < 0:
                self.cover(bar.close_price * 1.01, abs(self.pos))
​
       self.put_event()
​
    def on_order(self, order: OrderData):
        """
        Callback of new order data update.
        """
        pass
​
    def on_trade(self, trade: TradeData):
        """
        Callback of new trade data update.
        """
        self.put_event()
​
    def on_stop_order(self, stop_order: StopOrder):
        """
        Callback of stop order update.
        """
        pass


千呼万唤始出来~2022年小班特训营来袭! 【VeighNa套利价差交易】和【VeighNa源码深入解析】

发布于veighna社区公众号【vnpy-community】
 
原文作者:用Python的交易员 | 发布时间:2022-06-06
 

2022年以来陆续已经有不少同学咨询小班特训营的开始时间,但是由于上海疫情的影响加上开发任务的增加,今年的小班特训营一直拖着。

最近终于迎来了上海解封,2022年的VeighNa小班特训营也正式开始报名!!计划今年推出的场次不会太多,感兴趣的同学请抓紧机会~~

目前先开放两场名额(有些老铁们去年就已经报名锁定了今年的名额,感谢对我们课程的认可与支持),老规矩还是放几张之前课程的照片:

description

准备完毕,静候同学们到达

description

学习量化,先从掌握核心框架

description

深入代码,分析策略逻辑细节

所有小班特训营时间定在周末两天,一共包含周六周日两个下午共计10+小时的课程,设立特训营专属答疑群,包括后续三个月的助教跟踪辅导,提供VeighNa小班特训营专属内部核心资料。

线下课程的地点在上海浦东,考虑到新冠以来大家对于坐火车飞机的健康风险顾虑,不想来上海的同学我们也提供远程线上听课(直播+录播)。对于所有参加小班特训营的学员,在课程结束后都会拿到课程的完整录播视频,可永久回看。

 

VeighNa套利价差交易

 
日期:2022年7月16日(周六)和7月17日(周日)

时间:两天下午1点-6点,共计10小时

大纲

  1. 初识套利价差交易

    a. 套利类策略的特点:靠高胜率获得核心优势

    b. 如何寻找好的套利价差,从数据面和基本面着手

    c. 价差组合时间序列建模:相关性分析、协整算法

  2. 价差数据结构设计

    a. 价差腿LegData和价差组合SpreadData

    b. 价差盘口计算原理:价格、数量、统计算法

    c. 基于动态解析的灵活价差数据计算

    d. 实盘数据流驱动,底层接口到上层算法

  3. 价差交易算法实现

    a. 价差执行算法和价差量化策略的异同

    b. 基于SpreadAlgoTemplate实现狙击算法

    c. 价差做市算法实现,盘口细粒度委托控制

  4. 价差量化策略开发

    a. 半自动固定范围买卖策略

    b. 全自动统计套利模型策略

    c. 网格区间价差交易策略

  5. 价差交易实战进阶:

    a. 价差策略回测:TICK模式和K线模式

    b. 实盘策略运维原则,安全、稳定

    c. 主动腿挂单做市算法的实现

价格:11999元(老学员和Elite会员折扣不变)

 

VeighNa源码深入解析

 
日期:2022年8月27日(周六)和8月28日(周日)

时间:两天下午1点-6点,共计10小时

大纲

  1. 交易接口封装

    a. 从0开发CtpGateway:掌握交易API的工作原理

    b. VeighNa数据结构:贯穿整个系统的数据对象定义

    c. 统一交易接口功能:标准化对接各类交易API接口

  2. 核心引擎设计

    a. 事件驱动引擎原理:如何实现Event-Driven架构

    b. 高性能数据缓存:O(1)查询复杂度容器应用

    c. CTA交易引擎构架:开发量化策略实盘交易引擎

    d. CTA回测引擎详解:保证实盘一致性的回测体系

  3. 图形界面开发

    a. 上手PySide6开发:新一代的Qt UI开发工具库

    b. 掌握图形控件使用:实现和策略引擎的数据交互

    c. 数据实时监控刷新:QTableWidget的高性能用法

    d. 窗口和对话框管理:开发满足实盘交易需求的界面布局

  4. 项目代码管理

    a. 开通Github仓库:大型项目代码开发和维护的神器

    b.项目代码管理流程:开发->测试->提交->合并->发布

    c. Type Hinting实践:Python语言中的类型提示管理

    d. 自动化代码检查工具:大幅减少代码中的各种类型Bug

  5. 应用开发实战

    a. 全自动任务运行:无人值守模式下的策略交易管理

    b. 守护父进程实现:实现交易子进程的监控和定时启停

    c. 异常监控报警:捕捉到策略或者系统异常后实时通知

    d. 应用模块开发:构造属于你的专属交易应用app模块

价格:11999元(老学员和Elite会员折扣不变)

 
报名方式和之前一样,请发送邮件到vn.py@foxmail.com,注明想参加的课程、姓名、手机、公司、职位即可。或者也可以扫描下方二维码添加小助手咨询报名:

description

 

课程对于之前参加过小班特训营的学员优先开放,同时提供9折的价格优惠。
 


新消息

统计

主题
7210
帖子
28235
已注册用户
34946
最新用户
在线用户
286
在线来宾用户
3925
© 2015-2022 上海韦纳软件科技有限公司
备案服务号:沪ICP备18006526号-3

沪公网安备 31011502017034号

【用户协议】
【隐私政策】
【免责条款】