如何更有效地利用合约交易状态信息——拒绝CTP接口中的脏数据。
1. 任何接口中都会有脏数据,CTP接口也不例外
不知道您是否发现这些情况:
- 明明已经休市或者收盘了,可是行情列表中的合约数据还在疯狂地涌来,如果此时你的策略还在运行状态,那会发生什么?
- 市场中许多人使用数据记录器(DataRecorder)进行合约数据的录制,经常发现录制到的结果里有许多莫名其妙的数据!
- 于是我们会说,尽量要靠近开盘再启动程序或者策略,一旦收盘就关闭vnpy系统,这样就可以避免被接口中的脏数据干扰和影响。
这种情况不只是使用CTP接口的交易者会遇到,其他接口也是一样。
2. CTP接口本身已经有完善的机制
为了防止客户端接收到脏数据,CTP接口会在某个交易时间端的开始结束时,在公共流中播发所有品种的合约品种的交易状态通知,只是vnpy系统没有使用。
交易状态通知的格式见帖子 如何更有效地利用合约交易状态信息——交易状态信息管理器。
3. 拒绝CTP接口中的脏数据的方法
系统连接接口后,订阅的行情数据就会从CTP网关的MdApi接口中推送给客户端。目前vnpy在收到tick数据时没有任何有效性判断,就直接推送给了各种订阅者了,而这正是脏数据的来由!
正确的做法是:客户端就在收到tick数据的时候,根据合约品种的交易状态判断该数据是是否为有效数据,如果合约的状态是开盘前、非交易或者是收盘状态,则将该tick丢弃。
这样就可以杜绝脏数据对交易者、各种应用策略或数据记录器的影响。
4. 代码实现
4.1 实现交易状态管理器
交易状态管理器的功能及实现代码详见 如何更有效地利用合约交易状态信息——交易状态信息管理器 ,这里就不再提供了。
4.2 添加EVENT_ORIGIN_TICK消息类型
在vnpy\trader\event.py文件中增加下面的消息定义:
EVENT_ORIGIN_TICK = "eOriginTick." # hxxjava debug
4.3 修改网关Gateway的on_tick()函数
在vnpy\trader\gateway.py文件中,将所有网关的父类Gateway的on_tick()函数做如下修改:
def on_tick(self, tick: TickData) -> None:
"""
Tick event push.
Tick event of a specific vt_symbol is also pushed.
"""
# self.on_event(EVENT_TICK, tick)
# self.on_event(EVENT_TICK + tick.vt_symbol, tick)
self.on_event(EVENT_ORIGIN_TICK, tick)
4.4 修改OmsManager
class OmsEngine(BaseEngine):
"""
Provides order management system function for VN Trader.
"""
def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
""""""
super(OmsEngine, self).__init__(main_engine, event_engine, "oms")
# self.trade_status = TradeStatus(event_engine,6) # hxxjava add
self.ticks: Dict[str, TickData] = {}
self.orders: Dict[str, OrderData] = {}
self.trades: Dict[str, TradeData] = {}
self.positions: Dict[str, PositionData] = {}
self.accounts: Dict[str, AccountData] = {}
self.contracts: Dict[str, ContractData] = {}
self.active_orders: Dict[str, OrderData] = {}
self.trade_status_manager = TradeStatusManager(event_engine,30) # 创建交易状态管理器
self.add_function()
self.register_event()
def add_function(self) -> None:
"""Add query function to main engine."""
self.main_engine.get_tick = self.get_tick
self.main_engine.get_order = self.get_order
self.main_engine.get_trade = self.get_trade
self.main_engine.get_position = self.get_position
self.main_engine.get_account = self.get_account
self.main_engine.get_contract = self.get_contract
self.main_engine.get_all_ticks = self.get_all_ticks
self.main_engine.get_all_orders = self.get_all_orders
self.main_engine.get_all_trades = self.get_all_trades
self.main_engine.get_all_positions = self.get_all_positions
self.main_engine.get_all_accounts = self.get_all_accounts
self.main_engine.get_all_contracts = self.get_all_contracts
self.main_engine.get_all_active_orders = self.get_all_active_orders
self.main_engine.get_status = self.get_status # hxxjava debug
def register_event(self) -> None:
""""""
self.event_engine.register(EVENT_TICK, self.process_tick_event)
self.event_engine.register(EVENT_ORDER, self.process_order_event)
self.event_engine.register(EVENT_TRADE, self.process_trade_event)
self.event_engine.register(EVENT_POSITION, self.process_position_event)
self.event_engine.register(EVENT_ACCOUNT, self.process_account_event)
self.event_engine.register(EVENT_CONTRACT, self.process_contract_event)
self.event_engine.register(EVENT_STATUS, self.process_status_event) # 订阅合约交易状态数据
self.event_engine.register(EVENT_ORIGIN_TICK, self.process_origin_tick_event) # 订阅原始行情数据
def process_origin_tick_event(self,event: Event):#-> None: # 处理原始行情数据
""" 对原始tick数据进行有效性判断和处理 """
tick = event.data
status:StatusData = self.trade_status_manager.get_tick_status(tick)
if not status:
print(f"{datetime.now()} {tick.vt_symbol} 还没有收到交易状态")
return
# 有效交易状态
valid_statuses = [
InstrumentStatus.CONTINOUS,
InstrumentStatus.AUCTION_ORDERING,
InstrumentStatus.AUCTION_BALANCE,
InstrumentStatus.AUCTION_MATCH
]
if status.instrument_status in valid_statuses:
# 这里是所有有效数据的发源地
self.event_engine.put(Event(EVENT_TICK, tick))
self.event_engine.put(Event(EVENT_TICK + tick.vt_symbol, tick))
else:
print(f"{datetime.now()} 特别交易状态={status} {tick}")
def process_tick_event(self, event: Event) -> None:
""""""
tick = event.data
self.ticks[tick.vt_symbol] = tick
def process_order_event(self, event: Event) -> None:
""""""
order = event.data
self.orders[order.vt_orderid] = order
# If order is active, then update data in dict.
if order.is_active():
self.active_orders[order.vt_orderid] = order
# Otherwise, pop inactive order from in dict
elif order.vt_orderid in self.active_orders:
self.active_orders.pop(order.vt_orderid)
def process_trade_event(self, event: Event) -> None:
""""""
trade = event.data
self.trades[trade.vt_tradeid] = trade
def process_position_event(self, event: Event) -> None:
""""""
position = event.data
self.positions[position.vt_positionid] = position
def process_account_event(self, event: Event) -> None:
""""""
account = event.data
self.accounts[account.vt_accountid] = account
def process_contract_event(self, event: Event) -> None:
""""""
contract = event.data
self.contracts[contract.vt_symbol] = contract
def process_status_event(self, event: Event) -> None: # 处理交易状态信息
""""""
status = event.data
# print(f"【{datetime.now()} {status}】")
self.trade_status_manager.save_status(status)
def get_tick(self, vt_symbol: str) -> Optional[TickData]:
"""
Get latest market tick data by vt_symbol.
"""
return self.ticks.get(vt_symbol, None)
def get_order(self, vt_orderid: str) -> Optional[OrderData]:
"""
Get latest order data by vt_orderid.
"""
return self.orders.get(vt_orderid, None)
def get_trade(self, vt_tradeid: str) -> Optional[TradeData]:
"""
Get trade data by vt_tradeid.
"""
return self.trades.get(vt_tradeid, None)
def get_position(self, vt_positionid: str) -> Optional[PositionData]:
"""
Get latest position data by vt_positionid.
"""
return self.positions.get(vt_positionid, None)
def get_account(self, vt_accountid: str) -> Optional[AccountData]:
"""
Get latest account data by vt_accountid.
"""
return self.accounts.get(vt_accountid, None)
def get_contract(self, vt_symbol: str) -> Optional[ContractData]:
"""
Get contract data by vt_symbol.
"""
return self.contracts.get(vt_symbol, None)
def get_all_ticks(self) -> List[TickData]:
"""
Get all tick data.
"""
return list(self.ticks.values())
def get_all_orders(self) -> List[OrderData]:
"""
Get all order data.
"""
return list(self.orders.values())
def get_all_trades(self) -> List[TradeData]:
"""
Get all trade data.
"""
return list(self.trades.values())
def get_all_positions(self) -> List[PositionData]:
"""
Get all position data.
"""
return list(self.positions.values())
def get_all_accounts(self) -> List[AccountData]:
"""
Get all account data.
"""
return list(self.accounts.values())
def get_all_contracts(self) -> List[ContractData]:
"""
Get all contract data.
"""
return list(self.contracts.values())
def get_status(self,vt_symbol:str) -> List[StatusData]: # hxxjava debug
"""
Get the vt_symbol's status data.
"""
return self.trade_status_manager.get_status(vt_symbol)
def get_all_active_orders(self, vt_symbol: str = "") -> List[OrderData]:
"""
Get all active orders by vt_symbol.
If vt_symbol is empty, return all active orders.
"""
if not vt_symbol:
return list(self.active_orders.values())
else:
active_orders = [
order
for order in self.active_orders.values()
if order.vt_symbol == vt_symbol
]
return active_orders
5. 采用这样过滤脏数据的方法的好处
采用这样过滤脏数据的方法,可以从源头一次性地过滤掉脏数据,避免后面的各种应用中分别地对推送的行情数据进行过滤,应用策略无需再考虑脏数据的影响,逻辑简单,效率高。现在再回过头来看帖子 如何更有效地利用合约交易状态信息——交易状态信息管理器 里的方法,就有点效率低下了!
6. 其他接口怎么办?
本文只提供了CTP接口修改方法,没有涉及到其他类型的接口。可是作为一个完善的交易接口,通常都应该提供类似CTP接口中的类似合约交易状态通知的信息。采用的类似的方法也是可以办到的。