1. 为什么要修改?
原因见这个帖子:https://www.vnpy.com/forum/topic/4461-shuo-shi-r-breakerce-lue-de-wen-ti
2. 修改步骤:
2.1 添加文件vnpy\usertools\trade_hour.py
内容如下:
"""
本文件主要实现合约的交易时间段
作者:hxxjava
日期:2020-8-1
"""
from typing import Callable,List,Dict, Tuple, Union
from enum import Enum
import datetime
import pytz
CHINA_TZ = pytz.timezone("Asia/Shanghai")
from vnpy.trader.utility import extract_vt_symbol
from vnpy.trader.constant import Interval
from rqdatac.utils import to_date
import rqdatac as rq
def get_listed_date(symbol: str):
    """
    Return the listing date of a contract.

    Looks the contract up with ``rq.instruments(symbol)`` and converts its
    ``listed_date`` field with ``rqdatac.utils.to_date``.

    :param symbol: contract code as understood by rqdatac.
    :return: whatever ``to_date`` yields for ``listed_date``
             (presumably a ``datetime.date`` -- confirm against rqdatac).
    """
    return to_date(rq.instruments(symbol).listed_date)
def get_de_listed_date(symbol: str):
    """
    Return the de-listing date of a contract.

    Looks the contract up with ``rq.instruments(symbol)`` and converts its
    ``de_listed_date`` field with ``rqdatac.utils.to_date``.

    :param symbol: contract code as understood by rqdatac.
    :return: whatever ``to_date`` yields for ``de_listed_date``
             (presumably a ``datetime.date`` -- confirm against rqdatac).
    """
    return to_date(rq.instruments(symbol).de_listed_date)
class Timeunit(Enum):
    """
    Unit in which an elapsed trading time is reported.
    """

    # Each value is a short textual code for the unit.
    SECOND = '1s'   # seconds
    MINUTE = '1m'   # minutes
    HOUR = '1h'     # hours
class TradeHours(object):
    """
    Trading-session calendar of a single contract.

    For every trading date between the contract's listing date and its
    de-listing date the class stores ``(day_index, sessions)`` where
    ``day_index`` counts trading days from the listing date (0-based) and
    ``sessions`` is a list of ``(start, stop)`` datetimes localized to
    ``CHINA_TZ``.  Sessions are half-open: ``start <= t < stop``.

    The time-of-day pattern of the sessions is taken once, from the listing
    date, and re-applied to every later trading date.

    All ``dt`` arguments are compared against ``CHINA_TZ``-aware datetimes,
    so they must be timezone-aware as well.
    """

    # Sessions starting before this time of day are "day" sessions, the ones
    # starting after it are "night" sessions.
    _DAY_NIGHT_SPLIT = datetime.time(18, 0, 0)

    def __init__(self, symbol: str):
        """
        :param symbol: contract code; stored upper-cased.
        """
        self.symbol = symbol.upper()
        self.init()

    def init(self):
        """
        Build the trading-day dictionaries and the per-day session lists.

        Populates:

        * ``listed_date`` / ``de_listed_date``
        * ``trade_index_date``: day index -> trading date
        * ``trade_date_index``: trading date -> ``(day index, sessions)``
        * ``time_dn_pairs``: session pattern, see ``_get_trading_times_dn``
        """
        self.listed_date = get_listed_date(self.symbol)
        self.de_listed_date = get_de_listed_date(self.symbol)

        # Every trading date of the contract, numbered from 0.
        trade_dates = rq.get_trading_dates(self.listed_date, self.de_listed_date)
        self.trade_index_date = dict(enumerate(trade_dates))
        # Only complete (index, sessions) tuples are ever stored here, so a
        # lookup can always be unpacked into two values.
        self.trade_date_index = {}

        trading_hours = rq.get_trading_hours(
            self.symbol,
            date=self.listed_date,
            frequency='tick',
            expected_fmt='datetime',
        )
        self.time_dn_pairs = self._get_trading_times_dn(trading_hours)

        # The listing date uses the sessions exactly as rqdatac reports them.
        first_day_sessions = [
            (CHINA_TZ.localize(start), CHINA_TZ.localize(stop))
            for start, stop in trading_hours
        ]
        self.trade_date_index[self.listed_date] = (0, first_day_sessions)

        # Every later trading date re-applies the listing-day pattern.
        for day in range(1, len(self.trade_index_date)):
            trade_date = self.trade_index_date[day]
            sessions = []
            for (start, dn1), (stop, dn2) in self.time_dn_pairs:
                # dn1: trading days to step back from this trading date to
                #      reach the date the session starts on (0 or negative).
                # dn2: calendar days from the start date to the stop date.
                start_date = self.trade_index_date[day + dn1]
                start_dt = datetime.datetime.combine(start_date, start)
                stop_dt = (datetime.datetime.combine(start_date, stop)
                           + datetime.timedelta(days=dn2))
                sessions.append(
                    (CHINA_TZ.localize(start_dt), CHINA_TZ.localize(stop_dt))
                )
            self.trade_date_index[trade_date] = (day, sessions)

    def _get_trading_times_dn(self, trading_hours: List[Tuple[datetime.datetime, datetime.datetime]]):
        """
        Turn one day's sessions into a date-independent pattern.

        Internal helper, not meant for external use.

        :param trading_hours: ``(start, stop)`` datetimes of one trading day,
                              in chronological order.
        :return: ``[((start1, dn1_1), (stop1, dn2_1)), ...]`` in the same
                 order, where ``startN`` / ``stopN`` are ``datetime.time``
                 values, ``dn1_N`` is the (zero or negative) number of trading
                 days to step back from the trading date to reach the session's
                 start date, and ``dn2_N`` is 1 when the session crosses
                 midnight (start time later than stop time), else 0.
        """
        pattern = []
        day_offset = 0
        later_start = None
        # Walk from the last session backwards: whenever an earlier session
        # starts at a later time of day than the one after it, it must belong
        # to the previous trading day.
        for begin, end in reversed(trading_hours):
            start, stop = begin.time(), end.time()
            if later_start is not None and start > later_start:
                day_offset -= 1
            crosses_midnight = 1 if start > stop else 0
            pattern.append(((start, day_offset), (stop, crosses_midnight)))
            later_start = start
        pattern.reverse()
        return pattern

    def get_date_tradetimes(self, date: datetime.date):
        """
        Return ``(day_index, sessions)`` of the trading date ``date``.

        :return: ``(None, [])`` when ``date`` is not a known trading date.
        """
        return self.trade_date_index.get(date, (None, []))

    def get_trade_datetimes(self, dt: datetime.datetime, allday: bool = False):
        """
        Find the trading day and session that contain ``dt``.

        :param dt: timezone-aware moment to look up.
        :param allday: when True return every session of the trading day,
                       otherwise only the single session containing ``dt``.
        :return: ``(day_index, sessions)``, or ``(None, [])`` when ``dt`` does
                 not fall inside any session of the contract.
        """
        # Nothing can trade before the first session of the listing date.
        _, first_sessions = self.get_date_tradetimes(self.listed_date)
        if not first_sessions or dt < first_sessions[0][0]:
            return None, []

        # Find the first trading date on or after dt's calendar date.  The
        # de-listing date itself is still a trading date, hence "<=".
        probe = dt.date()
        day_index = None
        while probe <= self.de_listed_date:
            day_index, _ = self.get_date_tradetimes(probe)
            if day_index is not None:     # index 0 is a valid hit
                break
            probe += datetime.timedelta(days=1)
        if day_index is None:
            return None, []

        # A night session belongs to the following trading day, so the
        # neighbouring trading days have to be checked as well.
        for candidate in (day_index, day_index + 1, day_index - 1):
            trade_date = self.trade_index_date.get(candidate)
            if trade_date is None:
                continue
            idx, sessions = self.get_date_tradetimes(trade_date)
            if not sessions:
                continue
            if dt < sessions[0][0] or dt > sessions[-1][1]:
                continue
            for start, stop in sessions:
                if start <= dt < stop:
                    if allday:
                        return idx, sessions
                    return idx, [(start, stop)]
        return None, []

    def get_trade_time_perday(self):
        """
        Return the total trading time of one day, in whole minutes.

        Computed from the session pattern ``time_dn_pairs`` only.
        """
        # Any date works as a reference; only the durations matter.
        reference = datetime.date(2000, 1, 1)
        total = datetime.timedelta()
        for (start, _dn1), (stop, dn2) in self.time_dn_pairs:
            begin = datetime.datetime.combine(reference, start)
            end = (datetime.datetime.combine(reference, stop)
                   + datetime.timedelta(days=dn2))
            total += end - begin
        return int(total.total_seconds()) // 60

    @staticmethod
    def _elapsed_in_sessions(dt, sessions):
        """Return the trading time accumulated in ``sessions`` up to ``dt``."""
        elapsed = datetime.timedelta()
        for start, stop in sessions:
            if dt > stop:
                elapsed += stop - start     # session fully in the past
            elif dt > start:
                elapsed += dt - start       # session in progress
                break
            else:
                break
        return elapsed

    def get_trade_time_inday(self, dt: datetime.datetime, unit: Timeunit = Timeunit.MINUTE):
        """
        Return the trading time elapsed in ``dt``'s trading day up to ``dt``.

        :param unit: ``Timeunit.SECOND``, ``Timeunit.MINUTE`` or
                     ``Timeunit.HOUR``; the result is truncated to an int.
                     Any other value returns the raw ``timedelta``.
        :return: ``None`` when ``dt`` is not inside a trading session.
        """
        _, sessions = self.get_trade_datetimes(dt, allday=True)
        if not sessions:
            return None

        elapsed = self._elapsed_in_sessions(dt, sessions)
        seconds = int(elapsed.total_seconds())
        if unit == Timeunit.SECOND:
            return seconds
        if unit == Timeunit.MINUTE:
            return seconds // 60
        if unit == Timeunit.HOUR:
            return seconds // 3600
        return elapsed

    def get_day_tradetimes(self, dt: datetime.datetime):
        """
        Return ``(day_index, sessions)`` with only the day sessions of
        ``dt``'s trading day (sessions starting before 18:00).
        """
        index, sessions = self.get_trade_datetimes(dt, allday=True)
        day_sessions = [
            (start, stop) for start, stop in sessions
            if start.time() < self._DAY_NIGHT_SPLIT
        ]
        return index, day_sessions

    def get_night_tradetimes(self, dt: datetime.datetime):
        """
        Return ``(day_index, sessions)`` with only the night sessions of
        ``dt``'s trading day (sessions starting after 18:00).
        """
        index, sessions = self.get_trade_datetimes(dt, allday=True)
        night_sessions = [
            (start, stop) for start, stop in sessions
            if start.time() > self._DAY_NIGHT_SPLIT
        ]
        return index, night_sessions

    def convet_to_datetime(self, day: int, minutes: int):
        """
        Convert "``minutes`` trading minutes into trading day ``day``" to a
        datetime.

        :param day: 0-based trading-day index.
        :param minutes: trading minutes counted from the day's first session.
        :return: the matching datetime, or ``None`` when ``day`` is unknown or
                 ``minutes`` is not smaller than the day's total trading
                 minutes.
        """
        trade_date = self.trade_index_date.get(day)
        if trade_date is None:
            return None
        _, sessions = self.get_date_tradetimes(trade_date)
        for start, stop in sessions:
            session_minutes = int((stop - start).total_seconds()) // 60
            if minutes < session_minutes:
                return start + datetime.timedelta(minutes=minutes)
            # Not in this session: consume it and try the next one.
            minutes -= session_minutes
        return None

    # Correctly spelled alias; the original name is kept for existing callers.
    convert_to_datetime = convet_to_datetime

    def get_bar_window(self, dt: datetime.datetime, window: int, interval: Interval = Interval.MINUTE):
        """
        Return ``(start, stop)`` of the K-line (bar) that contains ``dt``.

        Bars are laid out on the contract's own trading-minute axis, counted
        from the listing date.  A DAILY bar is ``window`` trading days wide,
        a WEEKLY bar ``5 * window`` trading days wide.

        :return: ``(None, None)`` when ``dt`` is outside trading hours, the
                 interval is not supported, or the bar width is not positive.
                 Either element may be ``None`` when it falls outside the
                 contract's trading days.
        """
        no_window = (None, None)
        day, sessions = self.get_trade_datetimes(dt, allday=True)
        if not sessions:
            return no_window

        minutes_per_day = self.get_trade_time_perday()
        if minutes_per_day <= 0:
            return no_window
        elapsed = self._elapsed_in_sessions(dt, sessions)
        minutes_in_day = int(elapsed.total_seconds()) // 60
        # Position of dt on the trading-minute axis.
        total_minutes = day * minutes_per_day + minutes_in_day

        # Bar width in trading minutes.
        if interval == Interval.MINUTE:
            bar_width = window
        elif interval == Interval.HOUR:
            bar_width = 60 * window
        elif interval == Interval.DAILY:
            bar_width = minutes_per_day * window
        elif interval == Interval.WEEKLY:
            bar_width = minutes_per_day * window * 5
        else:
            return no_window
        if bar_width <= 0:
            return no_window

        # Snap to the bar grid, then map both edges back to datetimes.
        start_m = (total_minutes // bar_width) * bar_width
        stop_m = start_m + bar_width
        start_dt = self.convet_to_datetime(*divmod(start_m, minutes_per_day))
        stop_dt = self.convet_to_datetime(*divmod(stop_m, minutes_per_day))
        return start_dt, stop_dt

    @staticmethod
    def _span_if_inside(dt, sessions):
        """
        Return ``(True, (first_start, last_stop))`` when ``dt`` lies in one of
        ``sessions`` (``start <= dt < stop``), else ``(False, (None, None))``.
        """
        if any(start <= dt < stop for start, stop in sessions):
            return True, (sessions[0][0], sessions[-1][1])
        return False, (None, None)

    def get_date_start_stop(self, dt: datetime.datetime):
        """
        Return the start and stop of the whole trading day containing ``dt``.

        :return: ``(True, (start, stop))`` when ``dt`` is inside a session,
                 otherwise ``(False, (None, None))``.
        """
        _, sessions = self.get_trade_datetimes(dt, allday=True)
        return self._span_if_inside(dt, sessions)

    def get_day_start_stop(self, dt: datetime.datetime):
        """
        Return the start and stop of the day sessions of ``dt``'s trading day.

        :return: ``(True, (start, stop))`` when ``dt`` is inside a day
                 session, otherwise ``(False, (None, None))``.
        """
        _, sessions = self.get_day_tradetimes(dt)
        return self._span_if_inside(dt, sessions)

    def get_night_start_stop(self, dt: datetime.datetime):
        """
        Return the start and stop of the night sessions of ``dt``'s trading
        day.

        :return: ``(True, (start, stop))`` when ``dt`` is inside a night
                 session, otherwise ``(False, (None, None))``.
        """
        _, sessions = self.get_night_tradetimes(dt)
        return self._span_if_inside(dt, sessions)
if __name__ == "__main__":
    # Manual smoke test: needs a valid rqdatac account.
    rq.init('xxxxx','******',("rqdatad-pro.ricequant.com",16011))

    # Other contracts that can be tried: "rb2010.SHFE", "i2010.DCE"
    vt_symbols = ["ag2012.SHFE"]

    date0 = datetime.date(2020,8,31)
    dt0 = CHINA_TZ.localize(datetime.datetime(2020,8,31,9,20,15))

    for vt_symbol in vt_symbols:
        symbol,exchange = extract_vt_symbol(vt_symbol)
        th = TradeHours(symbol)

        # All sessions of the trading day that contains dt0.
        days,trade_hours = th.get_trade_datetimes(dt0,allday=True)
        print(f"\n{vt_symbol} {dt0} days:{days} trade_hours={trade_hours}")

        if trade_hours:
            day_start = trade_hours[0][0]
            day_end = trade_hours[-1][1]
            print(f"day_start={day_start} day_end={day_end}")
            # Five minutes before the end of the trading day.
            exit_time = day_end + datetime.timedelta(minutes=-5)
            print(f"exit_time={exit_time}")

        # One moment in the day session and one in the night session.
        samples = [
            CHINA_TZ.localize(datetime.datetime(2020,8,31,9,20,15)),
            CHINA_TZ.localize(datetime.datetime(2020,9,1,1,1,15)),
        ]
        for dt in samples:
            in_trade,(start,stop) = th.get_date_start_stop(dt)
            if in_trade:
                print(f"\n{vt_symbol} 时间 {dt} 交易日起止:{start,stop}")
            else:
                print(f"\n{vt_symbol} 时间 {dt} 非交易时间")

            in_day,(start,stop) = th.get_day_start_stop(dt)
            if in_day:
                print(f"\n{vt_symbol} 时间 {dt} 日盘起止:{start,stop}")
            else:
                print(f"\n{vt_symbol} 时间 {dt} 非日盘时间")

            in_night,(start,stop) = th.get_night_start_stop(dt)
            if in_night:
                print(f"\n{vt_symbol} 时间 {dt} 夜盘起止:{start,stop}")
            else:
                print(f"\n{vt_symbol} 时间 {dt} 非夜盘时间")
2.2 修改策略文件 RBreakerStrategy.py
代码如下:
from datetime import datetime,time,timedelta
from vnpy.app.cta_strategy import (
CtaTemplate,
StopOrder,
TickData,
BarData,
TradeData,
OrderData,
BarGenerator,
ArrayManager
)
from vnpy.trader.utility import extract_vt_symbol
from vnpy.usertools.trade_hour import TradeHours
class RBreakStrategy2(CtaTemplate):
    """
    R-Breaker intraday strategy whose trading-day boundaries come from the
    contract's real trading sessions (``TradeHours``) instead of the
    calendar date of each bar.

    On the first bar of every trading day the six R-Breaker price levels are
    recomputed from the previous trading day's high / low / close.  Until
    five minutes before the end of the trading day the strategy places stop
    orders around those levels; after that it closes any open position.
    """

    author = "KeKe"

    # --- parameters ---
    setup_coef = 0.25
    break_coef = 0.2
    enter_coef_1 = 1.07
    enter_coef_2 = 0.07
    fixed_size = 1
    donchian_window = 30

    trailing_long = 0.4     # trailing stop for longs, in percent
    trailing_short = 0.4    # trailing stop for shorts, in percent
    multiplier = 3          # size multiplier of the reversal orders

    # --- R-Breaker price levels ---
    buy_break = 0   # breakout buy price
    sell_setup = 0  # observation sell price
    sell_enter = 0  # reversal sell price
    buy_enter = 0   # reversal buy price
    buy_setup = 0   # observation buy price
    sell_break = 0  # breakout sell price

    # Extremes seen since the current position was opened.
    intra_trade_high = 0
    intra_trade_low = 0

    # OHLC of the trading day being accumulated.
    day_high = 0
    day_open = 0
    day_close = 0
    day_low = 0

    # Donchian channel bounds of the latest bars.
    tend_high = 0
    tend_low = 0

    parameters = ["setup_coef", "break_coef", "enter_coef_1", "enter_coef_2", "fixed_size", "donchian_window"]
    variables = ["buy_break", "sell_setup", "sell_enter", "buy_enter", "buy_setup", "sell_break"]

    def __init__(self, cta_engine, strategy_name, vt_symbol, setting):
        """
        Create the strategy and load the trading sessions of ``vt_symbol``.

        ``vt_symbol`` must have the form ``"<symbol>.<exchange>"``.
        """
        super().__init__(cta_engine, strategy_name, vt_symbol, setting)

        self.bg = BarGenerator(self.on_bar)
        self.am = ArrayManager()
        self.bars = []

        symbol, _ = vt_symbol.split('.')
        self.trade_hour = TradeHours(symbol)

        self.trade_datetimes = None     # sessions of the current trading day
        self.exit_time = None           # moment after which positions are closed
        self.new_day = False            # True while handling a day's first bar

    def on_init(self):
        """
        Callback when strategy is inited.
        """
        self.write_log("策略初始化")
        self.load_bar(10)

    def on_start(self):
        """
        Callback when strategy is started.
        """
        self.write_log("策略启动")

    def on_stop(self):
        """
        Callback when strategy is stopped.
        """
        self.write_log("策略停止")

    def on_tick(self, tick: TickData):
        """
        Callback of new tick data update.
        """
        self.bg.update_tick(tick)

    def is_new_day(self, dt: datetime):
        """
        Tell whether ``dt`` lies outside the cached trading day.

        :return: ``False`` when ``dt`` is between the first session start
                 (inclusive) and the last session stop (exclusive) of the
                 cached trading day; ``True`` otherwise, including when no
                 trading day is cached.
        """
        if not self.trade_datetimes:
            return True

        day_start = self.trade_datetimes[0][0]
        day_end = self.trade_datetimes[-1][1]
        return not (day_start <= dt < day_end)

    def on_bar(self, bar: BarData):
        """
        Callback of new bar data update.
        """
        self.cancel_all()

        am = self.am
        am.update_bar(bar)
        if not am.inited:
            return

        # Did this bar leave the cached trading day?
        self.new_day = self.is_new_day(bar.datetime)
        if self.new_day:
            # Look up the sessions of the trading day this bar belongs to.
            _, self.trade_datetimes = self.trade_hour.get_trade_datetimes(bar.datetime, allday=True)
            if self.trade_datetimes:
                # Stop opening positions five minutes before the day ends.
                day_end = self.trade_datetimes[-1][1]
                self.exit_time = day_end - timedelta(minutes=5)

        if not self.trade_datetimes:
            # The bar is outside every trading session: ignore it.  Gateways
            # may still push bars outside trading hours.
            return

        # Skip the first two valid bars, then keep only the latest two.
        self.bars.append(bar)
        if len(self.bars) <= 2:
            return
        self.bars.pop(0)

        if self.new_day:
            # First bar of a new trading day: derive today's levels from the
            # day that just ended, then start accumulating a fresh day.
            if self.day_open:
                self.buy_setup = self.day_low - self.setup_coef * (self.day_high - self.day_close)    # observation buy price
                self.sell_setup = self.day_high + self.setup_coef * (self.day_close - self.day_low)   # observation sell price

                self.buy_enter = (self.enter_coef_1 / 2) * (self.day_high + self.day_low) - self.enter_coef_2 * self.day_high    # reversal buy price
                self.sell_enter = (self.enter_coef_1 / 2) * (self.day_high + self.day_low) - self.enter_coef_2 * self.day_low    # reversal sell price

                self.buy_break = self.buy_setup + self.break_coef * (self.sell_setup - self.buy_setup)     # breakout buy price
                self.sell_break = self.sell_setup - self.break_coef * (self.sell_setup - self.buy_setup)   # breakout sell price

            self.day_open = bar.open_price
            self.day_high = bar.high_price
            self.day_close = bar.close_price
            self.day_low = bar.low_price
        else:
            # Same trading day: extend the running OHLC.
            self.day_high = max(self.day_high, bar.high_price)
            self.day_low = min(self.day_low, bar.low_price)
            self.day_close = bar.close_price

        # No levels yet (no complete previous day): nothing to trade on.
        if not self.sell_setup:
            return

        self.tend_high, self.tend_low = am.donchian(self.donchian_window)

        if bar.datetime < self.exit_time:
            if self.pos == 0:
                self.intra_trade_low = bar.low_price
                self.intra_trade_high = bar.high_price

                if self.tend_high > self.sell_setup:
                    long_entry = max(self.buy_break, self.day_high)
                    self.buy(long_entry, self.fixed_size, stop=True)
                    self.short(self.sell_enter, self.multiplier * self.fixed_size, stop=True)
                elif self.tend_low < self.buy_setup:
                    short_entry = min(self.sell_break, self.day_low)
                    self.short(short_entry, self.fixed_size, stop=True)
                    self.buy(self.buy_enter, self.multiplier * self.fixed_size, stop=True)

            elif self.pos > 0:
                # Trail the stop below the highest price since entry.
                self.intra_trade_high = max(self.intra_trade_high, bar.high_price)
                long_stop = self.intra_trade_high * (1 - self.trailing_long / 100)
                self.sell(long_stop, abs(self.pos), stop=True)

            elif self.pos < 0:
                # Trail the stop above the lowest price since entry.
                self.intra_trade_low = min(self.intra_trade_low, bar.low_price)
                short_stop = self.intra_trade_low * (1 + self.trailing_short / 100)
                self.cover(short_stop, abs(self.pos), stop=True)

        else:
            # Close any existing position near the end of the trading day.
            if self.pos > 0:
                self.sell(bar.close_price * 0.99, abs(self.pos))
            elif self.pos < 0:
                self.cover(bar.close_price * 1.01, abs(self.pos))

        self.put_event()

    def on_order(self, order: OrderData):
        """
        Callback of new order data update.
        """
        pass

    def on_trade(self, trade: TradeData):
        """
        Callback of new trade data update.
        """
        self.put_event()

    def on_stop_order(self, stop_order: StopOrder):
        """
        Callback of stop order update.
        """
        pass