VeighNa量化社区
你的开源社区量化交易平台
Super Moderator
avatar
加入于:
帖子: 31
声望: 13

 

如何编写一个python脚本将本地.csv 文件导入数据库是vn.py论坛中新用户提问较多的问题之一。本文的主要目的是帮助新用户解决数据入库问题,以便用户可以快速进入量化策略的开发。
 

本文主要分为三大部分,第一部分介绍了在vn.py中使用MongoDB数据库所需要进行的配置 (只打算使用vn.py默认SQLite数据库的用户,可以简单了解一下 )。第二部分介绍了数据入库的基本流程 (适用于vn.py支持的所有数据库)。最后一部分则是具体的实现:分别将数据导入MongoDB和SQLite数据库(适用于vn.py支持的所有数据库)。
 

另外,在正文开始之前,还需要提醒大家:在将数据导入数据库之前,务必要确保这些数据已经是被清洗过干净 的数据。如果将没有被清洗过,质量差的数据直接入库进行回测,可能会导致各种问题,影响策略开发的效率。因此,建议大家使用 高质量 的数据源。
 

配置数据库

 

vn.py 中默认使用 SQLite 数据库。 因此,如果需要使用 MongoDB 数据库,则需要修改 vn.py 的全局配置。具体流程如下:
 

  1. C:\Users\你的用户名\.vntrader 目录下找到 vt_setting.json 文件

  2. vt_setting.json 文件中的database.driver,database.database,database.host,database.port 进行修改。
     

下图是我自己设置的数据库配置信息:
 

description

 

上图中的两个 "mongodb" 可能让人会有些困扰。实际上第一个"mongodb"是告诉 vn.py 我们使用的数据库类型是 MongoDB 数据库而不是默认的 SQLite 数据库。 第二个 "mongodb" 是告诉 vn.py 回测所需要的数据储存在 MongoDB 数据库中一个叫做 "mongodb" 的 database 中。这样说可能有些绕口,请看下图:
 

description

 

上图是 MongoDB 数据库的官方图形界面。我们可以清楚的看到在该 MongoDB 数据库中一共有四个 database, 分别是 admin,config,local, mongodb。 另外,mongodb database 中分别储存了不同类型的期货数据。比如,AP888,IF888, RB888等。在该 database 中,不同类型的期货数据,分别 储存在不同的 collection(表)中。vn.py默认是将所有的期货数据储存在同一个collection中并进行读取。 如果,你想让 vn.py 将不同类型的期货数据分别储存在不同的collection中。请阅读这两篇文章vn.py社区精选18 - 老用户福音,MongDB分表重构vn.py社区精选19 - 福音收尾,MongoDB分表读取数据
 

数据入库基本流程

 

在配置 MongoDB 数据库和设置好 MongoDB 分表读取之后(不配置数据库或设置分表读取,不影响后面的内容),我们可以正式开始讨论数据入库的基本流程。vn.py 提供了很多工具使数据入库这个过程变得简单,快捷。下面是数据入库的基本流程:
 

  1. 先确定要入库的数据是 Tick 还是 Bar (K线) 类型数据

  2. 将需要入库的数据转化成 vn.py 定义的 TickDataBarData 数据类型

  3. 使用 vn.py 提供的数据入库工具函数 database_manager.save_tick_datadatabase_manager.save_bar_data 将相应的 TickDataBarData 入库
     

从上面的数据入库流程中可以看出,在 vn.py 中数据入库的流程还是比较简单的。难点集中在第二步(将本地数据转换成vn.py 定义的 TickDataBarData)。一般来说,就是对数据的时间戳处理上。另外,如果数据本身的质量不高,比如数据的整个时间戳格式前后不一致,那需要先对数据的时间戳格式进行统一。时间戳相关知识请查看 Python时间日期格式化之time与datetime模块 这篇文章。
 

数据入库具体实现

 

在了解了数据入库的基本流程之后,我们来实现一次数据入库的过程。首先,来看一看我们要入库的数据:
 

description

 

从上图可以看出,C 列储存的是表示时间的数据且 C2 和 C3 的时间间隔是1分钟。所以,要入库的数据是1分钟的 Bar (K线)数据类型。下面我们进行第二步:将需要入库的数据转化成 vn.py 定义的 BarData
 

首先,我们先来认识一下 vn.py 中的BarData
 

description

 

从上图中可以看出,BarData 一共有11个属性。其中,BarData.vt_symbol 会在 BarData 实例化的时候自动生成。另外,需要指出的是BarData.exchangeBarData.inteval 的数据类型分别是 vn.py 中定义好的枚举常量 ExchangeIntevalBarData.datetime 则是 python 标准库 datetime 中的 datetime 数据类型。
 

Exchange 数据结构的代码:
 

description

 

Interval 数据结构的代码:
 

description

 

在认识了vn.py 中的 BarData 之后,我们开始着手将需要入库的数据转化成 BarData类型数据。再来重温一下,需要入库数据的格式:
 

description

 

通过和上文 BarData 的数据结构对比,我们有以下几个发现:
 

  1. csv文件中的 合约代码 时间 开 高 低 收 成交量 持仓量BarData中的 symbol datetime open_price high_price low_price close_price volume open_interest 一一对应(从名称就就可以看出)。

  2. csv文件中 市场代码 没办法和 BarData 中的 exchange 对应。因为csv文件中 市场代码 都是 SC ,而在上图Exchange 数据结构代码截图中找不到和 SC 对应的枚举常量的绑定值。从合约代码 ag1608(沪银1608) 可以推断出这里的 SC 指的就是上海期货交易所,对应的枚举常量是 Exchang.SHFE

  3. csv文件中缺少了和 BarData 中的 interval 相对应的数据。上文我们已经发现了 csv文件中储存的是1分钟的BarData,对应的枚举常量是 Interval.MINUTE
     

基于上面的发现,很自然的,我们需要进行如下的操作:
 

  1. 将csv文件中 市场代码SC 替换成 Exchang.SHFE

  2. 增加一列数据,且该列数据的所有值都是 Interval.MINUTE

 

一般情况下,使用 python 的 pandas 库可以方便的完成上面的操作。如果数据的质量较差,比如数据的分隔符设置存在问题,会使得pd.read_csv函数没办法正确的读取.csv文件。这时则需要使用python的 csv 库。本文的数据入库过程统一使用 pandas 来完成。 具体操作,如下:

 

from vnpy.trader.constant import (Exchange, Interval)
import pandas as pd
# 读取需要入库的csv文件,该文件是用gbk编码
imported_data = pd.read_csv('需要入库的数据的绝对路径',encoding='gbk')

# 将csv文件中 `市场代码`的 SC 替换成 Exchange.SHFE SHFE
imported_data['市场代码'] = Exchange.SHFE

# 增加一列数据 `inteval`,且该列数据的所有值都是 Interval.MINUTE
imported_data['interval'] = Interval.MINUTE

 

接下来,我们还需要对每列数据的数据类型进行修改,确保和 BarData 中各个属性的数据类型一致。BarData中属性的数据类型可以分为三大类:float 类, datetime 类 和 自定义枚举类 (IntervalExchange)。因为,上面已经修改过了IntervalExchange,下面只需要修改 floatdatetime 类。
 

修改 float 类代码:
 

# 明确需要是float数据类型的列
float_columns = ['开', '高', '低', '收', '成交量', '持仓量']

for col in float_columns:
  imported_data[col] = imported_data[col].astype('float')

 

修改 datatime 类代码:
 

# 明确时间戳的格式
# %Y/%m/%d %H:%M:%S 代表着你的csv数据中的时间戳必须是 2020/05/01 08:32:30 格式
datetime_format = '%Y%m%d %H:%M:%S'

imported_data['时间'] = pd.to_datetime(imported_data['时间'],format=datetime_format)

 

下一步,我们还需要对列名进行修改:
 

# 因为没有用到 成交额 这一列的数据,所以该列列名不变
imported_data.columns = ['exchange','symbol','datetime','open','high','low','close','volume','成交额','open_interest','interval']

 

另外,因为该csv文件储存的是ag的主力连续数据,即多张ag合约的拼接。因此,symbol列中有多个不同到期日的ag合约代码,这里需要将合约代码统一为ag88
 

imported_data['symbol'] ='ag88'

最后,我们使用 vn.py 封装好的 database_manager.save_bar_data 将数据入库:
 

# 导入 database_manager 模块
from vnpy.trader.database import database_manager
from vnpy.trader.object import (BarData,TickData)
# 封装函数
def move_df_to_mongodb(imported_data:pd.DataFrame,collection_name:str):
    bars = []
    start = None
    count = 0

    for row in imported_data.itertuples():

        bar = BarData(

              symbol=row.symbol,
              exchange=row.exchange,
              datetime=row.datetime,
              interval=row.interval,
              volume=row.volume,
              open_price=row.open,
              high_price=row.high,
              low_price=row.low,
              close_price=row.close,
              open_interest=row.open_interest,
              gateway_name="DB",

        )


        bars.append(bar)

        # do some statistics
        count += 1
        if not start:
            start = bar.datetime
    end = bar.datetime

    # insert into database
    database_manager.save_bar_data(bars, collection_name)
    print(f"Insert Bar: {count} from {start} - {end}")

 

如果,默认的数据库是其它vn.py支持的数据库,上面的代码需要做略微修改后便可以使用(详情请看结尾Debug部分)。
 

如果,没有设置分表储存不同类型的数据。则需要先将move_df_to_mongodb函数中的collection_name参数删除,同时将上面代码的倒数第二行修改为:
 

database_manager.save_bar_data(bars)

 

如果,想要将数据储存储存在 SQLite 数据库中也很简单(默认数据库不是SQLite)。只需要两步就可以完成。
 

  1. 创建一个sqlite数据库连接对象:

 

from vnpy.trader.database.initialize import init_sql
from vnpy.trader.database.database import Driver

settings={

    "database": "database.db",
    "host": "localhost",
    "port": 3306,
    "user": "root",
    "password": "",
    "authentication_source": "admin"
}
sqlite_manager = init_sql(driver=Driver.SQLITE, settings=settings)

 

2.使用sqlite数据库连接对象将数据入库

 

# 替换函数 move_df_to_mongodb 的倒数第二行
sqlite_manager.save_bar_data(bars)

 

总结

 

本文尝试从数据库配置,数据入库基本流程,数据入库具体实现,三部分来帮助vn.py新用户解决编写python脚本实现数据入库这个难点。借助vn.py的database_manager模块,用户基本上可以无缝切换SQLite,MongoDB等vn.py支持的数据库来读取和存入数据。希望这篇文章能帮助大家快速进入量化策略的研究和开发。
 

Debug

 

使用上述代码进行Sqlite数据入库的时候,会出现peewee.InterfaceError: Error binding parameter 2 - probably unsupported type错误,解决方法:

  1. 找到imported_data['时间'] = pd.to_datetime(imported_data['时间'],format=datetime_format)代码所在行
  2. 在该行代码下键入imported_data['时间'] = imported_data['时间'].dt.strftime('%Y%m%d %H:%M:%S')

详细的Debug过程记录在sqlite数据入库Debug. 将该文件夹内的内容下载到本地同一个位置,运行Jupyter Notebook就可以复现整个过程.
 

完整代码

 

from vnpy.trader.constant import (Exchange, Interval)
import pandas as pd
from vnpy.trader.database import database_manager
from vnpy.trader.object import (BarData,TickData)

# 封装函数
def move_df_to_mongodb(imported_data:pd.DataFrame,collection_name:str):
    bars = []
    start = None
    count = 0

    for row in imported_data.itertuples():

        bar = BarData(

              symbol=row.symbol,
              exchange=row.exchange,
              datetime=row.datetime,
              interval=row.interval,
              volume=row.volume,
              open_price=row.open,
              high_price=row.high,
              low_price=row.low,
              close_price=row.close,
              open_interest=row.open_interest,
              gateway_name="DB",

        )


        bars.append(bar)

        # do some statistics
        count += 1
        if not start:
            start = bar.datetime
    end = bar.datetime

    # insert into database
    database_manager.save_bar_data(bars, collection_name)
    print(f'Insert Bar: {count} from {start} - {end}')


if __name__ == "__main__":

    # 读取需要入库的csv文件,该文件是用gbk编码
    imported_data = pd.read_csv('D:/1分钟数据压缩包/FutAC_Min1_Std_2016/ag主力连续.csv',encoding='gbk')
    # 将csv文件中 `市场代码`的 SC 替换成 Exchange.SHFE SHFE
    imported_data['市场代码'] = Exchange.SHFE
    # 增加一列数据 `inteval`,且该列数据的所有值都是 Interval.MINUTE
    imported_data['interval'] = Interval.MINUTE
    # 明确需要是float数据类型的列
    float_columns = ['开', '高', '低', '收', '成交量', '持仓量']
    for col in float_columns:
      imported_data[col] = imported_data[col].astype('float')
    # 明确时间戳的格式
    # %Y/%m/%d %H:%M:%S 代表着你的csv数据中的时间戳必须是 2020/05/01 08:32:30 格式
    datetime_format = '%Y%m%d %H:%M:%S'
    imported_data['时间'] = pd.to_datetime(imported_data['时间'],format=datetime_format)
    # 因为没有用到 成交额 这一列的数据,所以该列列名不变
    imported_data.columns = ['exchange','symbol','datetime','open','high','low','close','volume','成交额','open_interest','interval']
    imported_data['symbol'] ='ag88'
    move_df_to_mongodb(imported_data,'ag88')
Member
avatar
加入于:
帖子: 25
声望: 0

谢谢分享!这篇文章对我这个小白来说真是太有用了!以前一直不知道如何将数据入库。再次感谢!

Member
avatar
加入于:
帖子: 1
声望: 0

谢谢分享,我用的csv导入到sqlite数据库,一直报错peewee.InterfaceError: Error binding parameter 2 - probably unsupported type。但不知道是哪个字段的类型不符合。

Super Moderator
avatar
加入于:
帖子: 31
声望: 13

非常抱歉,如果上述代码直接用于Sqlite数据库会出现问题。出现问题的是datetime字段的类型。
需要进行如下操作:

  1. 找到imported_data['时间'] = pd.to_datetime(imported_data['时间'],format=datetime_format)代码
  2. 在上述代码下增加imported_data['时间'] = imported_data['时间'].dt.strftime('%Y%m%d %H:%M:%S')

完整的可交互Debug过程请参考sqlite数据入库Debug。将该文件夹内的内容下载到本地同一个位置,运行Jupyter Notebook 就可以使用.

Member
avatar
加入于:
帖子: 12
声望: 1

你好,可以提供一份处理好格式的csv文件吗?便于手动点击导入。

Member
avatar
加入于:
帖子: 8
声望: 0

找到imported_data['时间'] = pd.to_datetime(imported_data['时间'],format=datetime_format)代码所在行
在该行代码下键入imported_data['时间'] = imported_data['时间'].dt.strftime('%Y%m%d %H:%M:%S')

请问,第二行后面的,这里面的 dt 在哪里定义的,直接调用会报错吗?

Member
avatar
加入于:
帖子: 4622
声望: 284

原帖给的debug过程记录里应该有吧

Member
avatar
加入于:
帖子: 8
声望: 0

你好,多谢回复。
但是,我提的这个问题,你说的原帖给的debug过程记录里应该有吧,这个里面好像没有。

其实我的问题是:
imported_data['时间'] = imported_data['时间'].dt.strftime('%Y%m%d %H:%M:%S')
这行代码是csv导入sqlite的功能脚本文件里面的程序,dt 在哪里定义的,好像没找到,直接引用语法错误吗,为什么这样语句,程序编译正常呢?

Member
avatar
加入于:
帖子: 4622
声望: 284

dt就是pands.series.dt.strftime的dt,官方文档在https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.Series.dt.strftime.html?highlight=dt%20strft#pandas.Series.dt.strftime
不然你可以尝试一下把dt换成datetime, 应该会报错

Member
avatar
加入于:
帖子: 13
声望: 0

up

Member
avatar
加入于:
帖子: 8
声望: 0

好的,感谢指导了,自己有点想歪了,刚开始以为dt是VN相关的对象属性的调用了,其实是pandas里面关于datetime的部分!

Member
avatar
加入于:
帖子: 8
声望: 2

2020/09/09更新
原来代码

bar = BarData(

              symbol=row.symbol,
              exchange=row.exchange,
              datetime=row.datetime,#报错的地方,需要增加时区信息
              interval=row.interval,
              volume=row.volume,
              open_price=row.open,
              high_price=row.high,
              low_price=row.low,
              close_price=row.close,
              open_interest=row.open_interest,
              gateway_name="DB",

        )

运行会报错:

AttributeError: 'str' object has no attribute 'astimezone'

原因是新版本vnpy支持了时区数据,所以datetime=row.datetime需要加上时区信息

from datetime import datetime, timedelta, timezone

# 中国时区是+8,对应参数hours=8
# 日本时区是+9,hours=9
utc_8 = timezone(timedelta(hours=8))
datetime=row.datetime.replace(tzinfo=utc_8)

一个小技巧,把bar里面的数据打印出来,看看都是什么格式。

sql_manager.get_oldest_bar_data()
Member
avatar
加入于:
帖子: 70
声望: 0

callingpulse wrote:

2020/09/09更新
原来代码

bar = BarData(

              symbol=row.symbol,
              exchange=row.exchange,
              datetime=row.datetime,#报错的地方,需要增加时区信息
              interval=row.interval,
              volume=row.volume,
              open_price=row.open,
              high_price=row.high,
              low_price=row.low,
              close_price=row.close,
              open_interest=row.open_interest,
              gateway_name="DB",

        )

运行会报错:

AttributeError: 'str' object has no attribute 'astimezone'

原因是新版本vnpy支持了时区数据,所以datetime=row.datetime需要加上时区信息

from datetime import datetime, timedelta, timezone

# 中国时区是+8,对应参数hours=8
# 日本时区是+9,hours=9
utc_8 = timezone(timedelta(hours=8))
datetime=row.datetime.replace(tzinfo=utc_8)

一个小技巧,把bar里面的数据打印出来,看看都是什么格式。

sql_manager.get_oldest_bar_data()

大佬牛!!!出错后发现真是这个问题导致的,必须加时区

Member
avatar
加入于:
帖子: 2
声望: 0

Traceback (most recent call last):
File "d:/study/论坛的入库程序.py", line 43, in <module>
move_df_to_mongodb(imported_data)
File "d:/study/论坛的入库程序.py", line 26, in move_df_to_mongodb
gateway_name="DB",
File "<string>", line 14, in init
File "D:\Python\Miniconda3\lib\site-packages\vnpy\trader\object.py", line 99, in post_init
self.vt_symbol = f"{self.symbol}.{self.exchange.value}"
AttributeError: 'str' object has no attribute 'value'
奇怪我的报错为啥和别人不同?版本2.3

Member
avatar
加入于:
帖子: 4622
声望: 284

你的exchange是一个str,没有value。检查一下格式吧

Member
avatar
加入于:
帖子: 2
声望: 0

xiaohe wrote:

你的exchange是一个str,没有value。检查一下格式吧
谢谢,好像是格式问题,虽然我赋值的时候是类属性,但是我没有直接导入,而是重新写回了CSV文件,结果写回之后变成了str类,这点我还没搞明白。
现在又有新问题了,
```
from vnpy.trader.constant import (Exchange, Interval)
import pandas as pd
from vnpy.trader.database import database_manager
from vnpy.trader.object import (BarData, TickData)

封装函数

def move_df_to_mongodb(imported_data: pd.DataFrame):
bars = []
start = None
count = 0

for row in imported_data.itertuples():

    bar = BarData(
        symbol=row.symbol,
        exchange=row.exchange,
        datetime=row.datetime,
        interval=row.interval,
        volume=row.volume,
        open_price=row.open,
        high_price=row.high,
        low_price=row.low,
        close_price=row.close,
        open_interest=row.open_interest,
        gateway_name="DB",
    )
    bars.append(bar)

    # do some statistics
    count += 1
    if not start:
        start = bar.datetime
end = bar.datetime

# insert into database
database_manager.save_bar_data(bars)
print(f'Insert Bar: {count} from {start} - {end}')


if name == "main":
datatype = 1 # 1分钟数据用,2日线数据用

csvfilepath = 'e:\\test\\1test.csv'
# 读取需要入库的csv文件,该文件是用gbk编码
imported_data = pd.read_csv(csvfilepath, encoding='gbk')
# 将csv文件中 `市场代码`的 SC 替换成 Exchange.SHFE SHFE
imported_data.loc[imported_data['exchange'].values == 'SSE', 'exchange'] = Exchange.SSE
imported_data.loc[imported_data['exchange'].values == 'SZSE', 'exchange'] = Exchange.SZSE
imported_data.loc[imported_data['exchange'].values == 'CFFEX', 'exchange'] = Exchange.CFFEX
# 增加一列数据 `inteval`,且该列数据的所有值都是 Interval.MINUTE
if datatype == 1:
    imported_data['interval'] = Interval.MINUTE
elif datatype == 2:
    imported_data['interval'] = Interval.DAILY
# 明确需要是float数据类型的列
float_columns = ['open', 'high', 'low', 'close', 'volume', 'open_interest']
for col in float_columns:
    imported_data[col] = imported_data[col].astype('float')
# 明确时间戳的格式
# %Y/%m/%d %H:%M:%S 代表着你的csv数据中的时间戳必须是 2020/05/01 08:32:30 格式
datetime_format = '%Y/%m/%d %H:%M:%S'
imported_data['datetime'] = pd.to_datetime(imported_data['datetime'], format=datetime_format)
# 因为没有用到 成交额 这一列的数据,所以该列列名不变
imported_data.columns = ['exchange', 'symbol', 'name', 'datetime', 'open', 'high', 'low', 'close', 'volume',
                         'open_interest','interval']

move_df_to_mongodb(imported_data)

```
我因为是导入A股数据,本地时间格式2021/6/1 9:31:00 这样的,csv格式如下

description

VNPY的版本是2.3,现在出了
Traceback (most recent call last):
File "D:/study/论坛的入库程序.py", line 68, in <module>
move_df_to_mongodb(imported_data)
File "D:/study/论坛的入库程序.py", line 37, in move_df_to_mongodb
database_manager.save_bar_data(bars)
File "D:\Python\Miniconda3\lib\site-packages\vnpy\database\mongodb\mongodb_database.py", line 163, in save_bar_data
bar.datetime = convert_tz(bar.datetime)
File "D:\Python\Miniconda3\lib\site-packages\vnpy\trader\database.py", line 20, in convert_tz
dt = dt.astimezone(DB_TZ)
File "pandas/_libs/tslibs/timestamps.pyx", line 884, in pandas._libs.tslibs.timestamps.Timestamp.tz_convert
TypeError: Cannot convert tz-naive Timestamp, use tz_localize to localize

我看到楼上有兄弟同样的问题说是换了下时间格式可以了,麻烦问下,我这种情况,时间格式应该用什么的?
补充:我后来又print了一下datetime字段 print(imported_data['datetime'])
居然显示是3359 2021-06-09 15:00:00
Name: datetime, Length: 3360, dtype: object
于是后来我又试着把 # 明确时间戳的格式

# %Y/%m/%d %H:%M:%S 代表着你的csv数据中的时间戳必须是 2020/05/01 08:32:30 格式
datetime_format = '%Y/%m/%d %H:%M:%S'

这句修改成datetime_format = '%Y-%m-%d %H:%M:%S' 结果依然报错TypeError: Cannot convert tz-naive Timestamp, use tz_localize to localize
甚至也改过datetime_format = '%Y%m%d %H:%M:%S' 还是不对
我也不知道怎么办了,麻烦哪位能帮我解答一下,谢谢

Member
avatar
加入于:
帖子: 4622
声望: 284

那就用localize函数处理一下你的datetime吧

Member
avatar
加入于:
帖子: 1
声望: 0

太感谢了我来试试

Member
avatar
加入于:
帖子: 5
声望: 2

"" move_df_to_mongodb(imported_data:pd.DataFrame,collection_name:str)""
这里怎么看出来是 用 MongoDB 数据库的 .

Member
avatar
加入于:
帖子: 4622
声望: 284

ruzwdy wrote:

"" move_df_to_mongodb(imported_data:pd.DataFrame,collection_name:str)""
这里怎么看出来是 用 MongoDB 数据库的 .
请问你的问题是?
楼主写这个函数因为他自己用的是mongodb

© 2015-2022 上海韦纳软件科技有限公司
备案服务号:沪ICP备18006526号

沪公网安备 31011502017034号

【用户协议】
【隐私政策】
【免责条款】