一、问题是怎么发现的
本人在对郑商所行情数据中时间秒以下的部分进行补充的修改一文中,给出来对郑商所行情数据中时间秒以下的部分进行了补充,ctp_gateway也已经按照预想的那样工作了。之后启动DataRecorder对多个合约进行tick数据的录制。可是在价差tick数据的时候却发现对有夜盘的合约进行录制的时间,如果在23:00以后还有tick数据推送的话,会出现日期错误,具体的表现是:2022-10-19 23:00:00以后的tick数据中的datetime字段的日期部分居然是2022-10-20!也就是说我昨天晚上就录到了今天晚上23:00:00以后tick数据(当然是未来时间),这是错误的!
二、证据
打开mysql数据库(我用的是mysql数据库):
输入下面的SQL查询语句:
select symbol,datetime from dbtickdata where time(datetime) > "22:59:59.5";
得到的结果:
+--------+-------------------------+
| symbol | datetime |
+--------+-------------------------+
| i2301 | 2022-10-19 22:59:59.522 |
| i2301 | 2022-10-19 23:00:00.013 |
| i2301 | 2022-10-19 23:00:00.038 |
| i2301 | 2022-10-20 23:00:00.038 | ---- 日期错误,应该是2022-10-19
| p2301 | 2022-10-19 23:00:00.025 |
| p2301 | 2022-10-20 23:00:00.025 | ---- 日期错误,应该是2022-10-19
| rb2301 | 2022-10-19 22:59:59.500 |
| rb2301 | 2022-10-19 23:00:00.000 |
| TA301 | 2022-10-19 22:59:59.500 |
| TA301 | 2022-10-19 22:59:59.750 |
| TA301 | 2022-10-19 22:59:59.875 |
+--------+-------------------------+
注意:我发现此问题的时间是2022-10-20 16:00!
三、原因找到了!
经过仔细研读ctp_gateway.py的CtpMdApi的onRtnDepthMarketData(),具体的实现见下面的代码,发现对大商所合约tick的datetime错误导致的。
3.1 大商所合约的深度行情中没有日期字段
按照ctp接口规范文档知道,大商所合约的深度行情中没有日期字段,需要客户端收到后自行用本地日期去替代。
CtpMdApi中用定时器维护了一个self.current_date的本地日期成员,其主要作用就是做为了补齐大商所合约的深度行情日期和时间之用的。
3.2 何时会出现这个错误?
因为CtpMdApi每次重新连接之后,再次订阅一些合约的行情的时候,必然会推送一条该合约在交易所中最后更新的tick时间给订阅方。只要在每天8:59(不含)之前重新连接大商所的行情服务器,就一定会收到一个日期为当日,时间为该合约最后一次推送的tick,而其中的补足日期是不对的!
请看看上面的证据中,发生日期错误的全部是大商所的合约,分析正确 !
def onRtnDepthMarketData(self, data: dict) -> None:
"""行情数据推送"""
# 过滤没有时间戳的异常行情数据
if not data["UpdateTime"]:
return
# 过滤还没有收到合约数据前的行情推送
symbol: str = data["InstrumentID"]
contract: ContractData = symbol_contract_map.get(symbol, None)
if not contract:
return
# 对大商所的交易日字段取本地日期
if not data["ActionDay"] or contract.exchange == Exchange.DCE:
date_str: str = self.current_date
else:
date_str: str = data["ActionDay"]
# 这里错了,按照这样的字符串"%Y%m%d %H:%M:%S.%f",会错把tick.datetime的微秒当成0.1秒计数
# timestamp: str = f"{date_str} {data['UpdateTime']}.{int(data['UpdateMillisec']/100)}"
timestamp: str = f"{date_str} {data['UpdateTime']}." + str(data['UpdateMillisec']*1000).zfill(6) # hxxjava edit
dt: datetime = datetime.strptime(timestamp, "%Y%m%d %H:%M:%S.%f")
dt: datetime = CHINA_TZ.localize(dt)
# hxxjava add start
if contract.exchange == Exchange.CZCE:
# 对郑商所配置的秒以下的部分进行特别处理
key = f"{symbol}.{contract.exchange.value}"
# 读取上次的tick时间
last_time:datetime = self.czce_last_times.get(key,None)
if last_time:
# 取得上次时间的整秒时间
dt1 = last_time - timedelta(microseconds = last_time.microsecond)
if dt == dt1:
# 如果有收到一次秒内的tick,计算应该添加到秒以下的部分
next_second = dt1 + timedelta(seconds=1)
delta = next_second - last_time
microsecond = (delta.seconds*1000000 + delta.microseconds) / 2
# 时间调整为上次tick时间到下一秒到中间值
dt = last_time + timedelta(microseconds = microsecond)
# 更新最新tick时间字典
self.czce_last_times[key] = dt
# hxxjava add end
tick: TickData = TickData(
symbol=symbol,
exchange=contract.exchange,
datetime=dt,
name=contract.name,
volume=data["Volume"],
turnover=data["Turnover"],
open_interest=data["OpenInterest"],
last_price=adjust_price(data["LastPrice"]),
limit_up=data["UpperLimitPrice"],
limit_down=data["LowerLimitPrice"],
open_price=adjust_price(data["OpenPrice"]),
high_price=adjust_price(data["HighestPrice"]),
low_price=adjust_price(data["LowestPrice"]),
pre_close=adjust_price(data["PreClosePrice"]),
bid_price_1=adjust_price(data["BidPrice1"]),
ask_price_1=adjust_price(data["AskPrice1"]),
bid_volume_1=data["BidVolume1"],
ask_volume_1=data["AskVolume1"],
gateway_name=self.gateway_name
)
if data["BidVolume2"] or data["AskVolume2"]:
tick.bid_price_2 = adjust_price(data["BidPrice2"])
tick.bid_price_3 = adjust_price(data["BidPrice3"])
tick.bid_price_4 = adjust_price(data["BidPrice4"])
tick.bid_price_5 = adjust_price(data["BidPrice5"])
tick.ask_price_2 = adjust_price(data["AskPrice2"])
tick.ask_price_3 = adjust_price(data["AskPrice3"])
tick.ask_price_4 = adjust_price(data["AskPrice4"])
tick.ask_price_5 = adjust_price(data["AskPrice5"])
tick.bid_volume_2 = data["BidVolume2"]
tick.bid_volume_3 = data["BidVolume3"]
tick.bid_volume_4 = data["BidVolume4"]
tick.bid_volume_5 = data["BidVolume5"]
tick.ask_volume_2 = data["AskVolume2"]
tick.ask_volume_3 = data["AskVolume3"]
tick.ask_volume_4 = data["AskVolume4"]
tick.ask_volume_5 = data["AskVolume5"]
self.gateway.on_tick(tick)
3.3 怎么解决?
方法1. 问题已经找到,怎么保证这个日期一定是正确的,可不是一件简单的事情,因为它牵涉到交易时间段和节假日的计算问题!
方法2. 有一个简单的办法,既然正确日期不好计算,可以对大商所的tick与当前本地时间进行比较,如果tick的datetime是未来时间,直接该tick抛弃掉,这样也不会有太大影响。
3.4 采用方法2的实现方法
打开vnpy_ctpgateway.py文件,对class CtpMdApi进行如下修改,具体的修改请查找 '# hxxjava'就可以找到修改的语句了。
1、CtpMdApi中增加self.current_time,用来记录本地时间
class CtpMdApi(MdApi):
""""""
def __init__(self, gateway: CtpGateway) -> None:
"""构造函数"""
super().__init__()
self.gateway: CtpGateway = gateway
self.gateway_name: str = gateway.gateway_name
self.reqid: int = 0
self.connect_status: bool = False
self.login_status: bool = False
self.subscribed: Set = set()
self.userid: str = ""
self.password: str = ""
self.brokerid: str = ""
# self.current_date: str = datetime.now().strftime("%Y%m%d") # hxxjava comments
self.current_time: datetime = datetime.now() # hxxjava adds
self.current_date: str = self.current_time.strftime("%Y%m%d") # hxxjava adds
self.czce_last_times:Dict[str,datetime] = {} # hxxjava add
2、修改CtpMdApi的update_date()函数
def update_date(self) -> None:
"""更新当前日期"""
# self.current_date = datetime.now().strftime("%Y%m%d") # hxxjava comments
self.current_time = datetime.now() # hxxjava adds
self.current_date = self.current_time.strftime("%Y%m%d") # hxxjava adds
3、修改CtpMdApi的深度行情推送函数onRtnDepthMarketData()
def onRtnDepthMarketData(self, data: dict) -> None:
"""行情数据推送"""
# 过滤没有时间戳的异常行情数据
if not data["UpdateTime"]:
return
# 过滤还没有收到合约数据前的行情推送
symbol: str = data["InstrumentID"]
contract: ContractData = symbol_contract_map.get(symbol, None)
if not contract:
return
# 对大商所的交易日字段取本地日期
if not data["ActionDay"] or contract.exchange == Exchange.DCE:
date_str: str = self.current_date
else:
date_str: str = data["ActionDay"]
# 这里错了,按照这样的字符串"%Y%m%d %H:%M:%S.%f",会错把tick.datetime的微秒当成0.1秒计数
# timestamp: str = f"{date_str} {data['UpdateTime']}.{int(data['UpdateMillisec']/100)}"
timestamp: str = f"{date_str} {data['UpdateTime']}." + str(data['UpdateMillisec']*1000).zfill(6) # hxxjava edit
dt: datetime = datetime.strptime(timestamp, "%Y%m%d %H:%M:%S.%f")
dt: datetime = CHINA_TZ.localize(dt)
# hxxjava add start
if contract.exchange == Exchange.CZCE:
# 对郑商所配置的秒以下的部分进行特别处理
key = f"{symbol}.{contract.exchange.value}"
# 读取上次的tick时间
last_time:datetime = self.czce_last_times.get(key,None)
if last_time:
# 取得上次时间的整秒时间
dt1 = last_time - timedelta(microseconds = last_time.microsecond)
if dt == dt1:
# 如果有收到一次秒内的tick,计算应该添加到秒以下的部分
next_second = dt1 + timedelta(seconds=1)
delta = next_second - last_time
microsecond = (delta.seconds*1000000 + delta.microseconds) / 2
# 时间调整为上次tick时间到下一秒到中间值
dt = last_time + timedelta(microseconds = microsecond)
# 更新最新tick时间字典
self.czce_last_times[key] = dt
elif contract.exchange == Exchange.DCE:
# 对郑商所配置的秒以下的部分进行特别处理
if dt > CHINA_TZ.localize(self.current_time) + timedelta(seconds = 10):
# 如果大商所的补足本地日期的dt超前每2秒更新一次的本地时间2秒,判断为无数的行情数据,做丢弃处理。
# 但是为了降低对本地时间与交易所时间同步要求,放宽为10秒或者更多都可以
return
# hxxjava add end
tick: TickData = TickData(
symbol=symbol,
exchange=contract.exchange,
datetime=dt,
name=contract.name,
volume=data["Volume"],
turnover=data["Turnover"],
open_interest=data["OpenInterest"],
last_price=adjust_price(data["LastPrice"]),
limit_up=data["UpperLimitPrice"],
limit_down=data["LowerLimitPrice"],
open_price=adjust_price(data["OpenPrice"]),
high_price=adjust_price(data["HighestPrice"]),
low_price=adjust_price(data["LowestPrice"]),
pre_close=adjust_price(data["PreClosePrice"]),
bid_price_1=adjust_price(data["BidPrice1"]),
ask_price_1=adjust_price(data["AskPrice1"]),
bid_volume_1=data["BidVolume1"],
ask_volume_1=data["AskVolume1"],
gateway_name=self.gateway_name
)
if data["BidVolume2"] or data["AskVolume2"]:
tick.bid_price_2 = adjust_price(data["BidPrice2"])
tick.bid_price_3 = adjust_price(data["BidPrice3"])
tick.bid_price_4 = adjust_price(data["BidPrice4"])
tick.bid_price_5 = adjust_price(data["BidPrice5"])
tick.ask_price_2 = adjust_price(data["AskPrice2"])
tick.ask_price_3 = adjust_price(data["AskPrice3"])
tick.ask_price_4 = adjust_price(data["AskPrice4"])
tick.ask_price_5 = adjust_price(data["AskPrice5"])
tick.bid_volume_2 = data["BidVolume2"]
tick.bid_volume_3 = data["BidVolume3"]
tick.bid_volume_4 = data["BidVolume4"]
tick.bid_volume_5 = data["BidVolume5"]
tick.ask_volume_2 = data["AskVolume2"]
tick.ask_volume_3 = data["AskVolume3"]
tick.ask_volume_4 = data["AskVolume4"]
tick.ask_volume_5 = data["AskVolume5"]
self.gateway.on_tick(tick)