VeighNa量化社区
你的开源社区量化交易平台
Member
加入于:
帖子: 162
声望: 71

之前为了实现利用遗传算法,进行多进程策略的优化,学习研究了python的多进程库Multiprocessing。以前感觉真是黑科技,学习后发现,还是python优点,简单好用,对于一般应用还是很好理解。

首先,由于GIL(全局解释锁)的问题,全局对象只能一个进程调用,python多线程并不能充分利用多核处理器,比如有时候用pandas跑大型数据分析,发现只有一核在累死累活。如果想要充分地使用多核CPU的资源,在python中大部分情况需要使用多进程。multiprocessing可以给每个进程赋予单独的Python解释器,这样就规避了全局解释锁所带来的问题。可以理解为多核CPU分配好一个工作任务,这个工作任务包括工作方法和工作内容。

其实python多线程很简单,相对于其他语言来说。其实简单就是针对需要多线程的方法func(a),a是参数。相当于工作内容;使用 Multiprocessing. Process(target = func, args =(a,)),创建一个Prcoess对象,也就是工作任务,再启动这个对象,这样一个多进程任务就完成了。等CPU分配一个独立核去干活,func(a)就开动了。这里唯一要注意args是默认输入元祖参数。

P = Multiprocessing.Process(target = func, args =(a,))
P.start()

Multiprocessing提供了更简洁的pool做为进程池,其实叫任务池更为恰当。把需要干的工作任务打包好,放在这个池子里面,这样空闲下来的核心就捡pool的任务干活。

常见的pool的使用如下,其中prcesses = 4 是定义任务池大小,不一定要小于或者等于cpu核心数量,可以大于cpu核心数量,不过这样就有几个任务空挂着还占用内存。

然后使用pool方法apply_async(task, args=(x,)),把打包好的任务插入池中。 apply_asyncs是异步的带返回值。如果用apply也可以正常,但是会没有返回值,此处不仔细研究了。

之后close()是把这个任务池关闭,不再接受新的任务;但是还有一些已有任务在跑,所以用pool.join(),吊着主程序,直到所有任务完成才进入下一步。

if __name__ == '__main__':
    Multiprocessing.pool = Pool(processes=4)
    for x in range(10):
        pool.apply_async(task, args=(x,))
    pool.close()
    pool.join()

下面看看VNPY多进程优化方法。其实很好理解了,runParallelOptimization是类BacktestingEngine的一个方法。

传入参数strategyClass就是这个策略类,setting是要优化参数范围,后面通过optimizationSetting.generateSetting()生成策略参数队列,做为任务内容;optimizationSetting.optimizeTarget是后面返回值。至于回测品种,回测时间段,交易费用什么,在 BacktestingEngine创建时候维护了。

然后创建任务池pool,大小刚好是cpu核数,这个也是比较稳妥设置。

之后做一个l队列来放返回值。

然后打包策略类,回测参数,策略参数做为任务内容,和任务方法optimize一起组合为一个工作任务。然后插入任务池给cpu核心去跑。这个时候在系统监视器可以看到于核心数相同的python虚拟环境运作。

然后就是对返回值排序。后面详细说说。

df =  engine.runParallelOptimization(AtrRsiStrategy, setting)
def runParallelOptimization(self, strategyClass, optimizationSetting):
    """并行优化参数"""
    # 获取优化设置        
    settingList = optimizationSetting.generateSetting()
    targetName = optimizationSetting.optimizeTarget

    # 检查参数设置问题
    if not settingList or not targetName:
        self.output(u'优化设置有问题,请检查')

    # 多进程优化,启动一个对应CPU核心数量的进程池
    pool = multiprocessing.Pool(multiprocessing.cpu_count())
    l = []
    for setting in settingList:
        l.append(pool.apply_async(optimize, (strategyClass, setting,
                                             targetName, self.mode, 
                                             self.startDate, self.initDays, self.endDate,
                                             self.slippage, self.rate, self.size, self.priceTick,
                                             self.dbName, self.symbol)))
    pool.close()
    pool.join()

    # 显示结果
    resultList = [res.get() for res in l]
    resultList.sort(reverse=True, key=lambda result:result[1])
    return resultList

像现在双核四线程就有四个python环境在跑任务。

enter image description here
这里会发现是用静态方法optimize,如果直接调用 BacktestingEngine的回测方法更简洁,为什么没有呢,这个是python2.7的Multiprocessing的一个局限,只能打包静态方法做为工作方法,如果打包类中的方法,会提示错误。

cPickle.PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup builtin .instanceme

如果VNPY2.0基于python3.6版本,应该就会更简化一些。

下面看看 静态方法 optimize,其实没什么好说,就是新建一个回测引擎BacktestingEngine对象,按照参数跑一遍回测,返回一个元祖,包含了这次回测的参数,针对回测目标的值,和一个包含回测结果的字典,这个字典包括什么年化收入,sharpe等一堆回测结果。

然后所有的回测结果元祖组成一个回测结果队列,这个结果队列按照targetValue反向排序,最大放在第一位。

因为太多了,一般我都是输出到excel里面,之前说过怎么实现。

#----------------------------------------------------------------------
def optimize(strategyClass, setting, targetName,
             mode, startDate, initDays, endDate,
             slippage, rate, size, priceTick,
             dbName, symbol):
    """多进程优化时跑在每个进程中运行的函数"""
    engine = BacktestingEngine()
    engine.setBacktestingMode(mode)
    engine.setStartDate(startDate, initDays)
    engine.setEndDate(endDate)
    engine.setSlippage(slippage)
    engine.setRate(rate)
    engine.setSize(size)
    engine.setPriceTick(priceTick)
    engine.setDatabase(dbName, symbol)
    engine.initStrategy(strategyClass, setting)
    engine.runBacktesting()
    engine.calculateDailyResult()
    d, result = engine.calculateDailyStatistics()
    try:
        targetValue = result[targetName]
    except KeyError:
        targetValue = 0
    return (str(setting), targetValue, result)

其实python的多进程库Multiprocessing不算复杂,但是用在回测上效果很好;现在有了遗传算法,进行策略优化更加方便了。

Member
加入于:
帖子: 22
声望: 1

解析的很清楚,非常感谢

Member
avatar
加入于:
帖子: 2
声望: 0

学习,向大神致敬!

Member
avatar
加入于:
帖子: 4
声望: 0

解释得很棒,非常感谢

Member
avatar
加入于:
帖子: 1
声望: 0

感谢分享!想咨询下,我下载VNPY源码运行, 在rest_client这里的多进程处理函数, 在centos7.6 环境下发现在
task = pool.apply_async(self._process_request, args=[request, ], callback=self._clean_finished_tasks,)
多进程函数时无法调用处理函数self._process_request, 在windows环境下运行是正常的, 请问这个问题该如何解决?谢谢!

Member
avatar
加入于:
帖子: 2
声望: 0

感谢分享!想咨询下
暴力穷举优化中创建进程时,强制使用spawn而非fork,有何原因吗?

Super Moderator
avatar
加入于:
帖子: 31
声望: 14

如果是Linux系统,需要使用fork 。

如果是window系统,需要使用spawn。

详情请查看文档python multiprocess

Member
avatar
加入于:
帖子: 2
声望: 0

Use multiprocessing pool for running backtesting with different setting

Force to use spawn method to create new process (instead of fork on Linux)

ctx = multiprocessing.get_context("spawn")
pool = ctx.Pool(multiprocessing.cpu_count())

谢谢解答!
by the way
我以为是说:即使在linux也应该使用spawn,(不应该用linux默认的fork)。

Member
avatar
加入于:
帖子: 26
声望: 12

谢谢老师的指导!您给论坛贡献的文章,价值非常高!

© 2015-2022 上海韦纳软件科技有限公司
备案服务号:沪ICP备18006526号

沪公网安备 31011502017034号

【用户协议】
【隐私政策】
【免责条款】