1. 停止单、条件单存在的问题
CTA策略模块原来是有停止单的,本人后来又在添加了条件单功能。使用中很多用户反应有这样的问题,那就是策略已经发出过停止单或条件单,但是还未触发,但是因为某种原因策略被关机了,再次启动该策略时发现之前发出过停止单或条件单没有了,非常不方便。如果能够在策略再次启动的时候,把历史的停止单或条件单回复出来就好了。
2. 如何解决加载停止单、条件单
那么如何把历史的停止单或条件单回复出来呢?把策略运行时曾经发出的停止单或条件单保存到文件或者数据库,在策略再次启动时,从文件或者数据库读取出来,恢复到CTA策略管理器的停止单或条件单列表,让它们继续运行就可以了。
这就有选择的问题:
- 是否希望加载历史停止单或条件单,这是可以选择的,应该可以设置;
- 全部恢复历史停止单或条件单,还是止恢复仍然有效的,这也应该可以选择的;
3. 实现方法
3.1 先实现停止单字典和条件单字典的存取功能
包括如下:
- 保存内存中的停止单字典到json文件
- 从json文件读取停止单字典
- 保存内存中的条件单字典到json文件
- 从json文件读取条件单字典
在vnpy_ctastrategy命令下新建一个文件utitlity.py,其内容如下:
"""
实现停止单字典和条件单字典的存取功能,功能如下:
- 保存内存中的停止单字典到json文件
- 从json文件读取停止单字典
- 保存内存中的条件单字典到json文件
- 从json文件读取条件单字典
作者:hxxjava
时间:2023-2-12
"""
import json
from datetime import datetime
from vnpy.trader.constant import Direction,Offset
from vnpy_ctastrategy.base import (
StopOrder,
StopOrderStatus,
ConditionOrder,
Condition,
ExecutePrice,
CondOrderStatus,
)
from vnpy.trader.utility import get_file_path,get_folder_path,save_json
class StopOrderEncoder(json.JSONEncoder):
"""
停止单相关类型的编码器————用来保存json文件
"""
def default(self, obj):
d = {}
d['__class__'] = obj.__class__.__name__
if isinstance(obj,Direction):
d['_value_'] = obj.value
elif isinstance(obj,Offset):
d['_value_'] = obj.value
elif isinstance(obj,StopOrderStatus):
d['_value_'] = obj.value
elif isinstance(obj, datetime):
d['_value_'] = obj.strftime("%Y-%m-%d %H:%M:%S.%f")
elif isinstance(obj,StopOrder):
d.update(obj.__dict__)
else:
d['__module__'] = obj.__module__
d.update(obj.__dict__)
return d
class StopOrderDecoder(json.JSONDecoder):
"""
停止单相关类型的译码器————用来从json文件读取
"""
def __init__(self):
json.JSONDecoder.__init__(self, object_hook=self.dict2obj)
def dict2obj(self, d):
if '__class__' in d:
class_name = d.pop('__class__')
if class_name == 'Direction':
value = d['_value_']
instance = Direction(value)
elif class_name == 'Offset':
value = d['_value_']
instance = Offset(value)
elif class_name == 'StopOrderStatus':
value = d['_value_']
instance = StopOrderStatus(value)
elif class_name == 'datetime':
value = d['_value_']
instance = datetime.strptime(value, '%Y-%m-%d %H:%M:%S.%f')
elif class_name == 'StopOrder':
instance = StopOrder(**d)
else:
module_name = d.pop('__module__')
module = __import__(module_name)
class_ = getattr(module, class_name)
args = dict((key,value) for key, value in d.items())
instance = class_(**args)
else:
instance = d
return instance
class CondOrderEncoder(json.JSONEncoder):
"""
条件单相关类型的编码器————用来保存json文件
"""
def default(self, obj):
d = {}
d['__class__'] = obj.__class__.__name__
if isinstance(obj,Direction):
d['_value_'] = obj.value
elif isinstance(obj,Offset):
d['_value_'] = obj.value
elif isinstance(obj,Condition):
d['_value_'] = obj.value
elif isinstance(obj,ExecutePrice):
d['_value_'] = obj.value
elif isinstance(obj,CondOrderStatus):
d['_value_'] = obj.value
elif isinstance(obj, datetime):
d['_value_'] = obj.strftime("%Y-%m-%d %H:%M:%S.%f")
elif isinstance(obj,ConditionOrder):
d.update(obj.__dict__)
else:
d['__module__'] = obj.__module__
d.update(obj.__dict__)
return d
class CondOrderDecoder(json.JSONDecoder):
"""
条件单相关类型的译码器————用来从json文件读取
"""
def __init__(self):
json.JSONDecoder.__init__(self, object_hook=self.dict2obj)
def dict2obj(self, d):
if '__class__' in d:
class_name = d.pop('__class__')
if class_name == 'Direction':
value = d['_value_']
instance = Direction(value)
elif class_name == 'Offset':
value = d['_value_']
instance = Offset(value)
elif class_name == 'Condition':
value = d['_value_']
instance = Condition(value)
elif class_name == 'ExecutePrice':
value = d['_value_']
instance = ExecutePrice(value)
elif class_name == 'CondOrderStatus':
value = d['_value_']
instance = CondOrderStatus(value)
elif class_name == 'datetime':
value = d['_value_']
instance = datetime.strptime(value, '%Y-%m-%d %H:%M:%S.%f')
elif class_name == 'ConditionOrder':
instance = ConditionOrder(**d)
else:
module_name = d.pop('__module__')
module = __import__(module_name)
class_ = getattr(module, class_name)
args = dict((key,value) for key, value in d.items())
instance = class_(**args)
else:
instance = d
return instance
def save_stop_order(filename: str,data: dict) -> None:
"""
Save StopOrder dict into {.vntrader}\stop_orders\{filename}.json.
"""
path = get_folder_path("stop_orders")
path_file = path.joinpath(filename)
with open(path_file, mode="w+",encoding="UTF-8") as f:
json.dump(
data,
f,
indent=4,
cls=StopOrderEncoder,
ensure_ascii=False
)
def load_stop_order(filename: str) -> dict:
"""
Load StopOrder dict from {.vntrader}\stop_orders\{filename}.json.
"""
path = get_folder_path("stop_orders")
path_file = path.joinpath(filename)
filepath: Path = get_file_path(path_file)
if filepath.exists():
with open(filepath, mode="r", encoding="UTF-8") as f:
data: dict = json.load(f,cls=StopOrderDecoder)
return data
else:
save_json(filepath, {})
return {}
def save_condition_order(filename: str,data: dict) -> None:
"""
Save ConditionOrder dict into {.vntrader}\cond_orders\{filename}.json.
"""
path = get_folder_path("cond_orders")
path_file = path.joinpath(filename)
with open(path_file, mode="w+",encoding="UTF-8") as f:
json.dump(
data,
f,
indent=4,
cls=CondOrderEncoder,
ensure_ascii=False
)
def load_condition_order(filename: str) -> dict:
"""
Load ConditionOrder dict from {.vntrader}\cond_orders\{filename}.json.
"""
path = get_folder_path("cond_orders")
path_file = path.joinpath(filename)
filepath: Path = get_file_path(path_file)
if filepath.exists():
with open(filepath, mode="r", encoding="UTF-8") as f:
data: dict = json.load(f,cls=CondOrderDecoder)
return data
else:
save_json(filepath, {})
return {}
3.2 为CtaEngine增加停止单和条件单保存和加载功能
在文件cta_strategy\engine.py的class MyCtaEngine下面增加下面的代码。class MyCtaEngine已经在比停止单更好用的条件单——ConditionOrder一文中分享给大家来,虽然这次贴出其完整代码,但这里只介绍与停止单和条件单的保存与恢复有关的内容。
- 增加了save_stop_orders()接口,用来保存策略停止单
- 增加了load_stop_orders()接口,用来回复策略停止单
- 增加了save_condition_orders()接口,用来保存策略条件单
- 增加了load_condition_orders()接口,用来回复策略条件单
- 增加了对EVENT_TIMER消息的订阅,其消息处理函数process_timer_event()用来定时对所以初始化的策略进行监视,保存其停止单和条件单到以其策略名称为文件名的json文件。
在cta_strategy\engine.py的引用部分增加这些代码:
from .utility import save_stop_order, load_stop_order, save_condition_order, load_condition_order # hxxjava add
class MyCtaEngine下面增加下面的代码:
class MyCtaEngine(CtaEngine):
"""
CTA策略引擎,对CtaEngine的功能进行扩展。
功能:
1. 订阅集合竞价tick数据,并且转发给各个已经初始化的CTA策略;
2. 订阅交易状态消息数据,并且转发给各个已经初始化的CTA策略;
3. 条件单的功能:包括发送、监视、更新和取消条件单的功能。
4. 定时保存已经初始化策略的停止单和条件单到json文件。
5. 提供历史策略的停止单和条件单的查询接口。
"""
def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
""""""
super().__init__(main_engine,event_engine)
self.condition_orders:Dict[str,ConditionOrder] = {} # strategy_name: ConditionOrder
self.triggered_condition_orders:List[ConditionOrder] = [] # 已经触发点条件单,为流控设计
self.seconds = 0
self.save_orders_interval = 10
def register_event(self):
""""""
super().register_event()
self.event_engine.register(EVENT_AUCTION_TICK, self.process_auction_tick_event)
self.event_engine.register(EVENT_STATUS, self.process_status_event)
self.event_engine.register(EVENT_ALL_PENDING_ORDER, self.process_pending_order_event)
self.event_engine.register(EVENT_TIMER, self.process_timer_event)
def process_pending_order_event(self,event:Event):
""" 集合竞价消息处理 hxxjava add """
pending_orders:PendingOrders = event.data
strategies:List[CtaTemplate] = self.symbol_strategy_map.get(pending_orders.vt_symbol,[])
if not strategies:
return
for strategy in strategies:
if strategy.inited:
# 执行全挂单消息推送
self.call_strategy_func(strategy, strategy.on_pending_orders, pending_orders)
def process_auction_tick_event(self,event:Event):
""" 集合竞价消息处理 hxxjava add """
tick:TickData = event.data
strategies:List[CtaTemplate] = self.symbol_strategy_map.get(tick.vt_symbol,[])
if not strategies:
return
for strategy in strategies:
if strategy.inited:
# 执行策略的集合竞价消息处理
self.call_strategy_func(strategy, strategy.on_auction_tick, tick)
def process_status_event(self,event:Event):
""" 交易状态消息处理 hxxjava add """
status:StatusData = event.data
strategies:List[CtaTemplate] = []
# step1: find strategies related to this status data
vt_instrument0 = get_vt_instrument(status.vt_symbol)
if vt_instrument0 == status.vt_symbol:
# 交易品种的交易状态
for vt_symbol in self.symbol_strategy_map.keys():
vt_instrument = get_vt_instrument(vt_symbol)
if vt_instrument == vt_instrument0:
# 交易品种的交易状态属于策略交易的合约
strategies.extend(self.symbol_strategy_map[vt_symbol])
else:
# 单独合约的交易状态
strategies.extend(self.symbol_strategy_map.get(status.vt_symbol,[]))
if not strategies:
return
# step 2: push status data to all relate strategies
for strategy in strategies:
if strategy.inited:
# 执行策略的集合竞价消息处理
self.call_strategy_func(strategy, strategy.on_status, status)
def process_tick_event(self,event:Event):
""" 用tick的价格检查条件单 """
super().process_tick_event(event)
tick:TickData = event.data
all_condition_orders = [order for order in self.condition_orders.values() \
if order.vt_symbol == tick.vt_symbol and order.status == CondOrderStatus.WAITING]
for order in all_condition_orders:
# 检查条件单是否满足条件
self.check_condition_order(order,tick)
def check_condition_order(self,order:ConditionOrder,tick:TickData):
""" 检查条件单是否满足条件 """
strategy = self.strategies.get(order.strategy_name,None)
if not strategy or not strategy.trading:
return False
price = tick.last_price
is_be = order.condition == Condition.BE and price >= order.price
is_le = order.condition == Condition.LE and price <= order.price
is_bt = order.condition == Condition.BT and price > order.price
is_lt = order.condition == Condition.LT and price < order.price
if is_be or is_le or is_bt or is_lt:
# 满足触发条件
if order.execute_price == ExecutePrice.MARKET:
# 取市场价
price = tick.last_price
elif order.execute_price == ExecutePrice.EXTREME:
# 取极限价
price = tick.limit_up if order.direction == Direction.LONG else tick.limit_down
else:
# 取设定价
price = order.price
# 执行委托
order_ids = strategy.send_order(
direction = order.direction,
offset=order.offset,
price=price,
volume=order.volume
)
if order_ids:
order.trigger_time = tick.datetime
order.status = CondOrderStatus.TRIGGERED
order.vt_orderids = order_ids
self.call_strategy_func(strategy,strategy.on_condition_order,order)
self.put_cond_order_event(order)
def find_condition_order(self,vt_orderid:str):
""" 根据委托单号查询所属条件单 """
corder:ConditionOrder = None
for order in self.condition_orders.values():
if vt_orderid in order.vt_orderids:
corder = order
break
return corder
def process_trade_event(self, event: Event):
""" 委托单推送处理 """
super().process_trade_event(event)
trade:TradeData = event.data
vt_orderid = trade.vt_orderid
corder = self.find_condition_order(vt_orderid)
if corder:
# 该成交单属于某个条件单
strategy = self.strategies.get(corder.strategy_name,None)
if strategy and strategy.trading:
# 找到了该条件单属实策略实例且正在交易中
# 累计条件单的成交量
corder.traded += trade.volume
# 推送该条件单给策略
self.call_strategy_func(strategy,strategy.on_condition_order,corder)
# 刷新条件单列表控件
self.event_engine.put(Event(EVENT_CONDITION_ORDER,corder))
def send_condition_order(self,order:ConditionOrder):
""" """
strategy = self.strategies.get(order.strategy_name,None)
if not strategy or not strategy.trading:
return False
if order.cond_orderid not in self.condition_orders:
self.condition_orders[order.cond_orderid] = order
self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))
return True
return False
def cancel_condition_order(self,cond_orderid:str):
""" """
order:ConditionOrder = self.condition_orders.get(cond_orderid,None)
if not order:
return False
order.status = CondOrderStatus.CANCELLED
self.put_cond_order_event(order)
return True
def cancel_all_condition_orders(self,strategy_name:str):
""" """
for order in self.condition_orders.values():
if order.strategy_name == strategy_name and order.status == CondOrderStatus.WAITING:
order.status = CondOrderStatus.CANCELLED
self.put_cond_order_event(order)
return True
def put_cond_order_event(self, cond_order: ConditionOrder) -> None:
"""
Put an event to update condition order status.
"""
event: Event = Event(EVENT_CONDITION_ORDER, cond_order)
self.event_engine.put(event)
def save_stop_orders(self,strategy_name:str,active_only:bool=False):
""" 保存加载历史停止单到json文件 """
count:int = 0
strategy:CtaTemplate = self.strategies.get(strategy_name,None)
if not strategy:
return count
stop_orders = {}
for order_id,stop_order in self.stop_orders.items():
if stop_order.strategy_name == strategy_name:
if active_only and stop_order.status == StopOrderStatus.WAITING:
stop_orders[order_id] = stop_order
count += 1
else:
stop_orders[order_id] = stop_order
count += 1
file_name = f"{strategy_name}.json"
save_stop_order(file_name,stop_orders)
return count
def load_stop_orders(self,strategy_name:str,active_only:bool=True):
""" 从json文件加载历史停止单 """
file_name = f"{strategy_name}.json"
stop_orders:Dict[str,StopOrder] = load_stop_order(file_name)
if not stop_orders:
return
if not active_only:
loaded_orders = stop_orders
else:
loaded_orders = {}
for id,stop_order in stop_orders.items():
if stop_order.status == StopOrderStatus.WAITING:
loaded_orders[id] = stop_order
if loaded_orders:
self.stop_orders.update(loaded_orders)
# 更新GUI中加载的停止单列表
for stop_order in loaded_orders.values():
self.put_stop_order_event(stop_order)
def save_condition_orders(self,strategy_name:str,active_only:bool=True):
""" 保存加载历史条件单到json文件 """
count:int = 0
strategy:CtaTemplate = self.strategies.get(strategy_name,None)
if not strategy:
return count
cond_orders = {}
for order_id,cond_order in self.condition_orders.items():
if cond_order.strategy_name == strategy_name:
if active_only and cond_order.status == CondOrderStatus.WAITING:
cond_orders[order_id] = cond_order
count += 1
else:
cond_orders[order_id] = cond_order
count += 1
file_name = f"{strategy_name}.json"
save_condition_order(file_name,cond_orders)
return count
def load_condition_orders(self,strategy_name:str,active_only:bool=True):
""" 从json文件加载历史条件单 """
file_name = f"{strategy_name}.json"
cond_orders:Dict[str,ConditionOrder] = load_condition_order(file_name)
if not cond_orders:
return
if not active_only:
loaded_orders = cond_orders
else:
loaded_orders = {}
for id,cond_order in cond_orders.items():
if cond_order.status == CondOrderStatus.WAITING:
loaded_orders[id] = cond_order
if loaded_orders:
self.condition_orders.update(loaded_orders)
# 更新GUI中加载的条件单列表
for cond_order in loaded_orders.values():
self.put_cond_order_event(cond_order)
def process_timer_event(self,event:Event) -> None:
# 定时保存策略的
self.seconds += 1
if self.seconds % self.save_orders_interval:
return
if self.get_engine_type() != EngineType.LIVE:
# 只有实盘引擎才保存停止单和条件单,回测引擎则不保存
return
for strategy in self.strategies.values():
if strategy.inited:
cnt1 = self.save_stop_orders(strategy.strategy_name)
cnt2 = self.save_condition_orders(strategy.strategy_name)
# print(f"保存了策略 {strategy.strategy_name} 的 {cnt1} 个停止单,{cnt2} 个条件。")
3.3 为CTA策略模板CtaTemplate增加停止单和条件单加载选项
这里主要介绍在CtaTemplate中增加点与加载历史停止单和条件单相关的成员变量:
其中:
- self.history_order表示策略是否在启动之后加载历史停止单和条件单,
- self.active_only表示是否只加载仍然处于等待状态的历史停止单和条件单;
class CtaTemplate的其他代码见本人之前的帖子中的代码:比停止单更好用的条件单——ConditionOrder。
class CtaTemplate(ABC):
""""""
author: str = ""
parameters: list = []
variables: list = []
def __init__(
self,
cta_engine: Any,
strategy_name: str,
vt_symbol: str,
setting: dict,
) -> None:
""""""
self.cta_engine: Any = cta_engine
self.strategy_name: str = strategy_name
self.vt_symbol: str = vt_symbol
self.inited: bool = False
self.trading: bool = False
self.pos: int = 0
# 是否在启动之后加载历史停止单和条件单
self.history_order:bool = False # hxxjava add
self.active_only:bool = False # hxxjava add
# Copy a new variables list here to avoid duplicate insert when multiple
# strategy instances are created with the same strategy class.
self.variables = copy(self.variables)
self.variables.insert(0, "inited")
self.variables.insert(1, "trading")
self.variables.insert(2, "pos")
self.update_setting(setting)
# 其他代码省略 ... ...
4. 加载停止单和条件单委托选项的应用
4.1 用户CTA策略如何使用停止单和条件单加载功能?
增加历史停止单和条件单委托选项
class DemoStrategy(CtaTemplate):
""" 示例策略 """
author = "hxxjava"
capital : float = 200000.0 # 交易资金
max_loss_ratio : int = 6 # 每次开仓最大亏损比例
max_open_times : int = 3 # 每次开仓最大亏损比例
dir_interval : str = '1m' # 方向周期单位,只能够是:'1m','1h','d'或'w'中的一个
dir_window : int = 30 # 方向周期窗口
op_interval : str = '1m' # 操作周期单位,只能够是:'1m','1h','d'或'w'中的一个
op_window : int = 3 # 操作周期窗口
load_days : int = 10 # 加载历史行情的天数
OpenSelect:str = "逆转价"
show_chart: bool = True # 是否需要显示图表
dir_trend: str = ""
op_trend: str = ""
parameters = [
"capital",
"max_loss_ratio",
"max_open_times",
"dir_interval",
"dir_window",
"op_interval",
"op_window",
"load_days",
"OpenSelect",
"show_chart",
"history_order", # 启动时是否加载停止单和条件单选项
"active_only", # 是否只加载仍然有效的停止单和条件单选项,
]
long_pos:float = 0 # 持有多仓
short_pos:float = 0 # 持有空仓
variables = [
"dir_trend",
"op_trend",
"long_pos",
"short_pos"
]
def __init__(self, cta_engine, strategy_name, vt_symbol, setting):
""""""
super(GsjyStrategy, self).__init__(cta_engine, strategy_name, vt_symbol, setting)
self.history_order = True # 启动时载历史停止单和条件单
self.active_only = False # 加载又有点历史停止单和条件单
# 其他的代码省略
def on_start(self):
"""
Callback when strategy is started.
"""
is_live = self.cta_engine.get_engine_type() == EngineType.LIVE
if is_live and self.history_order:
# 只有实盘才保存策略的历史停止单和条件单
from vnpy_ctastrategy.engine import MyCtaEngine
cta_engine:MyCtaEngine = self.cta_engine
cta_engine.load_stop_orders(self.strategy_name,self.active_only)
cta_engine.load_condition_orders(self.strategy_name,self.active_only)
self.write_log("加载历史停止单和条件单已执行。")
# 其他的代码省略
4.2 设置加载停止单和条件单委托选项
在用户策略未启动的情况下,还可以设置有关加载停止单和条件单委托的选项,如图所示:
4.3 停止单和条件单会在用户CTA策略启动之后立即加载
下图是策略执行了条件单之后被关闭,再次重新启动之后回复的条件单。当然,停止单也是可以的实现恢复历史的,大家可以去试。
4.4 保存的停止单和条件单在哪里,内容是什么样子?
以条件单为例,如下图所示,条件单通常保存在用户目录下的.vntrader\cond_orders{策略名称}.json文件中:
其内容及格式如下:
{
"0213082700975": {
"__class__": "ConditionOrder",
"strategy_name": "gs-rb2305",
"vt_symbol": "rb2305.SHFE",
"direction": {
"__class__": "Direction",
"_value_": "多"
},
"offset": {
"__class__": "Offset",
"_value_": "开"
},
"price": 4072.0,
"volume": 4.0,
"condition": {
"__class__": "Condition",
"_value_": ">"
},
"execute_price": {
"__class__": "ExecutePrice",
"_value_": "设定价"
},
"create_time": {
"__class__": "datetime",
"_value_": "2023-02-13 08:27:00.975422"
},
"trigger_time": null,
"cond_orderid": "0213082700975",
"traded": 0.0,
"vt_orderids": [],
"status": {
"__class__": "CondOrderStatus",
"_value_": "已撤销"
},
"before_trigger": null,
"after_traded": null
},
"0213090325941": {
"__class__": "ConditionOrder",
"strategy_name": "gs-rb2305",
"vt_symbol": "rb2305.SHFE",
"direction": {
"__class__": "Direction",
"_value_": "空"
},
"offset": {
"__class__": "Offset",
"_value_": "开"
},
"price": 4063.0,
"volume": 4.0,
"condition": {
"__class__": "Condition",
"_value_": "<="
},
"execute_price": {
"__class__": "ExecutePrice",
"_value_": "极限价"
},
"create_time": {
"__class__": "datetime",
"_value_": "2023-02-13 09:03:25.941102"
},
"trigger_time": {
"__class__": "datetime",
"_value_": "2023-02-13 09:03:25.500000"
},
"cond_orderid": "0213090325941",
"traded": 0.0,
"vt_orderids": [
"CTP.12_-2076558284_1"
],
"status": {
"__class__": "CondOrderStatus",
"_value_": "已触发"
},
"before_trigger": null,
"after_traded": null
}
}