VeighNa量化社区
你的开源社区量化交易平台
Member
avatar
加入于:
帖子: 58
声望: 0

基于python3.8以上版本,我个人是最新的vnpy3.6.0。
首先,该功能可以节省多进程回测时的大部分内存消耗:原来每个进程都要各自准备1份K线数据,现在16个进程可以共用1份K线数据,内存占用大大降低。
感谢Q群里的“广西_漫步”提供的代码。我在此基础上整理了实现的思路与难点,并提供部分代码,为后续需要实现这个功能的小伙伴提供参考。代码改动的地方非常多,所以不建议小白尝试,除非确实有这个需求、非实现不可。
以下是实现时要解决的问题:
1,官方目前应该还没支持多标的遗传算法(如果有错可以指出),所以要个人实现多进程遗传算法参数优化。(多看下CTA单合约部分是怎么实现的,再改到多标的)
2,K线数据怎么通过SharedMemory共享到各进程里面。在这里,我花费了很多时间去研究,最后还是通过参考“广西_漫步”的代码,才悟出了实现方式。简单地说,就是将原有的历史数据按字段拆分成12个SharedMemory(对应下方代码中的12个字段数组),拆分前先按时间排序,然后再按次序拆分。backtesting读取的时候,直接按顺序遍历整个history数据,由于已经按时间排序好了,所以符合原有backtesting按时间推送数据的逻辑。

代码如下,混杂了很多我自己的代码,如果有确实要实现这个功能的小伙伴可以爬下这个屎山,按上面这两个处理难题将其中精华的部分抽取出来,就可以实现自己的多进程多标的遗传算法优化功能了。难度还是有的,但是这个路子是可以走得通的,值得花时间研究研究。
第一份代码是my_backtesting.py,对应的改动原版是官方的vnpy_portfoliostrategy.backtesting文件

# coding=utf-8
from dataclasses import dataclass
from datetime import date, datetime, timedelta
from functools import partial
from multiprocessing.shared_memory import SharedMemory
import os
import traceback
from typing import Dict, List, Set, Tuple, Union

import numpy as np
import seaborn as sns
from pandas import DataFrame

from empyrical import cagr, annual_volatility, downside_risk
from vnpy.trader.constant import Direction, Offset, Interval, Status
from vnpy.trader.object import OrderData, TradeData, BarData, BaseData
from vnpy.trader.utility import round_to, extract_vt_symbol, generate_vt_symbol
from vnpy.trader.constant import Exchange
from vnpy_ctastrategy.backtesting import get_target_value
from vnpy_portfoliostrategy.backtesting import BacktestingEngine, PortfolioDailyResult, ContractDailyResult,\
    INTERVAL_DELTA_MAP
from vnpy_portfoliostrategy.template import StrategyTemplate
from vnpy_spreadtrading.backtesting import BacktestingMode

from my_vnpy.my_trader.my_optimize import my_run_ga_optimization, OptimizationSetting, check_optimization_setting
from my_vnpy.my_app.my_portfolio_strategy.my_template import MyStrategyTemplate
from my_vnpy.my_trader.my_database import my_database_manager
from FuturesResearch.Definite_value import FuturesSchema, IndexFuturesCode, ALL_FUTURES_CODE_LIST
from my_vnpy.my_app.my_portfolio_strategy.my_utility import set_ratios, UTC
from SaveData.SaveDateUtility import get_next_trade_date

# Module-level side effect: switch seaborn to its "whitegrid" style as soon
# as this module is imported.
sns.set_style("whitegrid")


class MyBacktestingEngine(BacktestingEngine):
    """"""
    gateway_name = "BACKTESTING"
    def __init__(self):
        """
        Initialise the engine on top of the base ``BacktestingEngine``.

        After the base initialiser has run, this adds the state used by the
        mark-to-market capital tracking and by the shared-memory
        multi-process optimisation.
        """
        super().__init__()

        self.mode = BacktestingMode.BAR
        self.inverse = False
        self.strategy: MyStrategyTemplate = None
        self.strategy_class = None

        # Added 2021-11-15: trades grouped by fill datetime, used to compute
        # the profit and loss of each trade.
        self.trades_by_datetime = {}

        self.dynamic_total_occupy_capital_on_minute = 0  # changes with every minute of data
        self.dynamic_profit_and_loss = 0
        self.dynamic_commission = 0

        # Added 2022-11-22: margin ratio per vt_symbol.  Starts as an empty
        # mapping (matching its annotation) until my_set_parameters() fills it.
        self.margin_ratios: Dict[str, float] = {}

        self.dynamic_total_capital = self.capital  # running (mark-to-market) capital
        # run_backtesting() ends the replay early once the dynamic capital
        # drops below this amount (120,000).
        self.advanced_stop_dynamic_capital_min = 120000
        self.dynamic_total_occupy_capital = 0
        self.risk_free: float = 0.02

        # Per-symbol running position / PnL containers, created lazily by
        # update_dynamic_capital().
        self.vt_symbols_dynamic_pnl: Dict[str, VtSymbolDynamicPnl] = {}

        # State added for the shared-memory multi-process backtest.
        self.optimization_count = 0
        self.zw = None
        self.my_stats = None
        self.fixed_param_dict = {}
        self.current_pid = os.getpid()

    def check_data(self, h_data, d_data):
        """
        Sanity-check the loaded history.

        Prints every ``(datetime, vt_symbol)`` pair of ``self.dts`` x
        ``self.vt_symbols`` that has no entry in ``self.history_data``, then
        verifies that ``h_data`` holds exactly one entry per symbol for each
        element of ``d_data``.

        :param h_data: sized container of loaded bars.
        :param d_data: sized container of bar datetimes.
        :raises ValueError: when ``len(h_data)`` differs from
            ``len(d_data) * len(self.vt_symbols)``.
        """
        lookup = self.history_data.get
        for symbol in self.vt_symbols:
            for stamp in self.dts:
                if lookup((stamp, symbol)) is None:
                    print(stamp, symbol)

        expected_total = len(self.vt_symbols) * len(d_data)
        if expected_total == len(h_data):
            return
        raise ValueError(f"history的总数必须等于 vt_symbols的数量 乘以 dts,dts长度:{len(d_data)}")

    def new_bars(self, dt: datetime) -> None:
        """
        Assemble the bars of every symbol for ``dt`` and drive one step.

        If any symbol has no bar at ``dt`` the whole timestamp is skipped.
        The gap is logged through ``self.output`` unless the symbol is
        ``AU.SHFE`` and ``dt`` falls in one of the hard-coded time windows or
        on one of the hard-coded dates below, in which case it is skipped
        silently.
        """
        self.datetime = dt
        self.bars.clear()

        # Dates on which a missing AU.SHFE bar is skipped without a log line.
        au_silent_dates = {
            (2020, 7, 24),
            (2019, 5, 10),
            (2019, 11, 19),
            (2018, 11, 12),
            (2018, 5, 7),
            (2017, 11, 15),
            (2017, 5, 8),
            (2016, 11, 11),
            (2016, 5, 5),
        }

        # Added 2022-01-18: loop over vt_symbols so a dynamic symbol list can
        # be supported later.
        for vt_symbol in self.vt_symbols:
            bar = self.history_data.get((dt, vt_symbol), None)
            if bar:
                self.bars[vt_symbol] = bar
                continue

            # First missing bar: give up on this timestamp.
            stamp_text = dt.strftime("%Y-%m-%d %H:%M:%S")
            if vt_symbol == 'AU.SHFE':
                hour, minute = dt.hour, dt.minute
                in_silent_window = (
                    hour < 9
                    or (hour == 9 and minute <= 30)
                    or hour > 15
                    or (hour == 10 and 15 <= minute <= 30)
                    or (hour == 13 and minute < 30)
                )
                if in_silent_window or (dt.year, dt.month, dt.day) in au_silent_dates:
                    return
            self.output(f"数据缺失:{stamp_text} {vt_symbol}")
            return

        self.cross_limit_order()
        # Order swapped on purpose: book the daily result first, then feed
        # the bars to on_bars.
        self.update_daily_close(self.bars, dt)
        self.update_dynamic_capital(dt, self.bars)  # refresh remaining capital

        self.strategy.on_bars(self.bars)

    def cross_limit_order(self) -> None:
        """
        Match the active limit orders against the current bars.

        For each active order the bar of its symbol is read from
        ``self.bars``.  A buy fills when its price is at or above the bar low,
        a sell when its price is at or below the bar high (the reference price
        must be positive).  Fills are priced at the better of the order price
        and the bar open, pushed to the strategy, stored in ``self.trades``
        and also grouped by fill datetime in ``self.trades_by_datetime``.
        """
        for order in list(self.active_limit_orders.values()):
            current_bar = self.bars[order.vt_symbol]
            bar_low = current_bar.low_price
            bar_high = current_bar.high_price
            bar_open = current_bar.open_price

            # A freshly submitted order is first reported as pending.
            if order.status == Status.SUBMITTING:
                order.status = Status.NOTTRADED
                self.strategy.update_order(order)

            buy_fills = (
                order.direction == Direction.LONG
                and order.price >= bar_low
                and bar_low > 0
            )
            sell_fills = (
                order.direction == Direction.SHORT
                and order.price <= bar_high
                and bar_high > 0
            )
            if not (buy_fills or sell_fills):
                continue

            # Report the order as completely filled and retire it.
            order.traded = order.volume
            order.status = Status.ALLTRADED
            self.strategy.update_order(order)
            self.active_limit_orders.pop(order.vt_orderid)

            self.trade_count += 1
            fill_price = min(order.price, bar_open) if buy_fills else max(order.price, bar_open)

            trade = TradeData(
                symbol=order.symbol,
                exchange=order.exchange,
                orderid=order.orderid,
                tradeid=str(self.trade_count),
                direction=order.direction,
                offset=order.offset,
                price=fill_price,
                volume=order.volume,
                datetime=self.datetime,
                gateway_name=self.gateway_name,
            )

            self.strategy.update_trade(trade)
            self.trades[trade.vt_tradeid] = trade

            # Custom addition: also index the trade by its datetime.
            self.trades_by_datetime.setdefault(trade.datetime, []).append(trade)

    def update_dynamic_capital(self, dt: datetime, bars) -> None:
        """
        Recompute the running capital and occupied margin for ``dt``.

        On the first call a ``VtSymbolDynamicPnl`` container is created for
        every symbol.  If no trade has been recorded yet, the current values
        are simply pushed to the strategy.  Otherwise the trades filled at
        ``dt`` are applied, every symbol with an open volume is revalued at
        ``bars[vt_symbol].close_price``, and the totals are stored on the
        engine and passed to ``self.strategy.update_capital``.

        :param dt: datetime of the bars being processed.
        :param bars: mapping of vt_symbol to the bar for ``dt``.
        """
        # (A disabled log call about starting the mark-to-market calculation used to be here.)
        if not self.vt_symbols_dynamic_pnl:
            # Empty mapping: initialise one state container per symbol.
            for vt_symbol in self.vt_symbols:
                vt_symbol_dynamic_pnl = VtSymbolDynamicPnl(
                    vt_symbol=vt_symbol,
                    rate=self.rates[vt_symbol],
                    slippage=self.slippages[vt_symbol],
                    size=self.sizes[vt_symbol],
                    margin_ratio=self.margin_ratios[vt_symbol],
                    gateway_name=self.gateway_name
                )
                self.vt_symbols_dynamic_pnl[vt_symbol] = vt_symbol_dynamic_pnl

        # Summarise the value of the holdings and the accumulated open PnL of
        # every symbol.  With no trade at all there is nothing to revalue.
        if not self.trades_by_datetime:
            self.strategy.update_capital(self.dynamic_total_capital, self.dynamic_total_occupy_capital)
            return

        if dt in self.trades_by_datetime:
            for trade in self.trades_by_datetime[dt]:
                # Fold each trade into the per-symbol PnL and margin state.
                self.calculate_symbol_dynamic_capital(trade)

        # Changed 2022-11-22: aggregate the figures of all holdings.
        # NOTE(review): all_vt_symbol_dynamic_commission is accumulated below
        # but never read afterwards in this method.
        all_vt_symbol_dynamic_commission = 0
        all_vt_symbol_submit_dynamic_total_occupy_capital = 0
        _all_symbol_dynamic_pnl = 0
        all_total_pnl = 0
        _dynamic_total_pnl = 0
        for vt_symbol in self.vt_symbols_dynamic_pnl:
            all_vt_symbol_dynamic_commission += self.vt_symbols_dynamic_pnl[
                vt_symbol].dynamic_open_pos_commission
            all_total_pnl += self.vt_symbols_dynamic_pnl[vt_symbol].total_pnl
            dynamic_volume = self.vt_symbols_dynamic_pnl[vt_symbol].dynamic_volume
            if dynamic_volume != 0:
                now_close_price = bars[vt_symbol].close_price
                dynamic_cost_price = self.vt_symbols_dynamic_pnl[vt_symbol].dynamic_cost_price
                size = self.vt_symbols_dynamic_pnl[vt_symbol].size
                # NOTE(review): this term has no contract-size factor and no
                # direction handling, and it is added to the capital together
                # with _all_symbol_dynamic_pnl below - confirm it is intended.
                _dynamic_pnl = (now_close_price - dynamic_cost_price) * dynamic_volume
                _dynamic_total_pnl += _dynamic_pnl
                # Margin occupied by the open volume at the current close.
                single_submit_dynamic_total_occupy_capital = now_close_price * size * self.margin_ratios[vt_symbol] * \
                                                             dynamic_volume
                all_vt_symbol_submit_dynamic_total_occupy_capital += single_submit_dynamic_total_occupy_capital
                # Commission computed on the open volume at the current close.
                sing_commission = now_close_price * dynamic_volume * size * self.vt_symbols_dynamic_pnl[vt_symbol].rate
                if self.vt_symbols_dynamic_pnl[vt_symbol].dynamic_direction == Direction.LONG:
                    _pnl = (now_close_price - dynamic_cost_price) * dynamic_volume * size + \
                           self.vt_symbols_dynamic_pnl[vt_symbol].dynamic_pnl - sing_commission
                else:
                    _pnl = (dynamic_cost_price - now_close_price) * dynamic_volume * size + \
                           self.vt_symbols_dynamic_pnl[vt_symbol].dynamic_pnl - sing_commission
                _all_symbol_dynamic_pnl += _pnl

        self.dynamic_total_occupy_capital = all_vt_symbol_submit_dynamic_total_occupy_capital
        self.dynamic_total_capital = self.capital + all_total_pnl + _dynamic_total_pnl + _all_symbol_dynamic_pnl

        self.strategy.update_capital(self.dynamic_total_capital, all_vt_symbol_submit_dynamic_total_occupy_capital)

    def calculate_symbol_dynamic_capital(self, trade: TradeData):
        """
        Fold one trade into the running position state of its symbol.

        The ``VtSymbolDynamicPnl`` entry of the trade's symbol is updated
        in place.  Four cases are handled: opening from flat, adding to an
        open position, closing the whole position and closing part of it.
        A trade matching none of them only increments the trade counter.

        :param trade: the filled trade to account for.
        """
        # Look up the running state of the traded symbol.
        trade_vt_symbol = generate_vt_symbol(trade.symbol, trade.exchange)
        _vt_symbol_dynamic_pnl: VtSymbolDynamicPnl = self.vt_symbols_dynamic_pnl[trade_vt_symbol]
        _rate = _vt_symbol_dynamic_pnl.rate
        _size = _vt_symbol_dynamic_pnl.size
        _margin_ratio = _vt_symbol_dynamic_pnl.margin_ratio

        # Slippage works against the trader: added for buys, subtracted for sells.
        if trade.direction == Direction.LONG:
            _actual_trade_price = trade.price + _vt_symbol_dynamic_pnl.slippage
        else:
            _actual_trade_price = trade.price - _vt_symbol_dynamic_pnl.slippage

        _single_turnover = _actual_trade_price * _size * trade.volume
        _single_commission = _single_turnover * _rate

        # Added 2023-03-06: short direction is taken into account.
        if _vt_symbol_dynamic_pnl.dynamic_volume == 0:
            # Opening a new position.
            # NOTE(review): trade.offset is not checked in this branch.
            _vt_symbol_dynamic_pnl.dynamic_direction = trade.direction
            _vt_symbol_dynamic_pnl.dynamic_volume = trade.volume
            _vt_symbol_dynamic_pnl.dynamic_open_pos_commission = _single_commission
            _vt_symbol_dynamic_pnl.dynamic_occupy_capital = _single_turnover * _margin_ratio + _single_commission
            _vt_symbol_dynamic_pnl.dynamic_cost_price = _actual_trade_price

        elif _vt_symbol_dynamic_pnl.dynamic_volume != 0 and trade.offset == Offset.OPEN:
            # Adding to an existing position (the original comment says "long",
            # but the direction is not checked here).
            _vt_symbol_dynamic_pnl.dynamic_open_pos_commission += _single_commission

            _vt_symbol_dynamic_pnl.dynamic_occupy_capital += _single_turnover * _margin_ratio + _single_commission
            # Volume-weighted average of the old cost price and this fill.
            _vt_symbol_dynamic_pnl.dynamic_cost_price = (
                _actual_trade_price * trade.volume +
                _vt_symbol_dynamic_pnl.dynamic_cost_price * _vt_symbol_dynamic_pnl.dynamic_volume) / \
                (trade.volume + _vt_symbol_dynamic_pnl.dynamic_volume)
            _vt_symbol_dynamic_pnl.dynamic_volume += trade.volume
        elif trade.offset == Offset.CLOSE and trade.volume == _vt_symbol_dynamic_pnl.dynamic_volume:
            # The position is closed completely.
            if trade.direction == Direction.SHORT:
                # Closing a long position: the closing order is a sell.
                trade_pnl_without_commission = (_actual_trade_price - _vt_symbol_dynamic_pnl.dynamic_cost_price) * _size * \
                                               trade.volume
            else:
                trade_pnl_without_commission = (_vt_symbol_dynamic_pnl.dynamic_cost_price - _actual_trade_price) * _size * \
                                               trade.volume

            # The accumulated commission (including this closing trade) is
            # charged against the realised PnL.
            _vt_symbol_dynamic_pnl.dynamic_open_pos_commission += _single_commission
            trade_pnl = trade_pnl_without_commission - _vt_symbol_dynamic_pnl.dynamic_open_pos_commission
            _vt_symbol_dynamic_pnl.dynamic_pnl += trade_pnl
            _vt_symbol_dynamic_pnl.total_pnl += _vt_symbol_dynamic_pnl.dynamic_pnl
            # PnL is booked; reset the running state of the position.
            _vt_symbol_dynamic_pnl.dynamic_direction = None
            _vt_symbol_dynamic_pnl.dynamic_open_pos_commission = 0
            _vt_symbol_dynamic_pnl.dynamic_volume = 0
            _vt_symbol_dynamic_pnl.dynamic_occupy_capital = 0
            _vt_symbol_dynamic_pnl.dynamic_cost_price = 0
            _vt_symbol_dynamic_pnl.dynamic_pnl = 0  # already added to total_pnl above, so it can be cleared

        elif trade.volume != _vt_symbol_dynamic_pnl.dynamic_volume and trade.offset == Offset.CLOSE:
            # The position is closed partially.
            _vt_symbol_dynamic_pnl.dynamic_volume -= trade.volume
            # Work back how much capital the closed part was occupying.
            # NOTE(review): the whole accumulated opening commission is
            # subtracted here, not a share proportional to the closed volume.
            _vt_symbol_dynamic_pnl.dynamic_occupy_capital -= _single_turnover * _margin_ratio + \
                                                             _vt_symbol_dynamic_pnl.dynamic_open_pos_commission + \
                                                             _single_commission

            # PnL of this reduction.
            if trade.direction == Direction.SHORT:
                # Closing a long position: the closing order is a sell.
                trade_pnl = (_actual_trade_price - _vt_symbol_dynamic_pnl.dynamic_cost_price) * _size * trade.volume - \
                            _single_commission
            else:
                trade_pnl = (_vt_symbol_dynamic_pnl.dynamic_cost_price - _actual_trade_price) * _size * trade.volume - \
                            _single_commission

            _vt_symbol_dynamic_pnl.dynamic_pnl += trade_pnl

        _vt_symbol_dynamic_pnl.total_trade_times += 1

    def my_set_parameters(
        self,
        vt_symbols: List[str],
        interval: Interval,
        start: datetime,
        rates: Union[Dict[str, float], int, float],
        slippages: Union[Dict[str, float], int, float],
        sizes: Union[Dict[str, float], int, float],
        priceticks: Union[Dict[str, float], int, float],
        margin_ratios: Union[Dict[str, float], int, float],
        capital: int = 0,
        end: datetime = None,
        risk_free: float = 0
    ) -> None:
        """
        Configure the engine for one backtest.

        :param vt_symbols: symbols to backtest.  As a special case, a
            two-element list whose first item is in ``ALL_FUTURES_CODE_LIST``
            is expanded into the matching index symbol and, when the second
            item is ``'backtesting'``, eight continuous-contract symbols on
            CFFEX built from that futures code.
        :param interval: bar interval.
        :param start: first datetime of the backtest.
        :param rates: commission rate, one number for all symbols or a
            mapping per vt_symbol.
        :param slippages: slippage, scalar or per-symbol mapping.
        :param sizes: contract size, scalar or per-symbol mapping.
        :param priceticks: price tick, scalar or per-symbol mapping.
        :param margin_ratios: margin ratio, scalar or per-symbol mapping.
        :param capital: starting capital.
        :param end: last datetime of the backtest (``None`` is stored as is).
        :param risk_free: risk-free rate used for the Sharpe ratio.

        Whether scalars or mappings were supplied is decided from ``rates``
        alone; the other four cost parameters are assumed to follow suit.
        """
        if len(vt_symbols) == 2 and vt_symbols[0] in ALL_FUTURES_CODE_LIST:
            futures_code = vt_symbols[0]
            # Equality tests (not a dict lookup) keep the original matching
            # semantics whatever type the IndexFuturesCode members are.
            for code, index_symbol in (
                (IndexFuturesCode.IC, '000905.SSE'),
                (IndexFuturesCode.IF, '000300.SSE'),
                (IndexFuturesCode.IH, '000016.SSE'),
            ):
                if futures_code == code:
                    self.index_symbol = index_symbol
                    break
            # NOTE(review): for any other futures code self.index_symbol is
            # left untouched, exactly as before - confirm that is intended.

            # Build a fresh list instead of appending to the existing one, so
            # calling this method again on a reused engine does not duplicate
            # symbols.
            expanded = [self.index_symbol]
            if vt_symbols[1] == 'backtesting':
                for suffix in (
                    'L088.CFFEX', 'L0889.CFFEX',  # this month: unadjusted / back-adjusted
                    'L188.CFFEX', 'L1889.CFFEX',  # next month
                    'L288.CFFEX', 'L2889.CFFEX',  # first quarter
                    'L388.CFFEX', 'L3889.CFFEX',  # second quarter
                ):
                    expanded.append(futures_code + suffix)
            self.vt_symbols = expanded
        else:
            self.vt_symbols = vt_symbols

        self.interval = interval
        # If one cost parameter is a scalar they all are: spread the same
        # value (rate, slippage, ...) over every symbol.
        if isinstance(rates, (int, float)):
            self.rates = set_ratios(rates, self.vt_symbols)
            self.slippages = set_ratios(slippages, self.vt_symbols)
            self.sizes = set_ratios(sizes, self.vt_symbols)
            self.priceticks = set_ratios(priceticks, self.vt_symbols)
            self.margin_ratios = set_ratios(margin_ratios, self.vt_symbols)
        else:
            self.rates = rates
            self.slippages = slippages
            self.sizes = sizes
            self.priceticks = priceticks
            self.margin_ratios = margin_ratios

        self.start = start
        self.end = end
        self.capital = capital
        self.risk_free = risk_free

        # Custom addition: reset the running capital so a reused engine does
        # not need to be re-initialised.
        self.dynamic_total_capital = capital

    def load_data(self) -> None:
        """
        Load the bars of every symbol into ``self.history_data``.

        ``self.end`` defaults to the current time when unset.  If the start
        is not before the end, a message is logged and nothing is loaded or
        cleared.  Otherwise the previous history is dropped and each symbol
        is loaded in chunks of 365 days through
        ``load_bar_data_without_lru_cache``; bars are stored under the key
        ``(bar.datetime, vt_symbol)`` and their datetimes collected in
        ``self.dts``.
        """
        self.output("开始加载历史数据")

        if not self.end:
            self.end = datetime.now()

        if self.start >= self.end:
            self.output("起始日期必须小于结束日期")
            return

        # Drop previously loaded history.  dts is rebound to a new set rather
        # than cleared, because my_create_shared_memory() replaces it with a
        # sorted list, which has no .add().
        self.history_data.clear()
        self.dts = set()

        # Load 365 days of data per request.
        chunk_delta = timedelta(days=365)
        interval_delta = INTERVAL_DELTA_MAP[self.interval]

        for vt_symbol in self.vt_symbols:
            start = self.start
            end = self.start + chunk_delta
            data_count = 0

            while start < self.end:
                end = min(end, self.end)  # keep the chunk inside the requested range

                data = load_bar_data_without_lru_cache(
                    vt_symbol,
                    self.interval,
                    start,
                    end
                )

                for bar in data:
                    self.dts.add(bar.datetime)
                    self.history_data[(bar.datetime, vt_symbol)] = bar
                    data_count += 1

                # The next chunk starts one bar interval after this one ended.
                start = end + interval_delta
                end += (chunk_delta + interval_delta)

            self.output(f"{vt_symbol}历史数据加载完成,数据量:{data_count}")

        self.output("所有历史数据加载完成")

    def run_backtesting(self) -> None:
        """
        Replay the loaded history through the strategy.

        The first ``self.days`` day changes are used to initialise the
        strategy; the remaining datetimes are then replayed with trading
        enabled.  Once ``self.dynamic_total_capital`` drops below
        ``self.advanced_stop_dynamic_capital_min`` the day-of-month of the
        next trade date is remembered, and the replay stops at the first
        datetime whose day-of-month equals it.  Any exception raised while
        pushing bars is logged and ends the run.
        """
        self.strategy.on_init()

        # Chronological list of every loaded datetime.
        timeline = sorted(self.dts)

        # Warm-up: feed bars until enough day changes have been seen.
        day_changes = 0
        ix = 0

        for ix, dt in enumerate(timeline):
            if self.datetime and dt.day != self.datetime.day:
                day_changes += 1
                if day_changes >= self.days:
                    break
            try:
                self.new_bars(dt)
            except Exception:
                self.output("触发异常,回测终止")
                self.output(traceback.format_exc())
                return

        self.strategy.inited = True
        self.output("策略初始化完成")

        self.strategy.on_start()
        self.strategy.trading = True
        self.output("开始回放历史数据")

        # Replay everything from the datetime at which the warm-up stopped.
        stop_day = 0
        for dt in timeline[ix:]:
            try:
                if dt.day == stop_day:
                    print("dt:", dt)
                    print("self.dynamic_total_capital:", self.dynamic_total_capital)
                    self.output("历史数据回放提前结束,资金清零")
                    return

                self.new_bars(dt)

                capital_too_low = self.dynamic_total_capital < self.advanced_stop_dynamic_capital_min
                if capital_too_low and stop_day == 0:
                    # Remember the day-of-month of the next trade date.
                    next_trade_date = get_next_trade_date(date=dt.strftime('%Y%m%d'), add_day=1)
                    stop_day = int(next_trade_date[6:8])
            except Exception:
                self.output("触发异常,回测终止")
                self.output(traceback.format_exc())
                return

        self.output("历史数据回放结束")

    def clear_data(self) -> None:
        """
        Reset everything left over from the previous backtest.

        Order, trade, log and daily-result containers are emptied in place;
        the bars, the history and the custom capital-tracking state are
        replaced with fresh empty values.
        """
        self.strategy = None
        self.datetime = None
        self.daily_df = None
        self.bars = {}

        # Counters restart from zero.
        self.limit_order_count = 0
        self.trade_count = 0

        # These containers keep their identity and are emptied in place.
        for container in (
            self.limit_orders,
            self.active_limit_orders,
            self.trades,
            self.logs,
            self.daily_results,
        ):
            container.clear()

        # Reset of the custom additions.
        self.trades_by_datetime = {}
        self.vt_symbols_dynamic_pnl = {}
        self.history_data = {}
        self.dts: Set[datetime] = set()
        self.dynamic_total_capital = 0
        self.dynamic_total_occupy_capital = 0
        self.days = 0

    def calculate_statistics(self, df: DataFrame = None, output=True) -> dict:
        """
        Compute the performance statistics of a backtest.

        :param df: daily results with at least the columns ``net_pnl``,
            ``commission``, ``slippage``, ``turnover`` and ``trade_count``;
            defaults to ``self.daily_df``.  The columns ``balance``,
            ``return``, ``highlevel``, ``drawdown`` and ``ddpercent`` are
            added to it in place.
        :param output: when true, the statistics are also written through
            ``self.output``.
        :return: dict of statistics keyed by name; every value is 0 (or an
            empty string for the dates) when there are no daily results.
        """
        self.output("开始计算策略统计指标")

        # Use the engine's own daily results unless a DataFrame is supplied.
        if df is None:
            df = self.daily_df

        # Defaults, used unchanged when there are no daily results.
        cagr_info = 0  # compound annual growth rate (custom addition)
        annual_volatility_info = 0
        annual_downside_risk = 0

        start_date = ""
        end_date = ""
        total_days = 0
        profit_days = 0
        loss_days = 0
        end_balance = 0
        max_drawdown = 0
        max_ddpercent = 0
        max_drawdown_duration = 0
        total_net_pnl = 0
        daily_net_pnl = 0
        total_commission = 0
        daily_commission = 0
        total_slippage = 0
        daily_slippage = 0
        total_turnover = 0
        daily_turnover = 0
        total_trade_count = 0
        daily_trade_count = 0
        total_return = 0
        annual_return = 0
        daily_return = 0
        return_std = 0
        sharpe_ratio = 0
        return_drawdown_ratio = 0

        # Drawdown window details (custom addition of 2022-08-09).  A falsy
        # start suppresses the corresponding output line.
        max_ddpercent_start = 0
        max_ddpercent_end = 0
        max_ddpercent_duration = 0
        max_drawdown_start = 0
        max_drawdown_end = 0

        if df is not None:
            # Balance related time series.
            df["balance"] = df["net_pnl"].cumsum() + self.capital

            # A balance ratio <= 0 has no usable logarithm.  Such days are
            # masked to NaN up front so their return becomes 0, instead of
            # relying on a try/except that could leave the "return" column
            # undefined.
            balance_ratio = df["balance"] / df["balance"].shift(1)
            balance_ratio = balance_ratio.where(balance_ratio > 0)
            df["return"] = np.log(balance_ratio).fillna(0)

            # Running maximum of the balance.
            df["highlevel"] = (
                df["balance"].rolling(
                    min_periods=1, window=len(df), center=False).max()
            )
            df["drawdown"] = df["balance"] - df["highlevel"]
            df["ddpercent"] = df["drawdown"] / df["highlevel"] * 100

            # Statistics values.
            start_date = df.index[0]
            end_date = df.index[-1]

            total_days = len(df)
            profit_days = len(df[df["net_pnl"] > 0])
            loss_days = len(df[df["net_pnl"] < 0])

            end_balance = df["balance"].iloc[-1]
            max_drawdown = df["drawdown"].min()
            max_ddpercent = df["ddpercent"].min()
            max_ddpercent_end = df["ddpercent"].idxmin()
            max_drawdown_end = df["drawdown"].idxmin()

            # The window of each drawdown runs from the balance peak before
            # the trough to the trough itself; it is only computed when the
            # index holds dates.
            if isinstance(max_ddpercent_end, date):
                max_ddpercent_start = df["balance"][:max_ddpercent_end].idxmax()
                max_ddpercent_duration = (max_ddpercent_end - max_ddpercent_start).days
            else:
                max_ddpercent_start = False
                max_ddpercent_duration = 0

            if isinstance(max_drawdown_end, date):
                max_drawdown_start = df["balance"][:max_drawdown_end].idxmax()
                max_drawdown_duration = (max_drawdown_end - max_drawdown_start).days
            else:
                max_drawdown_start = False
                max_drawdown_duration = 0

            total_net_pnl = df["net_pnl"].sum()
            daily_net_pnl = total_net_pnl / total_days

            total_commission = df["commission"].sum()
            daily_commission = total_commission / total_days

            total_slippage = df["slippage"].sum()
            daily_slippage = total_slippage / total_days

            total_turnover = df["turnover"].sum()
            daily_turnover = total_turnover / total_days

            total_trade_count = df["trade_count"].sum()
            daily_trade_count = total_trade_count / total_days

            total_return = (end_balance / self.capital - 1) * 100
            annual_return = total_return / total_days * 240
            daily_return = df["return"].mean() * 100
            return_std = df["return"].std() * 100

            # Annualised volatility.
            annual_volatility_info = annual_volatility(df['return'])
            # Compound annual growth rate.
            cagr_info = cagr(df['return'])
            # Annualised downside risk.
            annual_downside_risk = downside_risk(df['return'])

            if return_std:
                daily_risk_free = self.risk_free / np.sqrt(240)
                sharpe_ratio = (daily_return - daily_risk_free) / return_std * np.sqrt(240)
            else:
                sharpe_ratio = 0

            # Without any drawdown the ratio is undefined; report 0, which is
            # what the infinity/NaN filter below produced before.
            if max_drawdown:
                return_drawdown_ratio = -total_net_pnl / max_drawdown
            else:
                return_drawdown_ratio = 0

        # Output
        if output:
            self.output("-" * 30)
            self.output(f"首个交易日:\t{start_date}")
            self.output(f"最后交易日:\t{end_date}")

            self.output(f"总交易日:\t{total_days}")
            self.output(f"盈利交易日:\t{profit_days}")
            self.output(f"亏损交易日:\t{loss_days}")

            self.output(f"起始资金:\t{self.capital:,.2f}")
            self.output(f"结束资金:\t{end_balance:,.2f}")

            self.output(f"总收益率:\t{total_return:,.2f}%")
            self.output(f"年化收益:\t{annual_return:,.2f}%")
            self.output(f"最大回撤: \t{max_drawdown:,.2f}")
            if max_ddpercent_start:
                self.output(f"百分比最大回撤: \t{max_ddpercent:,.2f}%, 最大回撤日期:\t{max_ddpercent_start}至{max_ddpercent_end}, 最长回撤天数: \t{max_ddpercent_duration}")
            if max_drawdown_start:
                self.output(f"最大回撤日期:\t{max_drawdown_start}至{max_drawdown_end}最长回撤天数: \t{max_drawdown_duration}")

            self.output(f"总盈亏:\t{total_net_pnl:,.2f}")
            self.output(f"总手续费:\t{total_commission:,.2f}")
            self.output(f"总滑点:\t{total_slippage:,.2f}")
            self.output(f"总成交金额:\t{total_turnover:,.2f}")
            self.output(f"总成交笔数:\t{total_trade_count}")

            self.output(f"日均盈亏:\t{daily_net_pnl:,.2f}")
            self.output(f"日均手续费:\t{daily_commission:,.2f}")
            self.output(f"日均滑点:\t{daily_slippage:,.2f}")
            self.output(f"日均成交金额:\t{daily_turnover:,.2f}")
            self.output(f"日均成交笔数:\t{daily_trade_count}")

            self.output(f"日均收益率:\t{daily_return:,.2f}%")
            self.output(f"收益标准差:\t{return_std:,.2f}%")
            self.output(f"年化波动率:\t{annual_volatility_info:,.3f}")
            self.output(f"年化复合增长率:\t{cagr_info:,.3f}")
            self.output(f"年化下行风险率:\t{annual_downside_risk:,.3f}")

            self.output(f"Sharpe Ratio:\t{sharpe_ratio:,.2f}")
            self.output(f"收益回撤比:\t{return_drawdown_ratio:,.2f}")

        statistics = {
            "start_date": start_date,
            "end_date": end_date,
            "total_days": total_days,
            "profit_days": profit_days,
            "loss_days": loss_days,
            "capital": self.capital,
            "end_balance": end_balance,
            "max_drawdown": max_drawdown,
            "max_ddpercent": max_ddpercent,
            "max_drawdown_duration": max_drawdown_duration,
            "total_net_pnl": total_net_pnl,
            "daily_net_pnl": daily_net_pnl,
            "total_commission": total_commission,
            "daily_commission": daily_commission,
            "total_slippage": total_slippage,
            "daily_slippage": daily_slippage,
            "total_turnover": total_turnover,
            "daily_turnover": daily_turnover,
            "total_trade_count": total_trade_count,
            "daily_trade_count": daily_trade_count,
            "total_return": total_return,
            "annual_return": annual_return,
            "daily_return": daily_return,
            "return_std": return_std,
            "sharpe_ratio": sharpe_ratio,
            "return_drawdown_ratio": return_drawdown_ratio,
        }

        # Replace infinities and NaN, which would otherwise leak into the result.
        for key, value in statistics.items():
            if value in (np.inf, -np.inf):
                value = 0
            statistics[key] = np.nan_to_num(value)

        self.output("策略统计指标计算完成")
        return statistics

    def my_create_shared_memory_count(self):
        """
        Create a shared-memory segment holding a single integer set to 0.

        The segment stores the optimisation run counter.  It is neither
        closed nor unlinked here; that is left to the caller.  ``self.zw`` is
        reset to ``None`` as a side effect.

        :return: dict with the ``SharedMemory`` object under ``"shm"`` and the
            ``"shape"`` / ``"dtype"`` needed to wrap its buffer in a NumPy
            array again.
        """
        self.zw = None
        # np.int was removed in NumPy 1.24; a fixed-width int64 keeps the
        # counter the same size on every platform.
        counter_template = np.array([0], dtype=np.int64)
        # Run counter.
        counter_shm = SharedMemory(name=None, create=True, size=counter_template.nbytes)
        counter_view = np.ndarray(counter_template.shape,
                                  dtype=counter_template.dtype,
                                  buffer=counter_shm.buf)
        np.copyto(counter_view, counter_template)

        return {"shm": counter_shm, 'shape': counter_template.shape,
                "dtype": counter_template.dtype}

    def my_create_shared_memory(self):
        """
        Load the history data once and publish it as shared memory.

        Every BarData field is flattened into its own NumPy array and copied
        into its own SharedMemory segment. Rows are ordered by datetime first
        and by the order of self.vt_symbols second, which is the order in
        which my_run_backtesting replays them.

        Side effects:
            Calls self.load_data() / self.check_data(), then deletes
            self.history_data and self.dts once the data has been copied.

        Returns:
            dict mapping each field name to {"shm": SharedMemory,
            "shape": tuple, "dtype": numpy dtype}. The caller owns the
            segments and must close()/unlink() them.

        Raises:
            ValueError: if a (datetime, vt_symbol) pair has no bar, or if
                there is no bar at all (a zero-sized segment cannot be
                created).
        """
        # Load the data
        self.output("开始读取共享内存所需数据")
        self.load_data()
        self.check_data(self.history_data, self.dts)  # validate the loaded data

        self.output(f"主进程{self.current_pid}:history_data字典数量:{len(self.history_data.keys())},dts数量:{len(self.dts)}")
        self.output("*" * 30)

        # ---------------------- This step is essential ----------------------
        # The reader relies purely on row order, so rows must be chronological.
        self.dts = sorted(self.dts)

        # (field name, how to read it from a bar, dtype of the shared array).
        # np.str was removed in NumPy 1.24; np.str_ is the supported spelling.
        field_specs = (
            ("symbol", lambda bar: bar.symbol, np.str_),
            ("exchange", lambda bar: bar.exchange.value, np.str_),
            ("datetime", lambda bar: bar.datetime.timestamp(), np.float64),
            ("interval", lambda bar: bar.interval.value, np.str_),
            ("volume", lambda bar: bar.volume, np.float64),
            ("turnover", lambda bar: bar.turnover, np.float64),
            ("open_interest", lambda bar: bar.open_interest, np.float64),
            ("open_price", lambda bar: bar.open_price, np.float64),
            ("high_price", lambda bar: bar.high_price, np.float64),
            ("low_price", lambda bar: bar.low_price, np.float64),
            ("close_price", lambda bar: bar.close_price, np.float64),
            ("gateway_name", lambda bar: bar.gateway_name, np.str_),
        )

        columns = {name: [] for name, _, _ in field_specs}
        for dt in self.dts:
            for vt_symbol in self.vt_symbols:
                bar = self.history_data.get((dt, vt_symbol))
                if bar is None:
                    # The reader consumes exactly len(vt_symbols) rows per
                    # datetime, so a gap cannot be represented: fail loudly
                    # instead of with an AttributeError on None.
                    raise ValueError(f"共享内存数据缺失:{dt} {vt_symbol} 没有对应的K线")
                for name, read_field, _ in field_specs:
                    columns[name].append(read_field(bar))

        if not columns["symbol"]:
            raise ValueError("没有可写入共享内存的K线数据")

        self.output("开始创建共享内存:")

        shm_dict = {}
        shared_view = None
        try:
            for name, _, dtype in field_specs:
                # pop() lets each Python list be freed as soon as it is converted
                local_array = np.array(columns.pop(name), dtype=dtype)
                segment = SharedMemory(name=None, create=True, size=local_array.nbytes)
                shm_dict[name] = {"shm": segment, 'shape': local_array.shape, "dtype": local_array.dtype}
                shared_view = np.ndarray(local_array.shape, dtype=local_array.dtype, buffer=segment.buf)
                np.copyto(shared_view, local_array)
                shared_view = None
        except Exception:
            # Do not leave half of the segments behind if one of them fails.
            # The view has to be dropped first or close() would be refused.
            shared_view = None
            for info in shm_dict.values():
                info["shm"].close()
                info["shm"].unlink()
            raise

        self.output(
            f"主进程{self.current_pid}:创建共享内存成功")
        self.output("*" * 30)

        # The main process no longer needs its private copy of the data
        del self.history_data
        del self.dts

        return shm_dict

    def my_run_backtesting(self, shm_dict: dict, shm_count_dict: dict, optimization_count: int,
                           tag_str: str = 'tag_str'):
        """
        Run one backtest, replaying bars from shared memory when available.

        Args:
            shm_dict: field name -> {"shm", "shape", "dtype"} as produced by
                my_create_shared_memory, or None to fall back to the regular
                load_data() + run_backtesting() path.
            shm_count_dict: {"shm", "shape", "dtype"} describing the shared
                run counter; it is incremented once per call.
            optimization_count: total number of runs, used for logging only.
            tag_str: free-form label prepended to the log lines.

        Every SharedMemory handle attached here is closed again before
        returning. The segments themselves are left alone: unlinking them is
        up to whoever created them.
        """

        def bump_run_counter() -> int:
            """Attach to the shared run counter, add one and return the new value."""
            counter_shm = SharedMemory(name=shm_count_dict.get('shm').name)
            counter = None
            try:
                counter = np.ndarray(shape=shm_count_dict.get('shape'), dtype=shm_count_dict.get('dtype'),
                                     buffer=counter_shm.buf)
                # NOTE: read-modify-write without a lock, so concurrent workers
                # can lose an update; the value is only used for progress logs.
                counter[0] += 1
                return int(counter[0])
            finally:
                # The view must be gone before the mapping can be closed
                counter = None
                counter_shm.close()

        if shm_dict is None:
            # Shared memory disabled: use the stock run_backtesting
            self.load_data()
            self.run_backtesting()
            # Total number of runs so far
            finished = bump_run_counter()
            self.output(
                f"【{tag_str}】子进程{self.current_pid}:采用默认进程内存方式回测。总:{optimization_count},当前:{finished}")
            self.output(f'采用参数组合:{self.strategy.get_parameters()}')
            self.output("*" * 30)
            return

        # -------------------------- Shared-memory path --------------------------
        self.strategy.on_init()

        # Order matters: the replay helper unpacks the columns positionally
        field_order = (
            "symbol", "exchange", "datetime", "interval",
            "volume", "turnover", "open_interest", "open_price",
            "high_price", "low_price", "close_price", "gateway_name",
        )
        handles = []
        columns = []
        try:
            for field in field_order:
                info = shm_dict[field]
                handle = SharedMemory(name=info.get('shm').name)
                handles.append(handle)
                columns.append(np.ndarray(shape=info.get('shape'), dtype=info.get('dtype'),
                                          buffer=handle.buf))

            # Total number of runs so far
            finished = bump_run_counter()

            self.output("*" * 30)
            self.output(
                f"【{tag_str}】子进程{self.current_pid}:取出共享内存进行回测。总:{optimization_count},当前:{finished}")

            self._my_replay_shared_bars(columns)
        finally:
            # Pool workers are reused for many runs, so release the handles
            # explicitly. The NumPy views have to be dropped first because
            # close() is refused while a buffer export is still alive.
            columns.clear()
            for handle in handles:
                handle.close()

    def _my_replay_shared_bars(self, columns: list) -> None:
        """
        Rebuild BarData rows from the shared columns and feed them to the strategy.

        columns holds one 1-D array per BarData field in the order symbol,
        exchange, datetime, interval, volume, turnover, open_interest,
        open_price, high_price, low_price, close_price, gateway_name.
        Rows are consumed in groups of len(self.vt_symbols); each group is
        handed to my_new_bars as one time slice.
        """
        # NOTE(review): the stored POSIX timestamp is shifted back by 8 hours
        # before conversion - presumably a UTC+8 correction specific to how the
        # bars were stored; confirm against the database layer.
        timestamp_shift_seconds = 28800

        symbol_total = len(self.vt_symbols)
        pending = []
        day_count = 0

        # ------------- Walk through the data -------------
        for (symbol, exchange_value, timestamp, interval_value, volume, turnover, open_interest,
             open_price, high_price, low_price, close_price, gateway_name) in zip(*columns):
            dt = datetime.fromtimestamp(timestamp - timestamp_shift_seconds)
            pending.append(BarData(
                symbol=symbol,
                exchange=Exchange(exchange_value),
                datetime=dt,
                interval=Interval(interval_value),
                volume=volume,
                turnover=turnover,
                open_interest=open_interest,
                open_price=open_price,
                high_price=high_price,
                low_price=low_price,
                close_price=close_price,
                gateway_name=gateway_name,
            ))

            # Keep collecting until one bar per symbol is available
            if len(pending) < symbol_total:
                continue

            # Check whether the warm-up (initialisation) period is over
            if not self.strategy.inited:
                if self.datetime and dt.day != self.datetime.day:
                    day_count += 1
                    if day_count >= self.days:
                        self.strategy.inited = True
                        self.output("策略初始化完成")
                        self.strategy.on_start()
                        self.strategy.trading = True
                        self.output("开始回放历史数据")

            try:
                self.my_new_bars(dt, pending)
            except Exception:
                # Boundary of one backtest run: log the failure and stop this run
                self.output("触发异常,回测终止")
                self.output(traceback.format_exc())
                return
            pending = []

        self.output("历史数据回放结束")

    def my_new_bars(self, dt: datetime, bars_list: List[BarData]) -> None:
        """
        Process one time slice of bars.

        bars_list is positional: its i-th entry belongs to the i-th entry of
        self.vt_symbols. A truthy entry becomes the latest bar of its symbol
        and is forwarded to the strategy. A falsy entry is replaced in
        self.bars by a flat bar built from the previous close and is NOT
        forwarded to the strategy.
        """
        self.datetime = dt

        fresh_bars = {}
        for position, vt_symbol in enumerate(self.vt_symbols):
            candidate = bars_list[position]

            if not candidate:
                # No data for this symbol: carry the last close forward
                previous = self.bars[vt_symbol]
                last_close = previous.close_price
                self.bars[vt_symbol] = BarData(
                    symbol=previous.symbol,
                    exchange=previous.exchange,
                    datetime=dt,
                    open_price=last_close,
                    high_price=last_close,
                    low_price=last_close,
                    close_price=last_close,
                    gateway_name=previous.gateway_name
                )
                continue

            self.bars[vt_symbol] = candidate
            fresh_bars[vt_symbol] = candidate

        # Match orders and refresh the bookkeeping before the strategy sees the bars
        self.cross_limit_order()
        self.update_daily_close(self.bars, dt)
        self.update_dynamic_capital(dt, self.bars)
        self.strategy.on_bars(fresh_bars)

    def my_run_ga_optimization(self,
                               optimization_setting: OptimizationSetting,
                               max_workers: int,
                               output=True,
                               population_size: int = 200,
                               ngen_size: int = 60,
                               is_shared_memory: bool = False,
                               tag_str: str = '',
                               ):
        """
        Run the multi-process genetic-algorithm parameter optimization.

        Args:
            optimization_setting: parameter space and target name.
            max_workers: number of worker processes.
            output: when true, log every (parameters, target) result.
            population_size: individuals per generation.
            ngen_size: number of generations.
            is_shared_memory: when true, load the bar data once and share it
                with the workers through shared memory; otherwise each
                worker loads its own data.
            tag_str: free-form label forwarded to the worker log lines.

        Returns:
            The result list of the module-level my_run_ga_optimization, or
            None when optimization_setting fails validation.

        Every shared-memory segment created here (bar data and run counter)
        is closed and unlinked before returning, also when the optimization
        raises.
        """
        if not check_optimization_setting(optimization_setting):
            return

        owned_segments = []
        try:
            shm_dict = None
            if is_shared_memory:
                shm_dict = self.my_create_shared_memory()
                owned_segments.extend(info.get('shm') for info in shm_dict.values())

            shm_count_dict = self.my_create_shared_memory_count()
            owned_segments.append(shm_count_dict.get('shm'))

            evaluate_func: callable = my_wrap_evaluate(self,
                                                       optimization_setting.target_name,
                                                       shm_dict,
                                                       shm_count_dict,
                                                       len(optimization_setting.generate_settings()),
                                                       tag_str=tag_str)
            # Inside a method this name resolves to the module-level function
            results = my_run_ga_optimization(
                evaluate_func,
                optimization_setting,
                get_target_value,
                max_workers,
                population_size=population_size,
                ngen_size=ngen_size,
                output=self.output
            )

            if output:
                for result in results:
                    msg: str = f"参数:{result[0]}, 目标:{result[1]}"
                    self.output(msg)

            return results
        finally:
            # Release the shared memory. shm_dict is None when sharing is
            # disabled, so iterate over what was actually created instead.
            for segment in owned_segments:
                segment.close()
                segment.unlink()


@dataclass
class VtSymbolDynamicPnl(BaseData):
    """
    Per-vt_symbol record of cost settings and running position / PnL figures.

    The first group of fields holds the static settings of the instrument
    (size, commission rate, margin ratio, slippage); the remaining fields are
    running values with defaults. symbol and exchange are derived from
    vt_symbol in __post_init__.
    """

    vt_symbol: str
    size: float  # Ratio (presumably the contract multiplier - TODO confirm)
    rate: float  # Commission rate
    margin_ratio: float  # Margin ratio
    slippage: float  # Slippage

    # The variables below follow trade-by-trade offsetting logic
    dynamic_direction: Direction = Direction.LONG
    dynamic_open_pos_commission: float = 0  # Commission accumulated when adding to the position only; closing commission must not be added
    dynamic_volume: float = 0  # Current (dynamic) position of this instrument
    dynamic_occupy_capital: float = 0  # Capital currently occupied
    dynamic_pnl: float = 0  # Accumulated dynamic PnL of this instrument

    dynamic_cost_price: float = 0  # Average cost price obtained after each addition to the position

    total_pnl: float = 0  # Accumulated PnL of this instrument over all trades
    total_trade_times: int = 0

    # NOTE(review): the field name shadows the imported datetime class inside
    # this class body.
    datetime: datetime = None

    def __post_init__(self):
        """Split vt_symbol into the symbol and exchange attributes."""
        self.symbol, self.exchange = extract_vt_symbol(self.vt_symbol)


def load_bar_data_without_lru_cache(
    vt_symbol: str,
    interval: Interval,
    start: datetime,
    end: datetime,
):
    """
    Load bars for vt_symbol directly from my_database_manager.

    vt_symbol is split into symbol and exchange for the query and is also
    passed on unchanged as the collection name.
    """
    symbol_and_exchange = extract_vt_symbol(vt_symbol)
    symbol, exchange = symbol_and_exchange

    load_bars = my_database_manager.load_bar_data
    bars = load_bars(symbol, exchange, interval, start, end, collection_name=vt_symbol)
    return bars


def my_wrap_evaluate(engine: MyBacktestingEngine, target_name: str,
                     shm_dict: dict, shm_count_dict: dict, optimization_count: int, tag_str: str = '',
                     ) -> callable:
    """
    Bind the engine configuration to my_evaluate.

    The returned callable takes the one remaining positional argument of
    my_evaluate (the parameter setting dict). Only plain configuration
    values are bound, not the engine object itself.
    """
    # Positional order must mirror the signature of my_evaluate
    shared_memory_args = (shm_dict, shm_count_dict, optimization_count, tag_str)
    engine_args = (
        engine.fixed_param_dict,
        target_name,
        engine.strategy_class,
        engine.vt_symbols,
        engine.interval,
        engine.start,
        engine.rates,
        engine.slippages,
        engine.sizes,
        engine.priceticks,
        engine.margin_ratios,
        engine.capital,
        engine.end,
    )
    return partial(my_evaluate, *shared_memory_args, *engine_args)


def my_evaluate(
        shm_dict: dict,
        shm_count_dict: dict,
        optimization_count: int,
        tag_str: str,
        fixed_param_dict: dict,  # fixed parameters
        target_name: str,
        strategy_class: StrategyTemplate,
        vt_symbols: List[str],
        interval: Interval,
        start: datetime,
        rates: Dict[str, float],
        slippages: Dict[str, float],
        sizes: Dict[str, float],
        priceticks: Dict[str, float],
        margin_ratios: Dict[str, float] or int or float,
        capital: int,
        end: datetime,
        setting: dict
):
    """
    Run one backtest in a worker process and report its target value.

    A fresh MyBacktestingEngine is configured from the arguments, the
    strategy is added with `setting` (updated in place with
    fixed_param_dict), and the backtest is run through my_run_backtesting.

    Returns:
        (str(setting), statistics[target_name], statistics)
    """
    worker_engine = MyBacktestingEngine()

    engine_parameters = dict(
        vt_symbols=vt_symbols,
        interval=interval,
        start=start,
        rates=rates,
        slippages=slippages,
        sizes=sizes,
        priceticks=priceticks,
        margin_ratios=margin_ratios,
        capital=capital,
        end=end,
    )
    worker_engine.my_set_parameters(**engine_parameters)

    # Fixed parameters overwrite any optimized parameter of the same name
    setting.update(fixed_param_dict)
    # noinspection PyTypeChecker
    worker_engine.add_strategy(strategy_class, setting)

    worker_engine.my_run_backtesting(shm_dict, shm_count_dict, optimization_count, tag_str=tag_str)
    worker_engine.calculate_result()
    run_statistics = worker_engine.calculate_statistics(output=True)

    target_value = run_statistics[target_name]

    worker_engine.clear_data()

    return str(setting), target_value, run_statistics
Member
avatar
加入于:
帖子: 58
声望: 0

第二份代码是my_optimize.py

from typing import Dict, List, Tuple
from random import random, choice
from time import perf_counter
from multiprocessing import Manager, Pool
from deap import creator, base, tools, algorithms
from vnpy.trader.optimize import OUTPUT_FUNC, EVALUATE_FUNC, KEY_FUNC, OptimizationSetting, check_optimization_setting


def my_run_ga_optimization(
    evaluate_func: EVALUATE_FUNC,
    optimization_setting: OptimizationSetting,
    key_func: KEY_FUNC,
    max_workers: int = None,
    population_size: int = 100,
    ngen_size: int = 30,
    output: OUTPUT_FUNC = print,
) -> List[Tuple]:
    """
    Run genetic algorithm optimization.

    Args:
        evaluate_func: callable taking one parameter-setting dict and
            returning the evaluation result.
        optimization_setting: source of the candidate parameter settings.
        key_func: extracts the sortable target value from a result.
        max_workers: size of the process pool (None lets Pool decide).
        population_size: individuals per generation.
        ngen_size: number of generations.
        output: logging callable.

    Returns:
        All distinct evaluation results, best first according to key_func.
    """
    # Define functions for generate parameter randomly
    buf: List[Dict] = optimization_setting.generate_settings()
    settings: List[list] = [list(d.items()) for d in buf]

    def generate_parameter() -> list:
        """Pick one complete parameter setting at random."""
        return choice(settings)

    def mutate_individual(individual: list, indpb: float) -> tuple:
        """Replace each gene with probability indpb by the gene of a random setting."""
        size: int = len(individual)
        paramlist: list = generate_parameter()
        for i in range(size):
            if random() < indpb:
                individual[i] = paramlist[i]
        return individual,

    # my_ga_evaluate keeps these two bookkeeping counters in the same shared
    # dict as the results; they must never be treated as results.
    counter_keys: Tuple[str, str] = ("new_cal", "old_cal")

    # Set up multiprocessing Pool and Manager
    with Manager() as manager, Pool(max_workers) as pool:
        # Create shared dict for result cache
        cache: Dict[Tuple, Tuple] = manager.dict()

        # Set up toolbox
        # NOTE(review): creator.Individual is not created in this file -
        # presumably it is registered as a side effect of importing
        # vnpy.trader.optimize; confirm.
        toolbox: base.Toolbox = base.Toolbox()
        toolbox.register("individual", tools.initIterate, creator.Individual, generate_parameter)
        toolbox.register("population", tools.initRepeat, list, toolbox.individual)
        toolbox.register("mate", tools.cxTwoPoint)
        toolbox.register("mutate", mutate_individual, indpb=1)
        toolbox.register("select", tools.selNSGA2)
        toolbox.register("map", pool.map)
        toolbox.register(
            "evaluate",
            my_ga_evaluate,
            cache,
            evaluate_func,
            key_func,
        )

        total_size: int = len(settings)
        pop_size: int = population_size                      # number of individuals in each generation
        lambda_: int = pop_size                              # number of children to produce at each generation
        mu: int = int(pop_size * 0.8)                        # number of individuals to select for the next generation

        cxpb: float = 0.95         # probability that an offspring is produced by crossover
        mutpb: float = 1 - cxpb    # probability that an offspring is produced by mutation
        ngen: int = ngen_size    # number of generation

        pop: list = toolbox.population(pop_size)

        # Run ga optimization
        output("开始执行遗传算法优化")
        output(f"参数优化空间:{total_size}")
        output(f"每代族群总数:{pop_size}")
        output(f"优良筛选个数:{mu}")
        output(f"迭代次数:{ngen}")
        output(f"交叉概率:{cxpb:.0%}")
        output(f"突变概率:{mutpb:.0%}")

        start: float = perf_counter()

        algorithms.eaMuPlusLambda(
            pop,
            toolbox,
            mu,
            lambda_,
            cxpb,
            mutpb,
            ngen,
            verbose=True
        )

        end: float = perf_counter()
        cost: int = int((end - start))

        # The counters only exist once at least one individual was evaluated
        output("总共读取{}次缓存数据".format(cache.get("old_cal", 0)))
        output("总{}次新计算".format(cache.get("new_cal", 0)))

        output(f"遗传算法优化完成,耗时{cost}秒")

        # Leave the two integer counters out: key_func expects a result
        # tuple and would fail (or mis-sort) on a bare int.
        results: list = [value for key, value in cache.items() if key not in counter_keys]
        results.sort(reverse=True, key=key_func)
        return results


def my_ga_evaluate(
    cache: dict,
    evaluate_func: callable,
    key_func: callable,
    parameters: list,
):
    """
    Evaluate one individual for the genetic algorithm, memoised through cache.

    cache maps tuple(parameters) to the evaluation result and additionally
    carries two counters: "new_cal" (fresh evaluations) and "old_cal"
    (cache hits). Returns a one-element tuple holding key_func(result).
    """
    cache_key: tuple = tuple(parameters)

    # Create both counters the first time the cache is used
    if "new_cal" not in cache:
        for counter_name in ("new_cal", "old_cal"):
            cache[counter_name] = 0

    if cache_key not in cache:
        cache['new_cal'] += 1
        print("第{}次计算新数据".format(cache['new_cal']))
        outcome = evaluate_func(dict(parameters))
        cache[cache_key] = outcome
    else:
        outcome = cache[cache_key]
        cache['old_cal'] += 1
        print("第{}次读取缓存数据".format(cache['old_cal']))

    return (key_func(outcome),)
Member
avatar
加入于:
帖子: 58
声望: 0

第三部分是不完整的代码,主要展示如何调用上面这两个文件。由于其中大部分是我个人的隐私代码,所以只给出下面这段核心代码,看懂应该问题不大

# Create the engine and register the strategy class with an empty setting
my_backtesting_engine = MyBacktestingEngine()
my_backtesting_engine.add_strategy(ga_strategy_class, {})

# Backtest configuration shared by every optimization run
my_backtesting_engine.my_set_parameters(
    vt_symbols=ga_vt_symbols,
    interval=Interval.MINUTE,
    start=ga_start,
    end=ga_end,
    rates=ga_rates,
    slippages=ga_slippages,
    sizes=ga_sizes,
    priceticks=ga_price_ticks,
    margin_ratios=ga_margin_ratios,
    capital=ga_capital,
)

# Launch the genetic-algorithm optimization with shared-memory bar data
result_list = my_backtesting_engine.my_run_ga_optimization(
    optimization_setting=op_setting,
    max_workers=pool_num,
    is_shared_memory=True,
)
Member
avatar
加入于:
帖子: 1613
声望: 115

感谢分享!

© 2015-2022 上海韦纳软件科技有限公司
备案服务号:沪ICP备18006526号

沪公网安备 31011502017034号

【用户协议】
【隐私政策】
【免责条款】