0.修改OrderData如下:
@dataclass
class OrderData(BaseData):
"""
Order data contains information for tracking lastest status
of a specific order.
"""
symbol: str
exchange: Exchange
orderid: str
type: OrderType = OrderType.LIMIT
direction: Direction = Direction.NET
offset: Offset = Offset.NONE
price: float = 0
volume: float = 0
traded: float = 0
status: Status = Status.SUBMITTING
datetime: datetime = None
cancel_time: str = ""
def __post_init__(self):
""""""
self.vt_symbol = f"{self.symbol}_{self.exchange.value}/{self.gateway_name}"
self.vt_orderid = f"{self.gateway_name}_{self.orderid}"
#未成交量
self.untrade = self.volume - self.traded
1.策略init初始化参数
#状态控制初始化
self.chase_long_trigger = False
self.chase_sell_trigger = False
self.chase_short_trigger = False
self.chase_cover_trigger = False
self.cancel_status = False
self.last_vt_orderid = ""
self.long_trade_volume = 0
self.short_trade_volume = 0
self.sell_trade_volume = 0
self.cover_trade_volume = 0
self.chase_interval = 10 #拆单间隔:秒
2.on_tick里面的代码如下
from vnpy.trader.object import TickData, BarData, TradeData, OrderData,Status
def __init__(self, strategy_engine: StrategyEngine, strategy_name: str,vt_symbols: List[str], setting: dict):
"""
"""
super().__init__(strategy_engine, strategy_name, vt_symbols, setting)
#撤单条件选择,默认使用超时撤单,为False使用突破价格范围撤单
self.cancel_timer_trigger = True
def on_tick(self, tick: TickData):
active_orders = self.get_position_detail(chase_vt_symbol).active_orders
vt_orderid = ""
if active_orders:
#委托完成状态
order_finished = False
self.last_vt_orderid = list(active_orders.items())[0][0] #委托单vt_orderid
active_order:OrderData = list(active_orders.items())[0][1] #委托单类
if self.cancel_timer_trigger:
#撤单触发条件,超时撤单
trigger_status = (raw_tick.datetime - active_order.datetime).total_seconds() > self.chase_interval
else:
price_tick = self.get_contract_detail(chase_vt_symbol).price_tick
#突破价格范围撤单
trigger_status = not active_order.price - price_tick * self.cancel_trigger_payup <= raw_tick.last_price <= active_order.price + price_tick * self.cancel_trigger_payup
#开平仓追单,部分交易没有平仓指令(Offset.NONE)
if active_order.offset in (Offset.NONE,Offset.OPEN):
if active_order.direction == Direction.LONG:
self.long_trade_volume = active_order.untrade
if trigger_status and self.long_trade_volume > 0 and (not self.chase_long_trigger) and self.last_vt_orderid:
#撤销之前发出的未成交订单
self.cancel_order(self.last_vt_orderid)
self.chase_long_trigger = True
elif active_order.direction == Direction.SHORT:
self.short_trade_volume = active_order.untrade
if trigger_status and self.short_trade_volume > 0 and (not self.chase_short_trigger) and self.last_vt_orderid:
self.cancel_order(self.last_vt_orderid)
self.chase_short_trigger = True
#平仓追单
elif active_order.offset in (Offset.CLOSE,Offset.CLOSETODAY,Offset.CLOSEYESTERDAY):
if active_order.direction == Direction.SHORT:
self.sell_trade_volume = active_order.untrade
if trigger_status and self.sell_trade_volume > 0 and (not self.chase_sell_trigger) and self.last_vt_orderid:
self.cancel_order(self.last_vt_orderid)
self.chase_sell_trigger = True
if active_order.direction == Direction.LONG:
self.cover_trade_volume = active_order.untrade
if trigger_status and self.cover_trade_volume > 0 and (not self.chase_cover_trigger) and self.last_vt_orderid:
self.cancel_order(self.last_vt_orderid)
self.chase_cover_trigger = True
else:
order_finished = True
self.cancel_status = False
#追单的委托单状态是正常的撤销状态则发出追单指令
if self.get_order(self.last_vt_orderid) and self.get_order(self.last_vt_orderid).status == Status.CANCELLED:
if self.chase_long_trigger:
if order_finished:
self.buy(chase_vt_symbol,raw_tick.ask_price_1,self.long_trade_volume)
self.long_trade_volume = 0
self.chase_long_trigger = False
else:
self.cancel_surplus_order(list(active_orders))
elif self.chase_short_trigger:
if order_finished:
self.short(chase_vt_symbol,raw_tick.bid_price_1,self.short_trade_volume)
self.short_trade_volume = 0
self.chase_short_trigger = False
else:
self.cancel_surplus_order(list(active_orders))
elif self.chase_sell_trigger:
if order_finished:
self.sell(chase_vt_symbol,raw_tick.bid_price_1,self.sell_trade_volume)
self.sell_trade_volume = 0
self.chase_sell_trigger = False
else:
self.cancel_surplus_order(list(active_orders))
elif self.chase_cover_trigger:
if order_finished:
self.cover(chase_vt_symbol,raw_tick.ask_price_1,self.cover_trade_volume)
self.cover_trade_volume = 0
self.chase_cover_trigger = False
else:
self.cancel_surplus_order(list(active_orders))
#------------------------------------------------------------------------------------
def cancel_surplus_order(self,orderids:list):
"""
撤销剩余活动委托单
"""
if not self.cancel_status:
for vt_orderid in orderids:
self.cancel_order(vt_orderid)
self.cancel_status = True
# template.py里面增加
#------------------------------------------------------------------
def get_order(self,vt_orderid:str) -> Union[OrderData,None]:
"""
通过vt_orderid获取委托单
"""
return self.cta_engine.get_order(vt_orderid)
# cta_strategy\engine.py里面增加
#------------------------------------------------------------------------------------
def get_order(self,vt_orderid:str) -> Union[OrderData,None]:
"""
通过vt_orderid获取委托单
"""
self.main_engine.get_order(vt_orderid)
耗时2个多月和迅投团队配合测试对接,VeighNa框架的迅投研数据服务的接口vnpy_xt正式上线,支持股票、期货、期权、基金等历史量价数据的获取。
迅投为VeighNa社区提供了专属的试用申请链接:https://xuntou.net/#/signup?utm_source=vnpy
注册申请后即可获取14天的免费试用,目前数据流量上限较高,推荐有需要的同学不要错过!!!(在有效期内多下载一些数据)
整体使用流程如下:
使用过程中遇到任何问题可以通过社区论坛寻投研专区提问交流:https://www.vnpy.com/forum/forum/35-xun-tou-yan
问题1:
同样的bar为什么在一个函数里是5分钟K线,又在另一个函数里是10分钟K线呢?
def on_5min_bar(self, bar: BarData):
am = self.am
am.update_bar(bar) 为什么这里的bar是5分钟呢?
def on_10min_bar(self, bar: BarData):
am = self.am
am.update_bar(bar) 为什么这里的bar是10分钟呢?
以下AI的回答:
这个问题涉及到金融市场的数据处理,尤其是在使用不同时间框架的K线数据进行交易策略开发时常见。在你提供的代码片段中,on_5min_bar和on_10min_bar函数分别处理不同时间周期的K线数据。原因在于
虽然他们的内部逻辑(如更新数据、进行计算等)可能相似或相同,但是这两个函数被设计用来响应特定的时间间隔的数据更新(一个是5分钟,一个是10分钟),所以会出你所说的情况。
如何区分5分钟和10分钟K线
触发机制: 这些函数通常由不同的触发事件调用。例如,在一个交易系统中,可能有一个计时器或监听器,
当5分钟K线数据更新时,触发on_5min_bar函数,此时bar就是5分钟K线;
当10分钟K线数据更新时,触发on_10min_bar函数,此时bar就是10分钟K线。
数据源: 每个函数接收的bar参数代表一个特定时间段的K线数据。这个数据通常是由市场数据提供商按照不同的时间间隔生成并发送的,例如每5分钟或每10分钟更新一次。
代码结构
函数定义: 你的函数on_5min_bar和on_10min_bar通过名称就已经隐含了它们处理的是不同时间框架的数据。
数据处理: 函数内部,调用am.update_bar(bar)实际上是将传入的bar数据(无论是5分钟还是10分钟的数据)更新到某个数据管理系统(如am)中。这里am可能是一个累积/聚合数据的类实例,用于进一步的数据分析或交易信号生成。
实际应用
在实际应用中,确保正确的数据被送到正确的处理函数是很重要的,通常需要在数据传入系统之前就对其进行正确的分类和调度。这样的设计允许策略开发者针对不同的数据粒度制定不同的交易逻辑和管理策略。
总结来说,每个函数处理的K线时间长度由触发这些函数的事件决定,这些事件依据K线数据的生成频率来区分。
另一回答:
在交易策略或回测系统中,不同的K线周期(如1分钟、5分钟、10分钟等)通常是通过不同的回调函数或处理函数来处理的。在您提供的代码片段中,on_5min_bar 和 on_10min_bar 函数分别处理5分钟和10分钟的K线数据。
这里的 bar 之所以在 on_5min_bar 函数中是5分钟K线,在 on_10min_bar 函数中是10分钟K线,是因为这些函数是分别设计用来处理相应周期的K线数据的。通常,这样的设计是通过某种机制(可能是内部的事件驱动或回调函数注册)来实现的,该机制在特定周期的K线数据准备好时调用相应的处理函数。
例如,在vn.py这样的量化交易框架中,您可能会设置一个BarGenerator来监听实时数据流或历史数据,并根据设置的周期(如5分钟或10分钟)来生成不同周期的K线数据。当5分钟K线完成时,on_5min_bar 函数会被调用,并传入这个5分钟的K线数据。同理,当10分钟K线完成时,on_10min_bar 函数会被调用,并传入这个10分钟的K线数据。
简单来说,不同的回调函数处理不同周期的K线数据,是因为它们在设计时就被指定用来处理特定周期的K线。在您的策略代码中,您应该已经定义了这些回调函数,并且将它们与相应的K线周期进行了关联。
请注意,这里的 bar 变量只是一个参数,它代表当前处理的K线数据。在不同的函数调用中,传入的 bar 参数值会根据上下文(即当前的K线周期)而有所不同。因此,尽管变量名都是 bar,但它们代表的数据内容和周期是不同的。
发布于VeighNa社区公众号【vnpy-community】
原文作者:VeighNa小助手 | 发布时间:2025-08-11
去年【社区活动尊享卡】的受欢迎程度大幅超出我们的预期,为了保证每场社区活动的交流质量,尊享卡已经变更为仅对部分专业交易员用户定向提供。对于参加活动较多的同学强烈推荐!购买请扫描二维码添加小助手咨询:
8月8日,五家期货交易所联合发布了《程序化交易管理办法》,新规将于2025年10月9日正式实施。 尽管尚有6个月的过渡期,但许多期货公司已提前行动,着手联系客户办理委托协议签署及程序化交易的报备事宜。 我们注意到近期VeighNa社区内关于此事的讨论与疑问也日益增多。 为此计划举办一场社区活动,将我们与期货公司的沟通要点进行梳理,为大家集中答疑解惑。
本次活动将于8月30日(周六)下午2:00至5:00在上海举办。普通报名仅支持线下参会,尊享卡持有者和Elite会员可通过线上直播参与。活动具体地址将在微信群中公布,请在报名成功后扫码加入社区活动群,以便获取相关信息!
监管公告文件解读
a. 两大公告文件
i. 证监会《期货市场程序化交易管理规定(试行)》
ii. 期货交易所《程序化交易管理办法》
b. 监管规定梳理
i. 程序化交易报告管理
ii. 外部系统接入管理
iii. 主机托管和席位管理
iv. 交易行为监控管理
穿透式认证测试细节
a. 穿透式认证中的核心概念
i. 关于AppID和AuthCode
ii. 什么是直连和中继接入
iii. API的测试和生产版本
b. 更加严格的认证测试流程
i. 完整测试流程步骤梳理
ii. 选择合适的测试硬件环境
iii. 交易环节所需的信息采集记录
iv. 测试表格中的填写细节
新规下的VeighNa适配
a. RiskEngine风控引擎扩展
i. 委托撤单相关
ii. 验资验券相关
iii. 交易行为相关
b. 策略开发中的注意事项
i. 优先选择简单化模式
ii. 细粒度挂撤单算法管理
iii. 交易时间段筛选控制
闭门交流环节
时间:8月30日 14:00-17:00
地点:上海(具体地址后续在微信群中通知)
报名费:99元(Elite会员免费参加)
报名方式:扫描下方二维码报名(报名后请扫码加入社区活动微信群获取参会地址)
发布于VeighNa社区公众号【vnpy-community】
原文作者:VeighNa小助手 | 发布时间:2025-07-23
特别说明:本文内容基于我们团队在特定时间点的研究和信息收集。由于技术迭代迅速,部分信息可能存在时效性或未能完全覆盖所有细节。我们非常欢迎并期待来自社区的反馈和讨论,共同完善认知。
AI 正在重塑软件开发的各个环节。当我们讨论 AI 编程时,已不再仅仅局限于代码补全。如今,我们选取了三款具有代表性的工具:Cursor、Gemini CLI 与 Claude Code,它们各自代表了不同的产品理念与技术路径。本文将对它们进行深入的对比分析,以帮助开发者理解其核心差异和独特优势。
产品特点:
产品特点:
产品特点:
生态支持:支持与 VS Code、JetBrains 等多种 IDE 深度集成,提供多层级配置体系。
功能 | Cursor | Gemini CLI | Claude Code |
---|---|---|---|
选区上下文共享 | 原生支持 | 通过选定文件 | 通过插件支持 |
标签页同步 | 完全集成 | 通过选定文件 | 通过插件支持 |
诊断信息共享 | 编辑器内置 | 纯命令行模式 | 通过插件支持 |
快捷键支持 | 多种操作快捷键 | 暂不支持 | 仅有启动快捷键 |
支持的 IDE | Cursor (VS Code) | 基础编辑器支持 | VS Code、JetBrains 系列 |
功能 | Cursor | Gemini CLI | Claude Code |
---|---|---|---|
项目级AI规则 | .cursorrules 或 .cursor/rules/ |
项目级GEMINI.md |
项目级CLAUDE.md |
用户级AI规则 | User Rules | 全局~/.gemini/GEMINI.md + save_memory 命令 |
全局~/.claude/CLAUDE.md + # 快速添加 |
规则执行效果 | 不够稳定 | 较为可靠 | 较为可靠 |
历史对话记录 | @past chats 当前对话中一键引用历史对话 | /chat save/resume 可恢复之前手动保存的对话 |
--resume 开启对话前可选择恢复自动保存的对话 |
理论对比完成后,还是要看看这三款工具在实际工作中的表现。接下来,我们将通过一个简单的 VeighNa 数据导出脚本开发任务,来对比三款工具在同一个编程任务中的生成结果与工作流程。
以下是统一使用的提示词:
作为一名精通VeighNa框架的Python开发专家,请提供一个可以直接运行的Python脚本。
核心任务:
通过调用 'get_database' 函数来创建数据库实例,进而批量导出所有K线数据。
具体要求:
1. 参考实现:
a. 'get_database' 的用法,请参考文件 'E:\github\vnpy\vnpy\trader\database.py'。
b. 数据查询和文件生成的逻辑,请参考当前 'vnpy_datamanager' 插件内的实现。
2. 本地优先:为避免使用网上过时或不兼容的代码,请完全基于本地已有的代码和您对VeighNa框架的理解来完成任务。
3. 运行时输出:脚本执行时,每处理一个数据集,都需要在控制台清晰地打印出其合约代码、时间频率和数据长度。
4. 代码质量:
a. 确保代码风格清晰、可读性好。
b. 优先使用条件判断(如 'if')处理可预见的流程,避免不必要的 'try-except' 语句。
import csv
from pathlib import Path
from vnpy.trader.database import get_database
from vnpy.trader.object import BarData
# 获取数据库实例
db = get_database()
# 获取所有K线数据的概览信息
overviews = db.get_bar_overview()
# 检查导出的数据存放路径
data_path = Path("output_data")
if not data_path.exists():
data_path.mkdir()
# 遍历所有K线数据
for overview in overviews:
# 加载特定K线的全部历史数据
bars: list[BarData] = db.load_bar_data(
symbol=overview.symbol,
exchange=overview.exchange,
interval=overview.interval,
start=overview.start,
end=overview.end
)
# 在控制台打印信息
print(f"合约代码:{overview.symbol}")
print(f"时间频率:{overview.interval.value}")
print(f"数据长度:{len(bars)}")
print("-" * 30)
# 将数据写入到CSV文件中
fieldnames: list = [
"symbol",
"exchange",
"datetime",
"open",
"high",
"low",
"close",
"volume",
"turnover",
"open_interest"
]
filename = (
f"{overview.symbol}_"
f"{overview.exchange.value}_"
f"{overview.interval.value}.csv"
)
file_path: Path = data_path.joinpath(filename)
with open(file_path, "w", newline="") as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
for bar in bars:
d: dict = {
"symbol": bar.symbol,
"exchange": bar.exchange.value,
"datetime": bar.datetime.strftime("%Y-%m-%d %H:%M:%S"),
"open": bar.open_price,
"high": bar.high_price,
"low": bar.low_price,
"close": bar.close_price,
"volume": bar.volume,
"turnover": bar.turnover,
"open_interest": bar.open_interest,
}
writer.writerow(d)
从上图可以看出,Cursor 作为一款 AI 原生 IDE,其优势体现在与编码过程的深度融合上:
@
符号引用代码片段或历史对话,上下文控制灵活直观。# export_all_data.py
import os
import csv
from datetime import datetime
from typing import List
from vnpy.trader.database import get_database, BarOverview
from vnpy.trader.object import BarData
from vnpy.trader.constant import Exchange, Interval
# 创建用于存放导出数据的文件夹
output_folder = "kline_data_exported"
if not os.path.exists(output_folder):
os.makedirs(output_folder)
# 获取数据库实例
# 该函数会自动读取全局配置(通常是 settings.json)来连接数据库
database = get_database()
# 获取所有K线数据的概要信息
overviews: List[BarOverview] = database.get_bar_overview()
print(f"发现 {len(overviews)} 个K线数据集,开始处理...")
print("-" * 60)
# 遍历所有数据集
for overview in overviews:
symbol: str = overview.symbol
exchange: Exchange = overview.exchange
interval: Interval = overview.interval
start: datetime = overview.start
end: datetime = overview.end
# 从数据库加载该合约的全部K线数据
bars: List[BarData] = database.load_bar_data(
symbol=symbol,
exchange=exchange,
interval=interval,
start=start,
end=end
)
data_length = len(bars)
# 检查是否有数据
if not data_length:
print(f"合约: {symbol}, 频率: {interval.value} -> 数据长度为 0, 跳过")
continue
# 打印处理信息
print(f"合约: {symbol}, 频率: {interval.value} -> 正在导出 {data_length} 条数据...")
# 构建CSV文件名和路径
file_name = f"{symbol}_{exchange.value}_{interval.value}.csv"
file_path = os.path.join(output_folder, file_name)
# 定义CSV表头
fieldnames: list = [
"symbol",
"exchange",
"datetime",
"interval",
"open",
"high",
"low",
"close",
"volume",
"turnover",
"open_interest"
]
# 将数据写入CSV文件
with open(file_path, "w", newline="") as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
for bar in bars:
row_data = {
"symbol": bar.symbol,
"exchange": bar.exchange.value,
"datetime": bar.datetime.strftime("%Y-%m-%d %H:%M:%S"),
"interval": bar.interval.value,
"open": bar.open_price,
"high": bar.high_price,
"low": bar.low_price,
"close": bar.close_price,
"volume": bar.volume,
"turnover": bar.turnover,
"open_interest": bar.open_interest,
}
writer.writerow(row_data)
print("-" * 60)
print(f"所有数据导出完成,文件已保存至 '{output_folder}' 文件夹。")
从上图可以看出,Gemini CLI 体现了其作为命令行工具的独特工作流:
#!/usr/bin/env python3
"""
批量导出所有K线数据的Python脚本
基于VeighNa框架的vnpy_datamanager插件实现
"""
import csv
import os
from datetime import datetime
from pathlib import Path
from vnpy.trader.database import BaseDatabase, get_database, BarOverview
from vnpy.trader.constant import Interval, Exchange
from vnpy.trader.object import BarData
def create_export_directory() -> Path:
"""创建导出目录"""
export_dir = Path("exported_kline_data")
export_dir.mkdir(exist_ok=True)
return export_dir
def generate_filename(symbol: str, exchange: Exchange, interval: Interval) -> str:
"""生成导出文件名"""
interval_name_map = {
Interval.MINUTE: "1min",
Interval.HOUR: "1hour",
Interval.DAILY: "1day"
}
interval_suffix = interval_name_map.get(interval, interval.value)
return f"{symbol}_{exchange.value}_{interval_suffix}.csv"
def export_bar_data_to_csv(
database: BaseDatabase,
file_path: str,
symbol: str,
exchange: Exchange,
interval: Interval,
start: datetime,
end: datetime
) -> bool:
"""导出K线数据到CSV文件"""
bars = database.load_bar_data(symbol, exchange, interval, start, end)
if not bars:
print(f" 警告: {symbol}.{exchange.value} {interval.value} 无数据")
return False
fieldnames = [
"symbol",
"exchange",
"datetime",
"open",
"high",
"low",
"close",
"volume",
"turnover",
"open_interest"
]
with open(file_path, "w", newline="", encoding="utf-8") as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
for bar in bars:
data_row = {
"symbol": bar.symbol,
"exchange": bar.exchange.value,
"datetime": bar.datetime.strftime("%Y-%m-%d %H:%M:%S"),
"open": bar.open_price,
"high": bar.high_price,
"low": bar.low_price,
"close": bar.close_price,
"volume": bar.volume,
"turnover": bar.turnover,
"open_interest": bar.open_interest,
}
writer.writerow(data_row)
return True
def main():
"""主函数:批量导出所有K线数据"""
print("=== VeighNa K线数据批量导出工具 ===")
print("正在初始化数据库连接...")
# 获取数据库实例
database: BaseDatabase = get_database()
# 创建导出目录
export_dir = create_export_directory()
print(f"导出目录: {export_dir.absolute()}")
# 获取所有K线数据概览
print("正在获取数据库中的K线数据概览...")
overviews: list[BarOverview] = database.get_bar_overview()
if not overviews:
print("数据库中没有找到任何K线数据")
return
print(f"发现 {len(overviews)} 个数据集")
print("-" * 80)
success_count = 0
failed_count = 0
# 遍历所有数据集进行导出
for i, overview in enumerate(overviews, 1):
# 生成文件名和完整路径
filename = generate_filename(overview.symbol, overview.exchange, overview.interval)
file_path = export_dir / filename
# 输出处理信息
print(f"[{i:3d}/{len(overviews)}] 正在处理: {overview.symbol}.{overview.exchange.value} | "
f"频率: {overview.interval.value} | 数据量: {overview.count:,} 条")
print(f" 时间范围: {overview.start.strftime('%Y-%m-%d %H:%M:%S')} "
f"至 {overview.end.strftime('%Y-%m-%d %H:%M:%S')}")
# 导出数据
success = export_bar_data_to_csv(
database,
str(file_path),
overview.symbol,
overview.exchange,
overview.interval,
overview.start,
overview.end
)
if success:
file_size = os.path.getsize(file_path)
print(f" ✓ 导出成功: {filename} ({file_size:,} 字节)")
success_count += 1
else:
print(f" ✗ 导出失败: {filename}")
failed_count += 1
print()
# 输出汇总信息
print("=" * 80)
print("导出完成!")
print(f"成功导出: {success_count} 个文件")
print(f"导出失败: {failed_count} 个文件")
print(f"导出位置: {export_dir.absolute()}")
if success_count > 0:
print("\n导出的文件列表:")
for file_path in sorted(export_dir.glob("*.csv")):
file_size = os.path.getsize(file_path)
print(f" - {file_path.name} ({file_size:,} 字节)")
if __name__ == "__main__":
main()
从输出过程可以看出,Claude Code 强调的是结构化的任务管理和丰富的工具生态:
TodoWrite
工具将任务分解为清晰的步骤并实时展示进度,让用户对AI的工作状态一目了然。多模态辅助:支持便捷的图片输入,可以直接分析截图中的UI或错误信息,拓宽了交互的维度。
经过前面的详细对比和实战测试,我们对这三款工具有了全面的认识。它们各有千秋,而选择的关键在于开发者的具体需求和工作流偏好。
工具 | 核心优势 |
---|---|
Cursor | AI与IDE的无缝融合体验,强大的实时编码辅助。 |
Gemini CLI | 卓越的长上下文处理能力,高度可定制的开源框架。 |
Claude Code | 完善的工具链生态与结构化任务管理,深度集成主流IDE。 |
Cursor:
Gemini CLI:
Claude Code:
工具 | 成本水平 | 免费额度 |
---|---|---|
Cursor | 中等 | 提供两周免费试用 |
Gemini CLI | 低 | 包含免费层级的 API 调用,新用户可叠加 GCP 试用赠金 |
Claude Code | 高 | 无免费版,需订阅 Pro/Max |
三款工具各有其独特优势,选择应基于具体需求。随着工具的不断发展,未来的功能差距可能会缩小。每款工具都有其独特优势和适用场景,Cursor 更适合需要通过 IDE 实时编写代码的用户,Gemini CLI 适合对长上下文感知和项目安全有高要求的用户,Claude Code 则在 IDE 深度集成和丰富工具链方面表现突出。在实践中,建议开发者还是要根据自己的具体需求和工作方式选择合适的工具,或者考虑组合使用,以达到最佳的开发效率。
一、提前下载需要的安装包:
1、Miniconda3
https://docs.conda.io/en/latest/miniconda.html#
选择MacOSX installers里的最新版本,这里是Python 3.9下载。
2、pycharm
pycharm-community-2020.3.3.dmg
从官网上下载社区版https://www.jetbrains.com/pycharm/
3、vnpy安装包(解压后,复制文件夹到自己喜欢的位置)
从vnpy在gitee的官方地址下载最新的安装包,采用zip格式下载。
https://gitee.com/vnpy/vnpy
二、安装
1、安装Miniconda,这里是Miniconda3-latest-MacOSX-x86_64.pkg
2、添加国内源:
添加国内源:在当前用户下,编辑.condarc,内容如下:
channels:
3、创建虚拟环境
conda create -n py37_vnpy python=3.7
conda activate py37_vnpy
(退出:conda deactivate)
4、安装python.app
conda install -c conda-forge python.app
可能会因为网络问题不成功,多试几次。
5、安装pycharm-community-2020.3.3.dmg
从官网上下载社区版https://www.jetbrains.com/pycharm/
7、打开vnpy所在的文件夹,进行配置
点击‘PyCharm’菜单->Preferences菜单->Project:vnpy一级菜单->Python Interpreter二级菜单->点击右上齿轮->Add菜单->Conda Environment->Existing enviroment->Interpreter:/opy/miniconda3/envs/py37_vnpy/bin/pythonw(选择前面新建的虚拟环境的pythonw)->点OK->点OK->点OK
8、(确认在PyCharm里已经打开了vnpy项目),在PyCharm的底部,找到Terminal的标签,点击,进入py37_vnpy环境的终端,并且当前路径位于vnpy项目的文件夹。
执行以下的安装语句(requirements.txt是vnpy项目文件夹下面的一个文件),这个安装时间比较长,需要较好的网络。
pip install -r requirements.txt -i http://pypi.douban.com/simple --trusted-host pypi.douban.com
9、创建run.py文件,复制以下代码,来源 README.md
from vnpy.event import EventEngine
from vnpy.trader.engine import MainEngine
from vnpy.trader.ui import MainWindow, create_qapp
from vnpy.app.cta_strategy import CtaStrategyApp
from vnpy.app.cta_backtester import CtaBacktesterApp
def main():
"""Start VN Trader"""
qapp = create_qapp()
event_engine = EventEngine()
main_engine = MainEngine(event_engine)
# main_engine.add_gateway(CtpGateway)
main_engine.add_app(CtaStrategyApp)
main_engine.add_app(CtaBacktesterApp)
main_window = MainWindow(main_engine, event_engine)
main_window.showMaximized()
qapp.exec()
if name == "main":
main()
10、运行 python run.py,注意环境名称是 py37_vnpy
我学Python的目的很明确,就是量化交易。从一开始就有关注vn.py,但我学的是Python3,那时vn.py还处于版本1.x时期,所以只能望vn.py兴叹。
vn.py 2.0出来之后我并没有及时注意,等反应过来已经是2.0.7版。很兴奋,认真研究,并将心得写成《vn.py 2.0.7源代码深入分析》,分享在vn.py社区的经验分享板块。
出于对量化交易的爱好,出于对Python在量化交易中作用的认同,一定程度受vn.py强大功能的鼓舞,我与同事合写了《Python量化交易从入门到实战》一书,对vn.py的讨论是其中很重要的一部分内容。
后续又写了《vn.py 2.1.4源代码深入分析》和《vn.py 2.2.0源代码深入分析》两个文档,感谢各位老师的认可。
vn.py 3.0.0版发布于2022-03-23,这是我一直期待的一个版本,所以它刚一推出,我就立刻开始试用,并着手整理《vn.py 3.0.0源代码深入分析》。夜以继日,终于在前天完成。先发到了书籍的资源群中,接受了两天批评,现分享到此处。
写作本文档的一个主要目的是对vn.py的开源精神做出一点支持,希望本文档能够对大家学习使用vn.py有所帮助。
百度网盘链接:https://pan.baidu.com/s/1cl2MA9hNFhHlxfHM0gGe2A
提取码:s7u6
入门选手,参考了很多论坛和知乎的官方教程以及解决方案,在MacOS M1 Anaconda虚拟环境下安装成功 VeighNa 3.7.0 。具体功能还未探索,仅限于打开UI界面...
主要参考资料:https://zhuanlan.zhihu.com/p/608546190 和 https://www.vnpy.com/forum/topic/31679-macos-yin-te-er-xin-pian-lao-ban-an-zhuang-veighna-3-6-0-ban-ben-shi-jian-bi-ji
创建新环境,选择python3.10
下载vnpy3.7.0 (download zip并解压):https://github.com/vnpy/vnpy
terminal打开Anaconda vnpy 环境:
conda activate vnpy
安装TA-Lib和NumPy:
brew install ta-lib
pip install ta-lib==0.4.24
pip install numpy
安装vnpy和相关配置:
pip install vnpy
pip install vnpy_ctastrategy vnpy_ctabacktester vnpy_datamanager vnpy_sqlite vnpy_rqdata
相关配置可以根据需求调整上述只为例子包括CTA策略实盘和回测模块,历史数据管理模块,SQLite数据库驱动,RQData数据服务适配器
pip install vnpy_ctp
安装过程中如果出现报错某些依赖库的缺失,可以尝试pip install该依赖库打开run.py所在目录/路径:
cd 具体路径, 如cd /Users/name/Downloads/vnpy-master/vnpy
运行run.py:
python3 run.py
pip install PySide6==6.3.0
上述方法均是本人参考后使用的解决方案,以及中间不小心关了terminal,凭回忆记录了一部分过程,不保证百分百正确,具体情况也可能不一样,仅供参考。
发布于VeighNa社区公众号【vnpy-community】
原文作者:用Python的交易员 | 发布时间:2025-07-19
7月初发布了VeighNa的4.1.0版本,本次更新的主要内容是完成了绝大多数VeighNa开源社区版中的模块移植(接口、应用等),得益于Python 3.13带来的显著性能提升,强烈建议还在使用3.0大版本的用户升级,感受新一代版本带来的性能飞跃。
对于已经安装了VeighNa Studio 4.0版本的用户,可以使用快速更新功能完成自动升级。对于没有安装的用户,请下载\VeighNa Studio-4.1.0**,体验一键安装的量化交易Python发行版,下载链接:
https://download.vnpy.com/veighna_studio-4.1.0.exe
关于Python 3.13具体的性能提升水平,社区里已经有许多同学讨论了,这里借着4.1.0发布的机会,对互联网上的公开资料做个整理。
熟悉Python发展历史的同学可能知道,自3.10版本以来,CPython官方团队在性能优化上投入了巨大的精力,几乎每个新版本都是一次“提速”。下面就来回顾一下这几个版本中和性能相关的核心改动,看看3.13版本对比3.10究竟快了多少。
Python 3.11:革命性的性能飞跃
Python 3.11是“Faster CPython”项目第一个取得丰硕成果的版本,其性能相较于3.10有巨大提升。根据官方文档,Python 3.11在标准基准测试套件上比3.10平均快了1.25倍。
主要改进包括:
a + b
操作,如果a
和b
总是整数,解释器会使用专门处理整数加法的快速指令,大大提高了执行效率。Python 3.12延续了3.11的势头,在现有基础上进行了更多细致的优化。虽然不像3.11那样有革命性的飞跃,但它同样带来了稳固的性能增长。3.12的官方文档中没有提及具体的平均性能提升数字,但根据社区测试的结果,Intel平台上对比3.10的平均提升在5%。
主要改进包括:
Python 3.13继续沿着性能优化的道路前进,根据社区测试的结果,对比3.12的平均性能提升大约是5%(Intel平台)。
这一版本的改进主要集中在:
asyncio
库进行了大量优化,根据基准测试,asyncio
相关任务整体性能提升了\1.19倍**。**unpack_sequence
)、生成器(generators
)等都变得更快。综合来看,从Python 3.10到3.13,CPython的性能经历了持续且显著的增长。通过将各个版本的性能提升进行串联估算 (1.25 1.05 1.05),我们可以得出一个大致的结论:
对于VeighNa用户而言,这意味着策略回测、实盘交易中涉及的大量纯Python计算逻辑(例如信号计算、交易执行、投研分析等)都将运行得更快,从而降低延迟、提升策略执行效率。因此,我们强烈建议使用开源社区版的用户升级到基于Python 3.13的VeighNa 4.1.0版本,来享受这份免费的“性能午餐”。
新增
调整
修复
发布于VeighNa社区公众号【vnpy-community】
原文作者:用Python的交易员 | 发布时间:2025-03-15
在量化交易领域,高质量的行情数据是策略研发和回测的基础。近期在VeighNa社区中,经常有用户咨询关于高频Tick数据录制的问题。
传统的SQLite等关系型数据库在面对大量合约同时录制的场景时,往往因写入性能瓶颈导致丢包,影响数据完整性。
为了解决以上难题,本文中将介绍如何利用VeighNa量化平台结合TDengine时序数据库,实现高效的行情数据录制。
对于大多数VeighNa用户,推荐使用TDengine 3.0版本的Docker容器镜像来安装配置时序数据库的服务端程序。
首先需要确保操作系统中已经安装好了Docker,并将其设置为随操作系统启动自动运行。使用Windows系统的同学可以参考这篇文章中的Docker Desktop安装教程:https://mp.weixin.qq.com/s/m3whdCs6jRs-Ye3Ip5oZmw
随后使用命令拉取TDengine容器镜像:
docker pull tdengine/tdengine:latest
拉取完成后,通过下述命令来启动数据库服务容器:
docker run -d `
-v C:/my_tdengine/data:/var/lib/taos `
-v C:/my_tdengine/log:/var/log/taos `
-p 6030:6030 `
-p 6041:6041 `
-p 6043:6043 `
-p 6044-6049:6044-6049 `
-p 6044-6045:6044-6045/udp `
-p 6060:6060 `
--restart=always `
tdengine/tdengine
注意,以上PowerShell多行命令的每行结尾使用了反引号(键盘Tab上方按键)作为换行符。将上述命令复制到PowerShell窗口中运行,等待几秒后容器启动成功会输出一段较长的随机字符串(容器编号)。
命令中的具体参数(以-或者--开头)说明如下:
-v:用于挂载Windows系统下的指定文件夹到容器中,为容器中运行的TDengine程序提供数据存储输出,可以根据自己的需求修改:
* /var/lib/taos,对应的是TDengine的数据存储路径
* /var/log/taos,对应的是TDengine的日志输出路径。
-p:用于将Windows系统的端口映射绑定到容器中对应的端口,为外部程序提供数据库访问,这些端口参数建议保持不变:
* 6030,主要用于应用程序(如VeighNa)连接TDengine;
* 6041,提供数据库管理终端(如DBeaver)连接TDengine;
* 其他端口提供更加进阶的功能,这里可以忽略。
--restart:用于设置容器的重启策略,always代表每次Docker Desktop启动后都立即启动TDengine容器,结合之前设置的Docker Desktop开机自动启动即可实现时序数据库的后台服务式运行。
安装配置好TDengine后,只需运行一个简单的Python脚本,就能开始高效录制行情数据。
首先,需要导入相关的Python标准库和VeighNa框架组件:
# 加载Python标准库
from logging import INFO
from time import sleep
# 加载VeighNa核心框架
from vnpy.event import EventEngine, Event
from vnpy.trader.setting import SETTINGS
from vnpy.trader.engine import MainEngine, LogEngine
from vnpy.trader.object import ContractData
from vnpy.trader.constant import Exchange, Product
from vnpy.trader.event import EVENT_CONTRACT
# 加载VeighNa插件模块
from vnpy_ctp import CtpGateway
from vnpy_datarecorder import DataRecorderApp, RecorderEngine
from vnpy_datarecorder.engine import EVENT_RECORDER_LOG
这部分代码导入了程序运行所需的各种模块:
日志对于监控系统运行状态和排查问题至关重要:
# 开启日志记录功能
SETTINGS["log.active"] = True # 激活日志功能
SETTINGS["log.level"] = INFO # 设置日志级别为INFO,输出详细信息
SETTINGS["log.console"] = True # 在控制台显示日志,方便实时查看
这段代码配置了VeighNa的日志系统,设置为INFO级别并在控制台显示,便于实时监控数据录制过程中的各种事件和可能出现的问题。
接下来,配置连接到期货市场所需的CTP接口信息:
# CTP接口登录信息
# 以下使用的是SimNow模拟账户信息,初学者可以在SimNow官网申请
ctp_setting: dict[str, str] = {
"用户名": "demo", # SimNow账户名
"密码": "123456", # SimNow密码
"经纪商代码": "9999", # SimNow经纪商代码固定为9999
"交易服务器": "180.168.146.187:10201", # SimNow交易服务器地址和端口
"行情服务器": "180.168.146.187:10211", # SimNow行情服务器地址和端口
"产品名称": "simnow_client_test", # 产品名称,用于区分不同的客户端
"授权编码": "0000000000000000" # 授权编码,SimNow模拟账户使用默认值即可
}
这里使用的是SimNow仿真账户(记得替换为你的用户名和密码),在实盘环境中需要替换为实际的期货账户信息。
为了灵活控制数据录制的范围,脚本允许指定要录制的交易所和品种类型:
# 要录制数据的交易所列表
recording_exchanges: list[Exchange] = [
Exchange.CFFEX, # 中国金融期货交易所
# Exchange.SHFE, # 上海期货交易所
# Exchange.DCE, # 大连商品交易所
# Exchange.CZCE, # 郑州商品交易所
# Exchange.GFEX, # 广州期货交易所
# Exchange.INE, # 上海国际能源交易中心
]
# 要录制数据的品种类型
recording_products: list[Product] = [
Product.FUTURES, # 期货品种
# Product.OPTION, # 期权品种
]
这段代码定义了两个列表,分别指定要录制数据的交易所和品种类型。默认只录制中国金融期货交易所的期货品种,用户可以根据需要取消注释来添加更多交易所和品种。
脚本的核心是run_recorder
函数,它负责初始化系统组件并启动数据录制:
def run_recorder() -> None:
"""
运行行情录制程序
该函数是程序的主体,按照以下步骤工作:
1. 创建VeighNa核心组件(事件引擎、主引擎)
2. 添加交易接口和应用模块
3. 设置数据录制规则
4. 连接到交易所并开始录制数据
"""
# 创建事件引擎,负责系统内各模块间的通信
event_engine: EventEngine = EventEngine()
# 创建主引擎,管理系统功能模块,包括底层接口、上层应用等
main_engine: MainEngine = MainEngine(event_engine)
# 添加CTP接口,连接到期货市场
main_engine.add_gateway(CtpGateway)
# 添加数据录制引擎,用于录制Tick行情入库
recorder_engine: RecorderEngine = main_engine.add_app(DataRecorderApp)
这部分代码初始化了VeighNa的核心组件:
接下来,定义了一个合约订阅函数,用于自动订阅符合条件的合约行情:
# 定义合约订阅函数
def subscribe_data(event: Event) -> None:
"""
处理合约推送并订阅行情
当系统接收到合约信息后,根据预设的交易所和品种过滤条件,
自动为符合条件的合约添加行情录制任务。
参数:
event: 包含合约信息的事件对象
"""
# 从事件对象中获取合约数据
contract: ContractData = event.data
# 判断合约是否符合录制条件
if (
contract.exchange in recording_exchanges # 检查合约所属交易所是否在预设列表中
and contract.product in recording_products # 检查合约品种类型是否在预设列表中
):
# 添加该合约的行情录制任务,vt_symbol是VeighNa中的唯一标识符,格式为"代码.交易所"
recorder_engine.add_tick_recording(contract.vt_symbol)
# 注册合约事件处理函数,当有新合约信息推送时,会自动调用subscribe_data函数
event_engine.register(EVENT_CONTRACT, subscribe_data)
这里基于VeighNa平台核心的事件驱动架构,实现了一套自动化的合约订阅机制:
EVENT_CONTRACT
事件subscribe_data
函数处理该事件该机制使得系统能够自动识别和订阅符合条件的合约,无需手动指定每个合约代码。
为了方便监控DataRecorder数据录制模块的内部运行状态,脚本还定义了专门的日志输出函数:
# 获取日志引擎并设置日志处理
log_engine: LogEngine = main_engine.get_engine("log")
def print_log(event: Event) -> None:
"""
处理数据录制模块的日志事件
将数据录制模块产生的日志信息输出到控制台和日志文件中,
便于监控录制过程和排查问题。
参数:
event: 包含日志信息的事件对象
"""
log_engine.logger.log(INFO, event.data)
# 注册日志事件处理函数,当有新的日志推送时,会自动调用print_log函数
event_engine.register(EVENT_RECORDER_LOG, print_log)
注意前面对于SETTINGS全局配置字典的修改,仅影响底层接口和核心引擎部分的日志记录,而这里的DataRecorder模块属于上层应用,需要注册额外的日志事件处理函数后才能实现输出。
之后就可以连接登录CTP接口并开始录制数据:
# 连接CTP接口并登录,第一个参数是接口设置,第二个参数是接口名称
main_engine.connect(ctp_setting, CtpGateway.default_name)
# 等待30秒,CTP接口连接后需要一段时间来完成初始化
sleep(30)
# 提示用户程序已经开始运行,用户可以根据需要随时退出
input(">>>>>> 高频行情数据录制已启动,正在记录数据。按回车键退出程序 <<<<<<")
# 关闭主引擎实现安全退出,避免出现内存中未入库数据的丢失
main_engine.close()
这部分代码完成了以下操作:
最后是Python程序的标准入口:
# Python程序的标准入口写法,直接运行此脚本时会执行run_recorder函数
if __name__ == "__main__":
run_recorder()
当直接运行该脚本时,会执行run_recorder
函数开始数据录制。
通过替换脚本中加载的底层接口模块和连接登录配置,就可以很方便的实现不同市场的高频Tick数据录制。
除了TDengine外,同样也可以使用其他VeighNa支持的高性能数据库(如DolphinDB等)作为后端的数据存储服务。
在文章结尾,附上完整的tick_recorder.py脚本源代码:
"""
该程序使用VeighNa框架通过CTP接口连接到期货市场,并自动录制指定交易所和品种的行情数据。
适合初学者了解VeighNa框架的基本用法和数据录制流程。
"""
# 加载Python标准库
from logging import INFO
from time import sleep
# 加载VeighNa核心框架
from vnpy.event import EventEngine, Event
from vnpy.trader.setting import SETTINGS
from vnpy.trader.engine import MainEngine, LogEngine
from vnpy.trader.object import ContractData
from vnpy.trader.constant import Exchange, Product
from vnpy.trader.event import EVENT_CONTRACT
# 加载VeighNa插件模块
from vnpy_ctp import CtpGateway
from vnpy_datarecorder import DataRecorderApp, RecorderEngine
from vnpy_datarecorder.engine import EVENT_RECORDER_LOG
# 开启日志记录功能
# 日志对于排查问题和监控系统运行状态非常重要
SETTINGS["log.active"] = True # 激活日志功能
SETTINGS["log.level"] = INFO # 设置日志级别为INFO,输出详细信息
SETTINGS["log.console"] = True # 在控制台显示日志,方便实时查看
# CTP接口登录信息
# 以下使用的是SimNow模拟账户信息,初学者可以在SimNow官网申请
ctp_setting: dict[str, str] = {
"用户名": "demo", # SimNow账户名
"密码": "Vnpy@123456", # SimNow密码
"经纪商代码": "9999", # SimNow经纪商代码固定为9999
"交易服务器": "180.168.146.187:10201", # SimNow交易服务器地址和端口
"行情服务器": "180.168.146.187:10211", # SimNow行情服务器地址和端口
"产品名称": "simnow_client_test", # 产品名称,用于区分不同的客户端
"授权编码": "0000000000000000" # 授权编码,SimNow模拟账户使用默认值即可
}
# 要录制数据的交易所列表
# 可以根据需要取消注释来添加更多交易所
recording_exchanges: list[Exchange] = [
Exchange.CFFEX, # 中国金融期货交易所
# Exchange.SHFE, # 上海期货交易所
# Exchange.DCE, # 大连商品交易所
# Exchange.CZCE, # 郑州商品交易所
# Exchange.GFEX, # 广州期货交易所
# Exchange.INE, # 上海国际能源交易中心
]
# 要录制数据的品种类型
# 可以根据需要取消注释来添加更多品种
recording_products: list[Product] = [
Product.FUTURES, # 期货品种
# Product.OPTION, # 期权品种
]
def run_recorder() -> None:
"""
运行行情录制程序
该函数是程序的主体,按照以下步骤工作:
1. 创建VeighNa核心组件(事件引擎、主引擎)
2. 添加交易接口和应用模块
3. 设置数据录制规则
4. 连接到交易所并开始录制数据
"""
# 创建事件引擎,负责系统内各模块间的通信
event_engine: EventEngine = EventEngine()
# 创建主引擎,管理系统功能模块,包括底层接口、上层应用等
main_engine: MainEngine = MainEngine(event_engine)
# 添加CTP接口,连接到期货市场
main_engine.add_gateway(CtpGateway)
# 添加数据录制引擎,用于录制Tick行情入库
recorder_engine: RecorderEngine = main_engine.add_app(DataRecorderApp)
# 定义合约订阅函数
def subscribe_data(event: Event) -> None:
"""
处理合约推送并订阅行情
当系统接收到合约信息后,根据预设的交易所和品种过滤条件,
自动为符合条件的合约添加行情录制任务。
参数:
event: 包含合约信息的事件对象
"""
# 从事件对象中获取合约数据
contract: ContractData = event.data
# 判断合约是否符合录制条件
if (
contract.exchange in recording_exchanges # 检查合约所属交易所是否在预设列表中
and contract.product in recording_products # 检查合约品种类型是否在预设列表中
):
# 添加该合约的行情录制任务,vt_symbol是VeighNa中的唯一标识符,格式为"代码.交易所"
recorder_engine.add_tick_recording(contract.vt_symbol)
# 注册合约事件处理函数,当有新合约信息推送时,会自动调用subscribe_data函数
event_engine.register(EVENT_CONTRACT, subscribe_data)
# 获取日志引擎并设置日志处理
log_engine: LogEngine = main_engine.get_engine("log")
def print_log(event: Event) -> None:
"""
处理数据录制模块的日志事件
将数据录制模块产生的日志信息输出到控制台和日志文件中,
便于监控录制过程和排查问题。
参数:
event: 包含日志信息的事件对象
"""
log_engine.logger.log(INFO, event.data)
# 注册日志事件处理函数,当有新的日志推送时,会自动调用print_log函数
event_engine.register(EVENT_RECORDER_LOG, print_log)
# 连接CTP接口并登录,第一个参数是接口设置,第二个参数是接口名称
main_engine.connect(ctp_setting, CtpGateway.default_name)
# 等待30秒,CTP接口连接后需要一段时间来完成初始化
sleep(30)
# 提示用户程序已经开始运行,用户可以根据需要随时退出
input(">>>>>> 高频行情数据录制已启动,正在记录数据。按回车键退出程序 <<<<<<")
# 关闭主引擎实现安全退出,避免出现内存中未入库数据的丢失
main_engine.close()
# Python程序的标准入口写法,直接运行此脚本时会执行run_recorder函数
if __name__ == "__main__":
run_recorder()