其实用VnTrader都知道,CTA策略启动分两步,第一步初始化,第二部启动。初始化要做的比较多,比如回放历史数据,加载保存交易数据等,如果加载历史数据较多,或者从数据源实时下载,是要一会。
如果no_ui, 或者定时运行;能不呢让CTA策略初始化完成后,自动启动呢。看起来很简单。
cta_engine.init_strategy(strategy.strategy_name)
cta_engine.start_strategy(strategy.strategy_name)
直接这样执行,后果都是提示: 策略{strategy.strategy_name}启动失败,请先初始化
为什么呢,因为虽然大多数情况python是单线程运行,但是这里init_strategy 使用了一个python 3的新特性线程池。
cta_engine源代码如下
self.init_executor = ThreadPoolExecutor(max_workers=1)
def init_strategy(self, strategy_name: str):
self.init_executor.submit(self._init_strategy, strategy_name)
这里不用具体去学习这个线程池,只要知道相对于vnTrade主线程,多建立一个独立线程专门处理策略初始化;不会因此阻塞主线程的运行。所以当初始化代码执行后,虽然初始化没有完成,系统也直接跳到下一步启动代码了,因此报错。
补充下,线程池有相对直接新建Thread有不少优点,主线程可以实时查询某一个线程(或者任务的)的状态,以及返回值。该线程完成的时候,主线程能够立即知道。
好了,知道上面这个线程池的情况;那么如何实现自动启动了;首先看看vnTrader自带的 no_ui示例代码。
cta_engine.init_all_strategies()
sleep(60) # Leave enough time to complete strategy initialization
main_engine.write_log("CTA策略全部初始化")
cta_engine.start_all_strategies()
main_engine.write_log("CTA策略全部启动")
好吧,这段代码有点笨拙,阻塞主线程60秒等待,虽然可以实现目的。但是如果CTA策略需要从数据源实时下载数据,或者CTA策略有上百个时候;还没有等全部初始话完成,就启动了。
改进下,去查询CTA策略初始化状态,等策略状态初始化完成后再启动。
下面代码是每隔5秒去遍历所有CTA策略,如果有一个策略未初始化,就跳出,等所有策略都初始化,进入下一步,启动全部CTA策略。
cta_engine.init_all_strategies()
all_strategise_inited = False
while not all_strategise_inited:
sleep(5)
for strategy in cta_engine.strategies.values():
if strategy.inited == False:
all_strategise_inited = False
break
all_strategise_inited = True
main_engine.write_log("CTA策略全部初始化=====")
cta_engine.start_all_strategies()
main_engine.write_log("CTA策略全部启动=====")
因为初始化线程池是单线池一个worker,初始化是一个接一个完成。 还有一个写法,针对每个策略等待;如果策略已经初始化完成,就执行,在观察下一个策略。
cta_engine.init_all_strategies()
strategies_list = cta_engine.strategies.values()
for strategy in strategies_list:
while not strategy.inited:
sleep(2)
cta_engine.start_strategy(strategy.strategy_name)
但是两个写法都有个一个问题,一个是sleep来阻塞主程不是很好代码写法;更重要是因为如果其中有一个CTA初始化失败报错,那么主线程就不会进入下一步,卡在while循环了。
有没有可能不用主动去查询CTA策略,而是等CTA策略自动完成初始化来回报提醒了。当然可以,这里有就几个实现方法,最方便就是使用VNPY自带的Event事件机制。真实很方便的黑技术。
当CTA策略状态改变时候会发送事件EVENT_CTA_STRATEGY,此时订阅这个事件,当接收到CTA状态时是初始化完成但是未启动时候,设置策略启动。代码如下
cta_engine.init_engine()
main_engine.write_log("CTA策略引擎初始化完成")
for_init_system = True
def once_strategy_inited(event):
strategy = event.data
vars = strategy["variables"]
if for_init_system and vars["inited"] and (not vars["trading"]):
main_engine.write_log(f"CTA策略{strategy['strategy_name']}初始化完成")
cta_engine.start_strategy(strategy["strategy_name"])
main_engine.write_log(f"CTA策略{strategy['strategy_name']}启动完成")
event_engine.register(EVENT_CTA_STRATEGY,once_strategy_inited)
main_engine.write_log("注册CTA策略事件监听")
cta_engine.init_all_strategies()
注意地方就是这里使用方法中直接写入方法的代码写法,这样就不用去传输main_engine和cta_engine了;还有就是需要有个for_init_system变量,在全部初始化后该变量改为False,这样就不会出现使用中停止不了策略的情况。
最后补充下,在实际使用中,因为我使用时子线程界面化启动vnTrader,就是vnTrader也是在子线程状态运行,如果服务器只是双核CPU的情况,在初始化时候会出现PyQt5的界面更新被初始化阻塞情况,只有等全策略自动初始化完成后,才会刷新。个人感觉应该是双核,一个被父线程,一个被初始化子线程都占用了,造成vnTrader界面卡死情况,但是手动全部启动时候又不会,有点奇怪。具体还是再研究把。