VeighNa量化社区
你的开源社区量化交易平台 | vn.py | vnpy
Member
avatar
加入于:
帖子: 273
声望: 4

1:老师您好,咨询一下,no_ui无人值守模式,如果想要在交易时间结束的那一刻,同步持仓数据和变量,除了把sync_data函数写入到策略外,还有其他方法在no_ui脚本中来实现这个功能吗?

Member
avatar
加入于:
帖子: 6060
声望: 375

你是想缓存输出日志吗?

Member
avatar
加入于:
帖子: 273
声望: 4

xiaohe wrote:

你是想缓存输出日志吗?

1:老师您好,我想把持仓数据和变量保存到硬盘

Member
avatar
加入于:
帖子: 6060
声望: 375

哦哦,那sync_data会自动帮你存的呀

Member
avatar
加入于:
帖子: 4
声望: 0

xiaohe wrote:

哦哦,那sync_data会自动帮你存的呀
你好,我想问一下,如果我开启了多个cta策略,每个策略里都使用了sync_data,是不是会出现,保存的数据没保存成功的情况,比如A策略和B策略刚好同时调用了sync_data,其中A策略保存成功了更新了数据,B策略没有更新数据

Super Moderator
avatar
加入于:
帖子: 119
声望: 13

VeighNa 的架构是:单进程 + 事件引擎单线程处理策略回调

具体来说:

组件 线程模型 说明
EventEngine 单线程处理事件 所有策略回调在此线程中顺序执行
CTP 行情/交易 API 独立线程 接收数据后将事件放入队列
CTA 策略引擎 运行在 EventEngine 线程 处理 tick/order/trade 事件
策略初始化 线程池(max_workers=1) 仅初始化时使用

事件处理流程

CTP 行情/交易线程(独立线程)
    ↓ 收到数据
    ↓ 调用 gateway.on_tick() / on_trade()
    ↓ 将 Event 放入 EventEngine._queue
    ↓
EventEngine._thread(单线程)
    ↓ 从队列取出 Event
    ↓ 调用注册的 handler
    ↓ 如:CtaEngine.process_trade_event()
    ↓
    ↓ 遍历该合约的所有策略
    ↓ 依次调用 strategy.on_trade()
    ↓ 调用 sync_strategy_data()(立即同步到文件)

关键代码验证

EventEngine 是单线程处理

# vnpy/event/engine.py
class EventEngine:
    def __init__(self, interval: int = 1) -> None:
        self._queue: Queue = Queue()
        self._thread: Thread = Thread(target=self._run)  # 单线程

    def _run(self) -> None:
        """单线程循环处理事件"""
        while self._active:
            event: Event = self._queue.get(block=True, timeout=1)
            self._process(event)  # 同步处理,处理完一个才取下一个

    def _process(self, event: Event) -> None:
        """分发事件给所有 handler"""
        if event.type in self._handlers:
            [handler(event) for handler in self._handlers[event.type]]

CTA 引擎处理多个策略是顺序的

# vnpy_ctastrategy/engine.py
def process_tick_event(self, event: Event) -> None:
    tick: TickData = event.data
    strategies: list = self.symbol_strategy_map[tick.vt_symbol]

    # 顺序遍历所有订阅该合约的策略
    for strategy in strategies:
        if strategy.inited:
            self.call_strategy_func(strategy, strategy.on_tick, tick)

成交事件处理(自动调用 sync_data)

def process_trade_event(self, event: Event) -> None:
    trade: TradeData = event.data
    # ...
    strategy: CtaTemplate = self.orderid_strategy_map.get(trade.vt_orderid)

    # 更新持仓
    if trade.direction == Direction.LONG:
        strategy.pos += trade.volume
    else:
        strategy.pos -= trade.volume

    # 调用策略回调
    self.call_strategy_func(strategy, strategy.on_trade, trade)

    # 同步数据 - 立即执行,完成后才返回
    self.sync_strategy_data(strategy)

回答您的问题

多个策略同时调用 sync_data 会丢失数据吗?

不会,原因:

  1. 事件引擎单线程:所有 process_trade_event 调用都是顺序执行的
  2. 成交事件是独立的:即使策略 A 和 B 同时成交,它们会产生两个 TradeEvent,事件引擎会依次处理
  3. 数据是累积的self.strategy_data 是共享字典,先保存的 A 的数据在内存中,后保存的 B 的数据会基于已更新的字典写入
执行顺序:
1. TradeEvent(A) → process_trade_event(A) → sync_data(A) → 文件包含 A 的数据
2. TradeEvent(B) → process_trade_event(B) → sync_data(B) → 文件包含 A+B 的数据

即使极端情况下两个成交在同一个 tick 到达,由于 CTP 是分别回调的,事件也会排队处理,不会并发。


唯一的多线程场景

只有策略初始化使用了线程池:

def init_strategy(self, strategy_name: str) -> Future:
    return self.init_executor.submit(self._init_strategy, strategy_name)

但初始化时 trading=False,不会调用 sync_data,所以不影响数据同步的安全性。

© 2015-2022 上海韦纳软件科技有限公司
备案服务号:沪ICP备18006526号

沪公网安备 31011502017034号

【用户协议】
【隐私政策】
【免责条款】