VeighNa Quant Community
Your open-source community quantitative trading platform
Member | Posts: 141 | Reputation: 57

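Persisting with shelve sometimes fails, the files are large, and opening them is slow. I switched to HDF5 today and am sharing the code.
First, run pip install h5py.
I put the functions in utility.py: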

import os
import pickle
import zlib
from typing import Any

import h5py
import numpy as np

# Path separator for the current OS (the original if/elif left it undefined on macOS)
LINK_SIGN = os.sep
# get_folder_path is expected to come from vnpy/trader/utility.py, where these functions are placed
#------------------------------------------------------------------------------------
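def save_h5(filename: str, data: Any, overwrite: bool = False) -> None:
    """
    Save data to an HDF5 file.
    overwrite=True replaces the file contents; overwrite=False merges data into what is already stored.
    """
    contract_file_path = get_folder_path(filename)
    filepath = f"{contract_file_path}{LINK_SIGN}{filename}.h5"
    raw_data = data
    if not overwrite:
        # Incremental update: merge the new data into the stored data
        stored = load_h5(filename)
        if isinstance(stored, dict) and isinstance(data, dict):
            stored.update(data)
            raw_data = stored
        elif isinstance(stored, list):
            for value in data:
                if value not in stored:
                    stored.append(value)
            raw_data = stored
        # Any other combination (e.g. no file yet and data is a list) writes data as-is
    # One attempt plus up to 3 retries, stopping at the first successful write
    count = 0
    while True:
        count += 1
        status = save_h5_status(filepath, raw_data)
        if status or count > 3:
            break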
#------------------------------------------------------------------------------------
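def save_h5_status(filepath: str, raw_data: Any) -> bool:
    """
    Write raw_data to the HDF5 file and report whether the write succeeded.
    """
    try:
        with h5py.File(filepath, "w") as file:
            # Pickle and compress the object, then store it as an opaque binary blob
            data = zlib.compress(pickle.dumps(raw_data), 5)
            file["data"] = np.void(data)
        return True
    except Exception:
        return False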
#------------------------------------------------------------------------------------
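def load_h5(filename: str) -> Any:
    """
    Load data from an HDF5 file. Returns {} if the file is missing or cannot be read.
    """
    contract_file_path = get_folder_path(filename)
    filepath = f"{contract_file_path}{LINK_SIGN}{filename}.h5"
    if not os.path.exists(filepath):
        return {}
    # One attempt plus up to 3 retries; the last result is returned either way
    count = 0
    while True:
        count += 1
        status, data = load_h5_status(filepath)
        if status or count > 3:
            return data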
#------------------------------------------------------------------------------------        
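def load_h5_status(filepath: str) -> tuple:
    """
    Read the HDF5 file and return (success flag, data).
    """
    try:
        with h5py.File(filepath, "r") as file:
            blob = file["data"][()]
            # Convert the np.void scalar to bytes before decompressing
            data = pickle.loads(zlib.decompress(blob.tobytes()))
            return True, data
    except Exception:
        return False, {}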
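
As a rough illustration of what the functions above put into the file, the sketch below reproduces only the serialization step (pickle, then zlib at level 5) and the overwrite=False merge rule, leaving h5py out so that it runs with the standard library alone. `pack`, `unpack`, and `merge` are hypothetical helper names, not part of the original utility.py, and the sample contract dict is made up.

```python
import pickle
import zlib
from typing import Any


def pack(obj: Any, level: int = 5) -> bytes:
    """Pickle an object and compress the bytes, as save_h5_status does before writing."""
    return zlib.compress(pickle.dumps(obj), level)


def unpack(blob: bytes) -> Any:
    """Reverse of pack: decompress, then unpickle."""
    return pickle.loads(zlib.decompress(blob))


def merge(stored: Any, new: Any) -> Any:
    """Merge rule for overwrite=False (hypothetical helper).

    dict + dict      -> keys from new overwrite keys in stored
    list + iterable  -> items of new are appended if not already present
    anything else    -> new replaces stored (an assumption of this sketch)
    """
    if isinstance(stored, dict) and isinstance(new, dict):
        merged = dict(stored)
        merged.update(new)
        return merged
    if isinstance(stored, list):
        merged = list(stored)
        for value in new:
            if value not in merged:
                merged.append(value)
        return merged
    return new


# Made-up sample data standing in for OmsEngine.contracts
stored = {"rb2210.SHFE": {"size": 10}, "IF2209.CFFEX": {"size": 300}}
update = {"rb2210.SHFE": {"size": 10, "pricetick": 1.0}}

blob = pack(merge(stored, update))
restored = unpack(blob)
```

Pickle can only restore objects whose classes are importable at load time, so a file written this way is tied to the Python code that produced it.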
Member | Posts: 141 | Reputation: 57

Modify vnpy/trader/engine.py as follows:

from vnpy.trader.utility import (save_h5,load_h5)
#--------------------------------------------------------------------------------------------------
class OmsEngine(BaseEngine):
    def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
        """"""
        super(OmsEngine, self).__init__(main_engine, event_engine, "oms")
        self.ticks = {}
        self.orders = {}
        self.trades = {}
        self.positions = {}
        self.accounts = {}
        self.contracts = {}

        self.active_orders = {}
        self.add_function()
        self.load_contracts()
        self.register_event()

    #--------------------------------------------------------------------------------------------------
    def load_contracts(self):
        """
        Load contract data from disk.
        """
        if not self.contracts:
            contract_data = load_h5("contract_data")
            self.contracts.update(contract_data)
        return self.contracts
    #--------------------------------------------------------------------------------------------------
    def save_contracts(self):
        """
        Save contract data to disk.
        """
        save_h5("contract_data", self.contracts)
    #--------------------------------------------------------------------------------------------------
    def add_function(self):
        """
        Attach OmsEngine functions to MainEngine.
        """
        self.main_engine.save_contracts = self.save_contracts                   # save contract data to disk
        self.main_engine.load_contracts = self.load_contracts                   # load contract data from disk
Member | Posts: 141 | Reputation: 57

I call it from the CLI child process:

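from time import sleep

from vnpy.event import EventEngine
from vnpy.trader.engine import MainEngine
#----------------------------------------------------------------------
def run_child_process():
    """
    Entry point for the child process.
    """
    event_engine = EventEngine()
    main_engine = MainEngine(event_engine)
    # log_engine is not defined in this snippet; it is assumed to be created elsewhere in the script
    log_engine.info("portfolio strategy started successfully")
    print("-"*73)
    # Saving periodically means that if load_contracts keeps failing to read the file,
    # the next successful write (at most 5 minutes later) restores normal reads
    while True:
        # Save contract data to disk
        main_engine.save_contracts()
        sleep(300)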
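
The loop above never returns, so the child process can only be stopped by killing it. Here is a minimal sketch of one alternative, assuming a `threading.Event` is used as a stop flag: `Event.wait(timeout)` pauses like `sleep(300)` but returns early once the flag is set. `run_periodic` and the `save` callable are hypothetical stand-ins; in the code above, `save` would be `main_engine.save_contracts`. If the flag has to be set from the parent process, `multiprocessing.Event` offers the same `set`/`wait` interface.

```python
import threading
from typing import Callable


def run_periodic(save: Callable[[], None], interval: float, stop: threading.Event) -> int:
    """Call save() every `interval` seconds until `stop` is set; return the number of calls."""
    calls = 0
    while not stop.is_set():
        save()
        calls += 1
        # wait() returns True as soon as stop is set, False when the timeout expires
        if stop.wait(interval):
            break
    return calls


# Demo with a stand-in save function and a short interval
saved = []
stop_flag = threading.Event()


def fake_save() -> None:
    saved.append(len(saved))
    if len(saved) == 3:
        stop_flag.set()  # ask the loop to stop after the third save


total = run_periodic(fake_save, interval=0.01, stop=stop_flag)
```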
Administrator | Posts: 4500 | Reputation: 320

We will consider supporting this in a future release as well.

Member | Posts: 70 | Reputation: 3

Learned a lot!
I happened to need exactly this, and here you are posting it!

Member | Posts: 70 | Reputation: 3

上弦之月 wrote:

I call it from the CLI child process:

from vnpy.event import EventEngine
from vnpy.trader.engine import MainEngine

    event_engine = EventEngine()
    main_engine = MainEngine(event_engine)
    sleep(3)
    # Save contract data to disk
    main_engine.save_contracts()

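A question for you: after switching to HDF5 persistence, how much faster is it in practice than a local SQL database?

I currently use MySQL. Loading 300,000 records takes about 30 seconds each time, so every debugging run wastes a lot of time waiting.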
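
No benchmark numbers appear in this thread, so the question is best answered by measuring on your own data. Here is a minimal timing sketch, assuming bars can be represented as plain tuples: it times a pickle plus zlib round trip, which is the format the functions earlier in the thread store inside the .h5 file. `make_rows` and `time_roundtrip` are hypothetical names and the rows are synthetic. The result says nothing about MySQL, which would have to be timed the same way.

```python
import pickle
import time
import zlib
from typing import List, Tuple

Row = Tuple[int, float, float, float, float, int]


def make_rows(n: int) -> List[Row]:
    """Build n synthetic OHLCV-like rows (made-up values, for timing only)."""
    return [
        (i, 100.0 + i % 7, 101.0 + i % 7, 99.0 + i % 7, 100.5 + i % 7, i % 1000)
        for i in range(n)
    ]


def time_roundtrip(rows: List[Row], level: int = 5) -> Tuple[float, float, int, List[Row]]:
    """Return (dump seconds, load seconds, compressed size in bytes, loaded rows)."""
    start = time.perf_counter()
    blob = zlib.compress(pickle.dumps(rows), level)
    dump_seconds = time.perf_counter() - start

    start = time.perf_counter()
    loaded = pickle.loads(zlib.decompress(blob))
    load_seconds = time.perf_counter() - start
    return dump_seconds, load_seconds, len(blob), loaded


rows = make_rows(300_000)
dump_s, load_s, size, loaded = time_roundtrip(rows)
print(f"dump {dump_s:.3f}s, load {load_s:.3f}s, {size / 1e6:.2f} MB compressed")
```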

Member | Posts: 141 | Reputation: 57

@大王 For backtesting I serialize the data into Redis; I don't use HDF5 there.

Member | Posts: 70 | Reputation: 0

上弦之月 wrote:

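@大王 For backtesting I serialize the data into Redis; I don't use HDF5 there.

月神, could you explain the difference between HDF5 persistence and saving as JSON?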
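
Nobody in the thread answers this directly. In the code from the first post, the .h5 file only holds a zlib-compressed pickle, so the practical difference from JSON comes down to pickle versus JSON: pickle round-trips Python objects with their types intact, while the json module handles only basic types unless you convert values yourself. A small sketch with a made-up record (the field names are illustrative, not vnpy's):

```python
import json
import pickle
from datetime import datetime

# Made-up record mixing types that JSON has no native representation for
record = {
    "symbol": "rb2210",
    "listed": datetime(2021, 10, 18, 9, 0),
    "ticks": (1.0, 2.0),
}

# pickle restores the original types (datetime, tuple)
restored = pickle.loads(pickle.dumps(record))

# json refuses the datetime value
try:
    json.dumps(record)
    json_error = None
except TypeError as exc:
    json_error = exc

# With a manual conversion JSON works, but types are flattened on load:
# the datetime comes back as a string and the tuple as a list
as_json = json.dumps({**record, "listed": record["listed"].isoformat()})
from_json = json.loads(as_json)
```

The trade-off runs the other way too: JSON is human-readable and language-neutral, while a pickle is neither and should only be loaded from files you trust.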

Member | Posts: 15 | Reputation: 0

用Python的交易员 wrote:

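We will consider supporting this in a future release as well.

Contracts really do go missing at the moment, whether the connection is fast or slow, so blocking with sleep after connect cannot solve it. Also, market data recording has to be started for the first time during trading hours; if it is started outside trading hours, the recorder receives no data. Is that still the root cause of the problem?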

Member | Posts: 20 | Reputation: 2

With H5 storage the file keeps growing as the number of updates increases, so I have basically given up on it.

© 2015-2022 上海韦纳软件科技有限公司