去年写了一个快期的Gateway,可以用快期模拟,当然也能用来实盘交易
代码写的时间太久,也没实盘交易过,或许有很多Bug,欢迎评论指正

"""
Author: Aaron Qiu
"""
from threading import Thread
import time
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple

from tqsdk import TqApi, TqAccount, TqSim
from vnpy.event import Event
from vnpy.trader.utility import extract_vt_symbol
from vnpy.trader.constant import (
    Direction,
    Exchange,
    Interval,
    Offset,
    OrderType,
    Status,
    OptionType,
)
from vnpy.trader.event import (
    EVENT_TIMER,
    EVENT_TICK,
    EVENT_TRADE,
    EVENT_ORDER,
    EVENT_POSITION,
    EVENT_ACCOUNT,
    EVENT_CONTRACT,
)
from vnpy.trader.gateway import BaseGateway
from vnpy.trader.object import (
    BarData,
    CancelRequest,
    HistoryRequest,
    OrderData,
    OrderRequest,
    SubscribeRequest,
    ContractData,
    Product,
    TickData,
    AccountData,
    PositionData,
    OrderData,
)

TQ_TO_VT_TYPE = {
    "FUTURE_OPTION": Product.OPTION,
    "INDEX": Product.INDEX,
    "FUTURE_COMBINE": Product.SPREAD,
    "SPOT": Product.SPOT,
    "FUTURE_CONT": Product.INDEX,
    "FUTURE": Product.FUTURES,
    "FUTURE_INDEX": Product.INDEX,
    "OPTION": Product.OPTION,
}

EXCHANGE_KUAIQI2VT = {
    "CFFEX": Exchange.CFFEX,
    "SHFE": Exchange.SHFE,
    "CZCE": Exchange.CZCE,
    "DCE": Exchange.DCE,
    "INE": Exchange.INE,
    "CSI": Exchange.CSI,
    "SSWE": Exchange.SSWE,
}


def get_variety(symbol):
    for count, word in enumerate(symbol):
        if word.isdigit():
            break
    return symbol[:count]


def to_tq_symbol(symbol: str, exchange: Exchange) -> str:
    """
    TQSdk exchange first
    """
    if "88" in symbol:
        return f"KQ.m@{exchange.value}.{get_variety(symbol)}"
    if "99" in symbol:
        return f"KQ.i@{exchange.value}.{get_variety(symbol)}"
    return f"{exchange.value}.{symbol}"


def to_vt_symbol(tq_symbol) -> str:
    """"""
    if "KQ.m" in tq_symbol:
        ins_type, instrument = tq_symbol.split("@")
        exchange, symbol = instrument.split(".")
        return f"{symbol}88.{exchange}"
    elif "KQ.i" in tq_symbol:
        ins_type, instrument = tq_symbol.split("@")
        exchange, symbol = instrument.split(".")
        return f"{symbol}99.{exchange}"
    else:
        exchange, symbol = tq_symbol.split(".")
        return f"{symbol}.{exchange}"


GATEWAY_NAME = "快期"


class KuaiqiGateway(BaseGateway):
    default_setting = {
        "Broker": [
            "local",
            "simnow",
            "快期模拟",
            "A安粮期货",
            "B倍特期货",
            "B宝城期货",
            "C长江期货",
            "C创元期货",
            "D大地期货",
            "D大有期货",
            "G国海良时",
            "G广州金控期货",
            "G国金期货",
            "G国贸期货",
            "G国投安信",
            "G国泰君安",
            "H宏源期货",
            "H华安期货",
            "H华金期货",
            "H华信期货",
            "H海证期货",
            "H和融期货",
            "H海通期货",
            "H华闻期货",
            "H徽商期货",
            "H混沌天成",
            "J江苏东华",
            "N南华期货",
            "S上海东方",
            "S上海中期",
            "T铜冠金源",
            "W五矿经易",
            "X先锋期货",
            "X西南期货",
            "X先融期货",
            "X新纪元",
            "X新世纪",
            "Z招金期货",
            "Z中财期货",
            "Z中钢期货",
            "Z中天期货",
            "Z中信建投",
            "F方正中期",
            "H弘业期货",
            "Y银河期货",
            "G国联期货",
            "D东证期货",
        ],
        "账号": "",
        "密码": "",
        "FRONT_BROKER": "",  # 留空即可
        "FRONT_URL": "",  # 留空即可
    }

    exchanges = list(EXCHANGE_KUAIQI2VT.values())

    def __init__(self, event_engine):
        super().__init__(event_engine, GATEWAY_NAME)
        self.api = None
        self.borker = ""
        self.subscribe_contracts = []
        self.order_mapping = {}

    def connect(self, setting: dict):
        """"""
        self.borker = setting["Broker"]
        self.username = setting["账号"]
        password = setting["密码"]
        front_broker = setting["FRONT_BROKER"] or None
        front_url = setting["FRONT_URL"] or None
        self.write_log(f"{GATEWAY_NAME}开始连接")
        login_success = False
        try:
            if self.borker == "local":
                self.api = TqApi(TqSim())
            else:
                self.api = TqApi(
                    TqAccount(
                        self.borker, self.username, password, front_broker, front_url
                    )
                )
            login_success = True
            self.write_log(f"{GATEWAY_NAME}连接成功")
        except:
            self.write_log(f"{GATEWAY_NAME}连接失败")
            self.write_log("行情服务连不上, 或者期货公司服务器关了, 或者账号密码错了")
        if login_success:
            self.query_contract()
            Thread(target=self.query_account).start()
            Thread(target=self.query_position).start()
            Thread(target=self.query_order).start()

    def close(self) -> None:
        if self.api:
            self.api.close()

    def get_tick(self, contract):
        symbol, exchange = extract_vt_symbol(contract)
        last_quote = None
        while True:
            quote = self.api.get_quote(to_tq_symbol(symbol, exchange))
            if last_quote == None or last_quote["last_price"] != quote["last_price"]:
                tick = TickData(
                    symbol=symbol,
                    exchange=exchange,
                    datetime=datetime.strptime(
                        quote["datetime"], "%Y-%m-%d %H:%M:%S.%f"
                    ),
                    name=contract,
                    volume=quote["volume"],
                    open_interest=quote["open_interest"],
                    last_price=quote["last_price"],
                    limit_up=quote["upper_limit"],
                    limit_down=quote["lower_limit"],
                    open_price=quote["open"],
                    high_price=quote["lowest"],
                    low_price=quote["lowest"],
                    pre_close=quote["pre_close"],
                    bid_price_1=quote["bid_price1"],
                    bid_price_2=quote["bid_price2"],
                    bid_price_3=quote["bid_price3"],
                    bid_price_4=quote["bid_price4"],
                    bid_price_5=quote["bid_price5"],
                    ask_price_1=quote["ask_price1"],
                    ask_price_2=quote["ask_price2"],
                    ask_price_3=quote["ask_price3"],
                    ask_price_4=quote["ask_price4"],
                    ask_price_5=quote["ask_price5"],
                    bid_volume_1=quote["bid_volume1"],
                    bid_volume_2=quote["bid_volume2"],
                    bid_volume_3=quote["bid_volume3"],
                    bid_volume_4=quote["bid_volume4"],
                    bid_volume_5=quote["bid_volume5"],
                    ask_volume_1=quote["ask_volume1"],
                    ask_volume_2=quote["ask_volume2"],
                    ask_volume_3=quote["ask_volume3"],
                    ask_volume_4=quote["ask_volume4"],
                    ask_volume_5=quote["ask_volume5"],
                    gateway_name=GATEWAY_NAME,
                )
                self.on_tick(tick)
                last_quote = quote
            time.sleep(0.5)

    def subscribe(self, req: SubscribeRequest) -> None:
        t = Thread(target=self.get_tick, args=(req.vt_symbol,))
        t.start()
        self.write_log(f"订阅:{req.vt_symbol}")

    def send_order(self, req: OrderRequest) -> str:
        tq_order = self.api.insert_order(
            symbol=to_tq_symbol(req.symbol, req.exchange),
            direction="BUY" if req.direction == Direction.LONG else "SELL",
            offset=req.offset.name,
            limit_price=req.price,
            volume=req.volume,
        )
        # 缓存起来
        self.order_mapping[tq_order["order_id"]] = tq_order
        vt_order = OrderData(
            symbol=req.symbol,
            exchange=req.exchange,
            orderid=tq_order["order_id"],
            type=req.type,
            direction=req.direction,
            offset=req.offset,
            price=req.price,
            volume=req.volume,
            gateway_name=GATEWAY_NAME,
        )
        self.on_order(vt_order)

    def cancel_order(self, req: CancelRequest) -> None:
        # 缓存起来
        tq_order = self.order_mapping[req.orderid]
        if tq_order:
            self.api.cancel_order(tq_order)

    def query_account(self) -> None:
        last_account = None
        while True:
            account = self.api.get_account()
            if last_account != account:
                self.on_account(
                    AccountData(
                        accountid=self.username
                        if self.borker == "local"
                        else account["user_id"],
                        balance=account["balance"],
                        frozen=account["frozen_margin"],
                        gateway_name=GATEWAY_NAME,
                    )
                )
            time.sleep(0.5)

    def query_position(self) -> None:
        last_positions = None
        while True:
            positions = self.api.get_position()
            if last_positions != positions:
                for p in positions.values():
                    if p["volume_long"]:
                        self.on_position(
                            PositionData(
                                symbol=p["instrument_id"],
                                exchange=Exchange[p["exchange_id"]],
                                direction=Direction.LONG,
                                volume=p["volume_long"],
                                frozen=p["volume_long_frozen"],
                                price=p["open_price_long"],
                                pnl=p["position_profit_long"],
                                yd_volume=p["volume_long_yd"],
                                gateway_name=GATEWAY_NAME,
                            )
                        )
                    if p["volume_short"]:
                        self.on_position(
                            PositionData(
                                symbol=p["instrument_id"],
                                exchange=Exchange[p["exchange_id"]],
                                direction=Direction.SHORT,
                                volume=p["volume_short"],
                                frozen=p["volume_short_frozen"],
                                price=p["open_price_short"],
                                pnl=p["position_profit_short"],
                                yd_volume=p["volume_short_yd"],
                                gateway_name=GATEWAY_NAME,
                            )
                        )

    def query_order(self) -> None:
        last_orders = None
        while True:
            orders = self.api.get_order()
            if last_orders != orders:
                for o in orders.values():
                    self.on_order(
                        OrderData(
                            symbol=o["instrument_id"],
                            exchange=Exchange[o["exchange_id"]],
                            orderid=o["order_id"],
                            type=OrderType.MARKET
                            if o["price_type"] == "ANY"
                            else OrderType.LIMIT,
                            direction=Direction.LONG
                            if o["direction"] == "BUY"
                            else Direction.SHORT,
                            offset=Offset[o["offset"]],
                            price=o["limit_price"],
                            volume=o["volume_orign"],
                            gateway_name=GATEWAY_NAME,
                        )
                    )
            time.sleep(0.5)

    def query_contract(self) -> None:
        active_contract = [
            v for k, v in self.api._data["quotes"].items() if v["expired"] == False
        ]
        for contract in active_contract:
            vt_symbol = to_vt_symbol(contract["instrument_id"])
            symbol, exchange = extract_vt_symbol(vt_symbol)
            if TQ_TO_VT_TYPE[contract["ins_class"]] == Product.OPTION:
                contract_data = ContractData(
                    symbol=symbol,
                    exchange=exchange,
                    name=contract["product_id"],
                    product=TQ_TO_VT_TYPE[contract["ins_class"]],
                    size=contract["volume_multiple"],
                    pricetick=contract["price_tick"],
                    history_data=True,
                    option_strike=contract["strike_price"],
                    option_underlying=to_vt_symbol(contract["underlying_symbol"]),
                    option_type=OptionType[contract["option_class"]],
                    option_expiry=datetime.fromtimestamp(contract["expire_datetime"]),
                    option_index=to_vt_symbol(contract["underlying_symbol"]),
                    gateway_name=GATEWAY_NAME,
                )
            else:
                contract_data = ContractData(
                    symbol=symbol,
                    exchange=exchange,
                    name=contract["product_id"],
                    product=TQ_TO_VT_TYPE[contract["ins_class"]],
                    size=contract["volume_multiple"],
                    pricetick=contract["price_tick"],
                    history_data=True,
                    gateway_name=GATEWAY_NAME,
                )
            self.on_contract(contract_data)