VeighNa量化社区
你的开源社区量化交易平台
Member
avatar
加入于:
帖子: 419
声望: 170

1. 问题的由来

  • 在文章 发现了vnpy的BarGenerator两个隐藏很深的错误 !中我就已经分析过tick数据对bar生成器的影响。
  • 当前vnpy系统对集合竞价tick与其他tick没有区分能力
  • 当前vnpy系统没有充分利用行情接口提供的状态信息,无法识别有效tick与无效tick,一股脑地发送到策略和应用中,导致bar合成的错误。

2. 问题的解决方法

在行情接口与策略和应用之间建起一个tick过滤器——TickFilter,对tick数据进行过滤。

tick数据过滤器的功能:

  1. 过滤重复tick,保证已经参与K线合成的tick不会再次被系统使用,每个网关对应一个ick数据过滤;
    要做到这一条,就必须做到对所有已经订阅过的合约的tick的缓存,否则你再次重启系统的时候是无法知道你收到第一个tick是否已经参与过之前bar的合成了。这样你可能重复使用该tick,这是错误的。
    为此我们需要将所有已经订阅过的合约的最新tick进行实时更新,并定期做持久化保存,且在每次系统启动的时候读取加载到系统中。
  2. 过滤无效tick,转发有效交易状态下的tick到系统中,不在有效交易状态下tick做丢弃处理,有效交易状态包括:集合竞价状态和连续竞价状态;
    CTP系统的行情接口中包含的实时更新的合约交易状态通知推送接口,OnRtnInstrumentStatus()。关于这个问题我已经在如何更有效地利用合约交易状态信息——交易状态信息管理器。一文中做了详细的介绍,再次就不赘述。总之合约交易状态通知可以让我识别一个tick是否是有些大tick。
  3. 识别集合竞价tick,为使用tick的应用或用户策略处理集合竞价tick提供支持。
    合约交易状态通知可以让我们知道那些tick是tick,同时可以可以让我们区分那个tick是集合竞价tick,那些是连续竞价tick。对有效tick进性分析利用于我们策略或者应用生成出正确的bar。
  4. 本文只对CtpGateway,CtaEngine、CtaTemplate进行了更改,其他网关系统的道理都是相同的。如果您觉得对您有启发,也可以按同样的方法修改。

3. 过滤无效tick数据的实现代码

声明:本文基于【CTP接口规范6.3.15_API接口说明】做出的修改。

3.1 相关数据类型定义

4.1 定义相关的常量和数据类
在vnpy\trader\constant.py中增加下面的合约交易状态InstrumentStatus常量类型定义:

class InstrumentStatus(Enum):
    """
    合约交易状态类型 hxxjava debug
    """
    BEFORE_TRADING = "开盘前"
    NO_TRADING = "非交易"
    CONTINOUS = "连续交易" 
    AUCTION_ORDERING = "集合竞价报单"
    AUCTION_BALANCE = "集合竞价价格平衡"
    AUCTION_MATCH = "集合竞价撮合"
    CLOSE = "收盘"


# 有效交易状态
VALID_TRADE_STATUSES = [
    InstrumentStatus.CONTINOUS,
    InstrumentStatus.AUCTION_ORDERING,
    InstrumentStatus.AUCTION_BALANCE,
    InstrumentStatus.AUCTION_MATCH
]

# 集合竞价交易状态
AUCTION_STATUS = [
    InstrumentStatus.AUCTION_ORDERING,
    InstrumentStatus.AUCTION_BALANCE,
    InstrumentStatus.AUCTION_MATCH
]


class StatusEnterReason(Enum):
    """
    品种进入交易状态原因类型 hxxjava debug
    """
    AUTOMATIC = "自动切换"
    MANUAL = "手动切换"
    FUSE = "熔断"

在vnpy\trader\object.py中增加下面的交易状态数据类StatusData:

@dataclass
class StatusData(BaseData):
    """
    hxxjava debug
    """
    symbol:str       
    exchange : Exchange    
    settlement_group_id : str = ""  
    instrument_status : InstrumentStatus = None   
    trading_segment_sn : int = None 
    enter_time : str = ""      
    enter_reason : str = ""  
    exchange_inst_id : str = ""     

    def __post_init__(self):
        """  """
        self.vt_symbol = f"{self.symbol}.{self.exchange.value}"

    def belongs_to(self,vt_symbol:str):
        symbol,exchange_str = vt_symbol.split(".")
        instrument = left_alphas(symbol).upper()
        return (self.symbol.upper() == instrument) and (self.exchange.value == exchange_str)

3.2 相关消息定义

在vnpy\trader\event.py中增加交易状态消息类型

EVENT_STATUS = "eStatus"                        # hxxjava debug
EVENT_ORIGIN_TICK = "eOriginTick."              # hxxjava debug
EVENT_AUCTION_TICK = "eAuctionTick."            # hxxjava debug

3.3 Gateway的修改

在vnpy\trader\gateway.py中合约状态接口,修改tick推送接口:

引用部分增加:

from .event import EVENT_ORIGIN_TICK,EVENT_STATUS         # hxxjava add
from .object import StatusData    # hxxjava add

修改class BaseGateway的on_tick()接口,增加on_status()接口:

    def on_tick(self, tick: TickData) -> None:
        """
        Tick event push.
        Tick event of a specific vt_symbol is also pushed.
        """
        self.on_event(EVENT_ORIGIN_TICK, tick)  # hxxjava add             
        # self.on_event(EVENT_TICK, tick)                   
        # self.on_event(EVENT_TICK + tick.vt_symbol, tick)  

    def on_status(self, status: StatusData) -> None:    # hxxjava debug
        """
        Instrument Status event push.
        """
        self.on_event(EVENT_STATUS, status)
        self.on_event(EVENT_STATUS + status.vt_symbol, status)

3.4 CtpGateway的修改

修改vnpy_cpt\ctp_gateway.py:

增加引用部分

from vnpy.trader.constant import InstrumentStatus,StatusEnterReason  # hxxjava debug
rom vnpy.trader.object import StatusData,     # hxxjava debug

增加几个映射字典:

# 品种状态进入原因映射  hxxjava debug
INSTRUMENTSTATUS_CTP2VT: Dict[str, InstrumentStatus] = {
    "0": InstrumentStatus.BEFORE_TRADING,
    "1": InstrumentStatus.NO_TRADING,
    "2": InstrumentStatus.CONTINOUS,
    "3": InstrumentStatus.AUCTION_ORDERING,
    "4": InstrumentStatus.AUCTION_BALANCE,
    "5": InstrumentStatus.AUCTION_MATCH,
    "6": InstrumentStatus.CLOSE,
    "7": InstrumentStatus.CLOSE
}


# 品种状态进入原因映射  hxxjava debug
ENTERREASON_CTP2VT: Dict[str, StatusEnterReason] = {
    "1": StatusEnterReason.AUTOMATIC,
    "2": StatusEnterReason.MANUAL,
    "3": StatusEnterReason.FUSE
}

为class CtpTdApi增加下面合约状态推送接口:

    def onRtnInstrumentStatus(self,data:dict):
        """ 
        当接收到合约品种状态信息 # hxxjava debug 
        """
        if data:
            # print(f"【data={data}】")
            status =  StatusData(
                symbol = data["InstrumentID"],
                exchange = EXCHANGE_CTP2VT[data["ExchangeID"]],
                settlement_group_id = data["SettlementGroupID"],
                instrument_status = INSTRUMENTSTATUS_CTP2VT[data["InstrumentStatus"]],
                trading_segment_sn = data["TradingSegmentSN"],
                enter_time = data["EnterTime"],
                enter_reason = ENTERREASON_CTP2VT[data["EnterReason"]],
                exchange_inst_id = data["ExchangeInstID"],
                gateway_name=self.gateway_name
            )
            # print(f"status={status}")
            self.gateway.on_status(status)

3.5 对CtaEngine的进行扩展

增加引用部分

from vnpy.trader.event import EVENT_AUCTION_TICK  # hxxjava add

增加一个对CtaEgine的扩展MyCtaEngine

class MyCtaEngine(CtaEngine):
    """  """

    condition_filename = "condition_order.json"     # 历史条件单存储文件


    def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
        """"""
        super().__init__(main_engine,event_engine)

        self.condition_orders:Dict[str,ConditionOrder] = {}         # strategy_name: ConditionOrder

        self.triggered_condition_orders:List[ConditionOrder] = []   # 已经触发点条件单,为流控设计

    def load_active_condtion_orders(self):
        """  """
        return {}

    def register_event(self):
        """"""
        super().register_event()
        self.event_engine.register(EVENT_AUCTION_TICK, self.process_auction_tick_event)

    def process_auction_tick_event(self,event:Event):
        """ 集合竞价消息处理 """

        tick:TickData = event.data
        strategies = self.symbol_strategy_map[tick.vt_symbol]
        if not strategies:
            return

        for strategy in strategies:
            if strategy.inited:
                # 执行策略的集合竞价消息处理
                self.call_strategy_func(strategy, strategy.on_auction_tick, tick)

    def process_tick_event(self,event:Event):
        """ 用tick的价格检查条件单 """
        super().process_tick_event(event)

        tick:TickData = event.data
        all_condition_orders = [order for order in self.condition_orders.values() \
            if order.vt_symbol == tick.vt_symbol and order.status == CondOrderStatus.WAITING]
        for order in all_condition_orders:
            # 检查条件单是否满足条件
            self.check_condition_order(order,tick)

    def check_condition_order(self,order:ConditionOrder,tick:TickData):
        """ 检查条件单是否满足条件 """       
        strategy = self.strategies.get(order.strategy_name,None)
        if not strategy or not strategy.trading:
            return False

        price = tick.last_price

        is_be = order.condition == Condition.BE and price >= order.price
        is_le = order.condition == Condition.LE and price <= order.price
        is_bt = order.condition == Condition.BT and price > order.price
        is_lt = order.condition == Condition.LT and price < order.price

        if is_be or is_le or is_bt or is_lt:
            # 满足触发条件
            if order.execute_price == ExecutePrice.MARKET:
                # 取市场价
                price = tick.last_price
            elif order.execute_price == ExecutePrice.EXTREME:
                # 取极限价
                price = tick.limit_up if order.direction == Direction.LONG else tick.limit_down
            else:
                # 取设定价
                price = order.price

            # 执行委托
            order_ids = strategy.send_order(
                    direction = order.direction,
                    offset=order.offset,
                    price=price,
                    volume=order.volume 
                )

            if order_ids:
                order.trigger_time = tick.datetime
                order.status = CondOrderStatus.TRIGGERED
                order.vt_orderids = order_ids

                self.call_strategy_func(strategy,strategy.on_condition_order,order)
                self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))

    def find_condition_order(self,vt_orderid:str):
        """ 根据委托单号查询所属条件单 """
        corder:ConditionOrder = None
        for order in self.condition_orders.values():
            if vt_orderid in order.vt_orderids:
                corder = order
                break

        return corder

    def process_trade_event(self, event: Event):
        """ 委托单推送处理 """
        super().process_trade_event(event)
        trade:TradeData = event.data
        vt_orderid = trade.vt_orderid

        corder = self.find_condition_order(vt_orderid)
        if corder:
            # 该成交单属于某个条件单
            strategy = self.strategies.get(corder.strategy_name,None)
            if strategy and strategy.trading:
                # 找到了该条件单属实策略实例且正在交易中

                # 累计条件单的成交量
                corder.traded += trade.volume
                # 推送该条件单给策略
                self.call_strategy_func(strategy,strategy.on_condition_order,corder)
                # 刷新条件单列表控件
                self.event_engine.put(Event(EVENT_CONDITION_ORDER,corder))

    def send_condition_order(self,order:ConditionOrder):
        """  """
        strategy = self.strategies.get(order.strategy_name,None)
        if not strategy or not strategy.trading:
            return False

        if order.cond_orderid not in self.condition_orders:
            self.condition_orders[order.cond_orderid] = order
            self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))
            return True

        return False

    def cancel_condition_order(self,cond_orderid:str):
        """  """
        order:ConditionOrder = self.condition_orders.get(cond_orderid,None)
        if not order:
            return False

        order.status = CondOrderStatus.CANCELLED
        self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))
        return True

    def cancel_all_condition_orders(self,strategy_name:str):
        """  """
        for order in self.condition_orders.values():
            if order.strategy_name == strategy_name and order.status == CondOrderStatus.WAITING:
                order.status = CondOrderStatus.CANCELLED
                self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))

        return True

把vnpy_ctastrategy目录下 的__init__.py中的CtaStrategyApp做如下修改

class CtaStrategyApp(BaseApp):
    """"""

    app_name = APP_NAME
    app_module = __module__
    app_path = Path(__file__).parent
    display_name = "CTA策略"
    # engine_class = CtaEngine
    engine_class = MyCtaEngine    # hxxjava add
    widget_name = "CtaManager"
    icon_name = str(app_path.joinpath("ui", "cta.ico"))

3.6 CtaTemplate的修改

修改vnpy_ctastrategy\CtaTemplate.py如下,为CtaTemplate增加on_auction_tick():

    @virtual
    def on_auction_tick(self, tick: TickData):
        """
        Callback of new tick data update.   # hxxjava add for auction tick
        """
        pass

3.7 为数据库增加最新Tick保存函数

3.7.1 修改vnpy\trader\database.py

为class BaseDatabase增加下面两个接口函数:

    @abstractmethod
    def save_last_tick(self, ticks: List[TickData]) -> bool:
        """
        Save last tick data into database.  # hxxjava add
        """
        pass

    @abstractmethod
    def load_last_tick(
        self,
        gateway_name : str,
        exchange: Exchange = None,
        symbol: str = None
    ) -> List[TickData]:
        """
        Load last tick data from database.  # hxxjava add
        """
        pass

3.7.2 修改vnpy_mysql\mysql_database.py

class MyDateTimeField(DateTimeField):
    def get_modifiers(self):
        return [6]

class DbLastTick(Model):    # hxxjava add
    """ 最新TICK数据表映射对象 """

    id = AutoField()

    gateway_name: str = CharField()

    symbol: str = CharField()
    exchange: str = CharField()
    datetime: datetime = MyDateTimeField()

    name: str = CharField()
    volume: float = FloatField()
    turnover: float = FloatField()
    open_interest: float = FloatField()
    last_price: float = FloatField()
    last_volume: float = FloatField()
    limit_up: float = FloatField()
    limit_down: float = FloatField()

    open_price: float = FloatField()
    high_price: float = FloatField()
    low_price: float = FloatField()
    pre_close: float = FloatField()

    bid_price_1: float = FloatField()
    bid_price_2: float = FloatField(null=True)
    bid_price_3: float = FloatField(null=True)
    bid_price_4: float = FloatField(null=True)
    bid_price_5: float = FloatField(null=True)

    ask_price_1: float = FloatField()
    ask_price_2: float = FloatField(null=True)
    ask_price_3: float = FloatField(null=True)
    ask_price_4: float = FloatField(null=True)
    ask_price_5: float = FloatField(null=True)

    bid_volume_1: float = FloatField()
    bid_volume_2: float = FloatField(null=True)
    bid_volume_3: float = FloatField(null=True)
    bid_volume_4: float = FloatField(null=True)
    bid_volume_5: float = FloatField(null=True)

    ask_volume_1: float = FloatField()
    ask_volume_2: float = FloatField(null=True)
    ask_volume_3: float = FloatField(null=True)
    ask_volume_4: float = FloatField(null=True)
    ask_volume_5: float = FloatField(null=True)

    localtime: datetime = DateTimeField(null=True)

    class Meta:
        database = db
        indexes = ((("gateway_name","symbol", "exchange", "datetime"), True),)

class MysqlDatabase的初始化做如下修改:

    def __init__(self) -> None:
        """"""
        self.db = db
        self.db.connect()
        self.db.create_tables([DbContractData, DbBarData, DbTickData, DbLastTick, DbBarOverview])   # hxxjava add DbLastTick,DbContractData

再为class MysqlDatabase添加下面两个函数:

    def save_last_tick(self, ticks: List[TickData]) -> bool:
        """
        Save last tick data into database.  # hxxjava add
        """
        vt_symbols = [t.vt_symbol for t in ticks]

        # 删除ticks列表中包含合约的旧的tick记录
        d: ModelDelete = DbLastTick.delete().where(
            (DbLastTick.symbol+'.'+DbLastTick.exchange in vt_symbols)
        )
        count = d.execute()
        # print(f"delete {count} last ticks")

        # 构造最新的ticks列表数据
        data = []
        for t in ticks:
            tick:TickData = deepcopy(t)     # hxxjava change
            tick.datetime = tick.datetime

            d = tick.__dict__
            d["exchange"] = d["exchange"].value
            d.pop("vt_symbol")
            data.append(d)
            # print(tick.symbol,tick.exchange,tick.datetime.strftime('%Y-%m-%d %H:%M:%S %f'))

        # 使用upsert操作将数据更新到数据库中
        with self.db.atomic():
            for c in chunked(data, 50):
                DbLastTick.insert_many(c).on_conflict_replace().execute()

        return True

    def load_last_tick(
        self,
        gateway_name : str,
        exchange: Exchange = None,
        symbol: str = None
    ) -> List[TickData]:
        """
        Load last tick data from database.  # hxxjava add
        """
        try:
            # 从DbLastTick查询符合条件的最新tick记录
            s: ModelSelect = (
                DbLastTick.select().where(
                    (DbLastTick.gateway_name == gateway_name)
                    & (exchange is None or DbLastTick.exchange == exchange.value)
                    & (symbol is None or DbLastTick.symbol == symbol)
                ).order_by(DbLastTick.gateway_name,DbLastTick.datetime)
            )

            # 利用最新tick记录构造ticks列表
            ticks: List[TickData] = []
            for db_tick in s:
                tick:TickData = TickData(
                    symbol=db_tick.symbol,
                    exchange=Exchange(db_tick.exchange),
                    datetime=to_china_tz(db_tick.datetime),
                    name=db_tick.name,
                    volume=db_tick.volume,
                    turnover=db_tick.turnover,
                    open_interest=db_tick.open_interest,
                    last_price=db_tick.last_price,
                    last_volume=db_tick.last_volume,
                    limit_up=db_tick.limit_up,
                    limit_down=db_tick.limit_down,
                    open_price=db_tick.open_price,
                    high_price=db_tick.high_price,
                    low_price=db_tick.low_price,
                    pre_close=db_tick.pre_close,
                    bid_price_1=db_tick.bid_price_1,
                    bid_price_2=db_tick.bid_price_2,
                    bid_price_3=db_tick.bid_price_3,
                    bid_price_4=db_tick.bid_price_4,
                    bid_price_5=db_tick.bid_price_5,
                    ask_price_1=db_tick.ask_price_1,
                    ask_price_2=db_tick.ask_price_2,
                    ask_price_3=db_tick.ask_price_3,
                    ask_price_4=db_tick.ask_price_4,
                    ask_price_5=db_tick.ask_price_5,
                    bid_volume_1=db_tick.bid_volume_1,
                    bid_volume_2=db_tick.bid_volume_2,
                    bid_volume_3=db_tick.bid_volume_3,
                    bid_volume_4=db_tick.bid_volume_4,
                    bid_volume_5=db_tick.bid_volume_5,
                    ask_volume_1=db_tick.ask_volume_1,
                    ask_volume_2=db_tick.ask_volume_2,
                    ask_volume_3=db_tick.ask_volume_3,
                    ask_volume_4=db_tick.ask_volume_4,
                    ask_volume_5=db_tick.ask_volume_5,
                    localtime=db_tick.localtime,
                    gateway_name=db_tick.gateway_name
                )
                ticks.append(tick)

            return ticks

        except:
            # 当DbLastTick表不存在的时候,会发生错误
            return []

3.8 tick数据过滤器的实现

在vnpy.usertools下创建tickfilter.py文件,其内容如下:

"""
本文件主要实现tick数据过滤器——TickFilter。

tick数据过滤器的功能:
1. 过滤重复tick,保证已经参与K线合成的tick不会再次被系统使用
2. 过滤无效tick,抛弃不在交易状态下的tick
3. 识别集合竞价tick,为使用tick的应用或用户策略处理集合竞价tick提供支持

作者:hxxjava
日期:2022-06-16
修改日期:              修改原因:  
"""
from typing import Dict,List,Tuple
from threading import Thread
from vnpy.event import Event,EVENT_TIMER,EventEngine
from vnpy.trader.constant import InstrumentStatus,VALID_TRADE_STATUSES
from vnpy.trader.object import TickData,StatusData
from vnpy.trader.event import (
    EVENT_ORIGIN_TICK,
    EVENT_AUCTION_TICK,
    EVENT_TICK,
    EVENT_STATUS
)
from vnpy.trader.database import get_database
from vnpy.trader.utility import extract_vt_symbol


def left_alphas(instr:str):
    """
    得到字符串左边的字符部分
    """
    ret_str = ''
    for s in instr:
        if s.isalpha():
            ret_str += s
        else:
            break
    return ret_str

def get_vt_instrument(vt_symbol:str):
    """
    从完整合约代码转换到完整品种代码
    """    
    symbol,exchange = extract_vt_symbol(vt_symbol)
    instrument = left_alphas(symbol)
    return f"{instrument}.{exchange.value}"


class TickFilter():
    """ tick数据过滤器 """
    CHECK_INTERVAL:int = 5  # 更新到数据库间隔

    def __init__(self,event_engine:EventEngine,gateway_name:str):
        """ tick数据过滤器初始化 """
        self.event_engine = event_engine
        self.gateway_name = gateway_name
        self.db = get_database()

        # 最新tick字典 {(gateway_name,vt_symbol),(update,tick)}
        self.last_ticks:Dict[Tuple[str,str],Tuple[bool,TickData]] = {}

        # 品种及合约状态字典 { vt_symbol : StatusData }
        self.statuses:Dict[str,StatusData] = {}
        self.second_cnt = 0

        self.load_last_ticks()
        self.register_event()

        # print(f"TickFilter {gateway_name}")

    def load_last_ticks(self):
        """ 
        加载属于网关名称为self.gateway_name的最新tick列表 
        """
        last_ticks:List[TickData] = self.db.load_last_tick(gateway_name=self.gateway_name)
        for tick in last_ticks:
            self.last_ticks[(tick.gateway_name,tick.vt_symbol)] = (False,tick)

        # print(f"load {len(last_ticks)} last ticks")

    def register_event(self):
        """ 注册消息 """
        self.event_engine.register(EVENT_ORIGIN_TICK,self.process_tick_event)
        self.event_engine.register(EVENT_STATUS,self.process_status_event)
        self.event_engine.register(EVENT_TIMER,self.check_last_ticks)        

    def process_tick_event(self,event:Event):
        """ 对原始tick进行过滤 """
        tick:TickData = event.data

        # 检查tick合约的经验状态是否位有效交易状态
        status:StatusData = self.statuses.get(tick.vt_symbol,None)
        if not status:
            vt_instrument = get_vt_instrument(tick.vt_symbol)
            status = self.statuses.get(vt_instrument,None)
            if not status:
                # 未收到交易状态,返回
                return

        if status.instrument_status not in VALID_TRADE_STATUSES:
            # 不在有效交易状态,返回
            return

        key = (tick.gateway_name,tick.vt_symbol)
        _,oldtick = self.last_ticks.get(key,(None,None))
        valid_tick = False
        if not oldtick:
            # 没有该合约的历史tick
            self.last_ticks[key] = (True,tick)
            valid_tick = True

        elif tick.datetime > oldtick.datetime:
            # 
            self.last_ticks[key] = (True,tick)
            valid_tick = True

        else:
            print(f"【特别tick = {tick}】")

        if valid_tick == True:
            # 如果是有效的tick
            if status.instrument_status != InstrumentStatus.CONTINOUS:
                # 发送集合竞价tic消息到系统中
                self.event_engine.put(Event(EVENT_AUCTION_TICK,tick))
                self.event_engine.put(Event(EVENT_AUCTION_TICK + tick.vt_symbol, tick))  

            else:
                # 发送连续竞价tic消息到系统中
                self.event_engine.put(Event(EVENT_TICK,tick))
                self.event_engine.put(Event(EVENT_TICK + tick.vt_symbol, tick))  

    def process_status_event(self, event: Event):  
        """ 交易状态通知消息处理 """
        status:StatusData = event.data
        self.statuses[status.vt_symbol] = status

        # print(f"【{status.gateway_name} {status}】")

    def check_last_ticks(self,event:Event) -> None:
        """ 原始tick过滤器 """
        self.second_cnt += 1
        if self.second_cnt % self.CHECK_INTERVAL == 0:
            # 如果到了定时间隔

            # 查询所有更新的tick
            changed_ticks = [] 

            for key,(update,tick) in self.last_ticks.items():
                if update:
                    changed_ticks.append(tick)
                    self.last_ticks[key] = (False,tick)

            if changed_ticks:
                # 如果存在更新的tick,保存到数据库
                t = Thread(target=self.db.save_last_tick,kwargs=({"ticks":changed_ticks}),daemon=True)
                t.start()
                # print(f"{self.second_cnt}: status count={len(self.statuses)} save {len(changed_ticks)} ticks")

3.9 把tick数据过滤器安装到主引擎MainEngine上去

修改vnpy\trader\engine.py

添加引用部分

from vnpy.usertools.tickfilter import TickFilter    # hxxjava add

修改MainEngine的

在MainEngine的初始化函数def init(self, event_engine: EventEngine = None)中增加如下内容:

self.tick_filters:Dict[str,TickFilter] = {} # hxxjava add

修改其add_gateway(),内容如下:

    def add_gateway(self, gateway_class: Type[BaseGateway], gateway_name: str = "") -> BaseGateway:
        """
        Add gateway.
        """
        # Use default name if gateway_name not passed
        if not gateway_name:
            gateway_name = gateway_class.default_name

        gateway = gateway_class(self.event_engine, gateway_name)
        self.gateways[gateway_name] = gateway

        # Add gateway supported exchanges into engine
        for exchange in gateway.exchanges:
            if exchange not in self.exchanges:
                self.exchanges.append(exchange)

        # add a tick data filter for the gateway    #  hxxjava add  
        if gateway_name not in self.tick_filters:
            self.tick_filters[gateway_name] = TickFilter(self.event_engine,gateway_name)

        return gateway

4. 经过上面的一系列修改,你获得了哪些好处?

  • 你的策略再也不会收到重复数据和垃圾数据
  • 此以后你的CTA策略中必须加入一个on_auction_tick()接口函数,用来接受每个交易日集合竞价所产生的tick。如何使用这个tick你有你的方法。
  • 在合成K线的时候你才可能构成正确的K线,比如BarGenerator对跨日tick时间戳的处理错误问题,在此也会迎刃而解。

4.1 现在来梳理下我们都干了哪些事情

  1. 在CtpGateway中引入了合约交易状态,这可以用来过滤无效数据,同时还能够识别集合竞价tick。
  2. 在database中增加了最新tick持久化保存,这为新的tick是否是重复的判断提供支持。
  3. 提供有效tick的分类,在CTA策略的模板中增加on_auction_tick()接口使得BarGenerator正确处1分钟bar的成交量和成交额成为可能。

4.2 非CTP网关使用者是否也可以这样做?

只要你能够从网关行情接口实时得到合约的交易状态推送,把网关的行情接口做出类似的修改,这套方法同样是可用的。tickfilter的代码可以不用修改直接使用。

5. 解决BarGenerator统计bar成交量和成交额错误的方法

5.1 这是对BarGenerator做出点修改,

  • 修改BarGenerator的初始化函数
    def __init__(
        self,
        on_bar: Callable,
        window: int = 0,
        on_window_bar: Callable = None,
        interval: Interval = Interval.MINUTE
    ):
        """ Constructor """      

        ... ...  # 其他代码省略

        self.auction_tick:TickData = None
        self.last_tick: TickData = None
  • 增加BarGenerator的集合竞价tick处理函数
    def update_auction_tick(self,tick:TickData):
        """ 更新集合竞价tick """
        self.auction_tick = tick
  • 修改BarGenerator的1分钟bar合成函数
    def update_tick(self, tick: TickData) -> None:
        """
        Update new tick data into generator.
        """
        new_minute = False

        if self.auction_tick:
            # 合约集合竞价tick到当前tick
            tick.high_price = max(tick.high_price,self.auction_tick.high_price)
            tick.low_price = min(tick.low_price,self.auction_tick.low_price)

            # 构造最新tick,以便把集合竞价的成交量和成交额合成到1分钟bar中
            self.last_tick = deepcopy(self.auction_tick)
            # 成交量和成交额每天从0开始单调递增
            self.last_tick.volume = 0.0   
            self.last_tick.turnover = 0.0

            # 用完集合竞价tick就丢弃
            self.auction_tick = None

      ... ...  # 其他代码省略

5.2 您的策略关于集合竞价tick更新的回调函数:

    def on_auction_tick(self, tick: TickData):
        """
        集合竞价tick处理
        """
        self.bg.update_auction_tick(tick)    # 假设self.bg是已经创建过的bar生成器

两点说明:

  1. 如果你在阅读本文的时候觉得有点一头雾水,可以搜索'hxxjava'字符串,将会显示大部分修改的代码,仔细揣摩下,就会知道我做了什么了!
  2. 另外本贴中还有一部分涉及到条件单的代码,如果出现错误,可以查找我的关于条件单的帖子比停止单更好用的条件单——ConditionOrder,这里就不再重复贴出那部分代码了。
Member
avatar
加入于:
帖子: 1446
声望: 102

感谢分享!

Member
avatar
加入于:
帖子: 419
声望: 170

MTF wrote:

感谢分享!

系统如果引入上面的tick过滤器,BarGenerator的跨日成交量和成交额处理错误问题就可以解决了,详细的解决方法请参见第5节吧。

Member
avatar
加入于:
帖子: 35
声望: 1

感谢分享,希望官方能够评估后把这个更新到新版本里面去~

Member
avatar
加入于:
帖子: 2
声望: 0

感谢分享,衷心建议您自己在Github/Gitee上fork原仓库,把您这么多新加的功能添加进去,这样也方便官方merge您的修改

Member
avatar
加入于:
帖子: 59
声望: 0

能不能直接根据合约的交易时间来过滤?

Member
avatar
加入于:
帖子: 419
声望: 170

渔哥 wrote:

能不能直接根据合约的交易时间来过滤?

可以,

  • 目前是判断tick的合约或者品种当前的交易状态是否是有效的,
  • 并没有判断tick的时间戳与当时交易状态中的起止时间是否矛盾,可以加上
Member
avatar
加入于:
帖子: 39
声望: 1

感谢分享

Member
avatar
加入于:
帖子: 59
声望: 0

谢谢是我没细看,把你几个帖子看完了,其实思想是一样的。

Member
avatar
加入于:
帖子: 39
声望: 1

description
onRtnInstrumentStatus响应
推送会有延迟
连接ctp后会集中推送一波,状态变化后会推送.

Member
avatar
加入于:
帖子: 26
声望: 1

感谢hxx分享,我模仿您的方法对vnpy_portfoliostrategy和vnpy_tts(gateway)进行了修改,运行起来发现所有的tick都被过滤掉了,检查发现是TickFilter.process_status_event从未被触发过,因此TickFilter.statuses始终为空,这样的话说明EVENT_STATUS从未发生?请问有什么办法能排查某个事件从未发生的原因?

Member
avatar
加入于:
帖子: 26
声望: 1

进一步检查发现TtsTdApi.onRtnInstrumentStatus 从未被触发

Member
avatar
加入于:
帖子: 419
声望: 170

c787297017 wrote:

感谢hxx分享,我模仿您的方法对vnpy_portfoliostrategy和vnpy_tts(gateway)进行了修改,运行起来发现所有的tick都被过滤掉了,检查发现是TickFilter.process_status_event从未被触发过,因此TickFilter.statuses始终为空,这样的话说明EVENT_STATUS从未发生?请问有什么办法能排查某个事件从未发生的原因?

如果你使用的是TTS或者UFT的gateway的话,是不可以的,因为它们都不是完整的CTP柜台,它们都没有实现合约状态推送功能。

Member
avatar
加入于:
帖子: 26
声望: 1

多谢大佬解惑

Member
avatar
加入于:
帖子: 19
声望: 1

解决tick这些问题,是不是不用再解决K线合成器的问题了?

Member
avatar
加入于:
帖子: 419
声望: 170

zjcha wrote:

解决tick这些问题,是不是不用再解决K线合成器的问题了?

解决tick这些问题,仍然需要再解决K线合成器的问题,二者相辅相成。

Member
avatar
加入于:
帖子: 26
声望: 1

在将大佬设计的过滤器布置到实盘环境后,测试发现郑商所品种的tick_data中的datetime未精确到毫秒,所以每秒的两次tick数据datetime标识完全一致,后面的一次会被过滤器认为是无效tick,并且打印了【特别tick =】,上期所和大商所的品种tick数据datetime是精确到毫秒的,所以不存在该问题,过滤器正常运行,请问大佬是怎么处理郑商所这个问题的?

Member
avatar
加入于:
帖子: 419
声望: 170

c787297017 wrote:

在将大佬设计的过滤器布置到实盘环境后,测试发现郑商所品种的tick_data中的datetime未精确到毫秒,所以每秒的两次tick数据datetime标识完全一致,后面的一次会被过滤器认为是无效tick,并且打印了【特别tick =】,上期所和大商所的品种tick数据datetime是精确到毫秒的,所以不存在该问题,过滤器正常运行,请问大佬是怎么处理郑商所这个问题的?

答复

  • 在simnow停止服务之前从来不会发现你说的这种情况,当下你应该用的不是CTP的gateway,请检查你所使用的gateway。
  • 打印了【特别tick =】的时候,应该是收到了不在有效交易状态下的tick数据,是应该被过滤掉的,正常。
  • 找到你所使用的gateway的MdApi的tick数据推送接口,仔细读代码,保证代码是类似下面的样子(这是CTP网关中的代码——有我修改的部分——看注释):
    def onRtnDepthMarketData(self, data: dict) -> None:
        """行情数据推送"""
        # 过滤没有时间戳的异常行情数据
        if not data["UpdateTime"]:
            return

        # 过滤还没有收到合约数据前的行情推送
        symbol: str = data["InstrumentID"]
        contract: ContractData = symbol_contract_map.get(symbol, None)
        if not contract:
            return

        # 对大商所的交易日字段取本地日期
        if not data["ActionDay"] or contract.exchange == Exchange.DCE:
            date_str: str = self.current_date

        else:
            date_str: str = data["ActionDay"]

        # 这里错了,按照这样的字符串"%Y%m%d %H:%M:%S.%f",会错把tick.datetime的微秒当成0.1秒计数
        # timestamp: str = f"{date_str} {data['UpdateTime']}.{int(data['UpdateMillisec']/100)}"        
        timestamp: str = f"{date_str} {data['UpdateTime']}." + str(data['UpdateMillisec']*1000).zfill(6) # hxxjava edit
        dt: datetime = datetime.strptime(timestamp, "%Y%m%d %H:%M:%S.%f")
        dt: datetime = CHINA_TZ.localize(dt)

        tick: TickData = TickData(
            symbol=symbol,
            exchange=contract.exchange,
            datetime=dt,
            name=contract.name,
            volume=data["Volume"],
            turnover=data["Turnover"],
            open_interest=data["OpenInterest"],
            last_price=adjust_price(data["LastPrice"]),
            limit_up=data["UpperLimitPrice"],
            limit_down=data["LowerLimitPrice"],
            open_price=adjust_price(data["OpenPrice"]),
            high_price=adjust_price(data["HighestPrice"]),
            low_price=adjust_price(data["LowestPrice"]),
            pre_close=adjust_price(data["PreClosePrice"]),
            bid_price_1=adjust_price(data["BidPrice1"]),
            ask_price_1=adjust_price(data["AskPrice1"]),
            bid_volume_1=data["BidVolume1"],
            ask_volume_1=data["AskVolume1"],
            gateway_name=self.gateway_name
        )

        if data["BidVolume2"] or data["AskVolume2"]:
            tick.bid_price_2 = adjust_price(data["BidPrice2"])
            tick.bid_price_3 = adjust_price(data["BidPrice3"])
            tick.bid_price_4 = adjust_price(data["BidPrice4"])
            tick.bid_price_5 = adjust_price(data["BidPrice5"])

            tick.ask_price_2 = adjust_price(data["AskPrice2"])
            tick.ask_price_3 = adjust_price(data["AskPrice3"])
            tick.ask_price_4 = adjust_price(data["AskPrice4"])
            tick.ask_price_5 = adjust_price(data["AskPrice5"])

            tick.bid_volume_2 = data["BidVolume2"]
            tick.bid_volume_3 = data["BidVolume3"]
            tick.bid_volume_4 = data["BidVolume4"]
            tick.bid_volume_5 = data["BidVolume5"]

            tick.ask_volume_2 = data["AskVolume2"]
            tick.ask_volume_3 = data["AskVolume3"]
            tick.ask_volume_4 = data["AskVolume4"]
            tick.ask_volume_5 = data["AskVolume5"]

        self.gateway.on_tick(tick)
Member
avatar
加入于:
帖子: 26
声望: 1

感谢回复,我仔细检查了我使用的gateway,确定是ctp_gateway,并且我是在实盘时进行的测试,为了进一步定位问题,我将onRtnDepthMarketData中收到的data进行了打印(使用合约:SA209.CZCE)

description

输出内容如下

description

可以看到不知为何UpdateMillisec全部为0,所以才导致了同一秒内的tick数据datetime完全一致的情况,后面推进来的那条就会被当做特殊tick过滤掉。

不知道是否和我使用的vnpy_ctp版本有关系,我目前的版本号为6.5.1.7,hxxjava大神的应该是6.5.1.8以上,目前准备升级再次进行测试,感谢大佬帮我定位问题

Member
avatar
加入于:
帖子: 1446
声望: 102

郑商所合约,行情推送里时间戳不带毫秒信息的,要在业务层自己处理下

© 2015-2022 上海韦纳软件科技有限公司
备案服务号:沪ICP备18006526号

沪公网安备 31011502017034号

【用户协议】
【隐私政策】
【免责条款】