vn.py官网
开源量化社区
Member
avatar
加入于:
帖子: 190
声望: 48

以Bybit接口为例,思路是先保存所有contract的vt_symbol再推送到event_engine。data_recorder/engine订阅后处理后再写入data_recorder_setting.json里面
1.event.py里面加上EVENT_ALL_CONTRACTS

EVENT_CONTRACT = "eContract."            # 合约基础信息回报事件
EVENT_ALL_CONTRACTS = 'eAllContracts.'   # 全部合约回报事件
EVENT_LOG = "eLog"                       # 日志事件,全局通用

2.gateway.py里面加上订阅事件

from .event import (
    EVENT_TICK,
    EVENT_ORDER,
    EVENT_TRADE,
    EVENT_POSITION,
    EVENT_ACCOUNT,
    EVENT_CONTRACT,
    EVENT_ALL_CONTRACTS,
    EVENT_LOG,
)
class BaseGateway(ABC):
    def on_contract(self, contract: ContractData):
        """
        Contract event push.
        """
        self.on_event(EVENT_CONTRACT, contract)

    def on_all_contracts(self,activate_contracts:list):
        """所有合约vt_symbol推送"""
        self.on_event(EVENT_ALL_CONTRACTS, activate_contracts)

    def write_log(self, msg: str):
        """
        Write a log event from gateway.
        """
        log = LogData(msg=msg, gateway_name=self.gateway_name)
        self.on_log(log)

3.Bybit推送全市场合约到gateway

class BybitRestApi(RestClient):
    def __init__(self, gateway: BybitGateway):
        self.all_contracts = []         #所有vt_symbol合约列表
    #-------------------------------------------------------------------------------------------------  
    # 在查询合约参数的函数里面推送全市场合约,其他接口也差不多
    def on_query_contract(self, data: dict, request: Request):
            self.gateway.on_contract(contract)
            if contract.vt_symbol not in self.all_contracts:
                self.all_contracts.append(contract.vt_symbol)    
        self.gateway.on_all_contracts(self.all_contracts)
        self.gateway.write_log("合约信息查询成功")

4.data_recorder/engine.py订阅事件并处理

from vnpy.trader.event import (EVENT_TICK,EVENT_TIMER, EVENT_CONTRACT,EVENT_ALL_CONTRACTS)
    #----------------------------------------------------------------------
    def register_event(self):
        """"""
        self.event_engine.register(EVENT_TICK, self.process_tick_event)
        self.event_engine.register(EVENT_TIMER, self.process_timer_event)
        self.event_engine.register(EVENT_CONTRACT, self.process_contract_event)
        self.event_engine.register(EVENT_ALL_CONTRACTS, self.process_contracts_event)
        self.event_engine.register( EVENT_SPREAD_DATA, self.process_spread_event)
    #----------------------------------------------------------------------
    def process_contracts_event(self, event: Event):
        """
        保存合约数据到data_recorder_setting.json
        """
        contract_data:List = event.data      #所有合约vt_symbol列表
        currency_recording_postfix = ["n","q","td"]# 数字货币记录合约后缀,"t"当周合约,"n"下周合约,"q"季度合约,"b"次季度合约,"td"永续
        ctp_recording_prefix = ["a","l","v","i","j","rb","hc","ni","ru","eg","bu","fu","sc","jm","SM","ZC","AP","CJ","TA","MA","IH","IF","IC"]   #CTP记录数据合约前缀
        for vt_symbol in contract_data:
            symbol,exchange,gateway_name = extract_vt_symbol(vt_symbol)
            if "1TOKEN" in gateway_name or "BYBIT" in gateway_name:    
                #过滤数字货币非记录合约
                if "." in symbol:
                    if symbol.split(".")[-1] not in currency_recording_postfix:
                        continue
                #bar字典添加数字货币合约数据
                self.dr_data["bar"].update({vt_symbol:{"exchange":exchange.value,"gateway_name":gateway_name,"symbol":symbol}})
                #增量更新dr_data recording_list数据
                if vt_symbol not in self.dr_data["recording_list"]:
                    self.dr_data["recording_list"].append(vt_symbol)

            elif "CTP" in gateway_name:
                #dr_data添加CTP合约,过滤非记录合约
                symbol_prefix = remain_alpha(vt_symbol)
                if symbol_prefix not in ctp_recording_prefix:
                    continue
                #bar字典添加CTP合约数据
                self.dr_data["bar"].update({vt_symbol:{"exchange":exchange.value,"gateway_name":gateway_name,"symbol":symbol}})
                #增量更新dr_data recording_list数据
                if vt_symbol not in self.dr_data["recording_list"]:
                    self.dr_data["recording_list"].append(vt_symbol)     
            else:
                #bar字典添加其他交易所合约数据
                self.dr_data["bar"].update({vt_symbol:{"exchange":exchange.value,"gateway_name":gateway_name,"symbol":symbol}})
                #增量更新dr_data recording_list数据
                if vt_symbol not in self.dr_data["recording_list"]:
                    self.dr_data["recording_list"].append(vt_symbol)  

        self.dr_data["recording_list"].sort()       
        save_json(self.setting_filename,self.dr_data)    #保存记录行情合约
Member
avatar
加入于:
帖子: 190
声望: 48

我改了vtsymbol是用"_"连接的,process_contracts_event里面是根据我的需求改的大家可以参考下按自己的需求改。recording_list保存的vt_symbol列表可以用在ctaengine里面订阅全市场合约,gateway里面下载历史数据,处理主力合约的识别都是很有用的。就这些了,🎇🐤

Member
avatar
加入于:
帖子: 15
声望: 1

strong text 哈哈哈哈_

emphasized text

_

Member
avatar
加入于:
帖子: 15
声望: 1

注释这一句# self.load_setting()
然后在 注释 那些过滤,,,就那样 了,,,

Member
avatar
加入于:
帖子: 190
声望: 48

@啃哒你伐木工 复制代码手打空格,论坛的空格有问题

© 2015-2019 上海韦纳软件科技有限公司
备案服务号:沪ICP备18006526号-3

沪公网安备 31011502017034号