如何编写一个python脚本将本地.csv
文件导入数据库是vn.py论坛中新用户提问较多的问题之一。本文的主要目的是帮助新用户解决数据入库问题,以便用户可以快速进入量化策略的开发。
本文主要分为三大部分,第一部分介绍了在vn.py中使用MongoDB数据库所需要进行的配置 (只打算使用vn.py默认SQLite数据库的用户,可以简单了解一下 )。第二部分介绍了数据入库的基本流程 (适用于vn.py支持的所有数据库)。最后一部分则是具体的实现:分别将数据导入MongoDB和SQLite数据库(适用于vn.py支持的所有数据库)。
另外,在正文开始之前,还需要提醒大家:在将数据导入数据库之前,务必要确保这些数据已经是被清洗过的干净 的数据。如果将没有被清洗过,质量差的数据直接入库进行回测,可能会导致各种问题,影响策略开发的效率。因此,建议大家使用 高质量 的数据源。
配置数据库
vn.py 中默认使用 SQLite 数据库。 因此,如果需要使用 MongoDB 数据库,则需要修改 vn.py 的全局配置。具体流程如下:
在
C:\Users\你的用户名\.vntrader
目录下找到vt_setting.json
文件对
vt_setting.json
文件中的database.driver,database.database,database.host,database.port
进行修改。
下图是我自己设置的数据库配置信息:
上图中的两个 "mongodb"
可能让人会有些困扰。实际上第一个"mongodb"
是告诉 vn.py 我们使用的数据库类型是 MongoDB 数据库而不是默认的 SQLite 数据库。 第二个 "mongodb"
是告诉 vn.py 回测所需要的数据储存在 MongoDB 数据库中一个叫做 "mongodb"
的 database 中。这样说可能有些绕口,请看下图:
上图是 MongoDB 数据库的官方图形界面。我们可以清楚的看到在该 MongoDB 数据库中一共有四个 database, 分别是 admin,config,local, mongodb
。 另外,mongodb
database 中分别储存了不同类型的期货数据。比如,AP888,IF888, RB888
等。在该 database 中,不同类型的期货数据,分别 储存在不同的 collection(表)中。vn.py默认是将所有的期货数据储存在同一个collection中并进行读取。 如果,你想让 vn.py 将不同类型的期货数据分别储存在不同的collection中。请阅读这两篇文章vn.py社区精选18 - 老用户福音,MongDB分表重构 和 vn.py社区精选19 - 福音收尾,MongoDB分表读取数据。
数据入库基本流程
在配置 MongoDB 数据库和设置好 MongoDB 分表读取之后(不配置数据库或设置分表读取,不影响后面的内容),我们可以正式开始讨论数据入库的基本流程。vn.py 提供了很多工具使数据入库这个过程变得简单,快捷。下面是数据入库的基本流程:
先确定要入库的数据是
Tick
还是Bar
(K线) 类型数据将需要入库的数据转化成 vn.py 定义的
TickData
或BarData
数据类型使用 vn.py 提供的数据入库工具函数
database_manager.save_tick_data
或database_manager.save_bar_data
将相应的TickData
或BarData
入库
从上面的数据入库流程中可以看出,在 vn.py 中数据入库的流程还是比较简单的。难点集中在第二步(将本地数据转换成vn.py 定义的 TickData
或 BarData
)。一般来说,就是对数据的时间戳处理上。另外,如果数据本身的质量不高,比如数据的整个时间戳格式前后不一致,那需要先对数据的时间戳格式进行统一。时间戳相关知识请查看 Python时间日期格式化之time与datetime模块 这篇文章。
数据入库具体实现
在了解了数据入库的基本流程之后,我们来实现一次数据入库的过程。首先,来看一看我们要入库的数据:
从上图可以看出,C 列储存的是表示时间的数据且 C2 和 C3 的时间间隔是1分钟。所以,要入库的数据是1分钟的 Bar
(K线)数据类型。下面我们进行第二步:将需要入库的数据转化成 vn.py 定义的 BarData
。
首先,我们先来认识一下 vn.py 中的BarData
:
从上图中可以看出,BarData
一共有11个属性。其中,BarData.vt_symbol
会在 BarData
实例化的时候自动生成。另外,需要指出的是BarData.exchange
和 BarData.inteval
的数据类型分别是 vn.py 中定义好的枚举常量 Exchange
和 Inteval
而 BarData.datetime
则是 python 标准库 datetime 中的 datetime
数据类型。
Exchange
数据结构的代码:
Interval
数据结构的代码:
在认识了vn.py 中的 BarData
之后,我们开始着手将需要入库的数据转化成 BarData
类型数据。再来重温一下,需要入库数据的格式:
通过和上文 BarData
的数据结构对比,我们有以下几个发现:
csv文件中的
合约代码 时间 开 高 低 收 成交量 持仓量
和BarData
中的symbol datetime open_price high_price low_price close_price volume open_interest
一一对应(从名称就就可以看出)。csv文件中
市场代码
没办法和BarData
中的exchange
对应。因为csv文件中市场代码
都是SC
,而在上图Exchange
数据结构代码截图中找不到和 SC 对应的枚举常量的绑定值。从合约代码 ag1608(沪银1608) 可以推断出这里的 SC 指的就是上海期货交易所,对应的枚举常量是Exchang.SHFE
。csv文件中缺少了和
BarData
中的interval
相对应的数据。上文我们已经发现了 csv文件中储存的是1分钟的BarData
,对应的枚举常量是Interval.MINUTE
。
基于上面的发现,很自然的,我们需要进行如下的操作:
将csv文件中
市场代码
的SC
替换成Exchang.SHFE
增加一列数据,且该列数据的所有值都是
Interval.MINUTE
一般情况下,使用 python 的 pandas
库可以方便的完成上面的操作。如果数据的质量较差,比如数据的分隔符设置存在问题,会使得pd.read_csv
函数没办法正确的读取.csv
文件。这时则需要使用python的 csv
库。本文的数据入库过程统一使用 pandas
来完成。 具体操作,如下:
from vnpy.trader.constant import (Exchange, Interval)
import pandas as pd
# 读取需要入库的csv文件,该文件是用gbk编码
imported_data = pd.read_csv('需要入库的数据的绝对路径',encoding='gbk')
# 将csv文件中 `市场代码`的 SC 替换成 Exchange.SHFE SHFE
imported_data['市场代码'] = Exchange.SHFE
# 增加一列数据 `inteval`,且该列数据的所有值都是 Interval.MINUTE
imported_data['interval'] = Interval.MINUTE
接下来,我们还需要对每列数据的数据类型进行修改,确保和 BarData
中各个属性的数据类型一致。BarData
中属性的数据类型可以分为三大类:float
类, datetime
类 和 自定义枚举类 (Interval
和 Exchange
)。因为,上面已经修改过了Interval
和 Exchange
,下面只需要修改 float
和 datetime
类。
修改 float
类代码:
# 明确需要是float数据类型的列
float_columns = ['开', '高', '低', '收', '成交量', '持仓量']
for col in float_columns:
imported_data[col] = imported_data[col].astype('float')
修改 datatime
类代码:
# 明确时间戳的格式
# %Y/%m/%d %H:%M:%S 代表着你的csv数据中的时间戳必须是 2020/05/01 08:32:30 格式
datetime_format = '%Y%m%d %H:%M:%S'
imported_data['时间'] = pd.to_datetime(imported_data['时间'],format=datetime_format)
下一步,我们还需要对列名进行修改:
# 因为没有用到 成交额 这一列的数据,所以该列列名不变
imported_data.columns = ['exchange','symbol','datetime','open','high','low','close','volume','成交额','open_interest','interval']
另外,因为该csv文件储存的是ag的主力连续数据,即多张ag合约的拼接。因此,symbol
列中有多个不同到期日的ag合约代码,这里需要将合约代码统一为ag88
:
imported_data['symbol'] ='ag88'
最后,我们使用 vn.py 封装好的 database_manager.save_bar_data
将数据入库:
# 导入 database_manager 模块
from vnpy.trader.database import database_manager
from vnpy.trader.object import (BarData,TickData)
# 封装函数
def move_df_to_mongodb(imported_data:pd.DataFrame,collection_name:str):
bars = []
start = None
count = 0
for row in imported_data.itertuples():
bar = BarData(
symbol=row.symbol,
exchange=row.exchange,
datetime=row.datetime,
interval=row.interval,
volume=row.volume,
open_price=row.open,
high_price=row.high,
low_price=row.low,
close_price=row.close,
open_interest=row.open_interest,
gateway_name="DB",
)
bars.append(bar)
# do some statistics
count += 1
if not start:
start = bar.datetime
end = bar.datetime
# insert into database
database_manager.save_bar_data(bars, collection_name)
print(f"Insert Bar: {count} from {start} - {end}")
如果,默认的数据库是其它vn.py支持的数据库,上面的代码需要做略微修改后便可以使用(详情请看结尾Debug部分)。
如果,没有设置分表储存不同类型的数据。则需要先将move_df_to_mongodb
函数中的collection_name
参数删除,同时将上面代码的倒数第二行修改为:
database_manager.save_bar_data(bars)
如果,想要将数据储存储存在 SQLite 数据库中也很简单(默认数据库不是SQLite)。只需要两步就可以完成。
- 创建一个sqlite数据库连接对象:
from vnpy.trader.database.initialize import init_sql
from vnpy.trader.database.database import Driver
settings={
"database": "database.db",
"host": "localhost",
"port": 3306,
"user": "root",
"password": "",
"authentication_source": "admin"
}
sqlite_manager = init_sql(driver=Driver.SQLITE, settings=settings)
2.使用sqlite数据库连接对象将数据入库
# 替换函数 move_df_to_mongodb 的倒数第二行
sqlite_manager.save_bar_data(bars)
总结
本文尝试从数据库配置,数据入库基本流程,数据入库具体实现,三部分来帮助vn.py新用户解决编写python脚本实现数据入库这个难点。借助vn.py的database_manager
模块,用户基本上可以无缝切换SQLite,MongoDB等vn.py支持的数据库来读取和存入数据。希望这篇文章能帮助大家快速进入量化策略的研究和开发。
Debug
使用上述代码进行Sqlite
数据入库的时候,会出现peewee.InterfaceError: Error binding parameter 2 - probably unsupported type
错误,解决方法:
- 找到
imported_data['时间'] = pd.to_datetime(imported_data['时间'],format=datetime_format)
代码所在行 - 在该行代码下键入
imported_data['时间'] = imported_data['时间'].dt.strftime('%Y%m%d %H:%M:%S')
详细的Debug过程记录在sqlite数据入库Debug. 将该文件夹内的内容下载到本地同一个位置,运行Jupyter Notebook就可以复现整个过程.
完整代码
from vnpy.trader.constant import (Exchange, Interval)
import pandas as pd
from vnpy.trader.database import database_manager
from vnpy.trader.object import (BarData,TickData)
# 封装函数
def move_df_to_mongodb(imported_data:pd.DataFrame,collection_name:str):
bars = []
start = None
count = 0
for row in imported_data.itertuples():
bar = BarData(
symbol=row.symbol,
exchange=row.exchange,
datetime=row.datetime,
interval=row.interval,
volume=row.volume,
open_price=row.open,
high_price=row.high,
low_price=row.low,
close_price=row.close,
open_interest=row.open_interest,
gateway_name="DB",
)
bars.append(bar)
# do some statistics
count += 1
if not start:
start = bar.datetime
end = bar.datetime
# insert into database
database_manager.save_bar_data(bars, collection_name)
print(f'Insert Bar: {count} from {start} - {end}')
if __name__ == "__main__":
# 读取需要入库的csv文件,该文件是用gbk编码
imported_data = pd.read_csv('D:/1分钟数据压缩包/FutAC_Min1_Std_2016/ag主力连续.csv',encoding='gbk')
# 将csv文件中 `市场代码`的 SC 替换成 Exchange.SHFE SHFE
imported_data['市场代码'] = Exchange.SHFE
# 增加一列数据 `inteval`,且该列数据的所有值都是 Interval.MINUTE
imported_data['interval'] = Interval.MINUTE
# 明确需要是float数据类型的列
float_columns = ['开', '高', '低', '收', '成交量', '持仓量']
for col in float_columns:
imported_data[col] = imported_data[col].astype('float')
# 明确时间戳的格式
# %Y/%m/%d %H:%M:%S 代表着你的csv数据中的时间戳必须是 2020/05/01 08:32:30 格式
datetime_format = '%Y%m%d %H:%M:%S'
imported_data['时间'] = pd.to_datetime(imported_data['时间'],format=datetime_format)
# 因为没有用到 成交额 这一列的数据,所以该列列名不变
imported_data.columns = ['exchange','symbol','datetime','open','high','low','close','volume','成交额','open_interest','interval']
imported_data['symbol'] ='ag88'
move_df_to_mongodb(imported_data,'ag88')