发布于VeighNa社区公众号【vnpy-community】
原文作者:用Python的交易员 | 发布时间:2025-11-10
上周发布了VeighNa的4.2.0版本,本次更新的主要内容是对RiskManager交易事前风控模块进行了重构升级,同时调整了框架的日志输出落地功能,完整适配期货程序化交易监管新规中对于程序化交易系统的功能要求。
对于已经安装了VeighNa Studio 4.0版本的用户,可以使用快速更新功能完成自动升级。对于没有安装的用户,请下载VeighNa Studio-4.2.0,体验一键安装的量化交易Python发行版,下载链接:
https://download.vnpy.com/veighna_studio-4.2.0.exe
新版 vnpy_riskmanager 模块设计的核心是标准化的风控规则模板 RuleTemplate。任何自定义风控规则都必须继承该基类,这保证了所有规则都具备统一的接口,使得风控引擎(RiskEngine)能够对其进行动态发现、加载和执行。
RuleTemplate 为风控规则定义了完整的生命周期和核心逻辑方法。创建一个新的风控规则,即是创建一个继承自 RuleTemplate 的子类,并根据实际需求实现其中的相关方法。
一个标准的风控规则类主要包含以下几个部分:
name: str: 规则的唯一中文名称,用于图形界面上该规则相关的信息显示。parameters: dict[str, str]: 定义规则的可配置参数。该字典的键为参数的英文名,值为其在图形界面上显示的中文名,方便用户理解和修改。variables: dict[str, str]: 定义规则需要实时监控的状态变量,同样字典的键为变量的英文名,值为其在图形界面上显示的中文名。on_init(self) -> None: 规则的初始化方法。在被风控引擎加载时调用,主要用于设定参数的默认值以及初始化监控变量的初始状态。on_order(self, order: OrderData) -> None: 用于处理委托状态更新的回调方法。规则可在此方法内更新内部状态,例如跟踪活动委托的数量。on_trade(self, trade: TradeData) -> None: 用于处理成交回报的回调方法。规则可在此方法内统计累计成交信息等。on_tick(self, tick: TickData) -> None: 用于处理行情切片数据的回调方法。适用于需要基于实时行情进行判断的规则,但需注意其可能会因频繁调用而带来的性能开销。check_allowed(self, req: OrderRequest, gateway_name: str) -> bool: 风控检查的核心逻辑实现。每当交易系统产生新的委托请求(OrderRequest)时,风控引擎都会调用此方法进行检查。若订单允许通过,则方法返回 True;若订单需要被拦截,则应先调用 self.write_log() 记录拦截的具体原因,随后返回 False。以下是一个完整的纯 Python 规则示例代码,实现了对单笔委托数量的上限限制。
from vnpy.trader.object import OrderRequest, OrderData
from ..template import RuleTemplate
class OrderVolumeRule(RuleTemplate):
"""单笔委托手数限制规则"""
# 规则的中文名称
name: str = "委托数量检查"
# 定义可配置的参数
parameters: dict[str, str] = {
"max_volume": "单笔最大委托量"
}
# 定义需监控的变量(可为空)
variables: dict[str, str] = {}
def on_init(self) -> None:
"""初始化"""
self.max_volume: int = 100 # 设置参数的默认值
def check_allowed(self, req: OrderRequest, gateway_name: str) -> bool:
"""核心风控逻辑"""
if req.volume > self.max_volume:
msg = f"委托数量{req.volume}超过参数限制{self.max_volume}:{req}"
self.write_log(msg)
returnFalse
returnTrue
vnpy_riskmanager 模块提供了一系列标准化的内置风控规则,覆盖了交易过程中的多个关键风险点。所有内置规则均提供纯 Python 和 Cython 两种实现,风控引擎在加载时会优先使用性能更优的 Cython 版本。
ActiveOrderRule (活动委托数量上限)此规则用于限制全局的活动(可撤)委托数量。当活动委托笔数超过设定的 active_order_limit 阈值时,新的委托请求将被拦截。它通过监听 on_order 事件来实时跟踪委托状态的变更,从而维护准确的活动委托计数。
DailyLimitRule (每日流控上限)该规则用于对整个交易日内的委托、撤单、成交行为进行总次数和单个合约次数的双重流控。它包含六个可配置参数:total_order_limit (汇总委托上限), total_cancel_limit (汇总撤单上限), total_trade_limit (汇总成交上限), contract_order_limit (合约委托上限), contract_cancel_limit (合约撤单上限), contract_trade_limit (合约成交上限)。在发单时,会检查各项累计值是否超限;在收到委托、撤单和成交回报时,会更新相应的计数。
DuplicateOrderRule (重复报单检查)用于防范因系统或人为失误导致的重复下单。该规则会统计历史上所有发出委托的缓存,当一个新的委托请求在合约、类型、方向、开平、价格、数量等字段上完全相同时,则会累加其重复次数。如果该次数超过 duplicate_order_limit 上限(默认为10次),委托就会被执行拦截。
OrderSizeRule (单笔委托规模上限)这是一个基础但极为重要的风控规则,用于防止“乌龙指”。它包含两个维度的限制:order_volume_limit 用于限制任何单笔委托请求的最大允许数量;order_value_limit 用于限制该笔委托的总价值(数量 价格 合约乘数)。任何一个维度超限都会导致委托被拦截。
OrderValidityRule (委托指令合法性监控)此规则在委托发送至交易接口前,对其指令的合法性进行预检查,提前拦截无效委托,减轻后端系统压力。检查内容包括:
main_engine.get_contract 检查委托的合约是否存在。pricetick 的整数倍。contract.max_volume。contract.min_volume。风控引擎对所有规则的检查是串行执行的,在交易繁忙时,多条纯Python规则累加的检查延迟可能成为性能瓶颈。为了解决这个问题,vnpy_riskmanager模块支持使用Cython对代码进行性能优化。开发者可以将原有的纯Python风控规则,通过Cython语法重构为C语言级别的高性能版本,从而将检查延迟降低到微秒级。
编写 Cython 风控规则的关键是将核心逻辑(尤其是高频调用的check_allowed函数)放入一个 cdef class 中,同时保留一个标准的 Python class 作为外层包装器,以供风控引擎识别和加载。
以下是内置风控规则中 OrderSizeRule 的 Cython 版本实现代码作为示例:
# cython: language_level=3
from vnpy_riskmanager.template cimport RuleTemplate
cdef class OrderSizeRuleCy(RuleTemplate):
"""委托规模检查风控规则 (Cython 版本)"""
cdef public int order_volume_limit
cdef public float order_value_limit
cpdef void on_init(self):
"""初始化"""
self.order_volume_limit = 500
self.order_value_limit = 1_000_000
cpdef bint check_allowed(self, object req, str gateway_name):
"""检查是否允许委托"""
cdef object contract
cdef float order_value
if req.volume > self.order_volume_limit:
self.write_log(f"委托数量{req.volume}超过上限{self.order_volume_limit}:{req}")
return False
contract = self.get_contract(req.vt_symbol)
if contract and req.price: # 只考虑限价单
order_value = req.volume * req.price * contract.size
if order_value > self.order_value_limit:
self.write_log(f"委托价值{order_value}超过上限{self.order_value_limit}:{req}")
return False
return True
class OrderSizeRule(OrderSizeRuleCy):
"""委托规模检查规则的Python包装类"""
name: str = "委托规模检查"
parameters: dict[str, str] = {
"order_volume_limit": "委托数量上限",
"order_value_limit": "委托价值上限",
}
对于自定义的 Cython 规则可以通过一个独立的编译脚本来编译生成pyd文件(注意编译过程中会需要用到 C++ 编译器,如 Windows 上的 Visual Studio):
创建 build_rule.py: 在项目根目录下创建此脚本。
from setuptools import setup, Extension
from Cython.Build import cythonize
# 注意:需与 .pyx 文件名一致
rule_name = "order_size_rule_cy"
extensions = [
Extension(
name=f"vnpy_riskmanager.rules.{rule_name}",
sources=[f"vnpy_riskmanager/rules/{rule_name}.pyx"],
)
]
setup(
ext_modules=cythonize(
extensions,
compiler_directives={"language_level": "3"}
)
)
执行编译: 在项目根目录下打开终端,运行编译命令。
python build_rule.py build_ext --inplace
--inplace 参数会使编译生成的二进制文件(.pyd 或 .so)直接输出到 vnpy_riskmanager/rules 目录下。编译完成后重启程序,RiskEngine 会自动发现并优先加载这个高性能的 Cython 规则。
当一个委托请求因为触发了某项风控规则而被拦截时,vnpy_riskmanager 模块会通过多种方式立即向用户发出警报,确保交易员能够第一时间注意到异常情况并进行处理。
一旦有委托被拦截,风控引擎会立刻调用操作系统内置的提示音发出警报,确保即使用户的注意力不在交易界面上,也能迅速察觉到风控事件的发生,从而避免错过关键的风险信息。
在声音报警的同时,一个可视化的气泡弹窗会从系统托盘区域(通常是屏幕右下角)弹出,该弹窗会明确展示具体的拦截原因,例如“委托数量100超过参数限制50”,气泡弹窗会持续显示一段时间(默认30秒)。
为了确保风险事件的可追溯性,所有拦截信息在触发实时报警的同时,也会通过VeighNa框架的标准化日志系统进行输出,具体包括:主界面的日志监控组件、启动系统的命令行终端,以及本地日志文件。
为了满足监管新规中的要求,确保程序化交易系统的关键操作都有据可查,VeighNa的4.2.0版本在日志记录方面进行了全面增强,主要更新包括:
INFO级别的系统常规操作信息将同步输出到终端和日志文件。MainEngine 会自动记录关键交易生命周期事件,覆盖连接登录、行情订阅、委托下单及委托撤单等操作。CtaStrategy、PortfolioStrategy 等)的日志将自动注册并转发至核心日志引擎,实现对策略层行为的全面监控。
vnpy_riskmanager模块重构
a. 采用插件式设计,提供标准化风控规则开发模板
b. 支持中国期货程序化交易系统监管要求中的风控规则
c. 输出拦截日志后播放提示声音,目前仅支持Windows系统
d. 使用系统托盘栏图标,弹出交易风控拦截日志气泡框
e. 提供Cython版本的风控规则开发模板以及具体规则实现
f. 支持自动扫描加载用户自定义风控规则(放置于Trader目录下的rules文件夹中)
vnpy_polygon数据服务接口,支持海外股票、期货、期权等资产品种的历史数据获取
本文记录了利用python包管理生态管理工具uv,从源码恢复vnpy的运行环境的全过程。
python包管理生态中存在多种工具,如pip、pip-tools、poetry、conda等,各自具备一定功能。
uv新一代python生态管理工具,它是 Astral 公司推出的一款基于Rust编写的Python包管理工具,旨在成为“Python的 Cargo”。
它提供了快速、可靠且易用的包管理体验,在性能、兼容性和功能上都有出色表现,为 Python项目的开发和管理带来了新的选择。
与其他 Python 中的包管理工具相比,uv 更像是一个全能选手,它的优势在于:
使用uv,也可以像 Node]s 或者 Rust 项目那样方便的管理依赖。
windows上安装uv,在cmd窗口中执行下面命令:
powershell -ExecutionPolicy ByPass -c "irm https://astral.sh/uv/install.ps1 | iex"
注意:执行完毕后,会在C:\users\\<用户名称>\.local\bin目录下安装两个文件uv.exe和uvx.exe。

进入cmd或者powershell窗口,输入uv命令应该可以得到如下回应就表示uv已经安装成功了:

如果报告“uv不是内部或外部命令,也不是可运行的程序或批处理文件。”请打开“我的电脑”| “高级系统设置”| “环境变量”| “用户变量”| “Path”,把C:\users\\<用户名称>\.local\bin目录添加进去就可以。


三、下载vnpy 4.0源码

如图所示,从https://www.github.com下载到vnpy-master.zip源代码,解压后包含如下文件和目录:

输入下面的命令,限定python版本3.13,实际安装的是python 3.13.17。
uv python install 3.13
来到vnpy-master源码目录下,输入下面的目录创建虚拟环境目录,当前目录下会多出.venv目录,其中包含虚拟环境的配置文件。
cd d:\vnpy-master
uv venv
这一步是最关键的一步,我是好几天都无法通过这一步,最后在陈总——陈晓优的指导下才通过的。
先把ta-lib二进制包装了,注意这里必须先指明安装的源是https://pypi.vnpy.com。
uv pip install ta-lib --index=https://pypi.vnpy.com
uv pip install . --index=https://pypi.vnpy.com
只有vnpy核心模块是不够的,还需要根据自己的需要安装其他的模块才能够运行实际的交易:
可以合并也可以逐条执行如下命令:
uv add importlib_metadata pymysql
uv add vnpy_ctp vnpy_ctastrategy vnpy_ctabacktester vnpy_datamanager vnpy_mysql vnpy_rqdata
vnpy的源码中包含一个例子,位于example\verghna_trader子目录下,其中的run.py最具代表性,它演示了如何利用已经安装的核心和其他配套模块能够完成功能:
cd d:\vnpy-master\examples\veighna_trader\
uv run run.py


安装流程:





打开terminal-注意环境切换成 py37_vnpy,执行以下命令,安装需要的插件
pip install -r requirements.txt -i http://pypi.douban.com/simple --trusted-host pypi.douban.com

遇到安装失败的可以单独安装:
pip install PyQt5 -i http://pypi.douban.com/simple --trusted-host pypi.douban.com



备注:
如果需要在cmd 下使用 py37_vnpy 环境。
打开 CMD
运行: conda activate py37_vnpy
会切换到py37_vnpy环境下

作为初学者,面对 vnpy 无所不包、博大精深的丰富内容,试图用图形对 vnpy 的运行流程做一个归纳。
不到之处,还请各位多多指正

在行情接口与策略和应用之间建起一个tick过滤器——TickFilter,对tick数据进行过滤。
声明:本文基于【CTP接口规范6.3.15_API接口说明】做出的修改。
4.1 定义相关的常量和数据类
在vnpy\trader\constant.py中增加下面的合约交易状态InstrumentStatus常量类型定义:
class InstrumentStatus(Enum):
"""
合约交易状态类型 hxxjava debug
"""
BEFORE_TRADING = "开盘前"
NO_TRADING = "非交易"
CONTINOUS = "连续交易"
AUCTION_ORDERING = "集合竞价报单"
AUCTION_BALANCE = "集合竞价价格平衡"
AUCTION_MATCH = "集合竞价撮合"
CLOSE = "收盘"
# 有效交易状态
VALID_TRADE_STATUSES = [
InstrumentStatus.CONTINOUS,
InstrumentStatus.AUCTION_ORDERING,
InstrumentStatus.AUCTION_BALANCE,
InstrumentStatus.AUCTION_MATCH
]
# 集合竞价交易状态
AUCTION_STATUS = [
InstrumentStatus.AUCTION_ORDERING,
InstrumentStatus.AUCTION_BALANCE,
InstrumentStatus.AUCTION_MATCH
]
class StatusEnterReason(Enum):
"""
品种进入交易状态原因类型 hxxjava debug
"""
AUTOMATIC = "自动切换"
MANUAL = "手动切换"
FUSE = "熔断"
在vnpy\trader\object.py中增加下面的交易状态数据类StatusData:
@dataclass
class StatusData(BaseData):
"""
hxxjava debug
"""
symbol:str
exchange : Exchange
settlement_group_id : str = ""
instrument_status : InstrumentStatus = None
trading_segment_sn : int = None
enter_time : str = ""
enter_reason : str = ""
exchange_inst_id : str = ""
def __post_init__(self):
""" """
self.vt_symbol = f"{self.symbol}.{self.exchange.value}"
def belongs_to(self,vt_symbol:str):
symbol,exchange_str = vt_symbol.split(".")
instrument = left_alphas(symbol).upper()
return (self.symbol.upper() == instrument) and (self.exchange.value == exchange_str)
在vnpy\trader\event.py中增加交易状态消息类型
EVENT_STATUS = "eStatus" # hxxjava debug
EVENT_ORIGIN_TICK = "eOriginTick." # hxxjava debug
EVENT_AUCTION_TICK = "eAuctionTick." # hxxjava debug
在vnpy\trader\gateway.py中合约状态接口,修改tick推送接口:
from .event import EVENT_ORIGIN_TICK,EVENT_STATUS # hxxjava add
from .object import StatusData # hxxjava add
def on_tick(self, tick: TickData) -> None:
"""
Tick event push.
Tick event of a specific vt_symbol is also pushed.
"""
self.on_event(EVENT_ORIGIN_TICK, tick) # hxxjava add
# self.on_event(EVENT_TICK, tick)
# self.on_event(EVENT_TICK + tick.vt_symbol, tick)
def on_status(self, status: StatusData) -> None: # hxxjava debug
"""
Instrument Status event push.
"""
self.on_event(EVENT_STATUS, status)
self.on_event(EVENT_STATUS + status.vt_symbol, status)
修改vnpy_cpt\ctp_gateway.py:
from vnpy.trader.constant import InstrumentStatus,StatusEnterReason # hxxjava debug
rom vnpy.trader.object import StatusData, # hxxjava debug
# 品种状态进入原因映射 hxxjava debug
INSTRUMENTSTATUS_CTP2VT: Dict[str, InstrumentStatus] = {
"0": InstrumentStatus.BEFORE_TRADING,
"1": InstrumentStatus.NO_TRADING,
"2": InstrumentStatus.CONTINOUS,
"3": InstrumentStatus.AUCTION_ORDERING,
"4": InstrumentStatus.AUCTION_BALANCE,
"5": InstrumentStatus.AUCTION_MATCH,
"6": InstrumentStatus.CLOSE,
"7": InstrumentStatus.CLOSE
}
# 品种状态进入原因映射 hxxjava debug
ENTERREASON_CTP2VT: Dict[str, StatusEnterReason] = {
"1": StatusEnterReason.AUTOMATIC,
"2": StatusEnterReason.MANUAL,
"3": StatusEnterReason.FUSE
}
def onRtnInstrumentStatus(self,data:dict):
"""
当接收到合约品种状态信息 # hxxjava debug
"""
if data:
# print(f"【data={data}】")
status = StatusData(
symbol = data["InstrumentID"],
exchange = EXCHANGE_CTP2VT[data["ExchangeID"]],
settlement_group_id = data["SettlementGroupID"],
instrument_status = INSTRUMENTSTATUS_CTP2VT[data["InstrumentStatus"]],
trading_segment_sn = data["TradingSegmentSN"],
enter_time = data["EnterTime"],
enter_reason = ENTERREASON_CTP2VT[data["EnterReason"]],
exchange_inst_id = data["ExchangeInstID"],
gateway_name=self.gateway_name
)
# print(f"status={status}")
self.gateway.on_status(status)
from vnpy.trader.event import EVENT_AUCTION_TICK # hxxjava add
class MyCtaEngine(CtaEngine):
""" """
condition_filename = "condition_order.json" # 历史条件单存储文件
def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
""""""
super().__init__(main_engine,event_engine)
self.condition_orders:Dict[str,ConditionOrder] = {} # strategy_name: ConditionOrder
self.triggered_condition_orders:List[ConditionOrder] = [] # 已经触发点条件单,为流控设计
def load_active_condtion_orders(self):
""" """
return {}
def register_event(self):
""""""
super().register_event()
self.event_engine.register(EVENT_AUCTION_TICK, self.process_auction_tick_event)
def process_auction_tick_event(self,event:Event):
""" 集合竞价消息处理 """
tick:TickData = event.data
strategies = self.symbol_strategy_map[tick.vt_symbol]
if not strategies:
return
for strategy in strategies:
if strategy.inited:
# 执行策略的集合竞价消息处理
self.call_strategy_func(strategy, strategy.on_auction_tick, tick)
def process_tick_event(self,event:Event):
""" 用tick的价格检查条件单 """
super().process_tick_event(event)
tick:TickData = event.data
all_condition_orders = [order for order in self.condition_orders.values() \
if order.vt_symbol == tick.vt_symbol and order.status == CondOrderStatus.WAITING]
for order in all_condition_orders:
# 检查条件单是否满足条件
self.check_condition_order(order,tick)
def check_condition_order(self,order:ConditionOrder,tick:TickData):
""" 检查条件单是否满足条件 """
strategy = self.strategies.get(order.strategy_name,None)
if not strategy or not strategy.trading:
return False
price = tick.last_price
is_be = order.condition == Condition.BE and price >= order.price
is_le = order.condition == Condition.LE and price <= order.price
is_bt = order.condition == Condition.BT and price > order.price
is_lt = order.condition == Condition.LT and price < order.price
if is_be or is_le or is_bt or is_lt:
# 满足触发条件
if order.execute_price == ExecutePrice.MARKET:
# 取市场价
price = tick.last_price
elif order.execute_price == ExecutePrice.EXTREME:
# 取极限价
price = tick.limit_up if order.direction == Direction.LONG else tick.limit_down
else:
# 取设定价
price = order.price
# 执行委托
order_ids = strategy.send_order(
direction = order.direction,
offset=order.offset,
price=price,
volume=order.volume
)
if order_ids:
order.trigger_time = tick.datetime
order.status = CondOrderStatus.TRIGGERED
order.vt_orderids = order_ids
self.call_strategy_func(strategy,strategy.on_condition_order,order)
self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))
def find_condition_order(self,vt_orderid:str):
""" 根据委托单号查询所属条件单 """
corder:ConditionOrder = None
for order in self.condition_orders.values():
if vt_orderid in order.vt_orderids:
corder = order
break
return corder
def process_trade_event(self, event: Event):
""" 委托单推送处理 """
super().process_trade_event(event)
trade:TradeData = event.data
vt_orderid = trade.vt_orderid
corder = self.find_condition_order(vt_orderid)
if corder:
# 该成交单属于某个条件单
strategy = self.strategies.get(corder.strategy_name,None)
if strategy and strategy.trading:
# 找到了该条件单属实策略实例且正在交易中
# 累计条件单的成交量
corder.traded += trade.volume
# 推送该条件单给策略
self.call_strategy_func(strategy,strategy.on_condition_order,corder)
# 刷新条件单列表控件
self.event_engine.put(Event(EVENT_CONDITION_ORDER,corder))
def send_condition_order(self,order:ConditionOrder):
""" """
strategy = self.strategies.get(order.strategy_name,None)
if not strategy or not strategy.trading:
return False
if order.cond_orderid not in self.condition_orders:
self.condition_orders[order.cond_orderid] = order
self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))
return True
return False
def cancel_condition_order(self,cond_orderid:str):
""" """
order:ConditionOrder = self.condition_orders.get(cond_orderid,None)
if not order:
return False
order.status = CondOrderStatus.CANCELLED
self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))
return True
def cancel_all_condition_orders(self,strategy_name:str):
""" """
for order in self.condition_orders.values():
if order.strategy_name == strategy_name and order.status == CondOrderStatus.WAITING:
order.status = CondOrderStatus.CANCELLED
self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))
return True
__init__.py中的CtaStrategyApp做如下修改class CtaStrategyApp(BaseApp):
""""""
app_name = APP_NAME
app_module = __module__
app_path = Path(__file__).parent
display_name = "CTA策略"
# engine_class = CtaEngine
engine_class = MyCtaEngine # hxxjava add
widget_name = "CtaManager"
icon_name = str(app_path.joinpath("ui", "cta.ico"))
修改vnpy_ctastrategy\CtaTemplate.py如下,为CtaTemplate增加on_auction_tick():
@virtual
def on_auction_tick(self, tick: TickData):
"""
Callback of new tick data update. # hxxjava add for auction tick
"""
pass
为class BaseDatabase增加下面两个接口函数:
@abstractmethod
def save_last_tick(self, ticks: List[TickData]) -> bool:
"""
Save last tick data into database. # hxxjava add
"""
pass
@abstractmethod
def load_last_tick(
self,
gateway_name : str,
exchange: Exchange = None,
symbol: str = None
) -> List[TickData]:
"""
Load last tick data from database. # hxxjava add
"""
pass
class MyDateTimeField(DateTimeField):
def get_modifiers(self):
return [6]
class DbLastTick(Model): # hxxjava add
""" 最新TICK数据表映射对象 """
id = AutoField()
gateway_name: str = CharField()
symbol: str = CharField()
exchange: str = CharField()
datetime: datetime = MyDateTimeField()
name: str = CharField()
volume: float = FloatField()
turnover: float = FloatField()
open_interest: float = FloatField()
last_price: float = FloatField()
last_volume: float = FloatField()
limit_up: float = FloatField()
limit_down: float = FloatField()
open_price: float = FloatField()
high_price: float = FloatField()
low_price: float = FloatField()
pre_close: float = FloatField()
bid_price_1: float = FloatField()
bid_price_2: float = FloatField(null=True)
bid_price_3: float = FloatField(null=True)
bid_price_4: float = FloatField(null=True)
bid_price_5: float = FloatField(null=True)
ask_price_1: float = FloatField()
ask_price_2: float = FloatField(null=True)
ask_price_3: float = FloatField(null=True)
ask_price_4: float = FloatField(null=True)
ask_price_5: float = FloatField(null=True)
bid_volume_1: float = FloatField()
bid_volume_2: float = FloatField(null=True)
bid_volume_3: float = FloatField(null=True)
bid_volume_4: float = FloatField(null=True)
bid_volume_5: float = FloatField(null=True)
ask_volume_1: float = FloatField()
ask_volume_2: float = FloatField(null=True)
ask_volume_3: float = FloatField(null=True)
ask_volume_4: float = FloatField(null=True)
ask_volume_5: float = FloatField(null=True)
localtime: datetime = DateTimeField(null=True)
class Meta:
database = db
indexes = ((("gateway_name","symbol", "exchange", "datetime"), True),)
class MysqlDatabase的初始化做如下修改:
def __init__(self) -> None:
""""""
self.db = db
self.db.connect()
self.db.create_tables([DbContractData, DbBarData, DbTickData, DbLastTick, DbBarOverview]) # hxxjava add DbLastTick,DbContractData
再为class MysqlDatabase添加下面两个函数:
def save_last_tick(self, ticks: List[TickData]) -> bool:
"""
Save last tick data into database. # hxxjava add
"""
vt_symbols = [t.vt_symbol for t in ticks]
# 删除ticks列表中包含合约的旧的tick记录
d: ModelDelete = DbLastTick.delete().where(
(DbLastTick.symbol+'.'+DbLastTick.exchange in vt_symbols)
)
count = d.execute()
# print(f"delete {count} last ticks")
# 构造最新的ticks列表数据
data = []
for t in ticks:
tick:TickData = deepcopy(t) # hxxjava change
tick.datetime = tick.datetime
d = tick.__dict__
d["exchange"] = d["exchange"].value
d.pop("vt_symbol")
data.append(d)
# print(tick.symbol,tick.exchange,tick.datetime.strftime('%Y-%m-%d %H:%M:%S %f'))
# 使用upsert操作将数据更新到数据库中
with self.db.atomic():
for c in chunked(data, 50):
DbLastTick.insert_many(c).on_conflict_replace().execute()
return True
def load_last_tick(
self,
gateway_name : str,
exchange: Exchange = None,
symbol: str = None
) -> List[TickData]:
"""
Load last tick data from database. # hxxjava add
"""
try:
# 从DbLastTick查询符合条件的最新tick记录
s: ModelSelect = (
DbLastTick.select().where(
(DbLastTick.gateway_name == gateway_name)
& (exchange is None or DbLastTick.exchange == exchange.value)
& (symbol is None or DbLastTick.symbol == symbol)
).order_by(DbLastTick.gateway_name,DbLastTick.datetime)
)
# 利用最新tick记录构造ticks列表
ticks: List[TickData] = []
for db_tick in s:
tick:TickData = TickData(
symbol=db_tick.symbol,
exchange=Exchange(db_tick.exchange),
datetime=to_china_tz(db_tick.datetime),
name=db_tick.name,
volume=db_tick.volume,
turnover=db_tick.turnover,
open_interest=db_tick.open_interest,
last_price=db_tick.last_price,
last_volume=db_tick.last_volume,
limit_up=db_tick.limit_up,
limit_down=db_tick.limit_down,
open_price=db_tick.open_price,
high_price=db_tick.high_price,
low_price=db_tick.low_price,
pre_close=db_tick.pre_close,
bid_price_1=db_tick.bid_price_1,
bid_price_2=db_tick.bid_price_2,
bid_price_3=db_tick.bid_price_3,
bid_price_4=db_tick.bid_price_4,
bid_price_5=db_tick.bid_price_5,
ask_price_1=db_tick.ask_price_1,
ask_price_2=db_tick.ask_price_2,
ask_price_3=db_tick.ask_price_3,
ask_price_4=db_tick.ask_price_4,
ask_price_5=db_tick.ask_price_5,
bid_volume_1=db_tick.bid_volume_1,
bid_volume_2=db_tick.bid_volume_2,
bid_volume_3=db_tick.bid_volume_3,
bid_volume_4=db_tick.bid_volume_4,
bid_volume_5=db_tick.bid_volume_5,
ask_volume_1=db_tick.ask_volume_1,
ask_volume_2=db_tick.ask_volume_2,
ask_volume_3=db_tick.ask_volume_3,
ask_volume_4=db_tick.ask_volume_4,
ask_volume_5=db_tick.ask_volume_5,
localtime=db_tick.localtime,
gateway_name=db_tick.gateway_name
)
ticks.append(tick)
return ticks
except:
# 当DbLastTick表不存在的时候,会发生错误
return []
在vnpy.usertools下创建tickfilter.py文件,其内容如下:
"""
本文件主要实现tick数据过滤器——TickFilter。
tick数据过滤器的功能:
1. 过滤重复tick,保证已经参与K线合成的tick不会再次被系统使用
2. 过滤无效tick,抛弃不在交易状态下的tick
3. 识别集合竞价tick,为使用tick的应用或用户策略处理集合竞价tick提供支持
作者:hxxjava
日期:2022-06-16
修改日期: 修改原因:
"""
from typing import Dict,List,Tuple
from threading import Thread
from vnpy.event import Event,EVENT_TIMER,EventEngine
from vnpy.trader.constant import InstrumentStatus,VALID_TRADE_STATUSES
from vnpy.trader.object import TickData,StatusData
from vnpy.trader.event import (
EVENT_ORIGIN_TICK,
EVENT_AUCTION_TICK,
EVENT_TICK,
EVENT_STATUS
)
from vnpy.trader.database import get_database
from vnpy.trader.utility import extract_vt_symbol
def left_alphas(instr:str):
"""
得到字符串左边的字符部分
"""
ret_str = ''
for s in instr:
if s.isalpha():
ret_str += s
else:
break
return ret_str
def get_vt_instrument(vt_symbol:str):
"""
从完整合约代码转换到完整品种代码
"""
symbol,exchange = extract_vt_symbol(vt_symbol)
instrument = left_alphas(symbol)
return f"{instrument}.{exchange.value}"
class TickFilter():
""" tick数据过滤器 """
CHECK_INTERVAL:int = 5 # 更新到数据库间隔
def __init__(self,event_engine:EventEngine,gateway_name:str):
""" tick数据过滤器初始化 """
self.event_engine = event_engine
self.gateway_name = gateway_name
self.db = get_database()
# 最新tick字典 {(gateway_name,vt_symbol),(update,tick)}
self.last_ticks:Dict[Tuple[str,str],Tuple[bool,TickData]] = {}
# 品种及合约状态字典 { vt_symbol : StatusData }
self.statuses:Dict[str,StatusData] = {}
self.second_cnt = 0
self.load_last_ticks()
self.register_event()
# print(f"TickFilter {gateway_name}")
def load_last_ticks(self):
"""
加载属于网关名称为self.gateway_name的最新tick列表
"""
last_ticks:List[TickData] = self.db.load_last_tick(gateway_name=self.gateway_name)
for tick in last_ticks:
self.last_ticks[(tick.gateway_name,tick.vt_symbol)] = (False,tick)
# print(f"load {len(last_ticks)} last ticks")
def register_event(self):
""" 注册消息 """
self.event_engine.register(EVENT_ORIGIN_TICK,self.process_tick_event)
self.event_engine.register(EVENT_STATUS,self.process_status_event)
self.event_engine.register(EVENT_TIMER,self.check_last_ticks)
def process_tick_event(self,event:Event):
""" 对原始tick进行过滤 """
tick:TickData = event.data
# 检查tick合约的经验状态是否位有效交易状态
status:StatusData = self.statuses.get(tick.vt_symbol,None)
if not status:
vt_instrument = get_vt_instrument(tick.vt_symbol)
status = self.statuses.get(vt_instrument,None)
if not status:
# 未收到交易状态,返回
return
if status.instrument_status not in VALID_TRADE_STATUSES:
# 不在有效交易状态,返回
return
key = (tick.gateway_name,tick.vt_symbol)
_,oldtick = self.last_ticks.get(key,(None,None))
valid_tick = False
if not oldtick:
# 没有该合约的历史tick
self.last_ticks[key] = (True,tick)
valid_tick = True
elif tick.datetime > oldtick.datetime:
#
self.last_ticks[key] = (True,tick)
valid_tick = True
else:
print(f"【特别tick = {tick}】")
if valid_tick == True:
# 如果是有效的tick
if status.instrument_status != InstrumentStatus.CONTINOUS:
# 发送集合竞价tic消息到系统中
self.event_engine.put(Event(EVENT_AUCTION_TICK,tick))
self.event_engine.put(Event(EVENT_AUCTION_TICK + tick.vt_symbol, tick))
else:
# 发送连续竞价tic消息到系统中
self.event_engine.put(Event(EVENT_TICK,tick))
self.event_engine.put(Event(EVENT_TICK + tick.vt_symbol, tick))
def process_status_event(self, event: Event):
""" 交易状态通知消息处理 """
status:StatusData = event.data
self.statuses[status.vt_symbol] = status
# print(f"【{status.gateway_name} {status}】")
def check_last_ticks(self,event:Event) -> None:
""" 原始tick过滤器 """
self.second_cnt += 1
if self.second_cnt % self.CHECK_INTERVAL == 0:
# 如果到了定时间隔
# 查询所有更新的tick
changed_ticks = []
for key,(update,tick) in self.last_ticks.items():
if update:
changed_ticks.append(tick)
self.last_ticks[key] = (False,tick)
if changed_ticks:
# 如果存在更新的tick,保存到数据库
t = Thread(target=self.db.save_last_tick,kwargs=({"ticks":changed_ticks}),daemon=True)
t.start()
# print(f"{self.second_cnt}: status count={len(self.statuses)} save {len(changed_ticks)} ticks")
修改vnpy\trader\engine.py
from vnpy.usertools.tickfilter import TickFilter # hxxjava add
在MainEngine的初始化函数def init(self, event_engine: EventEngine = None)中增加如下内容:
self.tick_filters:Dict[str,TickFilter] = {} # hxxjava add
修改其add_gateway(),内容如下:
def add_gateway(self, gateway_class: Type[BaseGateway], gateway_name: str = "") -> BaseGateway:
"""
Add gateway.
"""
# Use default name if gateway_name not passed
if not gateway_name:
gateway_name = gateway_class.default_name
gateway = gateway_class(self.event_engine, gateway_name)
self.gateways[gateway_name] = gateway
# Add gateway supported exchanges into engine
for exchange in gateway.exchanges:
if exchange not in self.exchanges:
self.exchanges.append(exchange)
# add a tick data filter for the gateway # hxxjava add
if gateway_name not in self.tick_filters:
self.tick_filters[gateway_name] = TickFilter(self.event_engine,gateway_name)
return gateway
只要你能够从网关行情接口实时得到合约的交易状态推送,把网关的行情接口做出类似的修改,这套方法同样是可用的。tickfilter的代码可以不用修改直接使用。
def __init__(
self,
on_bar: Callable,
window: int = 0,
on_window_bar: Callable = None,
interval: Interval = Interval.MINUTE
):
""" Constructor """
... ... # 其他代码省略
self.auction_tick:TickData = None
self.last_tick: TickData = None
def update_auction_tick(self,tick:TickData):
""" 更新集合竞价tick """
self.auction_tick = tick
def update_tick(self, tick: TickData) -> None:
"""
Update new tick data into generator.
"""
new_minute = False
if self.auction_tick:
# 合约集合竞价tick到当前tick
tick.high_price = max(tick.high_price,self.auction_tick.high_price)
tick.low_price = min(tick.low_price,self.auction_tick.low_price)
# 构造最新tick,以便把集合竞价的成交量和成交额合成到1分钟bar中
self.last_tick = deepcopy(self.auction_tick)
# 成交量和成交额每天从0开始单调递增
self.last_tick.volume = 0.0
self.last_tick.turnover = 0.0
# 用完集合竞价tick就丢弃
self.auction_tick = None
... ... # 其他代码省略
def on_auction_tick(self, tick: TickData):
"""
集合竞价tick处理
"""
self.bg.update_auction_tick(tick) # 假设self.bg是已经创建过的bar生成器
发布于VeighNa社区公众号【vnpy-community】
原文作者:VeighNa小助手 | 发布时间:2025-10-21
老规矩还是先放几张之前特训营的照片:

准备完毕,静候小班同学到达

学习量化,掌握核心理论框架

深入代码,分析策略逻辑细节

现场实践,剖析机器学习算法
基于之前学员的反馈,小班特训营这种2天10小时+的高强度课程通过线上直播学习的效果并不理想。为了保证更好的学习质量,我们对授课模式进行了调整:后续小班特训营不再提供线上直播参加和视频内容回看,而是改为同一主题的每场小班课都可以再次到场听讲。同时,我们会针对每一个特训营的主题建立专项社群,持续提供专业交流与学习服务,而不再只局限于三个月的答疑时间。
小班特训营优先面向买方投资机构。由于AI Agent(智能体)开发本身的复杂性(大模型知识、GPU算力需求、编程开发水平等),不建议新手报名。本场课程目前仅剩2个名额,感兴趣的同学请抓紧。
VeighNa量化AI智能体应用
日期:2025年11月29日(周六)和11月30日(周日)
时间:两天下午1点-6点,共计10小时
地点:上海浦东(具体地址会在报名成功后发送)
大纲:
1. 认识AI Agent和准备开发环境
a. 课程导览与AI Agent前景
i. 课程目标、内容结构与学习路径
ii. AI Agent在量化金融领域的应用前景
b. VeighNa Agent (vnag) 框架介绍
i. 项目定位、设计哲学与核心优势
ii. vnag的系统架构与模块概览
c. 开发环境搭建
i. 安装Python、vnag及相关依赖
ii. 配置IDE(如Cursor)
2. 连接大模型:vnag的Gateway模块
a. 大模型API核心概念
i. 主流AI服务商(OpenAI, Anthropic)API的特点与异同
ii. API Key管理与安全实践
b. vnag.gateway 抽象设计
i. 详解Gateway基类,理解统一接口的设计思想
ii. 实战:实现OpenaiGateway与AnthropicGateway
c. 流式数据输出(Streaming)
i. 流式输出在交互式应用中的重要性
ii. 在vnag中处理流式响应的核心代码解析
3. 构建Agent知识库:拆分与向量化
a. RAG与知识工程
i. RAG(Retrieval-Augmented Generation)的基本原理
ii. 文档、代码、FAQ等不同类型资料的处理策略
b. vnag.segmenter:智能文本拆分
i. 详解Segmenter基类与拆分器的作用
ii. 拆分器实战:CppSegmenter、PythonSegmenter、MarkdownSegmenter
c. vnag.vector:向量化存储与检索
i. 向量数据库基础(以ChromaDB为例)
ii. 详解Vector基类,实现数据入库与相似度搜索
iii. 动手实践:将拆分后的文本块向量化并存入数据库
4. Agent核心能力:Function Call与UI交互
a. 大模型的Function Call(函数调用)
i. Function Call的原理与应用场景
ii. 基于vnag.engine封装和调用Function Call的技巧
b. MCP:标准化的消息通信协议
i. 为何需要标准化的通信协议
ii. 上手开发一套基础MCP服务
c. vnag.ui:Qt图形前端解决方案
i. vnag的UI架构解析:Window、Widget与Worker
ii. UI与后端Agent的事件驱动与数据交互机制
d. 综合实战:构建一个简单的知识库问答机器人
5. 实战案例一:AI驱动的CTP API接口适配开发
a. 任务背景:量化交易接口升级的痛点
b. Agent任务设计与工作流
i. 知识库:新旧版本的CTP API头文件、官方文档
ii. 工具集(Function Call):文件读写、代码比对、代码生成
iii. 工作流:定位变更 -> 生成方案 -> 修改代码 -> 验证修正
c. Live Coding:可复用的全自动量化API接口升级开发智能体
6. 实战案例二:AI赋能的CTA策略自动化投研
a. 任务背景:从策略思想到回测报告的“最后一公里”
b. Agent任务设计与工作流
i. 知识库:CTA策略模板、API文档、优秀策略范例
ii. 工具集(Function Call):代码生成检查、回测引擎调用、报告分析
iii. 工作流:自然语言描述 -> 策略代码生成 -> 初步回测检验 -> 多轮参数优化 -> 编写投研报告
c. Live Coding:与AI Agent对话,从0到1完成完整CTA策略的投研
7. 实战案例三:AI驱动的股票因子挖掘
a. 任务背景:如何从海量信息中自动化挖掘有效Alpha因子
b. Agent任务设计与工作流
i. 知识库:指定的因子研究报告(PDF)或网页链接
ii. 工具集(Function Call):内容解析、vnpy.alpha调用、指标计算、数据入库
iii. 工作流:阅读资料 -> 生成公式 -> 检验计算 -> 分析结果 -> 循环迭代
c. Live Coding:给AI一篇券商研报,看它如何自动挖掘Alpha因子
价格:11999元
报名请扫描下方二维码添加小助手提供相关信息(想参加的课程、姓名、手机、公司、职位),报名结果以确认回复为准:

发布于VeighNa社区公众号【vnpy-community】
原文作者:VeighNa小助手 | 发布时间:2025-10-16
【社区活动尊享卡】的受欢迎程度大幅超出我们的预期,为了保证每场社区活动的交流质量,尊享卡已经变更为仅对部分专业交易员用户定向提供。对于参加活动较多的同学强烈推荐!购买请扫描二维码添加小助手咨询:

上一场社区活动中关于金融K线大模型 Kronos 的初步介绍,引发了社区同学们的广泛关注和积极讨论。
简单来说,Kronos 是一个专为金融市场设计的大模型,它的核心思想是将K线形态“词化”(Tokenize)——即视单根K线为“单词”,连续K线为“句子”,从而能够运用强大的Transformer架构来学习市场价格数据中的深层模式。根据论文中的基准测试结果,与LSTM等传统时序模型相比,其预测性能获得了显著提升。
本场活动中我们将从理论走向实践,深入探讨Kronos模型的实现与使用细节,并重点演示如何在 VeighNa 的CTA策略模块中,集成该模型以构建一套从数据准备、模型调用到策略回测的完整量化投研流程。
本场活动将于11月1日(周六)下午2:00至5:00在上海举办。普通报名仅支持线下参会,尊享卡持有者和Elite会员可通过线上直播参与。活动具体地址将在微信群中公布,请在报名成功后扫码加入社区活动群,以便获取相关信息!
初探Kronos金融K线大模型
a. 金融时序分析的目标:价格 vs 收益率
b. 传统时序模型(如LSTM等)的不足
c. Kronos针对价格序列的token构建方式
使用Kronos来预测价格波动
a. 从0搭建Kronos大模型运行环境
b. 梳理论文中的预测方法论细节
c. 正确理解Kronos模型的输出结果
d. 批量计算价格波动的预测概率分布
尝试在CTA策略中应用Kronos
a. 基于价格波动的预测结果构建趋势信号
b. 解决Kronos模型的调用性能开销问题
c. 期货市场的KronosStrategy绩效分析
d. 完整投研回测代码的实现细节探讨
闭门交流环节
时间:11月1日 14:00-17:00
地点:上海(具体地址后续在微信群中通知)
报名费:99元(Elite会员免费参加)
报名方式:扫描下方二维码报名(报名后请扫码加入社区活动微信群获取参会地址)

耗时2个多月和迅投团队配合测试对接,VeighNa框架的迅投研数据服务的接口vnpy_xt正式上线,支持股票、期货、期权、基金等历史量价数据的获取。
迅投为VeighNa社区提供了专属的试用申请链接:https://xuntou.net/#/signup?utm_source=vnpy
注册申请后即可获取14天的免费试用,目前数据流量上限较高,推荐有需要的同学不要错过!!!(在有效期内多下载一些数据)
整体使用流程如下:
使用过程中遇到任何问题可以通过社区论坛寻投研专区提问交流:https://www.vnpy.com/forum/forum/35-xun-tou-yan
我学Python的目的很明确,就是量化交易。从一开始就有关注vn.py,但我学的是Python3,那时vn.py还处于版本1.x时期,所以只能望vn.py兴叹。
vn.py 2.0出来之后我并没有及时注意,等反应过来已经是2.0.7版。很兴奋,认真研究,并将心得写成《vn.py 2.0.7源代码深入分析》,分享在vn.py社区的经验分享板块。
出于对量化交易的爱好,出于对Python在量化交易中作用的认同,一定程度受vn.py强大功能的鼓舞,我与同事合写了《Python量化交易从入门到实战》一书,对vn.py的讨论是其中很重要的一部分内容。
后续又写了《vn.py 2.1.4源代码深入分析》和《vn.py 2.2.0源代码深入分析》两个文档,感谢各位老师的认可。
vn.py 3.0.0版发布于2022-03-23,这是我一直期待的一个版本,所以它刚一推出,我就立刻开始试用,并着手整理《vn.py 3.0.0源代码深入分析》。夜以继日,终于在前天完成。先发到了书籍的资源群中,接受了两天批评,现分享到此处。
写作本文档的一个主要目的是对vn.py的开源精神做出一点支持,希望本文档能够对大家学习使用vn.py有所帮助。
百度网盘链接:https://pan.baidu.com/s/1cl2MA9hNFhHlxfHM0gGe2A
提取码:s7u6
发布于VeighNa社区公众号【vnpy-community】
原文作者:用Python的交易员 | 发布时间:2025-07-19
7月初发布了VeighNa的4.1.0版本,本次更新的主要内容是完成了绝大多数VeighNa开源社区版中的模块移植(接口、应用等),得益于Python 3.13带来的显著性能提升,强烈建议还在使用3.0大版本的用户升级,感受新一代版本带来的性能飞跃。
对于已经安装了VeighNa Studio 4.0版本的用户,可以使用快速更新功能完成自动升级。对于没有安装的用户,请下载\VeighNa Studio-4.1.0**,体验一键安装的量化交易Python发行版,下载链接:
https://download.vnpy.com/veighna_studio-4.1.0.exe
关于Python 3.13具体的性能提升水平,社区里已经有许多同学讨论了,这里借着4.1.0发布的机会,对互联网上的公开资料做个整理。
熟悉Python发展历史的同学可能知道,自3.10版本以来,CPython官方团队在性能优化上投入了巨大的精力,几乎每个新版本都是一次“提速”。下面就来回顾一下这几个版本中和性能相关的核心改动,看看3.13版本对比3.10究竟快了多少。
Python 3.11:革命性的性能飞跃
Python 3.11是“Faster CPython”项目第一个取得丰硕成果的版本,其性能相较于3.10有巨大提升。根据官方文档,Python 3.11在标准基准测试套件上比3.10平均快了1.25倍。
主要改进包括:
a + b操作,如果a和b总是整数,解释器会使用专门处理整数加法的快速指令,大大提高了执行效率。Python 3.12延续了3.11的势头,在现有基础上进行了更多细致的优化。虽然不像3.11那样有革命性的飞跃,但它同样带来了稳固的性能增长。3.12的官方文档中没有提及具体的平均性能提升数字,但根据社区测试的结果,Intel平台上对比3.10的平均提升在5%。
主要改进包括:
Python 3.13继续沿着性能优化的道路前进,根据社区测试的结果,对比3.12的平均性能提升大约是5%(Intel平台)。
这一版本的改进主要集中在:
asyncio库进行了大量优化,根据基准测试,asyncio相关任务整体性能提升了\1.19倍**。**unpack_sequence)、生成器(generators)等都变得更快。综合来看,从Python 3.10到3.13,CPython的性能经历了持续且显著的增长。通过将各个版本的性能提升进行串联估算 (1.25 1.05 1.05),我们可以得出一个大致的结论:
对于VeighNa用户而言,这意味着策略回测、实盘交易中涉及的大量纯Python计算逻辑(例如信号计算、交易执行、投研分析等)都将运行得更快,从而降低延迟、提升策略执行效率。因此,我们强烈建议使用开源社区版的用户升级到基于Python 3.13的VeighNa 4.1.0版本,来享受这份免费的“性能午餐”。
新增
调整
修复