说明:本帖只讨论了如何解决CTP接口中,集合竞价后立即下单被拒的问题
对集合竞价不太了解的童鞋们可以参阅 期货集合竞价时间期货集合竞价规则
1. 集合竞价之后开盘价往往是策略频繁出现信号的时候
做量化交易的都知道,在各个交易日的前都有集合竞价时段,集合竞价产生该交易日的开盘价。这个开盘价格是自上一个交易日结束至今所有市场因素对交易对象价格影响的集中体现,这些因素既包括基本面、技术面和消息面等因素!
行情也往往出现这些时候,尤其在经过一个交叉的节假日或者突发利空或者利多消息出现的时候更是如此!
2. 在收到集合竞价之后的开盘价下单会被拒
行情出现了,当然策略就会发出信号,进而下单交易。
可是当策略收到收到开盘价的时候,市场还处在集合竞价阶段,此时下单将会被拒(相信大家都遇到过),而这个问题目前vnpy并没有解决。
如果是手工操盘,当然没有问题,可是我们的通过使用策略或者算法来交易的,怎么可以不解决这个问题 ?!!!
2.1 解决因为开盘价下单会被拒问题的思路
此时下单会被拒主要是因为市场还处在集合竞价状态,我们应该选择在连续竞价状态时下单即可。
就这么简单,可是实现起来有点儿复杂。
你可以把下单的逻辑简单地规定为21:00~23:00,9:00~11:30和13:30~15:00这几个时段才可以下单,其他时段禁止下单。
这倒是也可以解决一部分问题,但这样总会顾头不顾尾,很多品种你是无法处理的!
3. 新版的CTP接口协议中增加了合约交易状态通知
3.1 合约交易状态通知推送接口函数
下面是合约交易状态通知推送接口函数和合约交易状态通知数据结构。
合约交易状态通知,主动推送。私有流回报。
◇ 1.函数原型virtual void OnRtnInstrumentStatus(CThostFtdcInstrumentStatusField *pInstrumentStatus) {};
回到顶部
◇ 2.参数pInstrumentStatus:合约状态
struct CThostFtdcInstrumentStatusField
{
///交易所代码
TThostFtdcExchangeIDType ExchangeID;
///合约在交易所的代码
TThostFtdcExchangeInstIDType ExchangeInstID;
///结算组代码
TThostFtdcSettlementGroupIDType SettlementGroupID;
///合约代码
TThostFtdcInstrumentIDType InstrumentID;
///合约交易状态
TThostFtdcInstrumentStatusType InstrumentStatus;
///交易阶段编号
TThostFtdcTradingSegmentSNType TradingSegmentSN;
///进入本状态时间
TThostFtdcTimeType EnterTime;
///进入本状态原因
TThostFtdcInstStatusEnterReasonType EnterReason;
};
◇ 3.返回无
3.2 下面合约当前所处的交易状态C语言类型和枚举值
/////////////////////////////////////////////////////////////////////////
///TFtdcInstrumentStatusType是一个合约交易状态类型
/////////////////////////////////////////////////////////////////////////
///开盘前
#define THOST_FTDC_IS_BeforeTrading '0'
///非交易
#define THOST_FTDC_IS_NoTrading '1'
///连续交易
#define THOST_FTDC_IS_Continous '2'
///集合竞价报单
#define THOST_FTDC_IS_AuctionOrdering '3'
///集合竞价价格平衡
#define THOST_FTDC_IS_AuctionBalance '4'
///集合竞价撮合
#define THOST_FTDC_IS_AuctionMatch '5'
///收盘
#define THOST_FTDC_IS_Closed '6'
typedef char TThostFtdcInstrumentStatusType;
4. 解决CTP接口中集合竞价后立即下单被拒问题的实现步骤
4.1 定义相关的常量和数据类
在vnpy\trader\constant.py中增加下面的合约交易状态InstrumentStatus常量类型定义:
class InstrumentStatus(Enum):
"""
合约交易状态类型 hxxjava debug
"""
BEFORE_TRADING = "开盘前"
NO_TRADING = "非交易"
CONTINOUS = "连续交易"
AUCTION_ORDERING = "集合竞价报单"
AUCTION_BALANCE = "集合竞价价格平衡"
AUCTION_MATCH = "集合竞价撮合"
CLOSE = "收盘"
在vnpy\trader\object.py中增加下面的交易状态数据类StatusData:
@dataclass
class StatusData(BaseData):
"""
合约品种状态信息 hxxjava debug
"""
# 合约代码
symbol:str
# 交易所代码
exchange : Exchange
# 结算组代码
settlement_group_id : str = ""
# 合约交易状态
instrument_status : InstrumentStatus = None
# 交易阶段编号
trading_segment_sn : int = None
# 进入本状态时间
enter_time : str = ""
# 进入本状态原因
enter_reason : str = ""
# 合约在交易所的代码
exchange_inst_id : str = ""
def __post_init__(self):
""" """
self.vt_symbol = f"{self.symbol}.{self.exchange.value}"
def belongs_to(self,vt_symbol:str):
""" 判断状态是否属于一个合约 """
symbol,exchange_str = vt_symbol.split(".")
instrument = left_alphas(symbol).upper()
return (self.symbol.upper() == instrument) and (self.exchange.value == exchange_str)
在vnpy\trader\event.py中增加交易状态消息类型
EVENT_STATUS = "eStatus" # hxxjava debug
4.2 为gateway添加on_status()函数
在vnpy\trader\gateway.py中修改网关父类gateway,为其增加on_status()函数:
def on_status(self, status: StatusData) -> None: # hxxjava debug
"""
Instrument Status event push.
"""
self.on_event(EVENT_STATUS, status)
4.3 为CTP接口中的TdApi接口
在vnpy_ctp\gateway\ctp_gateway.py中为CtpTdApi增加交易状态通知推送接口函数onRtnInstrumentStatus():
def onRtnInstrumentStatus(self,data:dict):
"""
当接收到合约品种状态信息 # hxxjava debug
"""
if data:
# print(f"【data={data}】")
status = StatusData(
symbol = data["InstrumentID"],
exchange = EXCHANGE_CTP2VT[data["ExchangeID"]],
settlement_group_id = data["SettlementGroupID"],
instrument_status = INSTRUMENTSTATUS_CTP2VT[data["InstrumentStatus"]],
trading_segment_sn = data["TradingSegmentSN"],
enter_time = data["EnterTime"],
enter_reason = ENTERREASON_CTP2VT[data["EnterReason"]],
exchange_inst_id = data["ExchangeInstID"],
gateway_name=self.gateway_name
)
# print(f"status={status}")
self.gateway.on_status(status)
4.3.1 别忘了其中用到的两个映射字典:
映射字典INSTRUMENTSTATUS_CTP2VT:
# 品种状态进入原因映射 hxxjava debug
INSTRUMENTSTATUS_CTP2VT: Dict[str, InstrumentStatus] = {
"0": InstrumentStatus.BEFORE_TRADING,
"1": InstrumentStatus.NO_TRADING,
"2": InstrumentStatus.CONTINOUS,
"3": InstrumentStatus.AUCTION_ORDERING,
"4": InstrumentStatus.AUCTION_BALANCE,
"5": InstrumentStatus.AUCTION_MATCH,
"6": InstrumentStatus.CLOSE,
"7": InstrumentStatus.CLOSE
}
映射字典ENTERREASON_CTP2VT:
# 品种状态进入原因映射 hxxjava debug
ENTERREASON_CTP2VT: Dict[str, StatusEnterReason] = {
"1": StatusEnterReason.AUTOMATIC,
"2": StatusEnterReason.MANUAL,
"3": StatusEnterReason.FUSE
}
4.4 将交易状态通知保存在OmsEngine中
修改vnpy\trader\engine.py中的OmsEngine,修改如下:
class OmsEngine(BaseEngine):
"""
Provides order management system function for VN Trader.
"""
def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
""""""
super(OmsEngine, self).__init__(main_engine, event_engine, "oms")
self.ticks: Dict[str, TickData] = {}
self.orders: Dict[str, OrderData] = {}
self.trades: Dict[str, TradeData] = {}
self.positions: Dict[str, PositionData] = {}
self.accounts: Dict[str, AccountData] = {}
self.contracts: Dict[str, ContractData] = {}
self.statuses: Dict[str, StatusData] = {} # hxxjava debug
self.active_orders: Dict[str, OrderData] = {}
self.add_function()
self.register_event()
def add_function(self) -> None:
"""Add query function to main engine."""
self.main_engine.get_tick = self.get_tick
self.main_engine.get_order = self.get_order
self.main_engine.get_trade = self.get_trade
self.main_engine.get_position = self.get_position
self.main_engine.get_account = self.get_account
self.main_engine.get_contract = self.get_contract
self.main_engine.get_all_ticks = self.get_all_ticks
self.main_engine.get_all_orders = self.get_all_orders
self.main_engine.get_all_trades = self.get_all_trades
self.main_engine.get_all_positions = self.get_all_positions
self.main_engine.get_all_accounts = self.get_all_accounts
self.main_engine.get_all_contracts = self.get_all_contracts
self.main_engine.get_all_active_orders = self.get_all_active_orders
self.main_engine.get_all_statuses = self.get_all_statuses # hxxjava debug
self.main_engine.get_status = self.get_status # hxxjava debug
def register_event(self) -> None:
""""""
self.event_engine.register(EVENT_TICK, self.process_tick_event)
self.event_engine.register(EVENT_ORDER, self.process_order_event)
self.event_engine.register(EVENT_TRADE, self.process_trade_event)
self.event_engine.register(EVENT_POSITION, self.process_position_event)
self.event_engine.register(EVENT_ACCOUNT, self.process_account_event)
self.event_engine.register(EVENT_CONTRACT, self.process_contract_event)
self.event_engine.register(EVENT_STATUS, self.process_status_event) # hxxjava debug
def process_tick_event(self, event: Event) -> None:
""""""
tick = event.data
self.ticks[tick.vt_symbol] = tick
def process_order_event(self, event: Event) -> None:
""""""
order = event.data
self.orders[order.vt_orderid] = order
# If order is active, then update data in dict.
if order.is_active():
self.active_orders[order.vt_orderid] = order
# Otherwise, pop inactive order from in dict
elif order.vt_orderid in self.active_orders:
self.active_orders.pop(order.vt_orderid)
def process_trade_event(self, event: Event) -> None:
""""""
trade = event.data
self.trades[trade.vt_tradeid] = trade
def process_position_event(self, event: Event) -> None:
""""""
position = event.data
self.positions[position.vt_positionid] = position
def process_account_event(self, event: Event) -> None:
""""""
account = event.data
self.accounts[account.vt_accountid] = account
def process_contract_event(self, event: Event) -> None:
""""""
contract = event.data
self.contracts[contract.vt_symbol] = contract
def process_status_event(self, event: Event) -> None: # hxxjava debug
""""""
status = event.data
# print(f"got a status = {status}")
self.statuses[status.vt_symbol] = status
def get_tick(self, vt_symbol: str) -> Optional[TickData]:
"""
Get latest market tick data by vt_symbol.
"""
return self.ticks.get(vt_symbol, None)
def get_order(self, vt_orderid: str) -> Optional[OrderData]:
"""
Get latest order data by vt_orderid.
"""
return self.orders.get(vt_orderid, None)
def get_trade(self, vt_tradeid: str) -> Optional[TradeData]:
"""
Get trade data by vt_tradeid.
"""
return self.trades.get(vt_tradeid, None)
def get_position(self, vt_positionid: str) -> Optional[PositionData]:
"""
Get latest position data by vt_positionid.
"""
return self.positions.get(vt_positionid, None)
def get_account(self, vt_accountid: str) -> Optional[AccountData]:
"""
Get latest account data by vt_accountid.
"""
return self.accounts.get(vt_accountid, None)
def get_contract(self, vt_symbol: str) -> Optional[ContractData]:
"""
Get contract data by vt_symbol.
"""
return self.contracts.get(vt_symbol, None)
def get_all_ticks(self) -> List[TickData]:
"""
Get all tick data.
"""
return list(self.ticks.values())
def get_all_orders(self) -> List[OrderData]:
"""
Get all order data.
"""
return list(self.orders.values())
def get_all_trades(self) -> List[TradeData]:
"""
Get all trade data.
"""
return list(self.trades.values())
def get_all_positions(self) -> List[PositionData]:
"""
Get all position data.
"""
return list(self.positions.values())
def get_all_accounts(self) -> List[AccountData]:
"""
Get all account data.
"""
return list(self.accounts.values())
def get_all_contracts(self) -> List[ContractData]:
"""
Get all contract data.
"""
return list(self.contracts.values())
def get_all_statuses(self) -> List[StatusData]: # hxxjava debug
"""
Get all status data.
"""
return list(self.statuses.values())
def get_status(self,vt_symbol:str) -> List[StatusData]: # hxxjava debug
"""
Get the vt_symbol's status data.
"""
symbol,exchange_str = vt_symbol.split('.')
instrment = left_alphas(symbol)
instrment_vt_symbol = f"{instrment}.{exchange_str}"
return self.statuses.get(instrment_vt_symbol,None)
def get_all_active_orders(self, vt_symbol: str = "") -> List[OrderData]:
"""
Get all active orders by vt_symbol.
If vt_symbol is empty, return all active orders.
"""
if not vt_symbol:
return list(self.active_orders.values())
else:
active_orders = [
order
for order in self.active_orders.values()
if order.vt_symbol == vt_symbol
]
return active_orders
5. 如何使用交易状态通知相关函数?
5.1 修改CTA交易策略
5.1.1 修改交易模板CtaTemplate
修改vnpy\app\cta_strategy\template.py,增加得到交易合约当前交易状态的函数get_trade_status():
def get_trade_status(self):
"""
得到交易合约当前交易状态
"""
main_engine = self.cta_engine.main_engine
return main_engine.get_status(self.vt_symbol)
5.1.2 修改您的CTA策略下单控制
在你的策略中调用buy()、sell()、short()、cover()或者send_order()下单函数之前,使用类似这样的判断:
下面使用某个CTA策略的on_bar()函数的来说明对交易状态的使用。
def on_bar(self, bar: BarData):
"""
Callback of new bar data update.
"""
self.cancel_all()
am = self.am
am.update_bar(bar)
if not am.inited:
return
atr_array = am.atr(self.atr_length, array=True)
self.atr_value = atr_array[-1]
self.atr_ma = atr_array[-self.atr_ma_length:].mean()
self.rsi_value = am.rsi(self.rsi_length)
if self.get_trade_status() != InstrumentStatus.CONTINOUS:
# 注意:这里,如果当前合约不在连续竞价状态,放弃下面的信号计算和下单操作!!!
self.put_event()
return
if self.pos == 0:
self.intra_trade_high = bar.high_price
self.intra_trade_low = bar.low_price
if self.atr_value > self.atr_ma:
if self.rsi_value > self.rsi_buy:
self.buy(bar.close_price + 5, self.fixed_size)
elif self.rsi_value < self.rsi_sell:
self.short(bar.close_price - 5, self.fixed_size)
elif self.pos > 0:
self.intra_trade_high = max(self.intra_trade_high, bar.high_price)
self.intra_trade_low = bar.low_price
long_stop = self.intra_trade_high * \
(1 - self.trailing_percent / 100)
self.sell(long_stop, abs(self.pos), stop=True)
elif self.pos < 0:
self.intra_trade_low = min(self.intra_trade_low, bar.low_price)
self.intra_trade_high = bar.high_price
short_stop = self.intra_trade_low * \
(1 + self.trailing_percent / 100)
self.cover(short_stop, abs(self.pos), stop=True)
self.put_event()
5.2 修改价差交易相关模板及价差交易策略
5.2.1 修改SpreadAlgoTemplate
修改vnpy\app\spread_trading\template.py中的SpreadAlgoTemplate模板,增加get_legs_status()读取价差算法的各个腿当前交易状态:
def get_legs_status(self):
"""
得到当前策略所交易的价差交易状态 # hxxjava add
"""
legs_status:Dict[str:InstrumentStatus] = {}
main_engine = self.algo_engine.main_engine
for leg in self.spread.legs.values():
astatus:StatusData = main_engine.get_status(leg.vt_symbol)
legs_status[leg.vt_symbol] = astatus.instrument_status
return legs_status
5.2.2 修改SpreadStrategyTemplate
修改vnpy\app\spread_trading\template.py中的SpreadStrategyTemplate模板,增加get_legs_status()读取价差交易策略的各个腿当前交易状态:
def get_legs_status(self):
"""
得到当前策略所交易的价差交易状态 # hxxjava add
"""
legs_status:Dict[str:InstrumentStatus] = {}
main_engine = self.strategy_engine.main_engine
for leg in self.spread.legs.values():
astatus:StatusData = main_engine.get_status(leg.vt_symbol)
legs_status[leg.vt_symbol] = astatus.instrument_status
return legs_status
5.2.3 修改您的价差交易策略下单控制
在你的价差交易策略start_long_algo()和start_short_algo()下单函数之前,使用类似这样的判断:
下面使用某个价差交易策略的on_spread_data()函数的来说明对交易状态的使用,交易指令部分无需纠结从哪里来,只是举例。
def on_spread_data(self):
"""
Callback when spread price is updated.
"""
tick = self.get_spread_tick()
self.bg.update_tick(tick)
# 得到价差交易策略的各个腿的交易状态
legs_status = list(self.get_legs_status().values())
# 这里要求价差交易策略的每个腿的交易状态均为连续竞价状态
is_continous_status = legs_status.count(InstrumentStatus.CONTINOUS) == len(self.spread.legs)
if self.trading and is_continous_status:
# 这里是交易指令,无需纠结代码从哪里来
pc = PriceData(sponsor=self.strategy_name,price_name=self.spread_name,price=tick.last_price)
self.event_engine.put(Event(EVENT_PRICE_CHANGE,data=pc))
5.3 其他模块如何修改?
如果您感兴趣,其他的模块,如算法交易模块algo、投资组合模块PortfolioStrategy等,在对交易状态信息的使用,可以参考上述的CTA策略米筐和价差交易模块的做法,应该不难。
也许您也和我一样饱受这个问题的困扰,希望本人的辛苦付出能够帮到你!