vn.py官网
你的开源社区量化交易平台
Member
avatar
加入于:
帖子: 308
声望: 90

1. 当前vnpy的CTP网关没有考虑CTP流控

1.1 什么是CTP流控?

详细见 报单流控、查询流控和会话数控制,这里不再赘述,有兴趣的读者可以去看看。

1.2 目前vnpy的ctp_gateway 不符合CTP流控

  1. 目前的vnpy的ctp_gateway完全看不到CTP接口的报单流控和查询流控的特点,只有一些API和SPI的函数实现
  2. 执行报单和查询相关操作时,网关应该有两个统一的队列来调度,以便满足CTP接口对流控的要求,而不是把流控留给上层的APP去处理
  3. 当上层app和ctp_gateway都不管报单流控,就会出现多个用户策略同时发出报单而超越流控限制现象,这在实际交易中经常出现,问题可能就在这里!

让我们看看下面的ctp_gateway的td_api.send_order()代码:

    def send_order(self, req: OrderRequest) -> str:
        """委托下单"""
       # .... 这里省略
        self.reqid += 1
        self.reqOrderInsert(ctp_req, self.reqid)            # 这里有问题

        orderid: str = f"{self.frontid}_{self.sessionid}_{self.order_ref}"
        order: OrderData = req.create_order_data(orderid, self.gateway_name)
        self.gateway.on_order(order)

        return order.vt_orderid

上面代码中self.reqOrderInsert(ctp_req, self.reqid)就没有考虑报单流控,它有4个返回值:
0,代表成功。
-1,表示网络连接失败;
-2,表示未处理请求超过许可数;
-3,表示每秒发送请求数超过许可数。
其中两个是有关流控的,如果是-2,-3我们是可以采用类似等待、重试、队列等方法的,可是都没有。
这导致的问题是如果是返回了-2,-3,那么,报单是没有成功的,上层app还以为已经成功了,也不会采用什么其它逻辑来补救。

1.3 不符合CTP流控,有什么问题?

 CTP交易接口一定的报单和查询必须符合CTP流控规则,默认FTD报文流控如果是每秒6次,报单和查询都会形成FTD报文,查询流控为1秒1次,报单流控最大是每秒6次,假如当前秒已经有了1次查询,那么报单的流控就只有5次了。
如果报单超越了当前的流控,那么报单坑定是失败的,交易服务器是不会接受报单请求的,无论你是开仓还是平仓,CTP交易接口根本没有成交的可能!
这样就造成我们以为发出去的符合条件成交条件委托是成功的,可是却是石沉大海,没有了下文。
实际的危害可能是:机会来了无法入场,灾难来临无法逃离 !

2. 如何改造当前的CTP网关?

改造的主要思路是:只要没有违反流控,直接快速执行报单请求和查询请求,如遇违反流控,让CTP网关缓冲报单请求和查询请求,延后执行。

  • 构造报单请求队列,用来缓存遇到流控问题的报单请求;
  • 构造查询请求队列,用来缓存遇到流控问题的查询请求;
  • 对于来自上层app的调用CTP交易接口中的 报单请求,保持与之前相同接口样式,立即执行。如果遇到返回值为流控错误,则将报单请求及参数压入报单请求队列;
  • 对于来自上层app的调用CTP交易接口中的 查询请求,保持与之前相同接口样式,立即执行。如果遇到返回值为流控错误,则将查询请求及参数压入查询请求队列;
  • 定时每1/6秒检查报单请求队列一次,从队列的头部取出报单请求并且执行。如果执行成功将该报单请求丢弃,如果遇到返回值为流控错误,再次将报单请求及参数压入报单请求队列;
  • 定时每1/6秒检查查询请求队列一次,从队列的头部取出查询请求并且执行。如果执行成功将该查询请求丢弃,如果遇到返回值为流控错误,再次将查询请求及参数压入查询请求队列。

3. 符合CTP流控的ctp_gateway 实现代码

本次分享的ctp_gateway比当前vnpy系统的ctp_gateway具备更为丰富的接口,其中包括:

  • 查询投资者
  • 查询投资单元
  • 查询经纪公司交易参数
  • 查询产品
  • 查询交易所保证金比率
  • 查询交易所保证金比率调整
  • 查询合约保证金比率
  • 查询手续费率
  • 查询报单手续费率
  • 合约交易状态推送
  • 交易所保证金比率推送

为了提高接口推送消息的能力,在ctp_gateway登录交易接口时,先连续不停地接收并解码合约状态、交易所保证金和合约信息这三个数量巨大的推送数据,然后分别一次性推送这些数据到系统的消息引擎中。这些消息接收端应该一次性地对这些消息进行处理。

3.1 修改vnpy\trader\constant,py,加入下面的代码:


class HedgeType(Enum):
    """
    投机/套保/备兑类型  hxxjava add
    """
    SEPCULATION = '1' #"投机"  
    ARBITRAGE = '2'   #"套利"   
    HEDGE = '3'       #"套保"     
    MARKETMAKER = '5' #"做市商"  
    SPECHEDGE = '6'   # 第一腿投机第二腿套保 大商所专用  
    HEDGESPEC = '7'   # 第一腿套保第二腿投机  大商所专用  


class InstrumentStatus(Enum):
    """
    合约交易状态类型 hxxjava debug
    """
    BEFORE_TRADING = "开盘前"
    NO_TRADING = "非交易"
    CONTINOUS = "连续交易" 
    AUCTION_ORDERING = "集合竞价报单"
    AUCTION_BALANCE = "集合竞价价格平衡"
    AUCTION_MATCH = "集合竞价撮合"
    CLOSE = "收盘"


# 有效交易状态
VALID_TRADE_STATUSES = [
    InstrumentStatus.CONTINOUS,
    InstrumentStatus.AUCTION_ORDERING,
    InstrumentStatus.AUCTION_BALANCE,
    InstrumentStatus.AUCTION_MATCH
]

# 集合竞价交易状态
AUCTION_STATUS = [
    InstrumentStatus.AUCTION_ORDERING,
    InstrumentStatus.AUCTION_BALANCE,
    InstrumentStatus.AUCTION_MATCH
]


class StatusEnterReason(Enum):
    """
    品种进入交易状态原因类型 hxxjava debug
    """
    AUTOMATIC = "自动切换"
    MANUAL = "手动切换"
    FUSE = "熔断" 


class MarginPriceType(Enum):
    """
    保证金价格类型  
    """
    # 昨结算价
    PRE_SETTLEMENT_PRICE = '1'
    # 最新价
    SETTLEMENT_PRICE = '2'
    # 成交均价
    $ERAGE_PRICE = '3'
    # 开仓价
    OPEN_PRICE = '4'


class AlgorithmType(Enum):
    """
    盈亏算法类型
    """
    # 浮盈浮亏都计算
    ALL = '1'
    # 浮盈不计,浮亏计
    ONLY_LOST ='2'
    # 浮盈计,浮亏不计
    ONLY_GAIN = '3'
    # 浮盈浮亏都不计算
    NONE = '4'


class IncludeCloseProfitType(Enum):
    """
    是否包含平仓盈利类型
    """
    # 包含平仓盈利
    INCLUDE = '0'
    # 不包含平仓盈利
    NOT_INCLUDE = '2'


class OptionRoyaltyPriceType(Enum):
    """
    期权权利金价格类型类型
    """
    # 昨结算价
    PRE_SETTLEMENT_PRICE = '1'
    # 开仓价
    OPEN_PRICE = '4'
    # 最新价与昨结算价较大值
    MAX_PRE_SETTLEMENT_PRICE = '5'


class IdCardType(Enum):
    """ $类型 """
    # 组织机构代码
    EID = '0'
    # 中国公民$
    IDCard = '1'
    # $
    OfficerIDCard = '2'
    # $
    PoliceIDCard = '3'
    # $
    SoldierIDCard = '4'
    # 户口簿
    HouseholdRegister = '5'
    # $
    Passport = '6'
    # 台胞证
    TaiwanCompatriotIDCard  = '7'
    # 回乡证
    HomeComingCard = '8'
    # 营业执照号
    LicenseNo = '9'
    # 税务登记号/当地纳税ID
    TaxNo = 'A'
    # 港澳居民来往内地通行证
    HMMainlandTravelPermit = 'B'
    # 台湾居民来往大陆通行证
    TwMainlandTravelPermit = 'C'
    # $
    DrivingLicense = 'D'
    # 当地社保ID
    SocialID = 'F'
    # 当地$
    LocalID = 'G'
    # 商业登记证
    BusinessRegistration = 'H'
    # 港澳永久性居民$
    HKMCIDCard = 'I'
    # 人行开户许可证
    AccountsPermits = 'J'
    # 外国人永久居留证
    FrgPrmtRdCard = 'K'
    # 资管产品备案函
    CptMngPrdLetter = 'L'
    # 统一社会信用代码
    UniformSocialCreditCode = 'N'
    # 机构成立证明文件
    CorporationCertNo = 'O'
    # 其他$
    OtherCard = 'x'


class ProductClass(Enum):
    # 期货
    FUTURES = '1'
    # 期货期权
    OPTIONS = '2'
    # 组合
    COMBINATION = '3'
    # 即期
    SPOT = '4'
    # 期转现
    EFP = '5'
    # 现货期权
    SPOT_OPTION = '6'
    # TAS合约
    TAS = '7'
    # 金属指数
    MI = 'I'

因为本文的内容比较长,一共分为5个帖子,其他内容在下方帖子中 ... ...

Member
avatar
加入于:
帖子: 308
声望: 90

3.2 修改vnpy\trader\object.py,内容如下:

"""
Basic data structure used for general trading function in VN Trader.
"""

from dataclasses import dataclass
from datetime import datetime,date          # hxxjava add date
from logging import INFO

from .constant import AlgorithmType, Direction, Exchange, Interval, Offset, Status, Product, OptionType, OrderType
from .constant import HedgeType,InstrumentStatus,MarginPriceType,AlgorithmType,IncludeCloseProfitType,Currency,OptionRoyaltyPriceType,IdCardType,ProductClass # hxxjava add

ACTIVE_STATUSES = set([Status.SUBMITTING, Status.NOTTRADED, Status.PARTTRADED])


@dataclass
class BaseData:
    """
    Any data object needs a gateway_name as source
    and should inherit base data.
    """

    gateway_name: str


@dataclass
class TickData(BaseData):
    """
    Tick data contains information about:
        * last trade in market
        * orderbook snapshot
        * intraday market statistics.
    """

    symbol: str
    exchange: Exchange
    datetime: datetime

    name: str = ""
    volume: float = 0
    turnover: float = 0
    open_interest: float = 0
    last_price: float = 0
    last_volume: float = 0
    limit_up: float = 0
    limit_down: float = 0

    open_price: float = 0
    high_price: float = 0
    low_price: float = 0
    pre_close: float = 0

    bid_price_1: float = 0
    bid_price_2: float = 0
    bid_price_3: float = 0
    bid_price_4: float = 0
    bid_price_5: float = 0

    ask_price_1: float = 0
    ask_price_2: float = 0
    ask_price_3: float = 0
    ask_price_4: float = 0
    ask_price_5: float = 0

    bid_volume_1: float = 0
    bid_volume_2: float = 0
    bid_volume_3: float = 0
    bid_volume_4: float = 0
    bid_volume_5: float = 0

    ask_volume_1: float = 0
    ask_volume_2: float = 0
    ask_volume_3: float = 0
    ask_volume_4: float = 0
    ask_volume_5: float = 0

    localtime: datetime = None

    def __post_init__(self):
        """"""
        self.vt_symbol = f"{self.symbol}.{self.exchange.value}"


@dataclass
class BarData(BaseData):
    """
    Candlestick bar data of a certain trading period.
    """

    symbol: str
    exchange: Exchange
    datetime: datetime

    interval: Interval = None
    volume: float = 0
    turnover: float = 0
    open_interest: float = 0
    open_price: float = 0
    high_price: float = 0
    low_price: float = 0
    close_price: float = 0

    def __post_init__(self):
        """"""
        self.vt_symbol = f"{self.symbol}.{self.exchange.value}"


@dataclass
class OrderData(BaseData):
    """
    Order data contains information for tracking lastest status
    of a specific order.
    """

    symbol: str
    exchange: Exchange
    orderid: str

    type: OrderType = OrderType.LIMIT
    direction: Direction = None
    offset: Offset = Offset.NONE
    price: float = 0.0      # hxxjava change
    volume: float = 0.0     # hxxjava change
    traded: float = 0.0     # hxxjava change
    status: Status = Status.SUBMITTING
    datetime: datetime = None
    tradingday:date = None  # hxxjava add
    reference:str = ""      # hxxjava add

    def __post_init__(self):
        """"""
        self.vt_symbol = f"{self.symbol}.{self.exchange.value}"
        self.vt_orderid = f"{self.gateway_name}.{self.orderid}"

    def is_active(self) -> bool:
        """
        Check if the order is active.
        """
        return self.status in ACTIVE_STATUSES

    def create_cancel_request(self) -> "CancelRequest":
        """
        Create cancel request object from order.
        """
        req = CancelRequest(
            orderid=self.orderid, symbol=self.symbol, exchange=self.exchange
        )
        return req


@dataclass
class TradeData(BaseData):
    """
    Trade data contains information of a fill of an order. One order
    can have several trade fills.
    """

    symbol: str
    exchange: Exchange
    orderid: str
    tradeid: str
    direction: Direction = None

    offset: Offset = Offset.NONE
    price: float = 0.0              # hxxjava change
    volume: float = 0.0             # hxxjava change
    datetime: datetime = None
    tradingday:date = None          # hxxjava add

    def __post_init__(self):
        """"""
        self.vt_symbol = f"{self.symbol}.{self.exchange.value}"
        self.vt_orderid = f"{self.gateway_name}.{self.orderid}"
        self.vt_tradeid = f"{self.gateway_name}.{self.tradeid}"


@dataclass
class PositionData(BaseData):
    """
    Positon data is used for tracking each individual position holding.
    """

    symbol: str
    exchange: Exchange
    direction: Direction

    volume: float = 0
    frozen: float = 0
    price: float = 0
    pnl: float = 0
    yd_volume: float = 0

    def __post_init__(self):
        """"""
        self.vt_symbol = f"{self.symbol}.{self.exchange.value}"
        self.vt_positionid = f"{self.vt_symbol}.{self.direction.value}"


@dataclass
class AccountData(BaseData):
    """
    Account data contains information about balance, frozen and
    available.
    """

    accountid: str

    balance: float = 0
    frozen: float = 0

    def __post_init__(self):
        """"""
        self.available = self.balance - self.frozen
        self.vt_accountid = f"{self.gateway_name}.{self.accountid}"


@dataclass
class LogData(BaseData):
    """
    Log data is used for recording log messages on GUI or in log files.
    """

    msg: str
    level: int = INFO

    def __post_init__(self):
        """"""
        self.time = datetime.now()


@dataclass
class ContractData(BaseData):
    """
    Contract data contains basic information about each contract traded.
    """

    symbol: str
    exchange: Exchange
    name: str
    product: Product
    size: float
    pricetick: float

    min_volume: float = 1           # minimum trading volume of the contract
    stop_supported: bool = False    # whether server supports stop order
    net_position: bool = False      # whether gateway uses net position volume
    history_data: bool = False      # whether gateway provides bar history data

    option_strike: float = 0
    option_underlying: str = ""     # vt_symbol of underlying contract
    option_type: OptionType = None
    option_expiry: datetime = None
    option_portfolio: str = ""
    option_index: str = ""          # for identifying options with same strike price

    # hxxjava add start
    max_market_order_volume: int = 0   
    min_market_order_volume: int = 0   
    max_limit_order_volume: int = 0    
    min_limit_order_volume: int = 0   
    open_date : str = ""            
    expire_date : str = ""          
    is_trading : bool = False       
    long_margin_ratio:float = 0     
    short_margin_ratio:float = 0   
    # hxxjava add end

    def __post_init__(self):
        """"""
        self.vt_symbol = f"{self.symbol}.{self.exchange.value}"

def left_alphas(instr:str):
    """ get lefe alphas of a string """
    ret_str = ''
    for s in instr:
        if s.isalpha():
            ret_str += s
        else:
            break
    return ret_str

@dataclass
class StatusData(BaseData):
    """
    hxxjava debug
    """
    symbol:str       
    exchange : Exchange    
    settlement_group_id : str = ""  
    instrument_status : InstrumentStatus = None   
    trading_segment_sn : int = None 
    enter_time : str = ""      
    enter_reason : str = ""  
    exchange_inst_id : str = ""     

    def __post_init__(self):
        """  """
        self.vt_symbol = f"{self.symbol}.{self.exchange.value}"

    def belongs_to(self,vt_symbol:str):
        symbol,exchange_str = vt_symbol.split(".")
        instrument = left_alphas(symbol).upper()
        return (self.symbol.upper() == instrument) and (self.exchange.value == exchange_str)


@dataclass
class MarginData(BaseData):     # hxxjava add
    """
    Exchange margin rate data for the instrument. 
    """    
    symbol: str
    exchange: str
    hedge_flag:HedgeType = HedgeType.SEPCULATION
    long_margin_rate:float = 0.0        
    long_margin_perlot:float = 0.0    
    short_margin_rate:float = 0.0      
    short_margin_perlot:float = 0.0     

    def __post_init__(self):
        """  """
        self.vt_symbol = f"{self.symbol}.{self.exchange}"


@dataclass
class MarginRateAdjustData(BaseData):     # hxxjava add
    """
    Exchange margin rate adjust data for the instrument. 
    """
    symbol: str = ""
    hedge_flag:HedgeType = HedgeType.SEPCULATION

    # 跟随保证金(率)
    LongMarginRatioByMoney : float = 0.0
    LongMarginRatioByVolume : float = 0.0
    ShortMarginRatioByMoney : float = 0.0
    ShortMarginRatioByVolume : float = 0.0

    # 交易所保证金(率)
    ExchLongMarginRatioByMoney : float = 0.0
    ExchLongMarginRatioByVolume : float = 0.0
    ExchShortMarginRatioByMoney : float = 0.0
    ExchShortMarginRatioByVolume : float = 0.0

    # 不跟随保证金(率)
    NoLongMarginRatioByMoney : float = 0.0
    NoLongMarginRatioByVolume : float = 0.0
    NoShortMarginRatioByMoney : float = 0.0
    NoShortMarginRatioByVolume : float = 0.0


@dataclass
class InstrumentMarginData(BaseData):     # hxxjava add
    """
    Instrument margin rate data for the contract.
    """
    symbol: str = ""
    exchange: str = ""
    hedge_flag:HedgeType = HedgeType.SEPCULATION
    long_margin_rate:float = 0.0        
    long_margin_perlot:float = 0.0    
    short_margin_rate:float = 0.0      
    short_margin_perlot:float = 0.0    
    is_ralative:bool = False            
    investor_range:str = ""    
    investor_id:str=""       
    invest_unit_id:str=""    

    def __post_init__(self):
        """"""
        self.vt_symbol = f"{self.symbol}.{self.exchange}"


@dataclass
class CommissionData(BaseData):     # hxxjava add
    """
    Commssion rate data for the instrument.
    """
    symbol: str
    exchange: str = ""                    
    open_ratio_bymoney:float = 0.0        
    open_ratio_byvolume:float = 0.0       
    close_ratio_bymoney:float = 0.0      
    close_ratio_byvolume:float = 0.0      
    close_today_ratio_bymoney:float=0.0   
    close_today_ratio_byvolume:float=0.0   
    investor_range:str = ""
    biz_type: str = ""                    
    invest_unit_id: str = ""                    

    def __post_init__(self):
        """"""
        self.vt_symbol = f"{self.symbol}.{self.exchange}"


@dataclass
class OrderCommRateData(BaseData):     # hxxjava add
    """
    Ordering commission rate data for the contract .
    """
    symbol:str
    exchange:str
    investor_range:str
    invest_unit_id:str
    hedge_flag:HedgeType = HedgeType.SEPCULATION
    order_comm_byvolume:float = 0.0
    order_action_comm_byvolume:float = 0.0

    def __post_init__(self):
        """"""
        self.vt_symbol = f"{self.symbol}.{self.exchange}"

@dataclass
class BrokerTradingParamsData(BaseData):     # hxxjava add
    """
    Broker trading params data.
    """
    margin_price_type:MarginPriceType
    algorithm:AlgorithmType
    avail_include_close_profit:IncludeCloseProfitType
    currency:Currency
    option_royalty_price_type:OptionRoyaltyPriceType
    account:str


@dataclass
class InvestorData(BaseData):     # hxxjava add
    """
    Investor information data.
    """
    id:str
    name:str
    broker:str
    group:str
    identifiedCardType:IdCardType
    identifiedCardNo:str
    is_active:bool
    telephone:str
    address:str
    open_date:date
    mobile:str
    commission_model:str
    margin_model:str


@dataclass
class ProductData(BaseData):     # hxxjava add
    """
    Product data.
    """
    id:str
    name:str
    exchange:Exchange
    product_class:ProductClass
    size:float = 0.0
    price_tick:float = 0.0
    MaxMarketOrderVolume:float = 0.0
    MinMarketOrderVolume:float = 0.0
    MaxLimitOrderVolume:float = 0.0
    MinLimitOrderVolume:float = 0.0
    PositionType:str = ""
    PositionDateType:str = ""
    CloseDealType:str = ""
    TradeCurrencyID:str = ""
    MortgageFundUseRange:str = ""
    ExchangeProductID:str = ""
    UnderlyingMultiple:float = 0.0


@dataclass
class GatewayData():     # hxxjava add
    """
    Gateway data
    """
    name:str = ""   
    type:str = ""     # 'TD'/'MD'
    reason:int = 0  


@dataclass
class QuoteData(BaseData):
    """
    Quote data contains information for tracking lastest status
    of a specific quote.
    """

    symbol: str
    exchange: Exchange
    quoteid: str

    bid_price: float = 0.0
    bid_volume: int = 0
    ask_price: float = 0.0
    ask_volume: int = 0
    bid_offset: Offset = Offset.NONE
    ask_offset: Offset = Offset.NONE
    status: Status = Status.SUBMITTING
    datetime: datetime = None
    reference: str = ""

    def __post_init__(self):
        """"""
        self.vt_symbol = f"{self.symbol}.{self.exchange.value}"
        self.vt_quoteid = f"{self.gateway_name}.{self.quoteid}"

    def is_active(self) -> bool:
        """
        Check if the quote is active.
        """
        return self.status in ACTIVE_STATUSES

    def create_cancel_request(self) -> "CancelRequest":
        """
        Create cancel request object from quote.
        """
        req = CancelRequest(
            orderid=self.quoteid, symbol=self.symbol, exchange=self.exchange
        )
        return req


@dataclass
class SubscribeRequest:
    """
    Request sending to specific gateway for subscribing tick data update.
    """

    symbol: str
    exchange: Exchange

    def __post_init__(self):
        """"""
        self.vt_symbol = f"{self.symbol}.{self.exchange.value}"


@dataclass
class OrderRequest:
    """
    Request sending to specific gateway for creating a new order.
    """

    symbol: str
    exchange: Exchange
    direction: Direction
    type: OrderType
    volume: float
    price: float = 0
    offset: Offset = Offset.NONE
    reference: str = ""

    def __post_init__(self):
        """"""
        self.vt_symbol = f"{self.symbol}.{self.exchange.value}"

    def create_order_data(self, orderid: str, gateway_name: str) -> OrderData:
        """
        Create order data from request.
        """
        order = OrderData(
            symbol=self.symbol,
            exchange=self.exchange,
            orderid=orderid,
            type=self.type,
            direction=self.direction,
            offset=self.offset,
            price=self.price,
            volume=self.volume,
            reference=self.reference,
            gateway_name=gateway_name,
        )
        return order


@dataclass
class MarginRequest:    # hxxjava add
    """
    Request sending to specific margin rate for a contract.
    """
    symbol: str                 
    exchange: Exchange = None          
    hedge_type:HedgeType = HedgeType.SEPCULATION    


@dataclass
class MarginRateAdjustRequest:    # hxxjava add
    """
    Request sending to specific margin rate adjust for an instrument.
    """
    symbol: str                 
    hedge_type:HedgeType = HedgeType.SEPCULATION    

@dataclass
class CommissionRequest:    # hxxjava add
    """
    Request sending to specific commission for a contract.
    """
    symbol: str = ""
    exchange: Exchange = None
    invest_unit:str = ""

@dataclass
class OrderCommRateRequest:    # hxxjava add
    """
    Request sending order commission rate for an instrument.
    """
    symbol: str = ""


@dataclass
class BrokerTradingParamsRequest:      # hxxjava add
    """ Broker trading parameters request. """
    CurrencyID:str = "CNY"     # default is "CNY"
    AccountID:str = ""      # acount id


@dataclass
class ProductRequst:    # hxxjava add
    product:str
    product_class:ProductClass
    exchange:Exchange = None


@dataclass
class CancelRequest:
    """
    Request sending to specific gateway for canceling an existing order.
    """

    orderid: str
    symbol: str
    exchange: Exchange

    def __post_init__(self):
        """"""
        self.vt_symbol = f"{self.symbol}.{self.exchange.value}"


@dataclass
class HistoryRequest:
    """
    Request sending to specific gateway for querying history data.
    """

    symbol: str
    exchange: Exchange
    start: datetime
    end: datetime = None
    interval: Interval = None

    def __post_init__(self):
        """"""
        self.vt_symbol = f"{self.symbol}.{self.exchange.value}"


@dataclass
class QuoteRequest:
    """
    Request sending to specific gateway for creating a new quote.
    """

    symbol: str
    exchange: Exchange
    bid_price: float
    bid_volume: int
    ask_price: float
    ask_volume: int
    bid_offset: Offset = Offset.NONE
    ask_offset: Offset = Offset.NONE
    reference: str = ""

    def __post_init__(self):
        """"""
        self.vt_symbol = f"{self.symbol}.{self.exchange.value}"

    def create_quote_data(self, quoteid: str, gateway_name: str) -> QuoteData:
        """
        Create quote data from request.
        """
        quote = QuoteData(
            symbol=self.symbol,
            exchange=self.exchange,
            quoteid=quoteid,
            bid_price=self.bid_price,
            bid_volume=self.bid_volume,
            ask_price=self.ask_price,
            ask_volume=self.ask_volume,
            bid_offset=self.bid_offset,
            ask_offset=self.ask_offset,
            reference=self.reference,
            gateway_name=gateway_name,
        )
        return quote
Member
avatar
加入于:
帖子: 308
声望: 90

3.3 修改vnpy\trader\event.py,内容如下:

"""
Event type string used in VN Trader.
"""

from vnpy.event import EVENT_TIMER  # noqa

EVENT_ORIGIN_TICK = "eOriginTick."              # hxxjava debug
EVENT_AUCTION_TICK = "eAuctionTick."            # hxxjava debug
EVENT_TICK = "eTick."
EVENT_TRADE = "eTrade."
EVENT_ORDER = "eOrder."
EVENT_POSITION = "ePosition."
EVENT_ACCOUNT = "eAccount."
EVENT_QUOTE = "eQuote."
EVENT_CONTRACT = "eContract."
EVENT_STATUS = "eStatus."                        # hxxjava debug
EVENT_LOG = "eLog"

EVENT_CONNECT = "eConnected"                    # hxxjava add
EVENT_DISCONNECT = "eDisconnected"              # hxxjava add

EVENT_INSTRUMENT_MARGIN_RATE = "eInstrumentMarginRate."             # hxxjava add
EVENT_EXCHANGE_MARGIN_RATE = "eExchaneMarginRate."                  # hxxjava add
EVENT_EXCHANGE_MARGIN_RATE_ADJUST = "eExchaneMarginRateAdjust."     # hxxjava add
EVENT_COMMISSION = "eCommission."                                   # hxxjava add
EVENT_ORDER_COMMISSION = "eOrderCommission."                        # hxxjava add

EVENT_CONTRACT_END = "eContractEnd."                                # hxxjava debug
EVENT_STATUS_END = "eStatusEnd."                                    # hxxjava debug
EVENT_EXCHANGE_MARGIN_RATE_END = "eExchaneMarginRateEnd."           # hxxjava add

EVENT_BROKER_TRADING_PARAMS = "eBrokerTradingParams."   # hxxjava add
EVENT_QUERY_INVESTOR = "eQueryInvestor."                # hxxjava add
EVENT_QUERY_PRODUCT = "eQueryProduct."                  # hxxjava add
Member
avatar
加入于:
帖子: 308
声望: 90

3.4 修改vnpy\trader\gateway.py,内容如下:

该文件中的LocalOrderManager保持不动,其它内容修改如下:

from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Callable
from copy import copy

from vnpy.event import Event, EventEngine

from .event import (
    EVENT_ORIGIN_TICK,
    EVENT_ORDER,
    EVENT_TRADE,
    EVENT_POSITION,
    EVENT_ACCOUNT,
    EVENT_CONTRACT,
    EVENT_STATUS,       # hxxjava debug
    EVENT_EXCHANGE_MARGIN_RATE,         # hxxjava debug
    EVENT_EXCHANGE_MARGIN_RATE_ADJUST,  # hxxjava debug
    EVENT_INSTRUMENT_MARGIN_RATE,       # hxxjava add
    EVENT_COMMISSION,   # hxxjava add
    EVENT_ORDER_COMMISSION, # hxxjava add
    EVENT_EXCHANGE_MARGIN_RATE_END,         # hxxjava debug
    EVENT_CONTRACT_END,       # hxxjava debug    
    EVENT_STATUS_END,         # hxxjava debug
    EVENT_BROKER_TRADING_PARAMS, # hxxjava add
    EVENT_QUERY_INVESTOR,       # hxxjava add
    EVENT_QUERY_PRODUCT,        # hxxjava add
    EVENT_CONNECT,      # hxxjava add
    EVENT_DISCONNECT,   # hxxjava add
    EVENT_LOG,
    EVENT_QUOTE,
)
from .object import (
    TickData,
    OrderData,
    TradeData,
    PositionData,
    AccountData,
    ContractData,
    MarginRequest,      # hxxjava debug 2021-9-16
    MarginRateAdjustRequest, # hxxjava debug 2021-10-13
    CommissionRequest,  # hxxjava debug 2021-9-16
    OrderCommRateRequest, # hxxjava debug 2021-9-16
    BrokerTradingParamsRequest, # hxxjava debug 2021-9-22
    ProductRequst,  # hxxjava debug
    StatusData,     # hxxjava debug
    MarginData, # hxxjava debug
    MarginRateAdjustData,   # hxxjava add
    InstrumentMarginData,   # hxxjava add
    CommissionData, # hxxjava add
    OrderCommRateData, # hxxjava add
    BrokerTradingParamsData, # hxxjava add
    InvestorData,   # hxxjava add
    ProductData,    # hxxjava add
    GatewayData,    # hxxjava add
    LogData,
    QuoteData,
    OrderRequest,
    CancelRequest,
    SubscribeRequest,
    HistoryRequest,
    QuoteRequest,
    Exchange,
    BarData
)
from enum import Enum

class Function():    # hxxjava add
    """
    Function and it's parameters description.
    """
    func: Callable = None
    param: dict = {}
    once: bool = False
    maxcount:int = 1

    def __init__(
        self,
        func:Callable,
        param:Dict={},
        once: bool=False,
        maxcount:int = 1
    ) -> None:
        """ """
        self.func = func
        self.param = param
        self.once = once
        self.maxcount = maxcount
        self.count = 0

    def exec(self):
        """ """
        self.count += 1
        if not self.once:
            # 如果时循环执行的
            if self.count >= self.maxcount:
                # 超循环计数执行一次
                self.count = 0
                return self.func(**self.param)
            else:
                return -5
        else:
            # 单次执行的
            return self.func(**self.param)

    def to_str(self):
        return f"Function(func:{self.func},param:{self.param},once:{self.once},maxcount:{self.maxcount},count:{self.count})"

class BaseGateway(ABC):
    """
    Abstract gateway class for creating gateways connection
    to different trading systems.

    # How to implement a gateway:

    ---
    ## Basics
    A gateway should satisfies:
    * this class should be thread-safe:
        * all methods should be thread-safe
        * no mutable shared properties between objects.
    * all methods should be non-blocked
    * satisfies all requirements written in docstring for every method and callbacks.
    * automatically reconnect if connection lost.

    ---
    ## methods must implements:
    all @abstractmethod

    ---
    ## callbacks must response manually:
    * on_tick
    * on_trade
    * on_order
    * on_position
    * on_account
    * on_contract

    All the XxxData passed to callback should be constant, which means that
        the object should not be modified after passing to on_xxxx.
    So if you use a cache to store reference of data, use copy.copy to create a new object
    before passing that data into on_xxxx



    """

    # Fields required in setting dict for connect function.
    default_setting: Dict[str, Any] = {}

    # Exchanges supported in the gateway.
    exchanges: List[Exchange] = []

    def __init__(self, event_engine: EventEngine, gateway_name: str):
        """"""
        self.event_engine: EventEngine = event_engine
        self.gateway_name: str = gateway_name

    def on_event(self, type: str, data: Any = None) -> None:
        """
        General event push.
        """
        event = Event(type, data)
        self.event_engine.put(event)

    def on_tick(self, tick: TickData) -> None:
        """
        Tick event push.
        Tick event of a specific vt_symbol is also pushed.
        """     
        # self.on_event(EVENT_TICK, tick)
        # self.on_event(EVENT_TICK + tick.vt_symbol, tick)
        self.on_event(EVENT_ORIGIN_TICK, tick)

    def on_trade(self, trade: TradeData) -> None:
        """
        Trade event push.
        Trade event of a specific vt_symbol is also pushed.
        """
        self.on_event(EVENT_TRADE, trade)
        self.on_event(EVENT_TRADE + trade.vt_symbol, trade)

    def on_order(self, order: OrderData) -> None:
        """
        Order event push.
        Order event of a specific vt_orderid is also pushed.
        """
        self.on_event(EVENT_ORDER, order)
        self.on_event(EVENT_ORDER + order.vt_orderid, order)

    def on_position(self, position: PositionData) -> None:
        """
        Position event push.
        Position event of a specific vt_symbol is also pushed.
        """
        self.on_event(EVENT_POSITION, position)
        self.on_event(EVENT_POSITION + position.vt_symbol, position)

    def on_account(self, account: AccountData) -> None:
        """
        Account event push.
        Account event of a specific vt_accountid is also pushed.
        """
        self.on_event(EVENT_ACCOUNT, account)
        self.on_event(EVENT_ACCOUNT + account.vt_accountid, account)

    def on_quote(self, quote: QuoteData) -> None:
        """
        Quote event push.
        Quote event of a specific vt_symbol is also pushed.
        """
        self.on_event(EVENT_QUOTE, quote)
        self.on_event(EVENT_QUOTE + quote.vt_symbol, quote)

    def on_log(self, log: LogData) -> None:
        """
        Log event push.
        """
        self.on_event(EVENT_LOG, log)

    def on_contract(self, contract: ContractData) -> None:
        """
        Contract event push.
        """
        self.on_event(EVENT_CONTRACT, contract)

    def on_contract_end(self, contracts: List[ContractData]) -> None:
        """
        Contract event push.
        """
        self.on_event(EVENT_CONTRACT_END, contracts)

    def on_status(self, status: StatusData) -> None:    # hxxjava debug
        """
        Instrument Status event push.
        """
        self.on_event(EVENT_STATUS, status)
        self.on_event(EVENT_STATUS + status.vt_symbol, status)

    def on_exchange_margin_rate(self, margin: MarginData) -> None:        # hxxjava add
        """
        Exchange margin event push.
        """
        self.on_event(EVENT_EXCHANGE_MARGIN_RATE,margin)

    def on_exchange_margin_rate_adjust(self, adjust: MarginRateAdjustData) -> None:        # hxxjava add
        """
        Exchange margin event push.
        """
        self.on_event(EVENT_EXCHANGE_MARGIN_RATE_ADJUST,adjust)

    def on_instrument_margin_rate(self, margin: InstrumentMarginData) -> None:        # hxxjava add
        """
        Margin event push.
        """
        self.on_event(EVENT_INSTRUMENT_MARGIN_RATE,margin)

    def on_commission(self, commission: CommissionData) -> None:    # hxxjava add   
        """
        Commission event push.
        """
        self.on_event(EVENT_COMMISSION, commission)

    def on_order_commission(self, commission: OrderCommRateData) -> None:    # hxxjava add   
        """
        Commission event push.
        """
        self.on_event(EVENT_ORDER_COMMISSION, commission)

    def on_status_end(self, stats: List[StatusData]) -> None:    # hxxjava debug
        """
        Instrument Status event push.
        """
        self.on_event(EVENT_STATUS_END, stats)

    def on_exchange_margin_rate_end(self, EMRs: List[MarginData]) -> None:        # hxxjava add
        """
        Exchange margin event push.
        """
        self.on_event(EVENT_EXCHANGE_MARGIN_RATE_END,EMRs)

    def on_broker_trading_params(self, broker_trading_params: BrokerTradingParamsData) -> None:    # hxxjava add   
        """
        Broker trading param event push.
        """
        self.on_event(EVENT_BROKER_TRADING_PARAMS, broker_trading_params)

    def on_query_investor(self, investor: InvestorData) -> None:    # hxxjava add   
        """
        Broker trading param event push.
        """
        self.on_event(EVENT_QUERY_INVESTOR, investor)

    def on_query_product(self,product:ProductData)->None: # hxxjava add
        """
        Query product event push.
        """        
        self.on_event(EVENT_QUERY_PRODUCT,product)

    def on_connect(self,gateway:GatewayData) -> None:    # hxxjava add   
        """
        gateway connect enent
        """
        self.on_event(EVENT_CONNECT, gateway)

    def on_disconnect(self,gateway:GatewayData) -> None:    # hxxjava add   
        """
        gateway disconnect enent
        """
        self.on_event(EVENT_DISCONNECT, gateway)

    def write_log(self, msg: str) -> None:
        """
        Write a log event from gateway.
        """
        log = LogData(msg=msg, gateway_name=self.gateway_name)
        self.on_log(log)

    @abstractmethod
    def connect(self, setting: dict) -> None:
        """
        Start gateway connection.

        to implement this method, you must:
        * connect to server if necessary
        * log connected if all necessary connection is established
        * do the following query and response corresponding on_xxxx and write_log
            * contracts : on_contract
            * account asset : on_account
            * account holding: on_position
            * orders of account: on_order
            * trades of account: on_trade
        * if any of query above is failed,  write log.

        future plan:
        response callback/change status instead of write_log

        """
        pass

    @abstractmethod
    def close(self) -> None:
        """
        Close gateway connection.
        """
        pass

    @abstractmethod
    def subscribe(self, req: SubscribeRequest) -> None:
        """
        Subscribe tick data update.
        """
        pass

    @abstractmethod
    def send_order(self, req: OrderRequest) -> str:
        """
        Send a new order to server.

        implementation should finish the tasks blow:
        * create an OrderData from req using OrderRequest.create_order_data
        * assign a unique(gateway instance scope) id to OrderData.orderid
        * send request to server
            * if request is sent, OrderData.status should be set to Status.SUBMITTING
            * if request is failed to sent, OrderData.status should be set to Status.REJECTED
        * response on_order:
        * return vt_orderid

        :return str vt_orderid for created OrderData
        """
        pass

    @abstractmethod
    def cancel_order(self, req: CancelRequest) -> None:
        """
        Cancel an existing order.
        implementation should finish the tasks blow:
        * send request to server
        """
        pass

    def send_quote(self, req: QuoteRequest) -> str:
        """
        Send a new two-sided quote to server.

        implementation should finish the tasks blow:
        * create an QuoteData from req using QuoteRequest.create_quote_data
        * assign a unique(gateway instance scope) id to QuoteData.quoteid
        * send request to server
            * if request is sent, QuoteData.status should be set to Status.SUBMITTING
            * if request is failed to sent, QuoteData.status should be set to Status.REJECTED
        * response on_quote:
        * return vt_quoteid

        :return str vt_quoteid for created QuoteData
        """
        return ""

    def cancel_quote(self, req: CancelRequest) -> None:
        """
        Cancel an existing quote.
        implementation should finish the tasks blow:
        * send request to server
        """
        pass

    @abstractmethod
    def query_account(self) -> None:
        """
        Query account balance.
        """
        pass

    @abstractmethod
    def query_position(self) -> None:
        """
        Query holding positions.
        """
        pass

    @abstractmethod
    def query_exchange_margin_rate(self,req:MarginRequest) -> None:
        """
        Query contract's margin fee or ratio.  # hxxjava add 2021-9-16
        """
        pass

    @abstractmethod
    def query_exchange_margin_rate_adjust(self,req:MarginRateAdjustRequest) -> None:
        """
        Query instrument's margin rate adjust.  # hxxjava add 2021-10-13
        """
        pass

    @abstractmethod
    def query_margin_ratio(self,req:MarginRequest) -> None:
        """
        Query contract's margin fee or ratio.  # hxxjava add 2021-9-16
        """
        pass

    @abstractmethod
    def query_commission(self,req:CommissionRequest) -> None:
        """
        Query contract's commission fee or ratio.  # hxxjava add 2021-9-16
        """
        pass

    @abstractmethod
    def query_order_commission(self,req:OrderCommRateRequest) -> None:
        """
        Query contract's commission fee or ratio.  # hxxjava add 2021-9-16
        """
        pass

    @abstractmethod
    def query_broker_trading_params(self,req:BrokerTradingParamsRequest) -> None:
        """
        Query user's broker trading params.  # hxxjava add 2021-9-22
        """
        pass

    @abstractmethod
    def query_investor(self) -> None:
        """
        Query investor .  # hxxjava add 2021-10-12
        """
        pass

    @abstractmethod
    def query_product(self,req:ProductRequst) -> None:
        """
        Query product .  # hxxjava add 2021-10-12
        """
        pass

    def query_history(self, req: HistoryRequest) -> List[BarData]:
        """
        Query bar history data.
        """
        pass

    def get_default_setting(self) -> Dict[str, Any]:
        """
        Return default setting dict.
        """
        return self.default_setting
Member
avatar
加入于:
帖子: 308
声望: 90

3.5 修改vnpy_ctp\gateway\ctp_gateway.py,内容如下:

from pathlib import Path
import sys
import pytz
from datetime import datetime
from time import sleep, time
from vnpy.event.engine import EventEngine
from typing import Callable, Dict, List

import threading    # hxxjava debug
from vnpy.trader.constant import (
    Direction,
    HedgeType,
    Offset,
    Exchange,
    OrderType,
    Product,
    Status,
    OptionType,
    InstrumentStatus,
    StatusEnterReason,
    IdCardType,
    ProductClass,
)
from vnpy.trader.gateway import BaseGateway,Function

from vnpy.trader.object import (
    TickData,
    OrderData,
    TradeData,
    PositionData,
    AccountData,
    ContractData,
    StatusData,                 # hxxjava add
    MarginData,                 # hxxjava add
    MarginRateAdjustData,       # hxxjava add
    InstrumentMarginData,       # hxxjava add
    CommissionData,             # hxxjava add
    OrderCommRateData,          # hxxjava add
    BrokerTradingParamsData,    # hxxjava add
    InvestorData,               # hxxjava add   
    ProductData,                # hxxjava add
    MarginRequest,              # hxxjava add
    MarginRateAdjustRequest,    # hxxjava add
    CommissionRequest,          # hxxjava add
    OrderCommRateRequest,       # hxxjava add
    BrokerTradingParamsRequest, # hxxjava add
    ProductRequst,              # hxxjava add
    GatewayData,                # hxxjava add    
    OrderRequest,
    CancelRequest,
    SubscribeRequest,
)

from vnpy.trader.utility import get_folder_path
from vnpy.trader.event import EVENT_TIMER
from threading import Timer  # hxxjava add

from ..api import (
    MdApi,
    TdApi,
    THOST_FTDC_OAS_Submitted,
    THOST_FTDC_OAS_Accepted,
    THOST_FTDC_OAS_Rejected,
    THOST_FTDC_OST_NoTradeQueueing,
    THOST_FTDC_OST_PartTradedQueueing,
    THOST_FTDC_OST_AllTraded,
    THOST_FTDC_OST_Canceled,
    THOST_FTDC_D_Buy,
    THOST_FTDC_D_Sell,
    THOST_FTDC_PD_Long,
    THOST_FTDC_PD_Short,
    THOST_FTDC_OPT_LimitPrice,
    THOST_FTDC_OPT_AnyPrice,
    THOST_FTDC_OF_Open,
    THOST_FTDC_OFEN_Close,
    THOST_FTDC_OFEN_CloseYesterday,
    THOST_FTDC_OFEN_CloseToday,
    THOST_FTDC_PC_Futures,
    THOST_FTDC_PC_Options,
    THOST_FTDC_PC_SpotOption,
    THOST_FTDC_PC_Combination,
    THOST_FTDC_CP_CallOptions,
    THOST_FTDC_CP_PutOptions,
    THOST_FTDC_HF_Speculation,
    THOST_FTDC_CC_Immediately,
    THOST_FTDC_FCC_NotForceClose,
    THOST_FTDC_TC_GFD,
    THOST_FTDC_VC_$,
    THOST_FTDC_TC_IOC,
    THOST_FTDC_VC_CV,
    THOST_FTDC_AF_Delete
)


# 委托状态映射
STATUS_CTP2VT: Dict[str, Status] = {
    THOST_FTDC_OAS_Submitted: Status.SUBMITTING,
    THOST_FTDC_OAS_Accepted: Status.SUBMITTING,
    THOST_FTDC_OAS_Rejected: Status.REJECTED,
    THOST_FTDC_OST_NoTradeQueueing: Status.NOTTRADED,
    THOST_FTDC_OST_PartTradedQueueing: Status.PARTTRADED,
    THOST_FTDC_OST_AllTraded: Status.ALLTRADED,
    THOST_FTDC_OST_Canceled: Status.CANCELLED
}

# 多空方向映射
DIRECTION_VT2CTP: Dict[Direction, str] = {
    Direction.LONG: THOST_FTDC_D_Buy,
    Direction.SHORT: THOST_FTDC_D_Sell
}
DIRECTION_CTP2VT: Dict[str, Direction] = {v: k for k, v in DIRECTION_VT2CTP.items()}
DIRECTION_CTP2VT[THOST_FTDC_PD_Long] = Direction.LONG
DIRECTION_CTP2VT[THOST_FTDC_PD_Short] = Direction.SHORT

# 委托类型映射
ORDERTYPE_VT2CTP: Dict[OrderType, str] = {
    OrderType.LIMIT: THOST_FTDC_OPT_LimitPrice,
    OrderType.MARKET: THOST_FTDC_OPT_AnyPrice
}
ORDERTYPE_CTP2VT: Dict[str, OrderType] = {v: k for k, v in ORDERTYPE_VT2CTP.items()}

# 开平方向映射
OFFSET_VT2CTP: Dict[Offset, str] = {
    Offset.OPEN: THOST_FTDC_OF_Open,
    Offset.CLOSE: THOST_FTDC_OFEN_Close,
    Offset.CLOSETODAY: THOST_FTDC_OFEN_CloseToday,
    Offset.CLOSEYESTERDAY: THOST_FTDC_OFEN_CloseYesterday,
}
OFFSET_CTP2VT: Dict[str, Offset] = {v: k for k, v in OFFSET_VT2CTP.items()}

# 交易所映射
EXCHANGE_CTP2VT: Dict[str, Exchange] = {
    "CFFEX": Exchange.CFFEX,
    "SHFE": Exchange.SHFE,
    "CZCE": Exchange.CZCE,
    "DCE": Exchange.DCE,
    "INE": Exchange.INE
}

# 产品类型映射
PRODUCT_CTP2VT: Dict[str, Product] = {
    THOST_FTDC_PC_Futures: Product.FUTURES,
    THOST_FTDC_PC_Options: Product.OPTION,
    THOST_FTDC_PC_SpotOption: Product.OPTION,
    THOST_FTDC_PC_Combination: Product.SPREAD
}

# 期权类型映射
OPTIONTYPE_CTP2VT: Dict[str, OptionType] = {
    THOST_FTDC_CP_CallOptions: OptionType.CALL,
    THOST_FTDC_CP_PutOptions: OptionType.PUT
}



# 品种状态进入原因映射  hxxjava debug
INSTRUMENTSTATUS_CTP2VT: Dict[str, InstrumentStatus] = {
    "0": InstrumentStatus.BEFORE_TRADING,
    "1": InstrumentStatus.NO_TRADING,
    "2": InstrumentStatus.CONTINOUS,
    "3": InstrumentStatus.AUCTION_ORDERING,
    "4": InstrumentStatus.AUCTION_BALANCE,
    "5": InstrumentStatus.AUCTION_MATCH,
    "6": InstrumentStatus.CLOSE,
    "7": InstrumentStatus.CLOSE
}


# 品种状态进入原因映射  hxxjava debug
ENTERREASON_CTP2VT: Dict[str, StatusEnterReason] = {
    "1": StatusEnterReason.AUTOMATIC,
    "2": StatusEnterReason.MANUAL,
    "3": StatusEnterReason.FUSE
}

# 其他常量
MAX_FLOAT = sys.float_info.max                  # 浮点数极限值
CHINA_TZ = pytz.timezone("Asia/Shanghai")       # 中国时区

# 合约数据全局缓存字典
symbol_contract_map: Dict[str, ContractData] = {}

class CtpGateway(BaseGateway):
    """
    vn.py用于对接期货CTP柜台的交易接口。
    """

    default_setting: Dict[str, str] = {
        "用户名": "",
        "密码": "",
        "经纪商代码": "",
        "交易服务器": "",
        "行情服务器": "",
        "产品名称": "",
        "授权编码": ""
    }

    exchanges: List[str] = list(EXCHANGE_CTP2VT.values())

    def __init__(self, event_engine: EventEngine, gateway_name: str = "CTP") -> None:
        """构造函数"""
        super().__init__(event_engine, gateway_name)

        self.td_api: "CtpTdApi" = CtpTdApi(self)
        self.md_api: "CtpMdApi" = CtpMdApi(self)

    def connect(self, setting: dict) -> None:
        """连接交易接口"""
        userid: str = setting["用户名"]
        password: str = setting["密码"]
        brokerid: str = setting["经纪商代码"]
        td_address: str = setting["交易服务器"]
        md_address: str = setting["行情服务器"]
        appid: str = setting["产品名称"]
        auth_code: str = setting["授权编码"]

        if (
            (not td_address.startswith("tcp://"))
            and (not td_address.startswith("ssl://"))
        ):
            td_address = "tcp://" + td_address

        if (
            (not md_address.startswith("tcp://"))
            and (not md_address.startswith("ssl://"))
        ):
            md_address = "tcp://" + md_address

        self.td_api.connect(td_address, userid, password, brokerid, auth_code, appid)
        self.md_api.connect(md_address, userid, password, brokerid)

        self.init_query()

    def subscribe(self, req: SubscribeRequest) -> None:
        """订阅行情"""
        self.md_api.subscribe(req)

    def send_order(self, req: OrderRequest) -> str:
        """委托下单"""
        # 期权询价
        if req.type == OrderType.RFQ:
            vt_orderid: str = self.td_api.send_rfq(req)
        # 其他委托
        else:
            vt_orderid: str = self.td_api.send_order(req)
        return vt_orderid

    def cancel_order(self, req: CancelRequest) -> None:
        """委托撤单"""
        self.td_api.cancel_order(req)

    def query_account(self) -> None:
        """查询资金"""
        self.td_api.query_account()

    def query_position(self) -> None:
        """查询持仓"""
        self.td_api.query_position()

    def queryInvestUnit(self):  # hxxjava add 
        """查询投资单元"""
        self.td_api.queryInvestUnit()

    def query_broker_trading_params(self,req:BrokerTradingParamsRequest):   # hxxjava add 
        """ 查询经纪公司交易参数 """
        self.td_api.query_broker_trading_params(req)

    def query_investor(self) -> None:
        """
        查询投资者 
        """
        self.td_api.query_investor()

    def query_product(self,req:ProductRequst) -> None:
        """ 查询产品信息 """
        self.td_api.query_product(req)

    def query_commission(self,req:CommissionRequest) -> None:   # hxxjava add 
        """查询手续费数据"""       
        self.td_api.query_commission(req)

    def query_order_commission(self,req:OrderCommRateRequest): # hxxjava add
        """查询报单手续费数据"""       
        self.td_api.query_order_commission(req)

    def query_margin_ratio(self,req:MarginRequest): # hxxjava add 
        """查询保证金率数据"""
        self.td_api.query_margin_ratio(req)

    def query_exchange_margin_rate(self,req:MarginRequest): # hxxjava add 
        """查询交易所保证金率数据"""
        self.td_api.query_exchange_margin_rate(req)

    def query_exchange_margin_rate_adjust(self,req:MarginRateAdjustRequest): # hxxjava add 
        """查询交易所保证金率调整数据"""
        self.td_api.query_exchange_margin_rate_adjust(req)

    def close(self):
        """"""
        self.td_api.close() 
        self.md_api.close() 

    def write_error(self, msg: str, error: dict) -> None:
        """输出错误信息日志"""
        error_id: int = error["ErrorID"]
        error_msg: str = error["ErrorMsg"]
        msg = f"{msg},代码:{error_id},信息:{error_msg}"
        self.write_log(msg)

    def process_timer_event(self, event) -> None:
        """定时事件处理"""
        self.count += 1
        if self.count < 2:
            return
        self.count = 0

        if self.td_api.connect_status:
            print(
                "交易所保证金:",len(self.td_api.ex_margin_rate_data),
                "合约状态:",len(self.td_api.status_data),
                "合约信息:",len(symbol_contract_map.keys()),
                "委托单数:",len(self.td_api.order_data),
                "成交单数:",len(self.td_api.trade_data),
                "报单+查询:",(len(self.td_api.order_functions),len(self.td_api.query_functions))
            )

        self.md_api.update_date()

    def init_query(self) -> None:
        """初始化查询任务"""
        self.count: int = 0
        self.event_engine.register(EVENT_TIMER, self.process_timer_event)

class CtpMdApi(MdApi):
    """"""

    def __init__(self, gateway: CtpGateway) -> None:
        """构造函数"""
        super().__init__()

        self.gateway: CtpGateway = gateway
        self.gateway_name: str = gateway.gateway_name

        self.reqid: int = 0

        self.connect_status: bool = False
        self.login_status: bool = False
        self.subscribed: List[str] = set()

        self.userid: str = ""
        self.password: str = ""
        self.brokerid: str = ""

        self.current_date: str = datetime.now().strftime("%Y%m%d")

    def onFrontConnected(self) -> None:
        """服务器连接成功回报"""

        self.gateway.on_connect(GatewayData(name="CTP",type='MD'))    # hxxjava add
        self.gateway.write_log("行情服务器连接成功")
        self.login()

    def onFrontDisconnected(self, reason: int) -> None:
        """服务器连接断开回报"""
        self.login_status = False
        self.gateway.on_disconnect(GatewayData(name="CTP",type='MD',reason=reason)) # hxxjava add
        self.gateway.write_log(f"行情服务器连接断开,原因{reason}")

    def onRspUserLogin(self, data: dict, error: dict, reqid: int, last: bool) -> None:
        """用户登录请求回报"""
        if not error["ErrorID"]:
            self.login_status = True
            self.gateway.write_log("行情服务器登录成功")

            for symbol in self.subscribed:
                self.subscribeMarketData(symbol)
        else:
            self.gateway.write_error("行情服务器登录失败", error)

    def onRspError(self, error: dict, reqid: int, last: bool) -> None:
        """请求报错回报"""
        self.gateway.write_error("行情接口报错", error)

    def onRspSubMarketData(self, data: dict, error: dict, reqid: int, last: bool) -> None:
        """订阅行情回报"""
        if not error or not error["ErrorID"]:
            return

        self.gateway.write_error("行情订阅失败", error)

    def onRtnDepthMarketData(self, data: dict) -> None:
        """行情数据推送"""
        # 过滤没有时间戳的异常行情数据
        if not data["UpdateTime"]:
            return

        symbol: str = data["InstrumentID"]
        contract: ContractData = symbol_contract_map.get(symbol, None)
        if not contract:
            return

        timestamp: str = f"{self.current_date} {data['UpdateTime']}.{int(data['UpdateMillisec']/100)}"
        dt: datetime = datetime.strptime(timestamp, "%Y%m%d %H:%M:%S.%f")
        dt = CHINA_TZ.localize(dt)

        tick: TickData = TickData(
            symbol=symbol,
            exchange=contract.exchange,
            datetime=dt,
            name=contract.name,
            volume=data["Volume"],
            open_interest=data["OpenInterest"],
            last_price=adjust_price(data["LastPrice"]),
            limit_up=data["UpperLimitPrice"],
            limit_down=data["LowerLimitPrice"],
            open_price=adjust_price(data["OpenPrice"]),
            high_price=adjust_price(data["HighestPrice"]),
            low_price=adjust_price(data["LowestPrice"]),
            pre_close=adjust_price(data["PreClosePrice"]),
            bid_price_1=adjust_price(data["BidPrice1"]),
            ask_price_1=adjust_price(data["AskPrice1"]),
            bid_volume_1=data["BidVolume1"],
            ask_volume_1=data["AskVolume1"],
            gateway_name=self.gateway_name
        )

        if data["BidVolume2"] or data["AskVolume2"]:
            tick.bid_price_2 = adjust_price(data["BidPrice2"])
            tick.bid_price_3 = adjust_price(data["BidPrice3"])
            tick.bid_price_4 = adjust_price(data["BidPrice4"])
            tick.bid_price_5 = adjust_price(data["BidPrice5"])

            tick.ask_price_2 = adjust_price(data["AskPrice2"])
            tick.ask_price_3 = adjust_price(data["AskPrice3"])
            tick.ask_price_4 = adjust_price(data["AskPrice4"])
            tick.ask_price_5 = adjust_price(data["AskPrice5"])

            tick.bid_volume_2 = data["BidVolume2"]
            tick.bid_volume_3 = data["BidVolume3"]
            tick.bid_volume_4 = data["BidVolume4"]
            tick.bid_volume_5 = data["BidVolume5"]

            tick.ask_volume_2 = data["AskVolume2"]
            tick.ask_volume_3 = data["AskVolume3"]
            tick.ask_volume_4 = data["AskVolume4"]
            tick.ask_volume_5 = data["AskVolume5"]

        self.gateway.on_tick(tick)

    def connect(self, address: str, userid: str, password: str, brokerid: int) -> None:
        """连接服务器"""
        self.userid = userid
        self.password = password
        self.brokerid = brokerid

        # 禁止重复发起连接,会导致异常崩溃
        if not self.connect_status:
            path: Path = get_folder_path(self.gateway_name.lower())
            self.createFtdcMdApi((str(path) + "\\Md").encode("GBK"))

            self.registerFront(address)
            self.init()

            self.connect_status = True

    def login(self) -> None:
        """用户登录"""
        ctp_req: dict = {
            "UserID": self.userid,
            "Password": self.password,
            "BrokerID": self.brokerid
        }

        self.reqid += 1
        self.reqUserLogin(ctp_req, self.reqid)

    def subscribe(self, req: SubscribeRequest) -> None:
        """订阅行情"""
        if self.login_status:
            self.subscribeMarketData(req.symbol)
        self.subscribed.add(req.symbol)

    def close(self) -> None:
        """关闭连接"""
        if self.connect_status:
            self.exit()

    def update_date(self) -> None:
        """更新当前日期"""
        self.current_date = datetime.now().strftime("%Y%m%d")


class CtpTdApi(TdApi):
    """"""

    def __init__(self, gateway: CtpGateway) -> None:
        """构造函数"""
        super().__init__()

        # hxxjava start    
        self.order_functions:List[Function] = []
        self.query_functions:List[Function] = []
        self.inited = False
        self.debug = True
        self.td_lock = threading.Lock()
        # hxxjava end

        self.gateway: CtpGateway = gateway
        self.gateway_name: str = gateway.gateway_name

        self.reqid: int = 0
        self.order_ref: int = 0

        self.connect_status: bool = False
        self.login_status: bool = False
        self.auth_status: bool = False
        self.login_failed: bool = False
        self.contract_inited: bool = False

        self.userid: str = ""
        self.password: str = ""
        self.brokerid: str = ""
        self.auth_code: str = ""
        self.appid: str = ""

        self.frontid: int = 0
        self.sessionid: int = 0

        # hxxjava start      
        self.constract_data: List[dict] = []
        self.status_data: List[dict] = []
        self.ex_margin_rate_data: List[dict] = []
        # hxxjava end

        self.order_data: List[dict] = []
        self.trade_data: List[dict] = []
        self.positions: Dict[str, PositionData] = {}
        self.sysid_orderid_map: Dict[str, str] = {}

        self.init_order_query_functions()

    def init_order_query_functions(self) -> None:
        """初始化查询任务"""
        self.order_functions.clear()
        self.query_functions.clear()

        self.query_functions.append(Function(func=self.query_investor,once=True))
        self.query_functions.append(Function(func=self.query_broker_trading_params,param={'req':BrokerTradingParamsRequest()},once=True))

        self.query_functions.append(Function(func=self.query_account,maxcount=3))
        self.query_functions.append(Function(func=self.query_position,maxcount=3))

        self.exec_order_cmds()
        self.exec_query_cmds()    

    def exec_order_cmds(self):    # hxxjava add
        """ 执行报单命令 """
        if self.inited and self.order_functions:
            # 报单命令列表中有内容
            order_func:Function = self.order_functions.pop(0)
            ret_val = order_func.exec()
            if ret_val == 0:
                # 报单命令执行成功
                if not order_func.once:
                    self.order_functions.append(order_func)

            else:
                # 报单命令执行失败,添加到命令列表的头部
                self.order_functions.insert(0,order_func)

        self.order_timer = Timer(1.0/6, self.exec_order_cmds)
        self.order_timer.start()

    def exec_query_cmds(self):  # hxxjava add
        """ 执行查询命令 """
        if self.inited and self.query_functions:
            # 查询命令列表中有内容
            func:Function = self.query_functions.pop(0)
            ret_val = func.exec()

            # print(f"{func.to_str()} exec()={ret_val}")

            if ret_val == 0:
                # 查询命令执行成功
                if not func.once:
                    # 周期性查询命令,添加到命令列表的尾部
                    self.query_functions.append(func)

            elif ret_val == -5:
                # 循环计数未达到的,添加到命令列表的尾部
                self.query_functions.append(func)

            elif ret_val in [-2,-3]:
                # 查询命令执行失败,添加到命令列表的头部
                self.query_functions.insert(0,func)

        self.query_timer = Timer(1.0, self.exec_query_cmds)
        self.query_timer.start()

    def execute_func(self,func:Callable,req:Dict,prompt:str="") -> int:
        """ 
        统一的查询执行函数,
        # 返回值:
        #  0:成功;-1:网络连接失败
        # -2:未处理请求超过许可数;-3:每秒发送请求数超过许可数
        """
        self.td_lock.acquire()

        self.reqid += 1
        result = func(req,self.reqid)     

        self.td_lock.release()

        if prompt and self.debug:            
            print(f"{prompt} excuted at {datetime.now()},result={result}")

        return result

    def query_investor(self):
        """"  请求查询投资者 """    
        req = {"BrokerID":self.brokerid,"InvestorID":self.userid}

        if self.execute_func(func=self.reqQryInvestor,req=req,prompt="请求查询投资者")  in [-2,-3]:
            # 违反查询流控,添加到查询函数队列的尾部
            self.query_functions.append(Function(func=self.query_investor,once=True))

    def onRspQryInvestor(self, data: dict, error: dict, reqid: int, last: bool):  # -> None:
        """
        请求查询投资者响应,当执行ReqQryInvestor后,该方法被调用。
        """
        # print(f"onRspQryInvestor data={data},error={error},reqid={reqid},last={last}")
        if data:
            investor = InvestorData(
                id = data['InvestorID'],
                name = data['InvestorName'],
                broker = data['BrokerID'],
                group = data['InvestorGroupID'],
                identifiedCardType = IdCardType(data['IdentifiedCardType']),
                identifiedCardNo = data['IdentifiedCardNo'],
                is_active = data['IsActive'],
                telephone  = data['Telephone'],
                address = data['Address'],
                open_date = data['OpenDate'],
                mobile = data['Mobile'],
                commission_model = data['CommModelID'],
                margin_model = data['MarginModelID'],
                gateway_name=self.gateway_name                
            )
            self.gateway.on_query_investor(investor)

    def queryInvestUnit(self): # hxxjava add
        """
        # 查询投资单元
        """
        if not self.connect_status:
            return
        req = {}
        req["BrokerID"]=self.brokerid
        req["InvestorID"]=self.userid
        req["InvestUnitID"]=""

        if self.execute_func(func=self.reqQryInvestUnit,req=req,prompt="查询投资单元") in [-2,-3]:
            # 违反查询流控,添加到查询函数队列的尾部
            self.query_functions.append(Function(func=self.queryInvestUnit,once=True))

    def onRspQryInvestUnit(self, data: dict, error: dict, reqid: int, last: bool):  # -> None:
        """
        # 查询投资单元应答
        """
        print(f"onRspQryInvestUnit data={data} error={error}")

    def onRspQryInstrumentCommissionRate(self, data: dict, error: dict, reqid: int, last: bool):    # hxxjava add
        """
        查询合约手续费率
        """
        if data:
            commission = CommissionData(
                    symbol = data['InstrumentID'],
                    exchange = data["ExchangeID"], # EXCHANGE_CTP2VT[data["ExchangeID"]]
                    open_ratio_bymoney=data['OpenRatioByMoney'],
                    open_ratio_byvolume=data['OpenRatioByVolume'],
                    close_ratio_bymoney=data['CloseRatioByMoney'],
                    close_ratio_byvolume=data['CloseRatioByVolume'],
                    close_today_ratio_bymoney=data['CloseTodayRatioByMoney'],
                    close_today_ratio_byvolume=data['CloseTodayRatioByVolume'], 
                    investor_range = data['InvestorRange'],
                    biz_type = data['BizType'],
                    invest_unit_id = data['InvestUnitID'],
                    gateway_name=self.gateway_name     
                )
            # print(f"onRspQryInstrumentCommissionRate = {data}")
            self.gateway.on_commission(commission)

    def onRspQryInstrumentOrderCommRate(self, data: dict, error: dict, reqid: int, last: bool):    # hxxjava add
        """
        查询报单手续费
        """
        if data:
            commission = OrderCommRateData(
                    symbol = data['InstrumentID'],
                    exchange = data["ExchangeID"], # EXCHANGE_CTP2VT[data["ExchangeID"]]
                    investor_range = data['InvestorRange'],
                    invest_unit_id = data['InvestUnitID'],
                    hedge_flag = data['HedgeFlag'],
                    order_comm_byvolume=data['OrderCommByVolume'],
                    order_action_comm_byvolume=data['OrderActionCommByVolume'],
                    gateway_name=self.gateway_name     
                )

            self.gateway.on_order_commission(commission)

    def onRspQryInstrumentMarginRate(self, data: dict, error: dict, reqid: int, last: bool):    # hxxjava add
        """ 
        查询合约保证金率
        """
        if data:
            margin_rate = InstrumentMarginData(
                    symbol = data['InstrumentID'],
                    exchange = data["ExchangeID"], # EXCHANGE_CTP2VT[data["ExchangeID"]]
                    long_margin_rate = data["LongMarginRatioByMoney"],
                    long_margin_perlot = data["LongMarginRatioByVolume"],
                    short_margin_rate = data["ShortMarginRatioByMoney"],
                    short_margin_perlot = data["ShortMarginRatioByVolume"],
                    is_ralative=data['IsRelative'],
                    investor_id=data['InvestorID'],
                    invest_unit_id=data['InvestUnitID'],
                    gateway_name=self.gateway_name
                )
            self.gateway.on_instrument_margin_rate(margin_rate)

    def onRspQryExchangeMarginRate(self, data: dict, error: dict, reqid: int, last: bool):    # hxxjava add
        """ 
        收到查询交易所保证金率结果
        """
        if not self.contract_inited:
            # 合约数据还没有初始化
            if error:
                print(f"查询交易所保证金率 错误={error}")

            if data:
                self.ex_margin_rate_data.append(data)

            if last: 
                self.gateway.write_log(f"开始查询合约信息")
                if self.execute_func(func=self.reqQryInstrument,req={},prompt="查询合约信息") in [-2,-3]:
                    self.timeContract = Timer(interval=1.0,function = self.onRspQryExchangeMarginRate,
                                kwargs={'data':None,'error':None,'reqid':reqid,'last':True})
                    self.timeContract.start()
            return

        elif error:
            pass
        elif data:
            margin_rate = self.extractExchangeMarginRate(data)
            self.gateway.on_exchange_margin_rate(margin_rate)
            # print(f"查询交易所保证金率结果 :data={data} error={error}")

    def extractExchangeMarginRate(self,data:dict): # hxxjava add
        """ 提取交易所保证金率并且发送给网关 """
        return MarginData(
                    symbol = data['InstrumentID'],
                    exchange = data["ExchangeID"],  # 总是为空 EXCHANGE_CTP2VT[data["ExchangeID"]],
                    hedge_flag = HedgeType(data["HedgeFlag"]),
                    long_margin_rate = data["LongMarginRatioByMoney"],
                    long_margin_perlot = data["LongMarginRatioByVolume"],
                    short_margin_rate = data["ShortMarginRatioByMoney"],
                    short_margin_perlot = data["ShortMarginRatioByVolume"],
                    gateway_name=self.gateway_name
                )   

    def onRspQryExchangeMarginRateAdjust(self, data: dict, error: dict, reqid: int, last: bool):    # hxxjava add
        """ 
        收到查询交易所保证金率结果
        """               
        if error:
            pass

        if data:
            adjust = MarginRateAdjustData(
                    symbol = data["InstrumentID"],
                    hedge_flag = HedgeType(data["HedgeFlag"]),
                    LongMarginRatioByMoney = data["LongMarginRatioByMoney"],
                    LongMarginRatioByVolume = data["LongMarginRatioByVolume"],
                    ShortMarginRatioByMoney = data["ShortMarginRatioByMoney"],
                    ShortMarginRatioByVolume = data["ShortMarginRatioByVolume"],
                    ExchLongMarginRatioByMoney = data["ExchLongMarginRatioByMoney"],
                    ExchLongMarginRatioByVolume = data["ExchLongMarginRatioByVolume"],
                    ExchShortMarginRatioByMoney = data["ExchShortMarginRatioByMoney"],
                    ExchShortMarginRatioByVolume = data["ExchShortMarginRatioByVolume"],
                    NoLongMarginRatioByMoney = data["NoLongMarginRatioByMoney"],
                    NoLongMarginRatioByVolume = data["NoLongMarginRatioByVolume"],
                    NoShortMarginRatioByMoney = data["NoShortMarginRatioByMoney"],
                    NoShortMarginRatioByVolume = data["NoShortMarginRatioByVolume"],
                    gateway_name=self.gateway_name
                )
            # 并且发送给网关
            self.gateway.on_exchange_margin_rate_adjust(adjust)

    def onRspQryBrokerTradingParams(self, data: dict, error: dict, reqid: int, last: bool):    # hxxjava add
        """ 请求查询经纪公司交易参数响应 """
        from vnpy.trader.constant import HedgeType,InstrumentStatus,MarginPriceType,AlgorithmType,IncludeCloseProfitType,Currency,OptionRoyaltyPriceType # hxxjava add
        if data:
            # print(f"onRspQryBrokerTradingParams : {data}")
            broker_trading_param = BrokerTradingParamsData(
                margin_price_type = MarginPriceType(data["MarginPriceType"]),
                algorithm = AlgorithmType(data["Algorithm"]),
                avail_include_close_profit = IncludeCloseProfitType(data["AvailIncludeCloseProfit"]),
                currency = Currency(data["CurrencyID"]) if data["CurrencyID"] else "",
                option_royalty_price_type = OptionRoyaltyPriceType(data["OptionRoyaltyPriceType"]),
                account = data["AccountID"],
                gateway_name=self.gateway_name
            )
            self.gateway.on_broker_trading_params(broker_trading_param)

    def onRspQryProduct(self, data: dict, error: dict, reqid: int, last: bool):    # hxxjava add
        """ 请求查询产品响应 """
        print(f"请求查询产品响应 :data={data} error={error}")
        if data:
            product = ProductData(
                id = data["ProductID"],
                name = data["ProductName"],
                exchange = Exchange(data["ExchangeID"]),
                product_class = data["ProductClass"],
                size = data["VolumeMultiple"],
                price_tick = data["PriceTick"],
                MaxMarketOrderVolume = data["MaxMarketOrderVolume"],
                MinMarketOrderVolume = data["MinMarketOrderVolume"],
                MaxLimitOrderVolume = data["MaxLimitOrderVolume"],
                MinLimitOrderVolume = data["MinLimitOrderVolume"],
                PositionType = data["PositionType"],
                PositionDateType = data["PositionDateType"],
                TradeCurrencyID = data["TradeCurrencyID"],
                CloseDealType = data["CloseDealType"],
                MortgageFundUseRange = data["MortgageFundUseRange"],
                ExchangeProductID = data["ExchangeProductID"],
                UnderlyingMultiple = data["UnderlyingMultiple"],
                gateway_name=self.gateway_name
            )
            self.gateway.on_query_product(product)

    def query_broker_trading_params(self,req:BrokerTradingParamsRequest): # hxxjava add
        """ 请求查询经纪公司交易参数 """
        if not self.connect_status:
            return  
        req_dict = {}

        req_dict["BrokerID"] = self.brokerid  
        req_dict["InvestorID"] = self.userid
        req_dict["CurrencyID"] = req.CurrencyID
        req_dict["AccountID"] = req.AccountID

        if self.execute_func(func=self.reqQryBrokerTradingParams,req=req_dict,prompt="查询经纪公司交易参数") in [-2,-3]:
            # 违反查询流控,添加到查询函数队列的尾部
            self.query_functions.append(Function(func=self.query_broker_trading_params,once=True,param={'req':req}))

    def query_product(self,req:ProductRequst) -> None:
        """ 请求查询产品 """
        if not self.connect_status:
            return
        req_dict = {}
        req_dict["ProductID"] = req.product
        req_dict["ProductClass"] = req.product_class.value
        req_dict["ExchangeID"] = "" if not req.exchange else req.exchange.value 

        if self.execute_func(func=self.reqQryProduct,req=req_dict,prompt="请求查询产品") in [-2,-3]:
            # 违反查询流控,添加到查询函数队列的尾部
            self.query_functions.append(Function(func=self.query_product,once=True,param={'req':req}))


    def query_commission(self,req:CommissionRequest):   # hxxjava add
        """ 查询手续费率 """
        if not self.connect_status:
            return         
        commission_req = {}
        commission_req['BrokerID'] = self.brokerid
        commission_req['InvestorID'] = self.userid
        commission_req['InvestorUnitID'] = self.userid  #hxxjava debug char[31]
        commission_req['InstrumentID'] = req.symbol
        commission_req['ExchangeID'] = req.exchange.value if req.exchange else ""

        if self.execute_func(func=self.reqQryInstrumentCommissionRate,req=commission_req,prompt="查询手续费率") in [-2,-3]:
            # 违反查询流控,添加到查询函数队列的尾部
            self.query_functions.append(Function(func=self.query_commission,once=True,param={'req':req}))

    def query_order_commission(self,req:OrderCommRateRequest):   # hxxjava add
        """ 查询报单手续费 """
        if not self.connect_status:
            return          
        req_dict = {}
        req_dict['BrokerID'] = self.brokerid
        req_dict['InvestorID'] = self.userid
        req_dict['InstrumentID'] = req.symbol

        if self.execute_func(func=self.reqQryInstrumentOrderCommRate,req=req_dict,prompt="查询报单手续费") in [-2,-3]:
            # 违反查询流控,添加到查询函数队列的尾部
            self.query_functions.append(Function(func=self.query_commission,once=True,param={'req':req}))

    def query_margin_ratio(self,req:MarginRequest): # hxxjava add
        """ 合约保证金率查询 """       
        if not self.connect_status:
            return
        ctp_req = {}
        ctp_req['BrokerID'] = self.brokerid
        ctp_req['InvestorID'] = self.userid
        ctp_req['HedgeFlag'] = req.hedge_type.value
        ctp_req['ExchangeID'] = req.exchange.value if req.exchange else ""
        ctp_req['InvestUnitID'] = self.userid  # hxxjava debug char[81]
        ctp_req['InstrumentID'] = req.symbol

        if self.execute_func(func=self.reqQryInstrumentMarginRate,req=ctp_req,prompt="合约保证金率查询") in [-2,-3]:
            # 违反查询流控,添加到查询函数队列的尾部
            self.query_functions.append(Function(func=self.query_margin_ratio,once=True,param={'req':req}))

    def query_exchange_margin_rate(self,req:MarginRequest): # hxxjava add
        """ 交易所保证金率查询 """       
        if not self.connect_status:
            return
        out_req = {}
        out_req['BrokerID'] = self.brokerid
        out_req['InstrumentID'] = req.symbol
        out_req['HedgeFlag'] = req.hedge_type.value
        out_req['ExchangeID'] = req.exchange.value if req.exchange else "" 

        if self.execute_func(func=self.reqQryExchangeMarginRate,req=out_req,prompt="交易所保证金率查询") in [-2,-3]:
            # 违反查询流控,添加到查询函数队列的尾部
            self.query_functions.append(Function(func=self.query_exchange_margin_rate,once=True,param={'req':req}))

    def query_exchange_margin_rate_adjust(self,req:MarginRateAdjustRequest): # hxxjava add
        """ 保证金率调整查询 """       
        if not self.connect_status:
            return
        out_req = {}
        out_req['BrokerID'] = self.brokerid
        out_req['InstrumentID'] = req.symbol
        out_req['HedgeFlag'] = req.hedge_type.value

        if self.execute_func(func=self.reqQryExchangeMarginRateAdjust,req=out_req,prompt="交易所保证金率调整查询") in [-2,-3]:
            # 违反查询流控,添加到查询函数队列的尾部
            self.query_functions.append(Function(func=self.query_exchange_margin_rate_adjust,once=True,param={'req':req}))

    def onFrontConnected(self):
        """"""
        self.gateway.on_connect(GatewayData(name="CTP",type='TD')) # hxxjava add
        self.gateway.write_log("交易服务器连接成功")

        if self.auth_code:
            self.authenticate()
        else:
            self.login()

    def onFrontDisconnected(self, reason: int) -> None:
        """ 服务器连接断开回报 """
        self.login_status = False
        self.gateway.on_disconnect(GatewayData(name="CTP",type='TD',reason=reason)) # hxxjava add
        self.gateway.write_log(f"交易服务器连接断开,原因{reason}")

    def onRspAuthenticate(self, data: dict, error: dict, reqid: int, last: bool) -> None:
        """用户授权验证回报"""
        if not error['ErrorID']:
            self.auth_status = True
            self.gateway.write_log("交易服务器授权验证成功")
            self.login()
        else:
            self.gateway.write_error("交易服务器授权验证失败", error)

    def onRspUserLogin(self, data: dict, error: dict, reqid: int, last: bool) -> None:
        """用户登录请求回报"""
        if not error["ErrorID"]:
            self.frontid = data["FrontID"]
            self.sessionid = data["SessionID"]
            self.login_status = True
            self.gateway.write_log("交易服务器登录成功")

            # 自动确认结算单
            ctp_req: dict = {
                "BrokerID": self.brokerid,
                "InvestorID": self.userid
            }
            self.reqid += 1
            self.reqSettlementInfoConfirm(ctp_req, self.reqid)
        else:
            self.login_failed = True

            self.gateway.write_error("交易服务器登录失败", error)

    def onRspOrderInsert(self, data: dict, error: dict, reqid: int, last: bool) -> None:
        """委托下单失败回报"""
        order_ref: str = data["OrderRef"]
        orderid: str = f"{self.frontid}_{self.sessionid}_{order_ref}"

        symbol: str = data["InstrumentID"]
        contract: ContractData = symbol_contract_map[symbol]

        order: OrderData = OrderData(
            symbol=symbol,
            exchange=contract.exchange,
            orderid=orderid,
            direction=DIRECTION_CTP2VT[data["Direction"]],
            offset=OFFSET_CTP2VT.get(data["CombOffsetFlag"], Offset.NONE),
            price=data["LimitPrice"],
            volume=float(data["VolumeTotalOriginal"]), # hxxjava change
            status=Status.REJECTED,
            gateway_name=self.gateway_name
        )
        self.gateway.on_order(order)

        self.gateway.write_error("交易委托失败", error)

    def onRspOrderAction(self, data: dict, error: dict, reqid: int, last: bool) -> None:
        """委托撤单失败回报"""
        self.gateway.write_error("交易撤单失败", error)

    def onRspSettlementInfoConfirm(self, data: dict, error: dict, reqid: int, last: bool) -> None:
        """确认结算单回报"""
        self.gateway.write_log("结算信息确认成功")

        # # 由于流控,单次查询可能失败,通过while循环持续尝试,直到成功发出请求
        # while True:
        #     self.reqid += 1
        #     n: int = self.reqQryInstrument({}, self.reqid)

        #     if not n:
        #         break
        #     else:
        #         sleep(1)

        # hxxjava change start
        self.gateway.write_log(f"查询交易所保证金")
        self.query_exchange_margin_rate(MarginRequest(symbol="",exchange=None))
        # hxxjava change end

    def onRspQryInvestorPosition(self, data: dict, error: dict, reqid: int, last: bool) -> None:
        """持仓查询回报"""
        if not data:
            return

        # print(f"持仓查询回报:{data}")

        # 必须已经收到了合约信息后才能处理
        symbol: str = data["InstrumentID"]
        contract: ContractData = symbol_contract_map.get(symbol, None)

        if contract:
            # 获取之前缓存的持仓数据缓存
            key: str = f"{data['InstrumentID'], data['PosiDirection']}"
            position: PositionData = self.positions.get(key, None)
            if not position:
                position = PositionData(
                    symbol=data["InstrumentID"],
                    exchange=contract.exchange,
                    direction=DIRECTION_CTP2VT[data["PosiDirection"]],
                    gateway_name=self.gateway_name
                )
                self.positions[key] = position

            # 对于上期所昨仓需要特殊处理
            if position.exchange in [Exchange.SHFE, Exchange.INE]:
                if data["YdPosition"] and not data["TodayPosition"]:
                    position.yd_volume = data["Position"]
            # 对于其他交易所昨仓的计算
            else:
                position.yd_volume = data["Position"] - data["TodayPosition"]

            # 获取合约的乘数信息
            size: int = contract.size

            # 计算之前已有仓位的持仓总成本
            cost: float = position.price * position.volume * size

            # 累加更新持仓数量和盈亏
            position.volume += data["Position"]
            position.pnl += data["PositionProfit"]

            # 计算更新后的持仓总成本和均价
            if position.volume and size:
                cost += data["PositionCost"]
                position.price = cost / (position.volume * size)

            # 更新仓位冻结数量
            if position.direction == Direction.LONG:
                position.frozen += data["ShortFrozen"]
            else:
                position.frozen += data["LongFrozen"]

        if last:
            for position in self.positions.values():
                self.gateway.on_position(position)

            self.positions.clear()

    def onRspQryTradingAccount(self, data: dict, error: dict, reqid: int, last: bool) -> None:
        """资金查询回报"""
        if "AccountID" not in data:
            return

        account: AccountData = AccountData(
            accountid=data["AccountID"],
            balance=data["Balance"],
            frozen=data["FrozenMargin"] + data["FrozenCash"] + data["FrozenCommission"],
            gateway_name=self.gateway_name
        )
        account.available = data["Available"]

        self.gateway.on_account(account)

    def onRspQryInstrument(self, data: dict, error: dict, reqid: int, last: bool) -> None:
        """合约查询回报"""
        product: Product = PRODUCT_CTP2VT.get(data["ProductClass"], None)
        if product:
            contract: ContractData = ContractData(
                symbol=data["InstrumentID"],
                exchange=EXCHANGE_CTP2VT[data["ExchangeID"]],
                name=data["InstrumentName"],
                product=product,
                size=data["VolumeMultiple"],
                pricetick=data["PriceTick"],
                # hxxjava add start
                max_market_order_volume=data["MaxMarketOrderVolume"],   
                min_market_order_volume=data["MinMarketOrderVolume"],
                max_limit_order_volume=data["MaxLimitOrderVolume"],
                min_limit_order_volume=data["MinLimitOrderVolume"],
                open_date=data["OpenDate"], 
                expire_date=data["ExpireDate"],
                is_trading=data["IsTrading"],
                long_margin_ratio=data["LongMarginRatio"],
                short_margin_ratio=data["ShortMarginRatio"],
                # hxxjava add end
                gateway_name=self.gateway_name
            )

            # 期权相关
            if contract.product == Product.OPTION:
                # 移除郑商所期权产品名称带有的C/P后缀
                if contract.exchange == Exchange.CZCE:
                    contract.option_portfolio = data["ProductID"][:-1]
                else:
                    contract.option_portfolio = data["ProductID"]

                contract.option_underlying = data["UnderlyingInstrID"]
                contract.option_type = OPTIONTYPE_CTP2VT.get(data["OptionsType"], None)
                contract.option_strike = data["StrikePrice"]
                contract.option_index = str(data["StrikePrice"])
                contract.option_expiry = datetime.strptime(data["ExpireDate"], "%Y%m%d")

            # self.gateway.on_contract(contract)

            symbol_contract_map[contract.symbol] = contract

        if last:
            # 更新所有合约信息
            self.gateway.write_log("更新所有合约信息...")
            self.gateway.on_contract_end(symbol_contract_map.values())

            self.contract_inited = True
            self.gateway.write_log("合约信息查询成功")
            self.gateway.write_log(f"收到{len(symbol_contract_map)}条合约信息")

            self.gateway.write_log(f"提取{len(self.ex_margin_rate_data)}条交易所保证金率信息")
            if self.ex_margin_rate_data:
                EMRs = []
                for data in self.ex_margin_rate_data:
                    EMRs.append(self.extractExchangeMarginRate(data))
                self.gateway.on_exchange_margin_rate_end(EMRs)
                self.ex_margin_rate_data.clear()

            self.gateway.write_log(f"提取{len(self.status_data)}条状态信息")
            if self.status_data:
                statuses = []
                for data in self.status_data:
                    statuses.append(self.extractInstrumentStatus(data))
                self.gateway.on_status_end(statuses)
                self.status_data.clear()

            self.gateway.write_log(f"提取{len(self.order_data)}条委托单信息")
            for data in self.order_data:
                self.onRtnOrder(data)
            self.order_data.clear()

            self.gateway.write_log(f"提取{len(self.trade_data)}条成交单信息")
            for data in self.trade_data:
                self.onRtnTrade(data)
            self.trade_data.clear()

            self.inited = True

    def onRtnInstrumentStatus(self,data:dict):
        """ 
        当接收到合约品种状态信息 # hxxjava debug 
        """
        if not self.contract_inited:
            self.status_data.append(data)
            return

        status = self.extractInstrumentStatus(data) 
        self.gateway.on_status(status)

    def extractInstrumentStatus(self,data:dict): # hxxjava add
        """ 提取合约品种状态信息 """
        return StatusData(
            symbol = data["InstrumentID"],
            exchange = EXCHANGE_CTP2VT[data["ExchangeID"]],
            settlement_group_id = data["SettlementGroupID"],
            instrument_status = INSTRUMENTSTATUS_CTP2VT[data["InstrumentStatus"]],
            trading_segment_sn = data["TradingSegmentSN"],
            enter_time = data["EnterTime"],
            enter_reason = ENTERREASON_CTP2VT[data["EnterReason"]],
            exchange_inst_id = data["ExchangeInstID"],
            gateway_name=self.gateway_name
        )

    def onRtnOrder(self, data: dict) -> None:
        """ 委托更新推送 """
        if not self.contract_inited:
            self.order_data.append(data)
            return

        symbol: str = data["InstrumentID"]
        contract: ContractData = symbol_contract_map[symbol]

        frontid: int = data["FrontID"]
        sessionid: int = data["SessionID"]
        order_ref: str = data["OrderRef"]
        orderid: str = f"{frontid}_{sessionid}_{order_ref}"

        timestamp: str = f"{data['InsertDate']} {data['InsertTime']}"
        dt: datetime = datetime.strptime(timestamp, "%Y%m%d %H:%M:%S")
        dt = CHINA_TZ.localize(dt)

        dt1 = datetime.strptime(f"{data['TradingDay']}", "%Y%m%d")  # hxxjava add
        tradingday = dt1.date()                                     # hxxjava add

        order = OrderData(
            symbol=symbol,
            exchange=contract.exchange,
            orderid=orderid,
            type=ORDERTYPE_CTP2VT[data["OrderPriceType"]],
            direction=DIRECTION_CTP2VT[data["Direction"]],
            offset=OFFSET_CTP2VT[data["CombOffsetFlag"]],
            price=data["LimitPrice"],
            volume=float(data["VolumeTotalOriginal"]),  # hxxjava change
            traded=float(data["VolumeTraded"]),         # hxxjava change
            status=STATUS_CTP2VT[data["OrderStatus"]],
            datetime=dt,
            tradingday=tradingday,                      # hxxjava add
            gateway_name=self.gateway_name
        )
        self.gateway.on_order(order)

        self.sysid_orderid_map[data["OrderSysID"]] = orderid

    def onRtnTrade(self, data: dict) -> None:
        """ 成交数据推送 """
        if not self.contract_inited:
            self.trade_data.append(data)
            return

        symbol: str = data["InstrumentID"]
        contract: ContractData = symbol_contract_map[symbol]

        orderid: str = self.sysid_orderid_map[data["OrderSysID"]]

        timestamp: str = f"{data['TradeDate']} {data['TradeTime']}"
        dt: datetime = datetime.strptime(timestamp, "%Y%m%d %H:%M:%S")
        dt = CHINA_TZ.localize(dt)

        datestr = f"{data['TradingDay']}"           # hxxjava add
        dt1 = datetime.strptime(datestr, "%Y%m%d")  # hxxjava add
        tradingday = dt1.date()                     # hxxjava add

        trade = TradeData(
            symbol=symbol,
            exchange=contract.exchange,
            orderid=orderid,
            tradeid=data["TradeID"],
            direction=DIRECTION_CTP2VT[data["Direction"]],
            offset=OFFSET_CTP2VT[data["OffsetFlag"]],
            price=data["Price"],
            volume=data["Volume"],
            datetime=dt,
            tradingday=tradingday,                      # hxxjava add
            gateway_name=self.gateway_name
        )
        self.gateway.on_trade(trade)

    def onRspForQuoteInsert(self, data: dict, error: dict, reqid: int, last: bool) -> None:
        """询价请求回报"""
        if not error["ErrorID"]:
            symbol: str = data["InstrumentID"]
            msg: str = f"{symbol}询价请求发送成功"
            self.gateway.write_log(msg)
        else:
            self.gateway.write_error("询价请求发送失败", error)

    def connect(
        self,
        address: str,
        userid: str,
        password: str,
        brokerid: int,
        auth_code: str,
        appid: str
    ) -> None:
        """连接服务器"""
        self.userid = userid
        self.password = password
        self.brokerid = brokerid
        self.auth_code = auth_code
        self.appid = appid

        if not self.connect_status:
            path: Path = get_folder_path(self.gateway_name.lower())
            self.createFtdcTraderApi((str(path) + "\\Td").encode("GBK"))

            self.subscribePrivateTopic(0)
            self.subscribePublicTopic(0)
            self.registerFront(address)
            self.init()

            self.connect_status = True
        else:
            self.authenticate()

    def authenticate(self) -> None:
        """发起授权验证"""
        ctp_req: dict = {
            "UserID": self.userid,
            "BrokerID": self.brokerid,
            "AuthCode": self.auth_code,
            "AppID": self.appid
        }

        self.reqid += 1
        self.reqAuthenticate(ctp_req, self.reqid)

    def login(self) -> None:
        """用户登录"""
        if self.login_failed:
            return

        ctp_req: dict = {
            "UserID": self.userid,
            "Password": self.password,
            "BrokerID": self.brokerid,
            "AppID": self.appid
        }

        self.reqid += 1
        self.reqUserLogin(ctp_req, self.reqid)

    def send_order(self, req: OrderRequest) -> str:
        """委托下单"""
        if req.offset not in OFFSET_VT2CTP:
            self.gateway.write_log("请选择开平方向")
            return ""

        if req.type not in ORDERTYPE_VT2CTP:
            self.gateway.write_log(f"当前接口不支持该类型的委托{req.type.value}")
            return ""

        self.order_ref += 1

        ctp_req: dict = {
            "InstrumentID": req.symbol,
            "ExchangeID": req.exchange.value,
            "LimitPrice": req.price,
            "VolumeTotalOriginal": int(req.volume),
            "OrderPriceType": ORDERTYPE_VT2CTP.get(req.type, ""),
            "Direction": DIRECTION_VT2CTP.get(req.direction, ""),
            "CombOffsetFlag": OFFSET_VT2CTP.get(req.offset, ""),
            "OrderRef": str(self.order_ref),
            "InvestorID": self.userid,
            "UserID": self.userid,
            "BrokerID": self.brokerid,
            "CombHedgeFlag": THOST_FTDC_HF_Speculation,
            "ContingentCondition": THOST_FTDC_CC_Immediately,
            "ForceCloseReason": THOST_FTDC_FCC_NotForceClose,
            "IsAutoSuspend": 0,
            "TimeCondition": THOST_FTDC_TC_GFD,
            "VolumeCondition": THOST_FTDC_VC_$,
            "MinVolume": 1
        }

        if req.type == OrderType.FAK:
            ctp_req["OrderPriceType"] = THOST_FTDC_OPT_LimitPrice
            ctp_req["TimeCondition"] = THOST_FTDC_TC_IOC
            ctp_req["VolumeCondition"] = THOST_FTDC_VC_$
        elif req.type == OrderType.FOK:
            ctp_req["OrderPriceType"] = THOST_FTDC_OPT_LimitPrice
            ctp_req["TimeCondition"] = THOST_FTDC_TC_IOC
            ctp_req["VolumeCondition"] = THOST_FTDC_VC_CV

        self.reqid += 1
        ret_val = self.reqOrderInsert(ctp_req, self.reqid)
        if ret_val == 0:   # hxxjava change
            # 报单成功
            orderid: str = f"{self.frontid}_{self.sessionid}_{self.order_ref}"
            order: OrderData = req.create_order_data(orderid, self.gateway_name)
            self.gateway.on_order(order)

            return order.vt_orderid
        if ret_val in [-2,-3]:
            # 违反报单流控,添加到报单函数队列的尾部
            self.order_functions.append(Function(func=self.send_order,once=True,param={'req':req}))

        return ""

    def cancel_order(self, req: CancelRequest) -> None:
        """委托撤单"""
        frontid, sessionid, order_ref = req.orderid.split("_")

        ctp_req: dict = {
            "InstrumentID": req.symbol,
            "ExchangeID": req.exchange.value,
            "OrderRef": order_ref,
            "FrontID": int(frontid),
            "SessionID": int(sessionid),
            "ActionFlag": THOST_FTDC_AF_Delete,
            "BrokerID": self.brokerid,
            "InvestorID": self.userid
        }

        self.reqid += 1
        if self.reqOrderAction(ctp_req, self.reqid) in [-2,-3]:
            # 违反报单流控,添加到报单函数队列的尾部
            self.order_functions.append(Function(func=self.cancel_order,once=True,param={'req':req}))

    def send_rfq(self, req: OrderRequest) -> str:
        """询价请求"""
        self.order_ref += 1

        ctp_req: dict = {
            "InstrumentID": req.symbol,
            "ExchangeID": req.exchange.value,
            "ForQuoteRef": str(self.order_ref),
            "BrokerID": self.brokerid,
            "InvestorID": self.userid
        }

        self.reqid += 1
        ret_val = self.reqForQuoteInsert(ctp_req, self.reqid)
        if ret_val == 0:    # hxxjava change
            # 成功执行了询价报单
            orderid: str = f"{self.frontid}_{self.sessionid}_{self.order_ref}"
            vt_orderid: str = f"{self.gateway_name}.{orderid}"
            return vt_orderid

        if ret_val in [-2,-3]:
            # 违反报单流控,添加到报单函数队列的尾部
            self.order_functions.append(Function(func=self.send_rfq,once=True,param={'req':req}))            

        return ""

    def query_account(self) -> int:
        """查询资金"""
        if not self.connect_status:
            return -4
        # self.reqid += 1
        # self.reqQryTradingAccount({}, self.reqid)
        return self.execute_func(func=self.reqQryTradingAccount,req={},prompt="查询资金账户")

    def query_position(self) -> int:
        """查询持仓"""
        if not self.connect_status:
            return -4

        if not symbol_contract_map:
            return -5

        ctp_req: dict = {
            "BrokerID": self.brokerid,
            "InvestorID": self.userid
        }

        # self.reqid += 1
        # self.reqQryInvestorPosition(ctp_req, self.reqid)

        return self.execute_func(func=self.reqQryInvestorPosition,req=ctp_req,prompt="查询投资者持仓")

    def close(self) -> None:
        """关闭连接"""
        if self.connect_status:
            self.exit()


def adjust_price(price: float) -> float:
    """将异常的浮点数最大值(MAX_FLOAT)数据调整为0"""
    if price == MAX_FLOAT:
        price = 0
    return price
Administrator
avatar
加入于:
帖子: 4469
声望: 302

感谢分享,精华送上!

Member
avatar
加入于:
帖子: 9
声望: 0

用Python的交易员 wrote:

感谢分享,精华送上!
这些改进,需要我们自己按帖子上的方法自己改吗?还是等下个版本官方统一更新,我们点击更新就可以了?

Administrator
avatar
加入于:
帖子: 4469
声望: 302

邹亮 wrote:

用Python的交易员 wrote:

感谢分享,精华送上!
这些改进,需要我们自己按帖子上的方法自己改吗?还是等下个版本官方统一更新,我们点击更新就可以了?

这些是hxxjava大神自己扩展开发的内容,社区官方版本里没有的,有需要的话可以自己参考修改

© 2015-2019 上海韦纳软件科技有限公司
备案服务号:沪ICP备18006526号-3

沪公网安备 31011502017034号

【用户协议】
【隐私政策】