代码如下,不复杂
`
"""
获取币安交易所的历史行情数据(K线数据),包括:

  • 现货:spot
  • U本位永续合约:future
  • 币本位合约:coin_future
    """
    import requests
    import pytz
    import time
    from enum import Enum
    from dataclasses import dataclass
    from datetime import datetime
    import threading

from mongoengine import (
Document,
StringField,
DateTimeField,
FloatField,
connect
)

from vnpy.trader.object import BarData, Interval
from vnpy.trader.constant import Exchange
from vnpy.trader.database import database_manager

from vnpy.database.mongodb.mongodb_database import DbBarData

# Earliest timestamp to download for each trading pair, as
# '%Y-%m-%d %H:%M' strings.
#   'spot'   - start of the spot market history
#   'future' - start of the USDT-margined perpetual contract history
TRADING_PAIRS = {
    'BTCUSDT': {'spot': '2017-8-14 8:00', 'future': '2019-9-8 8:00'},
    'ETHUSDT': {'spot': '2017-8-14 8:00', 'future': '2019-11-27 8:00'},
    'XRPUSDT': {'spot': '2018-4-30 8:00', 'future': '2020-1-6 8:00'},
    # The futures start used to be '2021-4-20 8:00', which is the *end* date of
    # the earlier hand-written download calls; the start date used there for
    # ADAUSDT futures was '2020-1-31 8:00'.
    'ADAUSDT': {'spot': '2018-4-16 8:00', 'future': '2020-1-31 8:00'},
}

class DbFundingRageData(Document):
    """MongoDB document holding one funding-rate record of a futures symbol.

    A record is identified by (symbol, exchange, fundingTime); the index
    declared in ``meta`` makes that combination unique.
    """

    symbol: str = StringField()
    exchange: str = StringField()

    fundingTime: datetime = DateTimeField()
    fundingRate: float = FloatField()

    # mongoengine only reads the class attribute named ``meta``. It used to be
    # misspelled ``meata``, so the unique index was never declared.
    # NOTE(review): if the collection already contains duplicate records,
    # creating the unique index will fail until they are removed.
    meta = {
        "indexes": [
            {
                "fields": ("symbol", "exchange", "fundingTime"),
                "unique": True,
            }
        ]
    }

class DataType(Enum):
    """Kind of Binance market a download request refers to."""

    SPOT = 'spot'  # spot market
    FUTURE = 'future'  # USDT-margined contracts
    COINE_FUTURE = 'coin_future'  # coin-margined contracts (historical, misspelled name)
    # Correctly spelled alias: it has the same value, so it is the same member
    # as COINE_FUTURE and does not show up as an extra entry when iterating.
    COIN_FUTURE = 'coin_future'

# Spot market REST API.
BASE_URL_S = "https://api.binance.com"
KLINE_URL_S = "/api/v3/klines"  # K-line (candlestick) endpoint

# USDT-margined perpetual contract REST API.
BASE_URL_F = "https://fapi.binance.com"
KLINE_URL_F = "/fapi/v1/klines"  # K-line endpoint of the perpetual contracts
FUNDINGRATE_F = "/fapi/v1/fundingRate"  # funding-rate endpoint

# Number of records requested per call for each endpoint.
BINANCE_SPOT_LIMIT = 1000
BINANCE_FUTURE_LIMIT = 1500
FUNDINGRATE_LIMIT = 1000

# Proxy mapping in the form accepted by the ``proxies`` argument of
# ``requests.get``. Only the HTTPS entry is active; the HTTP entry is kept as a
# commented-out example.
proxies = {

# "http": "http://10.10.1.10:3128",

"https": "http://127.0.0.1:1080",

}

# The mapping above is immediately overridden, so requests are sent without a
# proxy. Remove this assignment to enable the proxy mapping.
proxies = None

# Time zone attached to the timestamps of downloaded records.
CHINA_TZ = pytz.timezone("Asia/Shanghai")

def convert_param_to_url(params: dict) -> str:
    """Join ``params`` into a ``key=value&key=value`` query string.

    Entries whose value is ``None`` or an empty string are treated as "not
    set" and left out. Other falsy values such as ``0`` are kept: a plain
    truthiness test would silently drop a legitimate ``startTime=0``.

    Values are inserted as-is (no percent-encoding).

    :param params: mapping of query parameter names to values.
    :return: the query string without a leading ``?``; empty for no entries.
    """
    return '&'.join(
        f'{k}={v}'
        for k, v in params.items()
        if v is not None and v != ''
    )

def get_klines(data_type: DataType, symbol: str, interval: Interval, startTime, endTime=None):
    """Download K-line data from Binance page by page and store it as bars.

    Each page is converted to ``BarData`` objects and written with
    ``database_manager.save_bar_data``. The next page starts at the open time
    of the last bar received, so consecutive pages overlap by one bar.

    :param data_type: market to query. ``DataType.SPOT`` and
        ``DataType.FUTURE`` are supported.
    :param symbol: trading pair, e.g. ``'BTCUSDT'`` (case-insensitive).
    :param interval: bar interval; ``interval.value`` is sent to the API and
        ``interval`` itself is stored on every bar.
    :param startTime: first open time to request, either epoch milliseconds or
        a ``'%Y-%m-%d %H:%M'`` string interpreted as Asia/Shanghai time.
    :param endTime: stop once a page ends after this time; epoch milliseconds,
        a ``'%Y-%m-%d %H:%M'`` string, or ``None`` for "now".
    :raises NotImplementedError: for market types without a configured
        endpoint (coin-margined contracts).

    Failures while fetching, parsing or saving a page are printed and the page
    is retried after 10 seconds, without an upper bound on retries.
    """
    # Everything that depends only on the market type is resolved once, and an
    # unsupported type fails immediately instead of retrying forever.
    if data_type == DataType.SPOT:
        url = f'{BASE_URL_S}{KLINE_URL_S}'
        limit = BINANCE_SPOT_LIMIT
        bar_symbol = symbol.lower()
        gateway_name = 'BINANCE'
    elif data_type == DataType.FUTURE:
        url = f'{BASE_URL_F}{KLINE_URL_F}'
        limit = BINANCE_FUTURE_LIMIT
        bar_symbol = symbol.upper()
        gateway_name = 'BINANCES'
    else:
        raise NotImplementedError(f'K-line download is not implemented for {data_type}')

    exchange = Exchange.BINANCE
    time_format = '%Y-%m-%d %H:%M'
    request_timeout = 10  # seconds; without it a stalled connection blocks forever

    # NOTE(review): time strings are taken to be Asia/Shanghai wall-clock time,
    # matching the zone attached to the stored bars - confirm with callers.
    if isinstance(startTime, str):
        startTime = int(CHINA_TZ.localize(datetime.strptime(startTime, time_format)).timestamp() * 1000)
    if endTime is None:
        endTime = int(time.time() * 1000)
    elif isinstance(endTime, str):
        endTime = int(CHINA_TZ.localize(datetime.strptime(endTime, time_format)).timestamp() * 1000)

    while True:
        start_time_str = datetime.fromtimestamp(startTime / 1000, tz=CHINA_TZ).strftime(time_format)
        print(f"{bar_symbol}.{exchange.value}:开始时间:{start_time_str}")

        # 'limit' must be part of the request; it used to be added only after
        # the query string had been built and was therefore never sent.
        kline_param = {
            'symbol': symbol.upper(),
            'interval': interval.value,
            'startTime': startTime,  # milliseconds
            'limit': limit,
        }

        resp = None  # keeps the error report below valid if the request itself fails
        try:
            resp = requests.get(
                url, params=kline_param, proxies=proxies, timeout=request_timeout
            ).json()
            if not isinstance(resp, list):
                # The API reports errors as a JSON object instead of a list.
                raise ValueError('unexpected K-line response')

            buf = [
                BarData(
                    symbol=bar_symbol,
                    exchange=exchange,
                    # Convert the epoch value straight into the target zone so
                    # the result does not depend on the machine's local zone.
                    datetime=datetime.fromtimestamp(item[0] / 1000, tz=CHINA_TZ),
                    interval=interval,
                    volume=float(item[5]),
                    open_interest=0,
                    open_price=float(item[1]),
                    high_price=float(item[2]),
                    low_price=float(item[3]),
                    close_price=float(item[4]),
                    gateway_name=gateway_name,  # spot 'BINANCE', futures 'BINANCES'
                )
                for item in resp
            ]
            if buf:
                database_manager.save_bar_data(buf)
        except Exception as e:
            # Deliberately broad: this is a long-running best-effort download,
            # so any failure is reported and the same page is retried.
            print(e)
            print(resp)
            print(bar_symbol)
            time.sleep(10)
            continue

        if not resp:
            # Nothing at or after startTime: there is no more data to fetch.
            break

        last_open_time = resp[-1][0]
        last_close_time = resp[-1][6]
        if last_open_time > endTime or last_close_time >= int(time.time() * 1000) - 60 * 1000:
            break
        if last_open_time <= startTime:
            # The page did not advance past the requested start (e.g. the
            # symbol stopped trading); asking again would loop forever.
            break

        startTime = last_open_time

def update_kline(data_type: DataType, symbol: str):
    """Bring the stored one-minute bars of ``symbol`` up to the current time.

    The download resumes from the datetime of the newest stored bar and runs
    until now.

    :param data_type: market the symbol belongs to; futures symbols are looked
        up in upper case, all others in lower case.
    :param symbol: trading pair, e.g. ``'BTCUSDT'`` (case-insensitive).
    :raises IndexError: if no bar is stored for the symbol yet, so there is no
        point to resume from.
    """
    symbol = symbol.upper() if data_type == DataType.FUTURE else symbol.lower()

    # Step 1: the newest stored bar provides the start time.
    # NOTE(review): the query does not filter on the bar interval - confirm
    # that only one-minute bars are stored for these symbols.
    latest_bar = DbBarData.objects(symbol=symbol, exchange='BINANCE').order_by('-datetime').first()
    if latest_bar is None:
        raise IndexError(
            f'no stored bars found for {symbol}; run the initial download first'
        )
    # NOTE(review): a naive datetime is interpreted in the machine's local
    # zone by timestamp() - confirm this matches how the bars were stored.
    start_time = int(latest_bar.datetime.timestamp() * 1000)

    # Step 2: the current time is the end time (milliseconds).
    end_time = int(time.time() * 1000)

    # Step 3: download everything in between.
    get_klines(data_type, symbol, Interval.MINUTE, start_time, end_time)

def save_to_db(objs):
    """Store funding-rate records in the ``vnpy`` MongoDB database.

    Records are written as upserts keyed on (symbol, exchange, fundingTime),
    so saving the same record again updates it instead of inserting a
    duplicate.

    :param objs: iterable of mappings with the keys ``'symbol'``,
        ``'fundingTime'`` (epoch milliseconds) and ``'fundingRate'``.
    """
    connect(
        db='vnpy',
        host='localhost',
        port=27017
    )
    for obj in objs:
        # Convert the epoch value straight into the target zone so the result
        # does not depend on the machine's local time zone.
        funding_time = datetime.fromtimestamp(obj['fundingTime'] / 1000, tz=CHINA_TZ)
        DbFundingRageData.objects(
            symbol=obj['symbol'],
            exchange="BINANCE",
            fundingTime=funding_time,
        ).update_one(
            upsert=True,
            # float() accepts the rate whether it arrives as text or a number.
            set__fundingRate=float(obj['fundingRate']),
        )

def get_funding_rage(symbol: str, startTime: str, endTime: str):
    """Download the funding-rate history of a futures symbol and store it.

    The range is fetched page by page (``FUNDINGRATE_LIMIT`` records per
    request) and every page is passed to ``save_to_db``.

    :param symbol: trading pair, e.g. ``'BTCUSDT'`` (case-insensitive).
    :param startTime: start of the range as ``'%Y-%m-%d %H:%M'``.
    :param endTime: end of the range as ``'%Y-%m-%d %H:%M'``.
    :raises RuntimeError: if the API answers with something other than a list
        of records (for example an error object).
    """
    symbol = symbol.upper()
    time_format = '%Y-%m-%d %H:%M'
    # NOTE(review): time strings are taken to be Asia/Shanghai wall-clock time,
    # matching the zone attached to the stored records - confirm with callers.
    start_ms = int(CHINA_TZ.localize(datetime.strptime(startTime, time_format)).timestamp() * 1000)  # milliseconds
    end_ms = int(CHINA_TZ.localize(datetime.strptime(endTime, time_format)).timestamp() * 1000)  # milliseconds

    url = f'{BASE_URL_F}{FUNDINGRATE_F}'

    while start_ms <= end_ms:
        params = {
            'symbol': symbol,
            'startTime': start_ms,
            'endTime': end_ms,
            'limit': FUNDINGRATE_LIMIT,
        }
        resp = requests.get(url, params=params, proxies=proxies, timeout=10).json()

        if not isinstance(resp, list):
            raise RuntimeError(f'unexpected funding-rate response for {symbol}: {resp}')
        if not resp:
            break

        save_to_db(resp)

        if len(resp) < FUNDINGRATE_LIMIT:
            # A short page means the end of the requested range was reached.
            break
        # Continue right after the newest record of this page.
        start_ms = max(record['fundingTime'] for record in resp) + 1


def init_kline():
    """Download the full one-minute history of every pair in TRADING_PAIRS.

    One thread is started per trading pair and market (spot and futures), each
    beginning at the start time configured for that market; the function
    returns after all threads have finished.
    """
    threads = []

    for trading_pair, start_times in TRADING_PAIRS.items():
        # get_klines expects (data_type, symbol, interval, startTime); the
        # symbol used to be missing from these argument tuples.
        threads.append(threading.Thread(
            target=get_klines,
            args=(DataType.SPOT, trading_pair, Interval.MINUTE, start_times['spot']),
        ))
        threads.append(threading.Thread(
            target=get_klines,
            args=(DataType.FUTURE, trading_pair, Interval.MINUTE, start_times['future']),
        ))

    for thread in threads:
        thread.start()

    for thread in threads:
        thread.join()

# thread_btcusdt = threading.Thread(target=get_klines, args=(DataType.SPOT, 'BTCUSDT', Interval.MINUTE, '2017-8-14 8:00', '2021-4-20 8:00'))
# thread_ethusdt = threading.Thread(target=get_klines, args=(DataType.SPOT, 'ETHUSDT', Interval.MINUTE, '2017-8-14 8:00', '2021-4-20 8:00'))
# thread_xrpusdt = threading.Thread(target=get_klines, args=(DataType.SPOT, 'XRPUSDT', Interval.MINUTE, '2018-4-30 8:00', '2021-4-20 8:00'))
# thread_adapusdt = threading.Thread(target=get_klines, args=(DataType.SPOT, 'ADAUSDT', Interval.MINUTE, '2018-4-16 8:00', '2021-4-20 8:00'))

# thread_BTCUSDT = threading.Thread(target=get_klines, args=(DataType.FUTURE, 'BTCUSDT', Interval.MINUTE, '2019-9-8 8:00', '2021-4-20 8:00'))
# thread_ETHUSDT = threading.Thread(target=get_klines, args=(DataType.FUTURE, 'ETHUSDT', Interval.MINUTE, '2019-11-27 8:00', '2021-4-20 8:00'))
# thread_XRPUSDT = threading.Thread(target=get_klines, args=(DataType.FUTURE, 'XRPUSDT', Interval.MINUTE, '2020-1-6 8:00', '2021-4-20 8:00'))
# thread_ADAUSDT = threading.Thread(target=get_klines, args=(DataType.FUTURE, 'ADAUSDT', Interval.MINUTE, '2020-1-31 8:00', '2021-4-20 8:00'))

# thread_btcusdt.start()
# thread_ethusdt.start()
# thread_xrpusdt.start()
# thread_adapusdt.start()

# thread_BTCUSDT.start()
# thread_ETHUSDT.start()
# thread_XRPUSDT.start()
# thread_ADAUSDT.start()

# thread_btcusdt.join()
# thread_ethusdt.join()
# thread_xrpusdt.join()
# thread_adapusdt.join()

# thread_BTCUSDT.join()
# thread_ETHUSDT.join()
# thread_XRPUSDT.join()
# thread_ADAUSDT.join()

def update_klines():
    """Run update_kline for every configured pair, spot and futures, in parallel.

    Creates one thread per (trading pair, market) combination, starts them all
    and then waits until every one of them has finished.
    """
    markets = (DataType.SPOT, DataType.FUTURE)
    workers = [
        threading.Thread(target=update_kline, args=(market, pair))
        for pair in TRADING_PAIRS.keys()
        for market in markets
    ]

    for worker in workers:
        worker.start()

    for worker in workers:
        worker.join()

# Script entry point. The double underscores of ``__name__`` / ``'__main__'``
# had been lost, which made this line fail with a NameError.
if __name__ == '__main__':
    # update_klines()
    print(TRADING_PAIRS['BTCUSDT']['spot'])