注明:
本贴是继如何解决CTP接口中集合竞价后立即下单被拒的问题 !之后,对交易状态信息进行更为深入地研究,实现了交易状态信息管理器,它可以更为有效地利用合约交易状态信息。
1. 交易状态信息管理器
1.1 功能介绍
主要功能:
1)集中保存所有品种的交易状态到字典
2)保存所有品种的交易状态到字典到json文件
3)从json文件加载已经保存的交易状态到字典
4)定时同步交易状态到字典到json文件
5)提取交易合约的在某个时刻的交易状态
1.2 代码实现
class TradeStatusManager(object):
"""
交易状态管理器。
主要功能:
1)集中保存所有品种的交易状态到字典
2)保存所有品种的交易状态到字典到json文件
3)从json文件加载已经保存的交易状态到字典
4)定时同步交易状态到字典到json文件
5)提取交易合约的在某个时刻的交易状态
"""
json_file = "trade_status.json"
def __init__(self,event_engine:EventEngine,interval:int)-> None:
self.event_engine = event_engine
self.interval = interval
self.count = 0
# {vt_symbol:{trade_segment_sn:StatusData}}
self.trade_status_map:Dict[str:Dict] = self.load_from_file()
self.change = False
self.register_event()
def register_event(self):
""" 注册消息事件 """
self.event_engine.register(EVENT_TIMER,self.on_time_event)
def load_from_file(self):
"""
从json文件读取交易状态映射
"""
filepath = get_file_path(self.json_file)
# print(f"filepath = {filepath}")
if filepath.exists():
with open(filepath, mode="r", encoding="UTF-8") as f:
data = json.load(fp=f,cls=TradeStatusDecoder)
return data
else:
save_json(self.json_file, {})
return {}
def sync_to_file(self):
"""
保存交易状态映射到json文件
"""
filepath = get_file_path(self.json_file)
with open(filepath, mode="w+", encoding="UTF-8") as f:
json.dump(
self.trade_status_map,fp=f,
cls=TradeStatusEncoder,
indent=4,
ensure_ascii=False
)
def on_time_event(self,event:Event):
self.count += 1
if self.count > self.interval:
self.count = 0
if self.change:
self.sync_to_file()
self.change = False
print(f"已经保存交易状态映射到交易状态映射!")
def save_status(self,status:StatusData):
"""
保存交易状态映射到交易状态映射
"""
vt_symbol = status.vt_symbol
if vt_symbol not in self.trade_status_map:
self.trade_status_map[vt_symbol] = {}
segment_sn_str = f"{status.trading_segment_sn}".rjust(2,'0')
self.trade_status_map[vt_symbol].update({"current":segment_sn_str})
self.trade_status_map[vt_symbol].update({segment_sn_str:status})
next_segment_sn = self._get_next_segment_sn(status.vt_symbol)
self.trade_status_map[vt_symbol].update({"next":next_segment_sn})
self.change = True
def get_status(self,vt_symbol:str):
"""
获得一个datetime所处的交易状态
"""
# 构造
symbol,exchange = extract_vt_symbol(vt_symbol)
instrument = left_alphas(symbol)
vt_symbol1 = f"{instrument}.{exchange.value}"
ret_status = None
# dt_time = dt.time()
if vt_symbol1 in self.trade_status_map:
current = self.trade_status_map[vt_symbol1]['current']
# for enter_time_str,status in self.trade_status_map[vt_symbol1].items():
# enter_time = datetime.strptime(enter_time_str,"%H:%M:%S").time()
# if dt_time >= enter_time:
# ret_status = status
ret_status = self.trade_status_map[vt_symbol1][current]
return ret_status
def _get_next_segment_sn(self,vt_symbol:str):
""" 得到合约的下一交易状态编号 """
next_key:str = ""
if vt_symbol in self.trade_status_map:
instrment_dict = self.trade_status_map[vt_symbol]
current_key = instrment_dict['current']
keys = [key for key in sorted(instrment_dict.keys()) if key not in ["current","next"]]
if current_key == keys[-1]:
next_key = keys[0]
else:
index = 0
for key in keys:
index += 1
if key == current_key:
break
next_key = keys[index]
return next_key
def get_current_status(self,vt_symbol:str):
""" 得到合约的当前交易状态 """
status:StatusData = None
symbol,exchange = extract_vt_symbol(vt_symbol)
instrument = left_alphas(symbol)
vt_symbol1 = f"{instrument}.{exchange.value}"
if vt_symbol1 in self.trade_status_map:
status_dict = self.trade_status_map[vt_symbol1]
key = status_dict['current']
status = status_dict[key]
return status
def get_next_status(self,vt_symbol:str):
""" 得到合约的下一交易状态 """
status:StatusData = None
symbol,exchange = extract_vt_symbol(vt_symbol)
instrument = left_alphas(symbol)
vt_symbol1 = f"{instrument}.{exchange.value}"
if vt_symbol1 in self.trade_status_map:
status_dict = self.trade_status_map[vt_symbol1]
key = status_dict['next']
status = status_dict[key]
return status
def get_tick_status(self,tick:TickData):
"""
得到一个tick数据的合约所处交易状态
"""
status:StatusData = None
instrument = left_alphas(tick.symbol)
tick_time = tick.datetime.strftime("%H:%M:%S")
vt_symbol = f"{instrument}.{tick.exchange.value}"
if vt_symbol in self.trade_status_map:
status_dict = self.trade_status_map[vt_symbol]
curr_key = status_dict["current"]
next_key = status_dict["next"]
curr_status:StatusData = status_dict[curr_key]
next_status:StatusData = status_dict[next_key]
if curr_status.enter_time < next_status.enter_time:
if curr_status.enter_time <= tick_time < next_status.enter_time:
status = curr_status
elif next_status.enter_time <= tick_time:
status = next_status
else:
if curr_status.enter_time <= tick_time:
status = curr_status
elif next_status.enter_time <= tick_time:
status = next_status
return status
2. 交易状态相关类型的编码器
因为TradeStatusManager的trade_status_map字典是一个复杂字典,对其进行json转换需要自定义一个编码器。实现如下:
class TradeStatusEncoder(json.JSONEncoder):
"""
交易状态相关类型的编码器
"""
def default(self, obj):
d = {}
d['__class__'] = obj.__class__.__name__
if isinstance(obj,Exchange):
d['_value_'] = obj.value
elif isinstance(obj,InstrumentStatus):
d['_value_'] = obj.value
elif isinstance(obj,StatusEnterReason):
d['_value_'] = obj.value
elif isinstance(obj,StatusData):
d.update(obj.__dict__)
else:
d['__module__'] = obj.__module__
d.update(obj.__dict__)
return d
3. 交易状态相关类型的译码器
与2的原因相同,对把json转换到TradeStatusManager的trade_status_map,需要自定义一个译码器。实现如下:
class TradeStatusDecoder(json.JSONDecoder):
"""
交易状态相关类型的译码器
"""
def __init__(self):
json.JSONDecoder.__init__(self, object_hook=self.dict2obj)
def dict2obj(self, d):
if '__class__' in d:
class_name = d.pop('__class__')
if class_name == 'Exchange':
value = d['_value_']
instance = Exchange(value)
elif class_name == 'InstrumentStatus':
value = d['_value_']
instance = InstrumentStatus(value)
elif class_name == 'StatusEnterReason':
value = d['_value_']
instance = StatusEnterReason(value)
elif class_name == 'StatusData':
vt_symbol = d.pop('vt_symbol')
symbol,exchange = extract_vt_symbol(vt_symbol)
args = dict((key,value) for key, value in d.items())
args.update({"symbol":symbol,"exchange":exchange})
instance = StatusData(**args)
else:
module_name = d.pop('__module__')
# print(f"!!!! class_name:{class_name} module_name:{module_name}")
module = __import__(module_name)
class_ = getattr(module, class_name)
args = dict((key,value) for key, value in d.items())
instance = class_(**args)
else:
instance = d
return instance
4. trade_status.json文件
4.1 trade_status.json保存在哪里?
这个文件通常保存在系统的用户目录下的.vntrader目录中。
4.2 trade_status.json的内容
4.2.1 有哪些内容?
这里仅仅展示一部分内容,如果您足够敏感的化,您应该能够立即领悟到这些信息的作用!
它是以合约品种为主键值,以进入时间为次键值的嵌套的字典,字典值为交易状态。
有了它你可以知道某个品种、合约在某个时刻之后处于什么状态。
这些状态包含:
- NO_TRADING = "非交易"
- BEFORE_TRADING = "开盘前"
- AUCTION_ORDERING = "集合竞价报单"
- AUCTION_MATCH = "集合竞价撮合"
- AUCTION_BALANCE = "集合竞价价格平衡"
- CONTINOUS = "连续交易"
- CLOSE = "收盘"
4.2.2 这些信息有什么用?
有了这个状态信息,你可以:
- 过滤非交易接收到的无效数据
- 可以知道收到的tick是开盘tick,其包含的last_price就是开盘价
- 可以避免在在收到开盘tick时立即执行
- 因为这些信息每个交易日似乎重复播放的,你可以利用这些历史接收的交易状态信息,快速作出决策。
4.2.3 内容展示
{
"TA.CZCE": {
"19:44:24": {
"__class__": "StatusData",
"gateway_name": "CTP",
"symbol": "TA",
"exchange": {
"__class__": "Exchange",
"_value_": "CZCE"
},
"settlement_group_id": "00000001",
"instrument_status": {
"__class__": "InstrumentStatus",
"_value_": "非交易"
},
"trading_segment_sn": 4,
"enter_time": "19:44:24",
"enter_reason": {
"__class__": "StatusEnterReason",
"_value_": "自动切换"
},
"exchange_inst_id": "TA",
"vt_symbol": "TA.CZCE"
},
"20:04:29": {
"__class__": "StatusData",
"gateway_name": "CTP",
"symbol": "TA",
"exchange": {
"__class__": "Exchange",
"_value_": "CZCE"
},
"settlement_group_id": "00000001",
"instrument_status": {
"__class__": "InstrumentStatus",
"_value_": "非交易"
},
"trading_segment_sn": 4,
"enter_time": "20:04:29",
"enter_reason": {
"__class__": "StatusEnterReason",
"_value_": "自动切换"
},
"exchange_inst_id": "TA",
"vt_symbol": "TA.CZCE"
},
"20:55:00": {
"__class__": "StatusData",
"gateway_name": "CTP",
"symbol": "TA",
"exchange": {
"__class__": "Exchange",
"_value_": "CZCE"
},
"settlement_group_id": "00000001",
"instrument_status": {
"__class__": "InstrumentStatus",
"_value_": "集合竞价报单"
},
"trading_segment_sn": 7,
"enter_time": "20:55:00",
"enter_reason": {
"__class__": "StatusEnterReason",
"_value_": "自动切换"
},
"exchange_inst_id": "TA",
"vt_symbol": "TA.CZCE"
},
"20:59:00": {
"__class__": "StatusData",
"gateway_name": "CTP",
"symbol": "TA",
"exchange": {
"__class__": "Exchange",
"_value_": "CZCE"
},
"settlement_group_id": "00000001",
"instrument_status": {
"__class__": "InstrumentStatus",
"_value_": "集合竞价撮合"
},
"trading_segment_sn": 9,
"enter_time": "20:59:00",
"enter_reason": {
"__class__": "StatusEnterReason",
"_value_": "自动切换"
},
"exchange_inst_id": "TA",
"vt_symbol": "TA.CZCE"
},
"21:00:00": {
"__class__": "StatusData",
"gateway_name": "CTP",
"symbol": "TA",
"exchange": {
"__class__": "Exchange",
"_value_": "CZCE"
},
"settlement_group_id": "00000001",
"instrument_status": {
"__class__": "InstrumentStatus",
"_value_": "连续交易"
},
"trading_segment_sn": 13,
"enter_time": "21:00:00",
"enter_reason": {
"__class__": "StatusEnterReason",
"_value_": "自动切换"
},
"exchange_inst_id": "TA",
"vt_symbol": "TA.CZCE"
},
"23:00:00": {
"__class__": "StatusData",
"gateway_name": "CTP",
"symbol": "TA",
"exchange": {
"__class__": "Exchange",
"_value_": "CZCE"
},
"settlement_group_id": "00000001",
"instrument_status": {
"__class__": "InstrumentStatus",
"_value_": "非交易"
},
"trading_segment_sn": 15,
"enter_time": "23:00:00",
"enter_reason": {
"__class__": "StatusEnterReason",
"_value_": "自动切换"
},
"exchange_inst_id": "TA",
"vt_symbol": "TA.CZCE"
},
"current": "23:00:00"
},
"OI.CZCE": {
"19:44:24": {
"__class__": "StatusData",
"gateway_name": "CTP",
"symbol": "OI",
"exchange": {
"__class__": "Exchange",
"_value_": "CZCE"
},
"settlement_group_id": "00000001",
"instrument_status": {
"__class__": "InstrumentStatus",
"_value_": "非交易"
},
"trading_segment_sn": 4,
"enter_time": "19:44:24",
"enter_reason": {
"__class__": "StatusEnterReason",
"_value_": "自动切换"
},
"exchange_inst_id": "OI",
"vt_symbol": "OI.CZCE"
},
"20:04:29": {
"__class__": "StatusData",
"gateway_name": "CTP",
"symbol": "OI",
"exchange": {
"__class__": "Exchange",
"_value_": "CZCE"
},
"settlement_group_id": "00000001",
"instrument_status": {
"__class__": "InstrumentStatus",
"_value_": "非交易"
},
"trading_segment_sn": 4,
"enter_time": "20:04:29",
"enter_reason": {
"__class__": "StatusEnterReason",
"_value_": "自动切换"
},
"exchange_inst_id": "OI",
"vt_symbol": "OI.CZCE"
},
"20:55:00": {
"__class__": "StatusData",
"gateway_name": "CTP",
"symbol": "OI",
"exchange": {
"__class__": "Exchange",
"_value_": "CZCE"
},
"settlement_group_id": "00000001",
"instrument_status": {
"__class__": "InstrumentStatus",
"_value_": "集合竞价报单"
},
"trading_segment_sn": 7,
"enter_time": "20:55:00",
"enter_reason": {
"__class__": "StatusEnterReason",
"_value_": "自动切换"
},
"exchange_inst_id": "OI",
"vt_symbol": "OI.CZCE"
},
"20:59:00": {
"__class__": "StatusData",
"gateway_name": "CTP",
"symbol": "OI",
"exchange": {
"__class__": "Exchange",
"_value_": "CZCE"
},
"settlement_group_id": "00000001",
"instrument_status": {
"__class__": "InstrumentStatus",
"_value_": "集合竞价撮合"
},
"trading_segment_sn": 9,
"enter_time": "20:59:00",
"enter_reason": {
"__class__": "StatusEnterReason",
"_value_": "自动切换"
},
"exchange_inst_id": "OI",
"vt_symbol": "OI.CZCE"
},
"21:00:00": {
"__class__": "StatusData",
"gateway_name": "CTP",
"symbol": "OI",
"exchange": {
"__class__": "Exchange",
"_value_": "CZCE"
},
"settlement_group_id": "00000001",
"instrument_status": {
"__class__": "InstrumentStatus",
"_value_": "连续交易"
},
"trading_segment_sn": 13,
"enter_time": "21:00:00",
"enter_reason": {
"__class__": "StatusEnterReason",
"_value_": "自动切换"
},
"exchange_inst_id": "OI",
"vt_symbol": "OI.CZCE"
},
"23:00:00": {
"__class__": "StatusData",
"gateway_name": "CTP",
"symbol": "OI",
"exchange": {
"__class__": "Exchange",
"_value_": "CZCE"
},
"settlement_group_id": "00000001",
"instrument_status": {
"__class__": "InstrumentStatus",
"_value_": "非交易"
},
"trading_segment_sn": 15,
"enter_time": "23:00:00",
"enter_reason": {
"__class__": "StatusEnterReason",
"_value_": "自动切换"
},
"exchange_inst_id": "OI",
"vt_symbol": "OI.CZCE"
},
"current": "23:00:00"
},
... ... 内容太多,略去
}
5. 交易状态信息管理器的使用
未完待续 ... ...