VeighNa量化社区
你的开源社区量化交易平台
Member
avatar
加入于:
帖子: 6
声望: 1

之前已经有小伙伴分享过天勤替换米筐接口的办法,由于他只是使用了一个最近数据接口导致无法下载更久的数据来做回测。我的实现是可以完整地使用他来下载过去的历史数据,暂时还没有实现tick数据的处理

import pandas as pd
from tqsdk import TqApi
from datetime import timedelta
from typing import List, Optional
import os

from tqsdk.auth import TqAuth
from tqsdk.tools import DataDownloader

from .setting import SETTINGS
from .constant import Exchange, Interval
from .object import BarData, HistoryRequest

# 时间戳对齐
TIME_GAP = 8 * 60 * 60 * 1000000000
INTERVAL_VT2TQ = {
    Interval.TICK: 0,
    Interval.MINUTE: 60,
    Interval.HOUR: 60 * 60,
    Interval.DAILY: 60 * 60 * 24,
}

class TianqinClient:
    """
    Client for querying history data from Tianqin.
    """
    def __init__(self):
        """"""
        self.inited: bool = False
        self.symbols: set = set()
        self.api = None

        self.username: str = SETTINGS["tqdata.username"]
        self.password: str = SETTINGS["tqdata.password"]

    def init(self) -> bool:
        """"""
        if self.inited:
            return True

        if not self.username or not self.password:
            return False

        try:
            self.api = TqApi(auth=TqAuth(self.username, self.password))
            # 获得全部合约
            self.symbols = [k for k, v in self.api._data["quotes"].items()]
        except:
            return False

        self.inited = True
        return True

    def to_tq_symbol(self, symbol: str, exchange: Exchange) -> str:
        """
        TQSdk exchange first
        """
        for count, word in enumerate(symbol):
            if word.isdigit():
                break

        # Check for index symbol
        time_str = symbol[count:]
        if "88" in time_str:
            return f"KQ.m@{exchange.value}.{symbol[:count]}"
        if "99" in time_str:
            return f"KQ.i@{exchange.value}.{symbol[:count]}"

        return f"{exchange.value}.{symbol}"

    def query_history(self, req: HistoryRequest) -> Optional[List[BarData]]:
        """
        Query history bar data from TqSdk.
        """
        symbol = req.symbol
        exchange = req.exchange
        interval = req.interval
        start = req.start
        end = req.end

        tq_symbol = self.to_tq_symbol(symbol, exchange)
        if tq_symbol not in self.symbols:
            return None

        tq_interval = INTERVAL_VT2TQ.get(interval)
        if not tq_interval:
            return None

        # For querying night trading period data
        end += timedelta(1)

        total_num = int((end - start).total_seconds() / tq_interval)
        if total_num > 8964:
            df = self.download_history(start, end, tq_symbol, tq_interval)
        else:
            # 只能用来补充最新的数据,无法指定日期
            df = self.api.get_kline_serial(tq_symbol, tq_interval, 8000).sort_values(by=["datetime"])

            # 时间戳对齐
            df["datetime"] = pd.DatetimeIndex(pd.to_datetime(df["datetime"] + TIME_GAP)).tz_localize('UTC').tz_convert('Asia/Shanghai')

            # 过滤开始结束时间
            df = df[(df['datetime'] >= start - timedelta(days=1)) & (df['datetime'] < end)]

        data: List[BarData] = []

        if df is not None:
            for ix, row in df.iterrows():
                bar = BarData(
                    symbol=symbol,
                    exchange=exchange,
                    interval=interval,
                    datetime=row["datetime"].to_pydatetime(),
                    open_price=row["open"],
                    high_price=row["high"],
                    low_price=row["low"],
                    close_price=row["close"],
                    volume=row["volume"],
                    open_interest=row.get("open_oi", 0),
                    gateway_name="TQ",
                )
                data.append(bar)
        return data

    def download_history(self, start, end, symbol, interval):
        """
        下载CSV文件回来,并转换成dataframe
        """
        csv_file = "tqdata.csv"
        status = DataDownloader(api=self.api, symbol_list=[symbol], start_dt=start, end_dt=end, dur_sec=interval, csv_file_name=csv_file)
        while not status.is_finished():
            self.api.wait_update()

        if os.path.exists(csv_file):
            df = pd.read_csv(csv_file)
            df["datetime"] = pd.to_datetime(df["datetime"])

            if interval > 0:
                df = df.rename(columns={f"{symbol}.open": "open"})
                df = df.rename(columns={f"{symbol}.high": "high"})
                df = df.rename(columns={f"{symbol}.low": "low"})
                df = df.rename(columns={f"{symbol}.close": "close"})
                df = df.rename(columns={f"{symbol}.volume": "volume"})
                df = df.rename(columns={f"{symbol}.open_oi": "open_oi"})

            os.unlink(csv_file)
            return df
        else:
            return None

tqdata_client = TianqinClient()

回测以及CTA引擎,我都加了处理,可以在setting上面配置完成就直接使用。由于改的代码有点多,我懒得一一地将相关的代码放到这边。

有兴趣的可以到我的Github那边去看一下,我具体改了什么咯。

https://github.com/KimChow/vnpy

Member
avatar
加入于:
帖子: 35
声望: 1

点赞!正需要,感谢

Member
avatar
加入于:
帖子: 32
声望: 0

谢谢分享

© 2015-2022 上海韦纳软件科技有限公司
备案服务号:沪ICP备18006526号

沪公网安备 31011502017034号

【用户协议】
【隐私政策】
【免责条款】