VeighNa量化社区
你的开源社区量化交易平台
Member
avatar
加入于:
帖子: 420
声望: 181

1. 本地价数据差合成的过程

首先说明,这里所合成出来的价差K线只是简易价差K线,它和交易所发表的套利品种的K线相比还是有所差别的,主要的差别在最高价、最低价和开盘价,但收盘价是准确的。
description

description

2. 实现过程:

2.1 在app\spread_trading\base.py添加下面query_tick_from_rq()函数

# hxxjava debug spread_trading
def query_tick_from_rq(
    symbol: str, exchange: Exchange, start: datetime, end: datetime
):
    """
    Query tick data from RQData.
    """
    from vnpy.trader.rqdata import rqdata_client
    from vnpy.trader.object import HistoryRequest

    if not rqdata_client.inited:
        rqdata_client.init()

    req = HistoryRequest(
        symbol=symbol,
        exchange=exchange,
        interval=Interval.TICK,
        start=start,
        end=end
    )
    data = rqdata_client.query_tick_history(req)
    return data

2.2 在app\spread_trading\base.py修改load_tick_data()函数

有两种方式:一种是从米筐加载数据下载tick,另一种是从数据库中读取已经录制的该价差的tick数据。

@lru_cache(maxsize=999)
def load_tick_data(
    spread: SpreadData,
    start: datetime,
    end: datetime,
    pricetick: float = 0
):
    """"""
    # hxxjava debug spread_trading
    # 目前没有考虑反向合约的情况,以后解决
    spread_ticks: List[TickData] = []

    try:
        # 防止因为用户没有米筐tick数据权限而发生异常

        # Load tick data of each spread leg
        dt_legs: Dict[str, Dict] = {}   # datetime string : Dict[vt_symbol:tick]
        format_str = "%Y%m%d%H%M%S.%f"
        for vt_symbol in spread.legs.keys():
            symbol, exchange = extract_vt_symbol(vt_symbol)

            # hxxjava debug spread_trading
            tick_data = query_tick_from_rq(symbol=symbol, exchange=exchange,start=start,end=end)

            if tick_data:
                print(f"load from rqdatac {symbol}.{exchange} tick_data, len of = {len(tick_data)}")

            # save all the spread's legs tick into a dictionary by tick's datetime
            for tick in tick_data:
                dt_str = tick.datetime.strftime(format_str)
                if dt_str in dt_legs:
                    dt_legs[dt_str].update({vt_symbol:tick})
                else:
                    dt_legs[dt_str] = {vt_symbol:tick}

        # Calculate spread bar data
        # snapshot of all legs's ticks  
        snapshot:Dict[str,TickData] = {}
        spread_leg_count = len(spread.legs)

        for dt_str in sorted(dt_legs.keys()): 
            dt = datetime.strptime(dt_str,format_str).astimezone(LOCAL_TZ)
            # get each datetime  
            spread_price = 0
            spread_value = 0

            # get all legs's ticks dictionary at the datetime
            leg_ticks = dt_legs.get(dt_str)
            for vt_symbol,tick in leg_ticks.items():
                # save each tick into the snapshot
                snapshot.update({vt_symbol:tick})

            if len(snapshot) < spread_leg_count:
                # if not all legs tick saved in the snapshot
                continue

            # out_str = f"{dt_str} "
            # format_str1 = "%Y-%m-%d %H:%M:%S.%f "
            for vt_symbol,tick in snapshot.items():
                price_multiplier = spread.price_multipliers[vt_symbol]
                spread_price += price_multiplier * tick.last_price
                spread_value += abs(price_multiplier) * tick.last_price
                # out_str += f"[{vt_symbol} {tick.datetime.strftime(format_str1)} {tick.last_price}],"
            # print(out_str)

            if pricetick:
                spread_price = round_to(spread_price, pricetick)

            spread_tick = TickData(                
                symbol=spread.name,
                exchange=exchange.LOCAL,
                datetime=dt,
                open_price=spread_price,
                high_price=spread_price,
                low_price=spread_price,
                last_price=spread_price,
                gateway_name="SPREAD")

            spread_tick.value = spread_value
            spread_ticks.append(spread_tick)

        if spread_ticks:
            print(f"load {symbol}.{exchange}' ticks from rqdatac, len of = {len(tick_data)}")

    finally:
        if not spread_ticks:
            # 读取数据库中已经录制过的该价差的tick数据
            spread_ticks = database_manager.load_tick_data(spread.name, Exchange.LOCAL, start, end)

        return spread_ticks

3. 如何使用load_tick_data()?

3.1 修改价差策略的on_init()

只要在你的价差策略中on_init()中添加如下代码,就可以调用:

    def on_init(self):
        """
        Callback when strategy is inited.
        """
        self.write_log("策略初始化")
        self.load_tick(days=3)

3.2 如何剔除节假日?

修改vnpy\app\spread_trading\engine.py

3.2.1 利用米筐接口函数剔除节假日

增加下面函数:

def get_previous_trading_date(dt:datetime,days:int): # hxxjava add
    """ 
    得到某个日期dt的去除了节假日的前days个交易日 
    """
    from vnpy.trader.rqdata import rqdata_client
    import rqdatac as rq

    if not rqdata_client.inited:
        rqdata_client.init()

    prev_td = rq.get_previous_trading_date(date=dt.date(),n=days)

    return prev_td

3.2.2 修改SpreadStrategyEngine的load_bar()和load_tick()

修改如下,修改后两个函数的days参数就代表交易日了。

    def load_bar(
        self, spread: SpreadData, days: int, interval: Interval, callback: Callable
    ):
        """"""
        end = datetime.now()
        # start = end - timedelta(days)
        start = get_previous_trading_date(dt = end,days=days)   # hxxjava change

        bars = load_bar_data(spread, interval, start, end)

        print(f"{spread.name} {start}-{end} len of bars = {len(bars)}") # hxxjava debug spead_trading
        for bar in bars:
            callback(bar)

    def load_tick(self, spread: SpreadData, days: int, callback: Callable):
        """"""
        end = datetime.now()
        # start = end - timedelta(days=days)
        start = get_previous_trading_date(dt = end,days=days) # hxxjava change

        ticks = load_tick_data(spread, start, end)

        for tick in ticks:
            callback(tick)
Administrator
avatar
加入于:
帖子: 4547
声望: 324

感谢分享!加个精华

Member
avatar
加入于:
帖子: 187
声望: 58

恰巧我有做价差交易,就补充一下: 先对齐时间戳,再合成价差

楼主的做法,是有tick数据就合成价差,这会引入脏数据。价差交易的标准做法,是通过回望过去n个spread data,得到均线和标准差,构建上下轨(可以理解为置信区间),但它突破的时候做价差回归交易。当缓存的spread data数组不准时候,交易信号自然有错。

假设t1时刻同时来2个tick,时间上先A,后B。那么只需要收到B数据时候,在合成价差就可以了。因为收到a时刻,合成是spread是B(t+0) - A(t + 1),正确的价差永远是对齐时间戳后,用最新数据相减得到的。

如何对齐时间戳,这里到可以讲一下:
1) 先判断行情数据的顺序,或者是否有序:

  • 可以用机器收一篮子行情数据,在共享内存里面打tsc(cpu原子钟计数)。先设置tsc计数是恒定的,不会因为超频到5GHz或者基频3GHz不同而有所区别。我这里是3个tsc=1纳秒。

  • 以上期所为例子,行情的顺序就按照字母 A -> Z。例如先收到AL,AU,BC,RB,,,ZN。在相同品种里面,再按照数字从小到大,即先收到AL2209,再AL2210,AL2211。上期为了保证时间的统一性,切片会同赋予同样的时间,但这个切片时间不等于真实行情时间,而且会有2毫秒的延迟,并且是累计的。这里猜测是交易所虽然每隔500毫秒切,但是切片到发送,里面一些数据转换或者io消耗达2毫秒。所以观察到开盘机器对时后,行情是延迟累计增加,到后面越来越大。

2)确定价差对哪个品种是最后收到(跨期一般是远月),以那品种为基准合成价差:

  • 这仅仅是理论做法,在实践中会遇到蛮多坑的。

  • 以当前2202年8月为例子,跨期的话是AL2209(主力)&AL2210(次主力), AG2209(次)&AG2212(主)。我们知道用远月对齐,并且主力的tick数量,要远多于次主力的tick。

  • 这样,由于AL2210远月而且是次主力,那么合成价差后,AL的价差数据 < AG。

  • 这更好从硬件上面去处理,如使用FPGA。因为FPGA行情网卡收的是交易所行情镜像,它第一条数据是全息盘口数据,之后的都是增量数据。由于数据量比较小,相同的数据包会有大量的信息。这样,只要硬件解完码后,共享内存一刷新,都是最新的数据。就不从这么麻烦,例如AL2209解包,共享内存刷新一次,策略订阅的on_tick触发一次,然后AL2210解包,共享内存刷新一次,策略on_tick再触发。

  • 而正常的行情api的话,以凌云为例(类似ctp的md api),它是在柜台层面解包后,按照品种一个小包一个小包的发出去。相同数据,数据包拆得越小,处理起来越麻烦。

  • 如果没有条件使用fpga的话,只能在别的逻辑上面修正。
© 2015-2022 上海韦纳软件科技有限公司
备案服务号:沪ICP备18006526号

沪公网安备 31011502017034号

【用户协议】
【隐私政策】
【免责条款】