VeighNa量化社区
你的开源社区量化交易平台 | vn.py | vnpy

置顶主题

VeighNa发布v4.2.0 - RiskManager风控模块重构升级

发布于VeighNa社区公众号【vnpy-community】
 
原文作者:用Python的交易员 | 发布时间:2025-11-10
 
上周发布了VeighNa的4.2.0版本,本次更新的主要内容是对RiskManager交易事前风控模块进行了重构升级,同时调整了框架的日志输出落地功能,完整适配期货程序化交易监管新规中对于程序化交易系统的功能要求。

对于已经安装了VeighNa Studio 4.0版本的用户,可以使用快速更新功能完成自动升级。对于没有安装的用户,请下载VeighNa Studio-4.2.0,体验一键安装的量化交易Python发行版,下载链接:

https://download.vnpy.com/veighna_studio-4.2.0.exe

 

RiskManager风控模块

 

标准化风控规则模板

新版 vnpy_riskmanager 模块设计的核心是标准化的风控规则模板 RuleTemplate。任何自定义风控规则都必须继承该基类,这保证了所有规则都具备统一的接口,使得风控引擎(RiskEngine)能够对其进行动态发现、加载和执行。

RuleTemplate 为风控规则定义了完整的生命周期和核心逻辑方法。创建一个新的风控规则,即是创建一个继承自 RuleTemplate 的子类,并根据实际需求实现其中的相关方法。

核心组件

一个标准的风控规则类主要包含以下几个部分:

  • name: str: 规则的唯一中文名称,用于图形界面上该规则相关的信息显示。
  • parameters: dict[str, str]: 定义规则的可配置参数。该字典的键为参数的英文名,值为其在图形界面上显示的中文名,方便用户理解和修改。
  • variables: dict[str, str]: 定义规则需要实时监控的状态变量,同样字典的键为变量的英文名,值为其在图形界面上显示的中文名。

核心方法

  • on_init(self) -> None: 规则的初始化方法。在被风控引擎加载时调用,主要用于设定参数的默认值以及初始化监控变量的初始状态。
  • on_order(self, order: OrderData) -> None: 用于处理委托状态更新的回调方法。规则可在此方法内更新内部状态,例如跟踪活动委托的数量。
  • on_trade(self, trade: TradeData) -> None: 用于处理成交回报的回调方法。规则可在此方法内统计累计成交信息等。
  • on_tick(self, tick: TickData) -> None: 用于处理行情切片数据的回调方法。适用于需要基于实时行情进行判断的规则,但需注意其可能会因频繁调用而带来的性能开销。
  • check_allowed(self, req: OrderRequest, gateway_name: str) -> bool: 风控检查的核心逻辑实现。每当交易系统产生新的委托请求(OrderRequest)时,风控引擎都会调用此方法进行检查。若订单允许通过,则方法返回 True;若订单需要被拦截,则应先调用 self.write_log() 记录拦截的具体原因,随后返回 False

Python 规则示例

以下是一个完整的纯 Python 规则示例代码,实现了对单笔委托数量的上限限制。

from vnpy.trader.object import OrderRequest, OrderData
from ..template import RuleTemplate

class OrderVolumeRule(RuleTemplate):
    """单笔委托手数限制规则"""

    # 规则的中文名称
    name: str = "委托数量检查"

    # 定义可配置的参数
    parameters: dict[str, str] = {
        "max_volume": "单笔最大委托量"
    }

    # 定义需监控的变量(可为空)
    variables: dict[str, str] = {}

    def on_init(self) -> None:
        """初始化"""
        self.max_volume: int = 100         # 设置参数的默认值

    def check_allowed(self, req: OrderRequest, gateway_name: str) -> bool:
        """核心风控逻辑"""
        if req.volume > self.max_volume:
            msg = f"委托数量{req.volume}超过参数限制{self.max_volume}:{req}"
            self.write_log(msg)
            returnFalse

        returnTrue

模块的内置风控规则

vnpy_riskmanager 模块提供了一系列标准化的内置风控规则,覆盖了交易过程中的多个关键风险点。所有内置规则均提供纯 Python 和 Cython 两种实现,风控引擎在加载时会优先使用性能更优的 Cython 版本。

  • ActiveOrderRule (活动委托数量上限)此规则用于限制全局的活动(可撤)委托数量。当活动委托笔数超过设定的 active_order_limit 阈值时,新的委托请求将被拦截。它通过监听 on_order 事件来实时跟踪委托状态的变更,从而维护准确的活动委托计数。

  • DailyLimitRule (每日流控上限)该规则用于对整个交易日内的委托、撤单、成交行为进行总次数和单个合约次数的双重流控。它包含六个可配置参数:total_order_limit (汇总委托上限), total_cancel_limit (汇总撤单上限), total_trade_limit (汇总成交上限), contract_order_limit (合约委托上限), contract_cancel_limit (合约撤单上限), contract_trade_limit (合约成交上限)。在发单时,会检查各项累计值是否超限;在收到委托、撤单和成交回报时,会更新相应的计数。

  • DuplicateOrderRule (重复报单检查)用于防范因系统或人为失误导致的重复下单。该规则会统计历史上所有发出委托的缓存,当一个新的委托请求在合约、类型、方向、开平、价格、数量等字段上完全相同时,则会累加其重复次数。如果该次数超过 duplicate_order_limit 上限(默认为10次),委托就会被执行拦截。

  • OrderSizeRule (单笔委托规模上限)这是一个基础但极为重要的风控规则,用于防止“乌龙指”。它包含两个维度的限制:order_volume_limit 用于限制任何单笔委托请求的最大允许数量;order_value_limit 用于限制该笔委托的总价值(数量 价格 合约乘数)。任何一个维度超限都会导致委托被拦截。

  • OrderValidityRule (委托指令合法性监控)此规则在委托发送至交易接口前,对其指令的合法性进行预检查,提前拦截无效委托,减轻后端系统压力。检查内容包括:

    1. 通过 main_engine.get_contract 检查委托的合约是否存在。
    2. 检查委托价格是否为合约最小价格变动 pricetick 的整数倍。
    3. 检查委托数量是否超过了合约规定的单笔最大下单量 contract.max_volume
    4. 检查委托数量是否低于合约规定的单笔最小下单量 contract.min_volume

Cython 性能优化加速

风控引擎对所有规则的检查是串行执行的,在交易繁忙时,多条纯Python规则累加的检查延迟可能成为性能瓶颈。为了解决这个问题,vnpy_riskmanager模块支持使用Cython对代码进行性能优化。开发者可以将原有的纯Python风控规则,通过Cython语法重构为C语言级别的高性能版本,从而将检查延迟降低到微秒级。

Cython 规则的实现方式

编写 Cython 风控规则的关键是将核心逻辑(尤其是高频调用的check_allowed函数)放入一个 cdef class 中,同时保留一个标准的 Python class 作为外层包装器,以供风控引擎识别和加载。

以下是内置风控规则中 OrderSizeRule 的 Cython 版本实现代码作为示例:

# cython: language_level=3
from vnpy_riskmanager.template cimport RuleTemplate


cdef class OrderSizeRuleCy(RuleTemplate):
    """委托规模检查风控规则 (Cython 版本)"""

    cdef public int order_volume_limit
    cdef public float order_value_limit

    cpdef void on_init(self):
        """初始化"""
        self.order_volume_limit = 500
        self.order_value_limit = 1_000_000

    cpdef bint check_allowed(self, object req, str gateway_name):
        """检查是否允许委托"""
        cdef object contract
        cdef float order_value

        if req.volume > self.order_volume_limit:
            self.write_log(f"委托数量{req.volume}超过上限{self.order_volume_limit}:{req}")
            return False

        contract = self.get_contract(req.vt_symbol)
        if contract and req.price:      # 只考虑限价单
            order_value = req.volume * req.price * contract.size
            if order_value > self.order_value_limit:
                self.write_log(f"委托价值{order_value}超过上限{self.order_value_limit}:{req}")
                return False

        return True


class OrderSizeRule(OrderSizeRuleCy):
    """委托规模检查规则的Python包装类"""

    name: str = "委托规模检查"

    parameters: dict[str, str] = {
        "order_volume_limit": "委托数量上限",
        "order_value_limit": "委托价值上限",
    }

编译自定义 Cython 规则

对于自定义的 Cython 规则可以通过一个独立的编译脚本来编译生成pyd文件(注意编译过程中会需要用到 C++ 编译器,如 Windows 上的 Visual Studio):

  1. 创建 build_rule.py: 在项目根目录下创建此脚本。

    from setuptools import setup, Extension
    from Cython.Build import cythonize
    
    # 注意:需与 .pyx 文件名一致
    rule_name = "order_size_rule_cy"
    
    extensions = [
        Extension(
            name=f"vnpy_riskmanager.rules.{rule_name}",
            sources=[f"vnpy_riskmanager/rules/{rule_name}.pyx"],
        )
    ]
    
    setup(
        ext_modules=cythonize(
            extensions,
            compiler_directives={"language_level": "3"}
        )
    )
  2. 执行编译: 在项目根目录下打开终端,运行编译命令。

    python build_rule.py build_ext --inplace

    --inplace 参数会使编译生成的二进制文件(.pyd.so)直接输出到 vnpy_riskmanager/rules 目录下。编译完成后重启程序,RiskEngine 会自动发现并优先加载这个高性能的 Cython 规则。

风控信息通知机制

当一个委托请求因为触发了某项风控规则而被拦截时,vnpy_riskmanager 模块会通过多种方式立即向用户发出警报,确保交易员能够第一时间注意到异常情况并进行处理。

声音报警提醒

一旦有委托被拦截,风控引擎会立刻调用操作系统内置的提示音发出警报,确保即使用户的注意力不在交易界面上,也能迅速察觉到风控事件的发生,从而避免错过关键的风险信息。

气泡弹窗通知

在声音报警的同时,一个可视化的气泡弹窗会从系统托盘区域(通常是屏幕右下角)弹出,该弹窗会明确展示具体的拦截原因,例如“委托数量100超过参数限制50”,气泡弹窗会持续显示一段时间(默认30秒)。

日志信息输出

为了确保风险事件的可追溯性,所有拦截信息在触发实时报警的同时,也会通过VeighNa框架的标准化日志系统进行输出,具体包括:主界面的日志监控组件、启动系统的命令行终端,以及本地日志文件。

 

核心框架日志输出落地

 

为了满足监管新规中的要求,确保程序化交易系统的关键操作都有据可查,VeighNa的4.2.0版本在日志记录方面进行了全面增强,主要更新包括:

  • 日志默认启动与持久化:日志功能已默认激活并持久化,INFO级别的系统常规操作信息将同步输出到终端日志文件
  • 核心引擎操作日志:核心引擎 MainEngine 会自动记录关键交易生命周期事件,覆盖连接登录、行情订阅、委托下单委托撤单等操作。
  • 策略模块日志自动注册:各大策略模块(如 CtaStrategyPortfolioStrategy 等)的日志将自动注册并转发至核心日志引擎,实现对策略层行为的全面监控。

 

CHANGELOG

 

新增

  1. vnpy_riskmanager模块重构

    a. 采用插件式设计,提供标准化风控规则开发模板
    b. 支持中国期货程序化交易系统监管要求中的风控规则
    c. 输出拦截日志后播放提示声音,目前仅支持Windows系统
    d. 使用系统托盘栏图标,弹出交易风控拦截日志气泡框
    e. 提供Cython版本的风控规则开发模板以及具体规则实现
    f. 支持自动扫描加载用户自定义风控规则(放置于Trader目录下的rules文件夹中)

  2. vnpy_polygon数据服务接口,支持海外股票、期货、期权等资产品种的历史数据获取

调整

  1. vnpy_ctp更新底层API到6.7.11(生产和测试统一版本)
  2. vnpy_dolphindb升级适配4.0版本
  3. vnpy_tqsdk简化时间戳的格式化方法,提高效率
  4. vnpy_sqlite支持使用配置文件中声明的数据文件
  5. vnpy_taos优化get_bar_overview和get_tick_overview函数性能(直接访问超级表的tags)
  6. vnpy_spreadtrading / vnpy_portfoliostrategy / vnpy_scripttrader 注册模块日志输出到日志引擎
  7. vnpy_optionmaster优化定价模型中期权最小价值边界判断的逻辑
  8. 全局配置中的log.level改为INFO(10),默认启用详细日志记录输出
  9. MainEngine增加交易功能函数的调用日志输出
  10. vnpy.alpha中的Dataset增加process_data函数,便于测试不同数据处理器的效果
  11. vnpy_ib更新支持ibapi至10.40.1版本

修复

  1. vnpy_gm修复中金所和大商所合约代码转换的问题
  2. vnpy_rqdata修复RqdataGateway中的行情订阅函数错误问题
  3. vnpy_optionmaster修复关闭窗口时的异常报错
  4. vnpy_portfoliostrategy修复遗传算法参数优化中的调用传参问题
  5. 修复Linux系统上的sdist安装问题:vnpy_mini / vnpy_sopt / vnpy_rohon / vnpy_tap / vnpy_tts
  6. vnpy_xt修复XtGateway断线重连时的传参错误
  7. vnpy_optionmaster修复black-76模型中theta计算公式的问题
  8. vnpy_postgresql修复写入主键冲突时数据不会更新的问题
     


windows上利用uv手动安装vnpy系统的完整过程

本文记录了利用python包管理生态管理工具uv,从源码恢复vnpy的运行环境的全过程。

一、采用uv作为python生态管理工具

python包管理生态中存在多种工具,如pip、pip-tools、poetry、conda等,各自具备一定功能。

uv新一代python生态管理工具,它是 Astral 公司推出的一款基于Rust编写的Python包管理工具,旨在成为“Python的 Cargo”。

它提供了快速、可靠且易用的包管理体验,在性能、兼容性和功能上都有出色表现,为 Python项目的开发和管理带来了新的选择。

为什么用uv

与其他 Python 中的包管理工具相比,uv 更像是一个全能选手,它的优势在于:

  1. 速度快:得益于 Rust,uv 工具的速度让人惊艳,比如安装依赖,速度比其他工具快很多
  2. 功能全面: uv 是“一站式服务”的工具,从安装 Python、管理虚拟环境,到安装和管理包,再到管理项目依赖,它统统都能处理得很好
  3. 前景光明:背后有风投公司 Astral支持,且采用了 MIT许可,即使未来出现问题,社区也有应对的办法

使用uv,也可以像 Node]s 或者 Rust 项目那样方便的管理依赖。

二、安装uv

windows上安装uv,在cmd窗口中执行下面命令:

powershell -ExecutionPolicy ByPass -c "irm https://astral.sh/uv/install.ps1 | iex"

注意:执行完毕后,会在C:\users\\<用户名称>\.local\bin目录下安装两个文件uv.exe和uvx.exe。

description

进入cmd或者powershell窗口,输入uv命令应该可以得到如下回应就表示uv已经安装成功了:

description

如果报告“uv不是内部或外部命令,也不是可运行的程序或批处理文件。”请打开“我的电脑”| “高级系统设置”| “环境变量”| “用户变量”| “Path”,把C:\users\\<用户名称>\.local\bin目录添加进去就可以。

description

description

三、下载vnpy 4.0源码

description
如图所示,从https://www.github.com下载到vnpy-master.zip源代码,解压后包含如下文件和目录:

description

四、恢复vnpy的运行环境

4.1 安装python 3.13.17

输入下面的命令,限定python版本3.13,实际安装的是python 3.13.17。

uv python install 3.13

4.2 重建虚拟环境

来到vnpy-master源码目录下,输入下面的目录创建虚拟环境目录,当前目录下会多出.venv目录,其中包含虚拟环境的配置文件。

cd d:\vnpy-master
uv venv

4.3 安装ta-lib

这一步是最关键的一步,我是好几天都无法通过这一步,最后在陈总——陈晓优的指导下才通过的。

先把ta-lib二进制包装了,注意这里必须先指明安装的源是https://pypi.vnpy.com。

uv pip install ta-lib --index=https://pypi.vnpy.com

4.4 安装vnpy核心模块

uv pip install . --index=https://pypi.vnpy.com

4.5 安装vnpy其他需要的模块

只有vnpy核心模块是不够的,还需要根据自己的需要安装其他的模块才能够运行实际的交易:

  • 如果采用ctp行情和交易接口需要安装vnpy_ctp模块
  • 如果采用米筐历史书需要安装vnpy_rqdata模块
  • 如果采用cta策略需要安装vnpy_ctastrategy模块
  • 如果要运行cta策略回测需要安装vnpy_ctastrategy模块
  • 如果使用mysql数据库作为数据存储pymysql和vnpy_mysql模块
  • 如果需要对本地数据存储进行管理 vnpy_datamanager 模块

可以合并也可以逐条执行如下命令:

uv add importlib_metadata pymysql
uv add vnpy_ctp vnpy_ctastrategy vnpy_ctabacktester vnpy_datamanager vnpy_mysql vnpy_rqdata

五、运行例子

5.1 运行example\verghna_trader\run.py

vnpy的源码中包含一个例子,位于example\verghna_trader子目录下,其中的run.py最具代表性,它演示了如何利用已经安装的核心和其他配套模块能够完成功能:

  • vntrader界面
  • CTA策略交易界面
  • CTA策略回测
  • 本地数据管理
  • 合约查询
cd d:\vnpy-master\examples\veighna_trader\
uv run run.py

vntrader界面

description

合约查询界面

description



本地部署vnpy$发环境[适用于windows+mac]

安装流程:

  1. 下载 anaconda,并安装
  2. 下载pycharm,并安装
  3. 下载vnpy源码vn.py
    下载源码
  4. 打开 pycharm, file->open,选择下载的vnpy 文件夹
    pycharm打开vnpy文件夹
  5. 配置 python 开发环境, file- settings
    pycharm项目设置
    添加新的anaconda环境
    description
    选择python3.7
    description
    这里我使用的环境名称是 py37_vnpy

打开terminal-注意环境切换成 py37_vnpy,执行以下命令,安装需要的插件

pip install -r requirements.txt -i http://pypi.douban.com/simple --trusted-host pypi.douban.com

description
遇到安装失败的可以单独安装:

pip install PyQt5 -i http://pypi.douban.com/simple --trusted-host pypi.douban.com

description

  1. 创建run.py文件,复制以下代码,来源 README.md

description

  1. 运行 python run.py,注意环境名称是 py37_vnpy

description

备注:
如果需要在cmd 下使用 py37_vnpy 环境。

打开 CMD
运行: conda activate py37_vnpy
会切换到py37_vnpy环境下

description



vnpy 的启动流程总结

作为初学者,面对 vnpy 无所不包、博大精深的丰富内容,试图用图形对 vnpy 的运行流程做一个归纳。
不到之处,还请各位多多指正

description



彻底解决tick数据的过滤问题

1. 问题的由来

  • 在文章 发现了vnpy的BarGenerator两个隐藏很深的错误 !中我就已经分析过tick数据对bar生成器的影响。
  • 当前vnpy系统对集合竞价tick与其他tick没有区分能力
  • 当前vnpy系统没有充分利用行情接口提供的状态信息,无法识别有效tick与无效tick,一股脑地发送到策略和应用中,导致bar合成的错误。

2. 问题的解决方法

在行情接口与策略和应用之间建起一个tick过滤器——TickFilter,对tick数据进行过滤。

tick数据过滤器的功能:

  1. 过滤重复tick,保证已经参与K线合成的tick不会再次被系统使用,每个网关对应一个ick数据过滤;
    要做到这一条,就必须做到对所有已经订阅过的合约的tick的缓存,否则你再次重启系统的时候是无法知道你收到第一个tick是否已经参与过之前bar的合成了。这样你可能重复使用该tick,这是错误的。
    为此我们需要将所有已经订阅过的合约的最新tick进行实时更新,并定期做持久化保存,且在每次系统启动的时候读取加载到系统中。
  2. 过滤无效tick,转发有效交易状态下的tick到系统中,不在有效交易状态下tick做丢弃处理,有效交易状态包括:集合竞价状态和连续竞价状态;
    CTP系统的行情接口中包含的实时更新的合约交易状态通知推送接口,OnRtnInstrumentStatus()。关于这个问题我已经在如何更有效地利用合约交易状态信息——交易状态信息管理器。一文中做了详细的介绍,再次就不赘述。总之合约交易状态通知可以让我识别一个tick是否是有些大tick。
  3. 识别集合竞价tick,为使用tick的应用或用户策略处理集合竞价tick提供支持。
    合约交易状态通知可以让我们知道那些tick是tick,同时可以可以让我们区分那个tick是集合竞价tick,那些是连续竞价tick。对有效tick进性分析利用于我们策略或者应用生成出正确的bar。
  4. 本文只对CtpGateway,CtaEngine、CtaTemplate进行了更改,其他网关系统的道理都是相同的。如果您觉得对您有启发,也可以按同样的方法修改。

3. 过滤无效tick数据的实现代码

声明:本文基于【CTP接口规范6.3.15_API接口说明】做出的修改。

3.1 相关数据类型定义

4.1 定义相关的常量和数据类
在vnpy\trader\constant.py中增加下面的合约交易状态InstrumentStatus常量类型定义:

class InstrumentStatus(Enum):
    """
    合约交易状态类型 hxxjava debug
    """
    BEFORE_TRADING = "开盘前"
    NO_TRADING = "非交易"
    CONTINOUS = "连续交易" 
    AUCTION_ORDERING = "集合竞价报单"
    AUCTION_BALANCE = "集合竞价价格平衡"
    AUCTION_MATCH = "集合竞价撮合"
    CLOSE = "收盘"


# 有效交易状态
VALID_TRADE_STATUSES = [
    InstrumentStatus.CONTINOUS,
    InstrumentStatus.AUCTION_ORDERING,
    InstrumentStatus.AUCTION_BALANCE,
    InstrumentStatus.AUCTION_MATCH
]

# 集合竞价交易状态
AUCTION_STATUS = [
    InstrumentStatus.AUCTION_ORDERING,
    InstrumentStatus.AUCTION_BALANCE,
    InstrumentStatus.AUCTION_MATCH
]


class StatusEnterReason(Enum):
    """
    品种进入交易状态原因类型 hxxjava debug
    """
    AUTOMATIC = "自动切换"
    MANUAL = "手动切换"
    FUSE = "熔断"

在vnpy\trader\object.py中增加下面的交易状态数据类StatusData:

@dataclass
class StatusData(BaseData):
    """
    hxxjava debug
    """
    symbol:str       
    exchange : Exchange    
    settlement_group_id : str = ""  
    instrument_status : InstrumentStatus = None   
    trading_segment_sn : int = None 
    enter_time : str = ""      
    enter_reason : str = ""  
    exchange_inst_id : str = ""     

    def __post_init__(self):
        """  """
        self.vt_symbol = f"{self.symbol}.{self.exchange.value}"

    def belongs_to(self,vt_symbol:str):
        symbol,exchange_str = vt_symbol.split(".")
        instrument = left_alphas(symbol).upper()
        return (self.symbol.upper() == instrument) and (self.exchange.value == exchange_str)

3.2 相关消息定义

在vnpy\trader\event.py中增加交易状态消息类型

EVENT_STATUS = "eStatus"                        # hxxjava debug
EVENT_ORIGIN_TICK = "eOriginTick."              # hxxjava debug
EVENT_AUCTION_TICK = "eAuctionTick."            # hxxjava debug

3.3 Gateway的修改

在vnpy\trader\gateway.py中合约状态接口,修改tick推送接口:

引用部分增加:

from .event import EVENT_ORIGIN_TICK,EVENT_STATUS         # hxxjava add
from .object import StatusData    # hxxjava add

修改class BaseGateway的on_tick()接口,增加on_status()接口:

    def on_tick(self, tick: TickData) -> None:
        """
        Tick event push.
        Tick event of a specific vt_symbol is also pushed.
        """
        self.on_event(EVENT_ORIGIN_TICK, tick)  # hxxjava add             
        # self.on_event(EVENT_TICK, tick)                   
        # self.on_event(EVENT_TICK + tick.vt_symbol, tick)  

    def on_status(self, status: StatusData) -> None:    # hxxjava debug
        """
        Instrument Status event push.
        """
        self.on_event(EVENT_STATUS, status)
        self.on_event(EVENT_STATUS + status.vt_symbol, status)

3.4 CtpGateway的修改

修改vnpy_cpt\ctp_gateway.py:

增加引用部分

from vnpy.trader.constant import InstrumentStatus,StatusEnterReason  # hxxjava debug
rom vnpy.trader.object import StatusData,     # hxxjava debug

增加几个映射字典:

# 品种状态进入原因映射  hxxjava debug
INSTRUMENTSTATUS_CTP2VT: Dict[str, InstrumentStatus] = {
    "0": InstrumentStatus.BEFORE_TRADING,
    "1": InstrumentStatus.NO_TRADING,
    "2": InstrumentStatus.CONTINOUS,
    "3": InstrumentStatus.AUCTION_ORDERING,
    "4": InstrumentStatus.AUCTION_BALANCE,
    "5": InstrumentStatus.AUCTION_MATCH,
    "6": InstrumentStatus.CLOSE,
    "7": InstrumentStatus.CLOSE
}


# 品种状态进入原因映射  hxxjava debug
ENTERREASON_CTP2VT: Dict[str, StatusEnterReason] = {
    "1": StatusEnterReason.AUTOMATIC,
    "2": StatusEnterReason.MANUAL,
    "3": StatusEnterReason.FUSE
}

为class CtpTdApi增加下面合约状态推送接口:

    def onRtnInstrumentStatus(self,data:dict):
        """ 
        当接收到合约品种状态信息 # hxxjava debug 
        """
        if data:
            # print(f"【data={data}】")
            status =  StatusData(
                symbol = data["InstrumentID"],
                exchange = EXCHANGE_CTP2VT[data["ExchangeID"]],
                settlement_group_id = data["SettlementGroupID"],
                instrument_status = INSTRUMENTSTATUS_CTP2VT[data["InstrumentStatus"]],
                trading_segment_sn = data["TradingSegmentSN"],
                enter_time = data["EnterTime"],
                enter_reason = ENTERREASON_CTP2VT[data["EnterReason"]],
                exchange_inst_id = data["ExchangeInstID"],
                gateway_name=self.gateway_name
            )
            # print(f"status={status}")
            self.gateway.on_status(status)

3.5 对CtaEngine的进行扩展

增加引用部分

from vnpy.trader.event import EVENT_AUCTION_TICK  # hxxjava add

增加一个对CtaEgine的扩展MyCtaEngine

class MyCtaEngine(CtaEngine):
    """  """

    condition_filename = "condition_order.json"     # 历史条件单存储文件


    def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
        """"""
        super().__init__(main_engine,event_engine)

        self.condition_orders:Dict[str,ConditionOrder] = {}         # strategy_name: ConditionOrder

        self.triggered_condition_orders:List[ConditionOrder] = []   # 已经触发点条件单,为流控设计

    def load_active_condtion_orders(self):
        """  """
        return {}

    def register_event(self):
        """"""
        super().register_event()
        self.event_engine.register(EVENT_AUCTION_TICK, self.process_auction_tick_event)

    def process_auction_tick_event(self,event:Event):
        """ 集合竞价消息处理 """

        tick:TickData = event.data
        strategies = self.symbol_strategy_map[tick.vt_symbol]
        if not strategies:
            return

        for strategy in strategies:
            if strategy.inited:
                # 执行策略的集合竞价消息处理
                self.call_strategy_func(strategy, strategy.on_auction_tick, tick)

    def process_tick_event(self,event:Event):
        """ 用tick的价格检查条件单 """
        super().process_tick_event(event)

        tick:TickData = event.data
        all_condition_orders = [order for order in self.condition_orders.values() \
            if order.vt_symbol == tick.vt_symbol and order.status == CondOrderStatus.WAITING]
        for order in all_condition_orders:
            # 检查条件单是否满足条件
            self.check_condition_order(order,tick)

    def check_condition_order(self,order:ConditionOrder,tick:TickData):
        """ 检查条件单是否满足条件 """       
        strategy = self.strategies.get(order.strategy_name,None)
        if not strategy or not strategy.trading:
            return False

        price = tick.last_price

        is_be = order.condition == Condition.BE and price >= order.price
        is_le = order.condition == Condition.LE and price <= order.price
        is_bt = order.condition == Condition.BT and price > order.price
        is_lt = order.condition == Condition.LT and price < order.price

        if is_be or is_le or is_bt or is_lt:
            # 满足触发条件
            if order.execute_price == ExecutePrice.MARKET:
                # 取市场价
                price = tick.last_price
            elif order.execute_price == ExecutePrice.EXTREME:
                # 取极限价
                price = tick.limit_up if order.direction == Direction.LONG else tick.limit_down
            else:
                # 取设定价
                price = order.price

            # 执行委托
            order_ids = strategy.send_order(
                    direction = order.direction,
                    offset=order.offset,
                    price=price,
                    volume=order.volume 
                )

            if order_ids:
                order.trigger_time = tick.datetime
                order.status = CondOrderStatus.TRIGGERED
                order.vt_orderids = order_ids

                self.call_strategy_func(strategy,strategy.on_condition_order,order)
                self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))

    def find_condition_order(self,vt_orderid:str):
        """ 根据委托单号查询所属条件单 """
        corder:ConditionOrder = None
        for order in self.condition_orders.values():
            if vt_orderid in order.vt_orderids:
                corder = order
                break

        return corder

    def process_trade_event(self, event: Event):
        """ 委托单推送处理 """
        super().process_trade_event(event)
        trade:TradeData = event.data
        vt_orderid = trade.vt_orderid

        corder = self.find_condition_order(vt_orderid)
        if corder:
            # 该成交单属于某个条件单
            strategy = self.strategies.get(corder.strategy_name,None)
            if strategy and strategy.trading:
                # 找到了该条件单属实策略实例且正在交易中

                # 累计条件单的成交量
                corder.traded += trade.volume
                # 推送该条件单给策略
                self.call_strategy_func(strategy,strategy.on_condition_order,corder)
                # 刷新条件单列表控件
                self.event_engine.put(Event(EVENT_CONDITION_ORDER,corder))

    def send_condition_order(self,order:ConditionOrder):
        """  """
        strategy = self.strategies.get(order.strategy_name,None)
        if not strategy or not strategy.trading:
            return False

        if order.cond_orderid not in self.condition_orders:
            self.condition_orders[order.cond_orderid] = order
            self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))
            return True

        return False

    def cancel_condition_order(self,cond_orderid:str):
        """  """
        order:ConditionOrder = self.condition_orders.get(cond_orderid,None)
        if not order:
            return False

        order.status = CondOrderStatus.CANCELLED
        self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))
        return True

    def cancel_all_condition_orders(self,strategy_name:str):
        """  """
        for order in self.condition_orders.values():
            if order.strategy_name == strategy_name and order.status == CondOrderStatus.WAITING:
                order.status = CondOrderStatus.CANCELLED
                self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))

        return True

把vnpy_ctastrategy目录下 的__init__.py中的CtaStrategyApp做如下修改

class CtaStrategyApp(BaseApp):
    """"""

    app_name = APP_NAME
    app_module = __module__
    app_path = Path(__file__).parent
    display_name = "CTA策略"
    # engine_class = CtaEngine
    engine_class = MyCtaEngine    # hxxjava add
    widget_name = "CtaManager"
    icon_name = str(app_path.joinpath("ui", "cta.ico"))

3.6 CtaTemplate的修改

修改vnpy_ctastrategy\CtaTemplate.py如下,为CtaTemplate增加on_auction_tick():

    @virtual
    def on_auction_tick(self, tick: TickData):
        """
        Callback of new tick data update.   # hxxjava add for auction tick
        """
        pass

3.7 为数据库增加最新Tick保存函数

3.7.1 修改vnpy\trader\database.py

为class BaseDatabase增加下面两个接口函数:

    @abstractmethod
    def save_last_tick(self, ticks: List[TickData]) -> bool:
        """
        Save last tick data into database.  # hxxjava add
        """
        pass

    @abstractmethod
    def load_last_tick(
        self,
        gateway_name : str,
        exchange: Exchange = None,
        symbol: str = None
    ) -> List[TickData]:
        """
        Load last tick data from database.  # hxxjava add
        """
        pass

3.7.2 修改vnpy_mysql\mysql_database.py

class MyDateTimeField(DateTimeField):
    def get_modifiers(self):
        return [6]

class DbLastTick(Model):    # hxxjava add
    """ 最新TICK数据表映射对象 """

    id = AutoField()

    gateway_name: str = CharField()

    symbol: str = CharField()
    exchange: str = CharField()
    datetime: datetime = MyDateTimeField()

    name: str = CharField()
    volume: float = FloatField()
    turnover: float = FloatField()
    open_interest: float = FloatField()
    last_price: float = FloatField()
    last_volume: float = FloatField()
    limit_up: float = FloatField()
    limit_down: float = FloatField()

    open_price: float = FloatField()
    high_price: float = FloatField()
    low_price: float = FloatField()
    pre_close: float = FloatField()

    bid_price_1: float = FloatField()
    bid_price_2: float = FloatField(null=True)
    bid_price_3: float = FloatField(null=True)
    bid_price_4: float = FloatField(null=True)
    bid_price_5: float = FloatField(null=True)

    ask_price_1: float = FloatField()
    ask_price_2: float = FloatField(null=True)
    ask_price_3: float = FloatField(null=True)
    ask_price_4: float = FloatField(null=True)
    ask_price_5: float = FloatField(null=True)

    bid_volume_1: float = FloatField()
    bid_volume_2: float = FloatField(null=True)
    bid_volume_3: float = FloatField(null=True)
    bid_volume_4: float = FloatField(null=True)
    bid_volume_5: float = FloatField(null=True)

    ask_volume_1: float = FloatField()
    ask_volume_2: float = FloatField(null=True)
    ask_volume_3: float = FloatField(null=True)
    ask_volume_4: float = FloatField(null=True)
    ask_volume_5: float = FloatField(null=True)

    localtime: datetime = DateTimeField(null=True)

    class Meta:
        database = db
        indexes = ((("gateway_name","symbol", "exchange", "datetime"), True),)

class MysqlDatabase的初始化做如下修改:

    def __init__(self) -> None:
        """"""
        self.db = db
        self.db.connect()
        self.db.create_tables([DbContractData, DbBarData, DbTickData, DbLastTick, DbBarOverview])   # hxxjava add DbLastTick,DbContractData

再为class MysqlDatabase添加下面两个函数:

    def save_last_tick(self, ticks: List[TickData]) -> bool:
        """
        Save last tick data into database.  # hxxjava add
        """
        vt_symbols = [t.vt_symbol for t in ticks]

        # 删除ticks列表中包含合约的旧的tick记录
        d: ModelDelete = DbLastTick.delete().where(
            (DbLastTick.symbol+'.'+DbLastTick.exchange in vt_symbols)
        )
        count = d.execute()
        # print(f"delete {count} last ticks")

        # 构造最新的ticks列表数据
        data = []
        for t in ticks:
            tick:TickData = deepcopy(t)     # hxxjava change
            tick.datetime = tick.datetime

            d = tick.__dict__
            d["exchange"] = d["exchange"].value
            d.pop("vt_symbol")
            data.append(d)
            # print(tick.symbol,tick.exchange,tick.datetime.strftime('%Y-%m-%d %H:%M:%S %f'))

        # 使用upsert操作将数据更新到数据库中
        with self.db.atomic():
            for c in chunked(data, 50):
                DbLastTick.insert_many(c).on_conflict_replace().execute()

        return True

    def load_last_tick(
        self,
        gateway_name : str,
        exchange: Exchange = None,
        symbol: str = None
    ) -> List[TickData]:
        """
        Load last tick data from database.  # hxxjava add
        """
        try:
            # 从DbLastTick查询符合条件的最新tick记录
            s: ModelSelect = (
                DbLastTick.select().where(
                    (DbLastTick.gateway_name == gateway_name)
                    & (exchange is None or DbLastTick.exchange == exchange.value)
                    & (symbol is None or DbLastTick.symbol == symbol)
                ).order_by(DbLastTick.gateway_name,DbLastTick.datetime)
            )

            # 利用最新tick记录构造ticks列表
            ticks: List[TickData] = []
            for db_tick in s:
                tick:TickData = TickData(
                    symbol=db_tick.symbol,
                    exchange=Exchange(db_tick.exchange),
                    datetime=to_china_tz(db_tick.datetime),
                    name=db_tick.name,
                    volume=db_tick.volume,
                    turnover=db_tick.turnover,
                    open_interest=db_tick.open_interest,
                    last_price=db_tick.last_price,
                    last_volume=db_tick.last_volume,
                    limit_up=db_tick.limit_up,
                    limit_down=db_tick.limit_down,
                    open_price=db_tick.open_price,
                    high_price=db_tick.high_price,
                    low_price=db_tick.low_price,
                    pre_close=db_tick.pre_close,
                    bid_price_1=db_tick.bid_price_1,
                    bid_price_2=db_tick.bid_price_2,
                    bid_price_3=db_tick.bid_price_3,
                    bid_price_4=db_tick.bid_price_4,
                    bid_price_5=db_tick.bid_price_5,
                    ask_price_1=db_tick.ask_price_1,
                    ask_price_2=db_tick.ask_price_2,
                    ask_price_3=db_tick.ask_price_3,
                    ask_price_4=db_tick.ask_price_4,
                    ask_price_5=db_tick.ask_price_5,
                    bid_volume_1=db_tick.bid_volume_1,
                    bid_volume_2=db_tick.bid_volume_2,
                    bid_volume_3=db_tick.bid_volume_3,
                    bid_volume_4=db_tick.bid_volume_4,
                    bid_volume_5=db_tick.bid_volume_5,
                    ask_volume_1=db_tick.ask_volume_1,
                    ask_volume_2=db_tick.ask_volume_2,
                    ask_volume_3=db_tick.ask_volume_3,
                    ask_volume_4=db_tick.ask_volume_4,
                    ask_volume_5=db_tick.ask_volume_5,
                    localtime=db_tick.localtime,
                    gateway_name=db_tick.gateway_name
                )
                ticks.append(tick)

            return ticks

        except:
            # 当DbLastTick表不存在的时候,会发生错误
            return []

3.8 tick数据过滤器的实现

在vnpy.usertools下创建tickfilter.py文件,其内容如下:

"""
本文件主要实现tick数据过滤器——TickFilter。

tick数据过滤器的功能:
1. 过滤重复tick,保证已经参与K线合成的tick不会再次被系统使用
2. 过滤无效tick,抛弃不在交易状态下的tick
3. 识别集合竞价tick,为使用tick的应用或用户策略处理集合竞价tick提供支持

作者:hxxjava
日期:2022-06-16
修改日期:              修改原因:  
"""
from typing import Dict,List,Tuple
from threading import Thread
from vnpy.event import Event,EVENT_TIMER,EventEngine
from vnpy.trader.constant import InstrumentStatus,VALID_TRADE_STATUSES
from vnpy.trader.object import TickData,StatusData
from vnpy.trader.event import (
    EVENT_ORIGIN_TICK,
    EVENT_AUCTION_TICK,
    EVENT_TICK,
    EVENT_STATUS
)
from vnpy.trader.database import get_database
from vnpy.trader.utility import extract_vt_symbol


def left_alphas(instr:str):
    """
    得到字符串左边的字符部分
    """
    ret_str = ''
    for s in instr:
        if s.isalpha():
            ret_str += s
        else:
            break
    return ret_str

def get_vt_instrument(vt_symbol:str):
    """
    从完整合约代码转换到完整品种代码
    """    
    symbol,exchange = extract_vt_symbol(vt_symbol)
    instrument = left_alphas(symbol)
    return f"{instrument}.{exchange.value}"


class TickFilter():
    """ tick数据过滤器 """
    CHECK_INTERVAL:int = 5  # 更新到数据库间隔

    def __init__(self,event_engine:EventEngine,gateway_name:str):
        """ tick数据过滤器初始化 """
        self.event_engine = event_engine
        self.gateway_name = gateway_name
        self.db = get_database()

        # 最新tick字典 {(gateway_name,vt_symbol),(update,tick)}
        self.last_ticks:Dict[Tuple[str,str],Tuple[bool,TickData]] = {}

        # 品种及合约状态字典 { vt_symbol : StatusData }
        self.statuses:Dict[str,StatusData] = {}
        self.second_cnt = 0

        self.load_last_ticks()
        self.register_event()

        # print(f"TickFilter {gateway_name}")

    def load_last_ticks(self):
        """ 
        加载属于网关名称为self.gateway_name的最新tick列表 
        """
        last_ticks:List[TickData] = self.db.load_last_tick(gateway_name=self.gateway_name)
        for tick in last_ticks:
            self.last_ticks[(tick.gateway_name,tick.vt_symbol)] = (False,tick)

        # print(f"load {len(last_ticks)} last ticks")

    def register_event(self):
        """ 注册消息 """
        self.event_engine.register(EVENT_ORIGIN_TICK,self.process_tick_event)
        self.event_engine.register(EVENT_STATUS,self.process_status_event)
        self.event_engine.register(EVENT_TIMER,self.check_last_ticks)        

    def process_tick_event(self,event:Event):
        """ 对原始tick进行过滤 """
        tick:TickData = event.data

        # 检查tick合约的经验状态是否位有效交易状态
        status:StatusData = self.statuses.get(tick.vt_symbol,None)
        if not status:
            vt_instrument = get_vt_instrument(tick.vt_symbol)
            status = self.statuses.get(vt_instrument,None)
            if not status:
                # 未收到交易状态,返回
                return

        if status.instrument_status not in VALID_TRADE_STATUSES:
            # 不在有效交易状态,返回
            return

        key = (tick.gateway_name,tick.vt_symbol)
        _,oldtick = self.last_ticks.get(key,(None,None))
        valid_tick = False
        if not oldtick:
            # 没有该合约的历史tick
            self.last_ticks[key] = (True,tick)
            valid_tick = True

        elif tick.datetime > oldtick.datetime:
            # 
            self.last_ticks[key] = (True,tick)
            valid_tick = True

        else:
            print(f"【特别tick = {tick}】")

        if valid_tick == True:
            # 如果是有效的tick
            if status.instrument_status != InstrumentStatus.CONTINOUS:
                # 发送集合竞价tic消息到系统中
                self.event_engine.put(Event(EVENT_AUCTION_TICK,tick))
                self.event_engine.put(Event(EVENT_AUCTION_TICK + tick.vt_symbol, tick))  

            else:
                # 发送连续竞价tic消息到系统中
                self.event_engine.put(Event(EVENT_TICK,tick))
                self.event_engine.put(Event(EVENT_TICK + tick.vt_symbol, tick))  

    def process_status_event(self, event: Event):  
        """ 交易状态通知消息处理 """
        status:StatusData = event.data
        self.statuses[status.vt_symbol] = status

        # print(f"【{status.gateway_name} {status}】")

    def check_last_ticks(self,event:Event) -> None:
        """ 原始tick过滤器 """
        self.second_cnt += 1
        if self.second_cnt % self.CHECK_INTERVAL == 0:
            # 如果到了定时间隔

            # 查询所有更新的tick
            changed_ticks = [] 

            for key,(update,tick) in self.last_ticks.items():
                if update:
                    changed_ticks.append(tick)
                    self.last_ticks[key] = (False,tick)

            if changed_ticks:
                # 如果存在更新的tick,保存到数据库
                t = Thread(target=self.db.save_last_tick,kwargs=({"ticks":changed_ticks}),daemon=True)
                t.start()
                # print(f"{self.second_cnt}: status count={len(self.statuses)} save {len(changed_ticks)} ticks")

3.9 把tick数据过滤器安装到主引擎MainEngine上去

修改vnpy\trader\engine.py

添加引用部分

from vnpy.usertools.tickfilter import TickFilter    # hxxjava add

修改MainEngine的

在MainEngine的初始化函数def init(self, event_engine: EventEngine = None)中增加如下内容:

self.tick_filters:Dict[str,TickFilter] = {} # hxxjava add

修改其add_gateway(),内容如下:

    def add_gateway(self, gateway_class: Type[BaseGateway], gateway_name: str = "") -> BaseGateway:
        """
        Add gateway.
        """
        # Use default name if gateway_name not passed
        if not gateway_name:
            gateway_name = gateway_class.default_name

        gateway = gateway_class(self.event_engine, gateway_name)
        self.gateways[gateway_name] = gateway

        # Add gateway supported exchanges into engine
        for exchange in gateway.exchanges:
            if exchange not in self.exchanges:
                self.exchanges.append(exchange)

        # add a tick data filter for the gateway    #  hxxjava add  
        if gateway_name not in self.tick_filters:
            self.tick_filters[gateway_name] = TickFilter(self.event_engine,gateway_name)

        return gateway

4. 经过上面的一系列修改,你获得了哪些好处?

  • 你的策略再也不会收到重复数据和垃圾数据
  • 此以后你的CTA策略中必须加入一个on_auction_tick()接口函数,用来接受每个交易日集合竞价所产生的tick。如何使用这个tick你有你的方法。
  • 在合成K线的时候你才可能构成正确的K线,比如BarGenerator对跨日tick时间戳的处理错误问题,在此也会迎刃而解。

4.1 现在来梳理下我们都干了哪些事情

  1. 在CtpGateway中引入了合约交易状态,这可以用来过滤无效数据,同时还能够识别集合竞价tick。
  2. 在database中增加了最新tick持久化保存,这为新的tick是否是重复的判断提供支持。
  3. 提供有效tick的分类,在CTA策略的模板中增加on_auction_tick()接口使得BarGenerator正确处1分钟bar的成交量和成交额成为可能。

4.2 非CTP网关使用者是否也可以这样做?

只要你能够从网关行情接口实时得到合约的交易状态推送,把网关的行情接口做出类似的修改,这套方法同样是可用的。tickfilter的代码可以不用修改直接使用。

5. 解决BarGenerator统计bar成交量和成交额错误的方法

5.1 这是对BarGenerator做出点修改,

  • 修改BarGenerator的初始化函数
    def __init__(
        self,
        on_bar: Callable,
        window: int = 0,
        on_window_bar: Callable = None,
        interval: Interval = Interval.MINUTE
    ):
        """ Constructor """      

        ... ...  # 其他代码省略

        self.auction_tick:TickData = None
        self.last_tick: TickData = None
  • 增加BarGenerator的集合竞价tick处理函数
    def update_auction_tick(self,tick:TickData):
        """ 更新集合竞价tick """
        self.auction_tick = tick
  • 修改BarGenerator的1分钟bar合成函数
    def update_tick(self, tick: TickData) -> None:
        """
        Update new tick data into generator.
        """
        new_minute = False

        if self.auction_tick:
            # 合约集合竞价tick到当前tick
            tick.high_price = max(tick.high_price,self.auction_tick.high_price)
            tick.low_price = min(tick.low_price,self.auction_tick.low_price)

            # 构造最新tick,以便把集合竞价的成交量和成交额合成到1分钟bar中
            self.last_tick = deepcopy(self.auction_tick)
            # 成交量和成交额每天从0开始单调递增
            self.last_tick.volume = 0.0   
            self.last_tick.turnover = 0.0

            # 用完集合竞价tick就丢弃
            self.auction_tick = None

      ... ...  # 其他代码省略

5.2 您的策略关于集合竞价tick更新的回调函数:

    def on_auction_tick(self, tick: TickData):
        """
        集合竞价tick处理
        """
        self.bg.update_auction_tick(tick)    # 假设self.bg是已经创建过的bar生成器

两点说明:

  1. 如果你在阅读本文的时候觉得有点一头雾水,可以搜索'hxxjava'字符串,将会显示大部分修改的代码,仔细揣摩下,就会知道我做了什么了!
  2. 另外本贴中还有一部分涉及到条件单的代码,如果出现错误,可以查找我的关于条件单的帖子比停止单更好用的条件单——ConditionOrder,这里就不再重复贴出那部分代码了。


开始报名:2025小班特训营第三场【VeighNa量化AI智能体应用】!

发布于VeighNa社区公众号【vnpy-community】
 
原文作者:VeighNa小助手 | 发布时间:2025-10-21
 
老规矩还是先放几张之前特训营的照片:

description
准备完毕,静候小班同学到达

description
学习量化,掌握核心理论框架

description
深入代码,分析策略逻辑细节

description

现场实践,剖析机器学习算法

基于之前学员的反馈,小班特训营这种2天10小时+的高强度课程通过线上直播学习的效果并不理想。为了保证更好的学习质量,我们对授课模式进行了调整:后续小班特训营不再提供线上直播参加和视频内容回看,而是改为同一主题的每场小班课都可以再次到场听讲。同时,我们会针对每一个特训营的主题建立专项社群,持续提供专业交流与学习服务,而不再只局限于三个月的答疑时间。

小班特训营优先面向买方投资机构。由于AI Agent(智能体)开发本身的复杂性(大模型知识、GPU算力需求、编程开发水平等),不建议新手报名。本场课程目前仅剩2个名额,感兴趣的同学请抓紧。

 

VeighNa量化AI智能体应用

日期:2025年11月29日(周六)和11月30日(周日)

时间:两天下午1点-6点,共计10小时

地点:上海浦东(具体地址会在报名成功后发送)

大纲

1. 认识AI Agent和准备开发环境

a. 课程导览与AI Agent前景

​ i. 课程目标、内容结构与学习路径

​ ii. AI Agent在量化金融领域的应用前景

b. VeighNa Agent (vnag) 框架介绍

​ i. 项目定位、设计哲学与核心优势

​ ii. vnag的系统架构与模块概览

c. 开发环境搭建

​ i. 安装Python、vnag及相关依赖

​ ii. 配置IDE(如Cursor)

2. 连接大模型:vnag的Gateway模块

a. 大模型API核心概念

​ i. 主流AI服务商(OpenAI, Anthropic)API的特点与异同

​ ii. API Key管理与安全实践

b. vnag.gateway 抽象设计

​ i. 详解Gateway基类,理解统一接口的设计思想

​ ii. 实战:实现OpenaiGateway与AnthropicGateway

c. 流式数据输出(Streaming)

​ i. 流式输出在交互式应用中的重要性

​ ii. 在vnag中处理流式响应的核心代码解析

3. 构建Agent知识库:拆分与向量化

a. RAG与知识工程

​ i. RAG(Retrieval-Augmented Generation)的基本原理

​ ii. 文档、代码、FAQ等不同类型资料的处理策略

b. vnag.segmenter:智能文本拆分

​ i. 详解Segmenter基类与拆分器的作用

ii. 拆分器实战:CppSegmenter、PythonSegmenter、MarkdownSegmenter

c. vnag.vector:向量化存储与检索

​ i. 向量数据库基础(以ChromaDB为例)

​ ii. 详解Vector基类,实现数据入库与相似度搜索

​ iii. 动手实践:将拆分后的文本块向量化并存入数据库

4. Agent核心能力:Function Call与UI交互

a. 大模型的Function Call(函数调用)

​ i. Function Call的原理与应用场景

​ ii. 基于vnag.engine封装和调用Function Call的技巧

b. MCP:标准化的消息通信协议

​ i. 为何需要标准化的通信协议

​ ii. 上手开发一套基础MCP服务

c. vnag.ui:Qt图形前端解决方案

​ i. vnag的UI架构解析:Window、Widget与Worker

​ ii. UI与后端Agent的事件驱动与数据交互机制

d. 综合实战:构建一个简单的知识库问答机器人

5. 实战案例一:AI驱动的CTP API接口适配开发

a. 任务背景:量化交易接口升级的痛点

b. Agent任务设计与工作流

​ i. 知识库:新旧版本的CTP API头文件、官方文档

​ ii. 工具集(Function Call):文件读写、代码比对、代码生成

​ iii. 工作流:定位变更 -> 生成方案 -> 修改代码 -> 验证修正

c. Live Coding:可复用的全自动量化API接口升级开发智能体

6. 实战案例二:AI赋能的CTA策略自动化投研

a. 任务背景:从策略思想到回测报告的“最后一公里”

b. Agent任务设计与工作流

​ i. 知识库:CTA策略模板、API文档、优秀策略范例

​ ii. 工具集(Function Call):代码生成检查、回测引擎调用、报告分析

​ iii. 工作流:自然语言描述 -> 策略代码生成 -> 初步回测检验 -> 多轮参数优化 -> 编写投研报告

c. Live Coding:与AI Agent对话,从0到1完成完整CTA策略的投研

7. 实战案例三:AI驱动的股票因子挖掘

a. 任务背景:如何从海量信息中自动化挖掘有效Alpha因子

b. Agent任务设计与工作流

​ i. 知识库:指定的因子研究报告(PDF)或网页链接

​ ii. 工具集(Function Call):内容解析、vnpy.alpha调用、指标计算、数据入库

​ iii. 工作流:阅读资料 -> 生成公式 -> 检验计算 -> 分析结果 -> 循环迭代

c. Live Coding:给AI一篇券商研报,看它如何自动挖掘Alpha因子

价格:11999元

 

报名请扫描下方二维码添加小助手提供相关信息(想参加的课程、姓名、手机、公司、职位),报名结果以确认回复为准:
 

description

 



2025年第7次社区活动 - 【Kronos大模型的CTA应用实践】- 11月1日(上海)

发布于VeighNa社区公众号【vnpy-community】
 
原文作者:VeighNa小助手 | 发布时间:2025-10-16
 
【社区活动尊享卡】的受欢迎程度大幅超出我们的预期,为了保证每场社区活动的交流质量,尊享卡已经变更为仅对部分专业交易员用户定向提供。对于参加活动较多的同学强烈推荐!购买请扫描二维码添加小助手咨询:

description

上一场社区活动中关于金融K线大模型 Kronos 的初步介绍,引发了社区同学们的广泛关注和积极讨论。

简单来说,Kronos 是一个专为金融市场设计的大模型,它的核心思想是将K线形态“词化”(Tokenize)——即视单根K线为“单词”,连续K线为“句子”,从而能够运用强大的Transformer架构来学习市场价格数据中的深层模式。根据论文中的基准测试结果,与LSTM等传统时序模型相比,其预测性能获得了显著提升。

本场活动中我们将从理论走向实践,深入探讨Kronos模型的实现与使用细节,并重点演示如何在 VeighNa 的CTA策略模块中,集成该模型以构建一套从数据准备、模型调用到策略回测的完整量化投研流程。

本场活动将于11月1日(周六)下午2:00至5:00在上海举办。普通报名仅支持线下参会,尊享卡持有者和Elite会员可通过线上直播参与。活动具体地址将在微信群中公布,请在报名成功后扫码加入社区活动群,以便获取相关信息!

 

活动内容大纲

 

  1. 初探Kronos金融K线大模型

    a. 金融时序分析的目标:价格 vs 收益率
    b. 传统时序模型(如LSTM等)的不足
    c. Kronos针对价格序列的token构建方式

  2. 使用Kronos来预测价格波动

    a. 从0搭建Kronos大模型运行环境
    b. 梳理论文中的预测方法论细节
    c. 正确理解Kronos模型的输出结果
    d. 批量计算价格波动的预测概率分布

  3. 尝试在CTA策略中应用Kronos

    a. 基于价格波动的预测结果构建趋势信号
    b. 解决Kronos模型的调用性能开销问题
    c. 期货市场的KronosStrategy绩效分析
    d. 完整投研回测代码的实现细节探讨

  4. 闭门交流环节

 
时间:11月1日 14:00-17:00

地点:上海(具体地址后续在微信群中通知)

报名费:99元(Elite会员免费参加)

报名方式:扫描下方二维码报名(报名后请扫码加入社区活动微信群获取参会地址)

 

description

 



【vnpy_xt】迅投研数据服务模块正式上线!

耗时2个多月和迅投团队配合测试对接,VeighNa框架的迅投研数据服务的接口vnpy_xt正式上线,支持股票、期货、期权、基金等历史量价数据的获取。

迅投为VeighNa社区提供了专属的试用申请链接:https://xuntou.net/#/signup?utm_source=vnpy

注册申请后即可获取14天的免费试用,目前数据流量上限较高,推荐有需要的同学不要错过!!!(在有效期内多下载一些数据)

整体使用流程如下:

  1. http://docs.thinktrader.net/下载安装xtquant安装包,解压后放置到c:\veighna_studio\lib\site_packages文件夹下
  2. 完成前文的试用账号注册后,登录https://xuntou.net/#/login,在【用户中心】-【个人设置】-【接口TOKEN】处获取Token
  3. 安装vnpy_xt模块:pip install vnpy_xt
  4. 在VeighNa Trader的【全局配置】中配置数据服务相关字段:
  5. datafeed.name:xt
  6. datafeed.username:token
  7. datafeed.password:填第二步中复制的Token

使用过程中遇到任何问题可以通过社区论坛寻投研专区提问交流:https://www.vnpy.com/forum/forum/35-xun-tou-yan



《vn.py 3.0.0源代码深入分析》

我学Python的目的很明确,就是量化交易。从一开始就有关注vn.py,但我学的是Python3,那时vn.py还处于版本1.x时期,所以只能望vn.py兴叹。
vn.py 2.0出来之后我并没有及时注意,等反应过来已经是2.0.7版。很兴奋,认真研究,并将心得写成《vn.py 2.0.7源代码深入分析》,分享在vn.py社区的经验分享板块。
出于对量化交易的爱好,出于对Python在量化交易中作用的认同,一定程度受vn.py强大功能的鼓舞,我与同事合写了《Python量化交易从入门到实战》一书,对vn.py的讨论是其中很重要的一部分内容。
后续又写了《vn.py 2.1.4源代码深入分析》和《vn.py 2.2.0源代码深入分析》两个文档,感谢各位老师的认可。
vn.py 3.0.0版发布于2022-03-23,这是我一直期待的一个版本,所以它刚一推出,我就立刻开始试用,并着手整理《vn.py 3.0.0源代码深入分析》。夜以继日,终于在前天完成。先发到了书籍的资源群中,接受了两天批评,现分享到此处。
写作本文档的一个主要目的是对vn.py的开源精神做出一点支持,希望本文档能够对大家学习使用vn.py有所帮助。

百度网盘链接:https://pan.baidu.com/s/1cl2MA9hNFhHlxfHM0gGe2A
提取码:s7u6



VeighNa发布v4.1.0 - 整体完成模块移植工作

发布于VeighNa社区公众号【vnpy-community】
 
原文作者:用Python的交易员 | 发布时间:2025-07-19
 
7月初发布了VeighNa的4.1.0版本,本次更新的主要内容是完成了绝大多数VeighNa开源社区版中的模块移植(接口、应用等),得益于Python 3.13带来的显著性能提升,强烈建议还在使用3.0大版本的用户升级,感受新一代版本带来的性能飞跃。

对于已经安装了VeighNa Studio 4.0版本的用户,可以使用快速更新功能完成自动升级。对于没有安装的用户,请下载\VeighNa Studio-4.1.0**,体验一键安装的量化交易Python发行版,下载链接:

https://download.vnpy.com/veighna_studio-4.1.0.exe

 

聊聊Python 3.13的性能

 

关于Python 3.13具体的性能提升水平,社区里已经有许多同学讨论了,这里借着4.1.0发布的机会,对互联网上的公开资料做个整理。

熟悉Python发展历史的同学可能知道,自3.10版本以来,CPython官方团队在性能优化上投入了巨大的精力,几乎每个新版本都是一次“提速”。下面就来回顾一下这几个版本中和性能相关的核心改动,看看3.13版本对比3.10究竟快了多少。

Python 3.11:革命性的性能飞跃

Python 3.11是“Faster CPython”项目第一个取得丰硕成果的版本,其性能相较于3.10有巨大提升。根据官方文档,Python 3.11在标准基准测试套件上比3.10平均快了1.25倍

主要改进包括:

  1. PEP 659 - 专业化自适应解释器:这是3.11性能提升的核心。解释器现在可以在运行时将通用的字节码替换为针对特定数据类型的“专业化”版本。例如,对于重复执行的a + b操作,如果ab总是整数,解释器会使用专门处理整数加法的快速指令,大大提高了执行效率。
  2. 更快的启动速度:通过将核心模块的字节码“冻结”在内存中,减少了解析和加载时间,使得Python解释器的启动速度提升了10-15%。
  3. 更快的函数调用:优化了函数调用过程中的帧(frame)创建和管理,减少了C栈的使用和内存分配,使得纯Python函数调用更加高效。

Python 3.12:精益求精的持续优化

Python 3.12延续了3.11的势头,在现有基础上进行了更多细致的优化。虽然不像3.11那样有革命性的飞跃,但它同样带来了稳固的性能增长。3.12的官方文档中没有提及具体的平均性能提升数字,但根据社区测试的结果,Intel平台上对比3.10的平均提升在5%

主要改进包括:

  1. 更多的专业化指令:在3.11的基础上,为更多的字节码指令增加了专业化版本,覆盖了更多的代码场景。
  2. 改进的内存管理:通过优化对象结构和垃圾回收机制,减少了内存开销,从而提高了缓存效率。
  3. 解释器循环优化:对解释器的主循环(evaluation loop)进行了生成方式的重构,使其更容易维护和优化,并为未来的JIT(Just-In-Time)编译器等更激进的优化铺平了道路。

Python 3.13:异步性能的又一次飞跃

Python 3.13继续沿着性能优化的道路前进,根据社区测试的结果,对比3.12的平均性能提升大约是5%(Intel平台)

这一版本的改进主要集中在:

  1. 异步性能大幅提升:对asyncio库进行了大量优化,根据基准测试,asyncio相关任务整体性能提升了\1.19倍**。**
  2. 常用操作加速:许多基础操作,如序列解包(unpack_sequence)、生成器(generators)等都变得更快。
  3. 为未来铺路:虽然这里不讨论实验性功能,但值得一提的是,3.13的许多改动(如试验性的No-GIL模式、新的JIT编译器框架)都在为未来更大幅度的性能提升奠定基础。

结论:所以到底快了多少?

综合来看,从Python 3.10到3.13,CPython的性能经历了持续且显著的增长。通过将各个版本的性能提升进行串联估算 (1.25 1.05 1.05),我们可以得出一个大致的结论:

 

Python 3.13的平均性能大约比Python 3.10快35-40%。

 

对于VeighNa用户而言,这意味着策略回测、实盘交易中涉及的大量纯Python计算逻辑(例如信号计算、交易执行、投研分析等)都将运行得更快,从而降低延迟、提升策略执行效率。因此,我们强烈建议使用开源社区版的用户升级到基于Python 3.13的VeighNa 4.1.0版本,来享受这份免费的“性能午餐”。

 

CHANGELOG

 

新增

  1. vnpy_mcdata新增对于Tick数据查询的支持
  2. OrderType枚举值增加ETF类型,支持ETF申购和赎回业务
  3. 增加遗传算法优化函数run_ga_optimization的入参,允许用户控制优化过程中所使用的全部超参
  4. CTA策略回测引擎,增加对于遗传算法优化函数新入参的支持

调整

  1. 升级扩展模块适配4.0版本,具体模块列表请参考该页面
  2. 使用close函数替代unbind,来实现vnpy.rpc模块中zmq.Socket的安全关闭
  3. 修改PySide6依赖版本为6.8.2.1,解决部分底层warning输出问题
  4. 修改ta-lib依赖版本为0.6.4,解决Linux和Mac系统的安装问题
  5. 调整Qt层捕捉到全局异常时的日志输出级别为Critical
  6. vnpy_datarecorder移除不必要的行情录制异常抛出,改为记录日志
  7. vnpy_rqdata下载股票数据时,除权方式由pre改为pre_volume
  8. 数据库模块录制行情数据时,默认跳过extra字段
  9. vnpy_ib支持10.30.1版本的ibapi,增加对于新版本撤单函数的传参支持

修复

  1. 修复新版本ta-lib中,MA_Type类不再是枚举值导致的部分指标计算问题
  2. 修复补全MainEngine缺失的get_tick函数
  3. 修复邮件发送引擎在使用QQ邮箱时出现的发送后报错问题
  4. 修复日志模块由于缺失默认gateway_name参数,在Qt层捕捉到全局异常时输出错误的问题
  5. vnpy_rohon新增Linux安装脚本,解决动态库找不到的问题
  6. vnpy_rqdata修复品种代码为小写合约的次主力88A2历史数据查询问题

 


新消息

6个月前

统计

主题
10549
帖子
39907
已注册用户
69400
最新用户
在线用户
154
在线来宾用户
80630
© 2015-2022 上海韦纳软件科技有限公司
备案服务号:沪ICP备18006526号

沪公网安备 31011502017034号

【用户协议】
【隐私政策】
【免责条款】