VeighNa量化社区
你的开源社区量化交易平台 | vn.py | vnpy
Member
avatar
加入于:
帖子: 16
声望: 0

Windows 无界面cmd窗口下跑实盘策略,在初始化EventEngine 时传入的 interval 较大时,比如这里设置的15分钟:

def init_engine(self):
self.event_engine = EventEngine(interval= 900)
self.main_engine = MainEngine(self.event_engine)

使用 ctrl + C 中止进程时调用:self.main_engine.close(),会出现无法退出的情况。经调试发现,在 EventEngine 类中 self._timer.join() 会卡住,它在等待 _run_timer(self) 方法中的 sleep(self._interval) 结束:

def _run_timer(self) -> None:
"""
Sleep by interval second(s) and then generate a timer event.
"""
while self._active:
sleep(self._interval)
event: Event = Event(EVENT_TIMER)
self.put(event)

def start(self) -> None:
    """
    Start event engine to process events and generate timer events.
    """
    self._active = True
    self._thread.start()
    self._timer.start()

def stop(self) -> None:
    """
    Stop event engine.
    """
    self._active = False
    self._timer.join()
    self._thread.join()

请问该如何解决这个问题?是否会考虑优化一下线程的停止机制。

Super Moderator
avatar
加入于:
帖子: 120
声望: 13

我来帮你分析这个问题。首先让我查询一下 VeighNa 中 EventEngine 的相关文档和源代码。

你分析得很准确,这是 EventEngine 中 timer 线程的一个设计缺陷。当 interval 设置较大时,sleep() 会阻塞线程,导致 stop() 方法中的 join() 长时间等待。

问题根源

def _run_timer(self) -> None:
    while self._active:
        sleep(self._interval)  # 阻塞在这里,无法及时响应 _active = False
        event: Event = Event(EVENT_TIMER)
        self.put(event)

当调用 stop() 时:

  1. self._active = False 被设置
  2. 但 timer 线程正阻塞在 sleep(900)
  3. self._timer.join() 必须等待 900 秒才能返回

解决方案

推荐使用 threading.Event 替代 sleep,实现可中断的等待:

"""
Event-driven framework of VeighNa framework.
"""

from collections import defaultdict
from collections.abc import Callable
from queue import Empty, Queue
from threading import Event, Thread
from typing import Any

EVENT_TIMER = "eTimer"

class Event:
    """
    Event object consists of a type string which is used
    by event engine for distributing event, and a data
    object which contains the real data.
    """

    def __init__(self, type: str, data: Any = None) -> None:
        """"""
        self.type: str = type
        self.data: Any = data

# Defines handler function to be used in event engine.
HandlerType = Callable[[Event], None]

class EventEngine:
    """
    Event engine distributes event object based on its type
    to those handlers registered.

    It also generates timer event by every interval seconds,
    which can be used for timing purpose.
    """

    def __init__(self, interval: int = 1) -> None:
        """
        Timer event is generated every 1 second by default, if
        interval not specified.
        """
        self._interval: int = interval
        self._queue: Queue = Queue()
        self._active: bool = False
        self._thread: Thread = Thread(target=self._run)
        self._timer: Thread = Thread(target=self._run_timer)
        self._handlers: defaultdict = defaultdict(list)
        self._general_handlers: list = []
        self._stop_event: Event = Event()  # 新增:用于中断 timer 线程

    def _run(self) -> None:
        """
        Get event from queue and then process it.
        """
        while self._active:
            try:
                event: Event = self._queue.get(block=True, timeout=1)
                self._process(event)
            except Empty:
                pass

    def _process(self, event: Event) -> None:
        """
        First distribute event to those handlers registered listening
        to this type.

        Then distribute event to those general handlers which listens
        to all types.
        """
        if event.type in self._handlers:
            [handler(event) for handler in self._handlers[event.type]]

        if self._general_handlers:
            [handler(event) for handler in self._general_handlers]

    def _run_timer(self) -> None:
        """
        Sleep by interval second(s) and then generate a timer event.
        """
        self._stop_event.clear()  # 清除停止信号
        while self._active:
            # 使用 wait 代替 sleep,可被 set() 立即中断
            if self._stop_event.wait(self._interval):
                break  # 收到停止信号,立即退出
            event: Event = Event(EVENT_TIMER)
            self.put(event)

    def start(self) -> None:
        """
        Start event engine to process events and generate timer events.
        """
        self._active = True
        self._stop_event.clear()
        self._thread.start()
        self._timer.start()

    def stop(self) -> None:
        """
        Stop event engine.
        """
        self._active = False
        self._stop_event.set()  # 发送停止信号,立即唤醒 timer 线程
        self._timer.join()
        self._thread.join()

    def put(self, event: Event) -> None:
        """
        Put an event object into event queue.
        """
        self._queue.put(event)

    def register(self, type: str, handler: HandlerType) -> None:
        """
        Register a new handler function for a specific event type. Every
        function can only be registered once for each event type.
        """
        handler_list: list = self._handlers[type]
        if handler not in handler_list:
            handler_list.append(handler)

    def unregister(self, type: str, handler: HandlerType) -> None:
        """
        Unregister an existing handler function from event engine.
        """
        handler_list: list = self._handlers[type]

        if handler in handler_list:
            handler_list.remove(handler)

        if not handler_list:
            self._handlers.pop(type)

    def register_general(self, handler: HandlerType) -> None:
        """
        Register a new handler function for all event types. Every
        function can only be registered once for each event type.
        """
        if handler not in self._general_handlers:
            self._general_handlers.append(handler)

    def unregister_general(self, handler: HandlerType) -> None:
        """
        Unregister an existing general handler function.
        """
        if handler in self._general_handlers:
            self._general_handlers.remove(handler)

关键改动说明

改动点 说明
新增 self._stop_event: Event 用于线程间通信的事件对象
_run_timer() 使用 wait() Event.wait(timeout) 可被 set() 立即中断,而 sleep() 不可中断
stop() 中调用 set() 立即唤醒正在 wait() 的 timer 线程

其他备选方案

如果无法修改源码,也可以在应用层设置 join 超时:

def stop(self) -> None:
    """带超时的停止方法"""
    self._active = False
    self._timer.join(timeout=5)  # 最多等待5秒
    if self._timer.is_alive():
        # 超时后强制继续,或记录日志
        print("Warning: Timer thread did not stop gracefully")
    self._thread.join(timeout=5)

参考来源vnpy/event/engine.py

© 2015-2022 上海韦纳软件科技有限公司
备案服务号:沪ICP备18006526号

沪公网安备 31011502017034号

【用户协议】
【隐私政策】
【免责条款】