下载通达信期货通,使用前需要先下载好分钟数据或者日线数据
代码支持导入分钟线数据或者日线数据
使用方法
导入分钟数据 : python loadtdx.py CL8 (导入玉米主连分钟线
)
导入日线数据:python loadtdx.py CL9 day(导入玉米日线数据)
import os
import struct
import math
import re
import datetime
import vnpy_datamanager.engine
import csv
import sys
from typing import List, Optional
from vnpy.trader.engine import BaseEngine, MainEngine, EventEngine
from vnpy.trader.constant import Interval, Exchange
from vnpy.trader.object import BarData, TickData, ContractData, HistoryRequest
from vnpy.trader.database import BaseDatabase, get_database, BarOverview, DB_TZ
from vnpy.trader.datafeed import BaseDatafeed, get_datafeed
from vnpy.trader.utility import ZoneInfo
##保存分钟级CSV数据到数据库
def import_data_from_csv(
file_path: str,
symbol: str,
exchange: Exchange,
interval: Interval,
tz_name: str,
datetime_head: str,
open_head: str,
high_head: str,
low_head: str,
close_head: str,
volume_head: str,
turnover_head: str,
open_interest_head: str,
datetime_format: str
) -> tuple:
""""""
database: BaseDatabase = get_database()
datafeed: BaseDatafeed = get_datafeed()
with open(file_path, "rt") as f:
buf: list = [line.replace("\0", "") for line in f]
reader: csv.DictReader = csv.DictReader(buf, delimiter=",")
bars: List[BarData] = []
start: datetime.datetime = None
count: int = 0
tz = ZoneInfo(tz_name)
for item in reader:
if datetime_format:
dt: datetime.datetime = datetime.datetime.strptime(item[datetime_head], datetime_format)
else:
dt: datetime.datetime = datetime.datetime.fromisoformat(item[datetime_head])
dt = dt.replace(tzinfo=tz)
turnover = item.get(turnover_head, 0)
open_interest = item.get(open_interest_head, 0)
bar: BarData = BarData(
symbol=symbol,
exchange=exchange,
datetime=dt,
interval=interval,
volume=float(item[volume_head]),
open_price=float(item[open_head]),
high_price=float(item[high_head]),
low_price=float(item[low_head]),
close_price=float(item[close_head]),
turnover=float(turnover),
open_interest=float(open_interest),
gateway_name="DB",
)
bars.append(bar)
# do some statistics
count += 1
if not start:
start = bar.datetime
end: datetime = bar.datetime
# insert into database
database.save_bar_data(bars)
return start, end, count
# 根据二进制前两段拿到日期分时
def get_date_str(h1, h2) -> str: # H1->0,1字节; H2->2,3字节;
year = math.floor(h1 / 2048) + 2004 # 解析出年
month = math.floor(h1 % 2048 / 100) # 月
day = h1 % 2048 % 100 # 日
hour = math.floor(h2 / 60) # 小时
minute = h2 % 60 # 分钟
date_str = str(year) + "-" + str(month) + "-" + str(day)
date_p = datetime.datetime.strptime(date_str,"%Y-%m-%d")
week = date_p.weekday() ##周几(0-6)
if(hour >= 21) :
if(week == 0):
date_p = date_p - datetime.timedelta(days=3)
else :
date_p = date_p - datetime.timedelta(days=1)
if hour < 10: # 如果小时小于两位, 补0
hour = "0" + str(hour)
if minute < 10: # 如果分钟小于两位, 补0
minute = "0" + str(minute)
return date_p.strftime("%Y-%m-%d") + " " + str(hour) + ":" + str(minute) + ":00"
##return str(year) + "-" + str(month) + "-" + str(day) + " " + str(hour) + ":" + str(minute) + ":00"
def name_to_symbol(name) -> str :
prefix = name[0:2]
future_name = name[3:-4]
exchange=Exchange.AMEX
if prefix == '28' :
exchange = Exchange.CZCE
if prefix == '29' :
exchange = Exchange.DCE
future_name=future_name.lower()
if prefix == '30' :
exchange = Exchange.SHFE
future_name=future_name.lower()
if prefix == '47' :
exchange = Exchange.CFFEX
if prefix == '74' :
exchange = Exchange.AMEX
if "L8" in future_name.upper() or "L9" in future_name.upper():
future_name=future_name.upper()
return future_name,exchange
# 根据通达信.lc1文件,生成对应名称的csv文件
def lc1_csv(filepath, name, targetdir) -> None:
# (通达信.lc1文件路径, 通达信.lc1文件名称, 处理后要保存到的文件夹)
with open(filepath, 'rb') as f: # 读取通达信.lc1文件,并处理
file_object_path = targetdir + name + '.csv' # 设置处理后保存文件的路径和名称
file_object = open(file_object_path, 'w+') # 打开新建的csv文件,开始写入数据
title_list = "symbol,exchange,datetime,open,high,low,close,open_interest,volume,settlement_price\n" # 定义csv文件标题
file_object.writelines(title_list) # 将文件标题写入到csv中
symbol,exchange = name_to_symbol(name)
while True:
li2 = f.read(32) # 读取一个1分钟数据
if not li2: # 如果没有数据了,就退出
break
data2 = struct.unpack('HHffffllf', li2) # 解析数据
date_str = get_date_str(data2[0], data2[1]) # 解析日期和分时
data2_list = list(data2) # 将数据转成list
data2_list[1] = date_str # 将list二个元素更改为日期 时:分
del (data2_list[0]) # 删除list第一个元素
for dl in range(len(data2_list)): # 将list中的内容都转成字符串str
data2_list[dl] = str(data2_list[dl])
data2_str = ",".join(data2_list) # 将list转换成字符串str
data2_str += "\n" # 添加换行
data2_str =symbol + "," + str(exchange) + "," + data2_str
if date_str != "":
file_object.writelines(data2_str) # 写入一行数据
file_object.close() # 完成数据写入
##日线文件转CSV
def day_csv(dirname,fname,targetDir):
ofile=open(dirname+os.sep+fname,'rb')
buf=ofile.read()
ofile.close()
ifile=open(targetDir+os.sep+fname+'.csv','w')
num=len(buf)
no=num/32
b=0
e=32
line=''
linename=str('datetime')+','+str('open')+','+str('high')+','+str('low')+','+str('close')+','+str('open_interest')+','+str('volume')+','+str('settlement_price')+''+'\n'
# print line
ifile.write(linename)
# for i in xrange(no):
for i in range(int(no)):
a=struct.unpack('IIIIIfII',buf[b:e])
strdate = str(a[0])[0:4] + '-' + str(a[0])[4:6] + '-' + str(a[0])[6:8] + ' 00:00:00'
line=strdate+','+str(a[1]/100.0)+', '+str(a[2]/100.0)+' ,'+str(a[3]/100.0)+', '+str(a[4]/100.0)+' ,'+str(a[5])+', '+str(a[6])+' ,'+str(a[7])+''+'\n'
# print line
ifile.write(line)
b=b+32
e=e+32
ifile.close()
def csv2database(fname :str = "",target_dir :str = "",interval: Interval=Interval.MINUTE):
future_name,exchange=name_to_symbol(fname)
print(fname)
print(future_name)
print(exchange)
import_data_from_csv(target_dir + fname+'.csv', future_name,exchange,interval,"Asia/Shanghai","datetime","open","high","low","close","volume","settlement_price","open_interest","%Y-%m-%d %H:%M:%S")
if __name__ == '__main__':
argcount = len(sys.argv)
if not len(sys.argv)>=2:
print("请输入要导入数据的品种 例如IL8")
exit(0)
future = sys.argv[1].upper()
if argcount == 2:
future= '#' + future + '.lc1'
elif argcount == 3:
future= '#' + future + '.day'
# 设置通达信.day文件所在的文件夹
min_dir = 'C:\\new_tdxqh\\vipdoc\\ds\\minline\\'
day_dir = 'C:\\new_tdxqh\\vipdoc\\ds\\lday'
if argcount == 2 :
path_dir = min_dir
elif argcount == 3:
path_dir = day_dir
# 设置数据处理好后,要将csv文件保存的文件夹
target_dir = './lc1/'
# 读取文件夹下的通达信.day文件
listfile = os.listdir(path_dir)
# 逐个处理文件夹下的通达信.day文件,并生成对应的csv文件,保存到../day/文件夹下
for fname in listfile:
if future in fname:
if argcount == 2:
lc1_csv(path_dir + fname, fname, target_dir)
csv2database(fname,target_dir,Interval.MINUTE)
elif argcount == 3:
day_csv(path_dir, fname, target_dir)
csv2database(fname,target_dir,Interval.DAILY)
else:
print('The for ' + path_dir + ' to ' + target_dir + ' loop is over')
print("文件转换已完成")