经过一段时间研究,基本实现多股轮动策略,成功实现回测,下面是对vnpy引擎代码的一些修改之处:
修改vnpy\app\portfolio_strategy\engine.py
在_init_strategy函数后面增加一个函数:
def add_subscribe(self, strategy: StrategyTemplate, vt_symbols: List[str]):
# Subscribe market data
for vt_symbol in vt_symbols:
contract: ContractData = self.main_engine.get_contract(vt_symbol)
if contract:
req = SubscribeRequest(
symbol=contract.symbol, exchange=contract.exchange)
self.main_engine.subscribe(req, contract.gateway_name)
else:
self.write_log(f"行情订阅失败,找不到合约{vt_symbol}", strategy)
修改vnpy\app\portfolio_strategy\template.py
在第239行增加一个函数:
def add_subscribe(self, vt_symbols: List[str]):
self.strategy_engine.add_subscribe(self, vt_symbols)
修改vnpy\app\portfolio_strategy\backtesting.py
在类PortfolioDailyResult的最后一个函数update_close_prices修改最后追加代码:
原来:
if contract_result:
contract_result.update_close_price(close_price)
修改为:
if contract_result:
contract_result.update_close_price(close_price)
else:
self.contract_results[vt_symbol] = ContractDailyResult(self.date, close_price)
在函数run_backtesting之前增加一个函数:
def add_subscribe(self, strategy: StrategyTemplate, add_vt_symbols: List[str]):
print('追加订阅:', self.vt_symbols)
for vt_symbol in add_vt_symbols:
self.vt_symbols.append(vt_symbol)
print('追加完成:', self.vt_symbols)
#print('已加载历史时间:',self.dts)
#print('已加载历史数据:',self.history_data)
# Load 30 days of data each time and allow for progress update
progress_delta = timedelta(days=30)
total_delta = self.end - self.start
interval_delta = INTERVAL_DELTA_MAP[self.interval]
for vt_symbol in add_vt_symbols:
# 追加相应测试参数,从第一个symbol复制一样的参数
self.size[vt_symbol]=self.size[self.vt_symbols[0]]
self.rates[vt_symbol]=self.rates[self.vt_symbols[0]]
self.slippages[vt_symbol]=self.slippages[self.vt_symbols[0]]
self.priceticks[vt_symbol]=self.priceticks[self.vt_symbols[0]]
start = self.start
end = self.start + progress_delta
progress = 0
data_count = 0
while start < self.end:
end = min(end, self.end) # Make sure end time stays within set range
data = load_bar_data(
vt_symbol,
self.interval,
start,
end
)
for bar in data:
self.dts.add(bar.datetime)
self.history_data[(bar.datetime, vt_symbol)] = bar
data_count += 1
progress += progress_delta / total_delta
progress = min(progress, 1)
progress_bar = "#" * int(progress * 10)
self.output(f"{vt_symbol}加载进度:{progress_bar} [{progress:.0%}]")
start = end + interval_delta
end += (progress_delta + interval_delta)
self.output(f"{vt_symbol}历史数据加载完成,数据量:{data_count}")
#print('追加加载历史时间:',self.dts)
#print('追加加载历史数据:',self.history_data)
self.output("所有历史数据加载完成")
多股轮动策略参照 ,主要如下:
def on_bars(self, bars: Dict[str, BarData]):
里面发现是新的一天,则执行自己的选股函数,得到股票代码,追加 self.vt_symbols.append(onefund.vt_symbol);
prtlog('追加订阅股票代码:',add_vt_symbols)
#print(self.pos_cala_datas)
def on_bar(bar: BarData):
""""""
pass
for vt_symbol in add_vt_symbols:
# 追加K线管理器
self.bgs[vt_symbol] = BarGenerator(on_bar)
# 追加订阅
self.add_subscribe(add_vt_symbols)
# TODO:此处要考虑退订,否则策略跑久了,订阅会越来越多,但需要修改引擎,如果速度尚可可不做
另外,如果完美一点,可以继续修改引擎代码,增加退订函数,调用gateway的相应退订接口实现。