VeighNa量化社区
你的开源社区量化交易平台
Member
avatar
加入于:
帖子: 419
声望: 170

如何更有效地利用合约交易状态信息——拒绝CTP接口中的脏数据。

1. 任何接口中都会有脏数据,CTP接口也不例外

不知道您是否发现这些情况:

  1. 明明已经休市或者收盘了,可是行情列表中的合约数据还在疯狂地涌来,如果此时你的策略还在运行状态,那会发生什么?
  2. 市场中许多人使用数据记录器(DataRecorder)进行合约数据的录制,经常发现录制到的结果里有许多莫名其妙的数据!
  3. 于是我们会说,尽量要靠近开盘再启动程序或者策略,一旦收盘就关闭vnpy系统,这样就可以避免被接口中的脏数据干扰和影响。

这种情况不只是使用CTP接口的交易者会遇到,其他接口也是一样。

2. CTP接口本身已经有完善的机制

为了防止客户端接收到脏数据,CTP接口会在某个交易时间端的开始结束时,在公共流中播发所有品种的合约品种的交易状态通知,只是vnpy系统没有使用。
交易状态通知的格式见帖子 如何更有效地利用合约交易状态信息——交易状态信息管理器

3. 拒绝CTP接口中的脏数据的方法

系统连接接口后,订阅的行情数据就会从CTP网关的MdApi接口中推送给客户端。目前vnpy在收到tick数据时没有任何有效性判断,就直接推送给了各种订阅者了,而这正是脏数据的来由!
正确的做法是:客户端就在收到tick数据的时候,根据合约品种的交易状态判断该数据是是否为有效数据,如果合约的状态是开盘前、非交易或者是收盘状态,则将该tick丢弃。
这样就可以杜绝脏数据对交易者、各种应用策略或数据记录器的影响。

4. 代码实现

4.1 实现交易状态管理器

交易状态管理器的功能及实现代码详见 如何更有效地利用合约交易状态信息——交易状态信息管理器 ,这里就不再提供了。

4.2 添加EVENT_ORIGIN_TICK消息类型

在vnpy\trader\event.py文件中增加下面的消息定义:

EVENT_ORIGIN_TICK = "eOriginTick."              # hxxjava debug

4.3 修改网关Gateway的on_tick()函数

在vnpy\trader\gateway.py文件中,将所有网关的父类Gateway的on_tick()函数做如下修改:

    def on_tick(self, tick: TickData) -> None:
        """
        Tick event push.
        Tick event of a specific vt_symbol is also pushed.
        """
        # self.on_event(EVENT_TICK, tick)
        # self.on_event(EVENT_TICK + tick.vt_symbol, tick)
        self.on_event(EVENT_ORIGIN_TICK, tick)

4.4 修改OmsManager

class OmsEngine(BaseEngine):
    """
    Provides order management system function for VN Trader.
    """

    def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
        """"""
        super(OmsEngine, self).__init__(main_engine, event_engine, "oms")

        # self.trade_status = TradeStatus(event_engine,6)   # hxxjava add

        self.ticks: Dict[str, TickData] = {}
        self.orders: Dict[str, OrderData] = {}
        self.trades: Dict[str, TradeData] = {}
        self.positions: Dict[str, PositionData] = {}
        self.accounts: Dict[str, AccountData] = {}
        self.contracts: Dict[str, ContractData] = {}
        self.active_orders: Dict[str, OrderData] = {}
        self.trade_status_manager = TradeStatusManager(event_engine,30)       # 创建交易状态管理器

        self.add_function()
        self.register_event()

    def add_function(self) -> None:
        """Add query function to main engine."""
        self.main_engine.get_tick = self.get_tick
        self.main_engine.get_order = self.get_order
        self.main_engine.get_trade = self.get_trade
        self.main_engine.get_position = self.get_position
        self.main_engine.get_account = self.get_account
        self.main_engine.get_contract = self.get_contract
        self.main_engine.get_all_ticks = self.get_all_ticks
        self.main_engine.get_all_orders = self.get_all_orders
        self.main_engine.get_all_trades = self.get_all_trades
        self.main_engine.get_all_positions = self.get_all_positions
        self.main_engine.get_all_accounts = self.get_all_accounts
        self.main_engine.get_all_contracts = self.get_all_contracts
        self.main_engine.get_all_active_orders = self.get_all_active_orders
        self.main_engine.get_status = self.get_status                   # hxxjava debug

    def register_event(self) -> None:
        """"""
        self.event_engine.register(EVENT_TICK, self.process_tick_event)
        self.event_engine.register(EVENT_ORDER, self.process_order_event)
        self.event_engine.register(EVENT_TRADE, self.process_trade_event)
        self.event_engine.register(EVENT_POSITION, self.process_position_event)
        self.event_engine.register(EVENT_ACCOUNT, self.process_account_event)
        self.event_engine.register(EVENT_CONTRACT, self.process_contract_event)  
        self.event_engine.register(EVENT_STATUS, self.process_status_event)  # 订阅合约交易状态数据
        self.event_engine.register(EVENT_ORIGIN_TICK, self.process_origin_tick_event)   # 订阅原始行情数据

    def process_origin_tick_event(self,event: Event):#-> None:  # 处理原始行情数据     
        """ 对原始tick数据进行有效性判断和处理 """
        tick = event.data
        status:StatusData = self.trade_status_manager.get_tick_status(tick)

        if not status:
            print(f"{datetime.now()} {tick.vt_symbol} 还没有收到交易状态")
            return

        # 有效交易状态
        valid_statuses = [
            InstrumentStatus.CONTINOUS,
            InstrumentStatus.AUCTION_ORDERING,
            InstrumentStatus.AUCTION_BALANCE,
            InstrumentStatus.AUCTION_MATCH
        ]
        if status.instrument_status in valid_statuses:
            # 这里是所有有效数据的发源地
            self.event_engine.put(Event(EVENT_TICK, tick))
            self.event_engine.put(Event(EVENT_TICK + tick.vt_symbol, tick))
        else:
            print(f"{datetime.now()} 特别交易状态={status} {tick}")


    def process_tick_event(self, event: Event) -> None:
        """"""
        tick = event.data
        self.ticks[tick.vt_symbol] = tick

    def process_order_event(self, event: Event) -> None:
        """"""
        order = event.data
        self.orders[order.vt_orderid] = order

        # If order is active, then update data in dict.
        if order.is_active():
            self.active_orders[order.vt_orderid] = order
        # Otherwise, pop inactive order from in dict
        elif order.vt_orderid in self.active_orders:
            self.active_orders.pop(order.vt_orderid)

    def process_trade_event(self, event: Event) -> None:
        """"""
        trade = event.data
        self.trades[trade.vt_tradeid] = trade

    def process_position_event(self, event: Event) -> None:
        """"""
        position = event.data
        self.positions[position.vt_positionid] = position

    def process_account_event(self, event: Event) -> None:
        """"""
        account = event.data
        self.accounts[account.vt_accountid] = account

    def process_contract_event(self, event: Event) -> None:
        """"""
        contract = event.data
        self.contracts[contract.vt_symbol] = contract

    def process_status_event(self, event: Event) -> None:   # 处理交易状态信息
        """"""
        status = event.data
        # print(f"【{datetime.now()} {status}】")
        self.trade_status_manager.save_status(status)

    def get_tick(self, vt_symbol: str) -> Optional[TickData]:
        """
        Get latest market tick data by vt_symbol.
        """
        return self.ticks.get(vt_symbol, None)

    def get_order(self, vt_orderid: str) -> Optional[OrderData]:
        """
        Get latest order data by vt_orderid.
        """
        return self.orders.get(vt_orderid, None)

    def get_trade(self, vt_tradeid: str) -> Optional[TradeData]:
        """
        Get trade data by vt_tradeid.
        """
        return self.trades.get(vt_tradeid, None)

    def get_position(self, vt_positionid: str) -> Optional[PositionData]:
        """
        Get latest position data by vt_positionid.
        """
        return self.positions.get(vt_positionid, None)

    def get_account(self, vt_accountid: str) -> Optional[AccountData]:
        """
        Get latest account data by vt_accountid.
        """
        return self.accounts.get(vt_accountid, None)

    def get_contract(self, vt_symbol: str) -> Optional[ContractData]:
        """
        Get contract data by vt_symbol.
        """
        return self.contracts.get(vt_symbol, None)

    def get_all_ticks(self) -> List[TickData]:
        """
        Get all tick data.
        """
        return list(self.ticks.values())

    def get_all_orders(self) -> List[OrderData]:
        """
        Get all order data.
        """
        return list(self.orders.values())

    def get_all_trades(self) -> List[TradeData]:
        """
        Get all trade data.
        """
        return list(self.trades.values())

    def get_all_positions(self) -> List[PositionData]:
        """
        Get all position data.
        """
        return list(self.positions.values())

    def get_all_accounts(self) -> List[AccountData]:
        """
        Get all account data.
        """
        return list(self.accounts.values())

    def get_all_contracts(self) -> List[ContractData]:
        """
        Get all contract data.
        """
        return list(self.contracts.values())

    def get_status(self,vt_symbol:str) -> List[StatusData]:     # hxxjava debug
        """
        Get the vt_symbol's status data.
        """
        return self.trade_status_manager.get_status(vt_symbol)

    def get_all_active_orders(self, vt_symbol: str = "") -> List[OrderData]:
        """
        Get all active orders by vt_symbol.

        If vt_symbol is empty, return all active orders.
        """
        if not vt_symbol:
            return list(self.active_orders.values())
        else:
            active_orders = [
                order
                for order in self.active_orders.values()
                if order.vt_symbol == vt_symbol
            ]
            return active_orders

5. 采用这样过滤脏数据的方法的好处

采用这样过滤脏数据的方法,可以从源头一次性地过滤掉脏数据,避免后面的各种应用中分别地对推送的行情数据进行过滤,应用策略无需再考虑脏数据的影响,逻辑简单,效率高。现在再回过头来看帖子 如何更有效地利用合约交易状态信息——交易状态信息管理器 里的方法,就有点效率低下了!

6. 其他接口怎么办?

本文只提供了CTP接口修改方法,没有涉及到其他类型的接口。可是作为一个完善的交易接口,通常都应该提供类似CTP接口中的类似合约交易状态通知的信息。采用的类似的方法也是可以办到的。

Administrator
avatar
加入于:
帖子: 4500
声望: 320

感谢分享!精华送上

Member
avatar
加入于:
帖子: 9
声望: 0

牛批啊兄dei,我一直治标不治本地定时运行一条SQL语句直接按交易时间删除的

Member
avatar
加入于:
帖子: 12
声望: 0

按照如上方式更替后,发现VN station直接启动不了,请问下什么原因?就是连用户名和密码界面都启动不了。
正在重装中……

Member
avatar
加入于:
帖子: 26
声望: 0

洋生 wrote:

按照如上方式更替后,发现VN station直接启动不了,请问下什么原因?就是连用户名和密码界面都启动不了。
正在重装中…

应该是替换的时候有语法错误 在cmd中用 python -m vnstation 启动可以看到报错的位置

Member
avatar
加入于:
帖子: 46
声望: 1

大佬求教,行情数据应该是在vnpy_ctp.gateway.ctp_gateway.CtpMdApi.onRtnDepthMarketData进来的,为什么不在ctp_gateway这里处理呢?

Member
avatar
加入于:
帖子: 20
声望: 0

报错了,这个left_alphas在哪里定义的?
description

description

Member
avatar
加入于:
帖子: 419
声望: 170
def left_alphas(instr:str):
    """ get lefe alphas of a string """
    ret_str = ''
    for s in instr:
        if s.isalpha():
            ret_str += s
        else:
            break
    return ret_str
Member
avatar
加入于:
帖子: 20
声望: 0

大神,增加函数定义后,继续报错
description
然后我直接用instrument = tick.symbol,跑通了,所以搞不懂left_alphas(tick.symbol)这个语句的作用是什么?求解惑

Member
avatar
加入于:
帖子: 20
声望: 0

还有一个问题,symbol,exchange = extract_vt_symbol(vt_symbol),这个 extract_vt_symbol函数在哪里定义?

Member
avatar
加入于:
帖子: 419
声望: 170

duke wrote:

还有一个问题,symbol,exchange = extract_vt_symbol(vt_symbol),这个 extract_vt_symbol函数在哪里定义?

这个在vnpy\trader\utility.py中,
引用方法:from vnpy.trader.utility import extract_vt_symbol

另外:遇到这种东西,应该是先搜索下吧,这个问题太基础了。

Member
avatar
加入于:
帖子: 20
声望: 0

调通了,但是在开盘时间段合约收不到订阅信息了,再次入坑,求大神帮忙。

description

Member
avatar
加入于:
帖子: 46
声望: 1

状态信息里面的instrument是品种的代码,就是没有期数的,例如“sp”。tick里面的是有期数的,例如“sp2205”。之前的代码里面相当于都统一成了“sp.SHFE”的形式。

duke wrote:

大神,增加函数定义后,继续报错
description
然后我直接用instrument = tick.symbol,跑通了,所以搞不懂left_alphas(tick.symbol)这个语句的作用是什么?求解惑

Member
avatar
加入于:
帖子: 46
声望: 1

left_alphas也可以自己写个正则

instrument = re.match(r"^[a-zA-Z]{1,3}", symbol).group()

warpgate wrote:

状态信息里面的instrument是品种的代码,就是没有期数的,例如“sp”。tick里面的是有期数的,例如“sp2205”。之前的代码里面相当于都统一成了“sp.SHFE”的形式。

duke wrote:

大神,增加函数定义后,继续报错
description
然后我直接用instrument = tick.symbol,跑通了,所以搞不懂left_alphas(tick.symbol)这个语句的作用是什么?求解惑

Member
avatar
加入于:
帖子: 20
声望: 0

warpgate wrote:

left_alphas也可以自己写个正则

instrument = re.match(r"^[a-zA-Z]{1,3}", symbol).group()

warpgate wrote:

状态信息里面的instrument是品种的代码,就是没有期数的,例如“sp”。tick里面的是有期数的,例如“sp2205”。之前的代码里面相当于都统一成了“sp.SHFE”的形式。

duke wrote:

大神,增加函数定义后,继续报错
description
然后我直接用instrument = tick.symbol,跑通了,所以搞不懂left_alphas(tick.symbol)这个语句的作用是什么?求解惑
明白了,谢谢

Member
avatar
加入于:
帖子: 20
声望: 0

duke wrote:

调通了,但是在开盘时间段合约收不到订阅信息了,再次入坑,求大神帮忙。

description

顶上去,求大神帮忙

Member
avatar
加入于:
帖子: 46
声望: 1

你把上面instrument的问题解决了就好了

Member
avatar
加入于:
帖子: 419
声望: 170

duke wrote:

大神,增加函数定义后,继续报错
description
然后我直接用instrument = tick.symbol,跑通了,所以搞不懂left_alphas(tick.symbol)这个语句的作用是什么?求解惑

left_alphas()是把提取合约的品种名称,如rb2201的品种是rb,TA201的品种是TA。
因为交易状态是按照品种发布的,而你订阅的tick中包含的是合约名称,所以要用
left_alphas()提取出品种名称,方便查找其当前的交易状态。

Member
avatar
加入于:
帖子: 20
声望: 0

在大神的亲自指导下,终于跑通了,离实用化更近一步,再次感谢!

Member
avatar
加入于:
帖子: 20
声望: 0

出现了新问题,跟踪行情,发现0点以后收不到交易状态,0点以前是正常的,al2202是交易到1点的,这是什么原因?

description

© 2015-2022 上海韦纳软件科技有限公司
备案服务号:沪ICP备18006526号

沪公网安备 31011502017034号

【用户协议】
【隐私政策】
【免责条款】