VeighNa量化社区
你的开源社区量化交易平台
Member
avatar
加入于:
帖子: 2
声望: 3

下载通达信期货通,使用前需要先下载好分钟数据或者日线数据
代码支持导入分钟线数据或者日线数据
使用方法
导入分钟数据 : python loadtdx.py CL8 (导入玉米主连分钟线

导入日线数据:python loadtdx.py CL9 day(导入玉米日线数据)

import os
import struct
import math
import re
import datetime
import vnpy_datamanager.engine 
import csv
import sys
from typing import List, Optional

from vnpy.trader.engine import BaseEngine, MainEngine, EventEngine
from vnpy.trader.constant import Interval, Exchange
from vnpy.trader.object import BarData, TickData, ContractData, HistoryRequest
from vnpy.trader.database import BaseDatabase, get_database, BarOverview, DB_TZ
from vnpy.trader.datafeed import BaseDatafeed, get_datafeed
from vnpy.trader.utility import ZoneInfo


##保存分钟级CSV数据到数据库
def import_data_from_csv(
        file_path: str,
        symbol: str,
        exchange: Exchange,
        interval: Interval,
        tz_name: str,
        datetime_head: str,
        open_head: str,
        high_head: str,
        low_head: str,
        close_head: str,
        volume_head: str,
        turnover_head: str,
        open_interest_head: str,
        datetime_format: str
    ) -> tuple:
        """"""
        database: BaseDatabase = get_database()
        datafeed: BaseDatafeed = get_datafeed()
        with open(file_path, "rt") as f:
            buf: list = [line.replace("\0", "") for line in f]

        reader: csv.DictReader = csv.DictReader(buf, delimiter=",")

        bars: List[BarData] = []
        start: datetime.datetime = None
        count: int = 0
        tz = ZoneInfo(tz_name)

        for item in reader:
            if datetime_format:

                dt: datetime.datetime = datetime.datetime.strptime(item[datetime_head], datetime_format)
            else:
                dt: datetime.datetime = datetime.datetime.fromisoformat(item[datetime_head])
            dt = dt.replace(tzinfo=tz)

            turnover = item.get(turnover_head, 0)
            open_interest = item.get(open_interest_head, 0)

            bar: BarData = BarData(
                symbol=symbol,
                exchange=exchange,
                datetime=dt,
                interval=interval,
                volume=float(item[volume_head]),
                open_price=float(item[open_head]),
                high_price=float(item[high_head]),
                low_price=float(item[low_head]),
                close_price=float(item[close_head]),
                turnover=float(turnover),
                open_interest=float(open_interest),
                gateway_name="DB",
            )

            bars.append(bar)

            # do some statistics
            count += 1
            if not start:
                start = bar.datetime

        end: datetime = bar.datetime

        # insert into database
        database.save_bar_data(bars)
        return start, end, count

# 根据二进制前两段拿到日期分时
def get_date_str(h1, h2) -> str:  # H1->0,1字节; H2->2,3字节;
    year = math.floor(h1 / 2048) + 2004  # 解析出年
    month = math.floor(h1 % 2048 / 100)  # 月
    day = h1 % 2048 % 100  # 日
    hour = math.floor(h2 / 60)  # 小时
    minute = h2 % 60  # 分钟
    date_str = str(year) + "-" + str(month) + "-" + str(day)
    date_p = datetime.datetime.strptime(date_str,"%Y-%m-%d")
    week = date_p.weekday()  ##周几(0-6)
    if(hour >= 21) :
        if(week == 0):
            date_p = date_p - datetime.timedelta(days=3)
        else :
            date_p = date_p - datetime.timedelta(days=1)

    if hour < 10:  # 如果小时小于两位, 补0
        hour = "0" + str(hour)
    if minute < 10:  # 如果分钟小于两位, 补0
        minute = "0" + str(minute)
    return date_p.strftime("%Y-%m-%d") + " " + str(hour) + ":" + str(minute) + ":00"
    ##return str(year) + "-" + str(month) + "-" + str(day) + " " + str(hour) + ":" + str(minute) + ":00" 
def name_to_symbol(name) -> str :

    prefix = name[0:2]
    future_name = name[3:-4]

    exchange=Exchange.AMEX
    if  prefix == '28' :
        exchange = Exchange.CZCE
    if prefix == '29' :
        exchange = Exchange.DCE
        future_name=future_name.lower()
    if prefix == '30' :
        exchange = Exchange.SHFE
        future_name=future_name.lower()
    if prefix == '47' :
        exchange = Exchange.CFFEX
    if prefix == '74' :
        exchange = Exchange.AMEX

    if "L8" in future_name.upper()  or "L9" in future_name.upper():
        future_name=future_name.upper()
    return future_name,exchange

# 根据通达信.lc1文件,生成对应名称的csv文件
def lc1_csv(filepath, name, targetdir) -> None:
    # (通达信.lc1文件路径, 通达信.lc1文件名称, 处理后要保存到的文件夹)
    with open(filepath, 'rb') as f:  # 读取通达信.lc1文件,并处理
        file_object_path = targetdir + name + '.csv'  # 设置处理后保存文件的路径和名称
        file_object = open(file_object_path, 'w+')  # 打开新建的csv文件,开始写入数据
        title_list = "symbol,exchange,datetime,open,high,low,close,open_interest,volume,settlement_price\n"  # 定义csv文件标题
        file_object.writelines(title_list)  # 将文件标题写入到csv中
        symbol,exchange = name_to_symbol(name)

        while True:
            li2 = f.read(32)  # 读取一个1分钟数据
            if not li2:  # 如果没有数据了,就退出
                break
            data2 = struct.unpack('HHffffllf', li2)  # 解析数据
            date_str = get_date_str(data2[0], data2[1])  # 解析日期和分时

            data2_list = list(data2)  # 将数据转成list
            data2_list[1] = date_str  # 将list二个元素更改为日期 时:分
            del (data2_list[0])  # 删除list第一个元素
            for dl in range(len(data2_list)):  # 将list中的内容都转成字符串str
                data2_list[dl] = str(data2_list[dl])
            data2_str = ",".join(data2_list)  # 将list转换成字符串str
            data2_str += "\n"  # 添加换行
            data2_str =symbol + "," + str(exchange) + "," + data2_str
            if date_str !=  "":
                file_object.writelines(data2_str)  # 写入一行数据
        file_object.close()  # 完成数据写入
##日线文件转CSV
def day_csv(dirname,fname,targetDir):
    ofile=open(dirname+os.sep+fname,'rb')
    buf=ofile.read()
    ofile.close()

    ifile=open(targetDir+os.sep+fname+'.csv','w')
    num=len(buf)
    no=num/32
    b=0
    e=32
    line='' 
    linename=str('datetime')+','+str('open')+','+str('high')+','+str('low')+','+str('close')+','+str('open_interest')+','+str('volume')+','+str('settlement_price')+''+'\n'
      # print line
    ifile.write(linename)
    # for i in xrange(no):
    for i in range(int(no)):
       a=struct.unpack('IIIIIfII',buf[b:e])
       strdate = str(a[0])[0:4] + '-' + str(a[0])[4:6] + '-' + str(a[0])[6:8] + ' 00:00:00'
       line=strdate+','+str(a[1]/100.0)+', '+str(a[2]/100.0)+' ,'+str(a[3]/100.0)+', '+str(a[4]/100.0)+' ,'+str(a[5])+', '+str(a[6])+' ,'+str(a[7])+''+'\n'
      # print line
       ifile.write(line)
       b=b+32
       e=e+32
    ifile.close()
def csv2database(fname :str = "",target_dir :str = "",interval: Interval=Interval.MINUTE):

    future_name,exchange=name_to_symbol(fname)
    print(fname)
    print(future_name)
    print(exchange)
    import_data_from_csv(target_dir + fname+'.csv', future_name,exchange,interval,"Asia/Shanghai","datetime","open","high","low","close","volume","settlement_price","open_interest","%Y-%m-%d %H:%M:%S")



if __name__ == '__main__':
    argcount = len(sys.argv)
    if not len(sys.argv)>=2:
        print("请输入要导入数据的品种  例如IL8")
        exit(0)
    future = sys.argv[1].upper()

    if argcount == 2:
        future= '#' + future + '.lc1'
    elif argcount == 3:
        future= '#' + future + '.day'
    # 设置通达信.day文件所在的文件夹
    min_dir = 'C:\\new_tdxqh\\vipdoc\\ds\\minline\\'
    day_dir = 'C:\\new_tdxqh\\vipdoc\\ds\\lday'
    if argcount == 2 :
        path_dir = min_dir
    elif argcount == 3:
        path_dir = day_dir
    # 设置数据处理好后,要将csv文件保存的文件夹
    target_dir = './lc1/'
    # 读取文件夹下的通达信.day文件
    listfile = os.listdir(path_dir)
    # 逐个处理文件夹下的通达信.day文件,并生成对应的csv文件,保存到../day/文件夹下
    for fname in listfile:
        if future in fname:
            if argcount == 2:
                lc1_csv(path_dir + fname, fname, target_dir)
                csv2database(fname,target_dir,Interval.MINUTE)
            elif argcount == 3:
                day_csv(path_dir, fname, target_dir)
                csv2database(fname,target_dir,Interval.DAILY)
    else:
        print('The for ' + path_dir + ' to ' + target_dir + '  loop is over')
        print("文件转换已完成")
Member
avatar
加入于:
帖子: 2
声望: 0

多谢分享,让我终于实现了批量自动导入。

Member
avatar
加入于:
帖子: 14
声望: 1

谢谢分享,不过导入的日线数据有问题,是不是解析的有些问题呢?

Member
avatar
加入于:
帖子: 6
声望: 0

请问这个代码放在哪里执行呢?

© 2015-2022 上海韦纳软件科技有限公司
备案服务号:沪ICP备18006526号

沪公网安备 31011502017034号

【用户协议】
【隐私政策】
【免责条款】