了准备Tick数据
要获取Tick数据,并插入到vn.py数据库中,整体上有3种方法:
- 使用vn.py行情记录模块DataRecorder来自行录取:在保证网络稳定的情况下,启动一个独立的进程,负责在交易时段录制行情数据;并在收盘后,使用脚本工具完成数据的自动清洗工作,整体上实现起来比较耗费精力;
- 从RQData或者其他数据源下载:专业的数据服务商可以提供已经清洗好的Tick数据,但是价格往往比较贵;
- 通过CSV文件来载入:CSV格式的Tick数据相对来说比较容易获取(万能的淘宝~),因此对个人用户来说是个比较容易的选择。
那么本文我们就选择第3种方法,通过读取CSV文件,把数据载入到数据库中。
载入CSV文件
首先需要保证你已经在系统上安装配置好了数据库,这里演示用的是MongoDB数据库以及图形化客户端Robo 3T。
注意在MongoDB中需要创建新数据库“vnpy”,然后在全局配置对话框中,修改相关配置:
"database.driver": "mongodb",
"database.database": "vnpy",
"database.host": "localhost",
"database.port": 27017,
"database.user": "",
"database.password": "",
"database.authentication_source": ""
注意输入上述内容到配置对话框中时,请忽略引号。修改完毕保存后,请重新启动VN Trader,检查相关配置是否已经修改成功。
然后我们把所有的CSV文件放在同一文件夹下,这样就可以使用一个脚本来读取该文件夹内的所有CSV格式文件,并批量载入到数据库中。
在开始处理数据之前,我们需要知道CSV文件中的表头信息和数据特征。用Excel打开其中任意一个CSV文件,查看其中的内容后,建立一个比较直观的印象,大概知道:
- 哪些数据需要写入到数据库;
- 哪些数据不需要写入;
- 哪些数据需要强行赋值。
这里我们的CSV文件,表头以及第一行内容如下:
交易日,合约代码,交易所代码,合约在交易所的代码,最新价,上次结算价,昨收盘,昨持仓量,今开盘,最高价,最低价,数量,成交金额,持仓量,今收盘,本次结算价,涨停板价,跌停板价,昨虚实度,今虚实度,最后修改时间,最后修改毫秒,申买价一,申买量一,申卖价一,申卖量一,申买价二,申买量二,申卖价二,申卖量二,申买价三,申买量三,申卖价三,申卖量三,申买价四,申买量四,申卖价四,申卖量四,申买价五,申买量五,申卖价五,申卖量五,当日均价,业务日期
20190102,ru1905,,,11280.0000,11290.0000,11305.0000,322472,11280.0000,11280.0000,11280.0000,246,27748800.0000,322468,0.0000,0.0000,12080.0000,10495.0000,0,0,08:59:00,500,11280.0000,10,11290.0000,10,0.0000,0,0.0000,0,0.0000,0,0.0000,0,0.0000,0,0.0000,0,0.0000,0,0.0000,0,112800.0000,20190102
从以上内容中,我们发现下述特征:
- 表头是中文;
- 可以直接载入的数据:最新价,持仓量,买一价,买一量,卖一价,卖一量;
- 需要合成的数据:datetime,它由3列数据合成,分别是交易日,最后修改时间,最后修改毫秒;
- 强行赋值数据是主力连续合约(RU88)和交易所(SHFE);
- 集合竞价阶段数据需要保留;
- 非交易时段产生的垃圾数据需要剔除。
有了这样的需求后,我们在接下来开发脚本的过程中就有了方向:
1)使用for循环遍历同一文件夹内所有CSV格式的文件(即以“.csv"结尾的文件名),使用csv_load函数来载入数据:
import os
import csv
from datetime import datetime, time
from vnpy.trader.constant import Exchange
from vnpy.trader.database import database_manager
from vnpy.trader.object import TickData
def run_load_csv():
"""
遍历同一文件夹内所有csv文件,并且载入到数据库中
"""
for file in os.listdir("."):
if not file.endswith(".csv"):
continue
print("载入文件:", file)
csv_load(file)
2)csv_load函数的具体设计
- 使用csv.DictReader类,来访问CSV文件中的数据;
- 通过字符串相加合成标准时间,在使用datetime.strptime函数转化成时间元组;
- 通过datetime.time函数的判断来剔除非交易时间段数据;
- 将符合标准数据,生成TickData数据对象;
- 在循环中把TickData插入到ticks列表中;
- 最终使用database_manager.save_tick_data函数把ticks列表中的数据写入到数据库中。
def csv_load(file):
"""
读取csv文件内容,并写入到数据库中
"""
with open(file, "r") as f:
reader = csv.DictReader(f)
ticks = []
start = None
count = 0
for item in reader:
# generate datetime
date = item["交易日"]
second = item["最后修改时间"]
millisecond = item["最后修改毫秒"]
standard_time = date + " " + second + "." + millisecond
dt = datetime.strptime(standard_time, "%Y%m%d %H:%M:%S.%f")
# filter
if dt.time() > time(15, 1) and dt.time() < time(20, 59):
continue
tick = TickData(
symbol="RU88",
datetime=dt,
exchange=Exchange.SHFE,
last_price=float(item["最新价"]),
volume=float(item["持仓量"]),
bid_price_1=float(item["申买价一"]),
bid_volume_1=float(item["申买量一"]),
ask_price_1=float(item["申卖价一"]),
ask_volume_1=float(item["申卖量一"]),
gateway_name="DB",
)
ticks.append(tick)
# do some statistics
count += 1
if not start:
start = tick.datetime
end = tick.datetime
database_manager.save_tick_data(ticks)
print("插入数据", start, "-", end, "总数量:", count)
if __name__ == "__main__":
run_load_csv()
创建好脚本后可以直接运行:进入cmd或者Powershell,运行命令python load_tickdata.py即可,效果如下图所示:
此时我们使用Robo 3T客户端来连接上MongoDB,在数据库【vnpy】->【db_tick_data】可以看到新载入的数据:
Tick模式回测
CTA策略模块(CtaStrategy)的回测引擎BacktestingEngine支持Tick数据的回测,以下代码推荐在Jupyter Notebook中运行。
第一步我们需要在策略文件中进行一些修改,这里以AtrRsiStrategy策略为例:找到on_init函数,把其中的load_bar(10)
改为load_tick(10)
,即指定加载过去10天的Tick数据来执行策略初始化任务,而不是加载K线Bar数据进行初始化。
然后在加载回测相关的模块时,需要额外加载BacktestingMode枚举类型,其中包含有回测引擎所支持的Bar(K线)和Tick两种模式:
from vnpy.app.cta_strategy.backtesting import BacktestingEngine, OptimizationSetting
from vnpy.app.cta_strategy.base import BacktestingMode
from datetime import datetime
from atr_rsi_strategy import AtrRsiStrategy
创建回测引擎对象的实例后,在调用set_parameters函数时,参数中需要新增“mode=BacktestingMode.TICK ”,来指定回测引擎使用Tick回测模式。
同时需要注意另外2点:
- interval需要传入"1m",尽管在Tick回测中用不到K线,但不能设置为None或者不传,会导致报错;
- 合约乘数、手续费、滑点三个参数,需要根据合约品种的具体情况做调整。
engine = BacktestingEngine()
engine.set_parameters(
vt_symbol="RU88.SHFE",
interval="1m",
start=datetime(2019, 1, 1),
end=datetime(2019, 4, 1),
rate=0.5/10000,
slippage=5,
size=10,
pricetick=5,
capital=1_000_000,
mode=BacktestingMode.TICK
)
engine.add_strategy(AtrRsiStrategy, {})
后续的操作和K线模式回测就几乎完全相同了,加载历史数据并执行数据回放,然后基于逐笔成交计算每日盈亏数据,并生成最终的策略统计结果以及回测图表:
engine.load_data()
engine.run_backtesting()
df = engine.calculate_result()
engine.calculate_statistics()
engine.show_chart()
由于这3个月行情多为区间震荡,所以以去趋势跟踪为核心逻辑的AtrRsiStrategy的回测效果不太理想:
下面我们如果想再看看详细的逐笔成交记录,可以遍历回测引擎中保存所有成交数据的trades字典,并打印每笔成交相关的字段信息:
trades = engine.trades
for value in trades.values():
print("时间:",value.datetime,value.direction.value,value.offset.value, "价格:",value.price, "数量:",value.volume)
if value.offset.value == "平":
print("---------------------------------------------------------")
这样就能看到每笔成交具体发生的时间点,并和Tick数据当时的盘口情况进行相应的比对检查了:
《vn.py全实战进阶》课程已经更新过半!一共50节内容覆盖从策略设计开发、参数回测优化,到最终实盘自动交易的完整CTA量化业务流程,详细内容请戳课程上线:《vn.py全实战进阶》!
了解更多知识,请关注vn.py社区公众号。