在做VNPY 策略参数优化的时候, 优化数量多的时候要好几天,有时候也不知道完成数量,只能干等。就稍微改了下代码,加入log显示完成数量和总数量,心里面有个底。
代码是在VNPY 1.9.2里面的因为我还在1.9.2的版本泥潭里,2.0之后直接用应该也可以的,只要稍微改下变量名称。
实质内容,就是python多进程共享变量访问更新,python提供几个不同的多进程变量共享的方法,比如使用可变对象queue。这里其实只用定义一个int类型的counter,每个进程跑完后,counter加一,并在log中记录。
这里用了一个multiprocessing.Manager类,Manager可以对定义共享变量,和锁管理
把下面代码加入到class BacktestingEngine的runParallelOptimization方法中
#----------------------------------------------------------------------
def runParallelOptimization(self, strategyClass, optimizationSetting):
"""并行优化参数"""
# 获取优化设置
settingList = optimizationSetting.generateSetting()
targetName = optimizationSetting.optimizeTarget
# 检查参数设置问题
if not settingList or not targetName:
self.output(u'优化设置有问题,请检查')
# 多进程优化,启动一个对应CPU核心数量的进程池
pool = multiprocessing.Pool(multiprocessing.cpu_count()-1, maxtasksperchild = 10)
l = []
length0 = len(settingList) # 获得参数优化带运行条
manger = multiprocessing.Manager() # 定义一个多进程管理对象
counter = manger.Value('i',0) # 定义一个共享变量记录完成条目,int类型的
lock = manger.Lock() # 顶一个访问锁,如果counter被一个进程修改时候,加锁,
for setting in settingList:
# 把刚刚顶一个的作为参数传入每个进程,实现变量共享
l.append(pool.apply_async(optimize, (strategyClass, setting,
targetName, self.mode,
self.startDate, self.initDays, self.endDate,
self.slippage, self.rate, self.size, self.priceTick,
self.dbName, self.symbol,counter,lock,length0)))
pool.close()
pool.join()
# 显示结果
resultList = [res.get() for res in l]
resultList.sort(reverse=True, key=lambda result:result[1])
return resultList
然后在optimize中,定义counter加1
#----------------------------------------------------------------------
def optimize(strategyClass, setting, targetName,
mode, startDate, initDays, endDate,
slippage, rate, size, priceTick,
dbName, symbol,counter,lock,length0):
"""多进程优化时跑在每个进程中运行的函数"""
......
engine.runBacktesting()
# 当优化完成时候,counter加1
with lock:
counter.value += 1
# 为了使用engine的log显示,更改calculateDailyResult,传入counter.value, length0
engine.calculateDailyResult(counter.value, length0)
.....
为了在log中显示,不得不修改下calculateDailyResult,其实这样改源代码不太合适,反正1.9.2已经被我搞得乱了,仅作参考,不合适的更新方法
#----------------------------------------------------------------------
def calculateDailyResult(self,count = 0,length0 = 0):
"""计算按日统计的交易结果"""
self.output(u'计算按日统计结果')
# 显示完成数
if count != 0:
self.output(u'执行完成个数 %s / %s' % (count,length0))
......
实现效果如下图,可以预估之后还有多少要完成。
当然有心可以加一个共享变量,记录下已经 每次更新间隔时间,乘上未完成个数,估算出还有多少时间才完成。