解决问题:我们主要是做数据监控提醒,然后配合手工半自动操作,必须保证能及时提醒,之前把vnpy直接部署在香港云服务windows下,订阅tick多一些时,会踫到各种问题,CPU,内存一直占用很高,跑久会突然卡死界面,不能长时间稳定运行。分析后,发现很多资源消耗在界面显示,底层数据更新和策略运算只占用一部分。因此想把数据更新,策略运算作以后台无界面方式跑在CentOS上,本地作为控制端,只显示数据和配置策略,控制运行。
如果有人希望用这个模式,希望这篇文章能帮到你。
得益于vnpy优秀的架构,将它的RPC部分做一些简单改造,就能变成CS模式,工程量也不大。当前的RPC运行时,服务端只传数据,策略运行还是在本地,我们希望策略也运行在服务端,因此要把App里Engine的一些函数,在RPC模式下全部改成远程调用。
1.扩展RpcServer,添加一个注册函数,注册Engine里的重要函数,例如,添加,启动策略之类,把{函数名}_{AppName}作为远程调用函数名
2.扩展RpcClient,注册与服务端同样的函数,同时取消Engine的start与close相关的函数调用,直接从远程获取当前的状态信息。在实际运行过程中,长链接的RpcClient隔一段时间会无法连接远程,因此最好改成30s左右的短连接。
3.扩展RpcEngine,对消息发布进行修改,CS模式不需要传输过多的消息,只传输界面需要显示的消息。类似像tick这种频繁大量,可以屏蔽掉或者加大的延时显示。同时远程传输时需要将对象pickle序列化,如果Event里的对象不能被pickle,则要在对象里加to_rpc_data函数,来转换对象。
主要代码如下:
1.RpcServerEx
class RpcServerEx(RpcServer):
def register_funcs(self, funcs:List[Callable], engine_name="")->bool:
for func in funcs:
name = f"{engine_name}_{func.__name__}"
if name in self.__functions:
return False
self.__functions[name] = func
return True
2.RpcClientEx
class RpcClientEx(RpcClient):
def register_funcs(self, obj:object, funcs:List[Callable], engine_name:str=""):
for func in funcs:
name = f"{engine_name}_{func.__name__}"
obj.__setattr__(func.__name__, self.__getattr__(name))
3.RpcEngineEx,关键处理函数在process_event,根据需要添加相关消息的传输间隔
@dataclass
class LimitInfo:
time:float
key:str
class RpcEngineEx(RpcEngine):
limit_event:Dict[str, LimitInfo] = {}
def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
super().__init__(main_engine, event_engine)
self._event_queue = Queue()
self._pub_thread = Thread(target=self._pub_run)
self.symbol_time:Dict[str, float] = {}
def start(self, rep_address: str, pub_address: str):
super().start(rep_address, pub_address)
self._pub_thread.start()
def stop(self):
super().stop()
self._pub_thread.join()
def _pub_run(self):
while self.server.is_active():
try:
event = self._event_queue.get(True, 1000)
self.server.publish("", event)
except Empty:
pass
except:
print(sys.exc_info(), event.type, event.data)
def process_event(self, event: Event):
if not self.server.is_active():
return
limit_info = self.limit_event.get(event.type, None)
if limit_info is not None:
if limit_info.time==-1:
return
if isinstance(event.data, dict):
symbol = event.data.get(limit_info.key, "")
else:
symbol = getattr(event.data, limit_info.key, "")
symbol = f"{symbol}{event.type}"
t = self.symbol_time.get(symbol, None)
if t is not None:
if (time.time()-t)<limit_info.time:
return
self.symbol_time[symbol] = time.time()
rpc_data:Callable = getattr(event.data, "to_rpc_data", None)
if rpc_data is None:
self._event_queue.put(event)
else:
self._event_queue.put(Event(event.type, rpc_data()))
4.MainEngineEx
class MainEngineEx(MainEngine):
def is_rpc_client(self)->bool:
return "RPC" in self.gateways
def is_rpc_server(self)->bool:
return RpcServiceApp.app_name in self.apps
以RadarEngine为例,其它的Engine也是类似,添加一个register_rpc函数,在RPC连接前里调用:
def register_rpc(self):
funcs = [
self.add_rule,
self.edit_rule,
self.remove_rule,
self.get_rule,
self.get_rules,
self.add_signal,
self.edit_signal,
self.remove_signal,
self.get_signal,
self.get_signals,
]
if self.main_engine.is_rpc_server():
rpc_engine:RpcEngine = self.main_engine.get_engine(RpcServiceApp.app_name)
rpc_engine.server.register_funcs(funcs, APP_NAME)
Thread(target=self.delay_init).start()
if self.main_engine.is_rpc_client():
rpc_gateway:RpcGateway = self.main_engine.get_gateway("RPC")
rpc_gateway.client.register_funcs(self, funcs, APP_NAME)
def delay_init(self):
time.sleep(5)
self.init()
def init(self):
""""""
if not self.inited:
self.inited = True
if self.main_engine.is_rpc_client():
for rule in self.get_rules():
rule_data = {
"name": rule.name,
"formula": rule.formula,
"params": rule.params,
"ndigits": rule.ndigits,
"type": rule.type.value,
}
self.put_event(EVENT_RADAR_RULE, rule_data)
update_data = {
"name": rule.name,
"value": rule.value,
"time": rule.time,
}
self.put_event(EVENT_RADAR_UPDATE, update_data)
for signal in self.get_signals():
self.put_event(EVENT_RADAR_SIGNAL, signal)
else:
self.load_setting()
self.write_log("初始化成功")
大体上是这样子,后台无界面的模式在实测中,2GPU/4G内存,2M带宽的服务器,订阅多个tick,也可以稳定低占用运行很长一段时间。VNPY的架构很棒,扩展起来很方便,感谢开源分享。