在rpc测试程序中无法订阅topic关键字,只能传递参数“”,订阅全部。
from time import sleep
from vnpy.rpc import RpcClient
class TestClient(RpcClient):
"""
Test RpcClient
"""
def __init__(self):
"""
Constructor
"""
super(TestClient, self).__init__()
def callback(self, topic, data):
"""
Realize callable function
"""
print(f"client received topic:{topic}, data:{data}")
if name == "main":
req_address = "tcp://192.168.1.109:2014"
sub_address = "tcp://192.168.1.109:4102"
tc = TestClient()
tc.subscribe_topic("t1")
tc.start(req_address, sub_address)
while 1:
print(tc.add(1, 3))
sleep(2)
如果“t1”改为空“”。可以显示全部,如果订阅“t1”就无任何打印。