VeighNa量化社区
你的开源社区量化交易平台
Member
avatar
加入于:
帖子: 70
声望: 3

五一,闲着没事,把VNPY升级到了2.2,,,,
数据库从mysql换到了influx,实测715万条数据使用VNPY读取,mysql读取18分钟左右,influx大概11分钟,没有预期中的快。

description

倒是使用pickle数据序列化以后,仅1分钟多就加载完了,占用磁盘1G...

description

感觉influx相对于mysql更大的优势在写入上面,写入时达到了10W+每秒的速度,不知道是不是influx默认配置的原因没有读取的没有预期的快,不过使用数据持久化到本地以后,读取速度提高10倍,对于相同数据搞回测极其值得...

二是升级到2.2以后,需要用到合成日线数据,发现trader\utility.py下面的关于K线合成的代码发生了变化。。。

依葫芦画瓢,
将update_bar下:

        if self.interval == Interval.MINUTE:
            self.update_bar_minute_window(bar)
        else:
            self.update_bar_hour_window(bar)

更新为

        if self.interval == Interval.MINUTE:
            self.update_bar_minute_window(bar)
        elif self.interval == Interval.HOUR:
            self.update_bar_hour_window(bar)
        elif self.interval == Interval.DAILY:
            self.update_bar_day_window(bar)

从update_bar_hour_window复制一份改成update_bar_day_window,实现逻辑加入之前的:

        day_end = datetime.time(14, 59)
        if bar.exchange == Exchange.CFFEX and not bar.symbol.startswith("I"):
            day_end = datetime.time(15, 14)
        if bar.datetime.time() == day_end:
            self.day_bar.high_price = max(self.day_bar.high_price,bar.high_price)
            self.day_bar.low_price = min(self.day_bar.low_price,bar.low_price)
            self.day_bar.close_price = bar.close_price
            self.day_bar.volume += int(bar.volume)
            self.day_bar.open_interest = bar.open_interest
            finished_bar = self.day_bar
            self.day_bar = None

将 on_hour_bar 复制一份改成on_day_bar:(合成多日的,基本用不到。。。)

测试居然不行!!!
分析发现需要在init里加入 self.day_bar: BarData = None

再测试,还是不行,居然连小时都生成不了。。。
然道哪里搞错了?
备份代码,还原至初始值,居然还是连小时图都没法生成。。。。

检查代码没有错,策略文件时有from trader.constant import Interval
BarGenerator里面的Interval.HOUR和Interval.DAILY均读取正常。。。
一点一点打印排查发现utility.py文件update_bar里面的self.interval == Interval.HOUR不执行?
为啥???明明self.interval的值 = Interval.HOUR,为啥 self.interval == Interval.HOUR却不执行?

难道我不是我?????

经过一顿对比,怀疑,排除。。。

终于发现问题在枚举值导入上面。。。。
trader\utility.py文件里Interval使用的是 from .constant import Exchange, Interval
而策略文件里是的Interval的使用的是 from trader.constant import Interval

最后将trader\utility.py里的from .constant import Exchange, Interval改为from trader.constant import Exchange, Interval

终于解决!!!

谁能想会在导入这块,,,卡半天。。。

不知道之前版本的trader\utility.py里的Interval里怎么导入的,如果是有变化,对于有日线合成需求的,升级以后,大概率也会卡在这里。。。

Member
avatar
加入于:
帖子: 70
声望: 3

顺便贴一个mysql转存influx的高效代码,8千多万条数据,一秒写入10万,很快就转完了....

import datetime
import random
from influxdb import InfluxDBClient
import pymysql
import time
#键接influxdb
client = InfluxDBClient('localhost', 8086, 'root', '******', 'mytestdb',timeout=10)

#链接myslq
conn = pymysql.connect(host="localhost", port=3306, user="root",passwd="******",db="demo")
cursor = pymysql.cursors.Cursor(conn)

#查询mysql列表
sql = "select distinct symbol,exchange from dbbardata"
cursor.execute(sql)

#将mysql标的信息写入列表供循环写入influx用
symbol_list= []

#处理mysql获取的信息供influx写入使用.
def to_influx(info):
    info_influx = {"measurement": "bar_data","tags": {"interval": "1m","vt_symbol": info[0]+"."+info[1]},"time": info[-1],"fields": {"open_price": info[2],"high_price": info[3],"low_price": info[4],"close_price": info[5],"open_interest":info[6],"volume": info[7]}}
    return info_influx


#获取所有商品列表
while True:
    row = cursor.fetchone()
    if not row:
        break
    # print(row[0])
    new_info = {"symbol":row[0],"exchange":row[1]}
    symbol_list.append(new_info)

#循环写入
for i in symbol_list:
    print("开始读取:",i["symbol"])
    start_time = time.time()
    cursor.execute("select symbol,exchange,open_price,high_price,low_price,close_price,open_interest,volume,datetime from dbbardata where symbol='%s'" % i["symbol"])
    row = [to_influx(i) for i in cursor.fetchall()]
    time_end = time.time()- start_time
    print("读取完毕,耗时:",time_end,"秒,现在开始写入:",i["symbol"],len(row))
    row_count = len(row) #总数
    start_count = 0 #开始点
    end_count = 100000 #初始结束点
    count = 100000 #每次数量
    range_count = (row_count // count) + 1 #写入次数
    for ii in range(range_count):
        if ii == range_count - 1:
            client.write_points(row[start_count:-1], database="demo")
            print(i["symbol"], "写入完成", start_count,len(row))
            continue
        print(i["symbol"], "写入", start_count, end_count)
        client.write_points(row[start_count:end_count], database="demo")
        row_count -= count
        start_count += count
        end_count = start_count + count


conn.commit()

# 关闭mysql游标
cursor.close()

# 关闭mysql连接
conn.close()

#关闭influx链接
client.close()
© 2015-2022 上海韦纳软件科技有限公司
备案服务号:沪ICP备18006526号

沪公网安备 31011502017034号

【用户协议】
【隐私政策】
【免责条款】