vn.py量化社区
By Traders, For Traders.
Member
avatar
加入于:
帖子: 100
声望: 17

以Bybit接口为例,思路是先保存所有contract的vt_symbol再推送到event_engine。data_recorder/engine订阅后处理后再写入data_recorder_setting.json里面
1.event.py里面加上EVENT_ALL_CONTRACTS

EVENT_CONTRACT = "eContract."            # 合约基础信息回报事件
EVENT_ALL_CONTRACTS = 'eAllContracts.'   # 全部合约回报事件
EVENT_LOG = "eLog"                       # 日志事件,全局通用

2.gateway.py里面加上订阅事件

from .event import (
    EVENT_TICK,
    EVENT_ORDER,
    EVENT_TRADE,
    EVENT_POSITION,
    EVENT_ACCOUNT,
    EVENT_CONTRACT,
    EVENT_ALL_CONTRACTS,
    EVENT_LOG,
)
class BaseGateway(ABC):
    def on_contract(self, contract: ContractData):
        """
        Contract event push.
        """
        self.on_event(EVENT_CONTRACT, contract)

    def on_all_contracts(self,activate_contracts:list):
        """所有合约vt_symbol推送"""
        self.on_event(EVENT_ALL_CONTRACTS, activate_contracts)

    def write_log(self, msg: str):
        """
        Write a log event from gateway.
        """
        log = LogData(msg=msg, gateway_name=self.gateway_name)
        self.on_log(log)

3.Bybit推送全市场合约到gateway

class BybitRestApi(RestClient):
    def __init__(self, gateway: BybitGateway):
        self.all_contracts = []         #所有vt_symbol合约列表
    #-------------------------------------------------------------------------------------------------  
    # 在查询合约参数的函数里面推送全市场合约,其他接口也差不多
    def on_query_contract(self, data: dict, request: Request):
            self.gateway.on_contract(contract)
            if contract.vt_symbol not in self.all_contracts:
                self.all_contracts.append(contract.vt_symbol)    
        self.gateway.on_all_contracts(self.all_contracts)
        self.gateway.write_log("合约信息查询成功")

4.data_recorder/engine.py订阅事件并处理
```
from vnpy.trader.event import (EVENT_TICK,EVENT_TIMER, EVENT_CONTRACT,EVENT_ALL_CONTRACTS)

#----------------------------------------------------------------------
def register_event(self):
    """"""
    self.event_engine.register(EVENT_TICK, self.process_tick_event)
    self.event_engine.register(EVENT_TIMER, self.process_timer_event)
    self.event_engine.register(EVENT_CONTRACT, self.process_contract_event)
    self.event_engine.register(EVENT_ALL_CONTRACTS, self.process_contracts_event)
    self.event_engine.register( EVENT_SPREAD_DATA, self.process_spread_event)
#----------------------------------------------------------------------
def process_contracts_event(self, event: Event):
    contract_data = event.data      #所有合约vt_symbol列表
    currency_contract_dict = {}   #币圈合约字典  
    ctp_contract_dict = {}          #ctp合约字典  
    ctp_recording = []  #ctp合约记录列表
    currency_recording = []   #数字货币合约记录列表
    trading_contract = ['a','l','v','i','j','rb','hc','ni','ru','eg','bu','fu','jm','SM','ZC','AP','CJ','TA','MA']   #交易合约
    for vt_symbol in contract_data:
        symbol,exchange,gateway_name = extract_vt_symbol(vt_symbol)
        if "1TOKEN" in gateway_name or "BYBIT" in gateway_name:    
            #dr_data添加币圈合约
            currency_contract_dict[vt_symbol] = {'exchange':exchange.value,'gateway_name':gateway_name,'symbol':symbol}
            self.dr_data['bar'].update(currency_contract_dict)
            if vt_symbol not in currency_recording:
                currency_recording.append(vt_symbol) 
            #增量更新dr_data recording_list数据
            for vt_symbol in currency_recording:
                if vt_symbol not in self.dr_data["recording_list"]:
                    self.dr_data["recording_list"].append(vt_symbol)
        elif "IB" in gateway_name:
            #dr_data添加IB合约
            ib_contract_dict[vt_symbol] = {'exchange':exchange.value,'gateway_name':gateway_name,'symbol':symbol}
            self.dr_data['bar'].update(ib_contract_dict)
            if vt_symbol not in ib_recording:
                ib_recording.append(vt_symbol) 
            #增量更新dr_data recording_list数据
            for vt_symbol in ib_recording:
                if vt_symbol not in self.dr_data["recording_list"]:
                    self.dr_data["recording_list"].append(vt_symbol)     

        elif "CTP" in gateway_name:
            #dr_data添加CTP合约,过滤非交易合约
            symbol_mark = "".join(list(filter(str.isalpha,symbol)))
            if symbol_mark in trading_contract:
                #增量更新dr_data bar数据
                ctp_contract_dict[vt_symbol] = {'exchange':exchange.value,'gateway_name':gateway_name,'symbol':symbol}              
                self.dr_data['bar'].update(ctp_contract_dict)
                if vt_symbol not in ctp_recording:
                    ctp_recording.append(vt_symbol)      
                #增量更新dr_data recording_list数据
                for vt_symbol in ctp_recording:
                    if vt_symbol not in self.dr_data["recording_list"]:
                        self.dr_data["recording_list"].append(vt_symbol)     

    save_json(self.setting_filename,self.dr_data)    #保存记录行情合约

``

Member
avatar
加入于:
帖子: 100
声望: 17

我改了vtsymbol是用"_"连接的,process_contracts_event里面是根据我的需求改的大家可以参考下按自己的需求改。recording_list保存的vt_symbol列表可以用在ctaengine里面订阅全市场合约,gateway里面下载历史数据,处理主力合约的识别都是很有用的。就这些了,🎇🐤

Member
avatar
加入于:
帖子: 13
声望: 1

strong text 哈哈哈哈_

emphasized text

_

Member
avatar
加入于:
帖子: 13
声望: 1

注释这一句# self.load_setting()
然后在 注释 那些过滤,,,就那样 了,,,

Member
avatar
加入于:
帖子: 100
声望: 17

@啃哒你伐木工 复制代码手打空格,论坛的空格有问题

© 2015-2019 上海韦纳软件科技有限公司
备案服务号:沪ICP备18006526号-3