VeighNa量化社区
你的开源社区量化交易平台
Member
avatar
加入于:
帖子: 420
声望: 174

1. 问题的由来

在使用vnpy中的DataRecorder进行tick情录制的时候,本人发现郑商所的tick中时间在一秒内会收到两个tick,但是同一秒内的tick的时间是相同的。这会给后端的CTA策略、价差交易策略登录模块在合成K的时带来不必要的困扰,因为通常合成K线的BarGenerator通常对tick的时间进行有效性判断的时候,会把时间重复的tick做抛弃处理,这会再次郑商所的品种的tick被使用的量减半。如果重复的tick也用的话,这样倒是不会减半,可是在防止垃圾数据时又无法杜绝旧的数据在网关断网或者重新再次连接时,接口回再次推送最后的tick数据,而这个tick数据之前已经被实时推送给客户段了。所有在ctp_gateway中对郑商所行情数据中时间秒以下的部分进行补充是非常必要的!

2. 修改思路

  1. 在ctp_gateway中对郑商所tick行情数据进行特别处理,思路是每次接收到郑商所tick时t1,与上次的tick的时间t2进行比较,如果二者是同一秒的tick,那么将新的tick时间t1 = t2 +(t2到下一秒微秒数量÷2);如果t1,t2不是同一秒或t2不存在,这t1无需调整。
  2. 记录该t1时间到字典中,供下一次使用

3. 修改代码

本修改只需要对vnpy_ctp\ctp_gateway.py进行修改就可以了,修改步骤如下:

3.1 在CtpMdApi的init()增加郑商所最新tick时间字典:

    def __init__(self, gateway: CtpGateway) -> None:
        """构造函数"""
        super().__init__()

        self.gateway: CtpGateway = gateway
        self.gateway_name: str = gateway.gateway_name

        self.reqid: int = 0

        self.connect_status: bool = False
        self.login_status: bool = False
        self.subscribed: Set = set()  

        self.userid: str = ""
        self.password: str = ""
        self.brokerid: str = ""

        self.current_date: str = datetime.now().strftime("%Y%m%d")

        self.czce_last_times:Dict[str,datetime] = {}    # 郑商所最新tick时间字典 hxxjava add

3.2 修改CtpMdApi行情推送函数onRtnDepthMarketData():

    def onRtnDepthMarketData(self, data: dict) -> None:
        """行情数据推送"""
        # 过滤没有时间戳的异常行情数据
        if not data["UpdateTime"]:
            return

        # 过滤还没有收到合约数据前的行情推送
        symbol: str = data["InstrumentID"]
        contract: ContractData = symbol_contract_map.get(symbol, None)
        if not contract:
            return

        # 对大商所的交易日字段取本地日期
        if not data["ActionDay"] or contract.exchange == Exchange.DCE:
            date_str: str = self.current_date

        else:
            date_str: str = data["ActionDay"]

        # 这里错了,按照这样的字符串"%Y%m%d %H:%M:%S.%f",会错把tick.datetime的微秒当成0.1秒计数
        # timestamp: str = f"{date_str} {data['UpdateTime']}.{int(data['UpdateMillisec']/100)}"        
        timestamp: str = f"{date_str} {data['UpdateTime']}." + str(data['UpdateMillisec']*1000).zfill(6) # hxxjava edit
        dt: datetime = datetime.strptime(timestamp, "%Y%m%d %H:%M:%S.%f")
        dt: datetime = CHINA_TZ.localize(dt)

        # hxxjava add start
        if contract.exchange == Exchange.CZCE:
            # 对郑商所配置的秒以下的部分进行特别处理
            key = f"{symbol}.{contract.exchange.value}"
            # 读取上次的tick时间
            last_time:datetime = self.czce_last_times.get(key,None)
            if last_time:
                # 取得上次时间的整秒时间
                dt1 = last_time - timedelta(microseconds = last_time.microsecond)
                if dt == dt1:
                    # 如果有收到一次秒内的tick,计算应该添加到秒以下的部分
                    next_second = dt1 + timedelta(seconds=1)
                    delta = next_second - last_time
                    microsecond = (delta.seconds*1000000 + delta.microseconds) / 2
                    # 时间调整为上次tick时间到下一秒到中间值
                    dt = last_time + timedelta(microseconds = microsecond)

            # 更新最新tick时间字典   
            self.czce_last_times[key] = dt
        # hxxjava add end

        tick: TickData = TickData(
            symbol=symbol,
            exchange=contract.exchange,
            datetime=dt,
            name=contract.name,
            volume=data["Volume"],
            turnover=data["Turnover"],
            open_interest=data["OpenInterest"],
            last_price=adjust_price(data["LastPrice"]),
            limit_up=data["UpperLimitPrice"],
            limit_down=data["LowerLimitPrice"],
            open_price=adjust_price(data["OpenPrice"]),
            high_price=adjust_price(data["HighestPrice"]),
            low_price=adjust_price(data["LowestPrice"]),
            pre_close=adjust_price(data["PreClosePrice"]),
            bid_price_1=adjust_price(data["BidPrice1"]),
            ask_price_1=adjust_price(data["AskPrice1"]),
            bid_volume_1=data["BidVolume1"],
            ask_volume_1=data["AskVolume1"],
            gateway_name=self.gateway_name
        )

        if data["BidVolume2"] or data["AskVolume2"]:
            tick.bid_price_2 = adjust_price(data["BidPrice2"])
            tick.bid_price_3 = adjust_price(data["BidPrice3"])
            tick.bid_price_4 = adjust_price(data["BidPrice4"])
            tick.bid_price_5 = adjust_price(data["BidPrice5"])

            tick.ask_price_2 = adjust_price(data["AskPrice2"])
            tick.ask_price_3 = adjust_price(data["AskPrice3"])
            tick.ask_price_4 = adjust_price(data["AskPrice4"])
            tick.ask_price_5 = adjust_price(data["AskPrice5"])

            tick.bid_volume_2 = data["BidVolume2"]
            tick.bid_volume_3 = data["BidVolume3"]
            tick.bid_volume_4 = data["BidVolume4"]
            tick.bid_volume_5 = data["BidVolume5"]

            tick.ask_volume_2 = data["AskVolume2"]
            tick.ask_volume_3 = data["AskVolume3"]
            tick.ask_volume_4 = data["AskVolume4"]
            tick.ask_volume_5 = data["AskVolume5"]

        self.gateway.on_tick(tick)

4. 问题解决了

这是本人录制的郑商所合约TA301的结果:

description

description
都说国内期货的行情推送最多是2次,其实是错误的,可能超过2次 !看上图就知道,一秒3次一个经常存在的,所以不能够固定递增0.5秒,而应该向本文中的办法,每次加0.5,0.25,0.125......秒的比较好,既不重复有可以超过2次。

至此就完美地解决了郑商所合约行情秒内tick时间重复的问题了!

Member
avatar
加入于:
帖子: 3
声望: 0

大佬真牛!!!

Administrator
avatar
加入于:
帖子: 4502
声望: 322

感谢分享,精华送上!

Member
avatar
加入于:
帖子: 5
声望: 0

description
感谢大佬分享!
我想按您说的修改一下代码,但发现有一行代码不同,用原代码还是用您的代码呀,还是说两行代码是一个意思。
我学python没多久,很多地方看不懂,在这里跪谢大佬。

Member
avatar
加入于:
帖子: 420
声望: 174

Teller wrote:

description
感谢大佬分享!
我想按您说的修改一下代码,但发现有一行代码不同,用原代码还是用您的代码呀,还是说两行代码是一个意思。
我学python没多久,很多地方看不懂,在这里跪谢大佬。

答复:
去掉前面的,保留后一句。

Member
avatar
加入于:
帖子: 5
声望: 0

description
感谢大佬回复!
大佬,修改后我遇到这样的报错,是不是我少import了什么包?
我自己尝试import了pytz包,但是没成功。

Member
avatar
加入于:
帖子: 125
声望: 4

试试升级一下库

Member
avatar
加入于:
帖子: 5
声望: 0

description
我研究了一下,这两段代码好像是跟时区有关的,先全都注了就能用。
但不知道后续会有什么影响,坐等大佬答疑。

Member
avatar
加入于:
帖子: 125
声望: 4

Teller wrote:

description
我研究了一下,这两段代码好像是跟时区有关的,先全都注了就能用。
但不知道后续会有什么影响,坐等大佬答疑。
红框圈的是毫秒,不是时区。接口每0.5秒推一次数据

Member
avatar
加入于:
帖子: 420
声望: 174

Teller wrote:

description
感谢大佬回复!
大佬,修改后我遇到这样的报错,是不是我少import了什么包?
我自己尝试import了pytz包,但是没成功。

答复:
ctp_gateway.py的引用部分有

import pytz

下面是常量定义:

CHINA_TZ = pytz.timezone("Asia/Shanghai")       # 中国时区

然后就可以使用了,目的是把CTP接口中的所有行情、委托单和成绩单这涉及到时间的字段都同一到中国时区。
难道你的ctp_gateway.py文件中没有这些内容?

Member
avatar
加入于:
帖子: 420
声望: 174

七月雪 wrote:

Teller wrote:

description
我研究了一下,这两段代码好像是跟时区有关的,先全都注了就能用。
但不知道后续会有什么影响,坐等大佬答疑。
红框圈的是毫秒,不是时区。接口每0.5秒推一次数据

不是0.5秒,而是秒内收到tick,第1个算整秒,第2个算0.5秒,第3个算0.75秒,一次类推... ...。本意是为了上层应用使用这些tick时的方便。

Member
avatar
加入于:
帖子: 5
声望: 0

非常感谢大佬,按您的方法做,问题已经解决了,我的ctp_gateway.py里确实没有pytz。而是这样写的
from vnpy.trader.utility import get_folder_path, ZoneInfo
……
CHINA_TZ = ZoneInfo("Asia/Shanghai")
按ctrl点进去看,以我现在的水平还看不懂,所以不太会用
单纯去掉时区,又会导致updata_tick报错
今天按照您的方法,数据就正常了
非常感谢
hxxjava wrote:

Teller wrote:

description
感谢大佬回复!
大佬,修改后我遇到这样的报错,是不是我少import了什么包?
我自己尝试import了pytz包,但是没成功。

答复:
ctp_gateway.py的引用部分有

import pytz

下面是常量定义:

CHINA_TZ = pytz.timezone("Asia/Shanghai")       # 中国时区

然后就可以使用了,目的是把CTP接口中的所有行情、委托单和成绩单这涉及到时间的字段都同一到中国时区。
难道你的ctp_gateway.py文件中没有这些内容?

© 2015-2022 上海韦纳软件科技有限公司
备案服务号:沪ICP备18006526号

沪公网安备 31011502017034号

【用户协议】
【隐私政策】
【免责条款】