不知道会不会有小伙伴跟我一样,会需要多进程获取或录制行情(我的情况是要按不同交易所去分进程),为了避免交易接口过多且无意义的重复登录,想只在登录行情接口的前提下记录行情。
这时陆续会发现几个问题:
1是CTPgateway里面是同时登录MD(行情)和TD(交易)这两个接口的
2是在只登录MDapi的情况下,登录成功,订阅成功,但是收不到行情推送
昨天晚上研究了一下,在No UI和修改RecorderEngine的前提下,说一说可行的改造方式以及部分代码:
对于第一个问题,其实很好解决,新写一个子类,把init和connect中,涉及td的部分删掉。然后调用新写的子类就好。
class CtpGatewayMD(CtpGateway):
def __init__(self, event_engine: EventEngine, gateway_name: str) -> None:
super().__init__(event_engine, gateway_name)
self.md_api: "CtpMdApi" = CtpMdApiMD(self) #这里跟第二个问题相关
def connect(self, setting: dict) -> None:
"""连接交易接口"""
userid: str = setting["用户名"]
password: str = setting["密码"]
brokerid: str = setting["经纪商代码"]
md_address: str = setting["行情服务器"]
if (
(not md_address.startswith("tcp://"))
and (not md_address.startswith("ssl://"))
and (not md_address.startswith("socks"))
):
md_address = "tcp://" + md_address
self.md_api.connect(md_address, userid, password, brokerid)
第二个问题也不难,但是在追踪问题的时候花费了不少时间,中间除了定位ctpgateway的问题,还有遇到了诸如process_contract_event以及subscribe时的一些问题。
这些问题本质上都来源于vnpy由于很多模块需要对全市场合约的获取,从而绑定了TDapi的登录所导致的。这些对于vnpy整个项目而言是可以理解的,因为只登录MDapi是个较小众的需求。
对于第二个问题,首先要说合约信息,这儿涉及到ContractData和各种涉及contract的方法,理论上获取Contract信息、生成ContractData只有gateway内登录tdapi之后才能获取。
对于process_contract_event,这个方法的背后逻辑其实是要取全市场合约的,这里可以不注册这个事件。
行情记录模块里面的subscribe在订阅时,要用到ContractData,可以通过修改函数解决。
def subscribe(self, symbol: str,exchange:Exchange,gateway_name:str) -> None:
""""""
req: SubscribeRequest = SubscribeRequest(
symbol=symbol,
exchange=exchange
)
self.main_engine.subscribe(req, gateway_name)
上述这些都完成之后,会发现一切正常就是收不到行情。从CtpGatewayMD切回到原始的CtpGateway就一切正常。
说明问题还是在CtpGateway内部。然后定位到了CtpGateway里onRtnDepthMarketData内部看到symbol_contract_map这么个坑爹的东西。
# 合约数据全局缓存字典
symbol_contract_map: Dict[str, ContractData] = {}
symbol_contract_map是CtpGateway内的一个全局字典,每当登录td并由查询完全市场合约(这个应该vnpy自动进行的)后,会存把合约信息存到symbol_contract_map这里一份。
干嘛用呢,就是给下单和收到回报进行校验。坑在哪了呢,行情推送时,也会校验是否存在,查不到合约则直接跳过返回。
def onRtnDepthMarketData(self, data: dict) -> None:
"""行情数据推送"""
# 过滤没有时间戳的异常行情数据
if not data["UpdateTime"]:
return
# 过滤还没有收到合约数据前的行情推送
symbol: str = data["InstrumentID"]
contract: ContractData = symbol_contract_map.get(symbol, None)
if not contract:
return
# 对大商所的交易日字段取本地日期
if not data["ActionDay"] or contract.exchange == Exchange.DCE:
date_str: str = self.current_date
else:
date_str: str = data["ActionDay"]
定位到这里其实就完事了,不过symbol_contract_map这种全局变量修改起来有一定风险,我用了另一种笨办法去做的:再写一个MDapi子类,供新写的CtpGatewayMD调用,同时内部增加一个symbol_contract_map属性,然后再单独给一个的赋值方法。
class CtpMdApiMD(CtpMdApi):
def __init__(self, gateway: CtpGateway) -> None:
super().__init__(gateway)
self.symbol_contract_map: Dict[str, ContractData] = {}
def onRtnDepthMarketData(self, data: dict) -> None:
"""行情数据推送"""
# 过滤没有时间戳的异常行情数据
if not data["UpdateTime"]:
return
# 过滤还没有收到合约数据前的行情推送
symbol: str = data["InstrumentID"]
contract: ContractData = self.symbol_contract_map.get(symbol, None)
if not contract:
return
#这个方法底下的代码都一样,我不放进来了
def make_fake_symbol_contract_map(self,tmp):
self.symbol_contract_map=copy(tmp)
这样只要在订阅行情之前,通过make_fake_symbol_contract_map做一个包含要订阅的合约的字典,传给self.symbol_contract_map就可以了。
后续再运行就没问题了。
最后要说一下,这样做的前提
- 要在vnpy的外部有准确的合约获取方式
- 只获取或记录行情
最后,修改后如果脚本内还有其他涉及到获取合约的方法、属性,都要注意风险
谢谢
哦对了,我早上想到还有一个方法,更简单,就是跟着gateway先登录tdapi,获取合约信息之后再登出,理论上应该也没问题。