Python量化数据仓库搭建系列3:数据落库代码封装

本系列教程为量化开发者,提供本地量化金融数据仓库的搭建教程与全套源代码。我们以恒有数(UDATA)金融数据社区为数据源,将金融基础数据落到本地数据库。教程提供全套源代码,包括历史数据下载与增量数据更新,数据更新任务部署与日常监控等操作。

在上一节讲述中,我们封装了Python操作MySQL数据库的自定义类,存为MySQLOperation.py文件;本节内容操作数据库部分,将会调用MySQLOperation中的方法,以及pandas.to_sql和pandas.read_sql的操作。

一、恒有数(UDATA)操作简介

1、获取Token

A、在恒有数官网(https://udata.hs.net )注册并登录,在订阅页面,下单免费的体验套餐;

B、在右上角,头像下拉菜单中,进入总览页面,复制Token;

C、在数据页面,查看数据接口文档,获取接口名称、请求参数、返回参数和Python代码示例;

2、安装hs_udata

pip install hs_udata

使用示例如下:

import hs_udata as hs
# 设置Token
hs.set_token(token = 'xxxxxxxxxxxxxxx')  # 总览页面获取个人Token
# 以分钟线行情为例,获取000001.SZ在2021-05-01至2021-06-01期间的分钟线数据
# 接口文档见:https://udata.hs.net/datas/342/
df = hs.stock_quote_minutes(en_prod_code="000001.SZ",begin_date="20210501",end_date="20210601")
# 增加股票代码列
df['hs_code']='000001.SZ'
df.head()

description

其余接口使用过程与之类似;

二、数据落库示例

以股票列表(stock_list)为例,讲解建表、落库、查询等操作;全套代码见本文第三章;

1、准备工作

(1)在MySQL数据库中,创建数据库udata,创建过程见第一讲《Python量化数据仓库搭建系列1:数据库安装与操作》;

(2)在MySQL数据库中,创建数据更新记录表udata.tb_update_records,表结构如下:

description

建表SQL如下:

CREATE TABLE udata.tb_update_records (
                table_name CHAR(40),
                data_date CHAR(20),
                update_type CHAR(20),
                data_number INT(20),
                elapsed_time INT(20),
                updatetime CHAR(20)
                )

(3)将Token与数据库参数写入配置文件DB_MySQL.config,文件内容如下:

[udata]
token='你的Token'
host='127.0.0.1'
port=3306
user='root'
passwd='密码'
db='udata'

(4)读取配置文件中的参数

import configparser
# 读取配置文件中,恒有数和数据库参数
configFilePath = 'DB_MySQL.config'
section = 'udata'
config = configparser.ConfigParser()
config.read(configFilePath)
# 读取 恒有数(UData) 的 token
token = eval(config.get(section=section, option='token'))
# MySQL连接参数读取
host = eval(config.get(section=section, option='host'))
port = int(config.get(section=section, option='port'))
db = eval(config.get(section=section, option='db'))
user = eval(config.get(section=section, option='user'))
passwd = eval(config.get(section=section, option='passwd'))

2、建表

在社区数据页面,查看数据表返回参数字段,以股票列表(stock_list,https://udata.hs.net/datas/202/ )为例:

description

将建表、删除表格、清空数据的SQL,写入配置文件DB_Table.config,文件内容如下:

[tb_stock_list]
DROP_TABLE='DROP TABLE IF EXISTS tb_stock_list'
CREATE_TABLE='''CREATE TABLE tb_stock_list (
                secu_code CHAR(20),
                hs_code CHAR(20),
                secu_abbr CHAR(20),
                chi_name CHAR(40),
                secu_market CHAR(20), 
                listed_state CHAR(20),
                listed_sector CHAR(20),
                updatetime CHAR(20)
                )'''
DELETE_DATA = 'truncate table tb_stock_list'

建表代码如下:

# MySQL操作实例化,pymysql连接设置,可打开、关闭、执行sql、执行sql读取数据
MySQL = MySQLOperation(host, port, db, user, passwd)
# 读取配置文件中,恒有数和数据库参数
configFilePath = 'DB_Table.config'
section = 'tb_stock_list'
config = configparser.ConfigParser()
config.read(configFilePath)
DROP_TABLE = eval(config.get(section=section, option='DROP_TABLE'))
CREATE_TABLE = eval(config.get(section=section, option='CREATE_TABLE'))
DELETE_DATA = eval(config.get(section=section, option='DELETE_DATA'))
# 删除表结构
MySQL.Execute_Code(DROP_TABLE)
# 创建表结构
MySQL.Execute_Code(CREATE_TABLE)

3、数据落库

import hs_udata as hs
from sqlalchemy import create_engine
from datetime import datetime
# 设置token
hs.set_token(token)
# 获取 股票列表 数据
df = hs.stock_list()
# 在最后一列增加系统时间戳
dt = datetime.now()
df['updatetime'] = dt.strftime('%Y-%m-%d %H:%M:%S')
# 由于股票列表数据为全量更新,数据插入之前,先清空表中数据
MySQL.Execute_Code(DELETE_DATA)
# sqlalchemy 连接设置,可用于pandas.read_sql、pandas.to_sql
engine = create_engine('mysql://{0}:{1}@{2}:{3}/{4}?charset=utf8'.format(user,passwd,host,port,db))
# 将数据写入到MySQL中的数据表
df.to_sql(name='tb_stock_list', con=engine, index=False, if_exists='append')

在数据库中查看结果如下:

description

4、读取数据

import pandas as pd
result = pd.read_sql('''SELECT * FROM tb_stock_list ''', con=engine)
print(result.head())

三、落库代码封装

将上述几步操作,封装到一起,定义并调用python中class类的属性和方法。代码中涉及主要技术点如下:

(1)使用pymysql、pandas.to_sql和pandas.read_sql操作MySQL数据库;

(2)使用class类的方法,集成建表、插入数据和查询数据的操作;

(3)使用配置文件的方式,从本地文件中,读取数据库参数与表操作的SQL代码;

(4)使用try容错机制,结合日志函数,将执行日志打印到本地的DB_MySQL_LOG.txt文件;

import pandas as pd
import hs_udata as hs
from MySQLOperation import *
from sqlalchemy import create_engine
from datetime import datetime
import time
import configparser
import logging
import traceback
import warnings
warnings.filterwarnings("ignore")

class TB_Stock_List:

    def __init__(self,MySQL_Config,BD_Name,Table_Config,Table_Name):
        # 创建日志
        self.logging = logging
        self.logging.basicConfig(filename='DB_MySQL_LOG.txt', level=self.logging.DEBUG
                                 , format='%(asctime)s - %(levelname)s - %(message)s')
        # 读取配置文件中,恒有数和数据库参数
        configFilePath = MySQL_Config
        self.section1 = BD_Name
        config = configparser.ConfigParser()
        config.read(configFilePath)
        # 读取 恒有数(UData) 的 token
        self.token = eval(config.get(section=self.section1, option='token'))
        # MySQL连接参数读取
        self.host = eval(config.get(section=self.section1, option='host'))
        self.port = int(config.get(section=self.section1, option='port'))
        self.db = eval(config.get(section=self.section1, option='db'))
        self.user = eval(config.get(section=self.section1, option='user'))
        self.passwd = eval(config.get(section=self.section1, option='passwd'))
        # pymysql连接设置,可打开、关闭、执行sql、执行sql读取数据
        self.MySQL = MySQLOperation(self.host, self.port, self.db, self.user, self.passwd)
        # sqlalchemy 连接设置,可用于pandas.read_sql、pandas.to_sql
        self.engine = create_engine('mysql://{0}:{1}@{2}:{3}/{4}?charset=utf8'.format(self.user
                                                                                      , self.passwd
                                                                                      , self.host
                                                                                      , self.port
                                                                                      , self.db))
        # 读取配置文件中,恒有数和数据库参数
        configFilePath = Table_Config
        self.section2 = Table_Name
        config = configparser.ConfigParser()
        config.read(configFilePath)
        self.DROP_TABLE_SQL = eval(config.get(section=self.section2, option='DROP_TABLE'))
        self.CREATE_TABLE_SQL = eval(config.get(section=self.section2, option='CREATE_TABLE'))
        self.DELETE_DATA_SQL = eval(config.get(section=self.section2, option='DELETE_DATA'))

        self.logging.info('*********************{0}.{1}*********************'.format(self.section1, self.section2))

    def CREATE_TABLE(self):
        try:
            # 删除表结构
            self.MySQL.Execute_Code('SET FOREIGN_KEY_CHECKS = 0')
            self.MySQL.Execute_Code(self.DROP_TABLE_SQL)
            # 创建表结构
            self.MySQL.Execute_Code(self.CREATE_TABLE_SQL)
            self.logging.info('表{0}.{1},表格创建成功'.format(self.section1,self.section2))
        except:
            self.logging.info('表{0}.{1},表格创建失败'.format(self.section1,self.section2))
            self.logging.debug(traceback.format_exc())

    def UPDATE_DATA(self):
        try:
            # 设置token
            hs.set_token(self.token)
            time_start = time.time()  # 计时
            # 获取 股票列表 数据
            df = hs.stock_list()
            # 在最后一列增加系统时间戳
            dt = datetime.now()
            df['updatetime'] = dt.strftime('%Y-%m-%d %H:%M:%S')
            # 由于股票列表数据为全量更新,数据插入之前,先清空表中数据
            self.MySQL.Execute_Code(self.DELETE_DATA_SQL)
            # 将数据写入到MySQL中的数据表
            df.to_sql(name='tb_stock_list', con=self.engine, index=False, if_exists='append')
            time_end = time.time()  # 计时
            elapsed_time = round(time_end-time_start,2)
            # 向mysql库中记录一条数据更新记录:表名,数据日期,更新方式,更新条数,更新耗时,系统时间
            self.RECORDS_SQL = '''INSERT INTO udata.tb_update_records 
            VALUES ('{0}','{1}','全量',{2},{3}, SYSDATE())'''.format(self.section2
                                                                   ,dt.strftime('%Y-%m-%d'),len(df),elapsed_time)
            self.MySQL.Execute_Code(self.RECORDS_SQL)
            self.logging.info('表{0}.{1},数据更新成功'.format(self.section1,self.section2))
        except:
            self.logging.info('表{0}.{1},数据更新失败'.format(self.section1,self.section2))
            self.logging.debug(traceback.format_exc())

    def READ_DATA(self, WhereCondition=''):
        try:
            result = pd.read_sql('''SELECT * FROM {0}.{1} '''.format(self.section1,self.section2)
                                 + WhereCondition, con=self.engine)
            self.logging.info('表{0}.{1},数据读取成功'.format(self.section1,self.section2))
            return result
        except:
            self.logging.info('表{0}.{1},数据读取失败'.format(self.section1,self.section2))
            self.logging.debug(traceback.format_exc())
            return 0

if __name__ == '__main__':
    MySQL_Config = 'DB_MySQL.config'
    BD_Name = 'udata'
    Table_Config = 'DB_Table.config'
    Table_Name = 'tb_stock_list'
    # 实例化
    TB_Stock_List_Main = TB_Stock_List(MySQL_Config,BD_Name,Table_Config,Table_Name)
    # 创建表结构
    TB_Stock_List_Main.CREATE_TABLE()
    # 更新数据
    TB_Stock_List_Main.UPDATE_DATA()
    # 读取数据
    data = TB_Stock_List_Main.READ_DATA()

四、代码及配置文件下载

(1)附件文件目录:

DB_MySQL.config

DB_Table.config

MySQLOperation.py

TB_Stock_List.py

(文件下载请前往 https://developer.hs.net/thread/1501 ,附件位于文章末端)

(2)下载附件文件后,将配置文件DB_MySQL.config中的Token与数据库参数,修改为自己的参数;将代码放置在同一目录下执行。

下一节《Python量化数据仓库搭建系列4:股票数据落库》