VeighNa量化社区
你的开源社区量化交易平台 | vn.py | vnpy
Member
avatar
加入于:
帖子: 9
声望: 0

订阅了A,B
A的数据先到,但此时你并不知道B数据在同一时间戳里会不会到,然后有2种情况
1 同一时间戳,B的数据不会到,采用B旧数据就可以套利
2 同一时间戳,B的数据后来到了,应该在B新数据来时套利

问题是,如果是1,看似B旧数据可以套利,但你不知道新数据是否可以
如果是2,B数据如果不来,但旧数据可以套利
怎么解?

Member
avatar
加入于:
帖子: 9
声望: 0

而且还有个问题,比如说我订阅了A B C 三个合约的tick。我用模拟账号发现 某一时间点回这3个合约的tick顺序是乱序的。但是正式账号 顺序永远是一致的。这是为什么?难道模拟环境是多线程环境?正式环境是单线程回tick的?

Super Moderator
avatar
加入于:
帖子: 107
声望: 12

这是一个非常典型的多合约数据同步问题,在套利策略开发中很常见。让我先查询 VeighNa 相关模块的处理方案。

好问题!您提到的是多合约套利策略中的数据同步时序问题,这在实盘交易中确实很常见。让我进一步查询 VeighNa 价差交易模块的源码实现:

完美! 现在我找到了关键代码。让我再查看价差数据计算的核心逻辑:

非常好! 现在我完全理解了您的问题。根据 VeighNa 价差交易模块的源码,我来给您详细解答:


问题解答

您遇到的是多合约套利中的经典数据同步问题,这个问题在实盘交易中普遍存在。

一、关于 tick 数据时序问题

1. VeighNa 的处理机制

从源码 engine.pyprocess_tick_event 方法可以看到:

def process_tick_event(self, event: Event) -> None:
    tick: TickData = event.data
    leg: LegData | None = self.legs.get(tick.vt_symbol, None)
    if not leg:
        return
    leg.update_tick(tick)  # 更新单腿tick数据

    for spread in self.symbol_spread_map[tick.vt_symbol]:
        # 只有能成功计算出价差盘口时,才会发送事件
        if spread.calculate_price():
            self.put_data_event(spread)

2. 价差计算逻辑 (来自 base.py)

def calculate_price(self) -> bool:
    """
    1. 如果各条腿价格均有效,则计算成功,返回True
    2. 反之只要有一条腿的价格无效,则计算失败,返回False
    """
    for variable, leg in self.variable_legs.items():
        # 关键:如果有任何一条腿没有数据,直接返回False
        if not leg.bid_volume or not leg.ask_volume:
            self.clear_price()
            return False  # 计算失败,不发送事件

    # 所有腿都有数据时才计算价差
    self.bid_price = self.parse_formula(self.price_code, bid_data)
    self.ask_price = self.parse_formula(self.price_code, ask_data)
    return True

二、VeighNa 的解决方案:

核心策略:被动等待 + 使用旧数据填充

  1. 每条腿维护独立的 tick 缓存

    • 每个 LegData 对象缓存最新的 tick 数据
    • 新 tick 到达时更新对应腿的缓存
  2. 只在所有腿都有数据时才计算价差

    • 如果 B 的新 tick 没来,会使用 B 的旧 tick 数据
    • 如果 B 从未收到过 tick,则不计算价差
  3. 每次任何一条腿更新都尝试计算

    • A 的 tick 到达 → 尝试用 A新+B旧 计算价差
    • B 的 tick 到达 → 尝试用 A旧+B新 计算价差

三、实盘 vs 模拟环境的 tick 顺序差异

您观察到的现象确实存在:

环境 tick 顺序 原因
模拟环境 乱序 多线程推送,各合约独立线程
实盘环境 一致 单线程顺序推送,交易所排序

原因分析:

  1. 实盘环境(如 CTP):

    • 交易所系统通常是单线程推送行情
    • 同一时间戳的多个合约按固定顺序推送
    • 保证了确定性的接收顺序
  2. 模拟环境(如 SimNow):

    • 可能使用多线程并发推送行情
    • 不同合约的 tick 由不同线程处理
    • 导致接收顺序不确定性

四、实际交易建议

对于您的两种情况:

情况1: B的数据不来,用旧数据套利

# VeighNa 的做法:
# - 使用 B 的旧 tick 数据
# - 计算出的价差可能不够精确
# - 但可以立即做出交易决策

# 风险点:
# - 如果 B 旧数据已经过时(如跳价),可能误判套利机会

情况2: B的数据后来到,应该用新数据套利

# VeighNa 的做法:
# - B 新 tick 到达时,会再次触发计算
# - 再次计算时使用 B 的新数据
# - 如果价差仍然满足条件,会再次发出信号

# 优化建议:
# - 在策略层面增加去重逻辑
# - 记录短时间内重复的套利机会
# - 或设置最小时间间隔过滤

五、建议的解决方案

方案1: 增加数据新鲜度检查

from datetime import datetime, timedelta

class SpreadStrategy:
    def __init__(self):
        self.tick_timeout = timedelta(seconds=3)  # tick超时时间

    def on_spread_data(self):
        # 检查所有腿tick的新鲜度
        now = datetime.now()
        for leg in self.spread.legs.values():
            if not leg.tick:
                return  # 腿从未收到tick

            age = now - leg.tick.datetime
            if age > self.tick_timeout:
                self.write_log(f"{leg.vt_symbol} tick数据过期: {age}")
                return  # 数据过期,不交易

        # 所有数据都新鲜,执行套利逻辑
        self.execute_arbitrage()

方案2: 设置交易冷却期

class SpreadStrategy:
    def __init__(self):
        self.last_signal_time = None
        self.signal_interval = 1.0  # 最小信号间隔(秒)

    def on_spread_data(self):
        now = datetime.now()

        # 检查是否在冷却期内
        if self.last_signal_time:
            elapsed = (now - self.last_signal_time).total_seconds()
            if elapsed < self.signal_interval:
                return  # 冷却期内,忽略信号

        # 执行套利
        if self.check_arbitrage_opportunity():
            self.execute_trade()
            self.last_signal_time = now

方案3: 使用同一时间戳过滤

def on_spread_data(self):
    # 检查所有腿的tick时间戳是否接近
    if not self.check_tick_sync():
        return

    # 执行套利
    self.execute_arbitrage()

def check_tick_sync(self, max_delay_ms: int = 500) -> bool:
    """检查各腿tick时间戳是否同步"""
    tick_times = []
    for leg in self.spread.legs.values():
        if not leg.tick:
            return False
        tick_times.append(leg.tick.datetime)

    # 计算最大时间差
    max_time = max(tick_times)
    min_time = min(tick_times)
    delay = (max_time - min_time).total_seconds() * 1000

    return delay <= max_delay_ms

六、总结

  1. VeighNa 的默认机制是"使用最新可用数据",即使部分腿的数据可能是旧的
  2. 这是务实的选择,在高频交易中,等待所有腿同步更新往往会错过机会
  3. 您需要在策略层面添加额外保护:

    • 数据新鲜度检查
    • 信号去重
    • 时间戳同步验证
    • 交易冷却期
  4. 模拟与实盘的差异提醒我们:

    • 模拟测试要考虑多线程乱序情况
    • 策略要有足够的健壮性
    • 不要依赖tick的固定顺序

希望这个解答能帮您理解 VeighNa 在处理多合约数据同步时的设计思路!如果还有疑问,欢迎继续讨论。

© 2015-2022 上海韦纳软件科技有限公司
备案服务号:沪ICP备18006526号

沪公网安备 31011502017034号

【用户协议】
【隐私政策】
【免责条款】