入门选手,参考了很多论坛和知乎的官方教程以及解决方案,在MacOS M1 Anaconda虚拟环境下安装成功 VeighNa 3.7.0 。具体功能还未探索,仅限于打开UI界面...
主要参考资料:https://zhuanlan.zhihu.com/p/608546190 和 https://www.vnpy.com/forum/topic/31679-macos-yin-te-er-xin-pian-lao-ban-an-zhuang-veighna-3-6-0-ban-ben-shi-jian-bi-ji
创建新环境,选择python3.10
下载vnpy3.7.0 (download zip并解压):https://github.com/vnpy/vnpy
terminal打开Anaconda vnpy 环境:
conda activate vnpy
安装TA-Lib和NumPy:
brew install ta-lib
pip install ta-lib==0.4.24
pip install numpy
安装vnpy和相关配置:
pip install vnpy
pip install vnpy_ctastrategy vnpy_ctabacktester vnpy_datamanager vnpy_sqlite vnpy_rqdata
相关配置可以根据需求调整上述只为例子包括CTA策略实盘和回测模块,历史数据管理模块,SQLite数据库驱动,RQData数据服务适配器
pip install vnpy_ctp
安装过程中如果出现报错某些依赖库的缺失,可以尝试pip install该依赖库打开run.py所在目录/路径:
cd 具体路径, 如cd /Users/name/Downloads/vnpy-master/vnpy
运行run.py:
python3 run.py
pip install PySide6==6.3.0
上述方法均是本人参考后使用的解决方案,以及中间不小心关了terminal,凭回忆记录了一部分过程,不保证百分百正确,具体情况也可能不一样,仅供参考。
发布于vn.py社区公众号【vnpy-community】
原文作者: 丛子龙 | 发布时间:2023-09-19
2023年VeighNa小班特训营【套利价差交易】即将在10月中旬开班!对比趋势跟踪类的CTA策略,均值回归类的价差策略由于其高胜率的特征,能够实现相对更加平稳的盈利绩效,适合用于量化投资组合在市场横盘震荡期的配置优化。目前半数名额已经被报名锁定,感兴趣的同学请抓紧!内容大纲戳这里。
本文将会在上一篇文章中提到的策略基础之上,扩大可投资的标的数量,在更多种类的大类资产中寻找超额收益。
我们从上市时间在2016年以前的ETF中选择了如下标的:
医药50ETF(512120.SSE)
金融ETF(510230.SSE)
TMTETF(512220.SSE)
信息科技ETF(512330.SSE)
证券保险ETF(512070.SSE)
可选消费ETF(159936.SZSE)
必选消费ETF(512600.SSE)
能源ETF(159930.SZSE)
材料ETF(159944.SZSE)
大宗商品ETF(510170.SSE)
黄金ETF(518880.SSE)
相信大家对A股市场中的行业轮动现象都不陌生。行业轮动是利用市场趋势获利的一种主动交易策略,其本质是利用不同投资品种强势时间的错位对行业持仓进行切换,以达到投资收益优化的目的。
通俗点讲,就是根据不同行业的区间表现差异性进行轮动配置,力求能够抓住区间内表现较好的行业、剔除表现不佳的行业,在判断市场不佳的时候,降低权益类仓位,提升另类资产的比例。
在本策略中,专门选择了能代表具体行业的ETF作为资产配置标的,上述列表中除了行业ETF之外还包括了大宗商品和黄金ETF。大宗商品具有较好的抗通胀属性,而黄金则是具有避险资产的属性。同时它们自身也与权益类资产一样,拥有较强的趋势性。
选定了资产池以后,下一步就是将数据加载到回测引擎中进行回测,策略层面将会沿用上篇文章中的EtfRotationStrategy。
这次在回测任务层面会与常用的单次回测稍有不同,原因是我们并不知道怎样配置资产组合才能获得更好的效果,所以在每次回测中需要尝试调整:
为了尽可能寻找到优质的资产组合,本次投研将使用“嵌套循环+批量回测”的方法,具体实现包括以下三步:
第一步:确定组合搜索空间
第二步:遍历组合列表回测
第三步:对回测结果排序分析
首先要计算的是:给定一定数量的合约,一共有多少种组合方法。在这里会用到Python的内置库【itertools】:
# 大类资产ETF范围
etf_list = [
"510170.SSE",
"159944.SZSE",
"512220.SSE",
"512120.SSE",
"159936.SZSE",
"159930.SZSE",
"512330.SSE",
"510230.SSE",
"512600.SSE",
"512070.SSE",
"518880.SSE",
]
# 筛选投资池,下限4只
all_combinations = []
for r in range(4, len(etf_list)):
all_combinations.extend(combinations(etf_list, r))
# 显示总投资池数量
len(all_combinations)
运行上述代码,即可得到一个缓存着所有可能组合的列表。
这么多组合,用手来一个一个敲入系统进行回测肯定是不现实的。随着标的池扩大,组合总数会以指数级增长。此时需要编写一段脚本让程序实现批量回测并缓存目标数据:
def run_backtesting(vt_symbols: list[str], holding_size: int) -> float:
"""运行回测并返回夏普比率"""
engine = BacktestingEngine()
engine.output = lambda a: a
engine.set_parameters(
vt_symbols=vt_symbols,
interval=Interval.DAILY,
start=datetime(2016, 1, 1),
end=datetime.now(),
rates={key: 0.0001 for key in vt_symbols},
slippages={key: 0.001 for key in vt_symbols},
sizes={key: 1 for key in vt_symbols},
priceticks={key: 0.001 for key in vt_symbols},
capital=1_000_000,
)
setting = {"holding_size": holding_size}
engine.add_strategy(EtfRotationStrategy, setting)
engine.load_data()
engine.run_backtesting()
engine.calculate_result()
statistics = engine.calculate_statistics(output=False)
return statistics["sharpe_ratio"]
# 遍历执行回测
results = {}
for combo in all_combinations:
vt_symbols = list(combo)
for i in range(1, math.floor(len(vt_symbols) / 2) + 1):
sharpe_ratio = run_backtesting(vt_symbols, i)
results[combo, i] = sharpe_ratio
运行完成后就会得到一个名为【results】的字典,字典当中包含每一个组合的夏普比率,下一步对results进行排序:
# 基于Sharpe Ratio排序
sorted_results = sorted(results, key=results.get, reverse=True)
# 查看排名前100的组合
print(sorted_results[:100])
首先需要查看在排名靠前的组合中哪些ETF出现的次数较多:
# 计算参数出现频次
etf_counts = defaultdict(int)
size_counts = defaultdict(int)
for tp in sorted_results[:100]:
vt_symbols, holding_size = tp
for vt_symbol in vt_symbols:
etf_counts[vt_symbol] +=1
size_counts[holding_size] += 1
# 绘制ETF代码的出现频率
plt.figure(figsize=(12, 6))
plt.bar(etf_counts.keys(), etf_counts.values()) ))
# 绘制持仓数量出现频率
plt.bar(size_counts.keys(), size_counts.values())
运行过后得到下图(为了阅读清晰,这里将ETF代码转换成了名称):
排名靠前的投资组合中,出现频率较高的有必选消费ETF,信息科技ETF,金融ETF和黄金ETF。这说明在回测时段中,能为策略提供较好收益的资产是它们。
同时不同资产出现频率的差别并没有非常显著,说明每一个资产都可以在不同的时间提供一定的超额收益。
另一方面,排名靠前的ETF持仓数量为2,其次是3和1:
这显示并非同时持仓合约的数量越大效果就越好,反而将合约数量控制在一定水平以内可能获得更好的整体绩效。
接下来使用排名靠前的ETF组合池作为回测合约,并用优化引擎对【regression_window】参数进行穷举优化,得到的结果如下:
从资金曲线上看,虽然经过了前文的种种优化,但本次选择的ETF组合池整体回测绩效不如之前文章中的结果。
既然是轮动策略,那么理所当然会希望分析在历史回测过程中策略持仓成分的变化。这时可以遍历回测引擎的逐笔成交记录字典(engine.trades)来一笔笔检视每日持仓的变化,但无疑很麻烦,而且不够直观。
更方便的方法,是通过回测引擎提供的【get_all_daily_results】获取策略的逐日盯市统计数据来实现可视化分析:
import pandas as pd
import plotly.graph_objects as go
# 这里的实现需要运行完回测程序,并得到了一个engine对象
# 获取每日持仓数据
results = engine.get_all_daily_results()
# 创建DataFrame
df = pd.DataFrame(
[pos.end_poses for pos in results], index=[pos.date for pos in results]
)
# 这里对数据进行简单处理,我们假设所有持仓都是同等大小
df = df.clip(0, 100)
# 还记得持仓种类数额吗?这里将N设置为策略里的security_size
df[df.T.sum() > 100] *= 1 / 2
# 绘制图表
fig = go.Figure()
for col in df.columns:
fig.add_trace(
go.Scatter(x=df.index, y=df[col], mode="none", fill="tozeroy", name=col)
)
fig.show()
输出的历史持仓分布图表如下:
需要注意的是,上述绘图方法只适用于策略在每只ETF上持仓市值相同的情况。有兴趣的同学也可以自行调整大类资产ETF的范围,结合之前两篇文章介绍的优化方法进一步的研究。
完整策略代码和回测数据文件,可以通过【VeighNa进阶用户交流群】获取:
免责声明
文章中的信息或观点仅供参考,作者不对其准确性或完整性做出任何保证。读者应以其独立判断做出投资决策,作者不对因使用本报告的内容而引致的损失承担任何责任。
发布于vn.py社区公众号【vnpy-community】
原文作者:何若楠 | 发布时间:2023-08-02
社区有不少新接触VeighNa的同学咨询如何上手学习,这里推荐下官方团队推出的小鹅通线上课程:《零基础入门系列》覆盖【Python基础】、【数据分析】、【GUI开发】三阶段,适合有金融背景的编程小白快速建立自己的Python开发知识体系;《全实战进阶系列》则针对具体的量化策略应用,适合有开发背景的金融小白通过实践来迅速掌握量化投研能力,包括【CTA实战】、【超越海龟】、【期权入门】和【投组策略】。
PyCharm是由JetBrains公司推出针对Python语言的IDE,其内置一整套可以帮助用户在使用Python语言开发时提高其效率的工具。本文意在为用户提供通过PyCharm开发使用VeighNa的方案以供参考。
本中的内容基于Windows系统编写,但对于Linux和Mac系统大部分也都适用。
当前时点,VeighNa适用的Windows系统包括:
Windows Server 2019/2022
其他版本的Windows系统安装时可能遇到各种依赖库问题,不推荐使用。
在Windows系统上使用VeighNa,推荐安装官方推出的【VeighNa Studio】Python发行版,尤其是初次接触Python开发的新手用户。
首先从PyCharm官网下载PyCharm Community安装包:
下载完成后,双击安装包则可进入PyCharm安装向导:
如果想对安装选项进行设置,可以在PyCharm Community Edition Setup页面对相关选项进行勾选:
安装完成后,会跳转到安装成功页面:
如果前面勾选了Create Desktop Shortcut选项来创建桌面快捷方式的话,此时桌面上会出现PyCharm的图标,双击图标即可运行PyCharm。
启动PyCharm之后,在弹出的欢迎界面中点击【New Project】创建新项目,如下图所示:
在弹出的新项目窗口中,首先需要选择存放项目的文件夹路径【Location】,然后勾选Python解释器选项中的【Previously configured interpreter】选项(即当前系统中已经安装的Python环境):
点击右侧Add Interpreter下拉框中的【Add Local Interpreter】,在弹出的对话框中点击左侧的【System Interpreter】标签,并在右侧出现的下拉框中选择VeighNa Studio自带Python解释器所在的路径:
点击底部的【OK】按钮保存解释器配置,回到新项目窗口中,点击右下方的【Create】按钮来完成新项目的创建:
创建成功的项目窗口如下图所示:
此时点击左上方的【External Libraries】,即可看到项目中可以调用的外部库:
点击site_packages文件夹,往下滚动就能找到VeighNa Studio中的vnpy核心框架包以及vnpy_前缀的插件模块包。此时可以通过点击对应图标来查看每个包中的文件源码,如下图所示:
把鼠标光标移到代码上方,会自动弹出对应代码的文档信息:
若按住Ctrl键的同时用鼠标左键点击代码,则会跳转到代码的声明部分:
点击窗口右下角的【Python 3.10】按钮,会弹出【Settings】项目配置窗口,可以看到当前解释器环境下安装的包名称和版本号信息。带有升级符号(向上箭头)的包,说明当前版本有更新版本,点击升级符号即可自动升级。
请注意:由于VeighNa对于部分依赖库有严格的版本要求,不建议用户手动升级安装的包,可能会出现版本冲突。
从Github代码仓库下载VeighNa Trader启动脚本文件run.py,并将其放置于trader文件夹下,即可在窗口左侧的项目导航栏中看见run.py文件:
若部分代码下方可以看见绿色波浪线显示(变量名称英文词语检查),可以点击项目名称左方的主菜单按钮 -【File】-【Settings】-【Editor】-【Inspections】-【Proofreading】,取消【Typo】的勾选后点击【OK】确认。再回到主窗口,可以发现绿色波浪线已经消失:
点击鼠标右键,选择【Run 'run'】,即可开始运行run.py脚本:
此时在界面底部的终端内容输出区域中,可以看到程序运行时的打印信息:
与此同时,VeighNa Trader的主窗口也会自动弹出显示:
回到PyCharm,可以看到项目界面右上角已经有run脚本的运行记录了,后续直接点击三角形运行按钮也可运行脚本,如下图所示:
PyCharm的断点调试功能十分强大,这里使用一个VeighNa的策略历史回测脚本来演示。
在左侧项目导航栏中点击鼠标右键,选择【New】-【File】, 在弹出的对话框中创建backtest.py:
然后在文件中编写简单的策略回测代码(具体可参考Github仓库中的回测示例),在想要调试的地方打上断点,如下图所示:
点击鼠标右键选择【Debug 'backtest'】, 即可开始调试脚本:
此时项目界面右上角已经可以看到backtest.py的运行记录,后续也可以通过点击这里的按钮直接启动调试任务:
启动调试后,可以看到主界面底部的Debug窗口开始输出程序运行信息,并且程序会暂停运行在第一个断点处。左侧显示的是线程信息,右侧则是当前上下文中的变量信息:
点击类似播放键的【Resume Program】即可继续运行调试,直到下一个断点处再次暂停:
此时可以看到底部右侧监控窗口中,当前上下文中的变量发生了变化:
后续重复上述步骤,点击【Resume Program】直到调试结束,可以看到Debug窗口的相应输出:
调试完之后,点击【Rerun 'backtest'】即可重新调试:
在调试过程中,点击【Step Into】可以进入函数的内部查看运行时的细节状态:
点击【Step Out】则可跳出当前函数,查看外层调用栈的状态:
点击【Step Over】可越过子函数(子函数会执行):
点击【Stop 'backtest'】则会直接停止当前程序的运行:
在PyCharm中新建项目时,默认是在当前目录下运行程序。若需要指定程序运行的目录,可以点击项目界面右上角的【Edit】进入【Run/Debug Configurations】界面:
修改程序启动时的目录【Working directory】即可:
通常情况下,PyCharm只能在Python解释器中启动的线程里进行代码断点调试。之前有部分用户反馈过尝试在C++回调函数(如CTP API接口、PySide图形库等)中打断点但无法起效的问题。针对这种情况,可以通过在代码中设置断点的方式,来实现对非Python线程(即C++线程)的断点调试。
在项目左侧导航栏中点击鼠标右键,选择【New】-【File】, 创建geteway_test.py。
在创建成功的geteway_test.py中添加一段脚本策略的代码(可参考该文件),然后按住Ctrl同时用鼠标左键点击代码中的CtpGateway,跳转至ctp_gateway.py的源码中,在想要调试的回调函数内打上断点(注意不要打在函数定义的def那一行),如下图所示:
回到gateway_test.py,点击鼠标右键选择【Debug 'gateway_test'】开始调试:
请注意,如果用load_json函数读取connect_ctp.json,请确保读取对应.vntrader文件夹的json文件中配置了CTP账户登录信息。
此时可观察到并没有进入之前设定的断点,如下图所示:
终止调试后,找到之前在ctp_gateway.py中设定的断点处,在回调函数内的断点之前添加以下代码:
import pydevd
pydevd.settrace(suspend=False, trace_only_current_thread=True)
请注意:
然后再次运行调试gateway_test.py脚本:
此时可以看到底部的调试窗口中开始输出相关信息,同时程序暂停在了之前设置的断点处。左侧显示的是线程信息(可以看到多了一个Dummy线程显示),右侧显示的是变量信息(可以看到回调函数的入参):
1.Lib\site-packages\vnpy路径下新建user_tools文件夹,放入pricetick.py
pricetick.py内容如下:
import json
import os
class PRICETICK(object):
PATH = os.environ["USERPROFILE"] + "\\.vntrader\\unit.json"
PRICE_TICK = 0.0 #最小跳动点 float
UNIT = 0 #合约乘数 int
def get_unit(self,symbol):
with open(self.PATH,"r",encoding="utf_8") as f:
result = json.load(f)
for key_symbol in result:
if key_symbol == symbol:
self.PRICE_TICK = float(result[symbol][0]) #json读取最小跳动点
self.UNIT = int(result[symbol][1]) #json读取合约乘数
break
return self.PRICE_TICK,self.UNIT
2. .vntrader\unit.json 在.\vntrader文件夹下,新建unit.json, json文件存储所有标的最小跳动点,合约乘数
unit.json内容如下:
{
"IF":["0.2","300"],
"IC":["0.2","200"],
"IH":["0.2","300"],
"IM":["0.2","200"],
"T":["0.005","10000"],
"TF":["0.005","10000"],
"TS":["0.005","20000"],
"TL":["0.01","10000"],
"AU":["0.02","1000"],
"AG":["1","15"],
"CU":["10","5"],
"AL":["5","5"],
"ZN":["5","5"],
"PB":["5","5"],
"NI":["10","1"],
"SN":["10","1"],
"RB":["1","10"],
"WR":["1","10"],
"I":["0.5","100"],
"HC":["1","10"],
"SS":["5","5"],
"SF":["2","5"],
"SM":["2","5"],
"JM":["0.5","60"],
"J":["0.5","100"],
"ZC":["0.2","100"],
"FG":["1","20"],
"SP":["2","10"],
"FU":["1","10"],
"LU":["1","10"],
"SC":["0.1","1000"],
"BC":["10","5"],
"EC":["0.1","50"],
"BU":["1","10"],
"PG":["1","20"],
"RU":["5","10"],
"NR":["5","10"],
"L":["1","5"],
"TA":["2","5"],
"V":["1","5"],
"EG":["1","10"],
"MA":["1","10"],
"PP":["1","5"],
"EB":["1","5"],
"UR":["1","20"],
"SA":["1","20"],
"C":["1","10"],
"A":["1","10"],
"CS":["1","10"],
"B":["1","10"],
"M":["1","10"],
"Y":["2","10"],
"RM":["1","10"],
"OI":["1","10"],
"P":["2","10"],
"CF":["5","5"],
"SR":["1","10"],
"JD":["1","10"],
"AP":["1","10"],
"CJ":["5","5"],
"PF":["2","5"],
"PK":["2","5"],
"AO":["1","20"],
"LC":["50","1"],
"SI":["5","5"],
"BR":["5","5"],
"RR":["1","10"],
"LH":["5","16"],
"PX":["2","5"],
"SH":["1","30"]
}
3.在自己的策略中调用
from vnpy.user_tools.pricetick import PRICETICK
初始化时候def __init__():
实例化 self.contract = PRICETICK()
获取标的 self.symbol = "".join(re.findall(r"\D+",self.get_data()["vt_symbol"].split(".")[0])).upper()
提取最小变动价格,合约乘数 self.PRICE_TICK,self.UNIT = self.contract.get_unit(self.symbol) #从json提取最小变动价位,合约乘数
发布于veighna社区公众号【vnpy-community】
原文作者:用Python的交易员 | 发布时间:2023-09-15
2023年VeighNa小班特训营【套利价差交易】即将在10月中旬开班!对比趋势跟踪类的CTA策略,均值回归类的价差策略由于其高胜率的特征,能够实现相对更加平稳的盈利绩效,适合用于量化投资组合在市场横盘震荡期的配置优化。目前半数名额已经被报名锁定,感兴趣的同学请抓紧!内容大纲戳这里。
上周发布了VeighNa的3.8.0版本,本次更新的主要内容是IB接口的整体功能强化,升级到10.19.1新版本API的同时,也进一步完善了期权相关的交易功能,包括期权链合约数据的查询获取,以及订阅获取IB实时推送的隐含波动率和希腊值风险数据。
对于已经安装了VeighNa Studio的用户,可以使用快速更新功能完成自动升级。对于没有安装的用户,请下载VeighNa Studio-3.8.0,体验一键安装的量化交易Python发行版,下载链接:
https://download.vnpy.com/veighna_studio-3.8.0.exe
使用Ubuntu或者Mac系统的用户,推荐使用VeighNa Docker量化交易容器解决方案:
https://hub.docker.com/repository/docker/veighna/veighna
VeighNa平台的IB接口模块vnpy_ib,底层基于IB官方推出的ibapi接口库开发。目前ibapi的10.19.1新版本已经不再通过PyPI发布,也就是无法直接通过pip install来安装了(会安装到老版本的9.81.1.post1),同样也意味着无法在VeighNa Studio中打包提供。
用户首先需要前往IB官网的API下载页面,下载自己操作系统对应的API安装程序:
推荐选择Stable版本,图中这里对应的是2022年11月16日发布的10.19版本(左上红色方框),下载完成后运行安装,API开发库会被安装到指定目录(默认为C:\TWS API)。
打开该目录下的source\pythonclient文件夹,看到如下图所示的内容:
该文件夹中包含的就是ibapi接口库源代码,在空白处按住Shift点击鼠标右键,菜单中选择【在此处打开Powershell窗口】,在弹出的命令行中运行下述命令即可将ibapi安装到Python环境中:
python setup.py install
由于IB接入的金融市场数量众多,导致其上可交易的合约数量非常庞大,不同交易所之间经常会出现合约代码冲突的情况。而国内金融市场大部分情况下都可以通过一个简短的代码来确定具体要交易的合约,比如IF2309.CFFEX、600036.SSE等。
VeighNa核心框架设计上遵循了这一规则,在早期vnpy_ib版本中合约代码(symbol)使用的是由IB(而非交易所)为每个合约分配的ConId,即【数字代码】(如下图中的51529211):
在TWS软件中【右键点击任意合约】->【金融产品信息】->【详情】,即可弹出上图中的页面。
尽管使用ConId在程序内部逻辑上非常方便,但对于交易员来说却不便记忆。基于社区用户的反馈建议,vnpy_ib后续版本中替换为了IB合约描述信息的一个字符串组合,即【描述代码】(如SPY-USD-STK、ES-202002-USD-FUT)。
使用【描述代码】2年多后,陆续收到期权交易相关的问题反馈,核心原因有两点:
综合来看,这两种代码类型都有各自好用的场景和难用的问题,那解决方案自然就是两者一起支持。所以在3.8.0版本中,vnpy_ib接口重新引入了对于【数字代码】的支持,并且两种代码类型可以混合使用,遵循以下规则:
建议之前已经习惯了【描述代码】的用户可以正常继续使用,对于有期权交易需求的用户则更推荐使用【数字代码】。
Interactive Brokers(IB)创始人Thomas Peterffy,早年以期权交易所场内做市交易员身份进入金融行业,后来创建了Timber Hill这家在全球期权市场发展历史上贡献颇多的自营做市商公司,而今的IB只是其当时的副业(用于提供全球交易所的交易链路接入)。
因此IB的TWS平台期权交易方面的功能可谓十分强大,本文封面图片就是TWS平台内置的Volatility Lab(波动率实验室)截图,对内部功能细节感兴趣的话推荐可以看这里的IB官方文档。
3.8.0版本的IB接口,在连接登录时提供了【查询期权】选项:
选择为【是】后,每次订阅标的合约行情时,均会自动发起查询其上期权链合约,从而满足期权策略交易中所需的期权产品组合信息。
同时在IB期权Tick行情数据推送中,增加了IB所提供的隐含波动率和希腊值风险数据支持,可以通过TickData.extra字典来访问获取,其中具体包括:
前缀
后缀
前缀和后缀两者组合为具体的数据字段,举例来说:
以上TickData数据对象,可以通过VeighNa平台中策略模块(CtaStrategy、PortfolioStrategy等)下策略模板所提供的on_tick函数来接收获取实时推送。
之前版本中的vnpy_tora基于奇点官方提供的Python 3.7 API开发,只能在Python 3.7环境中使用,VeighNa Station因为内置环境是Python 3.10一直用不了。
本次3.8.0版本更新中使用了奇点的C++ API重构封装,对于API层和Gateway层都做了兼容性调整,目前已经可以在VeighNa Station中直接加载使用。
K线合成器(BarGenerator)增加对日K线的合成支持
基于华鑫奇点柜台的C++ API重构vnpy_tora,实现VeighNa Station加载支持
新增vnpy_ib对于期权合约查询、波动率和希腊值等扩展行情数据的支持
vnpy_rest/vnpy_websocket限制在Windows上改为必须使用Selector事件循环
vnpy_rest/vnpy_websocket客户端关闭时确保所有会话结束,并等待有异步任务完成后安全退出
vnpy_ctp升级6.6.9版本API
vnpy_ctp支持大商所的1毫秒级别行情时间戳
vnpy_tqsdk过滤不支持的K线频率查询并输出日志
vnpy_datamanager增加数据频率下按交易所显示支持,优化数据加载显示速度
vnpy_ctabacktester如果加载的历史数据为空,则不执行后续回测
vnpy_spreadtrading采用轻量级数据结构,优化图形界面更新机制
vnpy_spreadtrading价差子引擎之间的事件推送,不再经过事件引擎,降低延迟水平
vnpy_rpcservice增加对下单返回委托号的gateway_name替换处理
vnpy_portfoliostrategy策略模板增加引擎类型查询函数get_engine_type
vnpy_sec更新行情API至1.6.45.0版本,更新交易API版本至1.6.88.18版本
vnpy_ib更新10.19.1版本的API,恢复对于数字格式代码(ConId)的支持
没有配置数据服务或者加载模块失败的情况下,使用BaseDatafeed作为数据服务
遗传优化算法运行时,子进程指定使用spawn方式启动,避免数据库连接对象异常
合约管理控件,增加对于期权合约的特有数据字段显示
修复vnpy_datarecorder对于新版本vnpy_spreadtrading价差数据的录制支持
修复vnpy_algotrading条件委托算法StopAlgo全部成交后状态更新可能缺失的问题
修复vnpy_ctastrategy策略初始化时,历史数据重复推送调用on_bar的问题
修复vnpy_wind查询日线历史数据时,数值存在NaN的问题
ReqQryInstrument : 请求查询合约,填空可以查询到所有合约。
响应:OnRspQryInstrument
◇ 1.函数原型
virtual int ReqQryInstrument(CThostFtdcQryInstrumentField *pQryInstrument, int nRequestID) = 0;
◇ 2.参数
pQryInstrument:查询合约
struct CThostFtdcQryInstrumentField
{
TThostFtdcInstrumentIDType InstrumentID; ///合约代码
TThostFtdcExchangeIDType ExchangeID; ///交易所代码
TThostFtdcExchangeInstIDType ExchangeInstID; ///合约在交易所的代码
TThostFtdcInstrumentIDType ProductID;///产品代码
};
nRequestID:请求ID,对应响应里的nRequestID,无递增规则,由用户自行维护。
◇ 3.返回
0,代表成功。
-1,表示网络连接失败;
-2,表示未处理请求超过许可数;
-3,表示每秒发送请求数超过许可数。
请求查询合约响应,当执行ReqQryInstrument后,该方法被调用。
◇ 1.函数原型
virtual void OnRspQryInstrument(CThostFtdcInstrumentField *pInstrument, CThostFtdcRspInfoField *pRspInfo, int nRequestID, bool bIsLast) {};
◇ 2.参数pInstrument:
合约
struct CThostFtdcInstrumentField
{
TThostFtdcInstrumentIDType InstrumentID;///合约代码
TThostFtdcExchangeIDType ExchangeID; ///交易所代码
TThostFtdcInstrumentNameType InstrumentName; ///合约名称
TThostFtdcExchangeInstIDType ExchangeInstID;///合约在交易所的代码
TThostFtdcInstrumentIDType ProductID; ///产品代码
TThostFtdcProductClassType ProductClass; ///产品类型
TThostFtdcYearType DeliveryYear; ///交割年份
TThostFtdcMonthType DeliveryMonth;///交割月
TThostFtdcVolumeType MaxMarketOrderVolume; ///市价单最大下单量
TThostFtdcVolumeType MinMarketOrderVolume;///市价单最小下单量
TThostFtdcVolumeType MaxLimitOrderVolume; ///限价单最大下单量
TThostFtdcVolumeType MinLimitOrderVolume; ///限价单最小下单量
TThostFtdcVolumeMultipleType VolumeMultiple; ///合约数量乘数
TThostFtdcPriceType PriceTick; ///最小变动价位
TThostFtdcDateType CreateDate; ///创建日
TThostFtdcDateType OpenDate; ///上市日
TThostFtdcDateType ExpireDate;///到期日
TThostFtdcDateType StartDelivDate; ///开始交割日
TThostFtdcDateType EndDelivDate; ///结束交割日
TThostFtdcInstLifePhaseType InstLifePhase; ///合约生命周期状态
TThostFtdcBoolType IsTrading;///当前是否交易
TThostFtdcPositionTypeType PositionType; ///持仓类型
TThostFtdcPositionDateTypeType PositionDateType;///持仓日期类型
TThostFtdcRatioType LongMarginRatio;///多头保证金率
TThostFtdcRatioType ShortMarginRatio; ///空头保证金率
TThostFtdcMaxMarginSideAlgorithmType MaxMarginSideAlgorithm;///是否使用大额单边保证金算法
TThostFtdcInstrumentIDType UnderlyingInstrID;///基础商品代码
TThostFtdcPriceType StrikePrice;///执行价
TThostFtdcOptionsTypeType OptionsType;///期权类型
TThostFtdcUnderlyingMultipleType UnderlyingMultiple; ///合约基础商品乘数
TThostFtdcCombinationTypeType CombinationType;///组合类型
};
VolumeMultiple:合约乘数(同交易所)
PriceTick:最小变动价位(同交易所)
IsTrading:是否活跃(同交易所)
DeliveryYear:交割年份(同交易所)
DeliveryMonth:交割月(同交易所)
OpenDate:上市日(同交易所)
CreateDate:创建日(同交易所)
ExpireDate:到期日(同交易所)
StartDeliveDate:开始交割日(同交易所)
EndDelivDate:结束交割日(同交易所)
同交易所表示这些字段每天更新自交易所,其余字段为柜台设置值。如果发现有些字段值有误,则以此来判断是交易所问题还是CTP柜台设置问题。
pRspInfo:响应信息
struct CThostFtdcRspInfoField
{
TThostFtdcErrorIDType ErrorID; ///错误代码
TThostFtdcErrorMsgType ErrorMsg;///错误信息
};
nRequestID:返回用户操作请求的ID,该ID 由用户在操作请求时指定。
bIsLast:指示该次返回是否为针对nRequestID的最后一次返回。
ReqQryInstrumentMarginRate
请求查询合约保证金率,对应响应OnRspQryInstrumentMarginRate。如果InstrumentID填空,则返回持仓对应的合约保证金率,否则返回相应InstrumentID的保证金率。
目前无法通过一次查询得到所有合约保证金率,如果要查询所有,则需要通过多次查询得到。
◇ 1.函数原型
virtual int ReqQryInstrumentMarginRate(CThostFtdcQryInstrumentMarginRateField *pQryInstrumentMarginRate, int nRequestID) = 0;
◇ 2.参数pQryInstrumentMarginRate:
查询合约保证金率
struct CThostFtdcQryInstrumentMarginRateField
{
///经纪公司代码
TThostFtdcBrokerIDType BrokerID;
///投资者代码
TThostFtdcInvestorIDType InvestorID;
///合约代码
TThostFtdcInstrumentIDType InstrumentID;
///投机套保标志
TThostFtdcHedgeFlagType HedgeFlag;
///交易所代码
TThostFtdcExchangeIDType ExchangeID;
///投资单元代码
TThostFtdcInvestUnitIDType InvestUnitID;
};
nRequestID:请求ID,对应响应里的nRequestID,无递增规则,由用户自行维护。
◇ 3.返回
0,代表成功。
-1,表示网络连接失败;
-2,表示未处理请求超过许可数;
-3,表示每秒发送请求数超过许可数。
OnRspQryInstrumentMarginRate
请求查询合约保证金率响应,当执行ReqQryInstrumentMarginRate后,该方法被调用。
◇ 1.函数原型
virtual void OnRspQryInstrumentMarginRate(CThostFtdcInstrumentMarginRateField *pInstrumentMarginRate, CThostFtdcRspInfoField *pRspInfo, int nRequestID, bool bIsLast) {};
◇ 2.参数 ///:
合约保证金率
struct CThostFtdcInstrumentMarginRateField
{
TThostFtdcInstrumentIDType InstrumentID;///合约代码
TThostFtdcInvestorRangeType InvestorRange;///投资者范围
TThostFtdcBrokerIDType BrokerID; ///经纪公司代码
TThostFtdcInvestorIDType InvestorID;///投资者代码
TThostFtdcHedgeFlagType HedgeFlag; ///投机套保标志
TThostFtdcRatioType LongMarginRatioByMoney;///多头保证金率
TThostFtdcMoneyType LongMarginRatioByVolume;///多头保证金费
TThostFtdcRatioType ShortMarginRatioByMoney; ///空头保证金率
TThostFtdcMoneyType ShortMarginRatioByVolume; ///空头保证金费
TThostFtdcBoolType IsRelative;///是否相对交易所收取
TThostFtdcExchangeIDType ExchangeID;///交易所代码
TThostFtdcInvestUnitIDType InvestUnitID; ///投资单元代码
};
pRspInfo:响应信息
struct CThostFtdcRspInfoField
{
TThostFtdcErrorIDType ErrorID;///错误代码
TThostFtdcErrorMsgType ErrorMsg;///错误信息
};
nRequestID:返回用户操作请求的ID,该ID 由用户在操作请求时指定。
bIsLast:指示该次返回是否为针对nRequestID的最后一次返回。
ReqQryInstrumentCommissionRate
请求查询合约手续费率,对应响应OnRspQryInstrumentCommissionRate。如果InstrumentID填空,则返回持仓对应的合约手续费率。
目前无法通过一次查询得到所有合约手续费率,如果要查询所有,则需要通过多次查询得到。
◇ 1.函数原型
virtual int ReqQryInstrumentCommissionRate(CThostFtdcQryInstrumentCommissionRateField *pQryInstrumentCommissionRate, int nRequestID) = 0;
◇ 2.参数pQryInstrumentCommissionRate:
查询手续费率
struct CThostFtdcQryInstrumentCommissionRateField
{
TThostFtdcBrokerIDType BrokerID; ///经纪公司代码
TThostFtdcInvestorIDType InvestorID;///投资者代码
TThostFtdcInstrumentIDType InstrumentID;///合约代码
TThostFtdcExchangeIDType ExchangeID;///交易所代码
TThostFtdcInvestUnitIDType InvestUnitID;///投资单元代码
};
InstrumentID:返回手续费率对应的合约。
但是如果在柜台没有设置具体合约的手续费率,则默认会返回产品的手续费率,InstrumentID就为对应产品ID。
nRequestID:请求ID,对应响应里的nRequestID,无递增规则,由用户自行维护。
◇ 3.返回
0,代表成功。
-1,表示网络连接失败;
-2,表示未处理请求超过许可数;
-3,表示每秒发送请求数超过许可数。
OnRspQryInstrumentCommissionRate
请求查询合约手续费率响应,当执行ReqQryInstrumentCommissionRate后,该方法被调用。
◇ 1.函数原型
virtual void OnRspQryInstrumentCommissionRate(CThostFtdcInstrumentCommissionRateField *pInstrumentCommissionRate, CThostFtdcRspInfoField *pRspInfo, int nRequestID, bool bIsLast) {};
◇ 2.参数pInstrumentCommissionRate:合约手续费率
struct CThostFtdcInstrumentCommissionRateField
{
TThostFtdcInstrumentIDType InstrumentID; ///合约代码
TThostFtdcInvestorRangeType InvestorRange; ///投资者范围
TThostFtdcBrokerIDType BrokerID;///经纪公司代码
TThostFtdcInvestorIDType InvestorID; ///投资者代码
TThostFtdcRatioType OpenRatioByMoney; ///开仓手续费率
TThostFtdcRatioType OpenRatioByVolume; ///开仓手续费
TThostFtdcRatioType CloseRatioByMoney;///平仓手续费率
TThostFtdcRatioType CloseRatioByVolume;///平仓手续费
TThostFtdcRatioType CloseTodayRatioByMoney;///平今手续费率
TThostFtdcRatioType CloseTodayRatioByVolume;///平今手续费
TThostFtdcExchangeIDType ExchangeID; ///交易所代码
TThostFtdcBizTypeType BizType;///业务类型
TThostFtdcInvestUnitIDType InvestUnitID;///投资单元代码
};
pRspInfo:
响应信息
struct CThostFtdcRspInfoField
{
TThostFtdcErrorIDType ErrorID; ///错误代码
TThostFtdcErrorMsgType ErrorMsg; ///错误信息
};
nRequestID:返回用户操作请求的ID,该ID 由用户在操作请求时指定。
bIsLast:指示该次返回是否为针对nRequestID的最后一次返回。
令:
合约查询结果 = C
保证金率查询结果 = M
手续费查询结果 = S
则:
C["VolumeMultiple"]
if M["Is_Relative"] == 1:
多头保证金率 = C["LongMarginRatio"] + M["LongMarginRatioByMoney"]
空头保证金率 = C["ShortMarginRatio"] + M["ShortMarginRatioByMoney"]
else:
多头保证金率 = M["LongMarginRatioByMoney"]
空头保证金率 = M["ShortMarginRatioByMoney"]
if S.open_ratio_bymoney == 0.0:
开仓手续费= [FeeType.LOT,S["OpenRatioByVolume"] ]
平仓手续费= [FeeType.LOT,S["CloseRatioByVolume"] ]
平今手续费= [FeeType.LOT,S["CloseTodayRatioByVolume"] ]
else:
开仓手续费 = [FeeType.RATE,S["OpenRatioByMoney"] ]
平仓手续费 = [FeeType.RATE,S["CloseRatioByMoney"] ]
平今手续费 = [FeeType.RATE,S["CloseTodayRatioByMoney"] ]
我学Python的目的很明确,就是量化交易。从一开始就有关注vn.py,但我学的是Python3,那时vn.py还处于版本1.x时期,所以只能望vn.py兴叹。
vn.py 2.0出来之后我并没有及时注意,等反应过来已经是2.0.7版。很兴奋,认真研究,并将心得写成《vn.py 2.0.7源代码深入分析》,分享在vn.py社区的经验分享板块。
出于对量化交易的爱好,出于对Python在量化交易中作用的认同,一定程度受vn.py强大功能的鼓舞,我与同事合写了《Python量化交易从入门到实战》一书,对vn.py的讨论是其中很重要的一部分内容。
后续又写了《vn.py 2.1.4源代码深入分析》和《vn.py 2.2.0源代码深入分析》两个文档,感谢各位老师的认可。
vn.py 3.0.0版发布于2022-03-23,这是我一直期待的一个版本,所以它刚一推出,我就立刻开始试用,并着手整理《vn.py 3.0.0源代码深入分析》。夜以继日,终于在前天完成。先发到了书籍的资源群中,接受了两天批评,现分享到此处。
写作本文档的一个主要目的是对vn.py的开源精神做出一点支持,希望本文档能够对大家学习使用vn.py有所帮助。
百度网盘链接:https://pan.baidu.com/s/1cl2MA9hNFhHlxfHM0gGe2A
提取码:s7u6
首先说明,这里所合成出来的价差K线只是简易价差K线,它和交易所发表的套利品种的K线相比还是有所差别的,主要的差别在最高价、最低价和开盘价,但收盘价是准确的。
# hxxjava debug spread_trading
def query_tick_from_rq(
symbol: str, exchange: Exchange, start: datetime, end: datetime
):
"""
Query tick data from RQData.
"""
from vnpy.trader.rqdata import rqdata_client
from vnpy.trader.object import HistoryRequest
if not rqdata_client.inited:
rqdata_client.init()
req = HistoryRequest(
symbol=symbol,
exchange=exchange,
interval=Interval.TICK,
start=start,
end=end
)
data = rqdata_client.query_tick_history(req)
return data
有两种方式:一种是从米筐加载数据下载tick,另一种是从数据库中读取已经录制的该价差的tick数据。
@lru_cache(maxsize=999)
def load_tick_data(
spread: SpreadData,
start: datetime,
end: datetime,
pricetick: float = 0
):
""""""
# hxxjava debug spread_trading
# 目前没有考虑反向合约的情况,以后解决
spread_ticks: List[TickData] = []
try:
# 防止因为用户没有米筐tick数据权限而发生异常
# Load tick data of each spread leg
dt_legs: Dict[str, Dict] = {} # datetime string : Dict[vt_symbol:tick]
format_str = "%Y%m%d%H%M%S.%f"
for vt_symbol in spread.legs.keys():
symbol, exchange = extract_vt_symbol(vt_symbol)
# hxxjava debug spread_trading
tick_data = query_tick_from_rq(symbol=symbol, exchange=exchange,start=start,end=end)
if tick_data:
print(f"load from rqdatac {symbol}.{exchange} tick_data, len of = {len(tick_data)}")
# save all the spread's legs tick into a dictionary by tick's datetime
for tick in tick_data:
dt_str = tick.datetime.strftime(format_str)
if dt_str in dt_legs:
dt_legs[dt_str].update({vt_symbol:tick})
else:
dt_legs[dt_str] = {vt_symbol:tick}
# Calculate spread bar data
# snapshot of all legs's ticks
snapshot:Dict[str,TickData] = {}
spread_leg_count = len(spread.legs)
for dt_str in sorted(dt_legs.keys()):
dt = datetime.strptime(dt_str,format_str).astimezone(LOCAL_TZ)
# get each datetime
spread_price = 0
spread_value = 0
# get all legs's ticks dictionary at the datetime
leg_ticks = dt_legs.get(dt_str)
for vt_symbol,tick in leg_ticks.items():
# save each tick into the snapshot
snapshot.update({vt_symbol:tick})
if len(snapshot) < spread_leg_count:
# if not all legs tick saved in the snapshot
continue
# out_str = f"{dt_str} "
# format_str1 = "%Y-%m-%d %H:%M:%S.%f "
for vt_symbol,tick in snapshot.items():
price_multiplier = spread.price_multipliers[vt_symbol]
spread_price += price_multiplier * tick.last_price
spread_value += abs(price_multiplier) * tick.last_price
# out_str += f"[{vt_symbol} {tick.datetime.strftime(format_str1)} {tick.last_price}],"
# print(out_str)
if pricetick:
spread_price = round_to(spread_price, pricetick)
spread_tick = TickData(
symbol=spread.name,
exchange=exchange.LOCAL,
datetime=dt,
open_price=spread_price,
high_price=spread_price,
low_price=spread_price,
last_price=spread_price,
gateway_name="SPREAD")
spread_tick.value = spread_value
spread_ticks.append(spread_tick)
if spread_ticks:
print(f"load {symbol}.{exchange}' ticks from rqdatac, len of = {len(tick_data)}")
finally:
if not spread_ticks:
# 读取数据库中已经录制过的该价差的tick数据
spread_ticks = database_manager.load_tick_data(spread.name, Exchange.LOCAL, start, end)
return spread_ticks
只要在你的价差策略中on_init()中添加如下代码,就可以调用:
def on_init(self):
"""
Callback when strategy is inited.
"""
self.write_log("策略初始化")
self.load_tick(days=3)
修改vnpy\app\spread_trading\engine.py
增加下面函数:
def get_previous_trading_date(dt:datetime,days:int): # hxxjava add
"""
得到某个日期dt的去除了节假日的前days个交易日
"""
from vnpy.trader.rqdata import rqdata_client
import rqdatac as rq
if not rqdata_client.inited:
rqdata_client.init()
prev_td = rq.get_previous_trading_date(date=dt.date(),n=days)
return prev_td
修改如下,修改后两个函数的days参数就代表交易日了。
def load_bar(
self, spread: SpreadData, days: int, interval: Interval, callback: Callable
):
""""""
end = datetime.now()
# start = end - timedelta(days)
start = get_previous_trading_date(dt = end,days=days) # hxxjava change
bars = load_bar_data(spread, interval, start, end)
print(f"{spread.name} {start}-{end} len of bars = {len(bars)}") # hxxjava debug spead_trading
for bar in bars:
callback(bar)
def load_tick(self, spread: SpreadData, days: int, callback: Callable):
""""""
end = datetime.now()
# start = end - timedelta(days=days)
start = get_previous_trading_date(dt = end,days=days) # hxxjava change
ticks = load_tick_data(spread, start, end)
for tick in ticks:
callback(tick)
vnpy中多采用各种应用系统的策略进行交易的,虽然也有各种日志和提示出现,但平常总是静悄悄的。
如果你想了解系统和策略的运行情况,可以查看各种运行日志,例如MainWindow的日志,委托列表,成交的列表,账户列表。想要查询你的策略运行情况,可以查看你的策略管理器的变量输出,等等。可是人总不能一直盯着屏幕,那样太累了。
如果能够有个声音和语音播报各种交易活动,用户会及时得到提醒。例如:
在vnpy\usertools目录下添加文件sound_player.py,内容如下:
"""
多线程音乐和文本播放器,介绍如下:
特点:
既可以播放wav,mp3 等格式,有可以对文本进行播放。
使用消息引擎创建该音乐播放器,多线程并行播放声音,不会因为播放声音而阻塞业务流程。
假设其他应用或者策略中需要播放声音, sound_name为字符型的声音文件名称,使用方法有两种:
方法1: 先获取消息引擎event_engine(注:与SoundPlayer实例成交时使用的消息引擎是
相同的),那么可以这样播放:
event_engine.put(Event(EVENT_SOUND,"Connected.wav"))
event_engine.put(Event(EVENT_SPEEK,"您收到一条委托单"))
方法2: 将SoundPlayer多play_sound()接口安装到MainEngine到实例main_engine,那么
可以先回去获取main_engine,然后这样播放:
event_engine.play_sound("Connected.wav")
event_engine.speek_text("您收到一条委托单")
作者:hxxjava 时间:2023-2-14,情人节————献给心爱的人!
修改:增加回测与实盘的区分功能,使得只在实盘环境才播放声音和文本。
修改:hxxjava 时间:2023-2-28
依赖库:pyttsx3, 安装:pip install pyttsx3
"""
from typing import Any
from pathlib import Path
from threading import Thread
from vnpy.trader.engine import EventEngine,Event
from winsound import PlaySound,SND_FILENAME
import pyttsx3
EVENT_SOUND = "eSound."
EVENT_SPEEK = "eSpeak."
class SoundPlayer():
"""
多线程声音播放器
"""
def __new__(cls, *args, **kwargs):
""" singleton constructor """
if not hasattr(cls, "_instance"):
cls._instance = super(SoundPlayer, cls).__new__(cls)
return cls._instance
def __init__(self,event_engine:EventEngine,switch:bool=True):
""" 初始化函数 """
self.event_engine = event_engine
# control play sound file
self.switch = switch
self.register_event()
def register_event(self):
""" """
self.event_engine.register(EVENT_SOUND,self.process_sound_event)
self.event_engine.register(EVENT_SPEEK,self.process_speak_event)
def set_switch(self,switch:bool=True):
""" set the swith which control play sound file """
self.switch = switch
def _get_sound_path(self,sound_name: str):
"""
Get path for sound file with sound name.
"""
this_file_path:Path = Path(__file__).parent
sound_path:Path = this_file_path.joinpath("sounds", sound_name)
return str(sound_path)
def process_sound_event(self,event:Event):
""" EVENT消息处理过程 """
wavname,is_testing = event.data['wavname'],event.data['is_testing']
if self.switch == True and is_testing == False:
filename = self._get_sound_path(wavname)
thread = Thread(target=self._play_sound,kwargs=({"filename":filename}),daemon=True)
thread.start()
def process_speak_event(self,event:Event):
""" EVENT消息处理过程 """
santence,is_testing = event.data['santence'],event.data['is_testing']
if self.switch == True and is_testing == False:
santence:str = event.data
thread = Thread(target=self._do_speak,kwargs=({"santence":santence}),daemon=True)
thread.start()
def _play_sound(self,filename:str):
""" 音乐文件播放线程执行过程 """
PlaySound(filename,SND_FILENAME)
def _do_speak(self,santence:str):
""" 文本播放线程执行过程 """
print(santence)
speaker = pyttsx3.init()
speaker.say(santence)
speaker.runAndWait()
def play_sound(self,sound_name:str,is_testing:bool=False):
"""
用户音乐播放接口。
参数:
sound_name:传入声音文件名
is_testing:回测=True;实盘=False(默认)
"""
self.event_engine.put(Event(EVENT_SOUND,{"wavname":sound_name,"is_testing":is_testing}))
def speak_text(self,santence:str,is_testing:bool=False):
"""
用户文字播放接口。
参数:
santence:传入声音文件名
is_testing:回测=True;实盘=False(默认)
"""
self.event_engine.put(Event(EVENT_SPEEK,{"santence":santence,"is_testing":is_testing}))
在vnpy\trader\engine.py中做如下修改:
from vnpy.usertools.sound_player import SoundPlayer
self.sound_player = SoundPlayer(event_engine,True) # test sound player
def add_function(self) -> None:
"""Add query function to main engine."""
... ...
self.main_engine.play_sound = self.sound_player.play_sound
self.main_engine.speak_text = self.sound_player.speak_text
这样你的MainEngine就有了可以音乐和语音功能了。
class sound_player规定音频文件存放在vnpy\usertools\sounds\目录下,当然你也可以修改代码中规定的目录,放在自己喜欢的目录下。
文件可以是wav、mp3格式的音乐文件均可,可以自己录制。
取一些有意义的文件名,如connected.wav代表网络连接成功,disconnection.wav代表网络断开,自己发挥吧,方便自己在自己vnpy系统中用函数调用。
本来本人有一套音乐文件的,可是论坛里没有文件上传功能,所以无法共享给大家,如果需要可以私信我。
下面用连接网关成功和连接断开,分别给出音乐和语音播放的示例:
def process_connect_event(self, event: Event) -> None: # hxxjava add
""" CTP接口连接消息处理 """
gateway:GatewayData = event.data
self.main_engine.play_sound("Connected.wav")
def process_disconnect_event(self, event: Event) -> None: # hxxjava add
""" CTP接口断开消息处理 """
gateway:GatewayData = event.data
self.main_engine.play_sound("ConnectionLost.wav")
# 增加引用
from vnpy.usertools.sound_player import EVENT_SOUND,EVENT_SPEEK
def on_order(self, order: OrderData):
"""
Callback of new order data update.
"""
# 当策略收到委托单时播放提示音乐
event_engine:EventEngine = self.cta_engine.event_engine
is_testing = self.cta_engine.get_engine_type() != EngineType.LIVE
event = Event(EVENT_SOUND,
{"wavname":"order.wav","is_testing":is_testing}
)
event_engine.put(event)
def on_trade(self, trade: TradeData):
"""
Callback of new trade data update.
"""
# 当策略收到成交单时播放提示音乐
event_engine:EventEngine = self.cta_engine.event_engine
is_testing = self.cta_engine.get_engine_type() != EngineType.LIVE
event = Event(EVENT_SOUND, {"wavname":"traded.wav","is_testing":is_testing} )
event_engine.put(event)
def process_connect_event(self, event: Event) -> None: # hxxjava add
""" CTP接口连接消息处理 """
gateway:GatewayData = event.data
self.main_engine.speak_text(f"感谢您,连接{gateway.name}的{gateway.type}接口成功!")
def process_disconnect_event(self, event: Event) -> None: # hxxjava add
""" CTP接口断开消息处理 """
gateway:GatewayData = event.data
self.main_engine.speak_text(f"请注意:{gateway.name}的{gateway.type}接口已断开!")
假设你的策略中实现了on_order()和on_trade()这两个回调函数:
def on_order(self, order: OrderData):
"""
Callback of new order data update.
"""
event_engine:EventEngine = self.cta_engine.event_engine
is_testing = self.cta_engine.get_engine_type() != EngineType.LIVE
event = Event(EVENT_SPEEK,
{"santance":f"策略{self.strategy_name}收到委托单,价格{order.price},手数{order.volume},已经成交{order.traded}",
"is_testing":is_testing})
event_engine.put(event)
def on_trade(self, trade: TradeData):
"""
Callback of new trade data update.
"""
event_engine:EventEngine = self.cta_engine.event_engine
is_testing = self.cta_engine.get_engine_type() != EngineType.LIVE
event = Event(EVENT_SPEEK,
{"santance":f"策略{self.strategy_name}收到{trade.vt_symbol}成交单,成交价{trade.price}手数{trade.volume}.",
"is_testing":is_testing})
event_engine.put(event)
实盘中用户策略是可以通过应用应用引擎获得vnpy系统的MainEngine的,这样就可以使用 play_sound() 和 speak_text()函数来播放音乐和语音了。但是,在策略中使用这个两个播放函数,应该考虑到回测时不要有声音的。应该根据应用引擎的不同,在策略中使用 play_sound() 和 speak_text()时,将参数is_testing设置为True,这样策略回测就不会有音乐和语音了。
这里以CTA策略模板CtaTemplate为例,演示如何将音乐和语音播放功能封装到各种应用的策略中,其他应用系统的模板可以参考以下的做法去封装,就不再一一讲解。
# 在引用部分增加对音乐和语音播器的引用
from vnpy.usertools.sound_player import SoundPlayer
class CtaTemplate(ABC):
""""""
author: str = ""
parameters: list = []
variables: list = []
def __init__(
self,
cta_engine: Any,
strategy_name: str,
vt_symbol: str,
setting: dict,
) -> None:
""""""
... ... # 原来的初始化代码
# 音乐和语音播放器 hxxjava add
self.sound_player:SoundPlayer = SoundPlayer(self.cta_engine.event_engine)
def play_sound(self,sound_name:str):
""" 播放音乐 hxxjava add """
if self.cta_engine.get_engine_type() == EngineType.LIVE:
self.sound_player.play_sound(sound_name)
def speak_text(self,santence:str):
""" 播放语音 hxxjava add """
if self.cta_engine.get_engine_type() == EngineType.LIVE:
self.sound_player.speak_text(santence)
经过上面对CtaTemplate的修改,用户策略中就可以像下面的语句一样直接调用音乐和语音播放了,更加简便。
def on_init(self):
"""
Callback when strategy is inited.
"""
self.write_log("策略初始化开始")
self.load_bar(10)
self.speak_text(f"策略{self.strategy_name}开始初始化")
经过好几天的反复,终于完成了。所谓的复盘,就是盘后把行情从新播放一遍,如果使用tick数据,就和真实的盘面一模一样,我这里使用的是1分钟数据复盘,所以简化了很多。
代码如下:
import multiprocessing
import time
from datetime import datetime
from vnpy.trader.constant import Exchange, Interval
from vnpy.trader.database import database_manager
from vnpy.chart import ChartWidget, VolumeItem, CandleItem
from vnpy.trader.ui import create_qapp, QtCore
from vnpy.trader.object import BarData
import os
bar: BarData
def putbardata(q_1m,q_5m,q_30m,q_4h,su):
#从数据库中读取1分钟数据,你的数据库必须有下载好的数据。
bars = database_manager.load_bar_data(
symbol="APEUSDT",
exchange=Exchange.BINANCE,
interval=Interval.MINUTE,
start=datetime(2022, 5, 4),
end=datetime(2025, 1, 1)
)
sudu = 0.055
i = 0
for bar in bars:
q_1m.put(bar)
q_5m.put(bar)
q_30m.put(bar)
q_4h.put(bar)
if i > 1200: #先快速播放一定数量的一分钟bar
if not su.empty():
sudu = int(su.get(True))
print("速度已经设定为:", sudu)
if i % 10 == 1 :
os.system("pause") #正常播放以后,每10个一分钟bar暂停一下,按任意键继续,不需要这个功能的可以删掉。
time.sleep(sudu)
i = i + 1
def MINUTE_5m(q):
app = create_qapp()
widget = ChartWidget()
widget.add_plot("candle", hide_x_axis=True)
widget.add_plot("volume", maximum_height=180)
widget.add_item(CandleItem, "candle", "candle")
widget.add_item(VolumeItem, "volume", "volume")
widget.add_cursor()
history : BarData
history = []
global i_5
i_5 = 0
global bar_
def update_bar():
global i_5
global bar_
if not q.empty():
bar = q.get(True)
if i_5 == 0 :
bar_ = bar
i_5 = 1
history.append(bar_)
if i_5 == 5 :
bar_ = bar
i_5 = 1
history.append(bar_)
else :
bar_.close_price = bar.close_price
if bar.high_price > bar_.high_price:
bar_.high_price = bar.high_price
if bar.low_price < bar_.low_price:
bar_.low_price = bar.low_price
bar_.volume = bar_.volume + bar.volume
i_5 = i_5 + 1
history[-1] = bar_ #这一段是把一分钟数据形成5分钟数据
widget.clear_all()
widget.update_history(history) #刷新图形数据
timer = QtCore.QTimer()
timer.timeout.connect(update_bar)
timer.start(50)
widget.setWindowTitle("五分钟") #设定五分钟窗口的标题和窗口大小以及位置
widget.setGeometry(0, 0, 900, 550)
widget.show()
app.exec_()
def MINUTE_30m(q): #30分钟和5分钟类似
app = create_qapp()
widget = ChartWidget()
widget.add_plot("candle", hide_x_axis=True)
widget.add_plot("volume", maximum_height=180)
widget.add_item(CandleItem, "candle", "candle")
widget.add_item(VolumeItem, "volume", "volume")
widget.add_cursor()
history: BarData
history = []
global i_30
i_30 = 0
global bar_
def update_bar():
global i_30
global bar_
if not q.empty():
bar = q.get(True)
if i_30 == 0 :
bar_ = bar
i_30 = 1
history.append(bar_)
if i_30 == 30 :
bar_ = bar
i_30 = 1
history.append(bar_)
else :
bar_.close_price = bar.close_price
if bar.high_price > bar_.high_price:
bar_.high_price = bar.high_price
if bar.low_price < bar_.low_price:
bar_.low_price = bar.low_price
bar_.volume = bar_.volume + bar.volume
i_30 = i_30 + 1
history[-1] = bar_
widget.clear_all()
widget.update_history(history)
timer = QtCore.QTimer()
timer.timeout.connect(update_bar)
timer.start(50)
widget.setWindowTitle("三十分钟")
widget.setGeometry(0, 560, 900, 550)
widget.show()
app.exec_()
def MINUTE_4h(q):
app = create_qapp()
widget = ChartWidget()
widget.add_plot("candle", hide_x_axis=True)
widget.add_plot("volume", maximum_height=180)
widget.add_item(CandleItem, "candle", "candle")
widget.add_item(VolumeItem, "volume", "volume")
widget.add_cursor()
history: BarData
history = []
global i_4h
i_4h = 0
global bar_
def update_bar():
global i_4h
global bar_
if not q.empty():
bar = q.get(True)
if i_4h == 0 :
bar_ = bar
i_4h = 1
history.append(bar_)
if i_4h == 240 :
bar_ = bar
i_4h = 1
history.append(bar_)
else :
bar_.close_price = bar.close_price
if bar.high_price > bar_.high_price:
bar_.high_price = bar.high_price
if bar.low_price < bar_.low_price:
bar_.low_price = bar.low_price
bar_.volume = bar_.volume + bar.volume
i_4h = i_4h + 1
history[-1] = bar_
widget.clear_all()
widget.update_history(history)
timer = QtCore.QTimer()
timer.timeout.connect(update_bar)
timer.start(50)
widget.setWindowTitle("四小时")
widget.setGeometry(860, 560, 1050, 530)
widget.show()
app.exec_()
def MINUTE(q): #一分钟的是最简单的,直接使用就好。
app = create_qapp()
widget = ChartWidget()
widget.add_plot("candle", hide_x_axis=True)
widget.add_plot("volume", maximum_height=180)
widget.add_item(CandleItem, "candle", "candle")
widget.add_item(VolumeItem, "volume", "volume")
widget.add_cursor()
def update_bar():
if not q.empty():
bar = q.get(True)
widget.update_bar(bar)
timer = QtCore.QTimer()
timer.timeout.connect(update_bar)
timer.start(50)
widget.setWindowTitle("一分钟")
widget.setGeometry(860, 15, 1050, 550)
widget.show()
app.exec_()
if __name__ == '__main__':
manager = multiprocessing.Manager()
q_1m = manager.Queue()
q_5m = manager.Queue()
q_30m = manager.Queue()
q_4h = manager.Queue()
su = manager.Queue()
pw = multiprocessing.Process(target=putbardata, args=(q_1m,q_5m,q_30m,q_4h,su))
pr_1m = multiprocessing.Process(target=MINUTE, args=(q_1m,))
pr_5m = multiprocessing.Process(target=MINUTE_5m, args=(q_5m,))
pr_30m = multiprocessing.Process(target=MINUTE_30m, args=(q_30m,))
pr_4h = multiprocessing.Process(target=MINUTE_4h, args=(q_4h,))
pw.start()
pr_1m.start()
pr_5m.start()
pr_30m.start()
pr_4h.start()
sudu = input("请输入速度:")
su.put(sudu)
time.sleep(1000000)
print('任务完成')
大概说一下原理,程序设定了5个进程,通过通道交换数据,其中一个进程发送数据,另外4个进程接受数据,接受数据的四个进程就是4个周期的窗口,把接受的一分钟数据变化成3分钟30分钟等,并用图形展示出来。
只要控制发送数据的节奏,就可以动态的把行情从新演示一遍了。
这是盘后复盘用的,可以回忆一下当天到底发生了什么。
国内期货有一个盘立方软件是可以完美复盘的,数字货币没有这个东西,tradingview有这个功能,但是每月要收费90元,而且tradingview也只能使用1分钟数据复盘。
身为程序员,当然不愿意掏钱,因为自己可以写一个。
感谢vnpy提供的ChartWidget,真的很好用。