基于python3.8以上版本,我个人是最新的vnpy3.6.0。
首先,该功能的实现可以节省多进程回测时大部分的内存消耗:原来每个进程都要各自准备1份K线数据,现在16个进程可以共用1份K线数据,内存占用率大大降低。
感谢Q群上的“广西_漫步”提供的代码,我在此基础上整理了一下实现的思路与难点,并提供部分代码,为后续有必要实现这个功能的小伙伴提供参考。代码改动的部分非常多,所以不建议小白去尝试,除非是确实有这个需求、非要实现不可。
以下是实现时要解决的问题:
1,官方目前应该还没支持多标的遗传算法(如果有错可以指出),所以要个人实现多进程遗传算法参数优化。(多看下CTA单合约部分是怎么实现的,再改到多标的)
2,K线数据怎么通过SharedMemory共享到各进程里面。在这里,我花费了很多时间去研究,最后还是通过参考“广西_漫步”的代码,才悟出了实现方式。简单地说,就是将原有的历史数据按字段拆分成12个SharedMemory(每个字段对应一个numpy数组),拆分前先按时间排序,然后再按次序拆分。backtesting读取的时候,直接去遍历整个history数据,由于已经按时间排序好了,所以就符合原有backtesting的数据按时间推送的逻辑。
代码如下,混杂了很多我自己的代码,如果有确实要实现这个功能的小伙伴可以爬下这个屎山,按上面这两个处理难题将其中精华的部分抽取出来,就可以实现自己的多进程多标的遗传算法优化功能了。难度还是有的,但是这个路子是可以走得通的,值得花时间研究研究。
第一份代码是my_backtesting.py,对应的改动原版是官方的vnpy_portfoliostrategy.backtesting文件
# coding=utf-8
import os
import traceback
from dataclasses import dataclass
from datetime import date, datetime, timedelta
from functools import partial
from multiprocessing.shared_memory import SharedMemory
from typing import Dict, List, Optional, Set, Tuple, Union

import numpy as np
import seaborn as sns
from empyrical import cagr, annual_volatility, downside_risk
from pandas import DataFrame

from vnpy.trader.constant import Direction, Offset, Interval, Status
from vnpy.trader.constant import Exchange
from vnpy.trader.object import OrderData, TradeData, BarData, BaseData
from vnpy.trader.utility import round_to, extract_vt_symbol, generate_vt_symbol
from vnpy_ctastrategy.backtesting import get_target_value
from vnpy_portfoliostrategy.backtesting import BacktestingEngine, PortfolioDailyResult, ContractDailyResult,\
    INTERVAL_DELTA_MAP
from vnpy_portfoliostrategy.template import StrategyTemplate
from vnpy_spreadtrading.backtesting import BacktestingMode

from FuturesResearch.Definite_value import FuturesSchema, IndexFuturesCode, ALL_FUTURES_CODE_LIST
from my_vnpy.my_app.my_portfolio_strategy.my_template import MyStrategyTemplate
from my_vnpy.my_app.my_portfolio_strategy.my_utility import set_ratios, UTC
from my_vnpy.my_trader.my_database import my_database_manager
from my_vnpy.my_trader.my_optimize import my_run_ga_optimization, OptimizationSetting, check_optimization_setting
from SaveData.SaveDateUtility import get_next_trade_date
# Set seaborn style
sns.set_style("whitegrid")
class MyBacktestingEngine(BacktestingEngine):
""""""
gateway_name = "BACKTESTING"
def __init__(self):
    """
    Create the engine and initialise the state added on top of BacktestingEngine.

    Besides the base-class attributes this prepares the containers used for
    the running (mark-to-market) capital calculation and the bookkeeping
    attributes used by the shared-memory multi-process optimisation.
    """
    super().__init__()

    self.mode = BacktestingMode.BAR
    self.inverse = False
    self.strategy: MyStrategyTemplate = None
    self.strategy_class = None

    # Added 2021-11-15: trades grouped by fill datetime, used to compute per-trade PnL.
    self.trades_by_datetime = {}
    # Added 2021-11-15: running totals for the per-trade PnL calculation.
    self.dynamic_total_occupy_capital_on_minute = 0  # changes with every minute of data
    self.dynamic_profit_and_loss = 0
    self.dynamic_commission = 0

    # Added 2022-11-22.
    # Start with an empty mapping (previously 0, which contradicted the
    # annotation and is not subscriptable); my_set_parameters fills it in.
    self.margin_ratios: Dict[str, float] = {}
    self.dynamic_total_capital = self.capital  # running capital used for dynamic position sizing
    # If the running capital drops below this value the backtest is ended early.
    self.advanced_stop_dynamic_capital_min = 120000
    self.dynamic_total_occupy_capital = 0
    self.risk_free: float = 0.02
    self.vt_symbols_dynamic_pnl: Dict[str, VtSymbolDynamicPnl] = {}

    # Attributes added for shared-memory multi-process backtesting.
    self.optimization_count = 0
    self.zw = None
    self.my_stats = None
    self.fixed_param_dict = {}
    self.current_pid = os.getpid()
def check_data(self, h_data, d_data):
    """
    Check that the history holds exactly one bar per (datetime, symbol) pair.

    Every missing pair is printed, then the total count is validated.

    :param h_data: mapping keyed by ``(datetime, vt_symbol)`` tuples.
    :param d_data: collection of the bar datetimes.
    :raises ValueError: if ``len(h_data)`` differs from
        ``len(d_data) * len(self.vt_symbols)``.
    """
    # Use the data that was passed in rather than the instance attributes,
    # so the scan and the count check always look at the same objects.
    for vt_symbol in self.vt_symbols:
        for dt in d_data:
            if h_data.get((dt, vt_symbol)) is None:
                print(dt, vt_symbol)

    expected_total = len(d_data) * len(self.vt_symbols)
    if expected_total != len(h_data):
        raise ValueError(f"history的总数必须等于 vt_symbols的数量 乘以 dts,dts长度:{len(d_data)}")
def new_bars(self, dt: datetime) -> None:
"""
Process the bars stamped `dt`.

Collects the bar of every symbol in self.vt_symbols into self.bars, then
matches limit orders, books the daily close, refreshes the running capital
and finally calls the strategy's on_bars. Every `return` in the missing-bar
branch leaves the method before any of those four steps run.

NOTE(review): indentation was lost in this copy of the file, so the exact
nesting of the `elif` date checks (inner hour check vs. the 'AU.SHFE'
check) cannot be confirmed from here - verify against the real file.
"""
self.datetime = dt
self.bars.clear()
# Added 2022-01-18: groundwork for a dynamic vt_symbols list.
for vt_symbol in self.vt_symbols:
bar = self.history_data.get((dt, vt_symbol), None)
if bar:
# print('bar merged into bars inside new_bars:', bar.__dict__.items())
self.bars[vt_symbol] = bar
else:
dt_str = dt.strftime("%Y-%m-%d %H:%M:%S")
if vt_symbol == 'AU.SHFE':
# Time windows and calendar dates for which a missing bar is skipped
# without logging (presumably known gaps in the data - TODO confirm).
if (dt.hour < 9 or
(dt.hour == 9 and dt.minute <= 30) or
dt.hour > 15 or
(dt.hour == 10 and 15 <= dt.minute <= 30) or
(dt.hour == 13 and dt.minute < 30)):
# (missing-bar log is commented out for these time windows)
return
elif dt.year == 2020 and dt.month == 7 and dt.day == 24:
return
elif dt.year == 2019 and dt.month == 5 and dt.day == 10:
return
elif dt.year == 2019 and dt.month == 11 and dt.day == 19:
return
elif dt.year == 2018 and dt.month == 11 and dt.day == 12:
return
elif dt.year == 2018 and dt.month == 5 and dt.day == 7:
return
elif dt.year == 2017 and dt.month == 11 and dt.day == 15:
return
elif dt.year == 2017 and dt.month == 5 and dt.day == 8:
return
elif dt.year == 2016 and dt.month == 11 and dt.day == 11:
return
elif dt.year == 2016 and dt.month == 5 and dt.day == 5:
return
# Any other missing bar is logged, and this datetime is skipped as well.
self.output(f"数据缺失:{dt_str} {vt_symbol}")
return
self.cross_limit_order()
self.update_daily_close(self.bars, dt) # order swapped: book the daily result first, then feed bars to on_bars
self.update_dynamic_capital(dt, self.bars) # refresh the remaining capital
self.strategy.on_bars(self.bars)
def cross_limit_order(self) -> None:
    """
    Match all active limit orders against the bars of the current datetime.

    A buy order fills when its price is at or above the bar low, a sell order
    when its price is at or below the bar high (the bar price must be
    positive). Fills are pushed to the strategy, stored in self.trades and
    additionally grouped by datetime in self.trades_by_datetime.
    """
    # Iterate over a snapshot: filled orders are removed from the dict in the loop.
    for order in list(self.active_limit_orders.values()):
        current_bar = self.bars[order.vt_symbol]
        bar_open = current_bar.open_price
        bar_low = current_bar.low_price
        bar_high = current_bar.high_price

        # A newly submitted order is first reported as pending ("not traded").
        if order.status == Status.SUBMITTING:
            order.status = Status.NOTTRADED
            self.strategy.update_order(order)

        buy_filled = (
            order.direction == Direction.LONG
            and order.price >= bar_low
            and bar_low > 0
        )
        sell_filled = (
            order.direction == Direction.SHORT
            and order.price <= bar_high
            and bar_high > 0
        )
        if not (buy_filled or sell_filled):
            continue

        # Report the order as completely filled and drop it from the active set.
        order.traded = order.volume
        order.status = Status.ALLTRADED
        self.strategy.update_order(order)
        self.active_limit_orders.pop(order.vt_orderid)

        self.trade_count += 1

        # Fill at the better of the order price and the bar open.
        if buy_filled:
            fill_price = min(order.price, bar_open)
        else:
            fill_price = max(order.price, bar_open)

        trade = TradeData(
            symbol=order.symbol,
            exchange=order.exchange,
            orderid=order.orderid,
            tradeid=str(self.trade_count),
            direction=order.direction,
            offset=order.offset,
            price=fill_price,
            volume=order.volume,
            datetime=self.datetime,
            gateway_name=self.gateway_name,
        )
        self.strategy.update_trade(trade)
        self.trades[trade.vt_tradeid] = trade

        # Custom addition: also index the trade by its datetime.
        self.trades_by_datetime.setdefault(trade.datetime, []).append(trade)
def update_dynamic_capital(self, dt: datetime, bars) -> None:
    """
    Recompute the running capital and occupied margin for the bars at `dt`.

    Trades filled at `dt` are first folded into the per-symbol state, then
    every open position is valued at the bar close. The results are stored on
    the engine and pushed to the strategy through update_capital.

    :param dt: datetime of the bars being processed.
    :param bars: mapping of vt_symbol to the current bar.
    """
    # self.output("start computing the mark-to-market net value")
    states = self.vt_symbols_dynamic_pnl
    if not states:
        # First call: create one state container per symbol.
        for symbol in self.vt_symbols:
            states[symbol] = VtSymbolDynamicPnl(
                vt_symbol=symbol,
                rate=self.rates[symbol],
                slippage=self.slippages[symbol],
                size=self.sizes[symbol],
                margin_ratio=self.margin_ratios[symbol],
                gateway_name=self.gateway_name
            )

    # No trade has happened yet: capital is unchanged, just report it.
    if not self.trades_by_datetime:
        self.strategy.update_capital(self.dynamic_total_capital, self.dynamic_total_occupy_capital)
        return

    # Fold every trade filled at this datetime into its symbol's state.
    for trade in self.trades_by_datetime.get(dt, ()):
        self.calculate_symbol_dynamic_capital(trade)

    # Changed 2022-11-22: aggregate over all symbols before updating the totals.
    closed_pnl = 0
    raw_price_diff_total = 0
    open_pnl = 0
    margin_in_use = 0
    for symbol, state in states.items():
        closed_pnl += state.total_pnl
        held = state.dynamic_volume
        if held == 0:
            continue

        last_close = bars[symbol].close_price
        cost = state.dynamic_cost_price
        multiplier = state.size

        # NOTE(review): this term has no contract size and no direction sign,
        # and the floating PnL is counted again in open_pnl below - looks like
        # a leftover; confirm before relying on dynamic_total_capital.
        raw_price_diff_total += (last_close - cost) * held

        margin_in_use += last_close * multiplier * self.margin_ratios[symbol] * held

        # Commission that closing the position at the current close would cost.
        exit_fee = last_close * held * multiplier * state.rate
        if state.dynamic_direction == Direction.LONG:
            price_move = last_close - cost
        else:
            price_move = cost - last_close
        open_pnl += price_move * held * multiplier + state.dynamic_pnl - exit_fee

    self.dynamic_total_occupy_capital = margin_in_use
    self.dynamic_total_capital = self.capital + closed_pnl + raw_price_diff_total + open_pnl
    self.strategy.update_capital(self.dynamic_total_capital, margin_in_use)
def calculate_symbol_dynamic_capital(self, trade: TradeData) -> None:
# Fold one fill into the running per-symbol state (volume, average cost,
# commission, occupied capital, PnL) kept in self.vt_symbols_dynamic_pnl.
# Four cases are handled: fresh open, add to a position, full close, partial close.
# NOTE(review): indentation was lost in this copy of the file; the nesting
# described in the comments below is the most plausible reading - verify.
#
# First consider the case where this symbol currently has no position.
trade_vt_symbol = generate_vt_symbol(trade.symbol, trade.exchange)
_vt_symbol_dynamic_pnl: VtSymbolDynamicPnl = self.vt_symbols_dynamic_pnl[trade_vt_symbol]
_rate = _vt_symbol_dynamic_pnl.rate
_size = _vt_symbol_dynamic_pnl.size
_margin_ratio = _vt_symbol_dynamic_pnl.margin_ratio
# Buys and sells apply slippage in opposite directions.
if trade.direction == Direction.LONG:
_actual_trade_price = trade.price + _vt_symbol_dynamic_pnl.slippage
else:
_actual_trade_price = trade.price - _vt_symbol_dynamic_pnl.slippage
_single_turnover = _actual_trade_price * _size * trade.volume
_single_commission = _single_turnover * _rate
# Added 2023-03-06: also handle the short direction.
if _vt_symbol_dynamic_pnl.dynamic_volume == 0:
# Fresh open: the trade defines direction, volume and cost price.
_vt_symbol_dynamic_pnl.dynamic_direction = trade.direction
_vt_symbol_dynamic_pnl.dynamic_volume = trade.volume
_vt_symbol_dynamic_pnl.dynamic_open_pos_commission = _single_commission
_vt_symbol_dynamic_pnl.dynamic_occupy_capital = _single_turnover * _margin_ratio + _single_commission
_vt_symbol_dynamic_pnl.dynamic_cost_price = _actual_trade_price
elif _vt_symbol_dynamic_pnl.dynamic_volume != 0 and trade.offset == Offset.OPEN:
# Adding to an existing position: the cost price becomes the volume-weighted average.
_vt_symbol_dynamic_pnl.dynamic_open_pos_commission += _single_commission
_vt_symbol_dynamic_pnl.dynamic_occupy_capital += _single_turnover * _margin_ratio + _single_commission
_vt_symbol_dynamic_pnl.dynamic_cost_price = (
_actual_trade_price * trade.volume +
_vt_symbol_dynamic_pnl.dynamic_cost_price * _vt_symbol_dynamic_pnl.dynamic_volume) / \
(trade.volume + _vt_symbol_dynamic_pnl.dynamic_volume)
_vt_symbol_dynamic_pnl.dynamic_volume += trade.volume
elif trade.offset == Offset.CLOSE and trade.volume == _vt_symbol_dynamic_pnl.dynamic_volume:
# The position is closed completely.
if trade.direction == Direction.SHORT:
# Closing a long position: the closing order is a sell.
trade_pnl_without_commission = (_actual_trade_price - _vt_symbol_dynamic_pnl.dynamic_cost_price) * _size * \
trade.volume
else:
trade_pnl_without_commission = (_vt_symbol_dynamic_pnl.dynamic_cost_price - _actual_trade_price) * _size * \
trade.volume
_vt_symbol_dynamic_pnl.dynamic_open_pos_commission += _single_commission
trade_pnl = trade_pnl_without_commission - _vt_symbol_dynamic_pnl.dynamic_open_pos_commission
_vt_symbol_dynamic_pnl.dynamic_pnl += trade_pnl
_vt_symbol_dynamic_pnl.total_pnl += _vt_symbol_dynamic_pnl.dynamic_pnl
# PnL has been settled; reset the running state of this symbol.
_vt_symbol_dynamic_pnl.dynamic_direction = None
_vt_symbol_dynamic_pnl.dynamic_open_pos_commission = 0
_vt_symbol_dynamic_pnl.dynamic_volume = 0
_vt_symbol_dynamic_pnl.dynamic_occupy_capital = 0
_vt_symbol_dynamic_pnl.dynamic_cost_price = 0
_vt_symbol_dynamic_pnl.dynamic_pnl = 0 # already added to total_pnl above, so it can be reset
elif trade.volume != _vt_symbol_dynamic_pnl.dynamic_volume and trade.offset == Offset.CLOSE:
# The position is closed only partially.
_vt_symbol_dynamic_pnl.dynamic_volume -= trade.volume
# Back out how much capital this turnover actually occupied.
_vt_symbol_dynamic_pnl.dynamic_occupy_capital -= _single_turnover * _margin_ratio + \
_vt_symbol_dynamic_pnl.dynamic_open_pos_commission + \
_single_commission
# PnL of this single reduction.
if trade.direction == Direction.SHORT:
# Closing a long position: the closing order is a sell.
trade_pnl = (_actual_trade_price - _vt_symbol_dynamic_pnl.dynamic_cost_price) * _size * trade.volume - \
_single_commission
else:
trade_pnl = (_vt_symbol_dynamic_pnl.dynamic_cost_price - _actual_trade_price) * _size * trade.volume - \
_single_commission
_vt_symbol_dynamic_pnl.dynamic_pnl += trade_pnl
# NOTE(review): presumably executed for every trade (function level) rather
# than only for partial closes - cannot be told apart without indentation.
_vt_symbol_dynamic_pnl.total_trade_times += 1
def my_set_parameters(
    self,
    vt_symbols: List[str],
    interval: Interval,
    start: datetime,
    rates: Union[Dict[str, float], int, float],
    slippages: Union[Dict[str, float], int, float],
    sizes: Union[Dict[str, float], int, float],
    priceticks: Union[Dict[str, float], int, float],
    margin_ratios: Union[Dict[str, float], int, float],
    capital: int = 0,
    end: Optional[datetime] = None,
    risk_free: float = 0
) -> None:
    """
    Configure symbols, costs, date range and capital for a backtest.

    :param vt_symbols: symbols to backtest. A two-element list whose first
        item is in ALL_FUTURES_CODE_LIST is treated as an index-futures
        shortcut: the matching index symbol is used and, when the second item
        is ``'backtesting'``, the eight continuous-contract symbols as well.
    :param interval: bar interval.
    :param start: start of the backtest.
    :param rates: commission rate, per symbol or one number for all symbols.
    :param slippages: slippage, per symbol or one number for all symbols.
    :param sizes: contract size, per symbol or one number for all symbols.
    :param priceticks: price tick, per symbol or one number for all symbols.
    :param margin_ratios: margin ratio, per symbol or one number for all symbols.
    :param capital: initial capital.
    :param end: end of the backtest (optional).
    :param risk_free: risk-free rate used for the Sharpe ratio.
    """
    if len(vt_symbols) == 2 and vt_symbols[0] in ALL_FUTURES_CODE_LIST:
        futures_code = vt_symbols[0]
        if futures_code == IndexFuturesCode.IC:
            self.index_symbol = '000905.SSE'
        elif futures_code == IndexFuturesCode.IF:
            self.index_symbol = '000300.SSE'
        elif futures_code == IndexFuturesCode.IH:
            self.index_symbol = '000016.SSE'

        # Build a fresh list instead of appending to the existing one, so that
        # configuring the same engine twice does not duplicate symbols.
        symbols = [self.index_symbol]
        if vt_symbols[1] == 'backtesting':
            # L0..L3: current month, next month, first quarter, second quarter.
            # For each, the '88' series comes first and the '889' series second
            # (the original variable names call them wfq / hfq).
            for contract in ('L0', 'L1', 'L2', 'L3'):
                symbols.append(futures_code + contract + '88.CFFEX')
                symbols.append(futures_code + contract + '889.CFFEX')
        self.vt_symbols = symbols
    else:
        self.vt_symbols = vt_symbols

    self.interval = interval

    # If rates is a single number, all the other cost settings are treated as
    # single numbers too and expanded to one entry per symbol.
    if isinstance(rates, (int, float)):
        self.rates = set_ratios(rates, self.vt_symbols)
        self.slippages = set_ratios(slippages, self.vt_symbols)
        self.sizes = set_ratios(sizes, self.vt_symbols)
        self.priceticks = set_ratios(priceticks, self.vt_symbols)
        self.margin_ratios = set_ratios(margin_ratios, self.vt_symbols)
    else:
        self.rates = rates
        self.slippages = slippages
        self.sizes = sizes
        self.priceticks = priceticks
        self.margin_ratios = margin_ratios

    self.start = start
    self.end = end
    self.capital = capital
    self.risk_free = risk_free
    # Custom addition: reset the running capital here so the engine can be
    # reused without being re-created.
    self.dynamic_total_capital = capital
def load_data(self) -> None:
    """
    Load the bars of every symbol in self.vt_symbols into memory.

    Fills self.history_data (keyed by ``(datetime, vt_symbol)``) and self.dts
    (the set of bar datetimes). If no end date is set, the current time is
    used. Nothing is loaded when the start is not before the end.
    """
    self.output("开始加载历史数据")

    if not self.end:
        self.end = datetime.now()

    if self.start >= self.end:
        self.output("起始日期必须小于结束日期")
        return

    # Rebind fresh containers instead of clearing in place: the shared-memory
    # path turns self.dts into a list and deletes both attributes, after which
    # .clear() / .add() would fail. This mirrors what clear_data does.
    self.history_data = {}
    self.dts = set()

    # Query one year of data per request.
    window = timedelta(days=365)
    interval_delta = INTERVAL_DELTA_MAP[self.interval]

    for vt_symbol in self.vt_symbols:
        start = self.start
        end = self.start + window
        data_count = 0

        while start < self.end:
            end = min(end, self.end)  # keep the window inside the configured range

            data = load_bar_data_without_lru_cache(
                vt_symbol,
                self.interval,
                start,
                end
            )
            for bar in data:
                self.dts.add(bar.datetime)
                self.history_data[(bar.datetime, vt_symbol)] = bar
                data_count += 1

            # The next window starts one bar after the end of this one.
            start = end + interval_delta
            end += (window + interval_delta)

        self.output(f"{vt_symbol}历史数据加载完成,数据量:{data_count}")

    self.output("所有历史数据加载完成")
def run_backtesting(self) -> None:
    """
    Replay the loaded history through the strategy.

    The first self.days days of data initialise the strategy, the remainder
    is replayed with trading enabled. When the running capital falls below
    self.advanced_stop_dynamic_capital_min, the replay stops once a bar whose
    day-of-month equals that of the next trade date is reached. Any exception
    raised while processing bars is logged and ends the run.
    """
    self.strategy.on_init()

    ordered_dts = sorted(self.dts)

    # Warm-up phase: count day changes until self.days days have passed.
    elapsed_days = 0
    ix = 0
    for ix, dt in enumerate(ordered_dts):
        if self.datetime and dt.day != self.datetime.day:
            elapsed_days += 1
            if elapsed_days >= self.days:
                break

        try:
            self.new_bars(dt)
        except Exception:
            self.output("触发异常,回测终止")
            self.output(traceback.format_exc())
            return

    self.strategy.inited = True
    self.output("策略初始化完成")

    self.strategy.on_start()
    self.strategy.trading = True
    self.output("开始回放历史数据")

    # Trading phase. stop_day stays 0 until the capital floor is breached and
    # then holds the day-of-month on which the replay is cut short.
    stop_day = 0
    for dt in ordered_dts[ix:]:
        try:
            if dt.day == stop_day:
                print("dt:", dt)
                print("self.dynamic_total_capital:", self.dynamic_total_capital)
                self.output("历史数据回放提前结束,资金清零")
                return

            self.new_bars(dt)

            capital_too_low = self.dynamic_total_capital < self.advanced_stop_dynamic_capital_min
            if capital_too_low and stop_day == 0:
                next_trade_date = get_next_trade_date(date=dt.strftime('%Y%m%d'), add_day=1)
                stop_day = int(next_trade_date[6:8])
        except Exception:
            self.output("触发异常,回测终止")
            self.output(traceback.format_exc())
            return

    self.output("历史数据回放结束")
def clear_data(self) -> None:
    """
    Clear all data of the last backtesting run.

    Resets the strategy, orders, trades, logs and daily results, and also the
    custom state of this subclass, including the loaded history
    (self.history_data / self.dts), so data must be loaded again afterwards.
    """
    self.strategy = None
    self.bars = {}
    self.datetime = None

    self.limit_order_count = 0
    self.limit_orders.clear()
    self.active_limit_orders.clear()

    self.trade_count = 0
    self.trades.clear()

    self.logs.clear()
    self.daily_results.clear()
    self.daily_df = None

    # Reset of the custom attributes added by this subclass.
    self.trades_by_datetime = {}
    self.dts: Set[datetime] = set()
    # Start the next run from the configured capital, as __init__ does.
    # Resetting to 0 would put the engine below the early-stop capital floor
    # immediately unless my_set_parameters is called again.
    self.dynamic_total_capital = self.capital
    self.dynamic_total_occupy_capital = 0
    self.days = 0
    self.vt_symbols_dynamic_pnl = {}
    self.history_data = {}
def calculate_statistics(self, df: DataFrame = None, output=True) -> dict:
    """
    Compute performance statistics from the daily result table.

    :param df: daily results with the columns ``net_pnl``, ``commission``,
        ``slippage``, ``turnover`` and ``trade_count``; defaults to
        self.daily_df. The columns ``balance``, ``return``, ``highlevel``,
        ``drawdown`` and ``ddpercent`` are added to it in place.
    :param output: when true, log the statistics through self.output.
    :return: dict of statistics; infinite and NaN values are replaced by 0.
        All values are 0 (dates are empty strings) when there is no data.
    """
    self.output("开始计算策略统计指标")

    if df is None:
        df = self.daily_df

    # Defaults, used unchanged when there is no daily data at all.
    cagr_info = 0  # annualised compound growth rate (custom addition)
    annual_volatility_info = 0
    annual_downside_risk = 0
    start_date = ""
    end_date = ""
    total_days = 0
    profit_days = 0
    loss_days = 0
    end_balance = 0
    max_drawdown = 0
    max_ddpercent = 0
    max_drawdown_duration = 0
    max_ddpercent_duration = 0
    total_net_pnl = 0
    daily_net_pnl = 0
    total_commission = 0
    daily_commission = 0
    total_slippage = 0
    daily_slippage = 0
    total_turnover = 0
    daily_turnover = 0
    total_trade_count = 0
    daily_trade_count = 0
    total_return = 0
    annual_return = 0
    daily_return = 0
    return_std = 0
    sharpe_ratio = 0
    return_drawdown_ratio = 0
    # Start/end of the largest drawdowns (added 2022-08-09); a falsy start
    # means "not available" and suppresses the corresponding log line.
    max_ddpercent_start = 0
    max_ddpercent_end = None
    max_drawdown_start = 0
    max_drawdown_end = None

    # An empty table is treated like a missing one instead of failing on df.index[0].
    if df is not None and not df.empty:
        # Balance related time series.
        df["balance"] = df["net_pnl"].cumsum() + self.capital
        try:
            df["return"] = np.log(df["balance"] / df["balance"].shift(1)).fillna(0)
        except Exception:
            # Nothing below can be computed without the "return" column, so
            # report the real cause and propagate it instead of hiding it.
            self.output("日收益率计算出错")
            self.output(traceback.format_exc())
            raise
        df["highlevel"] = (
            df["balance"].rolling(
                min_periods=1, window=len(df), center=False).max()
        )
        df["drawdown"] = df["balance"] - df["highlevel"]
        df["ddpercent"] = df["drawdown"] / df["highlevel"] * 100

        start_date = df.index[0]
        end_date = df.index[-1]

        total_days = len(df)
        profit_days = len(df[df["net_pnl"] > 0])
        loss_days = len(df[df["net_pnl"] < 0])

        end_balance = df["balance"].iloc[-1]
        max_drawdown = df["drawdown"].min()
        max_ddpercent = df["ddpercent"].min()
        max_ddpercent_end = df["ddpercent"].idxmin()
        max_drawdown_end = df["drawdown"].idxmin()

        # A drawdown starts at the balance peak preceding its lowest point.
        if isinstance(max_ddpercent_end, date):
            max_ddpercent_start = df["balance"][:max_ddpercent_end].idxmax()
            max_ddpercent_duration = (max_ddpercent_end - max_ddpercent_start).days
        else:
            max_ddpercent_start = False
            max_ddpercent_duration = 0

        if isinstance(max_drawdown_end, date):
            max_drawdown_start = df["balance"][:max_drawdown_end].idxmax()
            max_drawdown_duration = (max_drawdown_end - max_drawdown_start).days
        else:
            max_drawdown_start = False
            max_drawdown_duration = 0

        total_net_pnl = df["net_pnl"].sum()
        daily_net_pnl = total_net_pnl / total_days

        total_commission = df["commission"].sum()
        daily_commission = total_commission / total_days

        total_slippage = df["slippage"].sum()
        daily_slippage = total_slippage / total_days

        total_turnover = df["turnover"].sum()
        daily_turnover = total_turnover / total_days

        total_trade_count = df["trade_count"].sum()
        daily_trade_count = total_trade_count / total_days

        total_return = (end_balance / self.capital - 1) * 100
        annual_return = total_return / total_days * 240
        daily_return = df["return"].mean() * 100
        return_std = df["return"].std() * 100

        annual_volatility_info = annual_volatility(df['return'])  # annualised volatility
        cagr_info = cagr(df['return'])  # annualised compound growth rate
        annual_downside_risk = downside_risk(df['return'])  # annualised downside risk

        if return_std:
            daily_risk_free = self.risk_free / np.sqrt(240)
            sharpe_ratio = (daily_return - daily_risk_free) / return_std * np.sqrt(240)
        else:
            sharpe_ratio = 0

        # No drawdown at all: report 0 instead of dividing by zero.
        if max_drawdown:
            return_drawdown_ratio = -total_net_pnl / max_drawdown
        else:
            return_drawdown_ratio = 0

    if output:
        self.output("-" * 30)
        self.output(f"首个交易日:\t{start_date}")
        self.output(f"最后交易日:\t{end_date}")

        self.output(f"总交易日:\t{total_days}")
        self.output(f"盈利交易日:\t{profit_days}")
        self.output(f"亏损交易日:\t{loss_days}")

        self.output(f"起始资金:\t{self.capital:,.2f}")
        self.output(f"结束资金:\t{end_balance:,.2f}")

        self.output(f"总收益率:\t{total_return:,.2f}%")
        self.output(f"年化收益:\t{annual_return:,.2f}%")
        self.output(f"最大回撤: \t{max_drawdown:,.2f}")
        if max_ddpercent_start:
            self.output(f"百分比最大回撤: \t{max_ddpercent:,.2f}%, 最大回撤日期:\t{max_ddpercent_start}至{max_ddpercent_end}, 最长回撤天数: \t{max_ddpercent_duration}")
        if max_drawdown_start:
            self.output(f"最大回撤日期:\t{max_drawdown_start}至{max_drawdown_end}最长回撤天数: \t{max_drawdown_duration}")

        self.output(f"总盈亏:\t{total_net_pnl:,.2f}")
        self.output(f"总手续费:\t{total_commission:,.2f}")
        self.output(f"总滑点:\t{total_slippage:,.2f}")
        self.output(f"总成交金额:\t{total_turnover:,.2f}")
        self.output(f"总成交笔数:\t{total_trade_count}")

        self.output(f"日均盈亏:\t{daily_net_pnl:,.2f}")
        self.output(f"日均手续费:\t{daily_commission:,.2f}")
        self.output(f"日均滑点:\t{daily_slippage:,.2f}")
        self.output(f"日均成交金额:\t{daily_turnover:,.2f}")
        self.output(f"日均成交笔数:\t{daily_trade_count}")

        self.output(f"日均收益率:\t{daily_return:,.2f}%")
        self.output(f"收益标准差:\t{return_std:,.2f}%")
        self.output(f"年化波动率:\t{annual_volatility_info:,.3f}")
        self.output(f"年化复合增长率:\t{cagr_info:,.3f}")
        self.output(f"年化下行风险率:\t{annual_downside_risk:,.3f}")
        self.output(f"Sharpe Ratio:\t{sharpe_ratio:,.2f}")
        self.output(f"收益回撤比:\t{return_drawdown_ratio:,.2f}")

    statistics = {
        "start_date": start_date,
        "end_date": end_date,
        "total_days": total_days,
        "profit_days": profit_days,
        "loss_days": loss_days,
        "capital": self.capital,
        "end_balance": end_balance,
        "max_drawdown": max_drawdown,
        "max_ddpercent": max_ddpercent,
        "max_drawdown_duration": max_drawdown_duration,
        "total_net_pnl": total_net_pnl,
        "daily_net_pnl": daily_net_pnl,
        "total_commission": total_commission,
        "daily_commission": daily_commission,
        "total_slippage": total_slippage,
        "daily_slippage": daily_slippage,
        "total_turnover": total_turnover,
        "daily_turnover": daily_turnover,
        "total_trade_count": total_trade_count,
        "daily_trade_count": daily_trade_count,
        "total_return": total_return,
        "annual_return": annual_return,
        "daily_return": daily_return,
        "return_std": return_std,
        "sharpe_ratio": sharpe_ratio,
        "return_drawdown_ratio": return_drawdown_ratio,
    }

    # Replace infinite and NaN values by 0 (only existing keys are reassigned,
    # so updating the dict while iterating over it is safe).
    for key, value in statistics.items():
        if value in (np.inf, -np.inf):
            value = 0
        statistics[key] = np.nan_to_num(value)

    self.output("策略统计指标计算完成")
    return statistics
def my_create_shared_memory_count(self):
    """
    Create a shared-memory block holding one integer counter set to 0.

    :return: dict with the ``SharedMemory`` object (``"shm"``) plus the
        ``"shape"`` and ``"dtype"`` needed to map a numpy array onto it.
        The caller owns the block and is responsible for closing and
        unlinking it.
    """
    self.zw = None

    # np.int was removed in NumPy 1.24; use a fixed-width integer instead.
    initial = np.array([0], dtype=np.int64)

    # Counter of finished optimisation runs.
    counter_shm = SharedMemory(name=None, create=True, size=initial.nbytes)
    counter_view = np.ndarray(initial.shape, dtype=initial.dtype, buffer=counter_shm.buf)
    np.copyto(counter_view, initial)

    return {"shm": counter_shm, 'shape': initial.shape, "dtype": initial.dtype}
def my_create_shared_memory(self):
    """
    Load the history and copy it, field by field, into shared memory.

    The bars are ordered by datetime and, within one datetime, by the order
    of self.vt_symbols. Each of the twelve bar fields is stored as one numpy
    array in its own shared-memory block. Afterwards self.history_data and
    self.dts are deleted from the engine.

    :return: dict mapping the field name to a dict with the ``SharedMemory``
        object (``"shm"``) and the ``"shape"`` / ``"dtype"`` of the array
        stored in it. The caller owns the blocks and must close and unlink them.
    :raises ValueError: if there is no bar at all or a (datetime, symbol)
        pair has no bar.
    """
    self.output("开始读取共享内存所需数据")
    self.load_data()
    self.check_data(self.history_data, self.dts)  # validate the loaded data
    self.output(f"主进程{self.current_pid}:history_data字典数量:{len(self.history_data.keys())},dts数量:{len(self.dts)}")
    self.output("*" * 30)

    # Important: the readers replay the arrays front to back, so the bars
    # must be laid out in chronological order.
    self.dts = sorted(self.dts)

    bars = []
    for dt in self.dts:
        for vt_symbol in self.vt_symbols:
            bar = self.history_data.get((dt, vt_symbol), None)
            if bar is None:
                raise ValueError(f"数据缺失:{dt} {vt_symbol}")
            bars.append(bar)
    if not bars:
        # A shared-memory block cannot be created with size 0.
        raise ValueError("没有可写入共享内存的K线数据")

    # (field name, numpy dtype, value extractor); np.str was removed in
    # NumPy 1.24, np.str_ is the supported spelling.
    field_specs = [
        ("symbol", np.str_, lambda b: b.symbol),
        ("exchange", np.str_, lambda b: b.exchange.value),
        ("datetime", np.float64, lambda b: b.datetime.timestamp()),
        ("interval", np.str_, lambda b: b.interval.value),
        ("volume", np.float64, lambda b: b.volume),
        ("turnover", np.float64, lambda b: b.turnover),
        ("open_interest", np.float64, lambda b: b.open_interest),
        ("open_price", np.float64, lambda b: b.open_price),
        ("high_price", np.float64, lambda b: b.high_price),
        ("low_price", np.float64, lambda b: b.low_price),
        ("close_price", np.float64, lambda b: b.close_price),
        ("gateway_name", np.str_, lambda b: b.gateway_name),
    ]

    self.output("开始创建共享内存:")
    shm_dict = {}
    shared_view = None
    try:
        for field_name, dtype, extract in field_specs:
            values = np.array([extract(bar) for bar in bars], dtype=dtype)
            shm = SharedMemory(name=None, create=True, size=values.nbytes)
            shm_dict[field_name] = {"shm": shm, 'shape': values.shape, "dtype": values.dtype}
            shared_view = np.ndarray(values.shape, dtype=values.dtype, buffer=shm.buf)
            np.copyto(shared_view, values)
            # Drop the view so the block can be closed later without a BufferError.
            shared_view = None
    except Exception:
        # Do not leak the blocks that were already created.
        shared_view = None
        for entry in shm_dict.values():
            entry["shm"].close()
            entry["shm"].unlink()
        raise

    self.output(
        f"主进程{self.current_pid}:创建共享内存成功")
    self.output("*" * 30)

    # The data now lives in shared memory; free the in-process copies.
    del self.history_data
    del self.dts
    return shm_dict
def my_run_backtesting(self, shm_dict: dict, shm_count_dict: dict, optimization_count: int,
                       tag_str: str = 'tag_str'):
    """
    Run one backtest, replaying bars from shared memory when it is provided.

    When ``shm_dict`` is None the engine falls back to its own ``load_data()``
    and ``run_backtesting()``. Otherwise each known column in ``shm_dict`` is
    attached by segment name as a numpy view, the rows are rebuilt into
    ``BarData`` objects in stored order, and every ``len(self.vt_symbols)``
    consecutive rows are pushed to ``my_new_bars`` as one time slice.

    All SharedMemory handles opened here are closed before returning; the
    segments themselves are not unlinked by this method.

    :param shm_dict: mapping of column name to a descriptor dict with the keys
        ``"shm"``, ``"shape"`` and ``"dtype"``; None selects the default path.
    :param shm_count_dict: descriptor (same three keys) of the shared progress
        counter that is incremented once per call.
    :param optimization_count: total number of runs, used only in the log line.
    :param tag_str: label printed at the start of the progress log line.
    :return: None.
    """

    def bump_run_counter() -> str:
        # Attach to the progress counter, add one and return the new value
        # already formatted for the log line.
        # NOTE(review): this is a plain read-modify-write with no lock visible
        # here; if several workers run it at once the logged count may be off.
        counter_shm = SharedMemory(name=shm_count_dict.get('shm').name)
        counter = np.ndarray(shape=shm_count_dict.get('shape'),
                             dtype=shm_count_dict.get('dtype'),
                             buffer=counter_shm.buf)
        try:
            counter[0] += 1
            return f"{counter[0]}"
        finally:
            # The view must go first: close() raises BufferError while the
            # buffer is still exported to a numpy array.
            del counter
            counter_shm.close()

    if shm_dict is None:
        # No shared memory supplied: use the stock data loading and replay.
        self.load_data()
        self.run_backtesting()
        current_count = bump_run_counter()
        self.output(
            f"【{tag_str}】子进程{self.current_pid}:采用默认进程内存方式回测。总:{optimization_count},当前:{current_count}")
        self.output(f'采用参数组合:{self.strategy.get_parameters()}')
        self.output("*" * 30)
        return

    # -------------------------- shared-memory path --------------------------
    self.strategy.on_init()

    # Column order expected by the row unpacking below.
    column_order = (
        'symbol', 'exchange', 'datetime', 'interval',
        'volume', 'turnover', 'open_interest', 'open_price',
        'high_price', 'low_price', 'close_price', 'gateway_name',
    )
    handles = []
    columns = {}
    try:
        for name, info in shm_dict.items():
            if name not in column_order:
                # Unknown keys are ignored.
                continue
            handle = SharedMemory(name=info.get('shm').name)
            handles.append(handle)
            columns[name] = np.ndarray(shape=info.get('shape'), dtype=info.get('dtype'),
                                       buffer=handle.buf)

        current_count = bump_run_counter()
        self.output("*" * 30)
        self.output(
            f"【{tag_str}】子进程{self.current_pid}:取出共享内存进行回测。总:{optimization_count},当前:{current_count}")

        # ------------- replay the rows -------------
        bars = []
        symbol_total = len(self.vt_symbols)
        day_count = 0
        # The zip is deliberately not bound to a local name so that no
        # reference to the views outlives the loop.
        for (symbol, exchange_value, datetime_float, interval_value,
             volume, turnover, open_interest, open_price,
             high_price, low_price, close_price, gateway_name) in zip(
                *[columns[name] for name in column_order]):
            # NOTE(review): 28800 s is 8 hours; presumably this undoes an
            # offset applied when the timestamps were written - confirm
            # against the code that fills the datetime column.
            dt = datetime.fromtimestamp(datetime_float - 28800)
            bar: BarData = BarData(
                symbol=symbol,
                exchange=Exchange(exchange_value),
                datetime=dt,
                interval=Interval(interval_value),
                volume=volume,
                turnover=turnover,
                open_interest=open_interest,
                open_price=open_price,
                high_price=high_price,
                low_price=low_price,
                close_price=close_price,
                gateway_name=gateway_name,
            )
            bars.append(bar)
            if len(bars) < symbol_total:
                # Keep collecting until one bar per symbol has been read.
                continue

            # Initialisation ends after self.days day changes have been seen.
            if not self.strategy.inited:
                if self.datetime and dt.day != self.datetime.day:
                    day_count += 1
                    if day_count >= self.days:
                        self.strategy.inited = True
                        self.output("策略初始化完成")
                        self.strategy.on_start()
                        self.strategy.trading = True
                        self.output("开始回放历史数据")

            try:
                self.my_new_bars(dt, bars)
            except Exception:
                self.output("触发异常,回测终止")
                self.output(traceback.format_exc())
                return
            bars = []

        self.output("历史数据回放结束")
    finally:
        # Release this call's handles so repeated evaluations in the same
        # process do not accumulate open mappings.
        columns.clear()
        for handle in handles:
            try:
                handle.close()
            except BufferError:
                # Best effort: a view is still alive somewhere. Leave the
                # mapping to be reclaimed at process exit instead of masking
                # the backtest outcome.
                pass
def my_new_bars(self, dt: datetime, bars_list: List[BarData]) -> None:
    """
    Push one time slice of bars into the engine and then into the strategy.

    ``bars_list`` is read by position: entry ``i`` is stored under
    ``self.vt_symbols[i]``. A falsy entry is replaced by a flat bar built from
    the previous bar's close price; such filler bars are kept in ``self.bars``
    but are not passed to ``strategy.on_bars``.

    :param dt: timestamp of the slice; becomes ``self.datetime``.
    :param bars_list: one entry per symbol, in the order of ``self.vt_symbols``.
    """
    # NOTE(review): matching is purely positional, the bar's own symbol is not
    # checked - verify that the caller always supplies bars in vt_symbols order.
    self.datetime = dt
    fresh_bars: Dict[str, BarData] = {}

    for position, vt_symbol in enumerate(self.vt_symbols):
        incoming: BarData = bars_list[position]

        if not incoming:
            # No data for this symbol: carry the last close forward.
            previous = self.bars[vt_symbol]
            self.bars[vt_symbol] = BarData(
                symbol=previous.symbol,
                exchange=previous.exchange,
                datetime=dt,
                open_price=previous.close_price,
                high_price=previous.close_price,
                low_price=previous.close_price,
                close_price=previous.close_price,
                gateway_name=previous.gateway_name
            )
            continue

        self.bars[vt_symbol] = incoming
        fresh_bars[vt_symbol] = incoming

    # Match orders and update the books before the strategy sees the slice.
    self.cross_limit_order()
    self.update_daily_close(self.bars, dt)
    self.update_dynamic_capital(dt, self.bars)
    self.strategy.on_bars(fresh_bars)
def my_run_ga_optimization(self,
                           optimization_setting: OptimizationSetting,
                           max_workers: int,
                           output=True,
                           population_size: int = 200,
                           ngen_size: int = 60,
                           is_shared_memory: bool = False,
                           tag_str: str = '',
                           ):
    """
    Run genetic-algorithm parameter optimisation, optionally feeding the
    workers from shared memory.

    :param optimization_setting: parameter space and target; validated with
        ``check_optimization_setting`` first.
    :param max_workers: worker count forwarded to the optimiser.
    :param output: when true, each result is written through ``self.output``.
    :param population_size: forwarded to the optimiser.
    :param ngen_size: forwarded to the optimiser.
    :param is_shared_memory: when true, history is published through
        ``my_create_shared_memory`` and handed to the workers; when false the
        workers receive ``None`` and load data themselves.
    :param tag_str: label forwarded to the per-run log line.
    :return: the optimiser's result list, or None when the setting is invalid.
    """
    if not check_optimization_setting(optimization_setting):
        return

    shm_dict = None
    shm_count_dict = None
    try:
        if is_shared_memory:
            shm_dict = self.my_create_shared_memory()
        shm_count_dict = self.my_create_shared_memory_count()

        evaluate_func: callable = my_wrap_evaluate(self,
                                                   optimization_setting.target_name,
                                                   shm_dict,
                                                   shm_count_dict,
                                                   len(optimization_setting.generate_settings()),
                                                   tag_str=tag_str)
        # Inside a method body this name resolves to the module-level
        # optimiser function, not to this method.
        results = my_run_ga_optimization(
            evaluate_func,
            optimization_setting,
            get_target_value,
            max_workers,
            population_size=population_size,
            ngen_size=ngen_size,
            output=self.output
        )

        if output:
            for result in results:
                msg: str = f"参数:{result[0]}, 目标:{result[1]}"
                self.output(msg)
    finally:
        # Release every segment created above, also when the optimisation
        # raised. shm_dict stays None when shared memory is disabled, so it
        # must not be iterated unconditionally.
        owned = list(shm_dict.values()) if shm_dict else []
        if shm_count_dict:
            # NOTE(review): assumes the counter segment is created for this
            # call only and is not reused afterwards - confirm against
            # my_create_shared_memory_count.
            owned.append(shm_count_dict)
        for info in owned:
            shm = info.get('shm')
            if shm is None:
                continue
            try:
                shm.close()
            except BufferError:
                # A numpy view still exports the buffer; unlink anyway so the
                # segment goes away once the last user lets go.
                pass
            try:
                shm.unlink()
            except FileNotFoundError:
                # Already removed elsewhere.
                pass

    return results
@dataclass
class VtSymbolDynamicPnl(BaseData):
    """
    Per-symbol record of contract settings plus running position and
    profit/loss figures.

    The ``dynamic_*`` fields follow trade-by-trade offsetting logic.
    ``symbol`` and ``exchange`` are derived from ``vt_symbol`` after
    initialisation.
    """

    # Contract settings.
    vt_symbol: str
    size: float  # ratio
    rate: float  # commission rate
    margin_ratio: float  # margin ratio
    slippage: float  # slippage

    # State tracked with trade-by-trade offsetting logic.
    dynamic_direction: Direction = Direction.LONG
    dynamic_open_pos_commission: float = 0  # commission accumulated when adding to the position only; closing commission is not included
    dynamic_volume: float = 0  # current holding of this symbol
    dynamic_occupy_capital: float = 0  # capital currently occupied
    dynamic_pnl: float = 0  # accumulated dynamic profit/loss of this symbol
    dynamic_cost_price: float = 0  # average cost price after each addition to the position
    total_pnl: float = 0  # accumulated profit/loss of this symbol overall
    total_trade_times: int = 0
    datetime: datetime = None

    def __post_init__(self):
        """Split ``vt_symbol`` into the ``symbol`` and ``exchange`` attributes."""
        parsed = extract_vt_symbol(self.vt_symbol)
        self.symbol, self.exchange = parsed
def load_bar_data_without_lru_cache(
        vt_symbol: str,
        interval: Interval,
        start: datetime,
        end: datetime,
):
    """
    Load bars for ``vt_symbol`` straight from ``my_database_manager``.

    The symbol and exchange are parsed out of ``vt_symbol``, and the full
    ``vt_symbol`` is also passed on as ``collection_name``.

    :param vt_symbol: combined symbol and exchange identifier.
    :param interval: bar interval to query.
    :param start: start of the requested range.
    :param end: end of the requested range.
    :return: whatever ``my_database_manager.load_bar_data`` returns.
    """
    symbol, exchange = extract_vt_symbol(vt_symbol)
    query_args = (symbol, exchange, interval, start, end)
    return my_database_manager.load_bar_data(*query_args, collection_name=vt_symbol)
def my_wrap_evaluate(engine: MyBacktestingEngine, target_name: str,
                     shm_dict: dict, shm_count_dict: dict, optimization_count: int, tag_str: str = '',
                     ) -> callable:
    """
    Bind the engine's configuration to ``my_evaluate``.

    The returned partial has every argument of ``my_evaluate`` filled in
    except the last one (``setting``), so the optimiser can call it with a
    single parameter dict.

    :param engine: configured engine whose settings are copied into the partial.
    :param target_name: statistics key used as the optimisation target.
    :param shm_dict: shared-memory column descriptors, or None.
    :param shm_count_dict: descriptor of the shared progress counter.
    :param optimization_count: total number of runs, used for logging.
    :param tag_str: label for the per-run log line.
    :return: ``functools.partial`` over ``my_evaluate``.
    """
    # Order must follow the positional parameters of my_evaluate.
    bound_args = (
        shm_dict,
        shm_count_dict,
        optimization_count,
        tag_str,
        engine.fixed_param_dict,
        target_name,
        engine.strategy_class,
        engine.vt_symbols,
        engine.interval,
        engine.start,
        engine.rates,
        engine.slippages,
        engine.sizes,
        engine.priceticks,
        engine.margin_ratios,
        engine.capital,
        engine.end,
    )
    return partial(my_evaluate, *bound_args)
def my_evaluate(
        shm_dict: dict,
        shm_count_dict: dict,
        optimization_count: int,
        tag_str: str,
        fixed_param_dict: dict,  # fixed parameters
        target_name: str,
        strategy_class: StrategyTemplate,
        vt_symbols: List[str],
        interval: Interval,
        start: datetime,
        rates: Dict[str, float],
        slippages: Dict[str, float],
        sizes: Dict[str, float],
        priceticks: Dict[str, float],
        margin_ratios: Dict[str, float] or int or float,
        capital: int,
        end: datetime,
        setting: dict
):
    """
    Evaluate one parameter set in a fresh engine; intended to be run inside a
    worker process.

    ``setting`` is updated in place with ``fixed_param_dict`` before the
    strategy is added, so the returned string contains both the optimised and
    the fixed parameters.

    :return: tuple ``(str(setting), target_value, statistics)`` where
        ``target_value`` is ``statistics[target_name]``.
    """
    backtester = MyBacktestingEngine()

    engine_parameters = {
        "vt_symbols": vt_symbols,
        "interval": interval,
        "start": start,
        "rates": rates,
        "slippages": slippages,
        "sizes": sizes,
        "priceticks": priceticks,
        "margin_ratios": margin_ratios,
        "capital": capital,
        "end": end,
    }
    backtester.my_set_parameters(**engine_parameters)

    # Fixed parameters override any same-named entries in the candidate set.
    setting.update(fixed_param_dict)
    # noinspection PyTypeChecker
    backtester.add_strategy(strategy_class, setting)

    backtester.my_run_backtesting(shm_dict, shm_count_dict, optimization_count, tag_str=tag_str)
    backtester.calculate_result()
    statistics = backtester.calculate_statistics(output=True)
    target_value = statistics[target_name]

    backtester.clear_data()
    return str(setting), target_value, statistics