1. IF2107的集合竞价
1.1 修改策略的on_tick(),以便输出集合竞价tick数据
def on_tick(self, tick: TickData):
"""
Callback of new tick data update.
"""
tick_time = tick.datetime.time()
time1 = time(20,55,0)
time2 = time(21,0,1)
if time1 <= tick_time <= time2:
print(f"【集合竞价数据 tick_time={tick_time} tick={tick}】")
if ("IF" in tick.vt_symbol) and ("CFFEX" in tick.vt_symbol):
time1 = time(9,25,0)
time2 = time(9,30,1)
if time1 <= tick_time <= time2:
print(f"【集合竞价数据 tick_time={tick_time} tick={tick}】")
super().on_tick(tick)
1.2 集合竞价tick数据
【集合竞价tick数据
tick_time=09:29:00.500000
tick=TickData(
gateway_name='CTP',
symbol='IF2107',
exchange=<Exchange.CFFEX: 'CFFEX'>,
datetime=datetime.datetime(2021, 6, 9, 9, 29, 0, 500000, tzinfo=<DstTzInfo 'Asia/Shanghai' CST+8:00:00 STD>),
name='沪深300指数2107',
volume=7,
open_interest=22276.0,
last_price=5183.8,
last_volume=0,
limit_up=5698.200000000001,
limit_down=4662.2,
open_price=5183.8,
high_price=5183.8,
low_price=5183.8,
pre_close=5184.0,
bid_price_1=5180.2,
bid_price_2=0,
bid_price_3=0,
bid_price_4=0,
bid_price_5=0,
ask_price_1=5184.0,
ask_price_2=0,
ask_price_3=0,
ask_price_4=0,
ask_price_5=0,
bid_volume_1=4,
bid_volume_2=0,
bid_volume_3=0,
bid_volume_4=0,
bid_volume_5=0,
ask_volume_1=3,
ask_volume_2=0,
ask_volume_3=0,
ask_volume_4=0,
ask_volume_5=0)】
1.3 开市后第一秒的tick数据
1.3.1 开市后第一秒的第一个tick数据
tick_time=09:30:00.500000
tick=TickData(
gateway_name='CTP',
symbol='IF2107',
exchange=<Exchange.CFFEX: 'CFFEX'>,
datetime=datetime.datetime(2021, 6, 9, 9, 30, 0, 500000, tzinfo=<DstTzInfo 'Asia/Shanghai' CST+8:00:00 STD>),
name='沪深300指数2107',
volume=10,
open_interest=22274.0,
last_price=5180.2,
last_volume=0,
limit_up=5698.200000000001,
limit_down=4662.2,
open_price=5183.8,
high_price=5183.8,
low_price=5180.2,
pre_close=5184.0,
bid_price_1=5180.2,
bid_price_2=0,
bid_price_3=0,
bid_price_4=0,
bid_price_5=0,
ask_price_1=5180.4,
ask_price_2=0,
ask_price_3=0,
ask_price_4=0,
ask_price_5=0,
bid_volume_1=2,
bid_volume_2=0,
bid_volume_3=0,
bid_volume_4=0,
bid_volume_5=0,
ask_volume_1=1,
ask_volume_2=0,
ask_volume_3=0,
ask_volume_4=0,
ask_volume_5=0)】
1.3.2 开市后第一秒的第二个tick数据
tick_time=09:30:01
tick=TickData(
gateway_name='CTP',
symbol='IF2107',
exchange=<Exchange.CFFEX: 'CFFEX'>,
datetime=datetime.datetime(2021, 6, 9, 9, 30, 1, tzinfo=<DstTzInfo 'Asia/Shanghai' CST+8:00:00 STD>),
name='沪深300 指数2107',
volume=13,
open_interest=22272.0,
last_price=5180.2,
last_volume=0,
limit_up=5698.200000000001,
limit_down=4662.2,
open_price=5183.8,
high_price=5183.8,
low_price=5180.2,
pre_close=5184.0,
bid_price_1=5177.4,
bid_price_2=0,
bid_price_3=0,
bid_price_4=0,
bid_price_5=0,
ask_price_1=5179.2,
ask_price_2=0,
ask_price_3=0,
ask_price_4=0,
ask_price_5=0,
bid_volume_1=5,
bid_volume_2=0,
bid_volume_3=0,
bid_volume_4=0,
bid_volume_5=0,
ask_volume_1=1,
ask_volume_2=0,
ask_volume_3=0,
ask_volume_4=0,
ask_volume_5=0)】
2. 深交所的股票的集合竞价——有意思!
2.1 白云机场的1分钟K线图:
以白云机场为例,它的集合竞价是每个交易日的14:56-14:59,但是它这三分钟却只形成1根1分钟K线!
下面是白云机场的1分钟K线图:
2.2 1分钟K线的代表的交易时间长度不一定是1分钟
由上图可知:
深交所的股票在集合竞价阶段1分钟K线就是3分钟时长,其它时段的又是1分钟;
上交所的股票在集合竞价阶段是没有1分钟K线的,它发生在9:25-9:29,却被合并到开市后的第一根1分钟K线中,也就是说这根K线代表的交易时长为5分钟。
国内期货日盘合约的集合竞价阶段发生在上午开市前5分钟的前4分钟内,却被合并到开市后的第一根1分钟K线中,也就是说这根K线代表的交易时长为5分钟。
国内期货夜盘合约的集合竞价阶段发生在前一交易日的夜间20:55-21:00的前4分钟内,却被合并到开市后的第一根1分钟K线中,也就是说这根K线代表的交易时长为5分钟。
3. 问题:vnpy的BarGenerator如何正确处理这些集合竞价的tick数据?
3.1 目前的BarGenerator无法正确处理这些集合竞价的tick数据。
这个问题我之前已经有文章讨论过了,需要修改是肯定的!
3.2 正确处理这些集合竞价的tick数据需要那些条件?
- 交易时间段
- 集合竞价时段
- 集合竞价的数据应该合并到开市后第一根K线,还是独立存在?
4. 被vnpy忽视的CTP接口功能:合约交易状态
4.1 合约交易状态即进入原因定义
位于vnpy_ctp\api\include\ctp\ThostFtdcUserApiDataType.h
/////////////////////////////////////////////////////////////////////////
///TFtdcInstrumentStatusType是一个合约交易状态类型
/////////////////////////////////////////////////////////////////////////
///开盘前
#define THOST_FTDC_IS_BeforeTrading '0'
///非交易
#define THOST_FTDC_IS_NoTrading '1'
///连续交易
#define THOST_FTDC_IS_Continous '2'
///集合竞价报单
#define THOST_FTDC_IS_AuctionOrdering '3'
///集合竞价价格平衡
#define THOST_FTDC_IS_AuctionBalance '4'
///集合竞价撮合
#define THOST_FTDC_IS_AuctionMatch '5'
///收盘
#define THOST_FTDC_IS_Closed '6'
typedef char TThostFtdcInstrumentStatusType;
/////////////////////////////////////////////////////////////////////////
///TFtdcInstStatusEnterReasonType是一个品种进入交易状态原因类型
/////////////////////////////////////////////////////////////////////////
///自动切换
#define THOST_FTDC_IER_Automatic '1'
///手动切换
#define THOST_FTDC_IER_Manual '2'
///熔断
#define THOST_FTDC_IER_Fuse '3'
typedef char TThostFtdcInstStatusEnterReasonType;
4.2 让vnpy接收CTP接口合约交易状态信息
1)在trader\constant.py中定义合约交易状态类型
class InstrumentStatus(Enum):
"""
合约交易状态类型 hxxjava debug
"""
BEFORE_TRADING = "开盘前"
NO_TRADING = "非交易"
CONTINOUS = "连续交易"
AUCTION_ORDERING = "集合竞价报单"
AUCTION_BALANCE = "集合竞价价格平衡"
AUCTION_MATCH = "集合竞价撮合"
CLOSE = "收盘"
2)在trader\object.py中定义合约状态类型StatusData
@dataclass
class StatusData(BaseData):
"""
合约状态 hxxjava debug
"""
# 合约代码
symbol:str
# 交易所代码
exchange : Exchange
# 结算组代码
settlement_group_id : str = ""
# 合约交易状态
instrument_status : InstrumentStatus = None
# 交易阶段编号
trading_segment_sn : int = None
# 进入本状态时间
enter_time : str = ""
# 进入本状态原因
enter_reason : str = ""
# 合约在交易所的代码
exchange_inst_id : str = ""
def __post_init__(self):
""" """
self.vt_symbol = f"{self.symbol}.{self.exchange.value}"
3)在trader\event.py中定义合约状态消息EVENT_STATUS
EVENT_STATUS = "eStatus" # hxxjava debug
4)在trader\gateway.py中给gateway增加on_status()函数
def on_status(self, status: StatusData) -> None: # hxxjava debug
"""
Instrument Status event push.
"""
self.on_event(EVENT_STATUS, status)
5) 在vnpy_ctp\gateway\ctp_gateway.py文件中,为TdApi加入如下函数
def onRtnInstrumentStatus(self,data:dict):
"""
当接收到合约状态信息 # hxxjava debug
"""
if data:
status = StatusData(
symbol = data["InstrumentID"],
exchange = EXCHANGE_CTP2VT[data["ExchangeID"]],
settlement_group_id = data["SettlementGroupID"],
instrument_status = data["InstrumentStatus"],
trading_segment_sn = data["TradingSegmentSN"],
enter_time = data["EnterTime"],
enter_reason = data["EnterReason"],
exchange_inst_id = data["ExchangeInstID"],
gateway_name=self.gateway_name
)
# print(f"status = {status}")
self.gateway.on_status(status)
6) 运行vnpy,连接CTP接口,看看都收到了什么?
status=StatusData(gateway_name='CTP', symbol='nr', exchange=<Exchange.INE: 'INE'>, settlement_group_id='00000001', instrument_status='1', trading_segment_sn=4, enter_time='19:58:37', enter_reason='1', exchange_inst_id='nr')
status=StatusData(gateway_name='CTP', symbol='lu', exchange=<Exchange.INE: 'INE'>, settlement_group_id='00000001', instrument_status='1', trading_segment_sn=4, enter_time='19:58:37', enter_reason='1', exchange_inst_id='lu')
status=StatusData(gateway_name='CTP', symbol='sc', exchange=<Exchange.INE: 'INE'>, settlement_group_id='00000001', instrument_status='1', trading_segment_sn=4, enter_time='19:58:37', enter_reason='1', exchange_inst_id='sc')
... ... 相同略去 内容太多,包括了这个CTP接口中所有合约品种的合约状态信息,其他省略了
7) 收到了合约状态信息有什么用?
合约状态信息是有交易服务器推送到客户端的,其中包含如下的合约状态:
BEFORE_TRADING = "开盘前"
NO_TRADING = "非交易"
CONTINOUS = "连续交易"
AUCTION_ORDERING = "集合竞价报单"
AUCTION_BALANCE = "集合竞价价格平衡"
AUCTION_MATCH = "集合竞价撮合"
CLOSE = "收盘"
并且还有进入的时间和原因,这些信息正是我们解决BarGenerator在处理集合竞价时段的tick数据错误问题所需要的!
5. CTA策略如何使用使用合约状态信息?
5.1 在OmsEngine中收集当前市场的所有合约的合约状态信息
在vnpy\trader\engine.py文件中,为OmsEngine做如下的修改:
class OmsEngine(BaseEngine):
"""
Provides order management system function for VN Trader.
"""
def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
""""""
super(OmsEngine, self).__init__(main_engine, event_engine, "oms")
self.ticks: Dict[str, TickData] = {}
self.orders: Dict[str, OrderData] = {}
self.trades: Dict[str, TradeData] = {}
self.positions: Dict[str, PositionData] = {}
self.accounts: Dict[str, AccountData] = {}
self.contracts: Dict[str, ContractData] = {}
self.statuses: Dict[str, StatusData] = {} # hxxjava debug
self.active_orders: Dict[str, OrderData] = {}
self.add_function()
self.register_event()
def add_function(self) -> None:
"""Add query function to main engine."""
self.main_engine.get_tick = self.get_tick
self.main_engine.get_order = self.get_order
self.main_engine.get_trade = self.get_trade
self.main_engine.get_position = self.get_position
self.main_engine.get_account = self.get_account
self.main_engine.get_contract = self.get_contract
self.main_engine.get_all_ticks = self.get_all_ticks
self.main_engine.get_all_orders = self.get_all_orders
self.main_engine.get_all_trades = self.get_all_trades
self.main_engine.get_all_positions = self.get_all_positions
self.main_engine.get_all_accounts = self.get_all_accounts
self.main_engine.get_all_contracts = self.get_all_contracts
self.main_engine.get_all_statuses = self.get_all_statuses # hxxjava debug
self.main_engine.get_all_active_orders = self.get_all_active_orders
def register_event(self) -> None:
""""""
self.event_engine.register(EVENT_TICK, self.process_tick_event)
self.event_engine.register(EVENT_ORDER, self.process_order_event)
self.event_engine.register(EVENT_TRADE, self.process_trade_event)
self.event_engine.register(EVENT_POSITION, self.process_position_event)
self.event_engine.register(EVENT_ACCOUNT, self.process_account_event)
self.event_engine.register(EVENT_CONTRACT, self.process_contract_event)
self.event_engine.register(EVENT_STATUS, self.process_status_event) # hxxjava debug
... ... 相同略去
def process_status_event(self, event: Event) -> None: # hxxjava debug
""""""
status = event.data
# print(f"got a status = {status}")
self.statuses[status.vt_symbol] = status
... ... 相同略去
def get_all_statuses(self) -> List[StatusData]: # hxxjava debug
"""
Get all status data.
"""
return list(self.statuses.values())
... ... 相同略去
注意:
这个步骤的目的:
把CTP接口接收到的所有合约品种的状态信息保存到self.statuses字典中。
为系统的主引擎main_engine提供访问所有合约品种的状态信息函数get_all_statuses()
5.2 把CTA策略模板CtaTemplate做如下修改
5.2.1 CTA策略引擎中,增加策略初始化时订阅EVENT_STATUS
修改app\cta_strategy\engine.py文件中的CtaEngine,下面只给出主要的代码修改部分:
class CtaEngine(BaseEngine):
""""""
... ... 相同略去
def register_event(self):
""""""
self.event_engine.register(EVENT_TICK, self.process_tick_event)
self.event_engine.register(EVENT_ORDER, self.process_order_event)
self.event_engine.register(EVENT_TRADE, self.process_trade_event)
self.event_engine.register(EVENT_POSITION, self.process_position_event)
self.event_engine.register(EVENT_STATUS,self.process_status_event) # hxxjava debug
... ... 相同略去
def process_status_event(self,event:Event): # hxxjava debug
""" 分发合约某种状态信息到相应的策略 """
status:StatusData = event.data
for vt_symbol in self.symbol_strategy_map.keys():
symbol,exchange = extract_vt_symbol(vt_symbol)
instrument = left_alphas(symbol)
if (status.vt_symbol.upper() in [symbol.upper(),instrument.upper()]) and (status.exchange == exchange):
# 分发合约某种状态信息到相应的所有策略中
strategies = self.symbol_strategy_map[vt_symbol]
for strategy in strategies:
self.call_strategy_func(strategy, strategy.on_status, status)
... ... 相同略去
5.2.2 CtaTemplate增加 on_status()处理接口推送的合约状态信息
在app\cta_strategy\template.py文件中,为cta_template模版增加下面的接口:
@virtual
def on_status(self, status: StatusData): # hxxjava debug
"""
Callback of status data
"""
pass
5.2.3 为您自己CTA策略增加 on_status()处理接口推送的合约状态信息
简单举例如下:
def on_status(self, status: StatusData): # hxxjava debug
print(f"strategy {self.strategy_name} got a status event {status}")
到这里,您可以让你的CTA策略在感知到策略正在交易的合约交易状态变化了。
具体如何使用合约交易状态信息, 这么做取决于您的需求了。举例如下:
- 当在合约处在集合竞价完成阶段收到了一条tick信息,你就可以把它代入到开盘的第一个连续竞价时间段,把该tick合并到第一个1分钟K线中
- 当最新收到的合约交易状态为非交易状态(既不是集合竞价状态,也不连续竞价状态),策略就可以不发出委托,以免被拒单
- 目前vnpy的CTA策略中,使用BarGenerator生成1分钟K,在各个交易时间段的最后1分钟是不会生成1分钟K线的,直到收到下一个各个交易时间段的第一个tick,才会生成上个交易时间段的最后1分钟K线。现在我们就可以利用合约交易状态的变化(如合约品种转入了非交易状态或者收盘状态),可以让BarGenerator立即生成当前的1分钟K线。