vn.py官网
开源量化社区
Member
avatar
加入于:
帖子: 10
声望: 0

在该BinancesGateway 中添加定时任务请求交易对信息

    def process_timer_event(self, event: Event) -> None:
        """"""
        self.rest_api.keep_user_stream()

        self.timer_count += 1

        if self.timer_count > 120:
            self.timer_count = 0
            self.rest_api.query_contract()
            time.sleep(2)
            # for symbol in symbol_contract_map.keys():
            # self.rest_api.query_funding_rate(symbol)
            self.rest_api.query_funding_rate()

会导致有分钟一分钟请求次数过多导致IP受到权重限制,请问一下需要怎样解决?一般该请求一次只会消耗一次权重,也就是异常不断请求了N次导致权重增加

description

整体代码文件如下

"""
Gateway for Binance Crypto Exchange.
"""

import urllib
import hashlib
import hmac
import time
from copy import copy
from datetime import datetime, timedelta
from enum import Enum
from threading import Lock
from typing import Dict, List, Tuple
from vnpy.trader.utility import round_to
import pytz
import json
from requests.exceptions import SSLError

from vnpy.api.rest import RestClient, Request
# from vnpy_rest import RestClient, Request
from vnpy.api.websocket import WebsocketClient
# from vnpy_websocket import WebsocketClient
from vnpy.trader.constant import (
    Direction,
    Exchange,
    Product,
    Status,
    OrderType,
    Interval,
    Offset
)
from vnpy.trader.gateway import BaseGateway
from vnpy.trader.object import (
    TickData,
    OrderData,
    TradeData,
    AccountData,
    ContractData,
    PositionData,
    BarData,
    OrderRequest,
    CancelRequest,
    SubscribeRequest,
    HistoryRequest,
    FundrateData,
    QueryRequest,
    TickRequest,
)
from vnpy.trader.event import EVENT_TIMER
from vnpy.event import Event, EventEngine

F_REST_HOST: str = "https://fapi.binance.com"
F_WEBSOCKET_TRADE_HOST: str = "wss://fstream.binance.com/ws/"
F_WEBSOCKET_DATA_HOST: str = "wss://fstream.binance.com/stream?streams="

F_TESTNET_RESTT_HOST: str = "https://testnet.binancefuture.com"
F_TESTNET_WEBSOCKET_TRADE_HOST: str = "wss://stream.binancefuture.com/ws/"
F_TESTNET_WEBSOCKET_DATA_HOST: str = "wss://stream.binancefuture.com/stream?streams="

D_REST_HOST: str = "https://dapi.binance.com"
D_WEBSOCKET_TRADE_HOST: str = "wss://dstream.binance.com/ws/"
D_WEBSOCKET_DATA_HOST: str = "wss://dstream.binance.com/stream?streams="

D_TESTNET_RESTT_HOST: str = "https://testnet.binancefuture.com"
D_TESTNET_WEBSOCKET_TRADE_HOST: str = "wss://dstream.binancefuture.com/ws/"
D_TESTNET_WEBSOCKET_DATA_HOST: str = "wss://dstream.binancefuture.com/stream?streams="

STATUS_BINANCES2VT: Dict[str, Status] = {
    "NEW": Status.NOTTRADED,
    "PARTIALLY_FILLED": Status.PARTTRADED,
    "FILLED": Status.ALLTRADED,
    "CANCELED": Status.CANCELLED,
    "REJECTED": Status.REJECTED,
    "EXPIRED": Status.CANCELLED
}

ORDERTYPE_VT2BINANCES: Dict[OrderType, Tuple[str, str]] = {
    OrderType.LIMIT: ("LIMIT", "GTC"),
    OrderType.MARKET: ("MARKET", "GTC"),
    OrderType.FAK: ("LIMIT", "IOC"),
    OrderType.FOK: ("LIMIT", "FOK"),
}
ORDERTYPE_BINANCES2VT: Dict[Tuple[str, str], OrderType] = {v: k for k, v in ORDERTYPE_VT2BINANCES.items()}

DIRECTION_VT2BINANCES: Dict[Direction, str] = {
    Direction.LONG: "BUY",
    Direction.SHORT: "SELL"
}
DIRECTION_BINANCES2VT: Dict[str, Direction] = {v: k for k, v in DIRECTION_VT2BINANCES.items()}

SIDE_VT2BINANCEF: Dict[Direction, str] = {
    Direction.LONG: "LONG",
    Direction.SHORT: "SHORT"
}
SIDE_BINACES2VT: Dict[str, Direction] = {v: k for k, v in SIDE_VT2BINANCEF.items()}

INTERVAL_VT2BINANCES: Dict[Interval, str] = {
    Interval.MINUTE: "1m",
    Interval.MINUTE5: "5m",
    Interval.MINUTE3: "3m",
    Interval.MINUTE15: "15m",
    Interval.MINUTE30: "30m",
    Interval.HOUR: "1h",
    Interval.HOUR2: "2h",
    Interval.HOUR4: "4h",
    Interval.DAILY: "1d",
}

TIMEDELTA_MAP: Dict[Interval, timedelta] = {
    Interval.MINUTE: timedelta(minutes=1),
    Interval.MINUTE3: timedelta(minutes=3),
    Interval.MINUTE5: timedelta(minutes=5),
    Interval.MINUTE15: timedelta(minutes=15),
    Interval.MINUTE30: timedelta(minutes=30),
    Interval.HOUR: timedelta(hours=1),
    Interval.HOUR2: timedelta(hours=2),
    Interval.HOUR4: timedelta(hours=4),
    Interval.DAILY: timedelta(days=1),
}

CHINA_TZ = pytz.timezone("Asia/Shanghai")


class Security(Enum):
    NONE: int = 0
    SIGNED: int = 1
    API_KEY: int = 2


symbol_contract_map: Dict[str, ContractData] = {}


class BinancesGateway(BaseGateway):
    """
    VN Trader Gateway for Binance connection.
    """

    default_setting = {
        "key": "",
        "secret": "",
        "会话数": 3,
        "服务器": ["TESTNET", "REAL"],
        "合约模式": ["反向", "正向"],
        "代理地址": "",
        "代理端口": 0,
    }

    exchanges: Exchange = [Exchange.BINANCE]

    def __init__(self, event_engine: EventEngine):
        """Constructor"""
        super().__init__(event_engine, "BINANCES")

        self.trade_ws_api = BinancesTradeWebsocketApi(self)
        self.market_ws_api = BinancesDataWebsocketApi(self)
        self.rest_api = BinancesRestApi(self)
        self.timer_count = 0

    def connect(self, setting: dict) -> None:
        """"""
        key = setting["key"]
        secret = setting["secret"]
        session_number = setting["会话数"]
        server = setting["服务器"]
        proxy_host = setting["代理地址"]
        proxy_port = setting["代理端口"]

        if setting["合约模式"] == "正向":
            usdt_base = True
        else:
            usdt_base = False

        self.rest_api.connect(usdt_base, key, secret, session_number, server,
                              proxy_host, proxy_port)
        self.market_ws_api.connect(usdt_base, proxy_host, proxy_port, server)

        self.event_engine.register(EVENT_TIMER, self.process_timer_event)

    def subscribe(self, req: SubscribeRequest) -> None:
        """"""
        self.market_ws_api.subscribe(req)

    def send_order(self, req: OrderRequest) -> str:
        """"""
        return self.rest_api.send_order(req)

    def cancel_order(self, req: CancelRequest) -> Request:
        """"""
        self.rest_api.cancel_order(req)

    def query_order(self, req: QueryRequest):
        self.rest_api.query_order(req)

    def query_account(self) -> None:
        """"""
        self.rest_api.query_account()

    def query_account_one(self):
        """"""
        return self.rest_api.query_account_one()

    def query_position(self) -> None:
        """"""
        self.rest_api.query_position()

    def query_position_one(self):
        """"""
        return self.rest_api.query_position_one()

    def query_history(self, req: HistoryRequest) -> List[BarData]:
        """"""
        return self.rest_api.query_history(req)

    def query_tick(self, req: TickRequest):
        """
        API 请求TICK数据
        """
        return self.rest_api.query_tick(req)

    def query_contract(self):
        """"""
        self.rest_api.query_contract()

    def close(self) -> None:
        """"""
        self.rest_api.stop()
        self.trade_ws_api.stop()
        self.market_ws_api.stop()

    def process_timer_event(self, event: Event) -> None:
        """"""
        self.rest_api.keep_user_stream()

        self.timer_count += 1

        if self.timer_count > 120:
            self.timer_count = 0
            self.rest_api.query_contract()
            time.sleep(2)
            # for symbol in symbol_contract_map.keys():
            # self.rest_api.query_funding_rate(symbol)
            self.rest_api.query_funding_rate()



class BinancesRestApi(RestClient):
    """
    BINANCE REST API
    """

    def __init__(self, gateway: BinancesGateway):
        """"""
        super().__init__()

        self.gateway: BinancesGateway = gateway
        self.gateway_name: str = gateway.gateway_name

        self.trade_ws_api: BinancesTradeWebsocketApi = self.gateway.trade_ws_api
        # self.market_ws_api: BinancesDataWebsocketApi = self.gateway.market_ws_api

        self.key: str = ""
        self.secret: str = ""

        self.user_stream_key: str = ""
        self.keep_alive_count: int = 0
        self.recv_window: int = 5000
        self.time_offset: int = 0

        self.order_count: int = 1_000_000
        self.order_count_lock: Lock = Lock()
        self.connect_time: int = 0
        self.usdt_base: bool = False

    def sign(self, request: Request) -> Request:
        """
        Generate BINANCE signature.
        """
        security = request.data["security"]
        if security == Security.NONE:
            request.data = None
            return request

        if request.params:
            path = request.path + "?" + urllib.parse.urlencode(request.params)
        else:
            request.params = dict()
            path = request.path

        if security == Security.SIGNED:
            timestamp = int(time.time() * 1000)

            if self.time_offset > 0:
                timestamp -= abs(self.time_offset)
            elif self.time_offset < 0:
                timestamp += abs(self.time_offset)

            request.params["timestamp"] = timestamp

            query = urllib.parse.urlencode(sorted(request.params.items()))
            signature = hmac.new(self.secret, query.encode(
                "utf-8"), hashlib.sha256).hexdigest()

            query += "&signature={}".format(signature)
            path = request.path + "?" + query

        request.path = path
        request.params = {}
        request.data = {}

        # Add headers
        headers = {
            "Content-Type": "application/x-www-form-urlencoded",
            "Accept": "application/json",
            "X-MBX-APIKEY": self.key,
            "Connection": "close"
        }

        if security in [Security.SIGNED, Security.API_KEY]:
            request.headers = headers

        return request

    def connect(
        self,
        usdt_base: bool,
        key: str,
        secret: str,
        session_number: int,
        server: str,
        proxy_host: str,
        proxy_port: int
    ) -> None:
        """
        Initialize connection to REST server.
        """
        self.usdt_base = usdt_base
        self.key = key
        self.secret = secret.encode()
        self.proxy_port = proxy_port
        self.proxy_host = proxy_host
        self.server = server

        self.connect_time = (
            int(datetime.now().strftime("%y%m%d%H%M%S")) * self.order_count
        )

        if self.server == "REAL":
            if self.usdt_base:
                self.init(F_REST_HOST, proxy_host, proxy_port)
            else:
                self.init(D_REST_HOST, proxy_host, proxy_port)
        else:
            if self.usdt_base:
                self.init(F_TESTNET_RESTT_HOST, proxy_host, proxy_port)
            else:
                self.init(D_TESTNET_RESTT_HOST, proxy_host, proxy_port)

        self.start(session_number)

        self.gateway.write_log("REST API启动成功")

        self.query_time()
        time.sleep(1)
        self.query_account()
        time.sleep(1)
        self.query_position()
        time.sleep(1)
        self.query_orders()
        time.sleep(1)
        self.query_contract()
        time.sleep(1)
        self.start_user_stream()

    def query_time(self) -> Request:
        """"""
        data = {
            "security": Security.NONE
        }

        if self.usdt_base:
            path = "/fapi/v1/time"
        else:
            path = "/dapi/v1/time"

        return self.add_request(
            "GET",
            path,
            callback=self.on_query_time,
            data=data
        )

    def query_account(self) -> Request:
        """"""
        data = {"security": Security.SIGNED}

        if self.usdt_base:
            path = "/fapi/v1/account"
        else:
            path = "/dapi/v1/account"

        self.add_request(
            method="GET",
            path=path,
            callback=self.on_query_account,
            data=data
        )

    def query_account_one(self):
        """
        单次获取账户信息  主动请求  直接返回信息
        """
        data = {"security": Security.SIGNED}

        if self.usdt_base:
            path = "/fapi/v1/account"
        else:
            path = "/dapi/v1/account"

        res = self.request(method='GET', path=path, data=data)

        if res.status_code == 200:
            account_list = []
            data = json.loads(res.text)
            for asset in data["assets"]:
                account = AccountData(
                    accountid=asset["asset"],
                    balance=float(asset["marginBalance"]), # 保证金余额
                    frozen=float(asset["positionInitialMargin"]), # 维持保证金
                    gateway_name=self.gateway_name
                )
                account_list.append(account)
            return account_list
        else:
            return []


    def query_position(self) -> Request:
        """"""
        data = {"security": Security.SIGNED}

        if self.usdt_base:
            path = "/fapi/v2/positionRisk"
        else:
            path = "/dapi/v1/positionRisk"

        self.add_request(
            method="GET",
            path=path,
            callback=self.on_query_position,
            data=data
        )

    def query_position_one(self):
        """"""
        data = {"security": Security.SIGNED}

        if self.usdt_base:
            path = "/fapi/v2/positionRisk"
        else:
            path = "/dapi/v1/positionRisk"

        res = self.request(method='GET', path=path, data=data)
        if res.status_code == 200:
            position_list = []
            data = json.loads(res.text)
            for d in data:
                if d['positionSide'] == "BOTH":
                    direction = Direction.NET
                else:
                    direction = SIDE_BINACES2VT[d["positionSide"]]
                position = PositionData(
                    symbol=d["symbol"],
                    exchange=Exchange.BINANCE,
                    direction=direction,
                    volume=float(d["positionAmt"]),
                    price=float(d["entryPrice"]),
                    pnl=float(d["unRealizedProfit"]),
                    gateway_name=self.gateway_name,
                )
                position_list.append(position)
            return position_list
        else:
            return []

    def query_orders(self) -> None:
        """"""
        data = {"security": Security.SIGNED}

        if self.usdt_base:
            path = "/fapi/v1/openOrders"
        else:
            path = "/dapi/v1/openOrders"

        self.add_request(
            method="GET",
            path=path,
            callback=self.on_query_orders,
            data=data
        )

    def query_order(self, req: QueryRequest) -> None:
        """"""
        data = {
            "security": Security.SIGNED
        }

        params = {
            "symbol": req.symbol,
            "origClientOrderId": req.orderid
        }

        if self.usdt_base:
            path = "/fapi/v1/order"
        else:
            path = "/dapi/v1/order"

        self.add_request(
            method="GET",
            path=path,
            callback=self.on_query_order,
            params=params,
            data=data,
            extra=req
        )


    def query_contract(self) -> Request:
        """"""
        data = {
            "security": Security.NONE
        }

        if self.usdt_base:
            path = "/fapi/v1/exchangeInfo"
        else:
            path = "/dapi/v1/exchangeInfo"

        self.add_request(
            method="GET",
            path=path,
            callback=self.on_query_contract,
            data=data
        )

    # 资金费率"/fapi/v1/premiumIndex
    def query_funding_rate(self, symbol=None):
        # print("获取的请求数据", symbol)
        data = {
            "security": Security.NONE
        }
        if symbol:
            params = {
                "symbol": symbol,
            }
        else:
            params = {}
        if self.usdt_base:
            path = "/fapi/v1/premiumIndex"
        else:
            path = "/dapi/v1/premiumIndex"

        self.add_request(
            method="GET",
            path=path,
            callback=self.on_query_funding_rate,
            params=params,
            data=data
        )

    def query_tick(self, req: TickRequest):
        """
        请求tick API接口
        {
          "symbol": "LTCBTC",
          "bidPrice": "4.00000000",
          "bidQty": "431.00000000",
          "askPrice": "4.00000200",
          "askQty": "9.00000000"
        }
        """
        data = {
            "security": Security.NONE
        }
        params = {
            'symbol': req.symbol,

        }
        if self.usdt_base:
            path = "/fapi/v1/ticker/bookTicker"
        else:
            path = "/dapi/v1/ticker/bookTicker"
        # print("传入进来的req", req)
        res = self.request(method='GET', path=path, params=params, data=data)
        if res.status_code == 200:
            result = json.loads(res.text)
            if isinstance(result, list):
                return result[0]
            else:
                return result
        else:
            return None


    def _new_order_id(self) -> int:
        """"""
        with self.order_count_lock:
            self.order_count += 1
            return self.order_count

    def send_order(self, req: OrderRequest) -> str:
        """"""
        orderid = "wxx-" + str(self.connect_time + self._new_order_id())
        order = req.create_order_data(
            orderid,
            self.gateway_name
        )
        self.gateway.on_order(order)

        data = {
            "security": Security.SIGNED
        }

        order_type, time_condition = ORDERTYPE_VT2BINANCES[req.type]

        params = {
            "symbol": req.symbol,
            "side": DIRECTION_VT2BINANCES[req.direction],
            "type": order_type,
            "timeInForce": time_condition,
            "price": float(req.price),
            "quantity": float(req.volume),
            "newClientOrderId": orderid,
        }

        if req.offset == Offset.CLOSE:
            params["reduceOnly"] = True

        if self.usdt_base:
            path = "/fapi/v1/order"
        else:
            path = "/dapi/v1/order"

        self.add_request(
            method="POST",
            path=path,
            callback=self.on_send_order,
            data=data,
            params=params,
            extra=order,
            on_error=self.on_send_order_error,
            on_failed=self.on_send_order_failed
        )

        return order.vt_orderid

    def cancel_order(self, req: CancelRequest) -> Request:
        """"""
        data = {
            "security": Security.SIGNED
        }

        params = {
            "symbol": req.symbol,
            "origClientOrderId": req.orderid
        }

        if self.usdt_base:
            path = "/fapi/v1/order"
        else:
            path = "/dapi/v1/order"

        self.add_request(
            method="DELETE",
            path=path,
            callback=self.on_cancel_order,
            params=params,
            data=data,
            extra=req
        )

    def on_default_error(
            self, exception_type: type, exception_value: Exception, tb, request: Request
    ):
        """
        Callback when cancelling order failed on server.
        """
        # Record exception if not ConnectionError
        if not issubclass(exception_type, ConnectionError) and not issubclass(exception_type, ConnectionResetError):
            self.on_error(exception_type, exception_value, tb, request)

    def start_user_stream(self) -> Request:
        """"""
        data = {
            "security": Security.API_KEY
        }

        if self.usdt_base:
            path = "/fapi/v1/listenKey"
        else:
            path = "/dapi/v1/listenKey"

        self.add_request(
            method="POST",
            path=path,
            callback=self.on_start_user_stream,
            data=data
        )

    def keep_user_stream(self) -> Request:
        """"""
        self.keep_alive_count += 1
        if self.keep_alive_count < 300:
            return
        self.keep_alive_count = 0

        data = {
            "security": Security.API_KEY
        }

        params = {
            "listenKey": self.user_stream_key
        }

        if self.usdt_base:
            path = "/fapi/v1/listenKey"
        else:
            path = "/dapi/v1/listenKey"
        self.add_request(
            method="PUT",
            path=path,
            callback=self.on_keep_user_stream,
            on_error=self.on_keep_user_stream_error,
            params=params,
            data=data,
            on_failed=self.on_keep_user_stream_failed
        )

    def on_query_time(self, data: dict, request: Request) -> None:
        """"""
        local_time = int(time.time() * 1000)
        server_time = int(data["serverTime"])
        self.time_offset = local_time - server_time

    def on_query_account(self, data: dict, request: Request) -> None:
        """"""
        for asset in data["assets"]:
            account = AccountData(
                accountid=asset["asset"],
                # balance=float(asset["walletBalance"]),
                # frozen=float(asset["maintMargin"]),
                balance=float(asset["marginBalance"]),  # 保证金余额
                frozen=float(asset["positionInitialMargin"]),  # 维持保证金
                gateway_name=self.gateway_name
            )

            # if account.balance:
            self.gateway.on_account(account)

        self.gateway.write_log("账户资金查询成功")

    def on_query_position(self, data: list, request: Request) -> None:
        """"""
        #symbols = self.market_ws_api.ticks.keys()
        for d in data:
            if d['positionSide'] == "BOTH":
                direction = Direction.NET
            else:
                direction = SIDE_BINACES2VT[d["positionSide"]]
            position = PositionData(
                symbol=d["symbol"],
                exchange=Exchange.BINANCE,
                direction=direction,
                volume=float(d["positionAmt"]),
                price=float(d["entryPrice"]),
                pnl=float(d["unRealizedProfit"]),
                gateway_name=self.gateway_name,
            )

            if position.volume:
            #     volume = d["positionAmt"]
            #     if '.' in volume:
            #         position.volume = float(d["positionAmt"])
            #     else:
            #         position.volume = int(d["positionAmt"])

                self.gateway.on_position(position)

        self.gateway.write_log("持仓信息查询成功")

    def on_query_orders(self, data: dict, request: Request) -> None:
        """"""
        for d in data:
            key = (d["type"], d["timeInForce"])
            order_type = ORDERTYPE_BINANCES2VT.get(key, None)
            if not order_type:
                continue

            order = OrderData(
                orderid=d["clientOrderId"],
                symbol=d["symbol"],
                exchange=Exchange.BINANCE,
                price=float(d["price"]),
                volume=float(d["origQty"]),
                type=order_type,
                direction=DIRECTION_BINANCES2VT[d["side"]],
                traded=float(d["executedQty"]),
                status=STATUS_BINANCES2VT.get(d["status"], None),
                datetime=generate_datetime(d["time"]),
                gateway_name=self.gateway_name,
            )
            self.gateway.on_order(order)

        self.gateway.write_log("委托信息查询成功")

    def on_query_order(self, data: dict, request: Request) -> None:
        """"""

        key = (data["type"], data["timeInForce"])
        order_type = ORDERTYPE_BINANCES2VT.get(key, None)
        if not order_type:
            return

        order = OrderData(
            orderid=data["clientOrderId"],
            symbol=data["symbol"],
            exchange=Exchange.BINANCE,
            price=float(data["price"]),
            volume=float(data["origQty"]),
            type=order_type,
            direction=DIRECTION_BINANCES2VT[data["side"]],
            traded=float(data["executedQty"]),
            status=STATUS_BINANCES2VT.get(data["status"], None),
            datetime=generate_datetime(data["time"]),
            gateway_name=self.gateway_name,
        )
        self.gateway.on_order(order)

        self.gateway.write_log("订单信息查询成功")

    def on_query_contract(self, data: dict, request: Request) -> None:
        """"""
        for d in data["symbols"]:
            # if d["status"] == TransactionStatus.CLOSE:
            #     continue
            base_currency = d["baseAsset"]
            quote_currency = d["quoteAsset"]
            name = f"{base_currency.upper()}/{quote_currency.upper()}"

            pricetick = 1
            min_volume = 1

            for f in d["filters"]:
                if f["filterType"] == "PRICE_FILTER":
                    pricetick = float(f["tickSize"])
                elif f["filterType"] == "LOT_SIZE":
                    min_volume = float(f["stepSize"])

            contract = ContractData(
                symbol=d["symbol"],
                exchange=Exchange.BINANCE,
                name=name,
                pricetick=pricetick,
                size=1,
                min_volume=min_volume,
                product=Product.FUTURES,
                net_position=True,
                history_data=True,
                gateway_name=self.gateway_name,
            )
            self.gateway.on_contract(contract)

            symbol_contract_map[contract.symbol] = contract

        self.gateway.write_log("合约信息查询成功")


    def on_query_funding_rate(self, data, request: Request):
        # print("on_query_funding_rate:", data)
        # print("request 返回结果:", request)
        if isinstance(data, list): # 当币本位会返回list  或者当获取全局
            for d in data:
                fund_rate = FundrateData(
                    symbol=d['symbol'],
                    exchange=Exchange.BINANCE,
                    fund_rate=d['lastFundingRate'],
                    next_funding_time=d['nextFundingTime'],
                    datetime=d['time'],
                    gateway_name=self.gateway_name,
                )
                self.gateway.on_fundrate(fund_rate)
                # self.gateway.write_log("查询资金费率")
        else:
            fund_rate = FundrateData(
                symbol=data['symbol'],
                exchange=Exchange.BINANCE,
                fund_rate=data['lastFundingRate'],
                next_funding_time=data['nextFundingTime'],
                datetime=data['time'],
                gateway_name=self.gateway_name,
            )
            self.gateway.on_fundrate(fund_rate)
            # self.gateway.write_log("查询资金费率")

    def on_send_order(self, data: dict, request: Request) -> None:
        """"""
        pass

    def on_send_order_failed(self, status_code: str, request: Request) -> None:
        """
        Callback when sending order failed on server.
        """
        order = request.extra
        order.status = Status.REJECTED
        self.gateway.on_order(order)

        msg = f"委托失败,状态码:{status_code},信息:{request.response.text}"
        self.gateway.write_log(msg)

    def on_send_order_error(
        self, exception_type: type, exception_value: Exception, tb, request: Request
    ) -> None:
        """
        Callback when sending order caused exception.
        """
        order = request.extra
        order.status = Status.REJECTED
        self.gateway.on_order(order)

        # Record exception if not ConnectionError
        if not issubclass(exception_type, (ConnectionError, SSLError)):
            self.on_error(exception_type, exception_value, tb, request)

    def on_cancel_order(self, data: dict, request: Request) -> None:
        """"""
        pass

    def on_start_user_stream(self, data: dict, request: Request) -> None:
        """"""
        self.user_stream_key = data["listenKey"]
        self.keep_alive_count = 0

        if self.server == "REAL":
            url = F_WEBSOCKET_TRADE_HOST + self.user_stream_key
            if not self.usdt_base:
                url = D_WEBSOCKET_TRADE_HOST + self.user_stream_key
        else:
            url = F_TESTNET_WEBSOCKET_TRADE_HOST + self.user_stream_key
            if not self.usdt_base:
                url = D_TESTNET_WEBSOCKET_TRADE_HOST + self.user_stream_key

        self.trade_ws_api.connect(url, self.proxy_host, self.proxy_port)

    def on_keep_user_stream(self, data: dict, request: Request) -> None:
        """"""
        pass

    def on_keep_user_stream_error(
            self, exception_type: type, exception_value: Exception, tb, request: Request
    ):
        """
        Callback when cancelling order failed on server.
        """
        # Record exception if not ConnectionError
        if not issubclass(exception_type, ConnectionError) and not issubclass(exception_type, ConnectionResetError):
            # self.on_error(exception_type, exception_value, tb, request)
            print(f"币安接口: exception_type错误类型:{exception_type}, exception_value:{exception_value}, tb:{tb}, request:{request}")
            self.keep_user_stream()


    def on_keep_user_stream_failed(self, data, request: Request):
        """
        保持Listen Key失败
        """
        if data // 100 != 2:
            respone_data = request.response.json()
            error_msg = respone_data['msg']
            self.start_user_stream()
            if "This listenKey does not exist." == error_msg:
                print("保持Listen Key失败")
                # self.start_user_stream()
            pass

    def query_history(self, req: HistoryRequest) -> List[BarData]:
        """"""
        history = []
        limit = 999
        if self.usdt_base:
            start_time = int(datetime.timestamp(req.start))
        else:
            end_time = int(datetime.timestamp(req.end))

        while True:
            # Create query params
            params = {
                "symbol": req.symbol,
                "interval": INTERVAL_VT2BINANCES[req.interval],
                "limit": limit
            }

            if self.usdt_base:
                params["startTime"] = start_time * 1000
                path = "/fapi/v1/klines"
                if req.end:
                    end_time = int(datetime.timestamp(req.end))
                    params["endTime"] = end_time * 1000     # convert to millisecond

            else:
                params["endTime"] = end_time * 1000
                path = "/dapi/v1/klines"
                if req.start:
                    start_time = int(datetime.timestamp(req.start))
                    params["startTime"] = start_time * 1000     # convert to millisecond

            resp = self.request(
                "GET",
                path=path,
                data={"security": Security.NONE},
                params=params
            )

            # Break if request failed with other status code
            if resp.status_code // 100 != 2:
                msg = f"获取历史数据失败,状态码:{resp.status_code},信息:{resp.text}"
                self.gateway.write_log(msg)
                break
            else:
                data = resp.json()
                if not data:
                    msg = f"获取历史数据为空,开始时间:{start_time}"
                    self.gateway.write_log(msg)
                    break

                buf = []

                for l in data:
                    if generate_datetime(l[0]) < req.end:
                        bar = BarData(
                            symbol=req.symbol,
                            exchange=req.exchange,
                            datetime=generate_datetime(l[0]),
                            interval=req.interval,
                            volume=float(l[5]),
                            open_price=float(l[1]),
                            high_price=float(l[2]),
                            low_price=float(l[3]),
                            close_price=float(l[4]),
                            quote_volume=float(l[7]),
                            trade_num=int(l[8]),
                            taker_buy_base_asset_volume=float(l[9]),
                            taker_buy_quote_asset_volume=float(l[10]),
                            gateway_name=self.gateway_name
                        )
                        buf.append(bar)

                begin = buf[0].datetime
                end = buf[-1].datetime

                if not self.usdt_base:
                    buf = list(reversed(buf))
                history.extend(buf)
                msg = f"获取历史数据成功,{req.symbol} - {req.interval.value},{begin} - {end}"
                self.gateway.write_log(msg)

                # Break if total data count less than limit (latest date collected)
                if len(data) < limit:
                    break

                # Update start time
                if self.usdt_base:
                    start_dt = bar.datetime + TIMEDELTA_MAP[req.interval]
                    start_time = int(datetime.timestamp(start_dt))
                # Update end time
                else:
                    end_dt = begin - TIMEDELTA_MAP[req.interval]
                    end_time = int(datetime.timestamp(end_dt))

        if not self.usdt_base:
            history = list(reversed(history))
        return history


class BinancesTradeWebsocketApi(WebsocketClient):
    """"""

    def __init__(self, gateway: BinancesGateway):
        """"""
        super().__init__()

        self.gateway: BinancesGateway = gateway
        self.gateway_name: str = gateway.gateway_name

    def connect(self, url: str, proxy_host: str, proxy_port: int) -> None:
        """"""
        self.init(url, proxy_host, proxy_port)
        self.start()

    def on_connected(self) -> None:
        """"""
        self.gateway.write_log("交易Websocket API连接成功")

    def on_packet(self, packet: dict) -> None:  # type: (dict)->None
        """"""
        if packet["e"] == "ACCOUNT_UPDATE":
            self.on_account(packet)
        elif packet["e"] == "ORDER_TRADE_UPDATE":
            self.on_order(packet)

    def on_account(self, packet: dict) -> None:
        """"""
        for acc_data in packet["a"]["B"]:
            account = AccountData(
                accountid=acc_data["a"],
                balance=float(acc_data["wb"]),
                frozen=float(acc_data["wb"]) - float(acc_data["cw"]),
                gateway_name=self.gateway_name
            )

            # if account.balance:
            self.gateway.on_account(account)

        for pos_data in packet["a"]["P"]:
            if pos_data['ps'] == "BOTH":
                direction = Direction.NET
            else:
                direction = SIDE_BINACES2VT[pos_data["ps"]]
            volume = pos_data["pa"]

            position = PositionData(
                symbol=pos_data["s"],
                exchange=Exchange.BINANCE,
                direction=direction,
                volume=float(volume),
                price=float(pos_data["ep"]),
                pnl=float(pos_data["cr"]),
                gateway_name=self.gateway_name,
            )
            self.gateway.on_position(position)

    def on_order(self, packet: dict) -> None:
        """"""
        ord_data = packet["o"]
        key = (ord_data["o"], ord_data["f"])
        order_type = ORDERTYPE_BINANCES2VT.get(key, None)
        if not order_type:
            return

        order = OrderData(
            symbol=ord_data["s"],
            exchange=Exchange.BINANCE,
            orderid=str(ord_data["c"]),
            type=order_type,
            direction=DIRECTION_BINANCES2VT[ord_data["S"]],
            price=float(ord_data["p"]),
            volume=float(ord_data["q"]),
            traded=float(ord_data["z"]),
            status=STATUS_BINANCES2VT[ord_data["X"]],
            datetime=generate_datetime(packet["E"]),
            gateway_name=self.gateway_name
        )

        self.gateway.on_order(order)

        # Round trade volume to minimum trading volume
        trade_volume = float(ord_data["l"])

        contract = symbol_contract_map.get(order.symbol, None)
        if contract:
            trade_volume = round_to(trade_volume, contract.min_volume)

        if not trade_volume:
            return

        # Push trade event
        trade = TradeData(
            symbol=order.symbol,
            exchange=order.exchange,
            orderid=order.orderid,
            tradeid=ord_data["t"],
            direction=order.direction,
            price=float(ord_data["L"]),
            volume=trade_volume,
            datetime=generate_datetime(ord_data["T"]),
            gateway_name=self.gateway_name,
        )
        self.gateway.on_trade(trade)


class BinancesDataWebsocketApi(WebsocketClient):
    """"""

    def __init__(self, gateway: BinancesGateway):
        """"""
        super().__init__()

        self.gateway: BinancesGateway = gateway
        self.gateway_name: str = gateway.gateway_name

        self.ticks: Dict[str, TickData] = {}
        self.bars: Dict[str, BarData] = {}
        self.usdt_base = False

    def connect(
        self,
        usdt_base: bool,
        proxy_host: str,
        proxy_port: int,
        server: str
    ) -> None:
        """"""
        self.usdt_base = usdt_base
        self.proxy_host = proxy_host
        self.proxy_port = proxy_port
        self.server = server

    def on_connected(self) -> None:
        """"""
        self.gateway.write_log("行情Websocket API连接刷新")

    def subscribe(self, req: SubscribeRequest) -> None:
        """"""
        if req.symbol not in symbol_contract_map:
            self.gateway.write_log(f"找不到该合约代码{req.symbol}")
            return

        # Create tick buf data
        tick = TickData(
            symbol=req.symbol,
            name=symbol_contract_map[req.symbol].name,
            exchange=Exchange.BINANCE,
            datetime=datetime.now(CHINA_TZ),
            gateway_name=self.gateway_name,
        )
        self.ticks[req.symbol.lower()] = tick
        # Create bar buf data
        bar = BarData(
            symbol=req.symbol,
            exchange=Exchange.BINANCE,
            datetime=datetime.now(CHINA_TZ),
            gateway_name=self.gateway_name,
            interval=Interval.MINUTE
        )

        self.bars[req.symbol.lower()] = bar

        # Close previous connection
        if self._active:
            self.stop()
            self.join()

        # Create new connection
        channels = []
        for ws_symbol in self.ticks.keys():
            channels.append(ws_symbol + "@ticker")
            channels.append(ws_symbol + "@depth5")
            channels.append(ws_symbol + "@kline_1m")

        if self.server == "REAL":
            url = F_WEBSOCKET_DATA_HOST + "/".join(channels)
            if not self.usdt_base:
                url = D_WEBSOCKET_DATA_HOST + "/".join(channels)
        else:
            url = F_TESTNET_WEBSOCKET_DATA_HOST + "/".join(channels)
            if not self.usdt_base:
                url = D_TESTNET_WEBSOCKET_DATA_HOST + "/".join(channels)

        self.init(url, self.proxy_host, self.proxy_port)
        self.start()

    def on_packet(self, packet: dict) -> None:
        """"""
        stream = packet["stream"]
        data = packet["data"]

        symbol, channel = stream.split("@")
        tick: TickData = self.ticks[symbol]
        bar: BarData = self.bars[symbol]


        if channel == "ticker":
            tick.volume = float(data['v'])
            tick.open_price = float(data['o'])
            tick.high_price = float(data['h'])
            tick.low_price = float(data['l'])
            tick.last_price = float(data['c'])
            tick.datetime = generate_datetime(float(data['E'])) # datetime.fromtimestamp(float(data['E']) / 1000)
        elif channel == "kline_1m":

            dt = generate_datetime(float(data['k']['t']))
            if dt < bar.datetime and bar.close_price > 0:
                # filter the older kline.
                return
            kline = data['k'] # 默认一分钟K线
            bar.open_price = float(kline['o'])
            bar.high_price = float(kline['h'])
            bar.low_price = float(kline['l'])
            bar.close_price = float(kline['c'])
            bar.volume = float(kline['v']) # 成交量
            bar.quote_volume = float(kline['q']) # 成交额
            bar.trade_num = float(kline['n']) # 成交笔数
            bar.taker_buy_base_asset_volume = float(kline["V"]) # 主动买入成交量
            bar.taker_buy_quote_asset_volume = float(kline["Q"]) # 主动买入成交额
            bar.kline_end = data['k']['x']
            bar.datetime = dt
            if data['k']['x']:  # bar finished
                self.gateway.on_bar(copy(bar))
        else:
            bids = data["b"]
            for n in range(min(5, len(bids))):
                price, volume = bids[n]
                tick.__setattr__("bid_price_" + str(n + 1), float(price))
                tick.__setattr__("bid_volume_" + str(n + 1), float(volume))

            asks = data["a"]
            for n in range(min(5, len(asks))):
                price, volume = asks[n]
                tick.__setattr__("ask_price_" + str(n + 1), float(price))
                tick.__setattr__("ask_volume_" + str(n + 1), float(volume))

        if tick.last_price:
            self.gateway.on_tick(copy(tick))


def generate_datetime(timestamp: float) -> datetime:
    """"""
    dt = datetime.fromtimestamp(timestamp / 1000)
    dt = CHINA_TZ.localize(dt)
    return dt
Member
avatar
加入于:
帖子: 3027
声望: 174

那请自己在接口层进行打印排查吧

Member
avatar
加入于:
帖子: 10
声望: 0

xiaohe wrote:

那请自己在接口层进行打印排查吧
主要是使用默认,只保持websocket连接的定时任务也会出现该情况,每次请求都输出了信息 但是突增的权重前面没有对应的请求连续进来

Member
avatar
加入于:
帖子: 10
声望: 0

xiaohe wrote:

那请自己在接口层进行打印排查吧
主要是使用默认,只保持websocket连接的定时任务也会出现该情况,每次请求都输出了信息 但是突增的权重前面没有对应的请求连续进来

© 2015-2019 上海韦纳软件科技有限公司
备案服务号:沪ICP备18006526号-3

沪公网安备 31011502017034号