发布于VeighNa社区公众号【vnpy-community】
原文作者:用Python的交易员 | 发布时间:2025-03-15
在量化交易领域,高质量的行情数据是策略研发和回测的基础。近期在VeighNa社区中,经常有用户咨询关于高频Tick数据录制的问题。
传统的SQLite等关系型数据库在面对大量合约同时录制的场景时,往往因写入性能瓶颈导致丢包,影响数据完整性。
为了解决以上难题,本文中将介绍如何利用VeighNa量化平台结合TDengine时序数据库,实现高效的行情数据录制。
安装配置TDengine
对于大多数VeighNa用户,推荐使用TDengine 3.0版本的Docker容器镜像来安装配置时序数据库的服务端程序。
首先需要确保操作系统中已经安装好了Docker,并将其设置为随操作系统启动自动运行。使用Windows系统的同学可以参考这篇文章中的Docker Desktop安装教程:https://mp.weixin.qq.com/s/m3whdCs6jRs-Ye3Ip5oZmw
随后使用命令拉取TDengine容器镜像:
docker pull tdengine/tdengine:latest
拉取完成后,通过下述命令来启动数据库服务容器:
docker run -d `
-v C:/my_tdengine/data:/var/lib/taos `
-v C:/my_tdengine/log:/var/log/taos `
-p 6030:6030 `
-p 6041:6041 `
-p 6043:6043 `
-p 6044-6049:6044-6049 `
-p 6044-6045:6044-6045/udp `
-p 6060:6060 `
--restart=always `
tdengine/tdengine
注意,以上PowerShell多行命令的每行结尾使用了反引号(键盘Tab上方按键)作为换行符。将上述命令复制到PowerShell窗口中运行,等待几秒后容器启动成功会输出一段较长的随机字符串(容器编号)。
命令中的具体参数(以-或者--开头)说明如下:
-v:用于挂载Windows系统下的指定文件夹到容器中,为容器中运行的TDengine程序提供数据存储输出,可以根据自己的需求修改:
* /var/lib/taos,对应的是TDengine的数据存储路径
* /var/log/taos,对应的是TDengine的日志输出路径。
-p:用于将Windows系统的端口映射绑定到容器中对应的端口,为外部程序提供数据库访问,这些端口参数建议保持不变:
* 6030,主要用于应用程序(如VeighNa)连接TDengine;
* 6041,提供数据库管理终端(如DBeaver)连接TDengine;
* 其他端口提供更加进阶的功能,这里可以忽略。
--restart:用于设置容器的重启策略,always代表每次Docker Desktop启动后都立即启动TDengine容器,结合之前设置的Docker Desktop开机自动启动即可实现时序数据库的后台服务式运行。
数据录制脚本
安装配置好TDengine后,只需运行一个简单的Python脚本,就能开始高效录制行情数据。
导入必要模块
首先,需要导入相关的Python标准库和VeighNa框架组件:
# 加载Python标准库
from logging import INFO
from time import sleep
# 加载VeighNa核心框架
from vnpy.event import EventEngine, Event
from vnpy.trader.setting import SETTINGS
from vnpy.trader.engine import MainEngine, LogEngine
from vnpy.trader.object import ContractData
from vnpy.trader.constant import Exchange, Product
from vnpy.trader.event import EVENT_CONTRACT
# 加载VeighNa插件模块
from vnpy_ctp import CtpGateway
from vnpy_datarecorder import DataRecorderApp, RecorderEngine
from vnpy_datarecorder.engine import EVENT_RECORDER_LOG
这部分代码导入了程序运行所需的各种模块:
- Python标准库中的日志和时间模块
- VeighNa核心框架中的事件引擎、主引擎、日志引擎等组件
- VeighNa的CTP接口和数据录制应用模块
配置日志设置
日志对于监控系统运行状态和排查问题至关重要:
# 开启日志记录功能
SETTINGS["log.active"] = True # 激活日志功能
SETTINGS["log.level"] = INFO # 设置日志级别为INFO,输出详细信息
SETTINGS["log.console"] = True # 在控制台显示日志,方便实时查看
这段代码配置了VeighNa的日志系统,设置为INFO级别并在控制台显示,便于实时监控数据录制过程中的各种事件和可能出现的问题。
设置登录信息
接下来,配置连接到期货市场所需的CTP接口信息:
# CTP接口登录信息
# 以下使用的是SimNow模拟账户信息,初学者可以在SimNow官网申请
ctp_setting: dict[str, str] = {
"用户名": "demo", # SimNow账户名
"密码": "123456", # SimNow密码
"经纪商代码": "9999", # SimNow经纪商代码固定为9999
"交易服务器": "180.168.146.187:10201", # SimNow交易服务器地址和端口
"行情服务器": "180.168.146.187:10211", # SimNow行情服务器地址和端口
"产品名称": "simnow_client_test", # 产品名称,用于区分不同的客户端
"授权编码": "0000000000000000" # 授权编码,SimNow模拟账户使用默认值即可
}
这里使用的是SimNow仿真账户(记得替换为你的用户名和密码),在实盘环境中需要替换为实际的期货账户信息。
定义录制范围
为了灵活控制数据录制的范围,脚本允许指定要录制的交易所和品种类型:
# 要录制数据的交易所列表
recording_exchanges: list[Exchange] = [
Exchange.CFFEX, # 中国金融期货交易所
# Exchange.SHFE, # 上海期货交易所
# Exchange.DCE, # 大连商品交易所
# Exchange.CZCE, # 郑州商品交易所
# Exchange.GFEX, # 广州期货交易所
# Exchange.INE, # 上海国际能源交易中心
]
# 要录制数据的品种类型
recording_products: list[Product] = [
Product.FUTURES, # 期货品种
# Product.OPTION, # 期权品种
]
这段代码定义了两个列表,分别指定要录制数据的交易所和品种类型。默认只录制中国金融期货交易所的期货品种,用户可以根据需要取消注释来添加更多交易所和品种。
主函数实现
脚本的核心是run_recorder
函数,它负责初始化系统组件并启动数据录制:
def run_recorder() -> None:
"""
运行行情录制程序
该函数是程序的主体,按照以下步骤工作:
1. 创建VeighNa核心组件(事件引擎、主引擎)
2. 添加交易接口和应用模块
3. 设置数据录制规则
4. 连接到交易所并开始录制数据
"""
# 创建事件引擎,负责系统内各模块间的通信
event_engine: EventEngine = EventEngine()
# 创建主引擎,管理系统功能模块,包括底层接口、上层应用等
main_engine: MainEngine = MainEngine(event_engine)
# 添加CTP接口,连接到期货市场
main_engine.add_gateway(CtpGateway)
# 添加数据录制引擎,用于录制Tick行情入库
recorder_engine: RecorderEngine = main_engine.add_app(DataRecorderApp)
这部分代码初始化了VeighNa的核心组件:
- 创建事件引擎,负责系统内各模块间的事件驱动通信
- 创建主引擎,管理系统的各个功能模块
- 添加CTP接口,用于连接到期货市场
- 添加数据录制应用,用于将接收到的行情数据保存到数据库
合约订阅处理
接下来,定义了一个合约订阅函数,用于自动订阅符合条件的合约行情:
# 定义合约订阅函数
def subscribe_data(event: Event) -> None:
"""
处理合约推送并订阅行情
当系统接收到合约信息后,根据预设的交易所和品种过滤条件,
自动为符合条件的合约添加行情录制任务。
参数:
event: 包含合约信息的事件对象
"""
# 从事件对象中获取合约数据
contract: ContractData = event.data
# 判断合约是否符合录制条件
if (
contract.exchange in recording_exchanges # 检查合约所属交易所是否在预设列表中
and contract.product in recording_products # 检查合约品种类型是否在预设列表中
):
# 添加该合约的行情录制任务,vt_symbol是VeighNa中的唯一标识符,格式为"代码.交易所"
recorder_engine.add_tick_recording(contract.vt_symbol)
# 注册合约事件处理函数,当有新合约信息推送时,会自动调用subscribe_data函数
event_engine.register(EVENT_CONTRACT, subscribe_data)
这里基于VeighNa平台核心的事件驱动架构,实现了一套自动化的合约订阅机制:
- 当CTP接口接收到合约信息时,会触发
EVENT_CONTRACT
事件 - 事件引擎调用注册的
subscribe_data
函数处理该事件 - 函数检查合约是否符合预设的交易所和品种条件
- 对于符合条件的合约,自动添加Tick数据录制任务
该机制使得系统能够自动识别和订阅符合条件的合约,无需手动指定每个合约代码。
模块日志输出
为了方便监控DataRecorder数据录制模块的内部运行状态,脚本还定义了专门的日志输出函数:
# 获取日志引擎并设置日志处理
log_engine: LogEngine = main_engine.get_engine("log")
def print_log(event: Event) -> None:
"""
处理数据录制模块的日志事件
将数据录制模块产生的日志信息输出到控制台和日志文件中,
便于监控录制过程和排查问题。
参数:
event: 包含日志信息的事件对象
"""
log_engine.logger.log(INFO, event.data)
# 注册日志事件处理函数,当有新的日志推送时,会自动调用print_log函数
event_engine.register(EVENT_RECORDER_LOG, print_log)
注意前面对于SETTINGS全局配置字典的修改,仅影响底层接口和核心引擎部分的日志记录,而这里的DataRecorder模块属于上层应用,需要注册额外的日志事件处理函数后才能实现输出。
接口连接登录
之后就可以连接登录CTP接口并开始录制数据:
# 连接CTP接口并登录,第一个参数是接口设置,第二个参数是接口名称
main_engine.connect(ctp_setting, CtpGateway.default_name)
# 等待30秒,CTP接口连接后需要一段时间来完成初始化
sleep(30)
# 提示用户程序已经开始运行,用户可以根据需要随时退出
input(">>>>>> 高频行情数据录制已启动,正在记录数据。按回车键退出程序 <<<<<<")
# 关闭主引擎实现安全退出,避免出现内存中未入库数据的丢失
main_engine.close()
这部分代码完成了以下操作:
- 连接CTP接口并登录
- 等待30秒,让CTP接口完成初始化(包括合约查询、订阅等操作)
- 提示用户程序已经开始运行,并等待用户按回车键退出
- 当用户按下回车键后,安全关闭主引擎,确保所有数据都被正确保存
程序标准入口
最后是Python程序的标准入口:
# Python程序的标准入口写法,直接运行此脚本时会执行run_recorder函数
if __name__ == "__main__":
run_recorder()
当直接运行该脚本时,会执行run_recorder
函数开始数据录制。
脚本补充说明
通过替换脚本中加载的底层接口模块和连接登录配置,就可以很方便的实现不同市场的高频Tick数据录制。
除了TDengine外,同样也可以使用其他VeighNa支持的高性能数据库(如DolphinDB等)作为后端的数据存储服务。
完整脚本代码
在文章结尾,附上完整的tick_recorder.py脚本源代码:
"""
该程序使用VeighNa框架通过CTP接口连接到期货市场,并自动录制指定交易所和品种的行情数据。
适合初学者了解VeighNa框架的基本用法和数据录制流程。
"""
# 加载Python标准库
from logging import INFO
from time import sleep
# 加载VeighNa核心框架
from vnpy.event import EventEngine, Event
from vnpy.trader.setting import SETTINGS
from vnpy.trader.engine import MainEngine, LogEngine
from vnpy.trader.object import ContractData
from vnpy.trader.constant import Exchange, Product
from vnpy.trader.event import EVENT_CONTRACT
# 加载VeighNa插件模块
from vnpy_ctp import CtpGateway
from vnpy_datarecorder import DataRecorderApp, RecorderEngine
from vnpy_datarecorder.engine import EVENT_RECORDER_LOG
# 开启日志记录功能
# 日志对于排查问题和监控系统运行状态非常重要
SETTINGS["log.active"] = True # 激活日志功能
SETTINGS["log.level"] = INFO # 设置日志级别为INFO,输出详细信息
SETTINGS["log.console"] = True # 在控制台显示日志,方便实时查看
# CTP接口登录信息
# 以下使用的是SimNow模拟账户信息,初学者可以在SimNow官网申请
ctp_setting: dict[str, str] = {
"用户名": "demo", # SimNow账户名
"密码": "Vnpy@123456", # SimNow密码
"经纪商代码": "9999", # SimNow经纪商代码固定为9999
"交易服务器": "180.168.146.187:10201", # SimNow交易服务器地址和端口
"行情服务器": "180.168.146.187:10211", # SimNow行情服务器地址和端口
"产品名称": "simnow_client_test", # 产品名称,用于区分不同的客户端
"授权编码": "0000000000000000" # 授权编码,SimNow模拟账户使用默认值即可
}
# 要录制数据的交易所列表
# 可以根据需要取消注释来添加更多交易所
recording_exchanges: list[Exchange] = [
Exchange.CFFEX, # 中国金融期货交易所
# Exchange.SHFE, # 上海期货交易所
# Exchange.DCE, # 大连商品交易所
# Exchange.CZCE, # 郑州商品交易所
# Exchange.GFEX, # 广州期货交易所
# Exchange.INE, # 上海国际能源交易中心
]
# 要录制数据的品种类型
# 可以根据需要取消注释来添加更多品种
recording_products: list[Product] = [
Product.FUTURES, # 期货品种
# Product.OPTION, # 期权品种
]
def run_recorder() -> None:
"""
运行行情录制程序
该函数是程序的主体,按照以下步骤工作:
1. 创建VeighNa核心组件(事件引擎、主引擎)
2. 添加交易接口和应用模块
3. 设置数据录制规则
4. 连接到交易所并开始录制数据
"""
# 创建事件引擎,负责系统内各模块间的通信
event_engine: EventEngine = EventEngine()
# 创建主引擎,管理系统功能模块,包括底层接口、上层应用等
main_engine: MainEngine = MainEngine(event_engine)
# 添加CTP接口,连接到期货市场
main_engine.add_gateway(CtpGateway)
# 添加数据录制引擎,用于录制Tick行情入库
recorder_engine: RecorderEngine = main_engine.add_app(DataRecorderApp)
# 定义合约订阅函数
def subscribe_data(event: Event) -> None:
"""
处理合约推送并订阅行情
当系统接收到合约信息后,根据预设的交易所和品种过滤条件,
自动为符合条件的合约添加行情录制任务。
参数:
event: 包含合约信息的事件对象
"""
# 从事件对象中获取合约数据
contract: ContractData = event.data
# 判断合约是否符合录制条件
if (
contract.exchange in recording_exchanges # 检查合约所属交易所是否在预设列表中
and contract.product in recording_products # 检查合约品种类型是否在预设列表中
):
# 添加该合约的行情录制任务,vt_symbol是VeighNa中的唯一标识符,格式为"代码.交易所"
recorder_engine.add_tick_recording(contract.vt_symbol)
# 注册合约事件处理函数,当有新合约信息推送时,会自动调用subscribe_data函数
event_engine.register(EVENT_CONTRACT, subscribe_data)
# 获取日志引擎并设置日志处理
log_engine: LogEngine = main_engine.get_engine("log")
def print_log(event: Event) -> None:
"""
处理数据录制模块的日志事件
将数据录制模块产生的日志信息输出到控制台和日志文件中,
便于监控录制过程和排查问题。
参数:
event: 包含日志信息的事件对象
"""
log_engine.logger.log(INFO, event.data)
# 注册日志事件处理函数,当有新的日志推送时,会自动调用print_log函数
event_engine.register(EVENT_RECORDER_LOG, print_log)
# 连接CTP接口并登录,第一个参数是接口设置,第二个参数是接口名称
main_engine.connect(ctp_setting, CtpGateway.default_name)
# 等待30秒,CTP接口连接后需要一段时间来完成初始化
sleep(30)
# 提示用户程序已经开始运行,用户可以根据需要随时退出
input(">>>>>> 高频行情数据录制已启动,正在记录数据。按回车键退出程序 <<<<<<")
# 关闭主引擎实现安全退出,避免出现内存中未入库数据的丢失
main_engine.close()
# Python程序的标准入口写法,直接运行此脚本时会执行run_recorder函数
if __name__ == "__main__":
run_recorder()