VeighNa量化社区
你的开源社区量化交易平台
Member
avatar
加入于:
帖子: 4
声望: 0
    解决问题:我们主要是做数据监控提醒,然后配合手工半自动操作,必须保证能及时提醒,之前把vnpy直接部署在香港云服务windows下,订阅tick多一些时,会踫到各种问题,CPU,内存一直占用很高,跑久会突然卡死界面,不能长时间稳定运行。分析后,发现很多资源消耗在界面显示,底层数据更新和策略运算只占用一部分。因此想把数据更新,策略运算作以后台无界面方式跑在CentOS上,本地作为控制端,只显示数据和配置策略,控制运行。

    如果有人希望用这个模式,希望这篇文章能帮到你。
    得益于vnpy优秀的架构,将它的RPC部分做一些简单改造,就能变成CS模式,工程量也不大。当前的RPC运行时,服务端只传数据,策略运行还是在本地,我们希望策略也运行在服务端,因此要把App里Engine的一些函数,在RPC模式下全部改成远程调用。
    1.扩展RpcServer,添加一个注册函数,注册Engine里的重要函数,例如,添加,启动策略之类,把{函数名}_{AppName}作为远程调用函数名
    2.扩展RpcClient,注册与服务端同样的函数,同时取消Engine的start与close相关的函数调用,直接从远程获取当前的状态信息。在实际运行过程中,长链接的RpcClient隔一段时间会无法连接远程,因此最好改成30s左右的短连接。
    3.扩展RpcEngine,对消息发布进行修改,CS模式不需要传输过多的消息,只传输界面需要显示的消息。类似像tick这种频繁大量,可以屏蔽掉或者加大的延时显示。同时远程传输时需要将对象pickle序列化,如果Event里的对象不能被pickle,则要在对象里加to_rpc_data函数,来转换对象。

主要代码如下:
1.RpcServerEx

class RpcServerEx(RpcServer):
    def register_funcs(self, funcs:List[Callable], engine_name="")->bool:
        for func in funcs:
            name = f"{engine_name}_{func.__name__}"
            if name in self.__functions:
                return False
            self.__functions[name] = func

        return True

2.RpcClientEx

class RpcClientEx(RpcClient):
    def register_funcs(self, obj:object, funcs:List[Callable], engine_name:str=""):
        for func in funcs:
            name = f"{engine_name}_{func.__name__}"
            obj.__setattr__(func.__name__, self.__getattr__(name))

3.RpcEngineEx,关键处理函数在process_event,根据需要添加相关消息的传输间隔

@dataclass
class LimitInfo:
    time:float
    key:str

class RpcEngineEx(RpcEngine):
    limit_event:Dict[str, LimitInfo] = {}

    def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
        super().__init__(main_engine, event_engine)
        self._event_queue = Queue()
        self._pub_thread = Thread(target=self._pub_run)
        self.symbol_time:Dict[str, float] = {}

    def start(self, rep_address: str, pub_address: str):
        super().start(rep_address, pub_address)
        self._pub_thread.start()

    def stop(self):
        super().stop()
        self._pub_thread.join()

    def _pub_run(self):
        while self.server.is_active():
            try:
                event = self._event_queue.get(True, 1000)
                self.server.publish("", event)
            except Empty:
                pass
            except:
                print(sys.exc_info(), event.type, event.data)

    def process_event(self, event: Event):
        if not self.server.is_active():
            return
        limit_info = self.limit_event.get(event.type, None)
        if limit_info is not None:
            if limit_info.time==-1:
                return

            if isinstance(event.data, dict):
                symbol = event.data.get(limit_info.key, "")
            else:
                symbol = getattr(event.data, limit_info.key, "")
            symbol = f"{symbol}{event.type}"

            t = self.symbol_time.get(symbol, None)
            if t is not None:
                if (time.time()-t)<limit_info.time:
                    return

            self.symbol_time[symbol] = time.time()

        rpc_data:Callable = getattr(event.data, "to_rpc_data", None)
        if rpc_data is None:
            self._event_queue.put(event)
        else:
            self._event_queue.put(Event(event.type, rpc_data()))

4.MainEngineEx

class MainEngineEx(MainEngine):
    def is_rpc_client(self)->bool:
        return "RPC" in self.gateways

    def is_rpc_server(self)->bool:
        return RpcServiceApp.app_name in self.apps

以RadarEngine为例,其它的Engine也是类似,添加一个register_rpc函数,在RPC连接前里调用:

    def register_rpc(self):
        funcs = [
            self.add_rule,
            self.edit_rule,
            self.remove_rule,
            self.get_rule,
            self.get_rules,
            self.add_signal,
            self.edit_signal,
            self.remove_signal,
            self.get_signal,
            self.get_signals,
        ]

        if self.main_engine.is_rpc_server():
            rpc_engine:RpcEngine = self.main_engine.get_engine(RpcServiceApp.app_name)
            rpc_engine.server.register_funcs(funcs, APP_NAME)
            Thread(target=self.delay_init).start()

        if self.main_engine.is_rpc_client():
            rpc_gateway:RpcGateway = self.main_engine.get_gateway("RPC")
            rpc_gateway.client.register_funcs(self, funcs, APP_NAME)

    def delay_init(self):
        time.sleep(5)
        self.init()

    def init(self):
        """"""
        if not self.inited:
            self.inited = True
            if self.main_engine.is_rpc_client():
                for rule in self.get_rules():
                    rule_data = {
                        "name": rule.name,
                        "formula": rule.formula,
                        "params": rule.params,
                        "ndigits": rule.ndigits,
                        "type": rule.type.value,
                    }
                    self.put_event(EVENT_RADAR_RULE, rule_data)

                    update_data = {
                        "name": rule.name,
                        "value": rule.value,
                        "time": rule.time,
                    }

                    self.put_event(EVENT_RADAR_UPDATE, update_data)

                for signal in self.get_signals():
                    self.put_event(EVENT_RADAR_SIGNAL, signal)
            else:
                self.load_setting()

            self.write_log("初始化成功")

大体上是这样子,后台无界面的模式在实测中,2GPU/4G内存,2M带宽的服务器,订阅多个tick,也可以稳定低占用运行很长一段时间。VNPY的架构很棒,扩展起来很方便,感谢开源分享。

Member
avatar
加入于:
帖子: 126
声望: 14

优秀。
另外,默认的no ui模式就可以呀

Member
avatar
加入于:
帖子: 8
声望: 0

非常感谢分享。受益匪浅。我们在商品期货里遇到同样的问题,在实践您这个方案时,遇到一个挑战,

“扩展RpcEngine,对消息发布进行修改,CS模式不需要传输过多的消息,只传输界面需要显示的消息。类似像tick这种频繁大量,可以屏蔽掉或者加大的延时显示。同时远程传输时需要将对象pickle序列化,如果Event里的对象不能被pickle,则要在对象里加 to_rpc_data函数,来转换对象。”

就是 提到的, “to_rpc_data” 实现的思路是什么样的呢,我尝试了多种,都没走通

© 2015-2022 上海韦纳软件科技有限公司
备案服务号:沪ICP备18006526号

沪公网安备 31011502017034号

【用户协议】
【隐私政策】
【免责条款】