vn.py量化社区
By Traders, For Traders.
Member
avatar
加入于:
帖子: 8
声望: 0

论坛大神提供的多进程遗传算法一直在使用,但一直纠结于多进程无法完美使用lru_cache,因为每个进程都只是缓存了部分数据,而这些数据将来也未必被同一个cpu分到,经过测试命中率极低。
最近由于需要做些参数平原的分析,顺带测试了在多进程遗传算法中引入redis,发现还是比较好用的。遗传算法毕竟有很多重复计算内容,当然如果你cpu主频够高,单次回测计算速度够快就不建议啦。
BTW,个人试了下,在单进程下速度依然可以和lru_cache媲美。由于代码写得很简单,就不单独说明了,贴在下面,欢迎拍砖。

Member
avatar
加入于:
帖子: 8
声望: 0
import os
import sys
import importlib as imp
# Make the vn.py project root (three directory levels above this script)
# importable. Inside a notebook `__file__` is undefined, so fall back to the
# current working directory instead.
_script_dir = (
    os.path.dirname(os.path.abspath(__file__))
    if "__file__" in globals()
    else os.getcwd()
)
ROOT_PATH = os.path.abspath(os.path.join(_script_dir, '..', '..', '..'))
# Guard against duplicate entries when the module/cell is executed repeatedly.
if ROOT_PATH not in sys.path:
    sys.path.append(ROOT_PATH)
import multiprocessing
from time import time
from datetime import datetime
import random
import pandas as pd
import numpy as np
import pathlib
from deap import creator, base, tools, algorithms
# DEAP types: a single-objective fitness with a positive weight (i.e. it is
# maximised) and a list-based individual carrying that fitness.
# NOTE(review): DEAP recommends creating these at module level so that
# multiprocessing workers can unpickle Individuals -- keep them here.
creator.create("FitnessMax", base.Fitness, weights=(1.0,))
creator.create("Individual", list, fitness=creator.FitnessMax)
import redis
# Connection pool to a local redis server (127.0.0.1:6379) that serves as the
# backtest-result cache shared by all worker processes.
# NOTE(review): created at import time, so worker processes inherit it; redis-py
# is expected to re-create connections after a fork -- confirm for the
# installed version.
redis_pool = redis.ConnectionPool(host='127.0.0.1', port=6379, max_connections=99)

from vnpy.app.cta_strategy import BacktestingEngine, OptimizationSetting
from vnpy.app.cta_strategy.strategies.turtle_strategy_for_BTC import TurtleForBTCStrategy
from vnpy.app.cta_strategy.strategies.boll_channel_evolution_strategy import BollChannelEvolutionStrategy
from vnpy.app.cta_strategy.base import OptimizeForBacktesting

class GeneticOptimizeStrategy(object):
    """Optimise a vn.py CTA strategy's parameters with a DEAP genetic algorithm.

    Fitness evaluations run in a ``multiprocessing`` pool and are cached in
    redis, so a parameter set already scored by one worker process is not
    backtested again by another. Cache keys are namespaced by the backtest
    settings (strategy, symbol, interval, dates, cost model, capital and
    target metric), so different runs never return each other's scores.

    Typical use: ``backtesting_setting(...)`` -> ``add_parameter(...)`` (once
    per parameter) -> ``generate_setting_ga()`` -> ``optimize()`` ->
    ``get_result_from_redis()``.
    """

    # Settings that change a parameter set's score; folded into every cache key.
    _CACHE_KEY_FIELDS = (
        "vt_symbol", "interval", "start", "end", "rate",
        "slippage", "size", "pricetick", "capital", "target",
    )
    # Settings forwarded to BacktestingEngine.set_parameters().
    _ENGINE_FIELDS = (
        "vt_symbol", "interval", "start", "end", "rate",
        "slippage", "size", "pricetick", "capital",
    )

    def __init__(self):
        # Candidate parameter sets; each one is a list of (name, value) tuples.
        self.settings_ga = []
        # Parameter grid, filled through add_parameter().
        self.s = OptimizationSetting()
        # Kept for backward compatibility only: every evaluation now builds its
        # own engine, because a reused engine accumulates trades across runs.
        self.back = BacktestingEngine()
        # Backtest configuration, filled through backtesting_setting().
        self.settings_param = {}

    def backtesting_setting(self, vt_symbol, interval, start, end, rate, slippage, size,
                            pricetick, capital, strategy, target, optimize_mem=None):
        """Store the backtest configuration used by every fitness evaluation.

        Args:
            vt_symbol, interval, start, end, rate, slippage, size, pricetick,
                capital: forwarded to ``BacktestingEngine.set_parameters``.
            strategy: strategy class handed to ``BacktestingEngine.add_strategy``.
            target: key of the ``calculate_statistics()`` result used as the
                fitness value (e.g. ``"sharpe_ratio"``).
            optimize_mem: optional value forwarded to ``set_parameters`` as
                ``optimize_mem``. It is left out of the call when None, because
                stock vn.py engines have no such argument (NOTE(review): it
                presumably belongs to a customised engine -- confirm).

        Returns:
            dict: ``self.settings_param`` after the update.
        """
        self.settings_param.update(
            vt_symbol=vt_symbol,
            interval=interval,
            start=start,
            end=end,
            rate=rate,
            slippage=slippage,
            size=size,
            pricetick=pricetick,
            capital=capital,
            strategy=strategy,
            target=target,
        )
        if optimize_mem is None:
            # Drop a value left over from an earlier call.
            self.settings_param.pop("optimize_mem", None)
        else:
            self.settings_param["optimize_mem"] = optimize_mem
        return self.settings_param

    def add_parameter(self, name, start, end, step):
        """Add a parameter range (start, end, step) to the optimisation grid."""
        self.s.add_parameter(name, start, end, step)

    def generate_setting_ga(self):
        """(Re)build the GA candidate pool from the parameter grid.

        Each grid point (a dict) becomes a list of ``(name, value)`` tuples,
        which is the gene layout the individuals use. The pool is rebuilt
        rather than appended to. Calling this again after ``add_parameter``
        therefore never mixes individuals of different lengths.

        Returns:
            list: ``self.settings_ga``.
        """
        self.settings_ga = [list(setting.items()) for setting in self.s.generate_setting()]
        return self.settings_ga

    def generate_parameter(self):
        """Return one random candidate from the pool, building the pool if empty."""
        if not self.settings_ga:
            self.generate_setting_ga()
        return random.choice(self.settings_ga)

    def mutArrayGroup(self, individual, indpb):
        """Mutate *individual* in place with genes taken from a random candidate.

        Gene ``i`` is replaced by gene ``i`` of a freshly drawn candidate with
        probability *indpb*. All candidates share the same gene order, so the
        parameter names stay aligned.

        Returns:
            tuple: ``(individual,)``, the shape DEAP expects from a mutation.
        """
        donor = self.generate_parameter()
        for index in range(len(individual)):
            if random.random() < indpb:
                individual[index] = donor[index]
        return individual,

    def object_func(self, strategy_avg):
        """DEAP evaluate function: score an individual (list of (name, value))."""
        return self._object_func(tuple(strategy_avg))

    def optimize(self, population_size=100, mu=100, lambda_=80, cxpb=0.95, mutpb=0.05, ngen=30):
        """Run DEAP's (mu + lambda) evolution in a process pool and print the front.

        Args:
            population_size: size of the initial population.
            mu: number of individuals selected for the next generation.
            lambda_: number of offspring produced per generation.
            cxpb: crossover probability.
            mutpb: mutation probability (``cxpb + mutpb`` must not exceed 1).
            ngen: number of generations.

        Returns:
            tools.ParetoFront: the hall of fame, which is also printed.
        """
        start = time()
        toolbox = base.Toolbox()
        # Individuals are random grid points taken from the candidate pool.
        toolbox.register("individual", tools.initIterate, creator.Individual, self.generate_parameter)
        toolbox.register("population", tools.initRepeat, list, toolbox.individual)
        toolbox.register("mate", tools.cxTwoPoint)
        toolbox.register("mutate", self.mutArrayGroup, indpb=1)
        toolbox.register("evaluate", self.object_func)
        toolbox.register("select", tools.selBest)

        pop = toolbox.population(population_size)
        hof = tools.ParetoFront()

        stats = tools.Statistics(lambda ind: ind.fitness.values)
        np.set_printoptions(suppress=True)
        stats.register("mean", np.mean, axis=0)
        stats.register("std", np.std, axis=0)
        stats.register("min", np.min, axis=0)
        stats.register("max", np.max, axis=0)

        print("开始运行遗传算法,每代族群总数:%s, 优良品种筛选个数:%s,迭代次数:%s,交叉概率:%s,突变概率:%s" % (population_size, mu, ngen, cxpb, mutpb))
        # The context manager terminates the workers even if evolution raises,
        # so no processes are leaked.
        with multiprocessing.Pool(multiprocessing.cpu_count()) as pool:
            toolbox.register("map", pool.map)
            algorithms.eaMuPlusLambda(pop, toolbox, mu, lambda_, cxpb, mutpb, ngen, stats,
                                      halloffame=hof, verbose=True)
        cost = int(time() - start)

        print("遗传算法优化完成,耗时%s秒" % (cost))
        print("----------输出帕累托前沿解集,解集数量%s----------" % (len(hof)))
        for solution in hof:
            print(solution)
        return hof

    def _backtest_parameters(self):
        """Build the keyword arguments for ``BacktestingEngine.set_parameters``."""
        params = {name: self.settings_param[name] for name in self._ENGINE_FIELDS}
        # Only forwarded when configured (see backtesting_setting).
        if "optimize_mem" in self.settings_param:
            params["optimize_mem"] = self.settings_param["optimize_mem"]
        return params

    def _redis_prefix(self):
        """Cache-key namespace derived from the current backtest settings."""
        strategy = self.settings_param["strategy"]
        parts = [getattr(strategy, "__name__", str(strategy))]
        parts.extend(str(self.settings_param.get(field)) for field in self._CACHE_KEY_FIELDS)
        return "ga:" + "|".join(parts) + ":"

    def _redis_key(self, strategy_avg):
        """Redis key for one parameter set (a tuple of (name, value) tuples)."""
        return self._redis_prefix() + str(strategy_avg)

    @staticmethod
    def _escape_glob(text):
        """Escape redis glob metacharacters so *text* matches literally."""
        return "".join("\\" + ch if ch in "\\*?[]" else ch for ch in text)

    @staticmethod
    def _to_text(value):
        """Decode a redis reply (bytes unless decode_responses=True) to str."""
        return value.decode("utf8") if isinstance(value, bytes) else value

    def _object_func(self, strategy_avg):
        """Return the cached or freshly backtested fitness of *strategy_avg*.

        Args:
            strategy_avg: tuple of ``(name, value)`` pairs (one individual).

        Returns:
            tuple: ``(score,)``, where score is the target statistic as a float.
        """
        conn = redis.Redis(connection_pool=redis_pool)
        key = self._redis_key(strategy_avg)
        cached = conn.get(key)
        if cached is not None:
            print('redis读取成功!!!')
            return float(cached),

        # A fresh engine per evaluation, as vn.py's own optimiser does; a
        # reused engine keeps trades/daily results from earlier runs.
        engine = BacktestingEngine()
        engine.set_parameters(**self._backtest_parameters())
        engine.add_strategy(self.settings_param["strategy"], dict(strategy_avg))
        engine.load_data()
        engine.run_backtesting()
        engine.calculate_result()
        result = engine.calculate_statistics(output=False)
        # float(): store a plain number so every redis-py version accepts it.
        target = float(round(result[self.settings_param["target"]], 5))

        # SETNX: if another worker already stored this key, keep its value.
        conn.setnx(key, target)
        print('redis写入成功')
        return target,

    @staticmethod
    def _results_to_frame(param_reprs, values):
        """Build a DataFrame with one column per parameter plus ``result``.

        Args:
            param_reprs: ``repr`` strings of ``(name, value)`` tuples, taken
                from the cache keys with the namespace prefix removed.
            values: matching redis values; ``None`` entries are skipped.
        """
        import ast

        rows = []
        columns = None
        for param_repr, value in zip(param_reprs, values):
            if value is None:  # key expired/deleted between SCAN and MGET
                continue
            # literal_eval (not eval): only Python literals are accepted, so a
            # crafted redis key cannot execute code.
            params = ast.literal_eval(param_repr)
            if columns is None:
                columns = [name for name, _ in params] + ['result']
            row = dict(params)
            row['result'] = float(GeneticOptimizeStrategy._to_text(value))
            rows.append(row)
        return pd.DataFrame(rows, columns=columns)

    def _default_result_file(self):
        """Default CSV name ``<strategy>_<start>_<end>.csv``, made filesystem-safe."""
        strategy = self.settings_param["strategy"]
        parts = [getattr(strategy, "__name__", str(strategy))]
        for field in ("start", "end"):
            value = self.settings_param[field]
            parts.append(value.strftime("%Y%m%d") if hasattr(value, "strftime") else str(value))
        stem = "_".join(parts)
        # str(datetime) contains ':' and str(class) contains '<' and quotes,
        # which are invalid in Windows file names.
        stem = "".join(ch if ch.isalnum() or ch in "._-" else "_" for ch in stem)
        return stem + ".csv"

    def get_result_from_redis(self, result_path=None):
        """Export all cached results for the current settings to a CSV file.

        The CSV has one row per cached parameter set, with one column per
        parameter plus a ``result`` column holding the target metric. If the
        file already exists the rows are appended without a header; otherwise
        a new file with a header is written.

        Args:
            result_path: output CSV path; defaults to
                ``<strategy>_<start>_<end>.csv`` in the current directory.

        Returns:
            pandas.DataFrame: the exported rows (empty if nothing is cached).
        """
        conn = redis.Redis(connection_pool=redis_pool)
        prefix = self._redis_prefix()
        # SCAN (unlike KEYS) does not block the server, but it may yield a key
        # more than once; dict.fromkeys de-duplicates while keeping order.
        keys = list(dict.fromkeys(conn.scan_iter(match=self._escape_glob(prefix) + "*")))
        if not keys:
            print("no cached results in redis for the current settings")
            return pd.DataFrame()

        param_reprs = [self._to_text(key)[len(prefix):] for key in keys]
        df = self._results_to_frame(param_reprs, conn.mget(keys))

        if result_path is None:
            result_path = self._default_result_file()
        path = pathlib.Path(os.path.abspath(result_path))
        if path.is_file():
            df.to_csv(path, mode='a', header=False)
        else:
            df.to_csv(path)

        # To clear this run's cache afterwards, delete `keys` here
        # (conn.flushall() would wipe the whole redis database).
        return df

def main():
    """Optimise BollChannelEvolutionStrategy on BTC/USD (Bitfinex, 1-minute bars).

    Configures the backtest, defines the parameter grid, runs the genetic
    search and finally exports the redis-cached results.
    """
    optimizer = GeneticOptimizeStrategy()
    optimizer.backtesting_setting(
        vt_symbol="BTC/USD.BITFINEX",
        interval="1m",
        start=datetime(2018, 8, 11),
        end=datetime(2019, 12, 31),
        rate=0.2 / 100,
        slippage=0.3,
        size=3,
        pricetick=0.1,
        capital=1_000_000,
        strategy=BollChannelEvolutionStrategy,
        target="sharpe_ratio",
    )

    # (name, start, end, step) for every optimised parameter, in grid order.
    parameter_grid = (
        ('boll_window', 50, 780, 10),
        ('boll_dev', 1.5, 3.0, 0.2),
        ('cci_window', 100, 300, 10),
        ('atr_window', 200, 600, 10),
        ('sl_multiplier', 0.1, 3.0, 0.2),
    )
    for name, first, last, step in parameter_grid:
        optimizer.add_parameter(name, first, last, step)

    optimizer.generate_setting_ga()
    # Draws one random candidate; the value itself is not used.
    optimizer.generate_parameter()
    optimizer.optimize()
    optimizer.get_result_from_redis()


if __name__ == "__main__":
    main()
Member
avatar
加入于:
帖子: 1
声望: 0

a. 下面这段代码里的resultl_path和result_path是不是有一个是手误写错了?

resultl_path = os.path.abspath(
            os.path.join('put your own path')
        )
        path = pathlib.Path(resultl_path)
        if path.exists() and path.is_file():
            df.to_csv(result_path,mode='a', header=None)
        else:
            df.to_csv(resultl_path,columns=col_keys)

b. redis的python包需要低版本的才行(2.x) ,不然插入数据的时候会有类型错误问题.

Member
avatar
加入于:
帖子: 8
声望: 0

陈丹 wrote:

b.下面这段代码里面resultl_path和result_path是不是有一个手误写错了..

resultl_path = os.path.abspath(
            os.path.join('put your own path')
        )
        path = pathlib.Path(resultl_path)
        if path.exists() and path.is_file():
            df.to_csv(result_path,mode='a', header=None)
        else:
            df.to_csv(resultl_path,columns=col_keys)

b. redis的python包需要低版本的才行(2.x) ,不然插入数据的时候会有类型错误问题.

我个人没有遇到这个问题;如果遇到的话,把数据用str()转一下就好。我这边是python 3.7,安装的是最新版redis。

© 2015-2019 上海韦纳软件科技有限公司
备案服务号:沪ICP备18006526号-3