VeighNa量化社区
你的开源社区量化交易平台 | vn.py | vnpy

置顶主题

第3场尊享卡专属社区活动 - 【Mini-Agent + Claude Skills深入实战】- 2026年1月18日

发布于VeighNa社区公众号【vnpy-community】
 
原文作者:VeighNa小助手 | 发布时间:2025-12-30
 
尊享卡专属的系列社区活动已经于9月启动!本系列活动的一大亮点是包含更多 Live Coding 实战内容,我们希望通过这种“授人以渔”的方式,帮助更多用户将知识内化为技能。

之前的历史活动主题:

  • 2025年9月27日 【Cursor深入实战】
  • 2025年10月12日 【Claude Code深入实战】

第三场尊享卡专属活动【Mini-Agent + Claude Skills深入实战】,定于

定于1月18日(周日)下午2:00至5:00举行,活动具体大纲如下:

 

活动内容大纲

 

  1. 快速上手 Mini Agent
    a. 认识一下 MiniMax 国产开源大模型

    b. Mini Agent基础操作与核心功能概览

    c. 强大的交错思维(interleaved thinking)能力

  2. 基于 Python 实现的 Agent 项目

    a. CLI 智能体的执行循环实现

    b. 内置工具集和持久化记忆机制

    c. 上下文的自动摘要处理机制

    d. Claude Skills 专业技能支持

  3. CTA 自动投研再进化
    a. 梳理之前 Codex 实现方案的不足
    b. 思考结合 Claude Skills 的优化方向
    c. 以实际策略开发案例评估 MiniMax-M2.1 模型

    d. CTA 策略投研 Skills 的源码内容解析

  4. 闭门交流环节

 

时间:2026年1月18日(周日) 14:00-17:00

参加方式:小鹅通线上直播

报名方式:本活动仅对尊享卡用户和Elite会员开放

 

购买尊享卡请扫描二维码添加小助手咨询:

description

 



VeighNa发布v4.3.0 - VeighNa Assistant

发布于VeighNa社区公众号【vnpy-community】
 
原文作者:用Python的交易员 | 发布时间:2025-12-29
 

上周我们发布了VeighNa 4.3.0版本。本次更新主要包括:在VeighNa Station中集成全新的AI智能助手VeighNa Assistant,以及对VeighNa Docker镜像进行重构优化。

已安装VeighNa Studio 4.0版本的用户,可使用快速更新功能完成自动升级;尚未安装的用户,建议直接下载VeighNa Studio-4.3.0,体验一键安装的量化交易Python发行版,下载链接:

https://download.vnpy.com/veighna_studio-4.3.0.exe

 

VeighNa Assistant

 

功能介绍

description

在本次4.3.0版本中,VeighNa Station集成了VeighNa Assistant智能助手(基于VNAG框架开发),主要功能包括:

  1. 接入VeighNa官方的内部文档知识库、示例策略库以及项目源码库,提供全面的技术支持;
  2. 采用Agentic AI架构(区别于传统的RAG方案),能够根据用户需求智能调用各种工具来完成复杂任务;
  3. 支持开放式的大模型服务配置,用户可以根据自身需求灵活选择国内外各种主流AI大模型。

使用步骤

要使用VeighNa Assistant,首先需要准备一个大模型服务的API Key(可理解为访问凭证)。对于初次接触的用户,推荐使用阿里云百炼的AI服务(目前提供较为充足的免费额度),具体开通流程请参考阿里云官方的详细步骤说明

准备好API Key后,双击桌面快捷方式启动VeighNa Station,在顶部菜单栏中找到【功能 -> AI服务配置】选项:

description

点击后将打开AI服务配置对话框。在顶部的下拉框中选择【Bailian】,然后在配置参数区域填入之前准备好的API Key,API地址保持默认即可:

description

点击【保存】按钮后,系统会弹出提示框,告知需要重启VeighNa Station以使配置生效:

description

完成重启后,点击顶部菜单栏的【功能 -> 模型浏览器】,选择想要使用的大模型:

description

在模型浏览器中,可以通过上图红框中的箭头按钮来添加或移除大模型,同时也可以调整已选模型的优先级顺序。对于百炼AI服务,推荐选择Agentic能力较强的qwen3-max-previewkimi-k2-thinking模型。完成设置后,点击右下角的【保存】按钮,系统会弹出确认提示框:

description

点击【OK】返回主界面后,即可在聊天区域与智能体进行交互。整体使用方式与ChatGPT等主流AI聊天工具高度相似,有相关使用经验的用户可以快速上手:

description

如果在使用过程中遇到任何问题或有改进建议,欢迎在社区论坛的【VeighNa Assistant】专区发帖交流。

 

VeighNa Docker镜像

 

基于社区用户的反馈,我们在4.3.0版本中 对VeighNa Docker镜像进行了全方位的重构与优化。

核心改进:

  1. 体积大幅瘦身:通过优化Dockerfile的分层构建逻辑,镜像体积从原先的1.44GB显著降低至851MB。更小的体积意味着更快的拉取速度和更低的存储占用,特别适合云端环境的快速部署。
  2. 构建流程透明化:移除了此前为了压缩体积而引入的复杂两步构建逻辑,回归标准化的构建流程。这不仅让打包过程更加稳定,也方便高阶用户参考官方Dockerfile进行个性化的二次开发。
  3. 核心环境升级:镜像环境全面升级适配VeighNa 4.0版本,并将核心解释器升级至Python 3.13,为用户提供更加高效、稳定的量化交易容器环境。

快速开始:

用户可以直接访问VeighNa Docker Hub查看详情,或使用以下命令直接拉取并启动:

# 拉取 4.3.0 版本镜像
docker pull veighna/veighna:4.3.0

# 启动容器(示例:挂载本地目录并启动图形界面)
docker run -it \
  -e DISPLAY=$DISPLAY \
  -v /tmp/.X11-unix:/tmp/.X11-unix \
  -v $(pwd)/home:/home \
  -p 8888:8888 \
  veighna/veighna:4.3.0 python3 -m veighna_station

 

CHANGELOG

 

新增

  1. vnpy.alpha增加WorldQuant的Alpha 101因子特征数据集

调整

  1. vnpy_sec/vnpy_esunny升级适配4.0版本
  2. vnpy_ctabacktester的策略代码编辑功能支持cursor和pycharm编辑器
  3. vnpy_ctastrategy的回测引擎,增加RGR绩效统计指标(感谢上弦之月贡献)
  4. ArrayManager增加对于指标计算函数重载(Function Overload)的类型提示声明
  5. vnpy_ctp增加特殊情况撤单(非交易时段、资金不足等)的日志输出
  6. DataProxy的所有比较运算,直接返回pl.Int32(而不是Bool)
  7. 重构ts_slope / ts_rsquare / ts_resi算子函数

修复

  1. vnpy_ib修复查询历史数据问题(query_history函数增加查询锁解决多线程冲突问题)
  2. vnpy_optionmaster修复深度虚值期权的隐含波动率计算收敛问题
     


2025年第8次社区活动 - 【Codex + MCP 的量化投研开发实战】- 12月20日(上海)

发布于VeighNa社区公众号【vnpy-community】
 
原文作者:VeighNa小助手 | 发布时间:2025-12-04
 
【社区活动尊享卡】的受欢迎程度大幅超出我们的预期,为了保证每场社区活动的交流质量,尊享卡已经变更为仅对部分专业交易员用户定向提供。对于参加活动较多的同学强烈推荐!购买请扫描二维码添加小助手咨询:

description

前两场尊享卡专属活动中,大家已经见识了 Cursor 和 Claude Code 如何在量化开发与投研实践中释放灵感、提升效率。这一系列探索展示了 Vibe Coding 工具在量化研发中的潜力——AI 不再只是辅助,而正在成为策略创新的伙伴。

在 2025 年 VeighNa 社区活动的收官之作中,我们将以更系统的视角聚焦 Codex 及其对 MCP(Model Context Protocol)的支持,探讨这项技术如何让 AI 编码助手真正融入量化策略开发与回测的全流程,为智能化量化研发揭开新的篇章。

本场活动将于12月20日(周六)下午2:00至5:00在上海举办。普通报名仅支持线下参会,尊享卡报名可通过线上直播参与。活动具体地址将在微信群中公布,请在报名成功后扫码加入社区活动群,以便获取相关信息!

 

活动内容大纲

 

  1. Codex 快速入门

    a. 安装与登录配置
    b. 技术原理与核心功能概览
    c. 任务执行模式解析(对比 Claude Code )

  2. 接入 MCP 工具生态

    a. 理解 Model-Context-Protocol 的核心概念
    b. 搭建本地运行环境
    c. 寻找并配置合适的 MCP 服务
    d. 在 Codex 中完成 MCP 服务接入

  3. 量化投研开发实践

    a. 回顾传统的 CTA 策略投研流程
    b. 梳理 Codex 能帮我们自动完成的部分
    c. 结合 Sequential-Thinking 探索新策略灵感
    d. 通过 Filesystem 实现策略代码的读写与管理
    e. 体验 Codex 驱动的全流程策略开发与回测评估

  4. 闭门交流环节

 
时间:12月20日 14:00-17:00

地点:上海(具体地址后续在微信群中通知)

报名费:99元(Elite会员免费参加)

报名方式:扫描下方二维码报名(报名后请扫码加入社区活动微信群获取参会地址)

 

description

 



本地部署vnpy$发环境[适用于windows+mac]

安装流程:

  1. 下载 anaconda,并安装
  2. 下载pycharm,并安装
  3. 下载vnpy源码vn.py
    下载源码
  4. 打开 pycharm, file->open,选择下载的vnpy 文件夹
    pycharm打开vnpy文件夹
  5. 配置 python 开发环境, file- settings
    pycharm项目设置
    添加新的anaconda环境
    description
    选择python3.7
    description
    这里我使用的环境名称是 py37_vnpy

打开terminal-注意环境切换成 py37_vnpy,执行以下命令,安装需要的插件

pip install -r requirements.txt -i http://pypi.douban.com/simple --trusted-host pypi.douban.com

description
遇到安装失败的可以单独安装:

pip install PyQt5 -i http://pypi.douban.com/simple --trusted-host pypi.douban.com

description

  1. 创建run.py文件,复制以下代码,来源 README.md

description

  1. 运行 python run.py,注意环境名称是 py37_vnpy

description

备注:
如果需要在cmd 下使用 py37_vnpy 环境。

打开 CMD
运行: conda activate py37_vnpy
会切换到py37_vnpy环境下

description



vnpy 的启动流程总结

作为初学者,面对 vnpy 无所不包、博大精深的丰富内容,试图用图形对 vnpy 的运行流程做一个归纳。
不到之处,还请各位多多指正

description



VeighNa发布v4.2.0 - RiskManager风控模块重构升级

发布于VeighNa社区公众号【vnpy-community】
 
原文作者:用Python的交易员 | 发布时间:2025-11-10
 
上周发布了VeighNa的4.2.0版本,本次更新的主要内容是对RiskManager交易事前风控模块进行了重构升级,同时调整了框架的日志输出落地功能,完整适配期货程序化交易监管新规中对于程序化交易系统的功能要求。

对于已经安装了VeighNa Studio 4.0版本的用户,可以使用快速更新功能完成自动升级。对于没有安装的用户,请下载VeighNa Studio-4.2.0,体验一键安装的量化交易Python发行版,下载链接:

https://download.vnpy.com/veighna_studio-4.2.0.exe

 

RiskManager风控模块

 

标准化风控规则模板

新版 vnpy_riskmanager 模块设计的核心是标准化的风控规则模板 RuleTemplate。任何自定义风控规则都必须继承该基类,这保证了所有规则都具备统一的接口,使得风控引擎(RiskEngine)能够对其进行动态发现、加载和执行。

RuleTemplate 为风控规则定义了完整的生命周期和核心逻辑方法。创建一个新的风控规则,即是创建一个继承自 RuleTemplate 的子类,并根据实际需求实现其中的相关方法。

核心组件

一个标准的风控规则类主要包含以下几个部分:

  • name: str: 规则的唯一中文名称,用于图形界面上该规则相关的信息显示。
  • parameters: dict[str, str]: 定义规则的可配置参数。该字典的键为参数的英文名,值为其在图形界面上显示的中文名,方便用户理解和修改。
  • variables: dict[str, str]: 定义规则需要实时监控的状态变量,同样字典的键为变量的英文名,值为其在图形界面上显示的中文名。

核心方法

  • on_init(self) -> None: 规则的初始化方法。在被风控引擎加载时调用,主要用于设定参数的默认值以及初始化监控变量的初始状态。
  • on_order(self, order: OrderData) -> None: 用于处理委托状态更新的回调方法。规则可在此方法内更新内部状态,例如跟踪活动委托的数量。
  • on_trade(self, trade: TradeData) -> None: 用于处理成交回报的回调方法。规则可在此方法内统计累计成交信息等。
  • on_tick(self, tick: TickData) -> None: 用于处理行情切片数据的回调方法。适用于需要基于实时行情进行判断的规则,但需注意其可能会因频繁调用而带来的性能开销。
  • check_allowed(self, req: OrderRequest, gateway_name: str) -> bool: 风控检查的核心逻辑实现。每当交易系统产生新的委托请求(OrderRequest)时,风控引擎都会调用此方法进行检查。若订单允许通过,则方法返回 True;若订单需要被拦截,则应先调用 self.write_log() 记录拦截的具体原因,随后返回 False

Python 规则示例

以下是一个完整的纯 Python 规则示例代码,实现了对单笔委托数量的上限限制。

from vnpy.trader.object import OrderRequest, OrderData
from ..template import RuleTemplate

class OrderVolumeRule(RuleTemplate):
    """单笔委托手数限制规则"""

    # 规则的中文名称
    name: str = "委托数量检查"

    # 定义可配置的参数
    parameters: dict[str, str] = {
        "max_volume": "单笔最大委托量"
    }

    # 定义需监控的变量(可为空)
    variables: dict[str, str] = {}

    def on_init(self) -> None:
        """初始化"""
        self.max_volume: int = 100         # 设置参数的默认值

    def check_allowed(self, req: OrderRequest, gateway_name: str) -> bool:
        """核心风控逻辑"""
        if req.volume > self.max_volume:
            msg = f"委托数量{req.volume}超过参数限制{self.max_volume}:{req}"
            self.write_log(msg)
            returnFalse

        returnTrue

模块的内置风控规则

vnpy_riskmanager 模块提供了一系列标准化的内置风控规则,覆盖了交易过程中的多个关键风险点。所有内置规则均提供纯 Python 和 Cython 两种实现,风控引擎在加载时会优先使用性能更优的 Cython 版本。

  • ActiveOrderRule (活动委托数量上限)此规则用于限制全局的活动(可撤)委托数量。当活动委托笔数超过设定的 active_order_limit 阈值时,新的委托请求将被拦截。它通过监听 on_order 事件来实时跟踪委托状态的变更,从而维护准确的活动委托计数。

  • DailyLimitRule (每日流控上限)该规则用于对整个交易日内的委托、撤单、成交行为进行总次数和单个合约次数的双重流控。它包含六个可配置参数:total_order_limit (汇总委托上限), total_cancel_limit (汇总撤单上限), total_trade_limit (汇总成交上限), contract_order_limit (合约委托上限), contract_cancel_limit (合约撤单上限), contract_trade_limit (合约成交上限)。在发单时,会检查各项累计值是否超限;在收到委托、撤单和成交回报时,会更新相应的计数。

  • DuplicateOrderRule (重复报单检查)用于防范因系统或人为失误导致的重复下单。该规则会统计历史上所有发出委托的缓存,当一个新的委托请求在合约、类型、方向、开平、价格、数量等字段上完全相同时,则会累加其重复次数。如果该次数超过 duplicate_order_limit 上限(默认为10次),委托就会被执行拦截。

  • OrderSizeRule (单笔委托规模上限)这是一个基础但极为重要的风控规则,用于防止“乌龙指”。它包含两个维度的限制:order_volume_limit 用于限制任何单笔委托请求的最大允许数量;order_value_limit 用于限制该笔委托的总价值(数量 价格 合约乘数)。任何一个维度超限都会导致委托被拦截。

  • OrderValidityRule (委托指令合法性监控)此规则在委托发送至交易接口前,对其指令的合法性进行预检查,提前拦截无效委托,减轻后端系统压力。检查内容包括:

    1. 通过 main_engine.get_contract 检查委托的合约是否存在。
    2. 检查委托价格是否为合约最小价格变动 pricetick 的整数倍。
    3. 检查委托数量是否超过了合约规定的单笔最大下单量 contract.max_volume
    4. 检查委托数量是否低于合约规定的单笔最小下单量 contract.min_volume

Cython 性能优化加速

风控引擎对所有规则的检查是串行执行的,在交易繁忙时,多条纯Python规则累加的检查延迟可能成为性能瓶颈。为了解决这个问题,vnpy_riskmanager模块支持使用Cython对代码进行性能优化。开发者可以将原有的纯Python风控规则,通过Cython语法重构为C语言级别的高性能版本,从而将检查延迟降低到微秒级。

Cython 规则的实现方式

编写 Cython 风控规则的关键是将核心逻辑(尤其是高频调用的check_allowed函数)放入一个 cdef class 中,同时保留一个标准的 Python class 作为外层包装器,以供风控引擎识别和加载。

以下是内置风控规则中 OrderSizeRule 的 Cython 版本实现代码作为示例:

# cython: language_level=3
from vnpy_riskmanager.template cimport RuleTemplate


cdef class OrderSizeRuleCy(RuleTemplate):
    """委托规模检查风控规则 (Cython 版本)"""

    cdef public int order_volume_limit
    cdef public float order_value_limit

    cpdef void on_init(self):
        """初始化"""
        self.order_volume_limit = 500
        self.order_value_limit = 1_000_000

    cpdef bint check_allowed(self, object req, str gateway_name):
        """检查是否允许委托"""
        cdef object contract
        cdef float order_value

        if req.volume > self.order_volume_limit:
            self.write_log(f"委托数量{req.volume}超过上限{self.order_volume_limit}:{req}")
            return False

        contract = self.get_contract(req.vt_symbol)
        if contract and req.price:      # 只考虑限价单
            order_value = req.volume * req.price * contract.size
            if order_value > self.order_value_limit:
                self.write_log(f"委托价值{order_value}超过上限{self.order_value_limit}:{req}")
                return False

        return True


class OrderSizeRule(OrderSizeRuleCy):
    """委托规模检查规则的Python包装类"""

    name: str = "委托规模检查"

    parameters: dict[str, str] = {
        "order_volume_limit": "委托数量上限",
        "order_value_limit": "委托价值上限",
    }

编译自定义 Cython 规则

对于自定义的 Cython 规则可以通过一个独立的编译脚本来编译生成pyd文件(注意编译过程中会需要用到 C++ 编译器,如 Windows 上的 Visual Studio):

  1. 创建 build_rule.py: 在项目根目录下创建此脚本。

    from setuptools import setup, Extension
    from Cython.Build import cythonize
    
    # 注意:需与 .pyx 文件名一致
    rule_name = "order_size_rule_cy"
    
    extensions = [
        Extension(
            name=f"vnpy_riskmanager.rules.{rule_name}",
            sources=[f"vnpy_riskmanager/rules/{rule_name}.pyx"],
        )
    ]
    
    setup(
        ext_modules=cythonize(
            extensions,
            compiler_directives={"language_level": "3"}
        )
    )
  2. 执行编译: 在项目根目录下打开终端,运行编译命令。

    python build_rule.py build_ext --inplace

    --inplace 参数会使编译生成的二进制文件(.pyd.so)直接输出到 vnpy_riskmanager/rules 目录下。编译完成后重启程序,RiskEngine 会自动发现并优先加载这个高性能的 Cython 规则。

风控信息通知机制

当一个委托请求因为触发了某项风控规则而被拦截时,vnpy_riskmanager 模块会通过多种方式立即向用户发出警报,确保交易员能够第一时间注意到异常情况并进行处理。

声音报警提醒

一旦有委托被拦截,风控引擎会立刻调用操作系统内置的提示音发出警报,确保即使用户的注意力不在交易界面上,也能迅速察觉到风控事件的发生,从而避免错过关键的风险信息。

气泡弹窗通知

在声音报警的同时,一个可视化的气泡弹窗会从系统托盘区域(通常是屏幕右下角)弹出,该弹窗会明确展示具体的拦截原因,例如“委托数量100超过参数限制50”,气泡弹窗会持续显示一段时间(默认30秒)。

日志信息输出

为了确保风险事件的可追溯性,所有拦截信息在触发实时报警的同时,也会通过VeighNa框架的标准化日志系统进行输出,具体包括:主界面的日志监控组件、启动系统的命令行终端,以及本地日志文件。

 

核心框架日志输出落地

 

为了满足监管新规中的要求,确保程序化交易系统的关键操作都有据可查,VeighNa的4.2.0版本在日志记录方面进行了全面增强,主要更新包括:

  • 日志默认启动与持久化:日志功能已默认激活并持久化,INFO级别的系统常规操作信息将同步输出到终端日志文件
  • 核心引擎操作日志:核心引擎 MainEngine 会自动记录关键交易生命周期事件,覆盖连接登录、行情订阅、委托下单委托撤单等操作。
  • 策略模块日志自动注册:各大策略模块(如 CtaStrategyPortfolioStrategy 等)的日志将自动注册并转发至核心日志引擎,实现对策略层行为的全面监控。

 

CHANGELOG

 

新增

  1. vnpy_riskmanager模块重构

    a. 采用插件式设计,提供标准化风控规则开发模板
    b. 支持中国期货程序化交易系统监管要求中的风控规则
    c. 输出拦截日志后播放提示声音,目前仅支持Windows系统
    d. 使用系统托盘栏图标,弹出交易风控拦截日志气泡框
    e. 提供Cython版本的风控规则开发模板以及具体规则实现
    f. 支持自动扫描加载用户自定义风控规则(放置于Trader目录下的rules文件夹中)

  2. vnpy_polygon数据服务接口,支持海外股票、期货、期权等资产品种的历史数据获取

调整

  1. vnpy_ctp更新底层API到6.7.11(生产和测试统一版本)
  2. vnpy_dolphindb升级适配4.0版本
  3. vnpy_tqsdk简化时间戳的格式化方法,提高效率
  4. vnpy_sqlite支持使用配置文件中声明的数据文件
  5. vnpy_taos优化get_bar_overview和get_tick_overview函数性能(直接访问超级表的tags)
  6. vnpy_spreadtrading / vnpy_portfoliostrategy / vnpy_scripttrader 注册模块日志输出到日志引擎
  7. vnpy_optionmaster优化定价模型中期权最小价值边界判断的逻辑
  8. 全局配置中的log.level改为INFO(10),默认启用详细日志记录输出
  9. MainEngine增加交易功能函数的调用日志输出
  10. vnpy.alpha中的Dataset增加process_data函数,便于测试不同数据处理器的效果
  11. vnpy_ib更新支持ibapi至10.40.1版本

修复

  1. vnpy_gm修复中金所和大商所合约代码转换的问题
  2. vnpy_rqdata修复RqdataGateway中的行情订阅函数错误问题
  3. vnpy_optionmaster修复关闭窗口时的异常报错
  4. vnpy_portfoliostrategy修复遗传算法参数优化中的调用传参问题
  5. 修复Linux系统上的sdist安装问题:vnpy_mini / vnpy_sopt / vnpy_rohon / vnpy_tap / vnpy_tts
  6. vnpy_xt修复XtGateway断线重连时的传参错误
  7. vnpy_optionmaster修复black-76模型中theta计算公式的问题
  8. vnpy_postgresql修复写入主键冲突时数据不会更新的问题
     


windows上利用uv手动安装vnpy系统的完整过程

本文记录了利用python包管理生态管理工具uv,从源码恢复vnpy的运行环境的全过程。

一、采用uv作为python生态管理工具

python包管理生态中存在多种工具,如pip、pip-tools、poetry、conda等,各自具备一定功能。

uv新一代python生态管理工具,它是 Astral 公司推出的一款基于Rust编写的Python包管理工具,旨在成为“Python的 Cargo”。

它提供了快速、可靠且易用的包管理体验,在性能、兼容性和功能上都有出色表现,为 Python项目的开发和管理带来了新的选择。

为什么用uv

与其他 Python 中的包管理工具相比,uv 更像是一个全能选手,它的优势在于:

  1. 速度快:得益于 Rust,uv 工具的速度让人惊艳,比如安装依赖,速度比其他工具快很多
  2. 功能全面: uv 是“一站式服务”的工具,从安装 Python、管理虚拟环境,到安装和管理包,再到管理项目依赖,它统统都能处理得很好
  3. 前景光明:背后有风投公司 Astral支持,且采用了 MIT许可,即使未来出现问题,社区也有应对的办法

使用uv,也可以像 Node]s 或者 Rust 项目那样方便的管理依赖。

二、安装uv

windows上安装uv,在cmd窗口中执行下面命令:

powershell -ExecutionPolicy ByPass -c "irm https://astral.sh/uv/install.ps1 | iex"

注意:执行完毕后,会在C:\users\\<用户名称>\.local\bin目录下安装两个文件uv.exe和uvx.exe。

description

进入cmd或者powershell窗口,输入uv命令应该可以得到如下回应就表示uv已经安装成功了:

description

如果报告“uv不是内部或外部命令,也不是可运行的程序或批处理文件。”请打开“我的电脑”| “高级系统设置”| “环境变量”| “用户变量”| “Path”,把C:\users\\<用户名称>\.local\bin目录添加进去就可以。

description

description

三、下载vnpy 4.0源码

description
如图所示,从https://www.github.com下载到vnpy-master.zip源代码,解压后包含如下文件和目录:

description

四、恢复vnpy的运行环境

4.1 安装python 3.13.17

输入下面的命令,限定python版本3.13,实际安装的是python 3.13.17。

uv python install 3.13

4.2 重建虚拟环境

来到vnpy-master源码目录下,输入下面的目录创建虚拟环境目录,当前目录下会多出.venv目录,其中包含虚拟环境的配置文件。

cd d:\vnpy-master
uv venv

4.3 安装ta-lib

这一步是最关键的一步,我是好几天都无法通过这一步,最后在陈总——陈晓优的指导下才通过的。

先把ta-lib二进制包装了,注意这里必须先指明安装的源是https://pypi.vnpy.com。

uv pip install ta-lib --index=https://pypi.vnpy.com

4.4 安装vnpy核心模块

uv pip install . --index=https://pypi.vnpy.com

4.5 安装vnpy其他需要的模块

只有vnpy核心模块是不够的,还需要根据自己的需要安装其他的模块才能够运行实际的交易:

  • 如果采用ctp行情和交易接口需要安装vnpy_ctp模块
  • 如果采用米筐历史书需要安装vnpy_rqdata模块
  • 如果采用cta策略需要安装vnpy_ctastrategy模块
  • 如果要运行cta策略回测需要安装vnpy_ctastrategy模块
  • 如果使用mysql数据库作为数据存储pymysql和vnpy_mysql模块
  • 如果需要对本地数据存储进行管理 vnpy_datamanager 模块

可以合并也可以逐条执行如下命令:

uv add importlib_metadata pymysql
uv add vnpy_ctp vnpy_ctastrategy vnpy_ctabacktester vnpy_datamanager vnpy_mysql vnpy_rqdata

五、运行例子

5.1 运行example\verghna_trader\run.py

vnpy的源码中包含一个例子,位于example\verghna_trader子目录下,其中的run.py最具代表性,它演示了如何利用已经安装的核心和其他配套模块能够完成功能:

  • vntrader界面
  • CTA策略交易界面
  • CTA策略回测
  • 本地数据管理
  • 合约查询
cd d:\vnpy-master\examples\veighna_trader\
uv run run.py

vntrader界面

description

合约查询界面

description



彻底解决tick数据的过滤问题

1. 问题的由来

  • 在文章 发现了vnpy的BarGenerator两个隐藏很深的错误 !中我就已经分析过tick数据对bar生成器的影响。
  • 当前vnpy系统对集合竞价tick与其他tick没有区分能力
  • 当前vnpy系统没有充分利用行情接口提供的状态信息,无法识别有效tick与无效tick,一股脑地发送到策略和应用中,导致bar合成的错误。

2. 问题的解决方法

在行情接口与策略和应用之间建起一个tick过滤器——TickFilter,对tick数据进行过滤。

tick数据过滤器的功能:

  1. 过滤重复tick,保证已经参与K线合成的tick不会再次被系统使用,每个网关对应一个ick数据过滤;
    要做到这一条,就必须做到对所有已经订阅过的合约的tick的缓存,否则你再次重启系统的时候是无法知道你收到第一个tick是否已经参与过之前bar的合成了。这样你可能重复使用该tick,这是错误的。
    为此我们需要将所有已经订阅过的合约的最新tick进行实时更新,并定期做持久化保存,且在每次系统启动的时候读取加载到系统中。
  2. 过滤无效tick,转发有效交易状态下的tick到系统中,不在有效交易状态下tick做丢弃处理,有效交易状态包括:集合竞价状态和连续竞价状态;
    CTP系统的行情接口中包含的实时更新的合约交易状态通知推送接口,OnRtnInstrumentStatus()。关于这个问题我已经在如何更有效地利用合约交易状态信息——交易状态信息管理器。一文中做了详细的介绍,再次就不赘述。总之合约交易状态通知可以让我识别一个tick是否是有些大tick。
  3. 识别集合竞价tick,为使用tick的应用或用户策略处理集合竞价tick提供支持。
    合约交易状态通知可以让我们知道那些tick是tick,同时可以可以让我们区分那个tick是集合竞价tick,那些是连续竞价tick。对有效tick进性分析利用于我们策略或者应用生成出正确的bar。
  4. 本文只对CtpGateway,CtaEngine、CtaTemplate进行了更改,其他网关系统的道理都是相同的。如果您觉得对您有启发,也可以按同样的方法修改。

3. 过滤无效tick数据的实现代码

声明:本文基于【CTP接口规范6.3.15_API接口说明】做出的修改。

3.1 相关数据类型定义

4.1 定义相关的常量和数据类
在vnpy\trader\constant.py中增加下面的合约交易状态InstrumentStatus常量类型定义:

class InstrumentStatus(Enum):
    """
    合约交易状态类型 hxxjava debug
    """
    BEFORE_TRADING = "开盘前"
    NO_TRADING = "非交易"
    CONTINOUS = "连续交易" 
    AUCTION_ORDERING = "集合竞价报单"
    AUCTION_BALANCE = "集合竞价价格平衡"
    AUCTION_MATCH = "集合竞价撮合"
    CLOSE = "收盘"


# 有效交易状态
VALID_TRADE_STATUSES = [
    InstrumentStatus.CONTINOUS,
    InstrumentStatus.AUCTION_ORDERING,
    InstrumentStatus.AUCTION_BALANCE,
    InstrumentStatus.AUCTION_MATCH
]

# 集合竞价交易状态
AUCTION_STATUS = [
    InstrumentStatus.AUCTION_ORDERING,
    InstrumentStatus.AUCTION_BALANCE,
    InstrumentStatus.AUCTION_MATCH
]


class StatusEnterReason(Enum):
    """
    品种进入交易状态原因类型 hxxjava debug
    """
    AUTOMATIC = "自动切换"
    MANUAL = "手动切换"
    FUSE = "熔断"

在vnpy\trader\object.py中增加下面的交易状态数据类StatusData:

@dataclass
class StatusData(BaseData):
    """
    hxxjava debug
    """
    symbol:str       
    exchange : Exchange    
    settlement_group_id : str = ""  
    instrument_status : InstrumentStatus = None   
    trading_segment_sn : int = None 
    enter_time : str = ""      
    enter_reason : str = ""  
    exchange_inst_id : str = ""     

    def __post_init__(self):
        """  """
        self.vt_symbol = f"{self.symbol}.{self.exchange.value}"

    def belongs_to(self,vt_symbol:str):
        symbol,exchange_str = vt_symbol.split(".")
        instrument = left_alphas(symbol).upper()
        return (self.symbol.upper() == instrument) and (self.exchange.value == exchange_str)

3.2 相关消息定义

在vnpy\trader\event.py中增加交易状态消息类型

EVENT_STATUS = "eStatus"                        # hxxjava debug
EVENT_ORIGIN_TICK = "eOriginTick."              # hxxjava debug
EVENT_AUCTION_TICK = "eAuctionTick."            # hxxjava debug

3.3 Gateway的修改

在vnpy\trader\gateway.py中合约状态接口,修改tick推送接口:

引用部分增加:

from .event import EVENT_ORIGIN_TICK,EVENT_STATUS         # hxxjava add
from .object import StatusData    # hxxjava add

修改class BaseGateway的on_tick()接口,增加on_status()接口:

    def on_tick(self, tick: TickData) -> None:
        """
        Tick event push.
        Tick event of a specific vt_symbol is also pushed.
        """
        self.on_event(EVENT_ORIGIN_TICK, tick)  # hxxjava add             
        # self.on_event(EVENT_TICK, tick)                   
        # self.on_event(EVENT_TICK + tick.vt_symbol, tick)  

    def on_status(self, status: StatusData) -> None:    # hxxjava debug
        """
        Instrument Status event push.
        """
        self.on_event(EVENT_STATUS, status)
        self.on_event(EVENT_STATUS + status.vt_symbol, status)

3.4 CtpGateway的修改

修改vnpy_cpt\ctp_gateway.py:

增加引用部分

from vnpy.trader.constant import InstrumentStatus,StatusEnterReason  # hxxjava debug
rom vnpy.trader.object import StatusData,     # hxxjava debug

增加几个映射字典:

# 品种状态进入原因映射  hxxjava debug
INSTRUMENTSTATUS_CTP2VT: Dict[str, InstrumentStatus] = {
    "0": InstrumentStatus.BEFORE_TRADING,
    "1": InstrumentStatus.NO_TRADING,
    "2": InstrumentStatus.CONTINOUS,
    "3": InstrumentStatus.AUCTION_ORDERING,
    "4": InstrumentStatus.AUCTION_BALANCE,
    "5": InstrumentStatus.AUCTION_MATCH,
    "6": InstrumentStatus.CLOSE,
    "7": InstrumentStatus.CLOSE
}


# 品种状态进入原因映射  hxxjava debug
ENTERREASON_CTP2VT: Dict[str, StatusEnterReason] = {
    "1": StatusEnterReason.AUTOMATIC,
    "2": StatusEnterReason.MANUAL,
    "3": StatusEnterReason.FUSE
}

为class CtpTdApi增加下面合约状态推送接口:

    def onRtnInstrumentStatus(self,data:dict):
        """ 
        当接收到合约品种状态信息 # hxxjava debug 
        """
        if data:
            # print(f"【data={data}】")
            status =  StatusData(
                symbol = data["InstrumentID"],
                exchange = EXCHANGE_CTP2VT[data["ExchangeID"]],
                settlement_group_id = data["SettlementGroupID"],
                instrument_status = INSTRUMENTSTATUS_CTP2VT[data["InstrumentStatus"]],
                trading_segment_sn = data["TradingSegmentSN"],
                enter_time = data["EnterTime"],
                enter_reason = ENTERREASON_CTP2VT[data["EnterReason"]],
                exchange_inst_id = data["ExchangeInstID"],
                gateway_name=self.gateway_name
            )
            # print(f"status={status}")
            self.gateway.on_status(status)

3.5 对CtaEngine的进行扩展

增加引用部分

from vnpy.trader.event import EVENT_AUCTION_TICK  # hxxjava add

增加一个对CtaEgine的扩展MyCtaEngine

class MyCtaEngine(CtaEngine):
    """  """

    condition_filename = "condition_order.json"     # 历史条件单存储文件


    def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
        """"""
        super().__init__(main_engine,event_engine)

        self.condition_orders:Dict[str,ConditionOrder] = {}         # strategy_name: ConditionOrder

        self.triggered_condition_orders:List[ConditionOrder] = []   # 已经触发点条件单,为流控设计

    def load_active_condtion_orders(self):
        """  """
        return {}

    def register_event(self):
        """"""
        super().register_event()
        self.event_engine.register(EVENT_AUCTION_TICK, self.process_auction_tick_event)

    def process_auction_tick_event(self,event:Event):
        """ 集合竞价消息处理 """

        tick:TickData = event.data
        strategies = self.symbol_strategy_map[tick.vt_symbol]
        if not strategies:
            return

        for strategy in strategies:
            if strategy.inited:
                # 执行策略的集合竞价消息处理
                self.call_strategy_func(strategy, strategy.on_auction_tick, tick)

    def process_tick_event(self,event:Event):
        """ 用tick的价格检查条件单 """
        super().process_tick_event(event)

        tick:TickData = event.data
        all_condition_orders = [order for order in self.condition_orders.values() \
            if order.vt_symbol == tick.vt_symbol and order.status == CondOrderStatus.WAITING]
        for order in all_condition_orders:
            # 检查条件单是否满足条件
            self.check_condition_order(order,tick)

    def check_condition_order(self,order:ConditionOrder,tick:TickData):
        """ 检查条件单是否满足条件 """       
        strategy = self.strategies.get(order.strategy_name,None)
        if not strategy or not strategy.trading:
            return False

        price = tick.last_price

        is_be = order.condition == Condition.BE and price >= order.price
        is_le = order.condition == Condition.LE and price <= order.price
        is_bt = order.condition == Condition.BT and price > order.price
        is_lt = order.condition == Condition.LT and price < order.price

        if is_be or is_le or is_bt or is_lt:
            # 满足触发条件
            if order.execute_price == ExecutePrice.MARKET:
                # 取市场价
                price = tick.last_price
            elif order.execute_price == ExecutePrice.EXTREME:
                # 取极限价
                price = tick.limit_up if order.direction == Direction.LONG else tick.limit_down
            else:
                # 取设定价
                price = order.price

            # 执行委托
            order_ids = strategy.send_order(
                    direction = order.direction,
                    offset=order.offset,
                    price=price,
                    volume=order.volume 
                )

            if order_ids:
                order.trigger_time = tick.datetime
                order.status = CondOrderStatus.TRIGGERED
                order.vt_orderids = order_ids

                self.call_strategy_func(strategy,strategy.on_condition_order,order)
                self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))

    def find_condition_order(self,vt_orderid:str):
        """ 根据委托单号查询所属条件单 """
        corder:ConditionOrder = None
        for order in self.condition_orders.values():
            if vt_orderid in order.vt_orderids:
                corder = order
                break

        return corder

    def process_trade_event(self, event: Event):
        """ 委托单推送处理 """
        super().process_trade_event(event)
        trade:TradeData = event.data
        vt_orderid = trade.vt_orderid

        corder = self.find_condition_order(vt_orderid)
        if corder:
            # 该成交单属于某个条件单
            strategy = self.strategies.get(corder.strategy_name,None)
            if strategy and strategy.trading:
                # 找到了该条件单属实策略实例且正在交易中

                # 累计条件单的成交量
                corder.traded += trade.volume
                # 推送该条件单给策略
                self.call_strategy_func(strategy,strategy.on_condition_order,corder)
                # 刷新条件单列表控件
                self.event_engine.put(Event(EVENT_CONDITION_ORDER,corder))

    def send_condition_order(self,order:ConditionOrder):
        """  """
        strategy = self.strategies.get(order.strategy_name,None)
        if not strategy or not strategy.trading:
            return False

        if order.cond_orderid not in self.condition_orders:
            self.condition_orders[order.cond_orderid] = order
            self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))
            return True

        return False

    def cancel_condition_order(self,cond_orderid:str):
        """  """
        order:ConditionOrder = self.condition_orders.get(cond_orderid,None)
        if not order:
            return False

        order.status = CondOrderStatus.CANCELLED
        self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))
        return True

    def cancel_all_condition_orders(self,strategy_name:str):
        """  """
        for order in self.condition_orders.values():
            if order.strategy_name == strategy_name and order.status == CondOrderStatus.WAITING:
                order.status = CondOrderStatus.CANCELLED
                self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))

        return True

把vnpy_ctastrategy目录下 的__init__.py中的CtaStrategyApp做如下修改

class CtaStrategyApp(BaseApp):
    """"""

    app_name = APP_NAME
    app_module = __module__
    app_path = Path(__file__).parent
    display_name = "CTA策略"
    # engine_class = CtaEngine
    engine_class = MyCtaEngine    # hxxjava add
    widget_name = "CtaManager"
    icon_name = str(app_path.joinpath("ui", "cta.ico"))

3.6 CtaTemplate的修改

修改vnpy_ctastrategy\CtaTemplate.py如下,为CtaTemplate增加on_auction_tick():

    @virtual
    def on_auction_tick(self, tick: TickData):
        """
        Callback of new tick data update.   # hxxjava add for auction tick
        """
        pass

3.7 为数据库增加最新Tick保存函数

3.7.1 修改vnpy\trader\database.py

为class BaseDatabase增加下面两个接口函数:

    @abstractmethod
    def save_last_tick(self, ticks: List[TickData]) -> bool:
        """
        Save last tick data into database.  # hxxjava add
        """
        pass

    @abstractmethod
    def load_last_tick(
        self,
        gateway_name : str,
        exchange: Exchange = None,
        symbol: str = None
    ) -> List[TickData]:
        """
        Load last tick data from database.  # hxxjava add
        """
        pass

3.7.2 修改vnpy_mysql\mysql_database.py

class MyDateTimeField(DateTimeField):
    def get_modifiers(self):
        return [6]

class DbLastTick(Model):    # hxxjava add
    """ 最新TICK数据表映射对象 """

    id = AutoField()

    gateway_name: str = CharField()

    symbol: str = CharField()
    exchange: str = CharField()
    datetime: datetime = MyDateTimeField()

    name: str = CharField()
    volume: float = FloatField()
    turnover: float = FloatField()
    open_interest: float = FloatField()
    last_price: float = FloatField()
    last_volume: float = FloatField()
    limit_up: float = FloatField()
    limit_down: float = FloatField()

    open_price: float = FloatField()
    high_price: float = FloatField()
    low_price: float = FloatField()
    pre_close: float = FloatField()

    bid_price_1: float = FloatField()
    bid_price_2: float = FloatField(null=True)
    bid_price_3: float = FloatField(null=True)
    bid_price_4: float = FloatField(null=True)
    bid_price_5: float = FloatField(null=True)

    ask_price_1: float = FloatField()
    ask_price_2: float = FloatField(null=True)
    ask_price_3: float = FloatField(null=True)
    ask_price_4: float = FloatField(null=True)
    ask_price_5: float = FloatField(null=True)

    bid_volume_1: float = FloatField()
    bid_volume_2: float = FloatField(null=True)
    bid_volume_3: float = FloatField(null=True)
    bid_volume_4: float = FloatField(null=True)
    bid_volume_5: float = FloatField(null=True)

    ask_volume_1: float = FloatField()
    ask_volume_2: float = FloatField(null=True)
    ask_volume_3: float = FloatField(null=True)
    ask_volume_4: float = FloatField(null=True)
    ask_volume_5: float = FloatField(null=True)

    localtime: datetime = DateTimeField(null=True)

    class Meta:
        database = db
        indexes = ((("gateway_name","symbol", "exchange", "datetime"), True),)

class MysqlDatabase的初始化做如下修改:

    def __init__(self) -> None:
        """"""
        self.db = db
        self.db.connect()
        self.db.create_tables([DbContractData, DbBarData, DbTickData, DbLastTick, DbBarOverview])   # hxxjava add DbLastTick,DbContractData

再为class MysqlDatabase添加下面两个函数:

    def save_last_tick(self, ticks: List[TickData]) -> bool:
        """
        Save last tick data into database.  # hxxjava add
        """
        vt_symbols = [t.vt_symbol for t in ticks]

        # 删除ticks列表中包含合约的旧的tick记录
        d: ModelDelete = DbLastTick.delete().where(
            (DbLastTick.symbol+'.'+DbLastTick.exchange in vt_symbols)
        )
        count = d.execute()
        # print(f"delete {count} last ticks")

        # 构造最新的ticks列表数据
        data = []
        for t in ticks:
            tick:TickData = deepcopy(t)     # hxxjava change
            tick.datetime = tick.datetime

            d = tick.__dict__
            d["exchange"] = d["exchange"].value
            d.pop("vt_symbol")
            data.append(d)
            # print(tick.symbol,tick.exchange,tick.datetime.strftime('%Y-%m-%d %H:%M:%S %f'))

        # 使用upsert操作将数据更新到数据库中
        with self.db.atomic():
            for c in chunked(data, 50):
                DbLastTick.insert_many(c).on_conflict_replace().execute()

        return True

    def load_last_tick(
        self,
        gateway_name : str,
        exchange: Exchange = None,
        symbol: str = None
    ) -> List[TickData]:
        """
        Load last tick data from database.  # hxxjava add
        """
        try:
            # 从DbLastTick查询符合条件的最新tick记录
            s: ModelSelect = (
                DbLastTick.select().where(
                    (DbLastTick.gateway_name == gateway_name)
                    & (exchange is None or DbLastTick.exchange == exchange.value)
                    & (symbol is None or DbLastTick.symbol == symbol)
                ).order_by(DbLastTick.gateway_name,DbLastTick.datetime)
            )

            # 利用最新tick记录构造ticks列表
            ticks: List[TickData] = []
            for db_tick in s:
                tick:TickData = TickData(
                    symbol=db_tick.symbol,
                    exchange=Exchange(db_tick.exchange),
                    datetime=to_china_tz(db_tick.datetime),
                    name=db_tick.name,
                    volume=db_tick.volume,
                    turnover=db_tick.turnover,
                    open_interest=db_tick.open_interest,
                    last_price=db_tick.last_price,
                    last_volume=db_tick.last_volume,
                    limit_up=db_tick.limit_up,
                    limit_down=db_tick.limit_down,
                    open_price=db_tick.open_price,
                    high_price=db_tick.high_price,
                    low_price=db_tick.low_price,
                    pre_close=db_tick.pre_close,
                    bid_price_1=db_tick.bid_price_1,
                    bid_price_2=db_tick.bid_price_2,
                    bid_price_3=db_tick.bid_price_3,
                    bid_price_4=db_tick.bid_price_4,
                    bid_price_5=db_tick.bid_price_5,
                    ask_price_1=db_tick.ask_price_1,
                    ask_price_2=db_tick.ask_price_2,
                    ask_price_3=db_tick.ask_price_3,
                    ask_price_4=db_tick.ask_price_4,
                    ask_price_5=db_tick.ask_price_5,
                    bid_volume_1=db_tick.bid_volume_1,
                    bid_volume_2=db_tick.bid_volume_2,
                    bid_volume_3=db_tick.bid_volume_3,
                    bid_volume_4=db_tick.bid_volume_4,
                    bid_volume_5=db_tick.bid_volume_5,
                    ask_volume_1=db_tick.ask_volume_1,
                    ask_volume_2=db_tick.ask_volume_2,
                    ask_volume_3=db_tick.ask_volume_3,
                    ask_volume_4=db_tick.ask_volume_4,
                    ask_volume_5=db_tick.ask_volume_5,
                    localtime=db_tick.localtime,
                    gateway_name=db_tick.gateway_name
                )
                ticks.append(tick)

            return ticks

        except:
            # 当DbLastTick表不存在的时候,会发生错误
            return []

3.8 tick数据过滤器的实现

在vnpy.usertools下创建tickfilter.py文件,其内容如下:

"""
本文件主要实现tick数据过滤器——TickFilter。

tick数据过滤器的功能:
1. 过滤重复tick,保证已经参与K线合成的tick不会再次被系统使用
2. 过滤无效tick,抛弃不在交易状态下的tick
3. 识别集合竞价tick,为使用tick的应用或用户策略处理集合竞价tick提供支持

作者:hxxjava
日期:2022-06-16
修改日期:              修改原因:  
"""
from typing import Dict,List,Tuple
from threading import Thread
from vnpy.event import Event,EVENT_TIMER,EventEngine
from vnpy.trader.constant import InstrumentStatus,VALID_TRADE_STATUSES
from vnpy.trader.object import TickData,StatusData
from vnpy.trader.event import (
    EVENT_ORIGIN_TICK,
    EVENT_AUCTION_TICK,
    EVENT_TICK,
    EVENT_STATUS
)
from vnpy.trader.database import get_database
from vnpy.trader.utility import extract_vt_symbol


def left_alphas(instr:str):
    """
    得到字符串左边的字符部分
    """
    ret_str = ''
    for s in instr:
        if s.isalpha():
            ret_str += s
        else:
            break
    return ret_str

def get_vt_instrument(vt_symbol:str):
    """
    从完整合约代码转换到完整品种代码
    """    
    symbol,exchange = extract_vt_symbol(vt_symbol)
    instrument = left_alphas(symbol)
    return f"{instrument}.{exchange.value}"


class TickFilter():
    """ tick数据过滤器 """
    CHECK_INTERVAL:int = 5  # 更新到数据库间隔

    def __init__(self,event_engine:EventEngine,gateway_name:str):
        """ tick数据过滤器初始化 """
        self.event_engine = event_engine
        self.gateway_name = gateway_name
        self.db = get_database()

        # 最新tick字典 {(gateway_name,vt_symbol),(update,tick)}
        self.last_ticks:Dict[Tuple[str,str],Tuple[bool,TickData]] = {}

        # 品种及合约状态字典 { vt_symbol : StatusData }
        self.statuses:Dict[str,StatusData] = {}
        self.second_cnt = 0

        self.load_last_ticks()
        self.register_event()

        # print(f"TickFilter {gateway_name}")

    def load_last_ticks(self):
        """ 
        加载属于网关名称为self.gateway_name的最新tick列表 
        """
        last_ticks:List[TickData] = self.db.load_last_tick(gateway_name=self.gateway_name)
        for tick in last_ticks:
            self.last_ticks[(tick.gateway_name,tick.vt_symbol)] = (False,tick)

        # print(f"load {len(last_ticks)} last ticks")

    def register_event(self):
        """ 注册消息 """
        self.event_engine.register(EVENT_ORIGIN_TICK,self.process_tick_event)
        self.event_engine.register(EVENT_STATUS,self.process_status_event)
        self.event_engine.register(EVENT_TIMER,self.check_last_ticks)        

    def process_tick_event(self,event:Event):
        """ 对原始tick进行过滤 """
        tick:TickData = event.data

        # 检查tick合约的经验状态是否位有效交易状态
        status:StatusData = self.statuses.get(tick.vt_symbol,None)
        if not status:
            vt_instrument = get_vt_instrument(tick.vt_symbol)
            status = self.statuses.get(vt_instrument,None)
            if not status:
                # 未收到交易状态,返回
                return

        if status.instrument_status not in VALID_TRADE_STATUSES:
            # 不在有效交易状态,返回
            return

        key = (tick.gateway_name,tick.vt_symbol)
        _,oldtick = self.last_ticks.get(key,(None,None))
        valid_tick = False
        if not oldtick:
            # 没有该合约的历史tick
            self.last_ticks[key] = (True,tick)
            valid_tick = True

        elif tick.datetime > oldtick.datetime:
            # 
            self.last_ticks[key] = (True,tick)
            valid_tick = True

        else:
            print(f"【特别tick = {tick}】")

        if valid_tick == True:
            # 如果是有效的tick
            if status.instrument_status != InstrumentStatus.CONTINOUS:
                # 发送集合竞价tic消息到系统中
                self.event_engine.put(Event(EVENT_AUCTION_TICK,tick))
                self.event_engine.put(Event(EVENT_AUCTION_TICK + tick.vt_symbol, tick))  

            else:
                # 发送连续竞价tic消息到系统中
                self.event_engine.put(Event(EVENT_TICK,tick))
                self.event_engine.put(Event(EVENT_TICK + tick.vt_symbol, tick))  

    def process_status_event(self, event: Event):  
        """ 交易状态通知消息处理 """
        status:StatusData = event.data
        self.statuses[status.vt_symbol] = status

        # print(f"【{status.gateway_name} {status}】")

    def check_last_ticks(self,event:Event) -> None:
        """ 原始tick过滤器 """
        self.second_cnt += 1
        if self.second_cnt % self.CHECK_INTERVAL == 0:
            # 如果到了定时间隔

            # 查询所有更新的tick
            changed_ticks = [] 

            for key,(update,tick) in self.last_ticks.items():
                if update:
                    changed_ticks.append(tick)
                    self.last_ticks[key] = (False,tick)

            if changed_ticks:
                # 如果存在更新的tick,保存到数据库
                t = Thread(target=self.db.save_last_tick,kwargs=({"ticks":changed_ticks}),daemon=True)
                t.start()
                # print(f"{self.second_cnt}: status count={len(self.statuses)} save {len(changed_ticks)} ticks")

3.9 把tick数据过滤器安装到主引擎MainEngine上去

修改vnpy\trader\engine.py

添加引用部分

from vnpy.usertools.tickfilter import TickFilter    # hxxjava add

修改MainEngine的

在MainEngine的初始化函数def init(self, event_engine: EventEngine = None)中增加如下内容:

self.tick_filters:Dict[str,TickFilter] = {} # hxxjava add

修改其add_gateway(),内容如下:

    def add_gateway(self, gateway_class: Type[BaseGateway], gateway_name: str = "") -> BaseGateway:
        """
        Add gateway.
        """
        # Use default name if gateway_name not passed
        if not gateway_name:
            gateway_name = gateway_class.default_name

        gateway = gateway_class(self.event_engine, gateway_name)
        self.gateways[gateway_name] = gateway

        # Add gateway supported exchanges into engine
        for exchange in gateway.exchanges:
            if exchange not in self.exchanges:
                self.exchanges.append(exchange)

        # add a tick data filter for the gateway    #  hxxjava add  
        if gateway_name not in self.tick_filters:
            self.tick_filters[gateway_name] = TickFilter(self.event_engine,gateway_name)

        return gateway

4. 经过上面的一系列修改,你获得了哪些好处?

  • 你的策略再也不会收到重复数据和垃圾数据
  • 此以后你的CTA策略中必须加入一个on_auction_tick()接口函数,用来接受每个交易日集合竞价所产生的tick。如何使用这个tick你有你的方法。
  • 在合成K线的时候你才可能构成正确的K线,比如BarGenerator对跨日tick时间戳的处理错误问题,在此也会迎刃而解。

4.1 现在来梳理下我们都干了哪些事情

  1. 在CtpGateway中引入了合约交易状态,这可以用来过滤无效数据,同时还能够识别集合竞价tick。
  2. 在database中增加了最新tick持久化保存,这为新的tick是否是重复的判断提供支持。
  3. 提供有效tick的分类,在CTA策略的模板中增加on_auction_tick()接口使得BarGenerator正确处1分钟bar的成交量和成交额成为可能。

4.2 非CTP网关使用者是否也可以这样做?

只要你能够从网关行情接口实时得到合约的交易状态推送,把网关的行情接口做出类似的修改,这套方法同样是可用的。tickfilter的代码可以不用修改直接使用。

5. 解决BarGenerator统计bar成交量和成交额错误的方法

5.1 这是对BarGenerator做出点修改,

  • 修改BarGenerator的初始化函数
    def __init__(
        self,
        on_bar: Callable,
        window: int = 0,
        on_window_bar: Callable = None,
        interval: Interval = Interval.MINUTE
    ):
        """ Constructor """      

        ... ...  # 其他代码省略

        self.auction_tick:TickData = None
        self.last_tick: TickData = None
  • 增加BarGenerator的集合竞价tick处理函数
    def update_auction_tick(self,tick:TickData):
        """ 更新集合竞价tick """
        self.auction_tick = tick
  • 修改BarGenerator的1分钟bar合成函数
    def update_tick(self, tick: TickData) -> None:
        """
        Update new tick data into generator.
        """
        new_minute = False

        if self.auction_tick:
            # 合约集合竞价tick到当前tick
            tick.high_price = max(tick.high_price,self.auction_tick.high_price)
            tick.low_price = min(tick.low_price,self.auction_tick.low_price)

            # 构造最新tick,以便把集合竞价的成交量和成交额合成到1分钟bar中
            self.last_tick = deepcopy(self.auction_tick)
            # 成交量和成交额每天从0开始单调递增
            self.last_tick.volume = 0.0   
            self.last_tick.turnover = 0.0

            # 用完集合竞价tick就丢弃
            self.auction_tick = None

      ... ...  # 其他代码省略

5.2 您的策略关于集合竞价tick更新的回调函数:

    def on_auction_tick(self, tick: TickData):
        """
        集合竞价tick处理
        """
        self.bg.update_auction_tick(tick)    # 假设self.bg是已经创建过的bar生成器

两点说明:

  1. 如果你在阅读本文的时候觉得有点一头雾水,可以搜索'hxxjava'字符串,将会显示大部分修改的代码,仔细揣摩下,就会知道我做了什么了!
  2. 另外本贴中还有一部分涉及到条件单的代码,如果出现错误,可以查找我的关于条件单的帖子比停止单更好用的条件单——ConditionOrder,这里就不再重复贴出那部分代码了。


开始报名:2025小班特训营第三场【VeighNa量化AI智能体应用】!

发布于VeighNa社区公众号【vnpy-community】
 
原文作者:VeighNa小助手 | 发布时间:2025-10-21
 
老规矩还是先放几张之前特训营的照片:

description
准备完毕,静候小班同学到达

description
学习量化,掌握核心理论框架

description
深入代码,分析策略逻辑细节

description

现场实践,剖析机器学习算法

基于之前学员的反馈,小班特训营这种2天10小时+的高强度课程通过线上直播学习的效果并不理想。为了保证更好的学习质量,我们对授课模式进行了调整:后续小班特训营不再提供线上直播参加和视频内容回看,而是改为同一主题的每场小班课都可以再次到场听讲。同时,我们会针对每一个特训营的主题建立专项社群,持续提供专业交流与学习服务,而不再只局限于三个月的答疑时间。

小班特训营优先面向买方投资机构。由于AI Agent(智能体)开发本身的复杂性(大模型知识、GPU算力需求、编程开发水平等),不建议新手报名。本场课程目前仅剩2个名额,感兴趣的同学请抓紧。

 

VeighNa量化AI智能体应用

日期:2025年11月29日(周六)和11月30日(周日)

时间:两天下午1点-6点,共计10小时

地点:上海浦东(具体地址会在报名成功后发送)

大纲

1. 认识AI Agent和准备开发环境

a. 课程导览与AI Agent前景

​ i. 课程目标、内容结构与学习路径

​ ii. AI Agent在量化金融领域的应用前景

b. VeighNa Agent (vnag) 框架介绍

​ i. 项目定位、设计哲学与核心优势

​ ii. vnag的系统架构与模块概览

c. 开发环境搭建

​ i. 安装Python、vnag及相关依赖

​ ii. 配置IDE(如Cursor)

2. 连接大模型:vnag的Gateway模块

a. 大模型API核心概念

​ i. 主流AI服务商(OpenAI, Anthropic)API的特点与异同

​ ii. API Key管理与安全实践

b. vnag.gateway 抽象设计

​ i. 详解Gateway基类,理解统一接口的设计思想

​ ii. 实战:实现OpenaiGateway与AnthropicGateway

c. 流式数据输出(Streaming)

​ i. 流式输出在交互式应用中的重要性

​ ii. 在vnag中处理流式响应的核心代码解析

3. 构建Agent知识库:拆分与向量化

a. RAG与知识工程

​ i. RAG(Retrieval-Augmented Generation)的基本原理

​ ii. 文档、代码、FAQ等不同类型资料的处理策略

b. vnag.segmenter:智能文本拆分

​ i. 详解Segmenter基类与拆分器的作用

ii. 拆分器实战:CppSegmenter、PythonSegmenter、MarkdownSegmenter

c. vnag.vector:向量化存储与检索

​ i. 向量数据库基础(以ChromaDB为例)

​ ii. 详解Vector基类,实现数据入库与相似度搜索

​ iii. 动手实践:将拆分后的文本块向量化并存入数据库

4. Agent核心能力:Function Call与UI交互

a. 大模型的Function Call(函数调用)

​ i. Function Call的原理与应用场景

​ ii. 基于vnag.engine封装和调用Function Call的技巧

b. MCP:标准化的消息通信协议

​ i. 为何需要标准化的通信协议

​ ii. 上手开发一套基础MCP服务

c. vnag.ui:Qt图形前端解决方案

​ i. vnag的UI架构解析:Window、Widget与Worker

​ ii. UI与后端Agent的事件驱动与数据交互机制

d. 综合实战:构建一个简单的知识库问答机器人

5. 实战案例一:AI驱动的CTP API接口适配开发

a. 任务背景:量化交易接口升级的痛点

b. Agent任务设计与工作流

​ i. 知识库:新旧版本的CTP API头文件、官方文档

​ ii. 工具集(Function Call):文件读写、代码比对、代码生成

​ iii. 工作流:定位变更 -> 生成方案 -> 修改代码 -> 验证修正

c. Live Coding:可复用的全自动量化API接口升级开发智能体

6. 实战案例二:AI赋能的CTA策略自动化投研

a. 任务背景:从策略思想到回测报告的“最后一公里”

b. Agent任务设计与工作流

​ i. 知识库:CTA策略模板、API文档、优秀策略范例

​ ii. 工具集(Function Call):代码生成检查、回测引擎调用、报告分析

​ iii. 工作流:自然语言描述 -> 策略代码生成 -> 初步回测检验 -> 多轮参数优化 -> 编写投研报告

c. Live Coding:与AI Agent对话,从0到1完成完整CTA策略的投研

7. 实战案例三:AI驱动的股票因子挖掘

a. 任务背景:如何从海量信息中自动化挖掘有效Alpha因子

b. Agent任务设计与工作流

​ i. 知识库:指定的因子研究报告(PDF)或网页链接

​ ii. 工具集(Function Call):内容解析、vnpy.alpha调用、指标计算、数据入库

​ iii. 工作流:阅读资料 -> 生成公式 -> 检验计算 -> 分析结果 -> 循环迭代

c. Live Coding:给AI一篇券商研报,看它如何自动挖掘Alpha因子

价格:11999元

 

报名请扫描下方二维码添加小助手提供相关信息(想参加的课程、姓名、手机、公司、职位),报名结果以确认回复为准:
 

description

 



2025年第7次社区活动 - 【Kronos大模型的CTA应用实践】- 11月1日(上海)

发布于VeighNa社区公众号【vnpy-community】
 
原文作者:VeighNa小助手 | 发布时间:2025-10-16
 
【社区活动尊享卡】的受欢迎程度大幅超出我们的预期,为了保证每场社区活动的交流质量,尊享卡已经变更为仅对部分专业交易员用户定向提供。对于参加活动较多的同学强烈推荐!购买请扫描二维码添加小助手咨询:

description

上一场社区活动中关于金融K线大模型 Kronos 的初步介绍,引发了社区同学们的广泛关注和积极讨论。

简单来说,Kronos 是一个专为金融市场设计的大模型,它的核心思想是将K线形态“词化”(Tokenize)——即视单根K线为“单词”,连续K线为“句子”,从而能够运用强大的Transformer架构来学习市场价格数据中的深层模式。根据论文中的基准测试结果,与LSTM等传统时序模型相比,其预测性能获得了显著提升。

本场活动中我们将从理论走向实践,深入探讨Kronos模型的实现与使用细节,并重点演示如何在 VeighNa 的CTA策略模块中,集成该模型以构建一套从数据准备、模型调用到策略回测的完整量化投研流程。

本场活动将于11月1日(周六)下午2:00至5:00在上海举办。普通报名仅支持线下参会,尊享卡持有者和Elite会员可通过线上直播参与。活动具体地址将在微信群中公布,请在报名成功后扫码加入社区活动群,以便获取相关信息!

 

活动内容大纲

 

  1. 初探Kronos金融K线大模型

    a. 金融时序分析的目标:价格 vs 收益率
    b. 传统时序模型(如LSTM等)的不足
    c. Kronos针对价格序列的token构建方式

  2. 使用Kronos来预测价格波动

    a. 从0搭建Kronos大模型运行环境
    b. 梳理论文中的预测方法论细节
    c. 正确理解Kronos模型的输出结果
    d. 批量计算价格波动的预测概率分布

  3. 尝试在CTA策略中应用Kronos

    a. 基于价格波动的预测结果构建趋势信号
    b. 解决Kronos模型的调用性能开销问题
    c. 期货市场的KronosStrategy绩效分析
    d. 完整投研回测代码的实现细节探讨

  4. 闭门交流环节

 
时间:11月1日 14:00-17:00

地点:上海(具体地址后续在微信群中通知)

报名费:99元(Elite会员免费参加)

报名方式:扫描下方二维码报名(报名后请扫码加入社区活动微信群获取参会地址)

 

description

 


新消息

7个月前

统计

主题
10658
帖子
40325
已注册用户
72228
最新用户
在线用户
188
在线来宾用户
47500
© 2015-2022 上海韦纳软件科技有限公司
备案服务号:沪ICP备18006526号

沪公网安备 31011502017034号

【用户协议】
【隐私政策】
【免责条款】