如何更有效地利用合约交易状态信息——把集合竞价tick合并到开盘的K线中?
1. 合约集合竞价阶段的交易状态通知消息
1.1 合约集合竞价阶段
对于国内期货,通常,夜盘合约在20:55之后,日盘合约在8:55之后,合约进入集合竞价报单状态后,交易者就可以通过交易即可进行下单。下单后不会立即成交,而是经过交易所按照一定的规则进行匹配成交。集合竞价报单状态期间,可能会有tick推送给订阅的CTP客户端(如郑商所),但是此时的tick只有价格(last_price)没有成交量(volume=0);也可能没有任何tick推送给订阅的CTP客户端(如上期所)。
之后夜盘合约在20:59之后,日盘合约在8:59之后之后,合约进入集合竞价撮合阶段,交易所会通过行情接口推送1个tick到CTP客户端,之后会通过交易接口立即推送一次所有合约的连接的所有CTP客户端。此tick中包含的价格就是开盘价,成交量不为0。
1.2 集合竞价相关交易状态的提取
我已经在如何更有效地利用合约交易状态信息——交易状态信息管理器。一文中介绍了如果提取合约交易状态信息。这些合约交易状态信息就包含了与合约集合竞价相关的几个状态,包括:集合竞价报单、集合竞价平衡(本人一直没有检测到)和集合竞价撮合几个交易状态。
1.3 目前的vnpy对集合竞价的处理是不完善的
因为vnpy对集合竞价产生的这个tick没有特别处理,把它看成是一个普通的tick了,所以如果你的策略中使用到n分钟周期K线的话,很可能产生一个只包含此tick的5分钟K线。
这和人们的通常习惯不符,也和通常的主流软件都不相同,也不符合集合竞价的原来本意不符,它应该被合并到包含开盘时刻的K线之中。
经过对合约交易状态的研究,发现合约交易状态信息恰好可以帮助我们正确处理集合竞价产生的这个tick,思路大致如下:
不失一般性,以5分钟K线为例:
- 集合竞价报单之后得到了开盘tick,按照正常逻辑产生新的5分钟K线
- 紧接着收到了集合竞价撮合状态,得到下一个交易状态next_status,如果tick价格没有触及涨停价或者跌停价,通常next_status为应该为连续竞价状态
- 不对当前K进行是否结束的判断,而立即把当前K线的时间戳修改为下一个交易状态的进入时间next_status.enter_time
- 随后市场进入连续竞价状态,收到新的连续竞价的tick之后,进行判断时当然也是当前的5分钟K线没有结束。
- 此新的5分钟K线就包含了该集合竞价产生的价格和成交量。
2. 缓存集合竞价阶段产生的tick等待开盘后再发送
2.1 缓存集合竞价阶段产生的开盘tick,等遇到连续竞价的一个tick再发送
修改vnpy\trader\engine.py中的OmsEngine的原始tick数据处理函数process_origin_tick_event()。
当收到集合竞价产生的开盘tick时,向tick中添加auction_tick属性并且赋值为True,缓存到self.ticks字典中,不转发该原始tick;收到其他连续竞价tick进行,检查self.ticks字典中是否存在该合约缓存的集合竞价产生的开盘tick,如果有则将缓存的开盘tick时间戳赋值为当前tick的时间戳减去1微秒,然后先发送开盘tick到消息队列,然后再发送当前tick到消息队列。代码如下:
代码如下:
def process_origin_tick_event(self,event: Event):#-> None: # hxxjava debug
""" 原始tick数据处理 """
tick:TickData = event.data
status:StatusData = self.trade_status_manager.get_tick_status(tick)
# hxxjava debug
if not status:
print(f"{datetime.now()} {tick.vt_symbol} 还没有收到交易状态")
return
# 有效交易状态
if status.instrument_status in VALID_TRADE_STATUSES:
# 这里是所有有效数据的发源地
# 处理行情和合约交易状态不同步的问题
relay_tick = False
tick_time = tick.datetime.strftime("%H%M%S.%f")
next_status = self.get_next_status(tick.vt_symbol)
if status.instrument_status in AUCTION_STATUS:
# 当前处在集合竞价阶段
if tick.volume == 0:
# 集合竞价保单的tick,放弃
return
if next_status.instrument_status == InstrumentStatus.CONTINOUS and \
tick_time >= next_status.enter_time:
# 连续竞价状态晚于其tick
relay_tick = True
else:
# 集合竞价产生的开盘tick,打上标记,缓存不发送
tick.aution_tick = True
self.ticks[tick.vt_symbol] = tick
else:
# 当前处在连续竞价阶段,转发tick
relay_tick = True
if relay_tick:
pre_tick = copy.copy(self.ticks.get(tick.vt_symbol,None)) # 保险起见
if pre_tick and hasattr(pre_tick,"auction_tick"):
# 在第一个连续性竞价tick之前,转发缓冲的集合竞价产生的开盘tick
pre_tick.datetime = tick.datetime - timedelta(microseconds=1)
self.event_engine.put(Event(EVENT_TICK, pre_tick))
self.event_engine.put(Event(EVENT_TICK + tick.vt_symbol, pre_tick))
self.event_engine.put(Event(EVENT_TICK, tick))
self.event_engine.put(Event(EVENT_TICK + tick.vt_symbol, tick))
else:
# 把集合竞价tick转发给订阅者
self.event_engine.put(Event(EVENT_AUCTION_TICK, tick))
self.event_engine.put(Event(EVENT_AUCTION_TICK + tick.vt_symbol, tick))
# else:
# print(f"{datetime.now()} 特别交易状态={status} {tick}")
2.2 完整的OmsEngine代码
class OmsEngine(BaseEngine):
"""
Provides order management system function for VN Trader.
"""
def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
""""""
super(OmsEngine, self).__init__(main_engine, event_engine, "oms")
# self.trade_status = TradeStatus(event_engine,6) # hxxjava add
self.ticks: Dict[str, TickData] = {}
self.orders: Dict[str, OrderData] = {}
self.trades: Dict[str, TradeData] = {}
self.positions: Dict[str, PositionData] = {}
self.accounts: Dict[str, AccountData] = {}
self.contracts: Dict[str, ContractData] = {}
self.active_orders: Dict[str, OrderData] = {}
self.trade_status_manager = TradeStatusManager(event_engine,30) # hxxjava debug
self.add_function()
self.register_event()
def add_function(self) -> None:
"""Add query function to main engine."""
self.main_engine.get_tick = self.get_tick
self.main_engine.get_order = self.get_order
self.main_engine.get_trade = self.get_trade
self.main_engine.get_position = self.get_position
self.main_engine.get_account = self.get_account
self.main_engine.get_contract = self.get_contract
self.main_engine.get_all_ticks = self.get_all_ticks
self.main_engine.get_all_orders = self.get_all_orders
self.main_engine.get_all_trades = self.get_all_trades
self.main_engine.get_all_positions = self.get_all_positions
self.main_engine.get_all_accounts = self.get_all_accounts
self.main_engine.get_all_contracts = self.get_all_contracts
self.main_engine.get_all_active_orders = self.get_all_active_orders
self.main_engine.get_status = self.get_status # hxxjava debug
self.main_engine.get_next_status = self.get_next_status # hxxjava debug
def register_event(self) -> None:
""""""
self.event_engine.register(EVENT_TICK, self.process_tick_event)
self.event_engine.register(EVENT_ORDER, self.process_order_event)
self.event_engine.register(EVENT_TRADE, self.process_trade_event)
self.event_engine.register(EVENT_POSITION, self.process_position_event)
self.event_engine.register(EVENT_ACCOUNT, self.process_account_event)
self.event_engine.register(EVENT_CONTRACT, self.process_contract_event)
self.event_engine.register(EVENT_STATUS, self.process_status_event) # hxxjava debug
self.event_engine.register(EVENT_ORIGIN_TICK, self.process_origin_tick_event) # hxxjava debug
def process_origin_tick_event(self,event: Event):#-> None: # hxxjava debug
""" 原始tick数据处理 """
tick:TickData = event.data
status:StatusData = self.trade_status_manager.get_tick_status(tick)
# hxxjava debug
if not status:
print(f"{datetime.now()} {tick.vt_symbol} 还没有收到交易状态")
return
# 有效交易状态
if status.instrument_status in VALID_TRADE_STATUSES:
# 这里是所有有效数据的发源地
# 处理行情和合约交易状态不同步的问题
relay_tick = False
tick_time = tick.datetime.strftime("%H%M%S.%f")
next_status = self.get_next_status(tick.vt_symbol)
if status.instrument_status in AUCTION_STATUS:
# 当前处在集合竞价阶段
if tick.volume == 0:
# 集合竞价保单的tick,放弃
return
if next_status.instrument_status == InstrumentStatus.CONTINOUS and \
tick_time >= next_status.enter_time:
# 连续竞价状态晚于其tick
relay_tick = True
else:
# 集合竞价产生的开盘tick,打上标记,缓存不发送
tick.aution_tick = True
self.ticks[tick.vt_symbol] = tick
else:
# 当前处在连续竞价阶段,转发tick
relay_tick = True
if relay_tick:
pre_tick = copy.copy(self.ticks.get(tick.vt_symbol,None)) # 保险起见
if pre_tick and hasattr(pre_tick,"auction_tick"):
# 在第一个连续性竞价tick之前,转发缓冲的集合竞价产生的开盘tick
pre_tick.datetime = tick.datetime - timedelta(microseconds=1)
self.event_engine.put(Event(EVENT_TICK, pre_tick))
self.event_engine.put(Event(EVENT_TICK + tick.vt_symbol, pre_tick))
self.event_engine.put(Event(EVENT_TICK, tick))
self.event_engine.put(Event(EVENT_TICK + tick.vt_symbol, tick))
else:
# 把集合竞价tick转发给订阅者
self.event_engine.put(Event(EVENT_AUCTION_TICK, tick))
self.event_engine.put(Event(EVENT_AUCTION_TICK + tick.vt_symbol, tick))
# else:
# print(f"{datetime.now()} 特别交易状态={status} {tick}")
def process_tick_event(self, event: Event) -> None:
""""""
tick = event.data
self.ticks[tick.vt_symbol] = tick
status = self.get_status(tick.vt_symbol)
if status.instrument_status in AUCTION_STATUS:
next_status = self.get_next_status(tick.vt_symbol)
print(f"{datetime.now()} 集合竞价状态: \n\t{status}\n\t{next_status}\n\t{tick}")
def process_order_event(self, event: Event) -> None:
""""""
order = event.data
self.orders[order.vt_orderid] = order
# If order is active, then update data in dict.
if order.is_active():
self.active_orders[order.vt_orderid] = order
# Otherwise, pop inactive order from in dict
elif order.vt_orderid in self.active_orders:
self.active_orders.pop(order.vt_orderid)
def process_trade_event(self, event: Event) -> None:
""""""
trade = event.data
self.trades[trade.vt_tradeid] = trade
def process_position_event(self, event: Event) -> None:
""""""
position = event.data
self.positions[position.vt_positionid] = position
def process_account_event(self, event: Event) -> None:
""""""
account = event.data
self.accounts[account.vt_accountid] = account
def process_contract_event(self, event: Event) -> None:
""""""
contract = event.data
self.contracts[contract.vt_symbol] = contract
def process_status_event(self, event: Event) -> None: # hxxjava debug
""" 交易状态通知消息处理 """
status:StatusData = event.data
self.trade_status_manager.save_status(status)
print(f"【{datetime.now()} {status}】")
def get_tick(self, vt_symbol: str) -> Optional[TickData]:
"""
Get latest market tick data by vt_symbol.
"""
return self.ticks.get(vt_symbol, None)
def get_order(self, vt_orderid: str) -> Optional[OrderData]:
"""
Get latest order data by vt_orderid.
"""
return self.orders.get(vt_orderid, None)
def get_trade(self, vt_tradeid: str) -> Optional[TradeData]:
"""
Get trade data by vt_tradeid.
"""
return self.trades.get(vt_tradeid, None)
def get_position(self, vt_positionid: str) -> Optional[PositionData]:
"""
Get latest position data by vt_positionid.
"""
return self.positions.get(vt_positionid, None)
def get_account(self, vt_accountid: str) -> Optional[AccountData]:
"""
Get latest account data by vt_accountid.
"""
return self.accounts.get(vt_accountid, None)
def get_contract(self, vt_symbol: str) -> Optional[ContractData]:
"""
Get contract data by vt_symbol.
"""
return self.contracts.get(vt_symbol, None)
def get_all_ticks(self) -> List[TickData]:
"""
Get all tick data.
"""
return list(self.ticks.values())
def get_all_orders(self) -> List[OrderData]:
"""
Get all order data.
"""
return list(self.orders.values())
def get_all_trades(self) -> List[TradeData]:
"""
Get all trade data.
"""
return list(self.trades.values())
def get_all_positions(self) -> List[PositionData]:
"""
Get all position data.
"""
return list(self.positions.values())
def get_all_accounts(self) -> List[AccountData]:
"""
Get all account data.
"""
return list(self.accounts.values())
def get_all_contracts(self) -> List[ContractData]:
"""
Get all contract data.
"""
return list(self.contracts.values())
def get_status(self,vt_symbol:str) -> Optional[StatusData]: # hxxjava debug
"""
Get the vt_symbol's status data.
"""
return self.trade_status_manager.get_status(vt_symbol)
def get_next_status(self,vt_symbol:str) -> Optional[StatusData]: # hxxjava debug
"""
Get the vt_symbol's status data.
"""
return self.trade_status_manager.get_next_status(vt_symbol)
def get_all_active_orders(self, vt_symbol: str = "") -> List[OrderData]:
"""
Get all active orders by vt_symbol.
If vt_symbol is empty, return all active orders.
"""
if not vt_symbol:
return list(self.active_orders.values())
else:
active_orders = [
order
for order in self.active_orders.values()
if order.vt_symbol == vt_symbol
]
return active_orders
3. 对竞价竞价tick的特别转发
3.1 各种交易状态下处理方法的分析
按照上面的处理,我们就把从CTP接口推送来的tick进行来过滤。
- 对非交易状态的下的tick进行丢弃处理
- 对集合保单状态下的tick(有的市场有推送,如CZCE,有的则没有推送)也进行丢弃处理。因为它的成交量为0,所有只有价格可以使用,其他信息无法使用。如果你想观看集合竞价报道阶段的价格变化情况,可以另外添加集合竞价消息类型对其进行特别包装处理发送。
- 对集合竞价撮合状态产生的开盘tick进行缓存和延后处理,恰好满足了我们把该tick合并到连续竞价时段中,这样我们的K线就可以包含该tick的各个价格和成交量等信息了。但是可能在20:59分钟的时候,可能暂时看不到该tick的信息。如果想知道该信息,可以另外添加集合竞价消息类型对其进行特别包装处理发送。
3.2 增加集合竞价消息类型
3.2.1 增加集合竞价消息类型
在vnpy\trader\event.py文件中添加集合竞价tick数据消息
EVENT_AUCTION_TICK = "eAuctionTick." # 集合竞价tick数据消息 hxxjava debug
3.2.2 修改主界面行情列表控件
修改vnpy\trader\ui\widget.py中的BaseMonitor的消息订阅函数,使得一个控件可以订阅多个消息类型,条件是数据是相同格式的:
def register_event(self) -> None:
"""
Register event handler into event engine.
"""
if self.event_type:
self.signal.connect(self.process_event)
if type(self.event_type) == list: # hxxjava debug
for ev in self.event_type:
self.event_engine.register(ev, self.signal.emit)
# print(f"已经订阅 {ev} 消息")
else:
self.event_engine.register(self.event_type, self.signal.emit)
修改vnpy\trader\ui\widget.py中的TickMonitor的消息类型:
class TickMonitor(BaseMonitor):
"""
Monitor for tick data.
"""
# event_type = EVENT_TICK
event_type = [EVENT_TICK,EVENT_AUCTION_TICK] # 这里的消息类型不再是一种,而是一个列表
data_key = "vt_symbol"
sorting = True
headers = {
"symbol": {"display": "代码", "cell": BaseCell, "update": False},
"exchange": {"display": "交易所", "cell": EnumCell, "update": False},
"name": {"display": "名称", "cell": BaseCell, "update": True},
"last_price": {"display": "最新价", "cell": BaseCell, "update": True},
"volume": {"display": "成交量", "cell": BaseCell, "update": True},
"open_price": {"display": "开盘价", "cell": BaseCell, "update": True},
"high_price": {"display": "最高价", "cell": BaseCell, "update": True},
"low_price": {"display": "最低价", "cell": BaseCell, "update": True},
"bid_price_1": {"display": "买1价", "cell": BidCell, "update": True},
"bid_volume_1": {"display": "买1量", "cell": BidCell, "update": True},
"ask_price_1": {"display": "卖1价", "cell": AskCell, "update": True},
"ask_volume_1": {"display": "卖1量", "cell": AskCell, "update": True},
"datetime": {"display": "时间", "cell": TimeCell, "update": True},
"gateway_name": {"display": "接口", "cell": BaseCell, "update": False},
}
至此,已经完成因为集合竞价过滤对行情显示控件的兼容了!!!
4. “如何更有效地利用合约交易状态信息”系列已经完成
经过近2个月的编写、调试、观察、再调试,“如何更有效地利用合约交易状态信息”系列已经基本完成,
也基本完成了我当时对CTP接口中合约交易状态通知接口可能的几大用功能的实现。
- 可以24小时开机,无需担脏数据的干扰和威胁,防止你的策略误动作。
- 同时它还可以帮助您产生正确的K线。
- 尽可能地防止你的策略在非连续交易时段中交易。
我想这对任何程序化交易者都是个好消息!如果您也想这么做,请翻阅我这个系列的帖子,希望能够帮到您!