VeighNa量化社区
你的开源社区量化交易平台 | vn.py | vnpy

置顶主题

【VNPY进阶】on_tick函数内撤单追单详解,实盘在用的代码,没有坑哦

0.修改OrderData如下:

@dataclass
class OrderData(BaseData):
    """
    Order data contains information for tracking lastest status 
    of a specific order.
    """

    symbol: str
    exchange: Exchange
    orderid: str

    type: OrderType = OrderType.LIMIT
    direction: Direction = Direction.NET
    offset: Offset = Offset.NONE
    price: float = 0
    volume: float = 0
    traded: float = 0
    status: Status = Status.SUBMITTING
    datetime: datetime = None

    cancel_time: str = ""
    def __post_init__(self):
        """"""
        self.vt_symbol = f"{self.symbol}_{self.exchange.value}/{self.gateway_name}"
        self.vt_orderid = f"{self.gateway_name}_{self.orderid}"
        #未成交量
        self.untrade = self.volume - self.traded

1.策略init初始化参数

        #状态控制初始化
        self.chase_long_trigger = False
        self.chase_sell_trigger = False
        self.chase_short_trigger = False
        self.chase_cover_trigger = False  
        self.cancel_status = False
        self.last_vt_orderid = ""
        self.long_trade_volume = 0
        self.short_trade_volume = 0
        self.sell_trade_volume = 0
        self.cover_trade_volume = 0 
        self.chase_interval   =    10    #拆单间隔:秒

get_position_detail参考这个帖子 https://www.vnpy.com/forum/topic/2167-cha-xun-cang-wei-chi-cang-jun-jie-wei-cheng-jiao-wei-tuo-dan-yi-ge-han-shu-gao-ding

2.on_tick里面的代码如下

from vnpy.trader.object import TickData, BarData, TradeData, OrderData,Status
    def __init__(self, strategy_engine: StrategyEngine, strategy_name: str,vt_symbols: List[str], setting: dict):
        """
        """
        super().__init__(strategy_engine, strategy_name, vt_symbols, setting)
        #撤单条件选择,默认使用超时撤单,为False使用突破价格范围撤单
        self.cancel_timer_trigger = True
    def on_tick(self, tick: TickData):
        active_orders = self.get_position_detail(chase_vt_symbol).active_orders
        vt_orderid = ""
        if active_orders:
            #委托完成状态
            order_finished = False
            self.last_vt_orderid = list(active_orders.items())[0][0]         #委托单vt_orderid
            active_order:OrderData = list(active_orders.items())[0][1]      #委托单类 
            if self.cancel_timer_trigger:
                #撤单触发条件,超时撤单
                trigger_status = (raw_tick.datetime - active_order.datetime).total_seconds() > self.chase_interval
            else:
                price_tick = self.get_contract_detail(chase_vt_symbol).price_tick
                #突破价格范围撤单
                trigger_status = not active_order.price - price_tick * self.cancel_trigger_payup <= raw_tick.last_price <= active_order.price + price_tick * self.cancel_trigger_payup
            #开平仓追单,部分交易没有平仓指令(Offset.NONE)
            if active_order.offset in (Offset.NONE,Offset.OPEN):
                if active_order.direction == Direction.LONG:
                    self.long_trade_volume = active_order.untrade
                    if trigger_status and self.long_trade_volume > 0 and (not self.chase_long_trigger) and self.last_vt_orderid:
                        #撤销之前发出的未成交订单
                        self.cancel_order(self.last_vt_orderid)
                        self.chase_long_trigger = True
                elif active_order.direction == Direction.SHORT:
                    self.short_trade_volume = active_order.untrade    
                    if trigger_status and self.short_trade_volume > 0 and (not self.chase_short_trigger) and self.last_vt_orderid:  
                        self.cancel_order(self.last_vt_orderid)
                        self.chase_short_trigger = True
            #平仓追单
            elif active_order.offset in (Offset.CLOSE,Offset.CLOSETODAY,Offset.CLOSEYESTERDAY):
                if active_order.direction == Direction.SHORT: 
                    self.sell_trade_volume = active_order.untrade
                    if trigger_status and self.sell_trade_volume > 0 and (not self.chase_sell_trigger) and self.last_vt_orderid: 
                        self.cancel_order(self.last_vt_orderid)
                        self.chase_sell_trigger = True                                                    
                if active_order.direction == Direction.LONG:
                    self.cover_trade_volume = active_order.untrade
                    if trigger_status and self.cover_trade_volume > 0 and (not self.chase_cover_trigger) and self.last_vt_orderid:                                                       
                        self.cancel_order(self.last_vt_orderid)
                        self.chase_cover_trigger = True   
        else:
            order_finished = True
            self.cancel_status = False
        #追单的委托单状态是正常的撤销状态则发出追单指令
        if self.get_order(self.last_vt_orderid) and self.get_order(self.last_vt_orderid).status == Status.CANCELLED:
            if self.chase_long_trigger:
                if order_finished:
                    self.buy(chase_vt_symbol,raw_tick.ask_price_1,self.long_trade_volume)
                    self.long_trade_volume = 0
                    self.chase_long_trigger = False  
                else:
                    self.cancel_surplus_order(list(active_orders))
            elif self.chase_short_trigger:
                if  order_finished:
                    self.short(chase_vt_symbol,raw_tick.bid_price_1,self.short_trade_volume)
                    self.short_trade_volume = 0
                    self.chase_short_trigger = False 
                else:
                    self.cancel_surplus_order(list(active_orders))
            elif self.chase_sell_trigger:
                if order_finished:
                    self.sell(chase_vt_symbol,raw_tick.bid_price_1,self.sell_trade_volume)
                    self.sell_trade_volume = 0
                    self.chase_sell_trigger = False                      
                else:
                    self.cancel_surplus_order(list(active_orders))
            elif self.chase_cover_trigger:
                if order_finished:
                    self.cover(chase_vt_symbol,raw_tick.ask_price_1,self.cover_trade_volume)
                    self.cover_trade_volume = 0
                    self.chase_cover_trigger = False
                else:
                    self.cancel_surplus_order(list(active_orders))
    #------------------------------------------------------------------------------------
    def cancel_surplus_order(self,orderids:list):
        """
        撤销剩余活动委托单
        """
        if not self.cancel_status:
            for vt_orderid in  orderids:
                self.cancel_order(vt_orderid)
            self.cancel_status = True
# template.py里面增加
    #------------------------------------------------------------------
    def get_order(self,vt_orderid:str) -> Union[OrderData,None]:
        """
        通过vt_orderid获取委托单
        """
        return self.cta_engine.get_order(vt_orderid) 
# cta_strategy\engine.py里面增加
    #------------------------------------------------------------------------------------
    def get_order(self,vt_orderid:str) -> Union[OrderData,None]:
        """
        通过vt_orderid获取委托单
        """
        self.main_engine.get_order(vt_orderid)


【vnpy_xt】迅投研数据服务模块正式上线!

耗时2个多月和迅投团队配合测试对接,VeighNa框架的迅投研数据服务的接口vnpy_xt正式上线,支持股票、期货、期权、基金等历史量价数据的获取。

迅投为VeighNa社区提供了专属的试用申请链接:https://xuntou.net/#/signup?utm_source=vnpy

注册申请后即可获取14天的免费试用,目前数据流量上限较高,推荐有需要的同学不要错过!!!(在有效期内多下载一些数据)

整体使用流程如下:

  1. http://docs.thinktrader.net/下载安装xtquant安装包,解压后放置到c:\veighna_studio\lib\site_packages文件夹下
  2. 完成前文的试用账号注册后,登录https://xuntou.net/#/login,在【用户中心】-【个人设置】-【接口TOKEN】处获取Token
  3. 安装vnpy_xt模块:pip install vnpy_xt
  4. 在VeighNa Trader的【全局配置】中配置数据服务相关字段:
  5. datafeed.name:xt
  6. datafeed.username:token
  7. datafeed.password:填第二步中复制的Token

使用过程中遇到任何问题可以通过社区论坛寻投研专区提问交流:https://www.vnpy.com/forum/forum/35-xun-tou-yan



整理的一些我自己学习vnpy疑问和解答,欢迎指正。

问题1:
同样的bar为什么在一个函数里是5分钟K线,又在另一个函数里是10分钟K线呢?

def on_5min_bar(self, bar: BarData):
am = self.am
am.update_bar(bar) 为什么这里的bar是5分钟呢?

def on_10min_bar(self, bar: BarData):
am = self.am
am.update_bar(bar) 为什么这里的bar是10分钟呢?

以下AI的回答:
这个问题涉及到金融市场的数据处理,尤其是在使用不同时间框架的K线数据进行交易策略开发时常见。在你提供的代码片段中,on_5min_bar和on_10min_bar函数分别处理不同时间周期的K线数据。原因在于
虽然他们的内部逻辑(如更新数据、进行计算等)可能相似或相同,但是这两个函数被设计用来响应特定的时间间隔的数据更新(一个是5分钟,一个是10分钟),所以会出你所说的情况。

如何区分5分钟和10分钟K线
触发机制: 这些函数通常由不同的触发事件调用。例如,在一个交易系统中,可能有一个计时器或监听器,

当5分钟K线数据更新时,触发on_5min_bar函数,此时bar就是5分钟K线;
当10分钟K线数据更新时,触发on_10min_bar函数,此时bar就是10分钟K线。

数据源: 每个函数接收的bar参数代表一个特定时间段的K线数据。这个数据通常是由市场数据提供商按照不同的时间间隔生成并发送的,例如每5分钟或每10分钟更新一次。

代码结构
函数定义: 你的函数on_5min_bar和on_10min_bar通过名称就已经隐含了它们处理的是不同时间框架的数据。
数据处理: 函数内部,调用am.update_bar(bar)实际上是将传入的bar数据(无论是5分钟还是10分钟的数据)更新到某个数据管理系统(如am)中。这里am可能是一个累积/聚合数据的类实例,用于进一步的数据分析或交易信号生成。

实际应用
在实际应用中,确保正确的数据被送到正确的处理函数是很重要的,通常需要在数据传入系统之前就对其进行正确的分类和调度。这样的设计允许策略开发者针对不同的数据粒度制定不同的交易逻辑和管理策略。

总结来说,每个函数处理的K线时间长度由触发这些函数的事件决定,这些事件依据K线数据的生成频率来区分。

另一回答:
在交易策略或回测系统中,不同的K线周期(如1分钟、5分钟、10分钟等)通常是通过不同的回调函数或处理函数来处理的。在您提供的代码片段中,on_5min_bar 和 on_10min_bar 函数分别处理5分钟和10分钟的K线数据。

这里的 bar 之所以在 on_5min_bar 函数中是5分钟K线,在 on_10min_bar 函数中是10分钟K线,是因为这些函数是分别设计用来处理相应周期的K线数据的。通常,这样的设计是通过某种机制(可能是内部的事件驱动或回调函数注册)来实现的,该机制在特定周期的K线数据准备好时调用相应的处理函数。

例如,在vn.py这样的量化交易框架中,您可能会设置一个BarGenerator来监听实时数据流或历史数据,并根据设置的周期(如5分钟或10分钟)来生成不同周期的K线数据。当5分钟K线完成时,on_5min_bar 函数会被调用,并传入这个5分钟的K线数据。同理,当10分钟K线完成时,on_10min_bar 函数会被调用,并传入这个10分钟的K线数据。

简单来说,不同的回调函数处理不同周期的K线数据,是因为它们在设计时就被指定用来处理特定周期的K线。在您的策略代码中,您应该已经定义了这些回调函数,并且将它们与相应的K线周期进行了关联。

请注意,这里的 bar 变量只是一个参数,它代表当前处理的K线数据。在不同的函数调用中,传入的 bar 参数值会根据上下文(即当前的K线周期)而有所不同。因此,尽管变量名都是 bar,但它们代表的数据内容和周期是不同的。



2025年第5次社区活动 - 【期货程序化交易管理办法详解】 - 8月30日(上海)

发布于VeighNa社区公众号【vnpy-community】
 
原文作者:VeighNa小助手 | 发布时间:2025-08-11
 
去年【社区活动尊享卡】的受欢迎程度大幅超出我们的预期,为了保证每场社区活动的交流质量,尊享卡已经变更为仅对部分专业交易员用户定向提供。对于参加活动较多的同学强烈推荐!购买请扫描二维码添加小助手咨询:

description

8月8日,五家期货交易所联合发布了《程序化交易管理办法》,新规将于2025年10月9日正式实施。 尽管尚有6个月的过渡期,但许多期货公司已提前行动,着手联系客户办理委托协议签署及程序化交易的报备事宜。 我们注意到近期VeighNa社区内关于此事的讨论与疑问也日益增多。 为此计划举办一场社区活动,将我们与期货公司的沟通要点进行梳理,为大家集中答疑解惑。

本次活动将于8月30日(周六)下午2:00至5:00在上海举办。普通报名仅支持线下参会,尊享卡持有者和Elite会员可通过线上直播参与。活动具体地址将在微信群中公布,请在报名成功后扫码加入社区活动群,以便获取相关信息!

 

活动内容大纲

 

  1. 监管公告文件解读

    a. 两大公告文件

    i. 证监会《期货市场程序化交易管理规定(试行)》
    ii. 期货交易所《程序化交易管理办法》

    b. 监管规定梳理

    i. 程序化交易报告管理
    ii. 外部系统接入管理
    iii. 主机托管和席位管理
    iv. 交易行为监控管理

  2. 穿透式认证测试细节

    a. 穿透式认证中的核心概念

    i. 关于AppID和AuthCode
    ii. 什么是直连和中继接入
    iii. API的测试和生产版本

    b. 更加严格的认证测试流程

    i. 完整测试流程步骤梳理
    ii. 选择合适的测试硬件环境
    iii. 交易环节所需的信息采集记录
    iv. 测试表格中的填写细节

  3. 新规下的VeighNa适配

    a. RiskEngine风控引擎扩展

    i. 委托撤单相关
    ii. 验资验券相关
    iii. 交易行为相关

    b. 策略开发中的注意事项

    i. 优先选择简单化模式
    ii. 细粒度挂撤单算法管理
    iii. 交易时间段筛选控制

  4. 闭门交流环节
     

时间:8月30日 14:00-17:00

地点:上海(具体地址后续在微信群中通知)

报名费:99元(Elite会员免费参加)

报名方式:扫描下方二维码报名(报名后请扫码加入社区活动微信群获取参会地址)

 

description

 



AI 编程开发工具对比:Cursor、Gemini CLI 与 Claude Code

发布于VeighNa社区公众号【vnpy-community】
 
原文作者:VeighNa小助手 | 发布时间:2025-07-23
 

特别说明:本文内容基于我们团队在特定时间点的研究和信息收集。由于技术迭代迅速,部分信息可能存在时效性或未能完全覆盖所有细节。我们非常欢迎并期待来自社区的反馈和讨论,共同完善认知。
 

概述

AI 正在重塑软件开发的各个环节。当我们讨论 AI 编程时,已不再仅仅局限于代码补全。如今,我们选取了三款具有代表性的工具:Cursor、Gemini CLI 与 Claude Code,它们各自代表了不同的产品理念与技术路径。本文将对它们进行深入的对比分析,以帮助开发者理解其核心差异和独特优势。

 

产品定位与架构对比

 

Cursor

产品特点:

  • 核心定位:AI 原生代码编辑器;
  • 主要优势:基于 VS Code 深度整合 AI 能力,提供无缝的嵌入式体验;
  • 工作模式:可视化编程环境;
  • 生态支持:继承 VS Code 完整的 IDE 功能和插件生态,并提供稳定的商业化支持。

Gemini CLI

产品特点:

  • 核心定位:命令行 AI 工作流工具;
  • 主要优势:支持超大上下文窗口(1M+ tokens)、强大的多模态能力,且 Google Gemini 费用相对低廉;
  • 工作模式:命令行交互环境;
  • 生态支持:完全开源,通过 MCP 协议扩展工具生态,支持项目级与用户级配置;

Claude Code

产品特点:

  • 核心定位:深度集成 IDE 的编程工作流助手;
  • 主要优势:强大的代码理解能力,以及丰富的内置工具;
  • 工作模式:命令行交互环境;
  • 生态支持:支持与 VS Code、JetBrains 等多种 IDE 深度集成,提供多层级配置体系。

     

功能特性详细对比

 

IDE 集成能力

功能 Cursor Gemini CLI Claude Code
选区上下文共享 原生支持 通过选定文件 通过插件支持
标签页同步 完全集成 通过选定文件 通过插件支持
诊断信息共享 编辑器内置 纯命令行模式 通过插件支持
快捷键支持 多种操作快捷键 暂不支持 仅有启动快捷键
支持的 IDE Cursor (VS Code) 基础编辑器支持 VS Code、JetBrains 系列

配置和记忆系统

功能 Cursor Gemini CLI Claude Code
项目级AI规则 .cursorrules.cursor/rules/ 项目级GEMINI.md 项目级CLAUDE.md
用户级AI规则 User Rules 全局~/.gemini/GEMINI.md + save_memory命令 全局~/.claude/CLAUDE.md + #快速添加
规则执行效果 不够稳定 较为可靠 较为可靠
历史对话记录 @past chats 当前对话中一键引用历史对话 /chat save/resume 可恢复之前手动保存的对话 --resume 开启对话前可选择恢复自动保存的对话

 

代码生成对比

 

开发任务

理论对比完成后,还是要看看这三款工具在实际工作中的表现。接下来,我们将通过一个简单的 VeighNa 数据导出脚本开发任务,来对比三款工具在同一个编程任务中的生成结果与工作流程。

以下是统一使用的提示词:

作为一名精通VeighNa框架的Python开发专家,请提供一个可以直接运行的Python脚本。

核心任务:
通过调用 'get_database' 函数来创建数据库实例,进而批量导出所有K线数据。

具体要求:
1.  参考实现:
    a. 'get_database' 的用法,请参考文件 'E:\github\vnpy\vnpy\trader\database.py'。
    b. 数据查询和文件生成的逻辑,请参考当前 'vnpy_datamanager' 插件内的实现。
2.  本地优先:为避免使用网上过时或不兼容的代码,请完全基于本地已有的代码和您对VeighNa框架的理解来完成任务。
3.  运行时输出:脚本执行时,每处理一个数据集,都需要在控制台清晰地打印出其合约代码、时间频率和数据长度。
4.  代码质量:
    a. 确保代码风格清晰、可读性好。
    b. 优先使用条件判断(如 'if')处理可预见的流程,避免不必要的 'try-except' 语句。

 

Cursor

import csv
from pathlib import Path

from vnpy.trader.database import get_database
from vnpy.trader.object import BarData

# 获取数据库实例
db = get_database()

# 获取所有K线数据的概览信息
overviews = db.get_bar_overview()

# 检查导出的数据存放路径
data_path = Path("output_data")
if not data_path.exists():
    data_path.mkdir()

# 遍历所有K线数据
for overview in overviews:
    # 加载特定K线的全部历史数据
    bars: list[BarData] = db.load_bar_data(
        symbol=overview.symbol,
        exchange=overview.exchange,
        interval=overview.interval,
        start=overview.start,
        end=overview.end
    )

    # 在控制台打印信息
    print(f"合约代码:{overview.symbol}")
    print(f"时间频率:{overview.interval.value}")
    print(f"数据长度:{len(bars)}")
    print("-" * 30)

    # 将数据写入到CSV文件中
    fieldnames: list = [
        "symbol",
        "exchange",
        "datetime",
        "open",
        "high",
        "low",
        "close",
        "volume",
        "turnover",
        "open_interest"
    ]

    filename = (
        f"{overview.symbol}_"
        f"{overview.exchange.value}_"
        f"{overview.interval.value}.csv"
    )
    file_path: Path = data_path.joinpath(filename)

    with open(file_path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()

        for bar in bars:
            d: dict = {
                "symbol": bar.symbol,
                "exchange": bar.exchange.value,
                "datetime": bar.datetime.strftime("%Y-%m-%d %H:%M:%S"),
                "open": bar.open_price,
                "high": bar.high_price,
                "low": bar.low_price,
                "close": bar.close_price,
                "volume": bar.volume,
                "turnover": bar.turnover,
                "open_interest": bar.open_interest,
            }
            writer.writerow(d)

点击查看Cursor输出过程

从上图可以看出,Cursor 作为一款 AI 原生 IDE,其优势体现在与编码过程的深度融合上:

  • 无缝的 IDE 集成:用户在熟悉的类 VS Code 环境中操作,通过图形界面选择模型、管理上下文、查看 Diff,学习成本低。
  • 多维度的上下文管理:支持通过界面直接添加文件、利用 @ 符号引用代码片段或历史对话,上下文控制灵活直观。
  • 强大的编码辅助:除了生成代码,它还提供实时的代码补全、智能提示,并在编码过程中利用 IDE 的 Linter 进行检查和修正。
  • 支持多文件夹工作空间:可以轻松应对需要跨多个项目或代码库参考的复杂任务。
     
    #### Gemini CLI
# export_all_data.py
import os
import csv
from datetime import datetime
from typing import List

from vnpy.trader.database import get_database, BarOverview
from vnpy.trader.object import BarData
from vnpy.trader.constant import Exchange, Interval

# 创建用于存放导出数据的文件夹
output_folder = "kline_data_exported"
if not os.path.exists(output_folder):
    os.makedirs(output_folder)

# 获取数据库实例
# 该函数会自动读取全局配置(通常是 settings.json)来连接数据库
database = get_database()

# 获取所有K线数据的概要信息
overviews: List[BarOverview] = database.get_bar_overview()

print(f"发现 {len(overviews)} 个K线数据集,开始处理...")
print("-" * 60)

# 遍历所有数据集
for overview in overviews:
    symbol: str = overview.symbol
    exchange: Exchange = overview.exchange
    interval: Interval = overview.interval
    start: datetime = overview.start
    end: datetime = overview.end

    # 从数据库加载该合约的全部K线数据
    bars: List[BarData] = database.load_bar_data(
        symbol=symbol,
        exchange=exchange,
        interval=interval,
        start=start,
        end=end
    )

    data_length = len(bars)

    # 检查是否有数据
    if not data_length:
        print(f"合约: {symbol}, 频率: {interval.value} -> 数据长度为 0, 跳过")
        continue

    # 打印处理信息
    print(f"合约: {symbol}, 频率: {interval.value} -> 正在导出 {data_length} 条数据...")

    # 构建CSV文件名和路径
    file_name = f"{symbol}_{exchange.value}_{interval.value}.csv"
    file_path = os.path.join(output_folder, file_name)

    # 定义CSV表头
    fieldnames: list = [
        "symbol",
        "exchange",
        "datetime",
        "interval",
        "open",
        "high",
        "low",
        "close",
        "volume",
        "turnover",
        "open_interest"
    ]

    # 将数据写入CSV文件
    with open(file_path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()

        for bar in bars:
            row_data = {
                "symbol": bar.symbol,
                "exchange": bar.exchange.value,
                "datetime": bar.datetime.strftime("%Y-%m-%d %H:%M:%S"),
                "interval": bar.interval.value,
                "open": bar.open_price,
                "high": bar.high_price,
                "low": bar.low_price,
                "close": bar.close_price,
                "volume": bar.volume,
                "turnover": bar.turnover,
                "open_interest": bar.open_interest,
            }
            writer.writerow(row_data)

print("-" * 60)
print(f"所有数据导出完成,文件已保存至 '{output_folder}' 文件夹。")

点击查看Gemini CLI输出过程

从上图可以看出,Gemini CLI 体现了其作为命令行工具的独特工作流:

  • 计划先行:在执行前,它会先分析问题、阅读代码,然后输出详细的计划步骤并提出运行建议。
  • 项目为基:与 Cursor 不同,它和 Claude Code 都以项目文件夹为基础进行交互,因此对于外部文件需要指定绝对路径。
  • 注重安全:Gemini CLI 非常注重安全性,在处理多文件夹交互时,推荐使用其沙盒模式来保障环境。
  • 部署灵活:其纯命令行模式不仅可以在本地 IDE 中使用,也非常适合在服务器上进行远程开发。
     
    #### Claude Code
#!/usr/bin/env python3
"""
批量导出所有K线数据的Python脚本
基于VeighNa框架的vnpy_datamanager插件实现
"""

import csv
import os
from datetime import datetime
from pathlib import Path

from vnpy.trader.database import BaseDatabase, get_database, BarOverview
from vnpy.trader.constant import Interval, Exchange
from vnpy.trader.object import BarData


def create_export_directory() -> Path:
    """创建导出目录"""
    export_dir = Path("exported_kline_data")
    export_dir.mkdir(exist_ok=True)
    return export_dir


def generate_filename(symbol: str, exchange: Exchange, interval: Interval) -> str:
    """生成导出文件名"""
    interval_name_map = {
        Interval.MINUTE: "1min",
        Interval.HOUR: "1hour", 
        Interval.DAILY: "1day"
    }

    interval_suffix = interval_name_map.get(interval, interval.value)
    return f"{symbol}_{exchange.value}_{interval_suffix}.csv"


def export_bar_data_to_csv(
    database: BaseDatabase,
    file_path: str,
    symbol: str,
    exchange: Exchange,
    interval: Interval,
    start: datetime,
    end: datetime
) -> bool:
    """导出K线数据到CSV文件"""
    bars = database.load_bar_data(symbol, exchange, interval, start, end)

    if not bars:
        print(f"  警告: {symbol}.{exchange.value} {interval.value} 无数据")
        return False

    fieldnames = [
        "symbol",
        "exchange", 
        "datetime",
        "open",
        "high",
        "low",
        "close",
        "volume",
        "turnover",
        "open_interest"
    ]

    with open(file_path, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()

        for bar in bars:
            data_row = {
                "symbol": bar.symbol,
                "exchange": bar.exchange.value,
                "datetime": bar.datetime.strftime("%Y-%m-%d %H:%M:%S"),
                "open": bar.open_price,
                "high": bar.high_price,
                "low": bar.low_price,
                "close": bar.close_price,
                "volume": bar.volume,
                "turnover": bar.turnover,
                "open_interest": bar.open_interest,
            }
            writer.writerow(data_row)

    return True


def main():
    """主函数:批量导出所有K线数据"""
    print("=== VeighNa K线数据批量导出工具 ===")
    print("正在初始化数据库连接...")

    # 获取数据库实例
    database: BaseDatabase = get_database()

    # 创建导出目录
    export_dir = create_export_directory()
    print(f"导出目录: {export_dir.absolute()}")

    # 获取所有K线数据概览
    print("正在获取数据库中的K线数据概览...")
    overviews: list[BarOverview] = database.get_bar_overview()

    if not overviews:
        print("数据库中没有找到任何K线数据")
        return

    print(f"发现 {len(overviews)} 个数据集")
    print("-" * 80)

    success_count = 0
    failed_count = 0

    # 遍历所有数据集进行导出
    for i, overview in enumerate(overviews, 1):
        # 生成文件名和完整路径
        filename = generate_filename(overview.symbol, overview.exchange, overview.interval)
        file_path = export_dir / filename

        # 输出处理信息
        print(f"[{i:3d}/{len(overviews)}] 正在处理: {overview.symbol}.{overview.exchange.value} | "
              f"频率: {overview.interval.value} | 数据量: {overview.count:,} 条")
        print(f"        时间范围: {overview.start.strftime('%Y-%m-%d %H:%M:%S')} "
              f"至 {overview.end.strftime('%Y-%m-%d %H:%M:%S')}")

        # 导出数据
        success = export_bar_data_to_csv(
            database,
            str(file_path),
            overview.symbol,
            overview.exchange,
            overview.interval,
            overview.start,
            overview.end
        )

        if success:
            file_size = os.path.getsize(file_path)
            print(f"        ✓ 导出成功: {filename} ({file_size:,} 字节)")
            success_count += 1
        else:
            print(f"        ✗ 导出失败: {filename}")
            failed_count += 1

        print()

    # 输出汇总信息
    print("=" * 80)
    print("导出完成!")
    print(f"成功导出: {success_count} 个文件")
    print(f"导出失败: {failed_count} 个文件")
    print(f"导出位置: {export_dir.absolute()}")

    if success_count > 0:
        print("\n导出的文件列表:")
        for file_path in sorted(export_dir.glob("*.csv")):
            file_size = os.path.getsize(file_path)
            print(f"  - {file_path.name} ({file_size:,} 字节)")


if __name__ == "__main__":
    main()

点击查看Claude Code输出过程

从输出过程可以看出,Claude Code 强调的是结构化的任务管理和丰富的工具生态:

  • 结构化任务管理:在接收到复杂任务后,通过 TodoWrite 工具将任务分解为清晰的步骤并实时展示进度,让用户对AI的工作状态一目了然。
  • 人机协作流程:生成代码后会主动询问用户是否执行,提供了交互式的确认环节,增强了可控性(此行为可配置)。
  • 深度 IDE 集成:支持通过快捷键在 VS Code、JetBrains 等主流 IDE 中启动,实现上下文共享、差异对比和诊断信息利用。
  • 多模态辅助:支持便捷的图片输入,可以直接分析截图中的UI或错误信息,拓宽了交互的维度。

     

总结与选型建议

 

经过前面的详细对比和实战测试,我们对这三款工具有了全面的认识。它们各有千秋,而选择的关键在于开发者的具体需求和工作流偏好。

核心优势一览

工具 核心优势
Cursor AI与IDE的无缝融合体验,强大的实时编码辅助。
Gemini CLI 卓越的长上下文处理能力,高度可定制的开源框架。
Claude Code 完善的工具链生态与结构化任务管理,深度集成主流IDE。

适用场景推荐

Cursor

  • 提供实时的代码补全和预测(增强版智能提示)
  • 无缝集成 VS Code(完整插件生态支持 + 可视化的变更检查和编辑)
  • 支持多文件夹工作空间(便于在多个项目间切换开发)
  • 适合需要参考历史对话的项目(@past chats 功能)

Gemini CLI

  • 适合大型代码库的分析和重构(超大上下文窗口优势)
  • 适用于服务器环境和远程开发(纯命令行)
  • 注重项目安全的开发环境(沙盒模式 + 请求确认机制)
  • 适合需要深度定制的开发工作流(开源便于个性化开发)

Claude Code

  • 深度集成 IDE 的开发工作流(VS Code、JetBrains 自动上下文共享)
  • 提供丰富的工具链(智能文件导航、智能上下文选择等)
  • 支持图片辅助开发(直接传图片)
  • 提供结构化项目管理(TodoWrite 任务分解和进度跟踪)

成本对比

工具 成本水平 免费额度
Cursor 中等 提供两周免费试用
Gemini CLI 包含免费层级的 API 调用,新用户可叠加 GCP 试用赠金
Claude Code 无免费版,需订阅 Pro/Max

 

结语

 
三款工具各有其独特优势,选择应基于具体需求。随着工具的不断发展,未来的功能差距可能会缩小。每款工具都有其独特优势和适用场景,Cursor 更适合需要通过 IDE 实时编写代码的用户,Gemini CLI 适合对长上下文感知和项目安全有高要求的用户,Claude Code 则在 IDE 深度集成和丰富工具链方面表现突出。在实践中,建议开发者还是要根据自己的具体需求和工作方式选择合适的工具,或者考虑组合使用,以达到最佳的开发效率。



在Mac上安装vnpy,保证一次成功!

一、提前下载需要的安装包:
1、Miniconda3
https://docs.conda.io/en/latest/miniconda.html#
选择MacOSX installers里的最新版本,这里是Python 3.9下载。

2、pycharm
pycharm-community-2020.3.3.dmg
从官网上下载社区版https://www.jetbrains.com/pycharm/

3、vnpy安装包(解压后,复制文件夹到自己喜欢的位置)
从vnpy在gitee的官方地址下载最新的安装包,采用zip格式下载。
https://gitee.com/vnpy/vnpy

二、安装
1、安装Miniconda,这里是Miniconda3-latest-MacOSX-x86_64.pkg
2、添加国内源:
添加国内源:在当前用户下,编辑.condarc,内容如下:
channels:

3、创建虚拟环境
conda create -n py37_vnpy python=3.7
conda activate py37_vnpy
(退出:conda deactivate)

4、安装python.app
conda install -c conda-forge python.app
可能会因为网络问题不成功,多试几次。

5、安装pycharm-community-2020.3.3.dmg
从官网上下载社区版https://www.jetbrains.com/pycharm/

7、打开vnpy所在的文件夹,进行配置
点击‘PyCharm’菜单->Preferences菜单->Project:vnpy一级菜单->Python Interpreter二级菜单->点击右上齿轮->Add菜单->Conda Environment->Existing enviroment->Interpreter:/opy/miniconda3/envs/py37_vnpy/bin/pythonw(选择前面新建的虚拟环境的pythonw)->点OK->点OK->点OK

8、(确认在PyCharm里已经打开了vnpy项目),在PyCharm的底部,找到Terminal的标签,点击,进入py37_vnpy环境的终端,并且当前路径位于vnpy项目的文件夹。
执行以下的安装语句(requirements.txt是vnpy项目文件夹下面的一个文件),这个安装时间比较长,需要较好的网络。

pip install -r requirements.txt -i http://pypi.douban.com/simple --trusted-host pypi.douban.com

9、创建run.py文件,复制以下代码,来源 README.md

因为mac上不支持ctp接口,所以要注释掉ctp接口,否则运行会报错。

from vnpy.event import EventEngine
from vnpy.trader.engine import MainEngine
from vnpy.trader.ui import MainWindow, create_qapp

# from vnpy.gateway.ctp import CtpGateway

from vnpy.app.cta_strategy import CtaStrategyApp
from vnpy.app.cta_backtester import CtaBacktesterApp

def main():
"""Start VN Trader"""
qapp = create_qapp()

event_engine = EventEngine()
main_engine = MainEngine(event_engine)

# main_engine.add_gateway(CtpGateway)
main_engine.add_app(CtaStrategyApp)
main_engine.add_app(CtaBacktesterApp)

main_window = MainWindow(main_engine, event_engine)
main_window.showMaximized()

qapp.exec()


if name == "main":
main()

10、运行 python run.py,注意环境名称是 py37_vnpy



《vn.py 3.0.0源代码深入分析》

我学Python的目的很明确,就是量化交易。从一开始就有关注vn.py,但我学的是Python3,那时vn.py还处于版本1.x时期,所以只能望vn.py兴叹。
vn.py 2.0出来之后我并没有及时注意,等反应过来已经是2.0.7版。很兴奋,认真研究,并将心得写成《vn.py 2.0.7源代码深入分析》,分享在vn.py社区的经验分享板块。
出于对量化交易的爱好,出于对Python在量化交易中作用的认同,一定程度受vn.py强大功能的鼓舞,我与同事合写了《Python量化交易从入门到实战》一书,对vn.py的讨论是其中很重要的一部分内容。
后续又写了《vn.py 2.1.4源代码深入分析》和《vn.py 2.2.0源代码深入分析》两个文档,感谢各位老师的认可。
vn.py 3.0.0版发布于2022-03-23,这是我一直期待的一个版本,所以它刚一推出,我就立刻开始试用,并着手整理《vn.py 3.0.0源代码深入分析》。夜以继日,终于在前天完成。先发到了书籍的资源群中,接受了两天批评,现分享到此处。
写作本文档的一个主要目的是对vn.py的开源精神做出一点支持,希望本文档能够对大家学习使用vn.py有所帮助。

百度网盘链接:https://pan.baidu.com/s/1cl2MA9hNFhHlxfHM0gGe2A
提取码:s7u6



MacOS M1 安装 VeighNa 3.7.0 过程实录(基于Anaconda)

入门选手,参考了很多论坛和知乎的官方教程以及解决方案,在MacOS M1 Anaconda虚拟环境下安装成功 VeighNa 3.7.0 。具体功能还未探索,仅限于打开UI界面...
主要参考资料:https://zhuanlan.zhihu.com/p/608546190https://www.vnpy.com/forum/topic/31679-macos-yin-te-er-xin-pian-lao-ban-an-zhuang-veighna-3-6-0-ban-ben-shi-jian-bi-ji

一、下载

  1. 官网下载安装 Anaconda
  2. 创建新环境,选择python3.10
    description

  3. 下载vnpy3.7.0 (download zip并解压):https://github.com/vnpy/vnpy
    description

二、安装

  1. terminal打开Anaconda vnpy 环境:

    conda activate vnpy
  2. 安装TA-Lib和NumPy:

    brew install ta-lib
    pip install ta-lib==0.4.24
    pip install numpy
  3. 安装vnpy和相关配置:

    pip install vnpy
    pip install vnpy_ctastrategy vnpy_ctabacktester vnpy_datamanager vnpy_sqlite vnpy_rqdata

    相关配置可以根据需求调整上述只为例子包括CTA策略实盘和回测模块,历史数据管理模块,SQLite数据库驱动,RQData数据服务适配器

  4. 安装CTP交易接口:
    pip install vnpy_ctp
    安装过程中如果出现报错某些依赖库的缺失,可以尝试pip install该依赖库

三、启动界面

  1. 在下载的vnpy3.7.0文件夹中,任意目录下创建run.py,写入示例代码:代码参考https://github.com/vnpy/vnpy 脚本运行部分
  2. 打开run.py所在目录/路径:

    cd 具体路径, 如cd /Users/name/Downloads/vnpy-master/vnpy
  3. 运行run.py:

    python3 run.py
  4. 潜在问题
    最开始试的是python run.py, 会出现ModuleNotFoundError: No module named 'vnpy',改为python3 run.py 后解决报错
    如出现报错“missing font family "微软雅黑" with one that exists to avoid this cost. 找不到数据服务驱动vnpy...."mac系统运行vnpy正常的提示,通常不会影响正常的页面启动
    如出现报错“AttributeError: 'PySide6.QtWidgets.QDockWidget' object has no attribute 'DockWidgetFloatable'.”, 可尝试
    pip install PySide6==6.3.0

上述方法均是本人参考后使用的解决方案,以及中间不小心关了terminal,凭回忆记录了一部分过程,不保证百分百正确,具体情况也可能不一样,仅供参考。



VeighNa发布v4.1.0 - 整体完成模块移植工作

发布于VeighNa社区公众号【vnpy-community】
 
原文作者:用Python的交易员 | 发布时间:2025-07-19
 
7月初发布了VeighNa的4.1.0版本,本次更新的主要内容是完成了绝大多数VeighNa开源社区版中的模块移植(接口、应用等),得益于Python 3.13带来的显著性能提升,强烈建议还在使用3.0大版本的用户升级,感受新一代版本带来的性能飞跃。

对于已经安装了VeighNa Studio 4.0版本的用户,可以使用快速更新功能完成自动升级。对于没有安装的用户,请下载\VeighNa Studio-4.1.0**,体验一键安装的量化交易Python发行版,下载链接:

https://download.vnpy.com/veighna_studio-4.1.0.exe

 

聊聊Python 3.13的性能

 

关于Python 3.13具体的性能提升水平,社区里已经有许多同学讨论了,这里借着4.1.0发布的机会,对互联网上的公开资料做个整理。

熟悉Python发展历史的同学可能知道,自3.10版本以来,CPython官方团队在性能优化上投入了巨大的精力,几乎每个新版本都是一次“提速”。下面就来回顾一下这几个版本中和性能相关的核心改动,看看3.13版本对比3.10究竟快了多少。

Python 3.11:革命性的性能飞跃

Python 3.11是“Faster CPython”项目第一个取得丰硕成果的版本,其性能相较于3.10有巨大提升。根据官方文档,Python 3.11在标准基准测试套件上比3.10平均快了1.25倍

主要改进包括:

  1. PEP 659 - 专业化自适应解释器:这是3.11性能提升的核心。解释器现在可以在运行时将通用的字节码替换为针对特定数据类型的“专业化”版本。例如,对于重复执行的a + b操作,如果ab总是整数,解释器会使用专门处理整数加法的快速指令,大大提高了执行效率。
  2. 更快的启动速度:通过将核心模块的字节码“冻结”在内存中,减少了解析和加载时间,使得Python解释器的启动速度提升了10-15%。
  3. 更快的函数调用:优化了函数调用过程中的帧(frame)创建和管理,减少了C栈的使用和内存分配,使得纯Python函数调用更加高效。

Python 3.12:精益求精的持续优化

Python 3.12延续了3.11的势头,在现有基础上进行了更多细致的优化。虽然不像3.11那样有革命性的飞跃,但它同样带来了稳固的性能增长。3.12的官方文档中没有提及具体的平均性能提升数字,但根据社区测试的结果,Intel平台上对比3.10的平均提升在5%

主要改进包括:

  1. 更多的专业化指令:在3.11的基础上,为更多的字节码指令增加了专业化版本,覆盖了更多的代码场景。
  2. 改进的内存管理:通过优化对象结构和垃圾回收机制,减少了内存开销,从而提高了缓存效率。
  3. 解释器循环优化:对解释器的主循环(evaluation loop)进行了生成方式的重构,使其更容易维护和优化,并为未来的JIT(Just-In-Time)编译器等更激进的优化铺平了道路。

Python 3.13:异步性能的又一次飞跃

Python 3.13继续沿着性能优化的道路前进,根据社区测试的结果,对比3.12的平均性能提升大约是5%(Intel平台)

这一版本的改进主要集中在:

  1. 异步性能大幅提升:对asyncio库进行了大量优化,根据基准测试,asyncio相关任务整体性能提升了\1.19倍**。**
  2. 常用操作加速:许多基础操作,如序列解包(unpack_sequence)、生成器(generators)等都变得更快。
  3. 为未来铺路:虽然这里不讨论实验性功能,但值得一提的是,3.13的许多改动(如试验性的No-GIL模式、新的JIT编译器框架)都在为未来更大幅度的性能提升奠定基础。

结论:所以到底快了多少?

综合来看,从Python 3.10到3.13,CPython的性能经历了持续且显著的增长。通过将各个版本的性能提升进行串联估算 (1.25 1.05 1.05),我们可以得出一个大致的结论:

 

Python 3.13的平均性能大约比Python 3.10快35-40%。

 

对于VeighNa用户而言,这意味着策略回测、实盘交易中涉及的大量纯Python计算逻辑(例如信号计算、交易执行、投研分析等)都将运行得更快,从而降低延迟、提升策略执行效率。因此,我们强烈建议使用开源社区版的用户升级到基于Python 3.13的VeighNa 4.1.0版本,来享受这份免费的“性能午餐”。

 

CHANGELOG

 

新增

  1. vnpy_mcdata新增对于Tick数据查询的支持
  2. OrderType枚举值增加ETF类型,支持ETF申购和赎回业务
  3. 增加遗传算法优化函数run_ga_optimization的入参,允许用户控制优化过程中所使用的全部超参
  4. CTA策略回测引擎,增加对于遗传算法优化函数新入参的支持

调整

  1. 升级扩展模块适配4.0版本,具体模块列表请参考该页面
  2. 使用close函数替代unbind,来实现vnpy.rpc模块中zmq.Socket的安全关闭
  3. 修改PySide6依赖版本为6.8.2.1,解决部分底层warning输出问题
  4. 修改ta-lib依赖版本为0.6.4,解决Linux和Mac系统的安装问题
  5. 调整Qt层捕捉到全局异常时的日志输出级别为Critical
  6. vnpy_datarecorder移除不必要的行情录制异常抛出,改为记录日志
  7. vnpy_rqdata下载股票数据时,除权方式由pre改为pre_volume
  8. 数据库模块录制行情数据时,默认跳过extra字段
  9. vnpy_ib支持10.30.1版本的ibapi,增加对于新版本撤单函数的传参支持

修复

  1. 修复新版本ta-lib中,MA_Type类不再是枚举值导致的部分指标计算问题
  2. 修复补全MainEngine缺失的get_tick函数
  3. 修复邮件发送引擎在使用QQ邮箱时出现的发送后报错问题
  4. 修复日志模块由于缺失默认gateway_name参数,在Qt层捕捉到全局异常时输出错误的问题
  5. vnpy_rohon新增Linux安装脚本,解决动态库找不到的问题
  6. vnpy_rqdata修复品种代码为小写合约的次主力88A2历史数据查询问题

 



基于TDengine实现海量高频Tick数据录制

发布于VeighNa社区公众号【vnpy-community】
 
原文作者:用Python的交易员 | 发布时间:2025-03-15
 
在量化交易领域,高质量的行情数据是策略研发和回测的基础。近期在VeighNa社区中,经常有用户咨询关于高频Tick数据录制的问题。

传统的SQLite等关系型数据库在面对大量合约同时录制的场景时,往往因写入性能瓶颈导致丢包,影响数据完整性。

为了解决以上难题,本文中将介绍如何利用VeighNa量化平台结合TDengine时序数据库,实现高效的行情数据录制。

 

安装配置TDengine

 

对于大多数VeighNa用户,推荐使用TDengine 3.0版本的Docker容器镜像来安装配置时序数据库的服务端程序。

首先需要确保操作系统中已经安装好了Docker,并将其设置为随操作系统启动自动运行。使用Windows系统的同学可以参考这篇文章中的Docker Desktop安装教程:https://mp.weixin.qq.com/s/m3whdCs6jRs-Ye3Ip5oZmw

随后使用命令拉取TDengine容器镜像:

docker pull tdengine/tdengine:latest

拉取完成后,通过下述命令来启动数据库服务容器:

docker run -d `
  -v C:/my_tdengine/data:/var/lib/taos `
  -v C:/my_tdengine/log:/var/log/taos `
  -p 6030:6030 `  
  -p 6041:6041 `  
  -p 6043:6043 `  
  -p 6044-6049:6044-6049 `  
  -p 6044-6045:6044-6045/udp `  
  -p 6060:6060 `  
  --restart=always `  
  tdengine/tdengine

注意,以上PowerShell多行命令的每行结尾使用了反引号(键盘Tab上方按键)作为换行符。将上述命令复制到PowerShell窗口中运行,等待几秒后容器启动成功会输出一段较长的随机字符串(容器编号)。

命令中的具体参数(以-或者--开头)说明如下:

-v:用于挂载Windows系统下的指定文件夹到容器中,为容器中运行的TDengine程序提供数据存储输出,可以根据自己的需求修改:
* /var/lib/taos,对应的是TDengine的数据存储路径
* /var/log/taos,对应的是TDengine的日志输出路径。

-p:用于将Windows系统的端口映射绑定到容器中对应的端口,为外部程序提供数据库访问,这些端口参数建议保持不变:
* 6030,主要用于应用程序(如VeighNa)连接TDengine;
* 6041,提供数据库管理终端(如DBeaver)连接TDengine;
* 其他端口提供更加进阶的功能,这里可以忽略。

--restart:用于设置容器的重启策略,always代表每次Docker Desktop启动后都立即启动TDengine容器,结合之前设置的Docker Desktop开机自动启动即可实现时序数据库的后台服务式运行。

 

数据录制脚本

 

安装配置好TDengine后,只需运行一个简单的Python脚本,就能开始高效录制行情数据。

导入必要模块

首先,需要导入相关的Python标准库和VeighNa框架组件:

# 加载Python标准库
from logging import INFO
from time import sleep

# 加载VeighNa核心框架
from vnpy.event import EventEngine, Event
from vnpy.trader.setting import SETTINGS
from vnpy.trader.engine import MainEngine, LogEngine
from vnpy.trader.object import ContractData
from vnpy.trader.constant import Exchange, Product
from vnpy.trader.event import EVENT_CONTRACT

# 加载VeighNa插件模块
from vnpy_ctp import CtpGateway
from vnpy_datarecorder import DataRecorderApp, RecorderEngine
from vnpy_datarecorder.engine import EVENT_RECORDER_LOG

这部分代码导入了程序运行所需的各种模块:

  • Python标准库中的日志和时间模块
  • VeighNa核心框架中的事件引擎、主引擎、日志引擎等组件
  • VeighNa的CTP接口和数据录制应用模块

配置日志设置

日志对于监控系统运行状态和排查问题至关重要:

# 开启日志记录功能
SETTINGS["log.active"] = True       # 激活日志功能
SETTINGS["log.level"] = INFO        # 设置日志级别为INFO,输出详细信息
SETTINGS["log.console"] = True      # 在控制台显示日志,方便实时查看

这段代码配置了VeighNa的日志系统,设置为INFO级别并在控制台显示,便于实时监控数据录制过程中的各种事件和可能出现的问题。

设置登录信息

接下来,配置连接到期货市场所需的CTP接口信息:

# CTP接口登录信息
# 以下使用的是SimNow模拟账户信息,初学者可以在SimNow官网申请
ctp_setting: dict[str, str] = {
    "用户名": "demo",                        # SimNow账户名
    "密码": "123456",                        # SimNow密码
    "经纪商代码": "9999",                     # SimNow经纪商代码固定为9999
    "交易服务器": "180.168.146.187:10201",    # SimNow交易服务器地址和端口
    "行情服务器": "180.168.146.187:10211",    # SimNow行情服务器地址和端口
    "产品名称": "simnow_client_test",         # 产品名称,用于区分不同的客户端
    "授权编码": "0000000000000000"            # 授权编码,SimNow模拟账户使用默认值即可
}

这里使用的是SimNow仿真账户(记得替换为你的用户名和密码),在实盘环境中需要替换为实际的期货账户信息。

定义录制范围

为了灵活控制数据录制的范围,脚本允许指定要录制的交易所和品种类型:

# 要录制数据的交易所列表
recording_exchanges: list[Exchange] = [
    Exchange.CFFEX,          # 中国金融期货交易所
    # Exchange.SHFE,         # 上海期货交易所
    # Exchange.DCE,          # 大连商品交易所
    # Exchange.CZCE,         # 郑州商品交易所
    # Exchange.GFEX,         # 广州期货交易所
    # Exchange.INE,          # 上海国际能源交易中心
]

# 要录制数据的品种类型
recording_products: list[Product] = [
    Product.FUTURES,        # 期货品种
    # Product.OPTION,       # 期权品种
]

这段代码定义了两个列表,分别指定要录制数据的交易所和品种类型。默认只录制中国金融期货交易所的期货品种,用户可以根据需要取消注释来添加更多交易所和品种。

主函数实现

脚本的核心是run_recorder函数,它负责初始化系统组件并启动数据录制:

def run_recorder() -> None:
    """
    运行行情录制程序

    该函数是程序的主体,按照以下步骤工作:
    1. 创建VeighNa核心组件(事件引擎、主引擎)
    2. 添加交易接口和应用模块
    3. 设置数据录制规则
    4. 连接到交易所并开始录制数据
    """
    # 创建事件引擎,负责系统内各模块间的通信
    event_engine: EventEngine = EventEngine()

    # 创建主引擎,管理系统功能模块,包括底层接口、上层应用等
    main_engine: MainEngine = MainEngine(event_engine)

    # 添加CTP接口,连接到期货市场
    main_engine.add_gateway(CtpGateway)

    # 添加数据录制引擎,用于录制Tick行情入库
    recorder_engine: RecorderEngine = main_engine.add_app(DataRecorderApp)

这部分代码初始化了VeighNa的核心组件:

  1. 创建事件引擎,负责系统内各模块间的事件驱动通信
  2. 创建主引擎,管理系统的各个功能模块
  3. 添加CTP接口,用于连接到期货市场
  4. 添加数据录制应用,用于将接收到的行情数据保存到数据库

合约订阅处理

接下来,定义了一个合约订阅函数,用于自动订阅符合条件的合约行情:

# 定义合约订阅函数
    def subscribe_data(event: Event) -> None:
        """
        处理合约推送并订阅行情

        当系统接收到合约信息后,根据预设的交易所和品种过滤条件,
        自动为符合条件的合约添加行情录制任务。

        参数:
            event: 包含合约信息的事件对象
        """
        # 从事件对象中获取合约数据
        contract: ContractData = event.data

        # 判断合约是否符合录制条件
        if (
            contract.exchange in recording_exchanges    # 检查合约所属交易所是否在预设列表中
            and contract.product in recording_products  # 检查合约品种类型是否在预设列表中
        ):
            # 添加该合约的行情录制任务,vt_symbol是VeighNa中的唯一标识符,格式为"代码.交易所"
            recorder_engine.add_tick_recording(contract.vt_symbol)
    # 注册合约事件处理函数,当有新合约信息推送时,会自动调用subscribe_data函数
    event_engine.register(EVENT_CONTRACT, subscribe_data)

这里基于VeighNa平台核心的事件驱动架构,实现了一套自动化的合约订阅机制:

  1. 当CTP接口接收到合约信息时,会触发EVENT_CONTRACT事件
  2. 事件引擎调用注册的subscribe_data函数处理该事件
  3. 函数检查合约是否符合预设的交易所和品种条件
  4. 对于符合条件的合约,自动添加Tick数据录制任务

该机制使得系统能够自动识别和订阅符合条件的合约,无需手动指定每个合约代码。

模块日志输出

为了方便监控DataRecorder数据录制模块的内部运行状态,脚本还定义了专门的日志输出函数:

# 获取日志引擎并设置日志处理
    log_engine: LogEngine = main_engine.get_engine("log")

    def print_log(event: Event) -> None:
        """
        处理数据录制模块的日志事件

        将数据录制模块产生的日志信息输出到控制台和日志文件中,
        便于监控录制过程和排查问题。

        参数:
            event: 包含日志信息的事件对象
        """
        log_engine.logger.log(INFO, event.data)

    # 注册日志事件处理函数,当有新的日志推送时,会自动调用print_log函数
    event_engine.register(EVENT_RECORDER_LOG, print_log)

注意前面对于SETTINGS全局配置字典的修改,仅影响底层接口和核心引擎部分的日志记录,而这里的DataRecorder模块属于上层应用,需要注册额外的日志事件处理函数后才能实现输出。

接口连接登录

之后就可以连接登录CTP接口并开始录制数据:

# 连接CTP接口并登录,第一个参数是接口设置,第二个参数是接口名称
    main_engine.connect(ctp_setting, CtpGateway.default_name)

    # 等待30秒,CTP接口连接后需要一段时间来完成初始化
    sleep(30)

    # 提示用户程序已经开始运行,用户可以根据需要随时退出
    input(">>>>>> 高频行情数据录制已启动,正在记录数据。按回车键退出程序 <<<<<<")

    # 关闭主引擎实现安全退出,避免出现内存中未入库数据的丢失
    main_engine.close()

这部分代码完成了以下操作:

  1. 连接CTP接口并登录
  2. 等待30秒,让CTP接口完成初始化(包括合约查询、订阅等操作)
  3. 提示用户程序已经开始运行,并等待用户按回车键退出
  4. 当用户按下回车键后,安全关闭主引擎,确保所有数据都被正确保存

程序标准入口

最后是Python程序的标准入口:

# Python程序的标准入口写法,直接运行此脚本时会执行run_recorder函数
if __name__ == "__main__":
    run_recorder()

当直接运行该脚本时,会执行run_recorder函数开始数据录制。

脚本补充说明

通过替换脚本中加载的底层接口模块和连接登录配置,就可以很方便的实现不同市场的高频Tick数据录制。

除了TDengine外,同样也可以使用其他VeighNa支持的高性能数据库(如DolphinDB等)作为后端的数据存储服务。

 

完整脚本代码

 

在文章结尾,附上完整的tick_recorder.py脚本源代码:

"""
该程序使用VeighNa框架通过CTP接口连接到期货市场,并自动录制指定交易所和品种的行情数据。

适合初学者了解VeighNa框架的基本用法和数据录制流程。
"""

# 加载Python标准库
from logging import INFO
from time import sleep

# 加载VeighNa核心框架
from vnpy.event import EventEngine, Event
from vnpy.trader.setting import SETTINGS
from vnpy.trader.engine import MainEngine, LogEngine
from vnpy.trader.object import ContractData
from vnpy.trader.constant import Exchange, Product
from vnpy.trader.event import EVENT_CONTRACT

# 加载VeighNa插件模块
from vnpy_ctp import CtpGateway
from vnpy_datarecorder import DataRecorderApp, RecorderEngine
from vnpy_datarecorder.engine import EVENT_RECORDER_LOG


# 开启日志记录功能
# 日志对于排查问题和监控系统运行状态非常重要
SETTINGS["log.active"] = True       # 激活日志功能
SETTINGS["log.level"] = INFO        # 设置日志级别为INFO,输出详细信息
SETTINGS["log.console"] = True      # 在控制台显示日志,方便实时查看


# CTP接口登录信息
# 以下使用的是SimNow模拟账户信息,初学者可以在SimNow官网申请
ctp_setting: dict[str, str] = {
    "用户名": "demo",                       # SimNow账户名
    "密码": "Vnpy@123456",                    # SimNow密码
    "经纪商代码": "9999",                     # SimNow经纪商代码固定为9999
    "交易服务器": "180.168.146.187:10201",    # SimNow交易服务器地址和端口
    "行情服务器": "180.168.146.187:10211",    # SimNow行情服务器地址和端口
    "产品名称": "simnow_client_test",         # 产品名称,用于区分不同的客户端
    "授权编码": "0000000000000000"            # 授权编码,SimNow模拟账户使用默认值即可
}


# 要录制数据的交易所列表
# 可以根据需要取消注释来添加更多交易所
recording_exchanges: list[Exchange] = [
    Exchange.CFFEX,          # 中国金融期货交易所
    # Exchange.SHFE,         # 上海期货交易所
    # Exchange.DCE,          # 大连商品交易所
    # Exchange.CZCE,         # 郑州商品交易所
    # Exchange.GFEX,         # 广州期货交易所
    # Exchange.INE,          # 上海国际能源交易中心
]


# 要录制数据的品种类型
# 可以根据需要取消注释来添加更多品种
recording_products: list[Product] = [
    Product.FUTURES,        # 期货品种
    # Product.OPTION,       # 期权品种
]


def run_recorder() -> None:
    """
    运行行情录制程序

    该函数是程序的主体,按照以下步骤工作:
    1. 创建VeighNa核心组件(事件引擎、主引擎)
    2. 添加交易接口和应用模块
    3. 设置数据录制规则
    4. 连接到交易所并开始录制数据
    """
    # 创建事件引擎,负责系统内各模块间的通信
    event_engine: EventEngine = EventEngine()

    # 创建主引擎,管理系统功能模块,包括底层接口、上层应用等
    main_engine: MainEngine = MainEngine(event_engine)

    # 添加CTP接口,连接到期货市场
    main_engine.add_gateway(CtpGateway)

    # 添加数据录制引擎,用于录制Tick行情入库
    recorder_engine: RecorderEngine = main_engine.add_app(DataRecorderApp)

    # 定义合约订阅函数
    def subscribe_data(event: Event) -> None:
        """
        处理合约推送并订阅行情

        当系统接收到合约信息后,根据预设的交易所和品种过滤条件,
        自动为符合条件的合约添加行情录制任务。

        参数:
            event: 包含合约信息的事件对象
        """
        # 从事件对象中获取合约数据
        contract: ContractData = event.data

        # 判断合约是否符合录制条件
        if (
            contract.exchange in recording_exchanges    # 检查合约所属交易所是否在预设列表中
            and contract.product in recording_products  # 检查合约品种类型是否在预设列表中
        ):
            # 添加该合约的行情录制任务,vt_symbol是VeighNa中的唯一标识符,格式为"代码.交易所"
            recorder_engine.add_tick_recording(contract.vt_symbol)

    # 注册合约事件处理函数,当有新合约信息推送时,会自动调用subscribe_data函数
    event_engine.register(EVENT_CONTRACT, subscribe_data)

    # 获取日志引擎并设置日志处理
    log_engine: LogEngine = main_engine.get_engine("log")

    def print_log(event: Event) -> None:
        """
        处理数据录制模块的日志事件

        将数据录制模块产生的日志信息输出到控制台和日志文件中,
        便于监控录制过程和排查问题。

        参数:
            event: 包含日志信息的事件对象
        """
        log_engine.logger.log(INFO, event.data)

    # 注册日志事件处理函数,当有新的日志推送时,会自动调用print_log函数
    event_engine.register(EVENT_RECORDER_LOG, print_log)

    # 连接CTP接口并登录,第一个参数是接口设置,第二个参数是接口名称
    main_engine.connect(ctp_setting, CtpGateway.default_name)

    # 等待30秒,CTP接口连接后需要一段时间来完成初始化
    sleep(30)

    # 提示用户程序已经开始运行,用户可以根据需要随时退出
    input(">>>>>> 高频行情数据录制已启动,正在记录数据。按回车键退出程序 <<<<<<")

    # 关闭主引擎实现安全退出,避免出现内存中未入库数据的丢失
    main_engine.close()


# Python程序的标准入口写法,直接运行此脚本时会执行run_recorder函数
if __name__ == "__main__":
    run_recorder()

 


新消息

3个月前

统计

主题
10362
帖子
39255
已注册用户
64401
最新用户
在线用户
155
在线来宾用户
27673
© 2015-2022 上海韦纳软件科技有限公司
备案服务号:沪ICP备18006526号

沪公网安备 31011502017034号

【用户协议】
【隐私政策】
【免责条款】