vn.py量化社区
By Traders, For Traders.
Member
avatar
加入于:
帖子: 10
声望: 3

基于月总在群里提供的指数合成代码,做了一些新的设计和修改,感谢月总的无私付出
_方案思路:
1.增加中间组件IndexGenerator,代码放在gateway.py文件中(放在其他位置也可以),不与任何一个Gateway绑定,专门合成指数数据
2.IndexGenerator实例挂在main_engine上, 起main_engine时直接起一个,index_generator订阅tick事件
3.在main_engine的subscribe中识别订阅的类型,如果是指数合约,则向index_generato订阅,同时分解成品种下所有的分散合约向gateway订阅;还要把合约名列表传给index_gennerator
4.index_generator内部根据有订阅的品种合成指数数据,通过tickevent传递出去

=======================================================================================================
代码部分:

1、IndexGenerator:放在vnpy->trader->gateway.py文件中


class IndexGenerator(ABC):

    def __init__(self, main_engine, event_engine: EventEngine):
        self.main_engine = main_engine
        self.event_engine = event_engine

        self.subscribe_index_symbol: Set[str] = set()  # 保存已订阅的指数编号
        self.subscribe_index_contract: Dict[str, ContractData] = {}  # 指数合约
        self.subscribe_sec_id: Set[str] = set()  # 保存已经订阅的sec编号
        self.symbol_tick_dict: Dict[str, dict] = {}  # 保存每个指数的每个合约的最新tick
        self.symbol_last_tick: Dict[str, TickData] = {}  # 保存每个指数的下的最后一个tick

        self.register_event()

    def register_event(self):
        self.event_engine.register(EVENT_TICK, self.process_tick_event)

    def subscribe(self, req: SubscribeRequest):
        index_symbol_id = vt_symbol_to_index_symbol(req.vt_symbol)
        self.subscribe_index_symbol.add(index_symbol_id)
        sec_id = extract_sec_id(req.vt_symbol)
        self.subscribe_sec_id.add(sec_id)
        self.subscribe_index_contract[sec_id] = self.main_engine.get_index_contract(req.vt_symbol)

    def process_tick_event(self, event: Event):
        tick_data = event.data
        vt_symbol = tick_data.vt_symbol
        # 过滤掉指数数据
        if vt_symbol == vt_symbol_to_index_symbol(vt_symbol):
            return
        sec_id = extract_sec_id(vt_symbol)
        if sec_id not in self.subscribe_sec_id:
            return
        if tick_data.bid_price_1 > 9999999 or tick_data.ask_price_1 > 9999999:
            return
        # 下面合成最新的指数tick:每秒合成1个
        symbol_tick_dict = self.symbol_tick_dict.setdefault(sec_id, {})
        symbol_last_tick = self.symbol_last_tick.get(sec_id)
        if symbol_last_tick and tick_data.datetime.second != symbol_last_tick.datetime.second and symbol_tick_dict:
            index_tick = TickData(
                symbol=f"{sec_id}99",
                exchange=tick_data.exchange,
                datetime=tick_data.datetime,
                gateway_name=tick_data.gateway_name,
                name=self.subscribe_index_contract[sec_id].name
            )
            for tick in symbol_tick_dict.values():
                index_tick.open_interest += tick.open_interest
            if index_tick.open_interest:
                for tick in symbol_tick_dict.values():
                    tick_weight = float(tick.open_interest) / index_tick.open_interest
                    index_tick.last_price += tick.last_price * tick_weight
                    index_tick.volume += tick.volume
                    index_tick.last_volume += tick.last_volume
                    index_tick.limit_up += tick.limit_up * tick_weight
                    index_tick.limit_down += tick.limit_down * tick_weight

                    index_tick.open_price += tick.open_price * tick_weight
                    index_tick.high_price += tick.high_price * tick_weight
                    index_tick.low_price += tick.low_price * tick_weight
                    index_tick.pre_close += tick.pre_close * tick_weight

                    index_tick.bid_price_1 += tick.bid_price_1 * tick_weight
                    index_tick.ask_price_1 += tick.ask_price_1 * tick_weight
                    index_tick.bid_volume_1 += tick.bid_volume_1
                    index_tick.ask_volume_1 += tick.ask_volume_1

                    # 5档有需要再加进来吧,省点计算资源
                    # tick_data.ask_price_2 += tick.ask_price_2 * tick_weight
                    # tick_data.ask_price_3 += tick.ask_price_3 * tick_weight
                    # tick_data.ask_price_4 += tick.ask_price_4 * tick_weight
                    # tick_data.ask_price_5 += tick.ask_price_5 * tick_weight
                    # tick_data.bid_price_2 += tick.bid_price_2 * tick_weight
                    # tick_data.bid_price_3 += tick.bid_price_3 * tick_weight
                    # tick_data.bid_price_4 += tick.bid_price_4 * tick_weight
                    # tick_data.bid_price_5 += tick.bid_price_5 * tick_weight
                    # tick_data.bid_volume_2 += tick.bid_volume_2 * tick_weight
                    # tick_data.bid_volume_3 += tick.bid_volume_3 * tick_weight
                    # tick_data.bid_volume_4 += tick.bid_volume_4 * tick_weight
                    # tick_data.bid_volume_5 += tick.bid_volume_5 * tick_weight
                    # tick_data.ask_volume_2 += tick.ask_volume_2 * tick_weight
                    # tick_data.ask_volume_3 += tick.ask_volume_3 * tick_weight
                    # tick_data.ask_volume_4 += tick.ask_volume_4 * tick_weight
                    # tick_data.ask_volume_5 += tick.ask_volume_5 * tick_weight
                # 价格取整到最小价位变动
                price_tick = self.subscribe_index_contract[sec_id].pricetick
                index_tick.last_price = round_to(index_tick.last_price, price_tick)

                index_tick.bid_price_1 = round_to(index_tick.bid_price_1, price_tick)
                index_tick.ask_price_1 = round_to(index_tick.ask_price_1, price_tick)
                index_tick.limit_up = round_to(index_tick.limit_up, price_tick)
                index_tick.limit_down = round_to(index_tick.limit_down, price_tick)
                index_tick.open_price = round_to(index_tick.open_price, price_tick)
                index_tick.high_price = round_to(index_tick.high_price, price_tick)
                index_tick.low_price = round_to(index_tick.low_price, price_tick)
                index_tick.pre_close = round_to(index_tick.pre_close, price_tick)

                event = Event(EVENT_TICK, index_tick)
                self.event_engine.put(event)

        symbol_tick_dict[vt_symbol] = tick_data
        self.symbol_last_tick[sec_id] = tick_data

===========================================================================================
2、vnpy->trader->engine文件修改:
1)MainEngine的init函数中,挂载IndexGenerator实例:

        # 挂载指数数据生成器
        self.index_generator: IndexGenerator = IndexGenerator(self, self.event_engine)

2)MainEngine的subscrible函数中,注册指数合约

 def subscribe(self, req: SubscribeRequest, gateway_name: str) -> None:
        """
        Subscribe tick data update of a specific gateway.
        """
        gateway = self.get_gateway(gateway_name)
        # 同类账户全部订阅
        if req.vt_symbol == vt_symbol_to_index_symbol(req.vt_symbol):
            # 指数订阅
            contract_list = self.get_all_index_trade_contract(req.vt_symbol)
            print(contract_list)
            for contract in contract_list:
                symbol, exchange = extract_vt_symbol(contract.vt_symbol)
                contract_req = SubscribeRequest(symbol, exchange)
                    gateway.subscribe(contract_req)
            self.index_generator.subscribe(req)
        else:
            gateway.subscribe(req)

3)OMSEngine的add_function()增加两个main_engine接口

        self.main_engine.get_all_index_trade_contract = self.get_all_index_trade_contract
        self.main_engine.get_index_contract = self.get_index_contract

4)OMSEngine的合约影响函数,自动添加指数合约的contract

def process_contract_event(self, event: Event) -> None:
        """"""
        contract = event.data
        if contract.vt_symbol not in self.contracts.keys():
            self.contracts[contract.vt_symbol] = contract
            # 插入指数合约contract
            sec_id = extract_sec_id(contract.vt_symbol)
            index_id = f"{sec_id}99"
            index_symbol_id = f"{index_id}.{contract.exchange.value}"
            if index_symbol_id not in self.contracts.keys():
                index_contract = ContractData(
                    symbol=index_id,
                    exchange=contract.exchange,
                    name=f"{index_id}指数合约",
                    product=contract.product,
                    size=contract.size,
                    pricetick=contract.pricetick,
                    margin_ratio=contract.margin_ratio,
                    open_date="19990101",
                    expire_date="20990101",
                    gateway_name=contract.gateway_name
                )
                index_contract.is_index_contract = True
                self.contracts[index_symbol_id] = index_contract

5)OMSEngine增加两个查询函数

def get_all_index_trade_contract(self, vt_symbol):
        # 查询该合约对应品种的所有在市合约
        contract_list = []
        target_sec_id = extract_sec_id(vt_symbol)
        contracts = self.contracts
        for vt_symbol, contract_data in contracts.items():
            sec_id = extract_sec_id(vt_symbol)
            if target_sec_id == sec_id and not contract_data.is_index_contract:
                contract_list.append(contract_data)
        return contract_list

    def get_index_contract(self, vt_symbol):
        contract_id = vt_symbol_to_index_symbol(vt_symbol)
        return self.get_contract(contract_id)

=========================================================
3、Utility增加两个功能函数

def extract_sec_id(vt_symbol: str) -> str:
    """
    return sec_id
    """
    return vt_symbol[:2] if vt_symbol[1].isalpha() else vt_symbol[0]


def vt_symbol_to_index_symbol(vt_symbol):
    symbol_id, exchange_value = vt_symbol.split(".")
    sec_id = extract_sec_id(vt_symbol)
    index_id = f"{sec_id}99"
    return f"{index_id}.{exchange_value}"

========================================================
4、Object的ContractData增加一个字段表示该合约是否有指数合约

    is_index_contract: bool = False     # 是否是指数合约,默认为否

5、各代码中的一些import请自行添加

Member
avatar
加入于:
帖子: 10
声望: 3

补充说明:
1、策略订阅的时候,直接使用品种的指数编号即可,如rb99.SHFE
2、以指数为信号,具体合约做交易的时候,具体合约要自己在策略里指定,这里没有做将指数合约映射到主力合约下单的功能

Member
avatar
加入于:
帖子: 97
声望: 7

优秀

Member
avatar
加入于:
帖子: 189
声望: 40

很方便,也可以在IndexGenerator直接取按持仓量排序的指数成分合约列表

Member
avatar
加入于:
帖子: 189
声望: 40

附上我加的指数成分合约按持仓量排序 + 成分合约权重获取的写法

# 我的vt_symbol 格式是 symbol + "_" + exchange.value + "/" + gateway_name,大家可以自行改下下面两个函数
def get_symbol_mark(vt_symbol:str):
    """
    获取合约标识
    """
    symbol,exchange,gateway_name = extract_vt_symbol(vt_symbol)
    if remain_alpha(gateway_name) in ["CTP"]:
        #CTP合约标识
        symbol_mark = remain_alpha(vt_symbol)
    else:    
        #数字货币合约标识
        if remain_alpha(gateway_name) in ["BYBIT","BYBITUSDT","HUOBIS","OKEXS"]:
            symbol_mark = remain_alpha(symbol.split("USD")[0]).lower()
        elif remain_alpha(gateway_name) in ["TOKEN"]:
            symbol_mark =symbol.split(".")[0]
        else:
            symbol_mark = remain_alpha(vt_symbol)
    return symbol_mark
#------------------------------------------------------------------------------------
def remain_alpha(convert_contract:str) -> str:
    """
    返回合约symbol或字符串的字母部分
    """
    if "_" in convert_contract:
        convert_contract = extract_vt_symbol(convert_contract)[0]
    symbol_mark = "".join(list(filter(str.isalpha,convert_contract)))
    return symbol_mark
#------------------------------------------------------------------------------------
def remain_digit(convert_contract:str) -> str:
    """
    返回合约symbol或字符串的数字部分
    """
    if "_" in convert_contract:
        convert_contract = extract_vt_symbol(convert_contract)[0]
    symbol_mark = "".join(list(filter(str.isdigit,convert_contract)))
    return symbol_mark

class IndexGenerator(ABC):
    """
    指数生成器
    """
    #--------------------------------------------------------------------------------------------------
    def __init__(self, main_engine, event_engine: EventEngine):
        self.component_contracts:Dict[str,dict] = defaultdict(dict)        #排序后的成分合约:持仓量权重映射字典
    def subscribe(self, req: SubscribeRequest):
        vt_symbol = req.vt_symbol
        symbol_mark = get_symbol_mark(vt_symbol)
        self.component_contracts.setdefault(symbol_mark, defaultdict(list))
    #--------------------------------------------------------------------------------------------------
    def get_component_contracts(self,symbol_mark:str) -> List[Tuple[str,float]]:
        """
        获取成分合约持仓量权重排序后的列表
        """
        component_contracts = self.component_contracts[symbol_mark]
        sorted_contracts = sorted(component_contracts.items(), key=lambda x:x[1],reverse=True)
        return sorted_contracts        
    #--------------------------------------------------------------------------------------------------
    def process_tick_event(self, event: Event):
        """
        合成指数合约
        """
        #成分合约字典
        vt_symbol = tick_data.vt_symbol
        symbol_mark = get_symbol_mark(vt_symbol)
        component_contracts = self.component_contracts[symbol_mark]
        for tick in list(index_tick_components.values()):
            #累加指数合约持仓量,成交量
            index_tick.open_interest += tick.open_interest
            index_tick.volume += tick.volume
            index_tick.last_volume += tick.last_volume

        if index_tick.open_interest:
            for tick in list(index_tick_components.values()):
                #成分合约权重
                tick_weight = tick.open_interest / index_tick.open_interest
                component_contracts[tick.vt_symbol] = tick_weight

vnpy/trader/engine.py里面修改

class MainEngine:
    #--------------------------------------------------------------------------------------------------
    def __init__(self, event_engine: EventEngine = None):
        # 挂载指数数据生成器
        self.index_generator: IndexGenerator = IndexGenerator(self, self.event_engine)
        #成分合约持仓量权重排序后的列表
        self.get_component_contracts = self.index_generator.get_component_contracts
# 使用方法
symbol_mark = get_symbol_mark(vt_symbol)
# 主力合约
main_vt_symol = self.main_engine.get_component_contracts(symbol_mark)[0][0]
# 主力合约权重
main_weight = self.main_engine.get_component_contracts(symbol_mark)[0][1]
Member
avatar
加入于:
帖子: 97
声望: 7

月总的教程不如楼主的细致。楼主的基本是即插即用。哈哈

Member
avatar
加入于:
帖子: 97
声望: 7

更新一个bug修复:
bug表现:
大多数时候大家都有simnow第一套环境测试,该环境没有期权。
当在有期权环境的实盘订阅指数合约时,将会连同除所有该品种期货合约之外的期权一起订阅。
这将会导致,额外订阅了N多合约后使得Tick事件大大增加,对系统造成了很多额外的不必要开销。
解决办法:
修改vnpy\trader\engine.py中OmsEngine中的get_all_index_trade_contract,

        for vt_symbol, contract_data in contracts.items():

之后添加过滤:

            if contract_data.product != Product.FUTURES:
                continue

记得在文件头部引入Product:

from .constant import Product
Member
avatar
加入于:
帖子: 18
声望: 0

你们难道没有这种错误吗,TypeError: init() missing 1 required positional argument: 'margin_ratio'。。。。。
就是在object中添加的合约几个信息
在\vnstudio\lib\site-packages\vnpy\gateway\ctp\ctp_gateway.py(621): onRspQryInstrument
的合约里没有写,要填一下

Member
avatar
加入于:
帖子: 6
声望: 0

mengrong wrote:

你们难道没有这种错误吗,TypeError: init() missing 1 required positional argument: 'margin_ratio'。。。。。
就是在object中添加的合约几个信息
在\vnstudio\lib\site-packages\vnpy\gateway\ctp\ctp_gateway.py(621): onRspQryInstrument
的合约里没有写,要填一下

最新的V2.2版本中 步骤2、4)中的OMSEngine的合约影响函数,自动添加指数合约的contract 对应的代码中需要注释掉以下三行
margin_ratio=contract.margin_ratio,
open_date="19990101",
expire_date="20990101",

Member
avatar
加入于:
帖子: 30
声望: 0

还是不要改原有的文件,以扩展的思路,做新的文件和上层应用,来完成功能,应该比这种思路要好吧?
一种是内置的改,一直是外挂的方式,我选择外挂解决
这种做法,容易搞乱,同时,不便于维护
个人意见,仅供参考

Member
avatar
加入于:
帖子: 6
声望: 0

非常感谢分享,学习了,发现拼写错误
原文:2)MainEngine的subscrible函数中,注册指数合约
应该为:2)MainEngine的subscribe函数中,注册指数合约

Member
avatar
加入于:
帖子: 6
声望: 0

弱弱的问一句,这个是怎么用的,我猜是实盘的时候,用于识别当前时刻的主力连续合约对应的实际合约并且订阅,然后订阅了的实际合约的数据拼接到主力连续合约那里?请指教

© 2015-2019 上海韦纳软件科技有限公司
备案服务号:沪ICP备18006526号-3

沪公网安备 31011502017034号