VeighNa量化社区
你的开源社区量化交易平台 | vn.py | vnpy
Member
avatar
加入于:
帖子: 7
声望: 0

在rpc测试程序中无法订阅topic关键字,只能传递参数“”,订阅全部。
from time import sleep

from vnpy.rpc import RpcClient

class TestClient(RpcClient):
"""
Test RpcClient
"""

def __init__(self):
    """
    Constructor
    """
    super(TestClient, self).__init__()

def callback(self, topic, data):
    """
    Realize callable function
    """
    print(f"client received topic:{topic}, data:{data}")


if name == "main":
req_address = "tcp://192.168.1.109:2014"
sub_address = "tcp://192.168.1.109:4102"

tc = TestClient()
tc.subscribe_topic("t1")
tc.start(req_address, sub_address)

while 1:
    print(tc.add(1, 3))
    sleep(2)

如果“t1”改为空“”。可以显示全部,如果订阅“t1”就无任何打印。

description

description

Member
avatar
加入于:
帖子: 7
声望: 0

请问,是我提问的问题有什么不对?还是其他原因,怎么没有理睬呢

Member
avatar
加入于:
帖子: 1834
声望: 140

请贴一下服务端推送数据时的代码?尤其是调用推送函数的部分

Member
avatar
加入于:
帖子: 7
声望: 0

from time import sleep, time

from vnpy.rpc import RpcServer

class TestServer(RpcServer):
"""
Test RpcServer
"""

def __init__(self):
    """
    Constructor
    """
    super(TestServer, self).__init__()

    self.register(self.add)

def add(self, a, b):
    """
    Test function
    """
    print(f"receiving:{a} {b}")
    return a + b


if name == "main":
rep_address = "tcp://192.168.1.109:2014"
pub_address = "tcp://192.168.1.109:4102"

ts = TestServer()
ts.start(rep_address, pub_address)

while 1:
    content = f"current server time is {time()}"
    print(content)
    ts.publish("t1", content)
    sleep(0.5)

我就是用的程序自身的测试代码,感谢

Member
avatar
加入于:
帖子: 1834
声望: 140

试试把"t1"这个str类型,改为b"t1"这个bytes类型,客户端和服务端都改下

© 2015-2022 上海韦纳软件科技有限公司
备案服务号:沪ICP备18006526号

沪公网安备 31011502017034号

【用户协议】
【隐私政策】
【免责条款】