1. 问题的由来
- 在文章 发现了vnpy的BarGenerator两个隐藏很深的错误 !中我就已经分析过tick数据对bar生成器的影响。
- 当前vnpy系统对集合竞价tick与其他tick没有区分能力
- 当前vnpy系统没有充分利用行情接口提供的状态信息,无法识别有效tick与无效tick,一股脑地发送到策略和应用中,导致bar合成的错误。
2. 问题的解决方法
在行情接口与策略和应用之间建起一个tick过滤器——TickFilter,对tick数据进行过滤。
tick数据过滤器的功能:
- 过滤重复tick,保证已经参与K线合成的tick不会再次被系统使用,每个网关对应一个ick数据过滤;
要做到这一条,就必须做到对所有已经订阅过的合约的tick的缓存,否则你再次重启系统的时候是无法知道你收到第一个tick是否已经参与过之前bar的合成了。这样你可能重复使用该tick,这是错误的。
为此我们需要将所有已经订阅过的合约的最新tick进行实时更新,并定期做持久化保存,且在每次系统启动的时候读取加载到系统中。 - 过滤无效tick,转发有效交易状态下的tick到系统中,不在有效交易状态下tick做丢弃处理,有效交易状态包括:集合竞价状态和连续竞价状态;
CTP系统的行情接口中包含的实时更新的合约交易状态通知推送接口,OnRtnInstrumentStatus()。关于这个问题我已经在如何更有效地利用合约交易状态信息——交易状态信息管理器。一文中做了详细的介绍,再次就不赘述。总之合约交易状态通知可以让我识别一个tick是否是有些大tick。 - 识别集合竞价tick,为使用tick的应用或用户策略处理集合竞价tick提供支持。
合约交易状态通知可以让我们知道那些tick是tick,同时可以可以让我们区分那个tick是集合竞价tick,那些是连续竞价tick。对有效tick进性分析利用于我们策略或者应用生成出正确的bar。 - 本文只对CtpGateway,CtaEngine、CtaTemplate进行了更改,其他网关系统的道理都是相同的。如果您觉得对您有启发,也可以按同样的方法修改。
3. 过滤无效tick数据的实现代码
声明:本文基于【CTP接口规范6.3.15_API接口说明】做出的修改。
3.1 相关数据类型定义
4.1 定义相关的常量和数据类
在vnpy\trader\constant.py中增加下面的合约交易状态InstrumentStatus常量类型定义:
class InstrumentStatus(Enum):
"""
合约交易状态类型 hxxjava debug
"""
BEFORE_TRADING = "开盘前"
NO_TRADING = "非交易"
CONTINOUS = "连续交易"
AUCTION_ORDERING = "集合竞价报单"
AUCTION_BALANCE = "集合竞价价格平衡"
AUCTION_MATCH = "集合竞价撮合"
CLOSE = "收盘"
# 有效交易状态
VALID_TRADE_STATUSES = [
InstrumentStatus.CONTINOUS,
InstrumentStatus.AUCTION_ORDERING,
InstrumentStatus.AUCTION_BALANCE,
InstrumentStatus.AUCTION_MATCH
]
# 集合竞价交易状态
AUCTION_STATUS = [
InstrumentStatus.AUCTION_ORDERING,
InstrumentStatus.AUCTION_BALANCE,
InstrumentStatus.AUCTION_MATCH
]
class StatusEnterReason(Enum):
"""
品种进入交易状态原因类型 hxxjava debug
"""
AUTOMATIC = "自动切换"
MANUAL = "手动切换"
FUSE = "熔断"
在vnpy\trader\object.py中增加下面的交易状态数据类StatusData:
@dataclass
class StatusData(BaseData):
"""
hxxjava debug
"""
symbol:str
exchange : Exchange
settlement_group_id : str = ""
instrument_status : InstrumentStatus = None
trading_segment_sn : int = None
enter_time : str = ""
enter_reason : str = ""
exchange_inst_id : str = ""
def __post_init__(self):
""" """
self.vt_symbol = f"{self.symbol}.{self.exchange.value}"
def belongs_to(self,vt_symbol:str):
symbol,exchange_str = vt_symbol.split(".")
instrument = left_alphas(symbol).upper()
return (self.symbol.upper() == instrument) and (self.exchange.value == exchange_str)
3.2 相关消息定义
在vnpy\trader\event.py中增加交易状态消息类型
EVENT_STATUS = "eStatus" # hxxjava debug
EVENT_ORIGIN_TICK = "eOriginTick." # hxxjava debug
EVENT_AUCTION_TICK = "eAuctionTick." # hxxjava debug
3.3 Gateway的修改
在vnpy\trader\gateway.py中合约状态接口,修改tick推送接口:
引用部分增加:
from .event import EVENT_ORIGIN_TICK,EVENT_STATUS # hxxjava add
from .object import StatusData # hxxjava add
修改class BaseGateway的on_tick()接口,增加on_status()接口:
def on_tick(self, tick: TickData) -> None:
"""
Tick event push.
Tick event of a specific vt_symbol is also pushed.
"""
self.on_event(EVENT_ORIGIN_TICK, tick) # hxxjava add
# self.on_event(EVENT_TICK, tick)
# self.on_event(EVENT_TICK + tick.vt_symbol, tick)
def on_status(self, status: StatusData) -> None: # hxxjava debug
"""
Instrument Status event push.
"""
self.on_event(EVENT_STATUS, status)
self.on_event(EVENT_STATUS + status.vt_symbol, status)
3.4 CtpGateway的修改
修改vnpy_cpt\ctp_gateway.py:
增加引用部分
from vnpy.trader.constant import InstrumentStatus,StatusEnterReason # hxxjava debug
rom vnpy.trader.object import StatusData, # hxxjava debug
增加几个映射字典:
# 品种状态进入原因映射 hxxjava debug
INSTRUMENTSTATUS_CTP2VT: Dict[str, InstrumentStatus] = {
"0": InstrumentStatus.BEFORE_TRADING,
"1": InstrumentStatus.NO_TRADING,
"2": InstrumentStatus.CONTINOUS,
"3": InstrumentStatus.AUCTION_ORDERING,
"4": InstrumentStatus.AUCTION_BALANCE,
"5": InstrumentStatus.AUCTION_MATCH,
"6": InstrumentStatus.CLOSE,
"7": InstrumentStatus.CLOSE
}
# 品种状态进入原因映射 hxxjava debug
ENTERREASON_CTP2VT: Dict[str, StatusEnterReason] = {
"1": StatusEnterReason.AUTOMATIC,
"2": StatusEnterReason.MANUAL,
"3": StatusEnterReason.FUSE
}
为class CtpTdApi增加下面合约状态推送接口:
def onRtnInstrumentStatus(self,data:dict):
"""
当接收到合约品种状态信息 # hxxjava debug
"""
if data:
# print(f"【data={data}】")
status = StatusData(
symbol = data["InstrumentID"],
exchange = EXCHANGE_CTP2VT[data["ExchangeID"]],
settlement_group_id = data["SettlementGroupID"],
instrument_status = INSTRUMENTSTATUS_CTP2VT[data["InstrumentStatus"]],
trading_segment_sn = data["TradingSegmentSN"],
enter_time = data["EnterTime"],
enter_reason = ENTERREASON_CTP2VT[data["EnterReason"]],
exchange_inst_id = data["ExchangeInstID"],
gateway_name=self.gateway_name
)
# print(f"status={status}")
self.gateway.on_status(status)
3.5 对CtaEngine的进行扩展
增加引用部分
from vnpy.trader.event import EVENT_AUCTION_TICK # hxxjava add
增加一个对CtaEgine的扩展MyCtaEngine
class MyCtaEngine(CtaEngine):
""" """
condition_filename = "condition_order.json" # 历史条件单存储文件
def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
""""""
super().__init__(main_engine,event_engine)
self.condition_orders:Dict[str,ConditionOrder] = {} # strategy_name: ConditionOrder
self.triggered_condition_orders:List[ConditionOrder] = [] # 已经触发点条件单,为流控设计
def load_active_condtion_orders(self):
""" """
return {}
def register_event(self):
""""""
super().register_event()
self.event_engine.register(EVENT_AUCTION_TICK, self.process_auction_tick_event)
def process_auction_tick_event(self,event:Event):
""" 集合竞价消息处理 """
tick:TickData = event.data
strategies = self.symbol_strategy_map[tick.vt_symbol]
if not strategies:
return
for strategy in strategies:
if strategy.inited:
# 执行策略的集合竞价消息处理
self.call_strategy_func(strategy, strategy.on_auction_tick, tick)
def process_tick_event(self,event:Event):
""" 用tick的价格检查条件单 """
super().process_tick_event(event)
tick:TickData = event.data
all_condition_orders = [order for order in self.condition_orders.values() \
if order.vt_symbol == tick.vt_symbol and order.status == CondOrderStatus.WAITING]
for order in all_condition_orders:
# 检查条件单是否满足条件
self.check_condition_order(order,tick)
def check_condition_order(self,order:ConditionOrder,tick:TickData):
""" 检查条件单是否满足条件 """
strategy = self.strategies.get(order.strategy_name,None)
if not strategy or not strategy.trading:
return False
price = tick.last_price
is_be = order.condition == Condition.BE and price >= order.price
is_le = order.condition == Condition.LE and price <= order.price
is_bt = order.condition == Condition.BT and price > order.price
is_lt = order.condition == Condition.LT and price < order.price
if is_be or is_le or is_bt or is_lt:
# 满足触发条件
if order.execute_price == ExecutePrice.MARKET:
# 取市场价
price = tick.last_price
elif order.execute_price == ExecutePrice.EXTREME:
# 取极限价
price = tick.limit_up if order.direction == Direction.LONG else tick.limit_down
else:
# 取设定价
price = order.price
# 执行委托
order_ids = strategy.send_order(
direction = order.direction,
offset=order.offset,
price=price,
volume=order.volume
)
if order_ids:
order.trigger_time = tick.datetime
order.status = CondOrderStatus.TRIGGERED
order.vt_orderids = order_ids
self.call_strategy_func(strategy,strategy.on_condition_order,order)
self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))
def find_condition_order(self,vt_orderid:str):
""" 根据委托单号查询所属条件单 """
corder:ConditionOrder = None
for order in self.condition_orders.values():
if vt_orderid in order.vt_orderids:
corder = order
break
return corder
def process_trade_event(self, event: Event):
""" 委托单推送处理 """
super().process_trade_event(event)
trade:TradeData = event.data
vt_orderid = trade.vt_orderid
corder = self.find_condition_order(vt_orderid)
if corder:
# 该成交单属于某个条件单
strategy = self.strategies.get(corder.strategy_name,None)
if strategy and strategy.trading:
# 找到了该条件单属实策略实例且正在交易中
# 累计条件单的成交量
corder.traded += trade.volume
# 推送该条件单给策略
self.call_strategy_func(strategy,strategy.on_condition_order,corder)
# 刷新条件单列表控件
self.event_engine.put(Event(EVENT_CONDITION_ORDER,corder))
def send_condition_order(self,order:ConditionOrder):
""" """
strategy = self.strategies.get(order.strategy_name,None)
if not strategy or not strategy.trading:
return False
if order.cond_orderid not in self.condition_orders:
self.condition_orders[order.cond_orderid] = order
self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))
return True
return False
def cancel_condition_order(self,cond_orderid:str):
""" """
order:ConditionOrder = self.condition_orders.get(cond_orderid,None)
if not order:
return False
order.status = CondOrderStatus.CANCELLED
self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))
return True
def cancel_all_condition_orders(self,strategy_name:str):
""" """
for order in self.condition_orders.values():
if order.strategy_name == strategy_name and order.status == CondOrderStatus.WAITING:
order.status = CondOrderStatus.CANCELLED
self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))
return True
把vnpy_ctastrategy目录下 的__init__.py
中的CtaStrategyApp做如下修改
class CtaStrategyApp(BaseApp):
""""""
app_name = APP_NAME
app_module = __module__
app_path = Path(__file__).parent
display_name = "CTA策略"
# engine_class = CtaEngine
engine_class = MyCtaEngine # hxxjava add
widget_name = "CtaManager"
icon_name = str(app_path.joinpath("ui", "cta.ico"))
3.6 CtaTemplate的修改
修改vnpy_ctastrategy\CtaTemplate.py如下,为CtaTemplate增加on_auction_tick():
@virtual
def on_auction_tick(self, tick: TickData):
"""
Callback of new tick data update. # hxxjava add for auction tick
"""
pass
3.7 为数据库增加最新Tick保存函数
3.7.1 修改vnpy\trader\database.py
为class BaseDatabase增加下面两个接口函数:
@abstractmethod
def save_last_tick(self, ticks: List[TickData]) -> bool:
"""
Save last tick data into database. # hxxjava add
"""
pass
@abstractmethod
def load_last_tick(
self,
gateway_name : str,
exchange: Exchange = None,
symbol: str = None
) -> List[TickData]:
"""
Load last tick data from database. # hxxjava add
"""
pass
3.7.2 修改vnpy_mysql\mysql_database.py
class MyDateTimeField(DateTimeField):
def get_modifiers(self):
return [6]
class DbLastTick(Model): # hxxjava add
""" 最新TICK数据表映射对象 """
id = AutoField()
gateway_name: str = CharField()
symbol: str = CharField()
exchange: str = CharField()
datetime: datetime = MyDateTimeField()
name: str = CharField()
volume: float = FloatField()
turnover: float = FloatField()
open_interest: float = FloatField()
last_price: float = FloatField()
last_volume: float = FloatField()
limit_up: float = FloatField()
limit_down: float = FloatField()
open_price: float = FloatField()
high_price: float = FloatField()
low_price: float = FloatField()
pre_close: float = FloatField()
bid_price_1: float = FloatField()
bid_price_2: float = FloatField(null=True)
bid_price_3: float = FloatField(null=True)
bid_price_4: float = FloatField(null=True)
bid_price_5: float = FloatField(null=True)
ask_price_1: float = FloatField()
ask_price_2: float = FloatField(null=True)
ask_price_3: float = FloatField(null=True)
ask_price_4: float = FloatField(null=True)
ask_price_5: float = FloatField(null=True)
bid_volume_1: float = FloatField()
bid_volume_2: float = FloatField(null=True)
bid_volume_3: float = FloatField(null=True)
bid_volume_4: float = FloatField(null=True)
bid_volume_5: float = FloatField(null=True)
ask_volume_1: float = FloatField()
ask_volume_2: float = FloatField(null=True)
ask_volume_3: float = FloatField(null=True)
ask_volume_4: float = FloatField(null=True)
ask_volume_5: float = FloatField(null=True)
localtime: datetime = DateTimeField(null=True)
class Meta:
database = db
indexes = ((("gateway_name","symbol", "exchange", "datetime"), True),)
class MysqlDatabase的初始化做如下修改:
def __init__(self) -> None:
""""""
self.db = db
self.db.connect()
self.db.create_tables([DbContractData, DbBarData, DbTickData, DbLastTick, DbBarOverview]) # hxxjava add DbLastTick,DbContractData
再为class MysqlDatabase添加下面两个函数:
def save_last_tick(self, ticks: List[TickData]) -> bool:
"""
Save last tick data into database. # hxxjava add
"""
vt_symbols = [t.vt_symbol for t in ticks]
# 删除ticks列表中包含合约的旧的tick记录
d: ModelDelete = DbLastTick.delete().where(
(DbLastTick.symbol+'.'+DbLastTick.exchange in vt_symbols)
)
count = d.execute()
# print(f"delete {count} last ticks")
# 构造最新的ticks列表数据
data = []
for t in ticks:
tick:TickData = deepcopy(t) # hxxjava change
tick.datetime = tick.datetime
d = tick.__dict__
d["exchange"] = d["exchange"].value
d.pop("vt_symbol")
data.append(d)
# print(tick.symbol,tick.exchange,tick.datetime.strftime('%Y-%m-%d %H:%M:%S %f'))
# 使用upsert操作将数据更新到数据库中
with self.db.atomic():
for c in chunked(data, 50):
DbLastTick.insert_many(c).on_conflict_replace().execute()
return True
def load_last_tick(
self,
gateway_name : str,
exchange: Exchange = None,
symbol: str = None
) -> List[TickData]:
"""
Load last tick data from database. # hxxjava add
"""
try:
# 从DbLastTick查询符合条件的最新tick记录
s: ModelSelect = (
DbLastTick.select().where(
(DbLastTick.gateway_name == gateway_name)
& (exchange is None or DbLastTick.exchange == exchange.value)
& (symbol is None or DbLastTick.symbol == symbol)
).order_by(DbLastTick.gateway_name,DbLastTick.datetime)
)
# 利用最新tick记录构造ticks列表
ticks: List[TickData] = []
for db_tick in s:
tick:TickData = TickData(
symbol=db_tick.symbol,
exchange=Exchange(db_tick.exchange),
datetime=to_china_tz(db_tick.datetime),
name=db_tick.name,
volume=db_tick.volume,
turnover=db_tick.turnover,
open_interest=db_tick.open_interest,
last_price=db_tick.last_price,
last_volume=db_tick.last_volume,
limit_up=db_tick.limit_up,
limit_down=db_tick.limit_down,
open_price=db_tick.open_price,
high_price=db_tick.high_price,
low_price=db_tick.low_price,
pre_close=db_tick.pre_close,
bid_price_1=db_tick.bid_price_1,
bid_price_2=db_tick.bid_price_2,
bid_price_3=db_tick.bid_price_3,
bid_price_4=db_tick.bid_price_4,
bid_price_5=db_tick.bid_price_5,
ask_price_1=db_tick.ask_price_1,
ask_price_2=db_tick.ask_price_2,
ask_price_3=db_tick.ask_price_3,
ask_price_4=db_tick.ask_price_4,
ask_price_5=db_tick.ask_price_5,
bid_volume_1=db_tick.bid_volume_1,
bid_volume_2=db_tick.bid_volume_2,
bid_volume_3=db_tick.bid_volume_3,
bid_volume_4=db_tick.bid_volume_4,
bid_volume_5=db_tick.bid_volume_5,
ask_volume_1=db_tick.ask_volume_1,
ask_volume_2=db_tick.ask_volume_2,
ask_volume_3=db_tick.ask_volume_3,
ask_volume_4=db_tick.ask_volume_4,
ask_volume_5=db_tick.ask_volume_5,
localtime=db_tick.localtime,
gateway_name=db_tick.gateway_name
)
ticks.append(tick)
return ticks
except:
# 当DbLastTick表不存在的时候,会发生错误
return []
3.8 tick数据过滤器的实现
在vnpy.usertools下创建tickfilter.py文件,其内容如下:
"""
本文件主要实现tick数据过滤器——TickFilter。
tick数据过滤器的功能:
1. 过滤重复tick,保证已经参与K线合成的tick不会再次被系统使用
2. 过滤无效tick,抛弃不在交易状态下的tick
3. 识别集合竞价tick,为使用tick的应用或用户策略处理集合竞价tick提供支持
作者:hxxjava
日期:2022-06-16
修改日期: 修改原因:
"""
from typing import Dict,List,Tuple
from threading import Thread
from vnpy.event import Event,EVENT_TIMER,EventEngine
from vnpy.trader.constant import InstrumentStatus,VALID_TRADE_STATUSES
from vnpy.trader.object import TickData,StatusData
from vnpy.trader.event import (
EVENT_ORIGIN_TICK,
EVENT_AUCTION_TICK,
EVENT_TICK,
EVENT_STATUS
)
from vnpy.trader.database import get_database
from vnpy.trader.utility import extract_vt_symbol
def left_alphas(instr:str):
"""
得到字符串左边的字符部分
"""
ret_str = ''
for s in instr:
if s.isalpha():
ret_str += s
else:
break
return ret_str
def get_vt_instrument(vt_symbol:str):
"""
从完整合约代码转换到完整品种代码
"""
symbol,exchange = extract_vt_symbol(vt_symbol)
instrument = left_alphas(symbol)
return f"{instrument}.{exchange.value}"
class TickFilter():
""" tick数据过滤器 """
CHECK_INTERVAL:int = 5 # 更新到数据库间隔
def __init__(self,event_engine:EventEngine,gateway_name:str):
""" tick数据过滤器初始化 """
self.event_engine = event_engine
self.gateway_name = gateway_name
self.db = get_database()
# 最新tick字典 {(gateway_name,vt_symbol),(update,tick)}
self.last_ticks:Dict[Tuple[str,str],Tuple[bool,TickData]] = {}
# 品种及合约状态字典 { vt_symbol : StatusData }
self.statuses:Dict[str,StatusData] = {}
self.second_cnt = 0
self.load_last_ticks()
self.register_event()
# print(f"TickFilter {gateway_name}")
def load_last_ticks(self):
"""
加载属于网关名称为self.gateway_name的最新tick列表
"""
last_ticks:List[TickData] = self.db.load_last_tick(gateway_name=self.gateway_name)
for tick in last_ticks:
self.last_ticks[(tick.gateway_name,tick.vt_symbol)] = (False,tick)
# print(f"load {len(last_ticks)} last ticks")
def register_event(self):
""" 注册消息 """
self.event_engine.register(EVENT_ORIGIN_TICK,self.process_tick_event)
self.event_engine.register(EVENT_STATUS,self.process_status_event)
self.event_engine.register(EVENT_TIMER,self.check_last_ticks)
def process_tick_event(self,event:Event):
""" 对原始tick进行过滤 """
tick:TickData = event.data
# 检查tick合约的经验状态是否位有效交易状态
status:StatusData = self.statuses.get(tick.vt_symbol,None)
if not status:
vt_instrument = get_vt_instrument(tick.vt_symbol)
status = self.statuses.get(vt_instrument,None)
if not status:
# 未收到交易状态,返回
return
if status.instrument_status not in VALID_TRADE_STATUSES:
# 不在有效交易状态,返回
return
key = (tick.gateway_name,tick.vt_symbol)
_,oldtick = self.last_ticks.get(key,(None,None))
valid_tick = False
if not oldtick:
# 没有该合约的历史tick
self.last_ticks[key] = (True,tick)
valid_tick = True
elif tick.datetime > oldtick.datetime:
#
self.last_ticks[key] = (True,tick)
valid_tick = True
else:
print(f"【特别tick = {tick}】")
if valid_tick == True:
# 如果是有效的tick
if status.instrument_status != InstrumentStatus.CONTINOUS:
# 发送集合竞价tic消息到系统中
self.event_engine.put(Event(EVENT_AUCTION_TICK,tick))
self.event_engine.put(Event(EVENT_AUCTION_TICK + tick.vt_symbol, tick))
else:
# 发送连续竞价tic消息到系统中
self.event_engine.put(Event(EVENT_TICK,tick))
self.event_engine.put(Event(EVENT_TICK + tick.vt_symbol, tick))
def process_status_event(self, event: Event):
""" 交易状态通知消息处理 """
status:StatusData = event.data
self.statuses[status.vt_symbol] = status
# print(f"【{status.gateway_name} {status}】")
def check_last_ticks(self,event:Event) -> None:
""" 原始tick过滤器 """
self.second_cnt += 1
if self.second_cnt % self.CHECK_INTERVAL == 0:
# 如果到了定时间隔
# 查询所有更新的tick
changed_ticks = []
for key,(update,tick) in self.last_ticks.items():
if update:
changed_ticks.append(tick)
self.last_ticks[key] = (False,tick)
if changed_ticks:
# 如果存在更新的tick,保存到数据库
t = Thread(target=self.db.save_last_tick,kwargs=({"ticks":changed_ticks}),daemon=True)
t.start()
# print(f"{self.second_cnt}: status count={len(self.statuses)} save {len(changed_ticks)} ticks")
3.9 把tick数据过滤器安装到主引擎MainEngine上去
修改vnpy\trader\engine.py
添加引用部分
from vnpy.usertools.tickfilter import TickFilter # hxxjava add
修改MainEngine的
在MainEngine的初始化函数def init(self, event_engine: EventEngine = None)中增加如下内容:
self.tick_filters:Dict[str,TickFilter] = {} # hxxjava add
修改其add_gateway(),内容如下:
def add_gateway(self, gateway_class: Type[BaseGateway], gateway_name: str = "") -> BaseGateway:
"""
Add gateway.
"""
# Use default name if gateway_name not passed
if not gateway_name:
gateway_name = gateway_class.default_name
gateway = gateway_class(self.event_engine, gateway_name)
self.gateways[gateway_name] = gateway
# Add gateway supported exchanges into engine
for exchange in gateway.exchanges:
if exchange not in self.exchanges:
self.exchanges.append(exchange)
# add a tick data filter for the gateway # hxxjava add
if gateway_name not in self.tick_filters:
self.tick_filters[gateway_name] = TickFilter(self.event_engine,gateway_name)
return gateway
4. 经过上面的一系列修改,你获得了哪些好处?
- 你的策略再也不会收到重复数据和垃圾数据
- 此以后你的CTA策略中必须加入一个on_auction_tick()接口函数,用来接受每个交易日集合竞价所产生的tick。如何使用这个tick你有你的方法。
- 在合成K线的时候你才可能构成正确的K线,比如BarGenerator对跨日tick时间戳的处理错误问题,在此也会迎刃而解。
4.1 现在来梳理下我们都干了哪些事情
- 在CtpGateway中引入了合约交易状态,这可以用来过滤无效数据,同时还能够识别集合竞价tick。
- 在database中增加了最新tick持久化保存,这为新的tick是否是重复的判断提供支持。
- 提供有效tick的分类,在CTA策略的模板中增加on_auction_tick()接口使得BarGenerator正确处1分钟bar的成交量和成交额成为可能。
4.2 非CTP网关使用者是否也可以这样做?
只要你能够从网关行情接口实时得到合约的交易状态推送,把网关的行情接口做出类似的修改,这套方法同样是可用的。tickfilter的代码可以不用修改直接使用。
5. 解决BarGenerator统计bar成交量和成交额错误的方法
5.1 这是对BarGenerator做出点修改,
- 修改BarGenerator的初始化函数
def __init__(
self,
on_bar: Callable,
window: int = 0,
on_window_bar: Callable = None,
interval: Interval = Interval.MINUTE
):
""" Constructor """
... ... # 其他代码省略
self.auction_tick:TickData = None
self.last_tick: TickData = None
- 增加BarGenerator的集合竞价tick处理函数
def update_auction_tick(self,tick:TickData):
""" 更新集合竞价tick """
self.auction_tick = tick
- 修改BarGenerator的1分钟bar合成函数
def update_tick(self, tick: TickData) -> None:
"""
Update new tick data into generator.
"""
new_minute = False
if self.auction_tick:
# 合约集合竞价tick到当前tick
tick.high_price = max(tick.high_price,self.auction_tick.high_price)
tick.low_price = min(tick.low_price,self.auction_tick.low_price)
# 构造最新tick,以便把集合竞价的成交量和成交额合成到1分钟bar中
self.last_tick = deepcopy(self.auction_tick)
# 成交量和成交额每天从0开始单调递增
self.last_tick.volume = 0.0
self.last_tick.turnover = 0.0
# 用完集合竞价tick就丢弃
self.auction_tick = None
... ... # 其他代码省略
5.2 您的策略关于集合竞价tick更新的回调函数:
def on_auction_tick(self, tick: TickData):
"""
集合竞价tick处理
"""
self.bg.update_auction_tick(tick) # 假设self.bg是已经创建过的bar生成器
两点说明:
- 如果你在阅读本文的时候觉得有点一头雾水,可以搜索'hxxjava'字符串,将会显示大部分修改的代码,仔细揣摩下,就会知道我做了什么了!
- 另外本贴中还有一部分涉及到条件单的代码,如果出现错误,可以查找我的关于条件单的帖子比停止单更好用的条件单——ConditionOrder,这里就不再重复贴出那部分代码了。