订阅了A,B
A的数据先到,但此时你并不知道B数据在同一时间戳里会不会到,然后有2种情况
1 同一时间戳,B的数据不会到,采用B旧数据就可以套利
2 同一时间戳,B的数据后来到了,应该在B新数据来时套利
问题是,如果是1,看似B旧数据可以套利,但你不知道新数据是否可以
如果是2,B数据如果不来,但旧数据可以套利
怎么解?
订阅了A,B
A的数据先到,但此时你并不知道B数据在同一时间戳里会不会到,然后有2种情况
1 同一时间戳,B的数据不会到,采用B旧数据就可以套利
2 同一时间戳,B的数据后来到了,应该在B新数据来时套利
问题是,如果是1,看似B旧数据可以套利,但你不知道新数据是否可以
如果是2,B数据如果不来,但旧数据可以套利
怎么解?
而且还有个问题,比如说我订阅了A B C 三个合约的tick。我用模拟账号发现 某一时间点回这3个合约的tick顺序是乱序的。但是正式账号 顺序永远是一致的。这是为什么?难道模拟环境是多线程环境?正式环境是单线程回tick的?
这是一个非常典型的多合约数据同步问题,在套利策略开发中很常见。让我先查询 VeighNa 相关模块的处理方案。
好问题!您提到的是多合约套利策略中的数据同步时序问题,这在实盘交易中确实很常见。让我进一步查询 VeighNa 价差交易模块的源码实现:
完美! 现在我找到了关键代码。让我再查看价差数据计算的核心逻辑:
非常好! 现在我完全理解了您的问题。根据 VeighNa 价差交易模块的源码,我来给您详细解答:
您遇到的是多合约套利中的经典数据同步问题,这个问题在实盘交易中普遍存在。
从源码 engine.py 的 process_tick_event 方法可以看到:
def process_tick_event(self, event: Event) -> None:
tick: TickData = event.data
leg: LegData | None = self.legs.get(tick.vt_symbol, None)
if not leg:
return
leg.update_tick(tick) # 更新单腿tick数据
for spread in self.symbol_spread_map[tick.vt_symbol]:
# 只有能成功计算出价差盘口时,才会发送事件
if spread.calculate_price():
self.put_data_event(spread)
base.py)def calculate_price(self) -> bool:
"""
1. 如果各条腿价格均有效,则计算成功,返回True
2. 反之只要有一条腿的价格无效,则计算失败,返回False
"""
for variable, leg in self.variable_legs.items():
# 关键:如果有任何一条腿没有数据,直接返回False
if not leg.bid_volume or not leg.ask_volume:
self.clear_price()
return False # 计算失败,不发送事件
# 所有腿都有数据时才计算价差
self.bid_price = self.parse_formula(self.price_code, bid_data)
self.ask_price = self.parse_formula(self.price_code, ask_data)
return True
核心策略:被动等待 + 使用旧数据填充
每条腿维护独立的 tick 缓存
LegData 对象缓存最新的 tick 数据只在所有腿都有数据时才计算价差
每次任何一条腿更新都尝试计算
您观察到的现象确实存在:
| 环境 | tick 顺序 | 原因 |
|---|---|---|
| 模拟环境 | 乱序 | 多线程推送,各合约独立线程 |
| 实盘环境 | 一致 | 单线程顺序推送,交易所排序 |
原因分析:
实盘环境(如 CTP):
模拟环境(如 SimNow):
对于您的两种情况:
# VeighNa 的做法:
# - 使用 B 的旧 tick 数据
# - 计算出的价差可能不够精确
# - 但可以立即做出交易决策
# 风险点:
# - 如果 B 旧数据已经过时(如跳价),可能误判套利机会
# VeighNa 的做法:
# - B 新 tick 到达时,会再次触发计算
# - 再次计算时使用 B 的新数据
# - 如果价差仍然满足条件,会再次发出信号
# 优化建议:
# - 在策略层面增加去重逻辑
# - 记录短时间内重复的套利机会
# - 或设置最小时间间隔过滤
from datetime import datetime, timedelta
class SpreadStrategy:
def __init__(self):
self.tick_timeout = timedelta(seconds=3) # tick超时时间
def on_spread_data(self):
# 检查所有腿tick的新鲜度
now = datetime.now()
for leg in self.spread.legs.values():
if not leg.tick:
return # 腿从未收到tick
age = now - leg.tick.datetime
if age > self.tick_timeout:
self.write_log(f"{leg.vt_symbol} tick数据过期: {age}")
return # 数据过期,不交易
# 所有数据都新鲜,执行套利逻辑
self.execute_arbitrage()
class SpreadStrategy:
def __init__(self):
self.last_signal_time = None
self.signal_interval = 1.0 # 最小信号间隔(秒)
def on_spread_data(self):
now = datetime.now()
# 检查是否在冷却期内
if self.last_signal_time:
elapsed = (now - self.last_signal_time).total_seconds()
if elapsed < self.signal_interval:
return # 冷却期内,忽略信号
# 执行套利
if self.check_arbitrage_opportunity():
self.execute_trade()
self.last_signal_time = now
def on_spread_data(self):
# 检查所有腿的tick时间戳是否接近
if not self.check_tick_sync():
return
# 执行套利
self.execute_arbitrage()
def check_tick_sync(self, max_delay_ms: int = 500) -> bool:
"""检查各腿tick时间戳是否同步"""
tick_times = []
for leg in self.spread.legs.values():
if not leg.tick:
return False
tick_times.append(leg.tick.datetime)
# 计算最大时间差
max_time = max(tick_times)
min_time = min(tick_times)
delay = (max_time - min_time).total_seconds() * 1000
return delay <= max_delay_ms
您需要在策略层面添加额外保护:
模拟与实盘的差异提醒我们:
希望这个解答能帮您理解 VeighNa 在处理多合约数据同步时的设计思路!如果还有疑问,欢迎继续讨论。