我来帮你分析这个问题。首先让我查询一下 VeighNa 中 EventEngine 的相关文档和源代码。
你分析得很准确,这是 EventEngine 中 timer 线程的一个设计缺陷。当 interval 设置较大时,sleep() 会阻塞线程,导致 stop() 方法中的 join() 长时间等待。
问题根源
def _run_timer(self) -> None:
    """Emit one timer event per interval for as long as the engine is active."""
    while self._active:
        # The thread is blocked here for the whole interval, so setting
        # _active to False is not noticed until the sleep has finished.
        sleep(self._interval)
        self.put(Event(EVENT_TIMER))
当调用 stop() 时:
- self._active = False 被设置
- 但 timer 线程正阻塞在 sleep(900) 中
- self._timer.join() 必须等到本次 sleep 结束(最多 900 秒)才能返回
解决方案
推荐使用 threading.Event 替代 sleep,实现可中断的等待:
"""
Event-driven framework of VeighNa framework.
"""
import threading
from collections import defaultdict
from collections.abc import Callable
from queue import Empty, Queue
from threading import Event, Thread
from typing import Any
EVENT_TIMER = "eTimer"


class Event:
    """
    Event object consists of a type string which is used
    by event engine for distributing event, and a data
    object which contains the real data.
    """

    def __init__(self, type: str, data: Any = None) -> None:
        """
        Store the event type used for dispatching and the optional payload.
        """
        self.type: str = type
        self.data: Any = data


# Defines handler function to be used in event engine.
HandlerType = Callable[[Event], None]


class EventEngine:
    """
    Event engine distributes event object based on its type
    to those handlers registered.

    It also generates timer event by every interval seconds,
    which can be used for timing purpose.
    """

    def __init__(self, interval: int = 1) -> None:
        """
        Timer event is generated every 1 second by default, if
        interval not specified.
        """
        self._interval: int = interval
        self._queue: Queue = Queue()
        self._active: bool = False
        self._thread: Thread = Thread(target=self._run)
        self._timer: Thread = Thread(target=self._run_timer)
        self._handlers: defaultdict = defaultdict(list)
        self._general_handlers: list = []
        # Stop signal for the timer thread. It is reached through the
        # ``threading`` module because the bare name ``Event`` in this file
        # is the engine's own event class, which takes a required ``type``
        # argument and has no wait()/set()/clear().
        self._stop_event: threading.Event = threading.Event()

    def _run(self) -> None:
        """
        Get event from queue and then process it.
        """
        while self._active:
            try:
                # The one-second timeout lets the loop re-check _active
                # even when no events arrive.
                event: Event = self._queue.get(block=True, timeout=1)
                self._process(event)
            except Empty:
                pass

    def _process(self, event: Event) -> None:
        """
        First distribute event to those handlers registered listening
        to this type.

        Then distribute event to those general handlers which listens
        to all types.
        """
        # Membership test first: indexing the defaultdict directly would
        # create an empty entry for every unknown event type.
        if event.type in self._handlers:
            for handler in self._handlers[event.type]:
                handler(event)

        for handler in self._general_handlers:
            handler(event)

    def _run_timer(self) -> None:
        """
        Wait by interval second(s) and then generate a timer event.

        The wait is done on the stop signal instead of sleep(), so stop()
        can end it immediately. The signal is cleared in start(), before
        this thread is launched, so it is not cleared again here.
        """
        while self._active:
            # wait() returns True as soon as the signal is set, and False
            # when the full interval elapsed without a stop request.
            if self._stop_event.wait(self._interval):
                break
            event: Event = Event(EVENT_TIMER)
            self.put(event)

    def start(self) -> None:
        """
        Start event engine to process events and generate timer events.
        """
        self._active = True
        self._stop_event.clear()
        self._thread.start()
        self._timer.start()

    def stop(self) -> None:
        """
        Stop event engine.
        """
        self._active = False
        # Wake the timer thread right away instead of waiting out the
        # remainder of its interval.
        self._stop_event.set()
        self._timer.join()
        self._thread.join()

    def put(self, event: Event) -> None:
        """
        Put an event object into event queue.
        """
        self._queue.put(event)

    def register(self, type: str, handler: HandlerType) -> None:
        """
        Register a new handler function for a specific event type. Every
        function can only be registered once for each event type.
        """
        handler_list: list = self._handlers[type]
        if handler not in handler_list:
            handler_list.append(handler)

    def unregister(self, type: str, handler: HandlerType) -> None:
        """
        Unregister an existing handler function from event engine.
        """
        handler_list: list = self._handlers[type]
        if handler in handler_list:
            handler_list.remove(handler)

        # Drop the entry once its last handler is gone.
        if not handler_list:
            self._handlers.pop(type)

    def register_general(self, handler: HandlerType) -> None:
        """
        Register a new handler function for all event types. Every
        function can only be registered once for each event type.
        """
        if handler not in self._general_handlers:
            self._general_handlers.append(handler)

    def unregister_general(self, handler: HandlerType) -> None:
        """
        Unregister an existing general handler function.
        """
        if handler in self._general_handlers:
            self._general_handlers.remove(handler)
关键改动说明
| 改动点 | 说明 |
| --- | --- |
| 新增 self._stop_event: threading.Event | 用于线程间通信的事件对象 |
| _run_timer() 使用 wait() | Event.wait(timeout) 可被 set() 立即中断,而 sleep() 不可中断 |
| stop() 中调用 set() | 立即唤醒正在 wait() 的 timer 线程 |
其他备选方案
如果无法修改源码,也可以在应用层设置 join 超时:
def stop(self, timeout: float = 5) -> None:
    """
    Stop the engine, waiting a bounded time for each worker thread.

    Args:
        timeout: Maximum number of seconds to wait for each of the two
            threads. Defaults to 5.
    """
    self._active = False

    self._timer.join(timeout=timeout)
    if self._timer.is_alive():
        # join() only stopped waiting; the timer thread itself keeps
        # running until its current sleep ends. Carry on regardless
        # (swap print for logging if preferred).
        print("Warning: Timer thread did not stop gracefully")

    self._thread.join(timeout=timeout)
参考来源:vnpy/event/engine.py
基于RAG技术开发的VeighNa AI助手,你的个人专属 Desk Quant