VeighNa量化社区
你的开源社区量化交易平台 | vn.py | vnpy
Member
avatar
加入于:
帖子: 5361
声望: 325

发布于VeighNa社区公众号【vnpy-community】
 
原文作者:用Python的交易员 | 发布时间:2025-03-15
 
在量化交易领域,高质量的行情数据是策略研发和回测的基础。近期在VeighNa社区中,经常有用户咨询关于高频Tick数据录制的问题。

传统的SQLite等关系型数据库在面对大量合约同时录制的场景时,往往因写入性能瓶颈导致丢包,影响数据完整性。

为了解决以上难题,本文中将介绍如何利用VeighNa量化平台结合TDengine时序数据库,实现高效的行情数据录制。

 

安装配置TDengine

 

对于大多数VeighNa用户,推荐使用TDengine 3.0版本的Docker容器镜像来安装配置时序数据库的服务端程序。

首先需要确保操作系统中已经安装好了Docker,并将其设置为随操作系统启动自动运行。使用Windows系统的同学可以参考这篇文章中的Docker Desktop安装教程:https://mp.weixin.qq.com/s/m3whdCs6jRs-Ye3Ip5oZmw

随后使用命令拉取TDengine容器镜像:

docker pull tdengine/tdengine:latest

拉取完成后,通过下述命令来启动数据库服务容器:

docker run -d `
  -v C:/my_tdengine/data:/var/lib/taos `
  -v C:/my_tdengine/log:/var/log/taos `
  -p 6030:6030 `  
  -p 6041:6041 `  
  -p 6043:6043 `  
  -p 6044-6049:6044-6049 `  
  -p 6044-6045:6044-6045/udp `  
  -p 6060:6060 `  
  --restart=always `  
  tdengine/tdengine

注意,以上PowerShell多行命令的每行结尾使用了反引号(键盘Tab上方按键)作为换行符。将上述命令复制到PowerShell窗口中运行,等待几秒后容器启动成功会输出一段较长的随机字符串(容器编号)。

命令中的具体参数(以-或者--开头)说明如下:

-v:用于挂载Windows系统下的指定文件夹到容器中,为容器中运行的TDengine程序提供数据存储输出,可以根据自己的需求修改:
* /var/lib/taos,对应的是TDengine的数据存储路径
* /var/log/taos,对应的是TDengine的日志输出路径。

-p:用于将Windows系统的端口映射绑定到容器中对应的端口,为外部程序提供数据库访问,这些端口参数建议保持不变:
* 6030,主要用于应用程序(如VeighNa)连接TDengine;
* 6041,提供数据库管理终端(如DBeaver)连接TDengine;
* 其他端口提供更加进阶的功能,这里可以忽略。

--restart:用于设置容器的重启策略,always代表每次Docker Desktop启动后都立即启动TDengine容器,结合之前设置的Docker Desktop开机自动启动即可实现时序数据库的后台服务式运行。

 

数据录制脚本

 

安装配置好TDengine后,只需运行一个简单的Python脚本,就能开始高效录制行情数据。

导入必要模块

首先,需要导入相关的Python标准库和VeighNa框架组件:

# 加载Python标准库
from logging import INFO
from time import sleep

# 加载VeighNa核心框架
from vnpy.event import EventEngine, Event
from vnpy.trader.setting import SETTINGS
from vnpy.trader.engine import MainEngine, LogEngine
from vnpy.trader.object import ContractData
from vnpy.trader.constant import Exchange, Product
from vnpy.trader.event import EVENT_CONTRACT

# 加载VeighNa插件模块
from vnpy_ctp import CtpGateway
from vnpy_datarecorder import DataRecorderApp, RecorderEngine
from vnpy_datarecorder.engine import EVENT_RECORDER_LOG

这部分代码导入了程序运行所需的各种模块:

  • Python标准库中的日志和时间模块
  • VeighNa核心框架中的事件引擎、主引擎、日志引擎等组件
  • VeighNa的CTP接口和数据录制应用模块

配置日志设置

日志对于监控系统运行状态和排查问题至关重要:

# 开启日志记录功能
SETTINGS["log.active"] = True       # 激活日志功能
SETTINGS["log.level"] = INFO        # 设置日志级别为INFO,输出详细信息
SETTINGS["log.console"] = True      # 在控制台显示日志,方便实时查看

这段代码配置了VeighNa的日志系统,设置为INFO级别并在控制台显示,便于实时监控数据录制过程中的各种事件和可能出现的问题。

设置登录信息

接下来,配置连接到期货市场所需的CTP接口信息:

# CTP接口登录信息
# 以下使用的是SimNow模拟账户信息,初学者可以在SimNow官网申请
ctp_setting: dict[str, str] = {
    "用户名": "demo",                        # SimNow账户名
    "密码": "123456",                        # SimNow密码
    "经纪商代码": "9999",                     # SimNow经纪商代码固定为9999
    "交易服务器": "180.168.146.187:10201",    # SimNow交易服务器地址和端口
    "行情服务器": "180.168.146.187:10211",    # SimNow行情服务器地址和端口
    "产品名称": "simnow_client_test",         # 产品名称,用于区分不同的客户端
    "授权编码": "0000000000000000"            # 授权编码,SimNow模拟账户使用默认值即可
}

这里使用的是SimNow仿真账户(记得替换为你的用户名和密码),在实盘环境中需要替换为实际的期货账户信息。

定义录制范围

为了灵活控制数据录制的范围,脚本允许指定要录制的交易所和品种类型:

# 要录制数据的交易所列表
recording_exchanges: list[Exchange] = [
    Exchange.CFFEX,          # 中国金融期货交易所
    # Exchange.SHFE,         # 上海期货交易所
    # Exchange.DCE,          # 大连商品交易所
    # Exchange.CZCE,         # 郑州商品交易所
    # Exchange.GFEX,         # 广州期货交易所
    # Exchange.INE,          # 上海国际能源交易中心
]

# 要录制数据的品种类型
recording_products: list[Product] = [
    Product.FUTURES,        # 期货品种
    # Product.OPTION,       # 期权品种
]

这段代码定义了两个列表,分别指定要录制数据的交易所和品种类型。默认只录制中国金融期货交易所的期货品种,用户可以根据需要取消注释来添加更多交易所和品种。

主函数实现

脚本的核心是run_recorder函数,它负责初始化系统组件并启动数据录制:

def run_recorder() -> None:
    """
    运行行情录制程序

    该函数是程序的主体,按照以下步骤工作:
    1. 创建VeighNa核心组件(事件引擎、主引擎)
    2. 添加交易接口和应用模块
    3. 设置数据录制规则
    4. 连接到交易所并开始录制数据
    """
    # 创建事件引擎,负责系统内各模块间的通信
    event_engine: EventEngine = EventEngine()

    # 创建主引擎,管理系统功能模块,包括底层接口、上层应用等
    main_engine: MainEngine = MainEngine(event_engine)

    # 添加CTP接口,连接到期货市场
    main_engine.add_gateway(CtpGateway)

    # 添加数据录制引擎,用于录制Tick行情入库
    recorder_engine: RecorderEngine = main_engine.add_app(DataRecorderApp)

这部分代码初始化了VeighNa的核心组件:

  1. 创建事件引擎,负责系统内各模块间的事件驱动通信
  2. 创建主引擎,管理系统的各个功能模块
  3. 添加CTP接口,用于连接到期货市场
  4. 添加数据录制应用,用于将接收到的行情数据保存到数据库

合约订阅处理

接下来,定义了一个合约订阅函数,用于自动订阅符合条件的合约行情:

# 定义合约订阅函数
    def subscribe_data(event: Event) -> None:
        """
        处理合约推送并订阅行情

        当系统接收到合约信息后,根据预设的交易所和品种过滤条件,
        自动为符合条件的合约添加行情录制任务。

        参数:
            event: 包含合约信息的事件对象
        """
        # 从事件对象中获取合约数据
        contract: ContractData = event.data

        # 判断合约是否符合录制条件
        if (
            contract.exchange in recording_exchanges    # 检查合约所属交易所是否在预设列表中
            and contract.product in recording_products  # 检查合约品种类型是否在预设列表中
        ):
            # 添加该合约的行情录制任务,vt_symbol是VeighNa中的唯一标识符,格式为"代码.交易所"
            recorder_engine.add_tick_recording(contract.vt_symbol)
    # 注册合约事件处理函数,当有新合约信息推送时,会自动调用subscribe_data函数
    event_engine.register(EVENT_CONTRACT, subscribe_data)

这里基于VeighNa平台核心的事件驱动架构,实现了一套自动化的合约订阅机制:

  1. 当CTP接口接收到合约信息时,会触发EVENT_CONTRACT事件
  2. 事件引擎调用注册的subscribe_data函数处理该事件
  3. 函数检查合约是否符合预设的交易所和品种条件
  4. 对于符合条件的合约,自动添加Tick数据录制任务

该机制使得系统能够自动识别和订阅符合条件的合约,无需手动指定每个合约代码。

模块日志输出

为了方便监控DataRecorder数据录制模块的内部运行状态,脚本还定义了专门的日志输出函数:

# 获取日志引擎并设置日志处理
    log_engine: LogEngine = main_engine.get_engine("log")

    def print_log(event: Event) -> None:
        """
        处理数据录制模块的日志事件

        将数据录制模块产生的日志信息输出到控制台和日志文件中,
        便于监控录制过程和排查问题。

        参数:
            event: 包含日志信息的事件对象
        """
        log_engine.logger.log(INFO, event.data)

    # 注册日志事件处理函数,当有新的日志推送时,会自动调用print_log函数
    event_engine.register(EVENT_RECORDER_LOG, print_log)

注意前面对于SETTINGS全局配置字典的修改,仅影响底层接口和核心引擎部分的日志记录,而这里的DataRecorder模块属于上层应用,需要注册额外的日志事件处理函数后才能实现输出。

接口连接登录

之后就可以连接登录CTP接口并开始录制数据:

# 连接CTP接口并登录,第一个参数是接口设置,第二个参数是接口名称
    main_engine.connect(ctp_setting, CtpGateway.default_name)

    # 等待30秒,CTP接口连接后需要一段时间来完成初始化
    sleep(30)

    # 提示用户程序已经开始运行,用户可以根据需要随时退出
    input(">>>>>> 高频行情数据录制已启动,正在记录数据。按回车键退出程序 <<<<<<")

    # 关闭主引擎实现安全退出,避免出现内存中未入库数据的丢失
    main_engine.close()

这部分代码完成了以下操作:

  1. 连接CTP接口并登录
  2. 等待30秒,让CTP接口完成初始化(包括合约查询、订阅等操作)
  3. 提示用户程序已经开始运行,并等待用户按回车键退出
  4. 当用户按下回车键后,安全关闭主引擎,确保所有数据都被正确保存

程序标准入口

最后是Python程序的标准入口:

# Python程序的标准入口写法,直接运行此脚本时会执行run_recorder函数
if __name__ == "__main__":
    run_recorder()

当直接运行该脚本时,会执行run_recorder函数开始数据录制。

脚本补充说明

通过替换脚本中加载的底层接口模块和连接登录配置,就可以很方便的实现不同市场的高频Tick数据录制。

除了TDengine外,同样也可以使用其他VeighNa支持的高性能数据库(如DolphinDB等)作为后端的数据存储服务。

 

完整脚本代码

 

在文章结尾,附上完整的tick_recorder.py脚本源代码:

"""
该程序使用VeighNa框架通过CTP接口连接到期货市场,并自动录制指定交易所和品种的行情数据。

适合初学者了解VeighNa框架的基本用法和数据录制流程。
"""

# 加载Python标准库
from logging import INFO
from time import sleep

# 加载VeighNa核心框架
from vnpy.event import EventEngine, Event
from vnpy.trader.setting import SETTINGS
from vnpy.trader.engine import MainEngine, LogEngine
from vnpy.trader.object import ContractData
from vnpy.trader.constant import Exchange, Product
from vnpy.trader.event import EVENT_CONTRACT

# 加载VeighNa插件模块
from vnpy_ctp import CtpGateway
from vnpy_datarecorder import DataRecorderApp, RecorderEngine
from vnpy_datarecorder.engine import EVENT_RECORDER_LOG


# 开启日志记录功能
# 日志对于排查问题和监控系统运行状态非常重要
SETTINGS["log.active"] = True       # 激活日志功能
SETTINGS["log.level"] = INFO        # 设置日志级别为INFO,输出详细信息
SETTINGS["log.console"] = True      # 在控制台显示日志,方便实时查看


# CTP接口登录信息
# 以下使用的是SimNow模拟账户信息,初学者可以在SimNow官网申请
ctp_setting: dict[str, str] = {
    "用户名": "demo",                       # SimNow账户名
    "密码": "Vnpy@123456",                    # SimNow密码
    "经纪商代码": "9999",                     # SimNow经纪商代码固定为9999
    "交易服务器": "180.168.146.187:10201",    # SimNow交易服务器地址和端口
    "行情服务器": "180.168.146.187:10211",    # SimNow行情服务器地址和端口
    "产品名称": "simnow_client_test",         # 产品名称,用于区分不同的客户端
    "授权编码": "0000000000000000"            # 授权编码,SimNow模拟账户使用默认值即可
}


# 要录制数据的交易所列表
# 可以根据需要取消注释来添加更多交易所
recording_exchanges: list[Exchange] = [
    Exchange.CFFEX,          # 中国金融期货交易所
    # Exchange.SHFE,         # 上海期货交易所
    # Exchange.DCE,          # 大连商品交易所
    # Exchange.CZCE,         # 郑州商品交易所
    # Exchange.GFEX,         # 广州期货交易所
    # Exchange.INE,          # 上海国际能源交易中心
]


# 要录制数据的品种类型
# 可以根据需要取消注释来添加更多品种
recording_products: list[Product] = [
    Product.FUTURES,        # 期货品种
    # Product.OPTION,       # 期权品种
]


def run_recorder() -> None:
    """
    运行行情录制程序

    该函数是程序的主体,按照以下步骤工作:
    1. 创建VeighNa核心组件(事件引擎、主引擎)
    2. 添加交易接口和应用模块
    3. 设置数据录制规则
    4. 连接到交易所并开始录制数据
    """
    # 创建事件引擎,负责系统内各模块间的通信
    event_engine: EventEngine = EventEngine()

    # 创建主引擎,管理系统功能模块,包括底层接口、上层应用等
    main_engine: MainEngine = MainEngine(event_engine)

    # 添加CTP接口,连接到期货市场
    main_engine.add_gateway(CtpGateway)

    # 添加数据录制引擎,用于录制Tick行情入库
    recorder_engine: RecorderEngine = main_engine.add_app(DataRecorderApp)

    # 定义合约订阅函数
    def subscribe_data(event: Event) -> None:
        """
        处理合约推送并订阅行情

        当系统接收到合约信息后,根据预设的交易所和品种过滤条件,
        自动为符合条件的合约添加行情录制任务。

        参数:
            event: 包含合约信息的事件对象
        """
        # 从事件对象中获取合约数据
        contract: ContractData = event.data

        # 判断合约是否符合录制条件
        if (
            contract.exchange in recording_exchanges    # 检查合约所属交易所是否在预设列表中
            and contract.product in recording_products  # 检查合约品种类型是否在预设列表中
        ):
            # 添加该合约的行情录制任务,vt_symbol是VeighNa中的唯一标识符,格式为"代码.交易所"
            recorder_engine.add_tick_recording(contract.vt_symbol)

    # 注册合约事件处理函数,当有新合约信息推送时,会自动调用subscribe_data函数
    event_engine.register(EVENT_CONTRACT, subscribe_data)

    # 获取日志引擎并设置日志处理
    log_engine: LogEngine = main_engine.get_engine("log")

    def print_log(event: Event) -> None:
        """
        处理数据录制模块的日志事件

        将数据录制模块产生的日志信息输出到控制台和日志文件中,
        便于监控录制过程和排查问题。

        参数:
            event: 包含日志信息的事件对象
        """
        log_engine.logger.log(INFO, event.data)

    # 注册日志事件处理函数,当有新的日志推送时,会自动调用print_log函数
    event_engine.register(EVENT_RECORDER_LOG, print_log)

    # 连接CTP接口并登录,第一个参数是接口设置,第二个参数是接口名称
    main_engine.connect(ctp_setting, CtpGateway.default_name)

    # 等待30秒,CTP接口连接后需要一段时间来完成初始化
    sleep(30)

    # 提示用户程序已经开始运行,用户可以根据需要随时退出
    input(">>>>>> 高频行情数据录制已启动,正在记录数据。按回车键退出程序 <<<<<<")

    # 关闭主引擎实现安全退出,避免出现内存中未入库数据的丢失
    main_engine.close()


# Python程序的标准入口写法,直接运行此脚本时会执行run_recorder函数
if __name__ == "__main__":
    run_recorder()

 

Member
avatar
加入于:
帖子: 117
声望: 2

京东云,2核2g 解决不了安装好了Docker desktop 需要支持虚拟化技术的 CPU,并且虚拟化功能必须在 BIOS 中启用。

Member
avatar
加入于:
帖子: 1821
声望: 138

云服务器Windows系统装不了Docker,用Ubuntu吧

Member
avatar
加入于:
帖子: 1
声望: 0

CFrdRegister.....Init...version:frd_frame_1.02.06_20190413.17:59:18
CFrdRegister.....Init...version:frd_frame_1.02.06_20190413.17:59:18
Traceback (most recent call last):
File "D:\vn-py-master\vn-py-master\examples\veighna_trader\run.py", line 102, in <module>
main()
File "D:\vn-py-master\vn-py-master\examples\veighna_trader\run.py", line 77, in main
main_engine.add_app(CtaStrategyApp)
File "D:\vn-py-master\vn-py-master\vnpy\trader\engine.py", line 105, in add_app
engine: BaseEngine = self.add_engine(app.engine_class)
File "D:\vn-py-master\vn-py-master\vnpy\trader\engine.py", line 76, in add_engine
engine: BaseEngine = engine_class(self, self.event_engine)
File "D:\veighna_studio\Lib\site-packages\vnpy_ctastrategy\engine.py", line 97, in init
self.database: BaseDatabase = get_database()
File "D:\vn-py-master\vn-py-master\vnpy\trader\database.py", line 159, in get_database
database = module.Database()
File "D:\veighna_studio\Lib\site-packages\vnpy_taos\taos_database.py", line 37, in init
self.conn: taos.TaosConnection = taos.connect(
File "D:\veighna_studio\Lib\site-packages\taos__init.py", line 75, in connect
return TaosConnection(*args, **kwargs)
File "D:\veighna_studio\Lib\site-packages\taos\connection.py", line 27, in
init__
self._conn = self._chandle.connect(self._host, self._user, self._password, self._database, self._port)
File "D:\veighna_studio\Lib\site-packages\taos\cinterface.py", line 2269, in connect
return taos_connect(host, user, password, db, port)
File "D:\veighna_studio\Lib\site-packages\taos\cinterface.py", line 249, in taos_connect
raise ConnectionError(errstr, errno)
taos.error.ConnectionError: [0x011e]: Version not compatible

链接mongodb数据库正常,但是链接TDengine报错,是不是驱动问题,求大佬帮我看看

Member
avatar
加入于:
帖子: 1821
声望: 138

TDengine的客户端API和Docker里的服务端,两者版本不一致

都升级到最新版本试试吧

LLM学员
加入于:
帖子: 29
声望: 0

3.9.4运行这个代码录制数据,使用TDengine录制不起期权的数据,用mongodb正常,vnpy_taos的锅?

Member
avatar
加入于:
帖子: 13
声望: 0

上面的代码,能够录制所有品种。由于录制的品种太多,影响计算机运行。现在需要修改为只录制持仓量大于十万的品种合约,处样修改?(另外,由于我已运行上面的代码,vn.py中的记录行情里面已订阅所有品种,怎样不再全部订阅,只订阅持仓量大于十万的合约)

Member
avatar
加入于:
帖子: 117
声望: 2

用云服务器录数据的最小配置要几核几G,我打算录股票期权的tick 数据

© 2015-2022 上海韦纳软件科技有限公司
备案服务号:沪ICP备18006526号

沪公网安备 31011502017034号

【用户协议】
【隐私政策】
【免责条款】