VeighNa量化社区
你的开源社区量化交易平台
Member
avatar
加入于:
帖子: 1
声望: 0

不知道会不会有小伙伴跟我一样,会需要多进程获取或录制行情(我的情况是要按不同交易所去分进程),为了避免交易接口过多且无意义的重复登录,想只在登录行情接口的前提下记录行情。
这时陆续会发现几个问题:
1是CTPgateway里面是同时登录MD(行情)和TD(交易)这两个接口的
2是在只登录MDapi的情况下,登录成功,订阅成功,但是收不到行情推送

昨天晚上研究了一下,在No UI和修改RecorderEngine的前提下,说一说可行的改造方式以及部分代码:
对于第一个问题,其实很好解决,新写一个子类,把init和connect中,涉及td的部分删掉。然后调用新写的子类就好。

class CtpGatewayMD(CtpGateway):

    def __init__(self, event_engine: EventEngine, gateway_name: str) -> None:
        super().__init__(event_engine, gateway_name)
        self.md_api: "CtpMdApi" = CtpMdApiMD(self) #这里跟第二个问题相关

    def connect(self, setting: dict) -> None:
        """连接交易接口"""
        userid: str = setting["用户名"]
        password: str = setting["密码"]
        brokerid: str = setting["经纪商代码"]
        md_address: str = setting["行情服务器"]


        if (
            (not md_address.startswith("tcp://"))
            and (not md_address.startswith("ssl://"))
            and (not md_address.startswith("socks"))
        ):
            md_address = "tcp://" + md_address

        self.md_api.connect(md_address, userid, password, brokerid)

第二个问题也不难,但是在追踪问题的时候花费了不少时间,中间除了定位ctpgateway的问题,还有遇到了诸如process_contract_event以及subscribe时的一些问题。
这些问题本质上都来源于vnpy由于很多模块需要对全市场合约的获取,从而绑定了TDapi的登录所导致的。这些对于vnpy整个项目而言是可以理解的,因为只登录MDapi是个较小众的需求。

对于第二个问题,首先要说合约信息,这儿涉及到ContractData和各种涉及contract的方法,理论上获取Contract信息、生成ContractData只有gateway内登录tdapi之后才能获取。
对于process_contract_event,这个方法的背后逻辑其实是要取全市场合约的,这里可以不注册这个事件。
行情记录模块里面的subscribe在订阅时,要用到ContractData,可以通过修改函数解决。

    def subscribe(self, symbol: str,exchange:Exchange,gateway_name:str) -> None:
        """"""
        req: SubscribeRequest = SubscribeRequest(
            symbol=symbol,
            exchange=exchange
        )
        self.main_engine.subscribe(req, gateway_name)

上述这些都完成之后,会发现一切正常就是收不到行情。从CtpGatewayMD切回到原始的CtpGateway就一切正常。
说明问题还是在CtpGateway内部。然后定位到了CtpGateway里onRtnDepthMarketData内部看到symbol_contract_map这么个坑爹的东西。

# 合约数据全局缓存字典
symbol_contract_map: Dict[str, ContractData] = {}

symbol_contract_map是CtpGateway内的一个全局字典,每当登录td并由查询完全市场合约(这个应该vnpy自动进行的)后,会存把合约信息存到symbol_contract_map这里一份。
干嘛用呢,就是给下单和收到回报进行校验。坑在哪了呢,行情推送时,也会校验是否存在,查不到合约则直接跳过返回。

    def onRtnDepthMarketData(self, data: dict) -> None:
        """行情数据推送"""
        # 过滤没有时间戳的异常行情数据
        if not data["UpdateTime"]:
            return

        # 过滤还没有收到合约数据前的行情推送
        symbol: str = data["InstrumentID"]
        contract: ContractData = symbol_contract_map.get(symbol, None)
        if not contract:
            return

        # 对大商所的交易日字段取本地日期
        if not data["ActionDay"] or contract.exchange == Exchange.DCE:
            date_str: str = self.current_date
        else:
            date_str: str = data["ActionDay"]

定位到这里其实就完事了,不过symbol_contract_map这种全局变量修改起来有一定风险,我用了另一种笨办法去做的:再写一个MDapi子类,供新写的CtpGatewayMD调用,同时内部增加一个symbol_contract_map属性,然后再单独给一个的赋值方法。

class CtpMdApiMD(CtpMdApi):

    def __init__(self, gateway: CtpGateway) -> None:
        super().__init__(gateway)
        self.symbol_contract_map: Dict[str, ContractData] = {}

    def onRtnDepthMarketData(self, data: dict) -> None:
        """行情数据推送"""
        # 过滤没有时间戳的异常行情数据
        if not data["UpdateTime"]:
            return

        # 过滤还没有收到合约数据前的行情推送
        symbol: str = data["InstrumentID"]

        contract: ContractData = self.symbol_contract_map.get(symbol, None)
        if not contract:
            return
#这个方法底下的代码都一样,我不放进来了

    def make_fake_symbol_contract_map(self,tmp):
        self.symbol_contract_map=copy(tmp)

这样只要在订阅行情之前,通过make_fake_symbol_contract_map做一个包含要订阅的合约的字典,传给self.symbol_contract_map就可以了。
后续再运行就没问题了。

最后要说一下,这样做的前提

  1. 要在vnpy的外部有准确的合约获取方式
  2. 只获取或记录行情

最后,修改后如果脚本内还有其他涉及到获取合约的方法、属性,都要注意风险
谢谢


哦对了,我早上想到还有一个方法,更简单,就是跟着gateway先登录tdapi,获取合约信息之后再登出,理论上应该也没问题。

Member
avatar
加入于:
帖子: 58
声望: 0

其实可以开多一个期货账号,100块就可以有一个稳定的数据源了,录制和交易分开账号

Member
avatar
加入于:
帖子: 1615
声望: 115

感谢分享!

© 2015-2022 上海韦纳软件科技有限公司
备案服务号:沪ICP备18006526号

沪公网安备 31011502017034号

【用户协议】
【隐私政策】
【免责条款】