1. 问题的由来
实盘中,如果你用K线图表对你的CTA策略生成的K线进行显示,你会发现明明已经休市了或者收盘了,最新的日K线、30分钟、5分钟K线本应该能够有了,可是迟迟见不到这些K线的生成。
以国内期货举例:
- 10:10~10:15的那根5分钟K线在10:15:00到10:30:00的休市时间段内是出不来的;
- 11:25:00到11:30:00的那根5分钟K线在11:30:00到13:30:00的休市时间段内是出不来的;
- 还有日K线,已经到15:00:00收盘了,可是因为没有收到下一秒tick而不能够及时生成出来。如果您不关机,等到下一个交易日的第一个tick收到的时候才能生成这根日K线。
- 另外如果你是个细心的人,注意下策略在加载历史数据的时候,最后一根日K线、30分钟、5分钟K线乃至最后一根1分钟K线也是见不到的。
目前vnpy的BarGenerator没有出错,但出现这种情况有违常理!
为什么会出现情况?原因是因为你的BarGenerator的生成K线的机制导致的,因为这些大K线是由1分钟K线合成的,就是说它们依赖1分钟K线的生成。
目前1分钟K线只是单纯由tick数据推动的,当收到下一分钟tick才判断当前1分钟K线的结束。如果遇到了中间休市时间或者收盘时间,网关接口就不再有新的tick推送,这样最后1分钟K线也就一直呆在BarGenerator中,无法推动5分钟、30分钟、日等大K线的生成,这就是目前BarGenerator的问题所在。
2. 合约交易状态可以解决这个问题。
交易所用合约交易状态通知交易客户端交易合约的状态已经发生了变化。它表明交易合约当前的交易时间段,在每个交易时间的开始和结束时推送,时间为分钟的开始。这个信息正好可以用于BarGenerator结束各个休市和收盘前1分钟的K线生成,进而一举解决比1分钟大的K线的生成。
3. 实现这个问题一共分成5步
套用一句宋丹丹的话,要把大象关进冰箱总共分三步:第一步把冰箱门打开,第二步把冰箱塞进冰箱中,第三步把冰箱门关上!
让BarGenerator在休市和收市时及时生成K线分五步:
- 第一步扩展CTA策略引擎CtaEngine
- 第二步修改CTA策略模板CtaTemplate
- 第三步扩展K线生成器BarGenerator
- 第四步修改CTA策略
- 第五步启用扩展的MyCtaEngine
3.1 第一步扩展CTA策略引擎CtaEngine
在vnpy_ctastrategy\engine.py中增加下面的内容:
class MyCtaEngine(CtaEngine):
"""
CTA策略引擎,对CtaEngine的功能进行扩展。
功能:
1. 订阅集合竞价tick数据,并且转发给各个已经初始化的CTA策略;
2. 订阅交易状态消息数据,并且转发给各个已经初始化的CTA策略;
3. 条件单的功能:包括发送、监视、更新和取消条件单的功能。
"""
condition_filename = "condition_order.json" # 历史条件单存储文件
def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
""""""
super().__init__(main_engine,event_engine)
self.condition_orders:Dict[str,ConditionOrder] = {} # strategy_name: ConditionOrder
self.triggered_condition_orders:List[ConditionOrder] = [] # 已经触发点条件单,为流控设计
def load_active_condtion_orders(self):
""" """
return {}
def register_event(self):
""""""
super().register_event()
self.event_engine.register(EVENT_AUCTION_TICK, self.process_auction_tick_event)
self.event_engine.register(EVENT_STATUS, self.process_status_event)
def process_auction_tick_event(self,event:Event):
""" 集合竞价消息处理 hxxjava add """
tick:TickData = event.data
strategies:List[CtaTemplate] = self.symbol_strategy_map[tick.vt_symbol]
if not strategies:
return
for strategy in strategies:
if strategy.inited:
# 执行策略的集合竞价消息处理
self.call_strategy_func(strategy, strategy.on_auction_tick, tick)
def process_status_event(self,event:Event):
""" 交易状态消息处理 hxxjava add """
status:StatusData = event.data
strategies:List[CtaTemplate] = []
# step1: find strategies related to this status data
vt_instrument0 = get_vt_instrument(status.vt_symbol)
if vt_instrument0 == status.vt_symbol:
# 交易品种的交易状态
for vt_symbol in self.symbol_strategy_map.keys():
vt_instrument = get_vt_instrument(vt_symbol)
if vt_instrument == vt_instrument0:
# 交易品种的交易状态属于策略交易的合约
strategies.extend(self.symbol_strategy_map[vt_symbol])
else:
# 单独合约的交易状态
strategies.extend(self.symbol_strategy_map.get(status.vt_symbol,[]))
if not strategies:
return
# step 2: push status data to all relate strategies
for strategy in strategies:
if strategy.inited:
# 执行策略的集合竞价消息处理
self.call_strategy_func(strategy, strategy.on_status, status)
def process_tick_event(self,event:Event):
""" 用tick的价格检查条件单 """
super().process_tick_event(event)
tick:TickData = event.data
all_condition_orders = [order for order in self.condition_orders.values() \
if order.vt_symbol == tick.vt_symbol and order.status == CondOrderStatus.WAITING]
for order in all_condition_orders:
# 检查条件单是否满足条件
self.check_condition_order(order,tick)
def check_condition_order(self,order:ConditionOrder,tick:TickData):
""" 检查条件单是否满足条件 """
strategy = self.strategies.get(order.strategy_name,None)
if not strategy or not strategy.trading:
return False
price = tick.last_price
is_be = order.condition == Condition.BE and price >= order.price
is_le = order.condition == Condition.LE and price <= order.price
is_bt = order.condition == Condition.BT and price > order.price
is_lt = order.condition == Condition.LT and price < order.price
if is_be or is_le or is_bt or is_lt:
# 满足触发条件
if order.execute_price == ExecutePrice.MARKET:
# 取市场价
price = tick.last_price
elif order.execute_price == ExecutePrice.EXTREME:
# 取极限价
price = tick.limit_up if order.direction == Direction.LONG else tick.limit_down
else:
# 取设定价
price = order.price
# 执行委托
order_ids = strategy.send_order(
direction = order.direction,
offset=order.offset,
price=price,
volume=order.volume
)
if order_ids:
order.trigger_time = tick.datetime
order.status = CondOrderStatus.TRIGGERED
order.vt_orderids = order_ids
self.call_strategy_func(strategy,strategy.on_condition_order,order)
self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))
def find_condition_order(self,vt_orderid:str):
""" 根据委托单号查询所属条件单 """
corder:ConditionOrder = None
for order in self.condition_orders.values():
if vt_orderid in order.vt_orderids:
corder = order
break
return corder
def process_trade_event(self, event: Event):
""" 委托单推送处理 """
super().process_trade_event(event)
trade:TradeData = event.data
vt_orderid = trade.vt_orderid
corder = self.find_condition_order(vt_orderid)
if corder:
# 该成交单属于某个条件单
strategy = self.strategies.get(corder.strategy_name,None)
if strategy and strategy.trading:
# 找到了该条件单属实策略实例且正在交易中
# 累计条件单的成交量
corder.traded += trade.volume
# 推送该条件单给策略
self.call_strategy_func(strategy,strategy.on_condition_order,corder)
# 刷新条件单列表控件
self.event_engine.put(Event(EVENT_CONDITION_ORDER,corder))
def send_condition_order(self,order:ConditionOrder):
""" """
strategy = self.strategies.get(order.strategy_name,None)
if not strategy or not strategy.trading:
return False
if order.cond_orderid not in self.condition_orders:
self.condition_orders[order.cond_orderid] = order
self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))
return True
return False
def cancel_condition_order(self,cond_orderid:str):
""" """
order:ConditionOrder = self.condition_orders.get(cond_orderid,None)
if not order:
return False
order.status = CondOrderStatus.CANCELLED
self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))
return True
def cancel_all_condition_orders(self,strategy_name:str):
""" """
for order in self.condition_orders.values():
if order.strategy_name == strategy_name and order.status == CondOrderStatus.WAITING:
order.status = CondOrderStatus.CANCELLED
self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))
return True
3.2 第二步修改CTA策略模板CtaTemplate
在vnpy_ctastrategy\template.py中增加一个on_status()虚函数,它是接收交易状态信息数据推送的接口,on_status()代码如下:
class CtaTemplate(ABC):
""""""
author: str = ""
parameters: list = []
variables: list = []
def __init__(
self,
cta_engine: Any,
strategy_name: str,
vt_symbol: str,
setting: dict,
) -> None:
""""""
self.cta_engine: Any = cta_engine
self.strategy_name: str = strategy_name
self.vt_symbol: str = vt_symbol
self.inited: bool = False
self.trading: bool = False
self.pos: int = 0
# Copy a new variables list here to avoid duplicate insert when multiple
# strategy instances are created with the same strategy class.
self.variables = copy(self.variables)
self.variables.insert(0, "inited")
self.variables.insert(1, "trading")
self.variables.insert(2, "pos")
self.update_setting(setting)
def update_setting(self, setting: dict) -> None:
"""
Update strategy parameter wtih value in setting dict.
"""
for name in self.parameters:
if name in setting:
setattr(self, name, setting[name])
@classmethod
def get_class_parameters(cls) -> dict:
"""
Get default parameters dict of strategy class.
"""
class_parameters: dict = {}
for name in cls.parameters:
class_parameters[name] = getattr(cls, name)
return class_parameters
def get_parameters(self) -> dict:
"""
Get strategy parameters dict.
"""
strategy_parameters: dict = {}
for name in self.parameters:
strategy_parameters[name] = getattr(self, name)
return strategy_parameters
def get_variables(self) -> dict:
"""
Get strategy variables dict.
"""
strategy_variables: dict = {}
for name in self.variables:
strategy_variables[name] = getattr(self, name)
return strategy_variables
def get_data(self) -> dict:
"""
Get strategy data.
"""
strategy_data: dict = {
"strategy_name": self.strategy_name,
"vt_symbol": self.vt_symbol,
"class_name": self.__class__.__name__,
"author": self.author,
"parameters": self.get_parameters(),
"variables": self.get_variables(),
}
return strategy_data
@virtual
def on_init(self) -> None:
"""
Callback when strategy is inited.
"""
pass
@virtual
def on_inited(self): # hxxjava add
"""
Callback when strategy is inited.
"""
pass
@virtual
def on_start(self):
"""
Callback when strategy is started.
"""
pass
@virtual
def on_stop(self) -> None:
"""
Callback when strategy is stopped.
"""
pass
@virtual
def on_auction_tick(self, tick: TickData):
"""
Callback of new tick data update. # hxxjava add for auction tick
"""
pass
@virtual
def on_status(self, status: StatusData=None):
"""
Callback of new status data update. # hxxjava add for trading status
"""
pass
@virtual
def on_tick(self, tick: TickData) -> None:
"""
Callback of new tick data update.
"""
pass
@virtual
def on_bar(self, bar: BarData) -> None:
"""
Callback of new bar data update.
"""
pass
@virtual
def on_trade(self, trade: TradeData) -> None:
"""
Callback of new trade data update.
"""
pass
@virtual
def on_order(self, order: OrderData) -> None:
"""
Callback of new order data update.
"""
pass
@virtual
def on_stop_order(self, stop_order: StopOrder) -> None:
"""
Callback of stop order update.
"""
pass
def buy(
self,
price: float,
volume: float,
stop: bool = False,
lock: bool = False,
net: bool = False
) -> list:
"""
Send buy order to open a long position.
"""
return self.send_order(
Direction.LONG,
Offset.OPEN,
price,
volume,
stop,
lock,
net
)
def sell(
self,
price: float,
volume: float,
stop: bool = False,
lock: bool = False,
net: bool = False
) -> list:
"""
Send sell order to close a long position.
"""
return self.send_order(
Direction.SHORT,
Offset.CLOSE,
price,
volume,
stop,
lock,
net
)
def short(
self,
price: float,
volume: float,
stop: bool = False,
lock: bool = False,
net: bool = False
) -> list:
"""
Send short order to open as short position.
"""
return self.send_order(
Direction.SHORT,
Offset.OPEN,
price,
volume,
stop,
lock,
net
)
def cover(
self,
price: float,
volume: float,
stop: bool = False,
lock: bool = False,
net: bool = False
) -> list:
"""
Send cover order to close a short position.
"""
return self.send_order(
Direction.LONG,
Offset.CLOSE,
price,
volume,
stop,
lock,
net
)
def send_order(
self,
direction: Direction,
offset: Offset,
price: float,
volume: float,
stop: bool = False,
lock: bool = False,
net: bool = False
) -> list:
"""
Send a new order.
"""
if self.trading:
vt_orderids: list = self.cta_engine.send_order(
self, direction, offset, price, volume, stop, lock, net
)
return vt_orderids
else:
return []
def cancel_order(self, vt_orderid: str) -> None:
"""
Cancel an existing order.
"""
if self.trading:
self.cta_engine.cancel_order(self, vt_orderid)
def cancel_all(self) -> None:
"""
Cancel all orders sent by strategy.
"""
if self.trading:
self.cta_engine.cancel_all(self)
def write_log(self, msg: str) -> None:
"""
Write a log message.
"""
self.cta_engine.write_log(msg, self)
def get_engine_type(self) -> EngineType:
"""
Return whether the cta_engine is backtesting or live trading.
"""
return self.cta_engine.get_engine_type()
def get_pricetick(self) -> float:
"""
Return pricetick data of trading contract.
"""
return self.cta_engine.get_pricetick(self)
def load_bar(
self,
days: int,
interval: Interval = Interval.MINUTE,
callback: Callable = None,
use_database: bool = False
) -> None:
"""
Load historical bar data for initializing strategy.
"""
if not callback:
callback: Callable = self.on_bar
bars: List[BarData] = self.cta_engine.load_bar(
self.vt_symbol,
days,
interval,
callback,
use_database
)
for bar in bars:
callback(bar)
def load_tick(self, days: int) -> None:
"""
Load historical tick data for initializing strategy.
"""
ticks: List[TickData] = self.cta_engine.load_tick(self.vt_symbol, days, self.on_tick)
for tick in ticks:
self.on_tick(tick)
def put_event(self) -> None:
"""
Put an strategy data event for ui update.
"""
if self.inited:
self.cta_engine.put_strategy_event(self)
def send_email(self, msg) -> None:
"""
Send email to default receiver.
"""
if self.inited:
self.cta_engine.send_email(msg, self)
def sync_data(self) -> None:
"""
Sync strategy variables value into disk storage.
"""
if self.trading:
self.cta_engine.sync_strategy_data(self)
def get_trading_hours(self):
"""
Return trading_hours of trading hours. # hxxjava add
"""
return self.cta_engine.get_trading_hours(self)
def get_actual_days(self,last_time:datetime,days:int): # hxxjava add
"""
得到从last_time开始往前days天的实际天数。
"""
if days <= 0:
return 0
th = TradingHours(self.get_trading_hours())
# 找到有效的最后时间
till_time = last_time
while not th.get_trade_hours(till_time):
till_time = timedelta(minutes=1)
availble_days = 0
#找到有些多开始时间
from_time = till_time
while availble_days <= days:
from_time -= timedelta(days=1)
if th.get_trade_hours(from_time):
availble_days += 1
actual_days = (last_time-from_time).days
# print(f"till_time={till_time},from_time{from_time},days={days},acutal_days={actual_days}")
return actual_days
def get_contract(self):
"""
Return trading_hours of trading contract. # hxxjava add
"""
return self.cta_engine.get_contract(self)
@virtual
def on_condition_order(self, cond_order: ConditionOrder):
"""
Callback of condition order update.
"""
pass
def send_condition_order(self,order:ConditionOrder): # hxxjava add
""" """
if not self.trading:
return False
return self.cta_engine.send_condition_order(order)
def cancel_condition_order(self,cond_orderid:str): # hxxjava add
""" """
return self.cta_engine.cancel_condition_order(cond_orderid)
def cancel_all_condition_orders(self): # hxxjava add
""" """
return self.cta_engine.cancel_all_condition_orders(self)
def send_margin_ratio_request(self): # hxxjava add
""" """
main_engine = self.cta_engine.main_engine
contract:ContractData = self.get_contract()
symbol,exchange = extract_vt_symbol(self.vt_symbol)
req = MarginRequest(symbol=symbol,exchange=exchange)
main_engine.send_margin_ratio_request(req,contract.gateway_name)
def send_commission_request(self): # hxxjava add
""" """
main_engine = self.cta_engine.main_engine
contract:ContractData = self.get_contract()
symbol,exchange = extract_vt_symbol(self.vt_symbol)
req = CommissionRequest(symbol=symbol,exchange=exchange)
main_engine.send_commission_request(req,contract.gateway_name)
3.3 第三步修改K线生成器BarGenerator
修改vnpy\trader\utility.py中的BarGenerator,为其添加下面的update_status()函数:
class BarGenerator:
... ...
def update_status(self,status:StatusData=None):
""" """
# if status:
# hh,mm = status.enter_time.split(':')
# st_time = datetime.now().replace(hour=int(hh),minute=int(mm),second=0,microsecond=0,tzinfo=CHINA_TZ)
if self.bar:
# 只要接收到交易状态信息,一定是整分钟,立即推送当前分钟bar
self.on_bar(self.bar)
self.bar = None
3.4 第四步修改Cta策略
在您的CTA策略中增加一个on_status()函数,这里只给出一个CTA策略与本文主题相关部分的代码,其他部分用省略号表示,一般的代码如下:
class XxxStrategy(CtaTemplate):
""" XXX交易策略 """
self.window = 30
... ...
def __init__(self, cta_engine, strategy_name, vt_symbol, setting):
""""""
super(GsjyStrategy, self).__init__(cta_engine, strategy_name, vt_symbol, setting)
... ...
self.bg : BarGenerator:BarGenerator(self.on_bar,self.window,self.on_xmin_bar)
... ...
def on_inited(self):
"""
Callback after strategy is inited.
"""
self.bg.update_status() # 用来强制生成最后的1分钟K线
self.write_log("策略初始化结束。")
def on_auction_tick(self, tick: TickData):
"""
Callback of auction tick data update.
"""
self.bg.update_auction_tick(tick)
def on_status(self,status:StatusData=None): #
"""
收到合约交易状态信息时,更新所有的K线生成器 。
注意:合约交易状态信息推送只会在策略初始化后才执行!
"""
self.bg.update_status(status=status)
def on_tick(self, tick: TickData):
"""
Callback of new tick data update.
"""
self.bg.update_tick(tick)
def on_bar(self, bar: BarData):
"""
Callback of new bar data update.
"""
self.bg.update_bar(bar)
def on_xmin_bar(self, bar: BarData):
""" """
# 这里应该是利用大周期bar进行出入场交易信号计算和委托交易指令发出的代码
pass
... ...
3.5 第五步启用扩展的MyCtaEngine
修改vnpy_ctastrategy__init__.py中的CtaStrategyApp,将其engine_class更改为扩展的MyCtaEngine:
class CtaStrategyApp(BaseApp):
""""""
app_name = APP_NAME
app_module = __module__
app_path = Path(__file__).parent
display_name = "CTA策略"
# engine_class = CtaEngine
engine_class = MyCtaEngine # hxxjava add
widget_name = "CtaManager"
icon_name = str(app_path.joinpath("ui", "cta.ico"))