vn.py量化社区
By Traders, For Traders.
Member
avatar
加入于:
帖子: 148
声望: 33
  1. 首先需要保证vt_symbol的唯一,我使用的vt_symbol格式是self.vtsymbol = f"{self.symbol}{self.exchange.value}/{self.gateway_name}"。vnpy\trader\object.py里面的所有类都要继承BaseData,然后就是添加gateway_name参数到其他使用到object.py里面的类。举个例子

    @dataclass
    class HistoryRequest(BaseData):
     """
     Request sending to specific gateway for querying history data.
     """
    
     symbol: str
     exchange: Exchange
     start: datetime
     end: datetime = None
     interval: Interval = None
     #--------------------------------------------------------------------------------------------------
     def __post_init__(self):
         """"""
         self.vt_symbol = f"{self.symbol}_{self.exchange.value}/{self.gateway_name}"

    然后其他地方引用到HistoryRequest的地方添加gateway_name参数

     #-------------------------------------------------------------------------------------------------   
     def query_history(self,event):
         """查询合约历史数据"""
         if len(self.history_contract) > 0:
             symobl,exchange,gateway_name = extract_vt_symbol(self.history_contract[0])
             req = HistoryRequest(
                 symbol = symobl,
                 exchange = Exchange(exchange),
                 interval = Interval.MINUTE,
                 start = datetime.now() - timedelta(days = 1),
                 gateway_name = self.gateway_name
             )
             self.rest_api.query_history(req)
             self.history_contract.pop(0)

    需要分解vt_symbol建议使用extract_vt_symbol函数

    #------------------------------------------------------------------------------------
    def extract_vt_symbol(vt_symbol: str):
     """
     返回(symbol:str, exchange: Exchange,gateway_name:str)
     """
     *symbol_1, exchange_gateway_name = vt_symbol.split('_')
     exchange,gateway_name = exchange_gateway_name.split('/')
     if len(symbol_1) == 1:
         symbol = symbol_1[0]
     elif len(symbol_1) == 2:
         symbol = "_".join([symbol_1[0],symbol_1[1]])
     elif len(symbol_1) == 3:
         symbol = "_".join([symbol_1[0],symbol_1[1],symbol_1[2]])
     elif len(symbol_1) == 4:
         symbol = "_".join([symbol_1[0],symbol_1[1],symbol_1[2],symbol_1[3]])
     return symbol, Exchange(exchange),gateway_name
Member
avatar
加入于:
帖子: 148
声望: 33
  1. 复制gateway文件,并重命名文件以及修改init.py和gateway.py
    以1token为例:
    复制一份gateway\onetoken文件夹,重命名为onetoken_1,init.py里面修改成from .onetoken_1_gateway import Onetoken_1_Gateway,onetoken_gateway.py改为onetoken_1_gateway.py。修改onetoken_1_gateway里面类名为Onetoken_1_Gateway,gateway_name为"1TOKEN1",connect里面的账户信息自行更改
  2. onetoken_gateway.py发布行情(redis安装配置自行百度)

    import pickle
    import redis
    import zlib
    REDIS_POOL =redis.ConnectionPool(host = "localhost",port = 12580)
    REDIS_CLIENT = redis.StrictRedis(connection_pool = REDIS_POOL)
    #-------------------------------------------------------------------------------------------------   
    class OnetokenDataWebsocketApi(WebsocketClient):
     """"""
     #-------------------------------------------------------------------------------------------------   
     def on_tick_v3(self,packet:dict):
         """
         收到v3行情推送
         """
         contract_symbol = packet["c"]       #合约代码
         new_tick = self.ticks.get(contract_symbol,None)
         if not new_tick:
             return
         new_tick.last_price =  packet["l"]     #tick最新价
         new_tick.datetime = utc_to_local(packet["tm"])         #tick时间
         new_tick.volume = packet["v"]       #v,tick成交量,vu换算成USD的成交量,vc换算成人民币的成交量
         if packet.get("tp",None) == "s":         #收到快照行情
             self.order_book_bids[new_tick.vt_symbol] = {}
             self.order_book_asks[new_tick.vt_symbol] = {}
         bids,asks = packet["b"],packet["a"]          #tick买卖单信息 
         for bid_data in bids:
             self.order_book_bids[new_tick.vt_symbol].update({bid_data[0]:bid_data[1]})
             #order_book_bids删除委托量为0的价格缓存
             if not bid_data[1]:
                 del self.order_book_bids[new_tick.vt_symbol][bid_data[0]]
         for ask_data in asks:
             self.order_book_asks[new_tick.vt_symbol].update({ask_data[0]:ask_data[1]})   
             if not ask_data[1]:
                 del self.order_book_asks[new_tick.vt_symbol][ask_data[0]]
    
         sort_bids = sorted(self.order_book_bids[new_tick.vt_symbol].items(), key=lambda x:x[0],reverse=True)[:5]    #买单从高到低排序
         sort_asks = sorted(self.order_book_asks[new_tick.vt_symbol].items(), key=lambda x:x[0],reverse=False)[:5]   #卖单从低到高排序
         for n,buf in enumerate(sort_bids):
             new_tick.__setattr__(f"bid_price_{(n + 1)}", buf[0])
             new_tick.__setattr__(f"bid_volume_{(n + 1)}", buf[1])
         for n,buf in enumerate(sort_asks):
             new_tick.__setattr__(f"ask_price_{(n + 1)}" , buf[0])
             new_tick.__setattr__(f"ask_volume_{(n + 1)}", buf[1])
         new_tick = copy(new_tick)
         self.gateway.on_tick(new_tick)     
         if luncer_account["行情分发"]:
             channel = f"{new_tick.symbol}_{new_tick.exchange.value}"
             #redis发布tick数据
             REDIS_CLIENT.publish(channel, zlib.compress(pickle.dumps(new_tick), 5))
Member
avatar
加入于:
帖子: 148
声望: 33

1.onetoken_1_gateway.py里面订阅行情

from threading import Lock,Thread
import pickle
import redis
import zlib
REDIS_POOL =redis.ConnectionPool(host = "localhost",port = 12580)
REDIS_CLIENT = redis.StrictRedis(connection_pool = REDIS_POOL)
#-------------------------------------------------------------------------------------------------   
class OnetokenDataWebsocketApi(WebsocketClient):
    #-------------------------------------------------------------------------------------------------   
    def subscribe(self, req: SubscribeRequest):
        """
        订阅tick行情
        """
        tick = TickData(
            symbol=req.symbol,
            exchange=req.exchange,
            name=req.symbol,
            datetime=datetime.now(),
            gateway_name=self.gateway_name,
        )
        contract_symbol = f"{req.exchange.value.lower()}/{req.symbol.lower()}"
        self.ticks[contract_symbol] = tick  
        if luncer_account["行情分发"]:
            threads = Thread(target=self.receive_redis_data)
            threads.start() 
        else:
            req = { "uri": "subscribe-single-tick-verbose", 
            "contract": contract_symbol 
            }
            self.send_packet(req)
    def receive_redis_data(self):
        """
        1.redis订阅channel列表消息
        2.redis订阅消息会堵塞线程数据传输需要使用Thread
        """
        channels = [vt_symbol.split("/")[0] for vt_symbol in recording_list if extract_vt_symbol(vt_symbol)[2] == self.gateway_name]
        pubsub = REDIS_CLIENT.pubsub()
        pubsub.psubscribe(channels)
        for row_data in pubsub.listen():
            if row_data["type"] == "pmessage":
                new_tick = pickle.loads(zlib.decompress(row_data["data"]))
                symbol,exchange,gateway_name = extract_vt_symbol(new_tick.vt_symbol)
                new_tick.gateway_name = self.gateway_name
                new_tick.vt_symbol = f"{symbol}_{exchange.value}/{self.gateway_name}"
                self.gateway.on_tick(copy(new_tick))
Member
avatar
加入于:
帖子: 148
声望: 33

只要redis发布了行情,订阅端无论先后启动都能收到行情,而且订阅端的行情tick.datetime和发布端完全一致。友情提醒,使用订阅端gateway接口交易,策略里面的标的vt_symbol使用的gateway_name要改成订阅端的gateway_name。

Member
avatar
加入于:
帖子: 1
声望: 0

学习学习。
有个问题:

复制gateway文件,并重命名文件以及修改init.py和gateway.py
以1token为例:
复制一份gateway\onetoken文件夹,重命名为onetoken_1,init.py里面修改成from .onetoken_1_gateway import Onetoken_1_Gateway,onetoken_gateway.py改为onetoken_1_gateway.py。修改onetoken_1_gateway里面类名为Onetoken_1_Gateway,gateway_name为"1TOKEN1",connect里面的账户信息自行更改

举个例子,如果有10个账户,这个文件夹需要复制10次吗?还是只要1次就行。

Member
avatar
加入于:
帖子: 148
声望: 33

@zonquan
还是复制修改方便,真的

© 2015-2019 上海韦纳软件科技有限公司
备案服务号:沪ICP备18006526号-3