发布于veighna社区公众号【vnpy-community】
原文作者:用Python的交易员 | 发布时间:2024-01-20
K线历史时序数据,不管对于期货CTA、股票多因子还是期权波动率等量化策略的开发来说,都是不可或缺的原材料。在大多数Quant的日常工作流程中,固定一块内容就是对每日增量数据的下载更新。
这块数据更新工作如果通过手动的方式执行各项步骤无疑会相当繁琐(而且容易误操作),所以本文中将会分享一套基于迅投研的量化数据自运维下载更新方案。
关于迅投研数据服务的详细介绍可以参考之前的这篇《VeighNa发布v3.9.0 - 迅投研数据服务!》。
前期准备
试用申请
迅投研数据服务为VeighNa社区用户提供了14天免费试用,点击以下专属链接即可申请:
https://xuntou.net/#/signup?utm_source=vnpy
目前只需要提供手机号验证即可申请,试用账号中包括了股票、指数、期货、期权、基金等数据权限。
试用申请完成后,回到首页(https://xuntou.net/)登录账号,然后点击右上角的用户中心:
在弹出页面中底部即可找到【接口TOKEN】,点击【复制】按钮即可快速复制token内容,该token将用于后续VeighNa Trader中的数据服务配置:
同时在页面左下方【行情服务】的到期日,可以看到试用权限的到期时间。
模块安装
VeighNa Studio的3.9.0版中已经包含了迅投研相关的功能模块。使用其他Python环境的用户可以运行下述命令安装:
pip install xtquant vnpy_xt --index=https://pypi.vnpy.com
账号配置
启动VeighNa Trader,点击顶部菜单栏的【配置】按钮,在弹出的对话框中修改下述字段:
- datafeed.name:xt
- datafeed.username:token
- datafeed.password:之前在用户中心复制的token内容
修改完成后点击对话框底部的【确定】按钮,重启VeighNa Trader即可生效。老用户也同样可以通过修改.vntrader目录下的vt_setting.json文件来进行配置。
数据更新
下载合约信息
迅投研数据服务的客户端xtquant在工作机制上和其他数据服务有一个较大的区别,即需要先从服务器下载所需的数据到本地缓存目录的数据文件,然后才能从数据文件中查询获取所需的数据。
合约更新任务的代码本身较为简单:
def update_history_data() -> None:
"""更新历史合约信息"""
# 在子进程中加载xtquant
from xtquant.xtdata import download_history_data
# 初始化数据服务
datafeed = get_datafeed()
datafeed.init()
# 下载历史合约信息
download_history_data("", "historycontract")
print("xtquant历史合约信息下载完成")
但对于合约信息数据,xtquant模块会在当前进程中首次初始化时读取,后续即使执行了下载更新操作,在当前进程的运行过程中也不会重载,必须等到下一次进程重启后才会更新。
因此对于历史合约信息更新函数update_history_data,需要使用multiprocessing模块在子进程中运行:
# 使用子进程更新历史合约信息
process: Process = Process(target=update_history_data)
process.start()
process.join() # 等待子进程执行完成
这样后续在主进程中获取合约信息时(等同于重启了新的进程),就是已经更新后的数据。
获取合约信息
为了实现自运维交易所数据更新,需要能够获取每日交易所新上市的合约代码以及之前已有存续的合约代码信息,这里需要用到xtquant.xtdata模块中所提供的get_stock_list_in_sector函数:
# 查询交易所历史合约代码
active_symbols: list[str] = get_stock_list_in_sector(sector_name)
expire_symbols: list[str] = get_stock_list_in_sector("过期" + sector_name)
xt_symbols: list[str] = active_symbols + expire_symbols
函数中所传入的sector_name为迅投研定义的交易市场名称信息,一些比较常用的名称包括:
- 中金所、上期所、大商所、郑商所、能源中心
- 上证A股、深圳A股
- 上证期权、深圳期权
同时对于每个市场,都包含了当前处于挂牌状态的合约,以及已经到期退市的合约(市场名称前需要加上【过期】前缀),需要分开查询后拼接成为该市场目前的整体合约代码列表。
数据增量过滤
对于本地已有K线数据的合约,每日仅需要下载更新增量的部分,而没有必要从头开始下载(浪费运行时间和数据流量)。这里借助于VeighNa数据库组件所提供的BarOverview汇总来查询当前本地已有的数据范围:
# 获取本地已有数据汇总
data: list[BarOverview] = database.get_bar_overview()
overviews: dict[str, BarOverview] = {}
for o in data:
vt_symbol: str = f"{o.symbol}.{o.exchange.value}"
overviews[vt_symbol] = o
在遍历整个市场合约代码列表的过程中,可以先查询该合约的到期时间,然后只有在满足下列条件时才执行更新:
- 本地从未下载过该合约的数据(合约刚上市或者首次运行下载)
- 合约到期时间大于当前时间(尚未到期所以还有数据更新)
条件过滤代码如下:
# 查询合约信息
data: dict = get_instrument_detail(xt_symbol, True)
# 获取合约到期时间
expiry: datetime = None
if data["ExpireDate"]:
expiry = datetime.strptime(data["ExpireDate"], "%Y%m%d")
# 拆分迅投研代码
symbol, xt_exchange = xt_symbol.split(".")
# 生成本地代码
exchange: Exchange = EXCHANGE_XT2VT[xt_exchange]
vt_symbol: str = f"{symbol}.{exchange.value}"
# 查询数据汇总
overview: BarOverview = overviews.get(vt_symbol, None)
# 如果已经到期,则跳过
if overview and expiry and expiry < now:
continue
下载数据入库
对于通过了前述过滤筛选的合约,则执行K线下载并将数据写入到数据库:
# 实现增量查询
start: datetime = START_TIME
if overview:
start = overview.end
# 执行数据查询和更新入库
req: HistoryRequest = HistoryRequest(
symbol=symbol,
exchange=exchange,
start=start,
end=now,
interval=interval
)
bars: list[BarData] = datafeed.query_bar_history(req)
if bars:
database.save_bar_data(bars)
start_dt: datetime = bars[0].datetime
end_dt: datetime = bars[-1].datetime
msg: str = f"{vt_symbol}数据更新成功,{start_dt} - {end_dt}"
print(msg)
以上针对每个市场的K线数据更新功能,整体封装在了update_bar_data函数中,用户可以根据需求传入所需的市场名称参数执行更新任务:
# 这里只更新两个期货交易所的数据 update_bar_data("中金所") update_bar_data("上期所")
完整代码
老规矩还是附上完整的程序代码:
from multiprocessing import Process
from datetime import datetime
from vnpy.trader.database import BarOverview
from vnpy.trader.datafeed import get_datafeed
from vnpy.trader.database import get_database
from vnpy.trader.object import BarData, HistoryRequest
from vnpy.trader.constant import Exchange, Interval
# 交易所映射关系
EXCHANGE_XT2VT = {
"SH": Exchange.SSE,
"SZ": Exchange.SZSE,
"BJ": Exchange.BSE,
"SF": Exchange.SHFE,
"IF": Exchange.CFFEX,
"INE": Exchange.INE,
"DF": Exchange.DCE,
"ZF": Exchange.CZCE,
"GF": Exchange.GFEX
}
# 开始查询时间
START_TIME = datetime(2018, 1, 1)
def update_history_data() -> None:
"""更新历史合约信息"""
# 在子进程中加载xtquant
from xtquant.xtdata import download_history_data
# 初始化数据服务
datafeed = get_datafeed()
datafeed.init()
# 下载历史合约信息
download_history_data("", "historycontract")
print("xtquant历史合约信息下载完成")
def update_bar_data(
sector_name: str,
interval: Interval = Interval.MINUTE
) -> None:
"""更新K线数据"""
# 在子进程中加载xtquant
from xtquant.xtdata import (
get_stock_list_in_sector,
get_instrument_detail
)
# 初始化数据服务
datafeed = get_datafeed()
datafeed.init()
# 连接数据库
database = get_database()
# 获取当前时间戳
now: datetime = datetime.now()
# 获取本地已有数据汇总
data: list[BarOverview] = database.get_bar_overview()
overviews: dict[str, BarOverview] = {}
for o in data:
vt_symbol: str = f"{o.symbol}.{o.exchange.value}"
overviews[vt_symbol] = o
# 查询交易所历史合约代码
xt_symbols: list[str] = get_stock_list_in_sector(sector_name)
# 遍历列表查询合约信息
for xt_symbol in xt_symbols:
# 查询合约信息
data: dict = get_instrument_detail(xt_symbol, True)
# 获取合约到期时间
expiry: datetime = None
if data["ExpireDate"]:
expiry = datetime.strptime(data["ExpireDate"], "%Y%m%d")
# 拆分迅投研代码
symbol, xt_exchange = xt_symbol.split(".")
# 生成本地代码
exchange: Exchange = EXCHANGE_XT2VT[xt_exchange]
vt_symbol: str = f"{symbol}.{exchange.value}"
# 查询数据汇总
overview: BarOverview = overviews.get(vt_symbol, None)
# 如果已经到期,则跳过
if overview and expiry and expiry < now:
continue
# 实现增量查询
start: datetime = START_TIME
if overview:
start = overview.end
# 执行数据查询和更新入库
req: HistoryRequest = HistoryRequest(
symbol=symbol,
exchange=exchange,
start=start,
end=now,
interval=interval
)
bars: list[BarData] = datafeed.query_bar_history(req)
if bars:
database.save_bar_data(bars)
start_dt: datetime = bars[0].datetime
end_dt: datetime = bars[-1].datetime
msg: str = f"{vt_symbol}数据更新成功,{start_dt} - {end_dt}"
print(msg)
if __name__ == "__main__":
# 使用子进程更新历史合约信息
process: Process = Process(target=update_history_data)
process.start()
process.join() # 等待子进程执行完成
# 更新历史数据
update_bar_data("上期所")
update_bar_data("过期上期所")