考虑到期货合约的交割属性,一个主力合约的活跃期大概在3到4个月。
对于半年以上的回测,通常都使用8888连续指数合约,连续指数合于是该品种所有合约按照成交量和价格加权算出来的,和主力合约有一定差异,而且很多时候指数合约的价格是带小数,不符合对应的价格最小变动,回测小时还有点差异。
增加使用连续主力合约,在切换采用虚拟移仓来拼接的功能,这样回测可以更好地接近与实际交易,这里简单讲下怎么实现。因为代码都是vnpy2上改动的,可能未必适用现在最新的,这里就简单列下思路和示例代码。
1. 通过数据源或者本地json文件来取得主力合约队列,比如比如2021到2024年的fu.HOT fu主力合约回测,返回一个这样包含开始日期,结束日期和主力合约名的字典的队列。 示例backtest_list:[{'start_date': datetime.datetime(2021, 6, 15, 0, 0), 'end_date': datetime.datetime(2021, 8, 9, 0, 0), 'contract_code': 'fu2109.SHFE'}, {'start_date': datetime.datetime(2021, 8, 9, 0, 0), 'end_date': datetime.datetime(2021, 12, 2, 0, 0), 'contract_code': 'fu2201.SHFE'} …]。我使用了Jquant的主力合约查询功能,这个rq也有,返回的按日队列,转换成为需要格式。
def get_daily_contracts(self, start_date, end_date):
"""使用 jqdatasdk 获取每日合约的方法"""
self.end = end_date
data = jq.get_dominant_future(self.hot_code, start_date, end_date)
# 将数据转换为 Series
return self.process_series(data)
def process_series(self, series):
"""处理系列数据得到结果队列的方法"""
result = []
current_contract = None
start_date = None
for date, contract in series.items():
if contract!= current_contract:
if current_contract:
result.append({
"start_date": datetime.strptime(start_date, '%Y-%m-%d'),
"end_date": datetime.strptime(date, '%Y-%m-%d'),
"contract_code": mddata_client.to_vn_symbol(current_contract)
})
current_contract = contract
start_date = date
if current_contract:
result.append({
"start_date": datetime.strptime(start_date, '%Y-%m-%d'),
"end_date": self.end,
"contract_code": mddata_client.to_vn_symbol(current_contract)
})
return result
2. 在backtester增强,当传入的合约是.HOT结尾,就使用连续主力合约拼接回测。传入查询主力合约队列,然后按照队列,按每个字典内容,导入backtesting engine进行回测。这里考虑到初始化天数,每个都有一定提前量。
def process_symbol(self, vt_symbol, start, end, subtract_days=0): # 增加了可设置减去天数的参数,默认为 0
hot_symbol, hot_exchange = vt_symbol.split(".")
result = []
if hot_exchange == "HOT":
hot_handler = HotFuturesHandler(hot_symbol)
download_list = hot_handler.get_daily_contracts(start, end)
self.write_log(f"{vt_symbol}-对应的主力合约分别为:")
for query_item in download_list:
self.write_log(f"从{self.convertTime(query_item['start_date'])} 到 {self.convertTime(query_item['end_date'])} 的"
f"主力合约是 {query_item['contract_code']}")
query_item['start_date'] = query_item['start_date'] - timedelta(days=subtract_days) # 根据传入参数减去相应天数
result.append(query_item)
else:
result.append({'start_date': start, 'end_date': end, 'contract_code': vt_symbol})
return result
3. 在backtesting engine , 有两个注意点,
第一个是初始化日期问题,当第一个主力合约结束后, 为了保持接下来下一个主力合约已经完成初始化,刚好在那个结束bar转为交易状态。这里是采用bar.datetime的比较,保留上一个主力合约的bar.datetime,下一个主力合约的开始日期提前一段时间,当回放的bar.datetime大于之前的最后一个,转入交易状态。
第二个是未平持仓移仓;当上一个合约结束时候,如果最后一个交易offset是OPEN,增加一个模拟的close交易,交易时间是最后一个bar价格。在新的合约第一个bar中增加一个OPEN的交易,实现移仓。
for ix, data in enumerate(self.history_data):
if self.trading_start_date is None and self.datetime and data.datetime.day != self.datetime.day:
day_count += 1
if day_count >= self.days:
break
elif self.trading_start_date and data.datetime > self.trading_start_date:
#如果bar的时间大于等于上个合约最后最后一个bar,如果self.legacy_trade存在,则新建一个trade放入
if self.legacy_trade:
new_trade = self.legacy_trade
new_trade.symbol = data.symbol
new_trade.datetime = data.datetime
new_trade.orderid = str(self.trade_count)
new_trade.tradeid = str(self.trade_count)
new_trade.vt_tradeid = f"{self.gateway_name}.{new_trade.tradeid}"
new_trade.price = new_trade.price + (data.open_price - self.last_bar.close_price)
self.trades[new_trade.vt_tradeid] = new_trade
self.output(f"****生成移仓开单,上个bar结束价格{self.last_bar.close_price},当前bar开仓价格{data.open_price}"
f"交易编号{new_trade.tradeid},"
f"时间{new_trade.datetime},合约{new_trade.symbol},价格{new_trade.price},方向{new_trade.direction}")
# 在strategy中新增移仓开单数据
self.strategy.pos = new_trade.volume if new_trade.direction == Direction.LONG else -new_trade.volume
self.strategy.entry_price = new_trade.price
self.strategy.PosPrice = new_trade.price
break
...............................
if self.contract_num > 1:
# DONE 如果后续还有合约,读取当前是否self.trades最后一个trade,如果是open,从self.trades pop这条
# 传入self.legacy_trade,然后修改价差后放入下一个合约的self.trades,然后保留最后一个bar。
self.legacy_trade = None
self.last_bar = copy.copy(backtesting_data[-1])
self.trading_start_date = self.last_bar.datetime
self.contract_num = self.contract_num -1
if self.trades:
last_trade_tuple = list(self.trades.items())[-1]
# last_trade = self.trades[self.gateway_name + "."+ str(self.trade_count)]
last_trade =last_trade_tuple[-1]
if last_trade.offset == Offset.OPEN:
self.legacy_trade = copy.copy(last_trade)
# 如果有未平的deal,生成一个对应平仓deal,对应时间最后一个bar
rollover_trade = copy.copy(last_trade)
if last_trade.direction == Direction.LONG:
rollover_trade.direction = Direction.SHORT
else:
rollover_trade.direction = Direction.LONG
rollover_trade.offset = Offset.CLOSE
rollover_trade.tradeid = str(int(rollover_trade.tradeid)+1)
rollover_trade.orderid = str(int(rollover_trade.orderid)+1)
rollover_trade.datetime = self.trading_start_date
rollover_trade.vt_tradeid = f"{self.gateway_name}.{rollover_trade.tradeid}"
self.trades[rollover_trade.vt_tradeid] = rollover_trade
self.output(f"****生成同价位移仓平单,交易编号{rollover_trade.tradeid},"
f"时间{rollover_trade.datetime},合约{rollover_trade.symbol},价格{rollover_trade.price},方向{rollover_trade.direction}")
4. 在backtester中增强,把每个主力合约回测的结果数据拼接一起,共回测引擎分析
For backtest_item in backtest_list:
………..
………..
trades_list.extend(self.get_all_trades())
result_df_list.append(engine.calculate_result())
daily_results_dict.update(engine.daily_results)
self.result_df=pd.concat(result_df_list)
dict_trades_data={f'BACKTESTING.{i+1}':valuefori,valueinenumerate(trades_list)}
engine.trades=dict_trades_data
engine.daily_results=daily_results_dict
engine.daily_df=self.result_df