1:老师您好,咨询一下,no_ui无人值守模式,如果想要在交易时间结束的那一刻,同步持仓数据和变量,除了把sync_data函数写入到策略外,还有其他方法在no_ui脚本中来实现这个功能吗?
1:老师您好,咨询一下,no_ui无人值守模式,如果想要在交易时间结束的那一刻,同步持仓数据和变量,除了把sync_data函数写入到策略外,还有其他方法在no_ui脚本中来实现这个功能吗?
你是想缓存输出日志吗?
哦哦,那sync_data会自动帮你存的呀
xiaohe wrote:
哦哦,那sync_data会自动帮你存的呀
你好,我想问一下,如果我开启了多个cta策略,每个策略里都使用了sync_data,是不是会出现,保存的数据没保存成功的情况,比如A策略和B策略刚好同时调用了sync_data,其中A策略保存成功了更新了数据,B策略没有更新数据
VeighNa 的架构是:单进程 + 事件引擎单线程处理策略回调
具体来说:
| 组件 | 线程模型 | 说明 |
|---|---|---|
| EventEngine | 单线程处理事件 | 所有策略回调在此线程中顺序执行 |
| CTP 行情/交易 API | 独立线程 | 接收数据后将事件放入队列 |
| CTA 策略引擎 | 运行在 EventEngine 线程 | 处理 tick/order/trade 事件 |
| 策略初始化 | 线程池(max_workers=1) | 仅初始化时使用 |
CTP 行情/交易线程(独立线程)
↓ 收到数据
↓ 调用 gateway.on_tick() / on_trade()
↓ 将 Event 放入 EventEngine._queue
↓
EventEngine._thread(单线程)
↓ 从队列取出 Event
↓ 调用注册的 handler
↓ 如:CtaEngine.process_trade_event()
↓
↓ 遍历该合约的所有策略
↓ 依次调用 strategy.on_trade()
↓ 调用 sync_strategy_data()(立即同步到文件)
# vnpy/event/engine.py
class EventEngine:
def __init__(self, interval: int = 1) -> None:
self._queue: Queue = Queue()
self._thread: Thread = Thread(target=self._run) # 单线程
def _run(self) -> None:
"""单线程循环处理事件"""
while self._active:
event: Event = self._queue.get(block=True, timeout=1)
self._process(event) # 同步处理,处理完一个才取下一个
def _process(self, event: Event) -> None:
"""分发事件给所有 handler"""
if event.type in self._handlers:
[handler(event) for handler in self._handlers[event.type]]
# vnpy_ctastrategy/engine.py
def process_tick_event(self, event: Event) -> None:
tick: TickData = event.data
strategies: list = self.symbol_strategy_map[tick.vt_symbol]
# 顺序遍历所有订阅该合约的策略
for strategy in strategies:
if strategy.inited:
self.call_strategy_func(strategy, strategy.on_tick, tick)
def process_trade_event(self, event: Event) -> None:
trade: TradeData = event.data
# ...
strategy: CtaTemplate = self.orderid_strategy_map.get(trade.vt_orderid)
# 更新持仓
if trade.direction == Direction.LONG:
strategy.pos += trade.volume
else:
strategy.pos -= trade.volume
# 调用策略回调
self.call_strategy_func(strategy, strategy.on_trade, trade)
# 同步数据 - 立即执行,完成后才返回
self.sync_strategy_data(strategy)
多个策略同时调用 sync_data 会丢失数据吗?
不会,原因:
process_trade_event 调用都是顺序执行的TradeEvent,事件引擎会依次处理self.strategy_data 是共享字典,先保存的 A 的数据在内存中,后保存的 B 的数据会基于已更新的字典写入执行顺序:
1. TradeEvent(A) → process_trade_event(A) → sync_data(A) → 文件包含 A 的数据
2. TradeEvent(B) → process_trade_event(B) → sync_data(B) → 文件包含 A+B 的数据
即使极端情况下两个成交在同一个 tick 到达,由于 CTP 是分别回调的,事件也会排队处理,不会并发。
只有策略初始化使用了线程池:
def init_strategy(self, strategy_name: str) -> Future:
return self.init_executor.submit(self._init_strategy, strategy_name)
但初始化时 trading=False,不会调用 sync_data,所以不影响数据同步的安全性。