3.5 修改vnpy_ctp\gateway\ctp_gateway.py,内容如下:
from pathlib import Path
import sys
import pytz
from datetime import datetime
from time import sleep, time
from vnpy.event.engine import EventEngine
from typing import Callable, Dict, List
import threading # hxxjava debug
from vnpy.trader.constant import (
Direction,
HedgeType,
Offset,
Exchange,
OrderType,
Product,
Status,
OptionType,
InstrumentStatus,
StatusEnterReason,
IdCardType,
ProductClass,
)
from vnpy.trader.gateway import BaseGateway,Function
from vnpy.trader.object import (
TickData,
OrderData,
TradeData,
PositionData,
AccountData,
ContractData,
StatusData, # hxxjava add
MarginData, # hxxjava add
MarginRateAdjustData, # hxxjava add
InstrumentMarginData, # hxxjava add
CommissionData, # hxxjava add
OrderCommRateData, # hxxjava add
BrokerTradingParamsData, # hxxjava add
InvestorData, # hxxjava add
ProductData, # hxxjava add
MarginRequest, # hxxjava add
MarginRateAdjustRequest, # hxxjava add
CommissionRequest, # hxxjava add
OrderCommRateRequest, # hxxjava add
BrokerTradingParamsRequest, # hxxjava add
ProductRequst, # hxxjava add
GatewayData, # hxxjava add
OrderRequest,
CancelRequest,
SubscribeRequest,
)
from vnpy.trader.utility import get_folder_path
from vnpy.trader.event import EVENT_TIMER
from threading import Timer # hxxjava add
from ..api import (
MdApi,
TdApi,
THOST_FTDC_OAS_Submitted,
THOST_FTDC_OAS_Accepted,
THOST_FTDC_OAS_Rejected,
THOST_FTDC_OST_NoTradeQueueing,
THOST_FTDC_OST_PartTradedQueueing,
THOST_FTDC_OST_AllTraded,
THOST_FTDC_OST_Canceled,
THOST_FTDC_D_Buy,
THOST_FTDC_D_Sell,
THOST_FTDC_PD_Long,
THOST_FTDC_PD_Short,
THOST_FTDC_OPT_LimitPrice,
THOST_FTDC_OPT_AnyPrice,
THOST_FTDC_OF_Open,
THOST_FTDC_OFEN_Close,
THOST_FTDC_OFEN_CloseYesterday,
THOST_FTDC_OFEN_CloseToday,
THOST_FTDC_PC_Futures,
THOST_FTDC_PC_Options,
THOST_FTDC_PC_SpotOption,
THOST_FTDC_PC_Combination,
THOST_FTDC_CP_CallOptions,
THOST_FTDC_CP_PutOptions,
THOST_FTDC_HF_Speculation,
THOST_FTDC_CC_Immediately,
THOST_FTDC_FCC_NotForceClose,
THOST_FTDC_TC_GFD,
THOST_FTDC_VC_$,
THOST_FTDC_TC_IOC,
THOST_FTDC_VC_CV,
THOST_FTDC_AF_Delete
)
# 委托状态映射
STATUS_CTP2VT: Dict[str, Status] = {
THOST_FTDC_OAS_Submitted: Status.SUBMITTING,
THOST_FTDC_OAS_Accepted: Status.SUBMITTING,
THOST_FTDC_OAS_Rejected: Status.REJECTED,
THOST_FTDC_OST_NoTradeQueueing: Status.NOTTRADED,
THOST_FTDC_OST_PartTradedQueueing: Status.PARTTRADED,
THOST_FTDC_OST_AllTraded: Status.ALLTRADED,
THOST_FTDC_OST_Canceled: Status.CANCELLED
}
# 多空方向映射
DIRECTION_VT2CTP: Dict[Direction, str] = {
Direction.LONG: THOST_FTDC_D_Buy,
Direction.SHORT: THOST_FTDC_D_Sell
}
DIRECTION_CTP2VT: Dict[str, Direction] = {v: k for k, v in DIRECTION_VT2CTP.items()}
DIRECTION_CTP2VT[THOST_FTDC_PD_Long] = Direction.LONG
DIRECTION_CTP2VT[THOST_FTDC_PD_Short] = Direction.SHORT
# 委托类型映射
ORDERTYPE_VT2CTP: Dict[OrderType, str] = {
OrderType.LIMIT: THOST_FTDC_OPT_LimitPrice,
OrderType.MARKET: THOST_FTDC_OPT_AnyPrice
}
ORDERTYPE_CTP2VT: Dict[str, OrderType] = {v: k for k, v in ORDERTYPE_VT2CTP.items()}
# 开平方向映射
OFFSET_VT2CTP: Dict[Offset, str] = {
Offset.OPEN: THOST_FTDC_OF_Open,
Offset.CLOSE: THOST_FTDC_OFEN_Close,
Offset.CLOSETODAY: THOST_FTDC_OFEN_CloseToday,
Offset.CLOSEYESTERDAY: THOST_FTDC_OFEN_CloseYesterday,
}
OFFSET_CTP2VT: Dict[str, Offset] = {v: k for k, v in OFFSET_VT2CTP.items()}
# 交易所映射
EXCHANGE_CTP2VT: Dict[str, Exchange] = {
"CFFEX": Exchange.CFFEX,
"SHFE": Exchange.SHFE,
"CZCE": Exchange.CZCE,
"DCE": Exchange.DCE,
"INE": Exchange.INE
}
# 产品类型映射
PRODUCT_CTP2VT: Dict[str, Product] = {
THOST_FTDC_PC_Futures: Product.FUTURES,
THOST_FTDC_PC_Options: Product.OPTION,
THOST_FTDC_PC_SpotOption: Product.OPTION,
THOST_FTDC_PC_Combination: Product.SPREAD
}
# 期权类型映射
OPTIONTYPE_CTP2VT: Dict[str, OptionType] = {
THOST_FTDC_CP_CallOptions: OptionType.CALL,
THOST_FTDC_CP_PutOptions: OptionType.PUT
}
# 品种状态进入原因映射 hxxjava debug
INSTRUMENTSTATUS_CTP2VT: Dict[str, InstrumentStatus] = {
"0": InstrumentStatus.BEFORE_TRADING,
"1": InstrumentStatus.NO_TRADING,
"2": InstrumentStatus.CONTINOUS,
"3": InstrumentStatus.AUCTION_ORDERING,
"4": InstrumentStatus.AUCTION_BALANCE,
"5": InstrumentStatus.AUCTION_MATCH,
"6": InstrumentStatus.CLOSE,
"7": InstrumentStatus.CLOSE
}
# 品种状态进入原因映射 hxxjava debug
ENTERREASON_CTP2VT: Dict[str, StatusEnterReason] = {
"1": StatusEnterReason.AUTOMATIC,
"2": StatusEnterReason.MANUAL,
"3": StatusEnterReason.FUSE
}
# 其他常量
MAX_FLOAT = sys.float_info.max # 浮点数极限值
CHINA_TZ = pytz.timezone("Asia/Shanghai") # 中国时区
# 合约数据全局缓存字典
symbol_contract_map: Dict[str, ContractData] = {}
class CtpGateway(BaseGateway):
"""
vn.py用于对接期货CTP柜台的交易接口。
"""
default_setting: Dict[str, str] = {
"用户名": "",
"密码": "",
"经纪商代码": "",
"交易服务器": "",
"行情服务器": "",
"产品名称": "",
"授权编码": ""
}
exchanges: List[str] = list(EXCHANGE_CTP2VT.values())
def __init__(self, event_engine: EventEngine, gateway_name: str = "CTP") -> None:
"""构造函数"""
super().__init__(event_engine, gateway_name)
self.td_api: "CtpTdApi" = CtpTdApi(self)
self.md_api: "CtpMdApi" = CtpMdApi(self)
def connect(self, setting: dict) -> None:
"""连接交易接口"""
userid: str = setting["用户名"]
password: str = setting["密码"]
brokerid: str = setting["经纪商代码"]
td_address: str = setting["交易服务器"]
md_address: str = setting["行情服务器"]
appid: str = setting["产品名称"]
auth_code: str = setting["授权编码"]
if (
(not td_address.startswith("tcp://"))
and (not td_address.startswith("ssl://"))
):
td_address = "tcp://" + td_address
if (
(not md_address.startswith("tcp://"))
and (not md_address.startswith("ssl://"))
):
md_address = "tcp://" + md_address
self.td_api.connect(td_address, userid, password, brokerid, auth_code, appid)
self.md_api.connect(md_address, userid, password, brokerid)
self.init_query()
def subscribe(self, req: SubscribeRequest) -> None:
"""订阅行情"""
self.md_api.subscribe(req)
def send_order(self, req: OrderRequest) -> str:
"""委托下单"""
# 期权询价
if req.type == OrderType.RFQ:
vt_orderid: str = self.td_api.send_rfq(req)
# 其他委托
else:
vt_orderid: str = self.td_api.send_order(req)
return vt_orderid
def cancel_order(self, req: CancelRequest) -> None:
"""委托撤单"""
self.td_api.cancel_order(req)
def query_account(self) -> None:
"""查询资金"""
self.td_api.query_account()
def query_position(self) -> None:
"""查询持仓"""
self.td_api.query_position()
def queryInvestUnit(self): # hxxjava add
"""查询投资单元"""
self.td_api.queryInvestUnit()
def query_broker_trading_params(self,req:BrokerTradingParamsRequest): # hxxjava add
""" 查询经纪公司交易参数 """
self.td_api.query_broker_trading_params(req)
def query_investor(self) -> None:
"""
查询投资者
"""
self.td_api.query_investor()
def query_product(self,req:ProductRequst) -> None:
""" 查询产品信息 """
self.td_api.query_product(req)
def query_commission(self,req:CommissionRequest) -> None: # hxxjava add
"""查询手续费数据"""
self.td_api.query_commission(req)
def query_order_commission(self,req:OrderCommRateRequest): # hxxjava add
"""查询报单手续费数据"""
self.td_api.query_order_commission(req)
def query_margin_ratio(self,req:MarginRequest): # hxxjava add
"""查询保证金率数据"""
self.td_api.query_margin_ratio(req)
def query_exchange_margin_rate(self,req:MarginRequest): # hxxjava add
"""查询交易所保证金率数据"""
self.td_api.query_exchange_margin_rate(req)
def query_exchange_margin_rate_adjust(self,req:MarginRateAdjustRequest): # hxxjava add
"""查询交易所保证金率调整数据"""
self.td_api.query_exchange_margin_rate_adjust(req)
def close(self):
""""""
self.td_api.close()
self.md_api.close()
def write_error(self, msg: str, error: dict) -> None:
"""输出错误信息日志"""
error_id: int = error["ErrorID"]
error_msg: str = error["ErrorMsg"]
msg = f"{msg},代码:{error_id},信息:{error_msg}"
self.write_log(msg)
def process_timer_event(self, event) -> None:
"""定时事件处理"""
self.count += 1
if self.count < 2:
return
self.count = 0
if self.td_api.connect_status:
print(
"交易所保证金:",len(self.td_api.ex_margin_rate_data),
"合约状态:",len(self.td_api.status_data),
"合约信息:",len(symbol_contract_map.keys()),
"委托单数:",len(self.td_api.order_data),
"成交单数:",len(self.td_api.trade_data),
"报单+查询:",(len(self.td_api.order_functions),len(self.td_api.query_functions))
)
self.md_api.update_date()
def init_query(self) -> None:
"""初始化查询任务"""
self.count: int = 0
self.event_engine.register(EVENT_TIMER, self.process_timer_event)
class CtpMdApi(MdApi):
""""""
def __init__(self, gateway: CtpGateway) -> None:
"""构造函数"""
super().__init__()
self.gateway: CtpGateway = gateway
self.gateway_name: str = gateway.gateway_name
self.reqid: int = 0
self.connect_status: bool = False
self.login_status: bool = False
self.subscribed: List[str] = set()
self.userid: str = ""
self.password: str = ""
self.brokerid: str = ""
self.current_date: str = datetime.now().strftime("%Y%m%d")
def onFrontConnected(self) -> None:
"""服务器连接成功回报"""
self.gateway.on_connect(GatewayData(name="CTP",type='MD')) # hxxjava add
self.gateway.write_log("行情服务器连接成功")
self.login()
def onFrontDisconnected(self, reason: int) -> None:
"""服务器连接断开回报"""
self.login_status = False
self.gateway.on_disconnect(GatewayData(name="CTP",type='MD',reason=reason)) # hxxjava add
self.gateway.write_log(f"行情服务器连接断开,原因{reason}")
def onRspUserLogin(self, data: dict, error: dict, reqid: int, last: bool) -> None:
"""用户登录请求回报"""
if not error["ErrorID"]:
self.login_status = True
self.gateway.write_log("行情服务器登录成功")
for symbol in self.subscribed:
self.subscribeMarketData(symbol)
else:
self.gateway.write_error("行情服务器登录失败", error)
def onRspError(self, error: dict, reqid: int, last: bool) -> None:
"""请求报错回报"""
self.gateway.write_error("行情接口报错", error)
def onRspSubMarketData(self, data: dict, error: dict, reqid: int, last: bool) -> None:
"""订阅行情回报"""
if not error or not error["ErrorID"]:
return
self.gateway.write_error("行情订阅失败", error)
def onRtnDepthMarketData(self, data: dict) -> None:
"""行情数据推送"""
# 过滤没有时间戳的异常行情数据
if not data["UpdateTime"]:
return
symbol: str = data["InstrumentID"]
contract: ContractData = symbol_contract_map.get(symbol, None)
if not contract:
return
timestamp: str = f"{self.current_date} {data['UpdateTime']}.{int(data['UpdateMillisec']/100)}"
dt: datetime = datetime.strptime(timestamp, "%Y%m%d %H:%M:%S.%f")
dt = CHINA_TZ.localize(dt)
tick: TickData = TickData(
symbol=symbol,
exchange=contract.exchange,
datetime=dt,
name=contract.name,
volume=data["Volume"],
open_interest=data["OpenInterest"],
last_price=adjust_price(data["LastPrice"]),
limit_up=data["UpperLimitPrice"],
limit_down=data["LowerLimitPrice"],
open_price=adjust_price(data["OpenPrice"]),
high_price=adjust_price(data["HighestPrice"]),
low_price=adjust_price(data["LowestPrice"]),
pre_close=adjust_price(data["PreClosePrice"]),
bid_price_1=adjust_price(data["BidPrice1"]),
ask_price_1=adjust_price(data["AskPrice1"]),
bid_volume_1=data["BidVolume1"],
ask_volume_1=data["AskVolume1"],
gateway_name=self.gateway_name
)
if data["BidVolume2"] or data["AskVolume2"]:
tick.bid_price_2 = adjust_price(data["BidPrice2"])
tick.bid_price_3 = adjust_price(data["BidPrice3"])
tick.bid_price_4 = adjust_price(data["BidPrice4"])
tick.bid_price_5 = adjust_price(data["BidPrice5"])
tick.ask_price_2 = adjust_price(data["AskPrice2"])
tick.ask_price_3 = adjust_price(data["AskPrice3"])
tick.ask_price_4 = adjust_price(data["AskPrice4"])
tick.ask_price_5 = adjust_price(data["AskPrice5"])
tick.bid_volume_2 = data["BidVolume2"]
tick.bid_volume_3 = data["BidVolume3"]
tick.bid_volume_4 = data["BidVolume4"]
tick.bid_volume_5 = data["BidVolume5"]
tick.ask_volume_2 = data["AskVolume2"]
tick.ask_volume_3 = data["AskVolume3"]
tick.ask_volume_4 = data["AskVolume4"]
tick.ask_volume_5 = data["AskVolume5"]
self.gateway.on_tick(tick)
def connect(self, address: str, userid: str, password: str, brokerid: int) -> None:
"""连接服务器"""
self.userid = userid
self.password = password
self.brokerid = brokerid
# 禁止重复发起连接,会导致异常崩溃
if not self.connect_status:
path: Path = get_folder_path(self.gateway_name.lower())
self.createFtdcMdApi((str(path) + "\\Md").encode("GBK"))
self.registerFront(address)
self.init()
self.connect_status = True
def login(self) -> None:
"""用户登录"""
ctp_req: dict = {
"UserID": self.userid,
"Password": self.password,
"BrokerID": self.brokerid
}
self.reqid += 1
self.reqUserLogin(ctp_req, self.reqid)
def subscribe(self, req: SubscribeRequest) -> None:
"""订阅行情"""
if self.login_status:
self.subscribeMarketData(req.symbol)
self.subscribed.add(req.symbol)
def close(self) -> None:
"""关闭连接"""
if self.connect_status:
self.exit()
def update_date(self) -> None:
"""更新当前日期"""
self.current_date = datetime.now().strftime("%Y%m%d")
class CtpTdApi(TdApi):
""""""
def __init__(self, gateway: CtpGateway) -> None:
"""构造函数"""
super().__init__()
# hxxjava start
self.order_functions:List[Function] = []
self.query_functions:List[Function] = []
self.inited = False
self.debug = True
self.td_lock = threading.Lock()
# hxxjava end
self.gateway: CtpGateway = gateway
self.gateway_name: str = gateway.gateway_name
self.reqid: int = 0
self.order_ref: int = 0
self.connect_status: bool = False
self.login_status: bool = False
self.auth_status: bool = False
self.login_failed: bool = False
self.contract_inited: bool = False
self.userid: str = ""
self.password: str = ""
self.brokerid: str = ""
self.auth_code: str = ""
self.appid: str = ""
self.frontid: int = 0
self.sessionid: int = 0
# hxxjava start
self.constract_data: List[dict] = []
self.status_data: List[dict] = []
self.ex_margin_rate_data: List[dict] = []
# hxxjava end
self.order_data: List[dict] = []
self.trade_data: List[dict] = []
self.positions: Dict[str, PositionData] = {}
self.sysid_orderid_map: Dict[str, str] = {}
self.init_order_query_functions()
def init_order_query_functions(self) -> None:
"""初始化查询任务"""
self.order_functions.clear()
self.query_functions.clear()
self.query_functions.append(Function(func=self.query_investor,once=True))
self.query_functions.append(Function(func=self.query_broker_trading_params,param={'req':BrokerTradingParamsRequest()},once=True))
self.query_functions.append(Function(func=self.query_account,maxcount=3))
self.query_functions.append(Function(func=self.query_position,maxcount=3))
self.exec_order_cmds()
self.exec_query_cmds()
def exec_order_cmds(self): # hxxjava add
""" 执行报单命令 """
if self.inited and self.order_functions:
# 报单命令列表中有内容
order_func:Function = self.order_functions.pop(0)
ret_val = order_func.exec()
if ret_val == 0:
# 报单命令执行成功
if not order_func.once:
self.order_functions.append(order_func)
else:
# 报单命令执行失败,添加到命令列表的头部
self.order_functions.insert(0,order_func)
self.order_timer = Timer(1.0/6, self.exec_order_cmds)
self.order_timer.start()
def exec_query_cmds(self): # hxxjava add
""" 执行查询命令 """
if self.inited and self.query_functions:
# 查询命令列表中有内容
func:Function = self.query_functions.pop(0)
ret_val = func.exec()
# print(f"{func.to_str()} exec()={ret_val}")
if ret_val == 0:
# 查询命令执行成功
if not func.once:
# 周期性查询命令,添加到命令列表的尾部
self.query_functions.append(func)
elif ret_val == -5:
# 循环计数未达到的,添加到命令列表的尾部
self.query_functions.append(func)
elif ret_val in [-2,-3]:
# 查询命令执行失败,添加到命令列表的头部
self.query_functions.insert(0,func)
self.query_timer = Timer(1.0, self.exec_query_cmds)
self.query_timer.start()
def execute_func(self,func:Callable,req:Dict,prompt:str="") -> int:
"""
统一的查询执行函数,
# 返回值:
# 0:成功;-1:网络连接失败
# -2:未处理请求超过许可数;-3:每秒发送请求数超过许可数
"""
self.td_lock.acquire()
self.reqid += 1
result = func(req,self.reqid)
self.td_lock.release()
if prompt and self.debug:
print(f"{prompt} excuted at {datetime.now()},result={result}")
return result
def query_investor(self):
"""" 请求查询投资者 """
req = {"BrokerID":self.brokerid,"InvestorID":self.userid}
if self.execute_func(func=self.reqQryInvestor,req=req,prompt="请求查询投资者") in [-2,-3]:
# 违反查询流控,添加到查询函数队列的尾部
self.query_functions.append(Function(func=self.query_investor,once=True))
def onRspQryInvestor(self, data: dict, error: dict, reqid: int, last: bool): # -> None:
"""
请求查询投资者响应,当执行ReqQryInvestor后,该方法被调用。
"""
# print(f"onRspQryInvestor data={data},error={error},reqid={reqid},last={last}")
if data:
investor = InvestorData(
id = data['InvestorID'],
name = data['InvestorName'],
broker = data['BrokerID'],
group = data['InvestorGroupID'],
identifiedCardType = IdCardType(data['IdentifiedCardType']),
identifiedCardNo = data['IdentifiedCardNo'],
is_active = data['IsActive'],
telephone = data['Telephone'],
address = data['Address'],
open_date = data['OpenDate'],
mobile = data['Mobile'],
commission_model = data['CommModelID'],
margin_model = data['MarginModelID'],
gateway_name=self.gateway_name
)
self.gateway.on_query_investor(investor)
def queryInvestUnit(self): # hxxjava add
"""
# 查询投资单元
"""
if not self.connect_status:
return
req = {}
req["BrokerID"]=self.brokerid
req["InvestorID"]=self.userid
req["InvestUnitID"]=""
if self.execute_func(func=self.reqQryInvestUnit,req=req,prompt="查询投资单元") in [-2,-3]:
# 违反查询流控,添加到查询函数队列的尾部
self.query_functions.append(Function(func=self.queryInvestUnit,once=True))
def onRspQryInvestUnit(self, data: dict, error: dict, reqid: int, last: bool): # -> None:
"""
# 查询投资单元应答
"""
print(f"onRspQryInvestUnit data={data} error={error}")
def onRspQryInstrumentCommissionRate(self, data: dict, error: dict, reqid: int, last: bool): # hxxjava add
"""
查询合约手续费率
"""
if data:
commission = CommissionData(
symbol = data['InstrumentID'],
exchange = data["ExchangeID"], # EXCHANGE_CTP2VT[data["ExchangeID"]]
open_ratio_bymoney=data['OpenRatioByMoney'],
open_ratio_byvolume=data['OpenRatioByVolume'],
close_ratio_bymoney=data['CloseRatioByMoney'],
close_ratio_byvolume=data['CloseRatioByVolume'],
close_today_ratio_bymoney=data['CloseTodayRatioByMoney'],
close_today_ratio_byvolume=data['CloseTodayRatioByVolume'],
investor_range = data['InvestorRange'],
biz_type = data['BizType'],
invest_unit_id = data['InvestUnitID'],
gateway_name=self.gateway_name
)
# print(f"onRspQryInstrumentCommissionRate = {data}")
self.gateway.on_commission(commission)
def onRspQryInstrumentOrderCommRate(self, data: dict, error: dict, reqid: int, last: bool): # hxxjava add
"""
查询报单手续费
"""
if data:
commission = OrderCommRateData(
symbol = data['InstrumentID'],
exchange = data["ExchangeID"], # EXCHANGE_CTP2VT[data["ExchangeID"]]
investor_range = data['InvestorRange'],
invest_unit_id = data['InvestUnitID'],
hedge_flag = data['HedgeFlag'],
order_comm_byvolume=data['OrderCommByVolume'],
order_action_comm_byvolume=data['OrderActionCommByVolume'],
gateway_name=self.gateway_name
)
self.gateway.on_order_commission(commission)
def onRspQryInstrumentMarginRate(self, data: dict, error: dict, reqid: int, last: bool): # hxxjava add
"""
查询合约保证金率
"""
if data:
margin_rate = InstrumentMarginData(
symbol = data['InstrumentID'],
exchange = data["ExchangeID"], # EXCHANGE_CTP2VT[data["ExchangeID"]]
long_margin_rate = data["LongMarginRatioByMoney"],
long_margin_perlot = data["LongMarginRatioByVolume"],
short_margin_rate = data["ShortMarginRatioByMoney"],
short_margin_perlot = data["ShortMarginRatioByVolume"],
is_ralative=data['IsRelative'],
investor_id=data['InvestorID'],
invest_unit_id=data['InvestUnitID'],
gateway_name=self.gateway_name
)
self.gateway.on_instrument_margin_rate(margin_rate)
def onRspQryExchangeMarginRate(self, data: dict, error: dict, reqid: int, last: bool): # hxxjava add
"""
收到查询交易所保证金率结果
"""
if not self.contract_inited:
# 合约数据还没有初始化
if error:
print(f"查询交易所保证金率 错误={error}")
if data:
self.ex_margin_rate_data.append(data)
if last:
self.gateway.write_log(f"开始查询合约信息")
if self.execute_func(func=self.reqQryInstrument,req={},prompt="查询合约信息") in [-2,-3]:
self.timeContract = Timer(interval=1.0,function = self.onRspQryExchangeMarginRate,
kwargs={'data':None,'error':None,'reqid':reqid,'last':True})
self.timeContract.start()
return
elif error:
pass
elif data:
margin_rate = self.extractExchangeMarginRate(data)
self.gateway.on_exchange_margin_rate(margin_rate)
# print(f"查询交易所保证金率结果 :data={data} error={error}")
def extractExchangeMarginRate(self,data:dict): # hxxjava add
""" 提取交易所保证金率并且发送给网关 """
return MarginData(
symbol = data['InstrumentID'],
exchange = data["ExchangeID"], # 总是为空 EXCHANGE_CTP2VT[data["ExchangeID"]],
hedge_flag = HedgeType(data["HedgeFlag"]),
long_margin_rate = data["LongMarginRatioByMoney"],
long_margin_perlot = data["LongMarginRatioByVolume"],
short_margin_rate = data["ShortMarginRatioByMoney"],
short_margin_perlot = data["ShortMarginRatioByVolume"],
gateway_name=self.gateway_name
)
def onRspQryExchangeMarginRateAdjust(self, data: dict, error: dict, reqid: int, last: bool): # hxxjava add
"""
收到查询交易所保证金率结果
"""
if error:
pass
if data:
adjust = MarginRateAdjustData(
symbol = data["InstrumentID"],
hedge_flag = HedgeType(data["HedgeFlag"]),
LongMarginRatioByMoney = data["LongMarginRatioByMoney"],
LongMarginRatioByVolume = data["LongMarginRatioByVolume"],
ShortMarginRatioByMoney = data["ShortMarginRatioByMoney"],
ShortMarginRatioByVolume = data["ShortMarginRatioByVolume"],
ExchLongMarginRatioByMoney = data["ExchLongMarginRatioByMoney"],
ExchLongMarginRatioByVolume = data["ExchLongMarginRatioByVolume"],
ExchShortMarginRatioByMoney = data["ExchShortMarginRatioByMoney"],
ExchShortMarginRatioByVolume = data["ExchShortMarginRatioByVolume"],
NoLongMarginRatioByMoney = data["NoLongMarginRatioByMoney"],
NoLongMarginRatioByVolume = data["NoLongMarginRatioByVolume"],
NoShortMarginRatioByMoney = data["NoShortMarginRatioByMoney"],
NoShortMarginRatioByVolume = data["NoShortMarginRatioByVolume"],
gateway_name=self.gateway_name
)
# 并且发送给网关
self.gateway.on_exchange_margin_rate_adjust(adjust)
def onRspQryBrokerTradingParams(self, data: dict, error: dict, reqid: int, last: bool): # hxxjava add
""" 请求查询经纪公司交易参数响应 """
from vnpy.trader.constant import HedgeType,InstrumentStatus,MarginPriceType,AlgorithmType,IncludeCloseProfitType,Currency,OptionRoyaltyPriceType # hxxjava add
if data:
# print(f"onRspQryBrokerTradingParams : {data}")
broker_trading_param = BrokerTradingParamsData(
margin_price_type = MarginPriceType(data["MarginPriceType"]),
algorithm = AlgorithmType(data["Algorithm"]),
avail_include_close_profit = IncludeCloseProfitType(data["AvailIncludeCloseProfit"]),
currency = Currency(data["CurrencyID"]) if data["CurrencyID"] else "",
option_royalty_price_type = OptionRoyaltyPriceType(data["OptionRoyaltyPriceType"]),
account = data["AccountID"],
gateway_name=self.gateway_name
)
self.gateway.on_broker_trading_params(broker_trading_param)
def onRspQryProduct(self, data: dict, error: dict, reqid: int, last: bool): # hxxjava add
""" 请求查询产品响应 """
print(f"请求查询产品响应 :data={data} error={error}")
if data:
product = ProductData(
id = data["ProductID"],
name = data["ProductName"],
exchange = Exchange(data["ExchangeID"]),
product_class = data["ProductClass"],
size = data["VolumeMultiple"],
price_tick = data["PriceTick"],
MaxMarketOrderVolume = data["MaxMarketOrderVolume"],
MinMarketOrderVolume = data["MinMarketOrderVolume"],
MaxLimitOrderVolume = data["MaxLimitOrderVolume"],
MinLimitOrderVolume = data["MinLimitOrderVolume"],
PositionType = data["PositionType"],
PositionDateType = data["PositionDateType"],
TradeCurrencyID = data["TradeCurrencyID"],
CloseDealType = data["CloseDealType"],
MortgageFundUseRange = data["MortgageFundUseRange"],
ExchangeProductID = data["ExchangeProductID"],
UnderlyingMultiple = data["UnderlyingMultiple"],
gateway_name=self.gateway_name
)
self.gateway.on_query_product(product)
def query_broker_trading_params(self,req:BrokerTradingParamsRequest): # hxxjava add
""" 请求查询经纪公司交易参数 """
if not self.connect_status:
return
req_dict = {}
req_dict["BrokerID"] = self.brokerid
req_dict["InvestorID"] = self.userid
req_dict["CurrencyID"] = req.CurrencyID
req_dict["AccountID"] = req.AccountID
if self.execute_func(func=self.reqQryBrokerTradingParams,req=req_dict,prompt="查询经纪公司交易参数") in [-2,-3]:
# 违反查询流控,添加到查询函数队列的尾部
self.query_functions.append(Function(func=self.query_broker_trading_params,once=True,param={'req':req}))
def query_product(self,req:ProductRequst) -> None:
""" 请求查询产品 """
if not self.connect_status:
return
req_dict = {}
req_dict["ProductID"] = req.product
req_dict["ProductClass"] = req.product_class.value
req_dict["ExchangeID"] = "" if not req.exchange else req.exchange.value
if self.execute_func(func=self.reqQryProduct,req=req_dict,prompt="请求查询产品") in [-2,-3]:
# 违反查询流控,添加到查询函数队列的尾部
self.query_functions.append(Function(func=self.query_product,once=True,param={'req':req}))
def query_commission(self,req:CommissionRequest): # hxxjava add
""" 查询手续费率 """
if not self.connect_status:
return
commission_req = {}
commission_req['BrokerID'] = self.brokerid
commission_req['InvestorID'] = self.userid
commission_req['InvestorUnitID'] = self.userid #hxxjava debug char[31]
commission_req['InstrumentID'] = req.symbol
commission_req['ExchangeID'] = req.exchange.value if req.exchange else ""
if self.execute_func(func=self.reqQryInstrumentCommissionRate,req=commission_req,prompt="查询手续费率") in [-2,-3]:
# 违反查询流控,添加到查询函数队列的尾部
self.query_functions.append(Function(func=self.query_commission,once=True,param={'req':req}))
def query_order_commission(self,req:OrderCommRateRequest): # hxxjava add
""" 查询报单手续费 """
if not self.connect_status:
return
req_dict = {}
req_dict['BrokerID'] = self.brokerid
req_dict['InvestorID'] = self.userid
req_dict['InstrumentID'] = req.symbol
if self.execute_func(func=self.reqQryInstrumentOrderCommRate,req=req_dict,prompt="查询报单手续费") in [-2,-3]:
# 违反查询流控,添加到查询函数队列的尾部
self.query_functions.append(Function(func=self.query_commission,once=True,param={'req':req}))
def query_margin_ratio(self,req:MarginRequest): # hxxjava add
""" 合约保证金率查询 """
if not self.connect_status:
return
ctp_req = {}
ctp_req['BrokerID'] = self.brokerid
ctp_req['InvestorID'] = self.userid
ctp_req['HedgeFlag'] = req.hedge_type.value
ctp_req['ExchangeID'] = req.exchange.value if req.exchange else ""
ctp_req['InvestUnitID'] = self.userid # hxxjava debug char[81]
ctp_req['InstrumentID'] = req.symbol
if self.execute_func(func=self.reqQryInstrumentMarginRate,req=ctp_req,prompt="合约保证金率查询") in [-2,-3]:
# 违反查询流控,添加到查询函数队列的尾部
self.query_functions.append(Function(func=self.query_margin_ratio,once=True,param={'req':req}))
def query_exchange_margin_rate(self,req:MarginRequest): # hxxjava add
""" 交易所保证金率查询 """
if not self.connect_status:
return
out_req = {}
out_req['BrokerID'] = self.brokerid
out_req['InstrumentID'] = req.symbol
out_req['HedgeFlag'] = req.hedge_type.value
out_req['ExchangeID'] = req.exchange.value if req.exchange else ""
if self.execute_func(func=self.reqQryExchangeMarginRate,req=out_req,prompt="交易所保证金率查询") in [-2,-3]:
# 违反查询流控,添加到查询函数队列的尾部
self.query_functions.append(Function(func=self.query_exchange_margin_rate,once=True,param={'req':req}))
def query_exchange_margin_rate_adjust(self,req:MarginRateAdjustRequest): # hxxjava add
""" 保证金率调整查询 """
if not self.connect_status:
return
out_req = {}
out_req['BrokerID'] = self.brokerid
out_req['InstrumentID'] = req.symbol
out_req['HedgeFlag'] = req.hedge_type.value
if self.execute_func(func=self.reqQryExchangeMarginRateAdjust,req=out_req,prompt="交易所保证金率调整查询") in [-2,-3]:
# 违反查询流控,添加到查询函数队列的尾部
self.query_functions.append(Function(func=self.query_exchange_margin_rate_adjust,once=True,param={'req':req}))
def onFrontConnected(self):
""""""
self.gateway.on_connect(GatewayData(name="CTP",type='TD')) # hxxjava add
self.gateway.write_log("交易服务器连接成功")
if self.auth_code:
self.authenticate()
else:
self.login()
def onFrontDisconnected(self, reason: int) -> None:
""" 服务器连接断开回报 """
self.login_status = False
self.gateway.on_disconnect(GatewayData(name="CTP",type='TD',reason=reason)) # hxxjava add
self.gateway.write_log(f"交易服务器连接断开,原因{reason}")
def onRspAuthenticate(self, data: dict, error: dict, reqid: int, last: bool) -> None:
"""用户授权验证回报"""
if not error['ErrorID']:
self.auth_status = True
self.gateway.write_log("交易服务器授权验证成功")
self.login()
else:
self.gateway.write_error("交易服务器授权验证失败", error)
def onRspUserLogin(self, data: dict, error: dict, reqid: int, last: bool) -> None:
"""用户登录请求回报"""
if not error["ErrorID"]:
self.frontid = data["FrontID"]
self.sessionid = data["SessionID"]
self.login_status = True
self.gateway.write_log("交易服务器登录成功")
# 自动确认结算单
ctp_req: dict = {
"BrokerID": self.brokerid,
"InvestorID": self.userid
}
self.reqid += 1
self.reqSettlementInfoConfirm(ctp_req, self.reqid)
else:
self.login_failed = True
self.gateway.write_error("交易服务器登录失败", error)
def onRspOrderInsert(self, data: dict, error: dict, reqid: int, last: bool) -> None:
"""委托下单失败回报"""
order_ref: str = data["OrderRef"]
orderid: str = f"{self.frontid}_{self.sessionid}_{order_ref}"
symbol: str = data["InstrumentID"]
contract: ContractData = symbol_contract_map[symbol]
order: OrderData = OrderData(
symbol=symbol,
exchange=contract.exchange,
orderid=orderid,
direction=DIRECTION_CTP2VT[data["Direction"]],
offset=OFFSET_CTP2VT.get(data["CombOffsetFlag"], Offset.NONE),
price=data["LimitPrice"],
volume=float(data["VolumeTotalOriginal"]), # hxxjava change
status=Status.REJECTED,
gateway_name=self.gateway_name
)
self.gateway.on_order(order)
self.gateway.write_error("交易委托失败", error)
def onRspOrderAction(self, data: dict, error: dict, reqid: int, last: bool) -> None:
"""委托撤单失败回报"""
self.gateway.write_error("交易撤单失败", error)
def onRspSettlementInfoConfirm(self, data: dict, error: dict, reqid: int, last: bool) -> None:
"""确认结算单回报"""
self.gateway.write_log("结算信息确认成功")
# # 由于流控,单次查询可能失败,通过while循环持续尝试,直到成功发出请求
# while True:
# self.reqid += 1
# n: int = self.reqQryInstrument({}, self.reqid)
# if not n:
# break
# else:
# sleep(1)
# hxxjava change start
self.gateway.write_log(f"查询交易所保证金")
self.query_exchange_margin_rate(MarginRequest(symbol="",exchange=None))
# hxxjava change end
def onRspQryInvestorPosition(self, data: dict, error: dict, reqid: int, last: bool) -> None:
"""持仓查询回报"""
if not data:
return
# print(f"持仓查询回报:{data}")
# 必须已经收到了合约信息后才能处理
symbol: str = data["InstrumentID"]
contract: ContractData = symbol_contract_map.get(symbol, None)
if contract:
# 获取之前缓存的持仓数据缓存
key: str = f"{data['InstrumentID'], data['PosiDirection']}"
position: PositionData = self.positions.get(key, None)
if not position:
position = PositionData(
symbol=data["InstrumentID"],
exchange=contract.exchange,
direction=DIRECTION_CTP2VT[data["PosiDirection"]],
gateway_name=self.gateway_name
)
self.positions[key] = position
# 对于上期所昨仓需要特殊处理
if position.exchange in [Exchange.SHFE, Exchange.INE]:
if data["YdPosition"] and not data["TodayPosition"]:
position.yd_volume = data["Position"]
# 对于其他交易所昨仓的计算
else:
position.yd_volume = data["Position"] - data["TodayPosition"]
# 获取合约的乘数信息
size: int = contract.size
# 计算之前已有仓位的持仓总成本
cost: float = position.price * position.volume * size
# 累加更新持仓数量和盈亏
position.volume += data["Position"]
position.pnl += data["PositionProfit"]
# 计算更新后的持仓总成本和均价
if position.volume and size:
cost += data["PositionCost"]
position.price = cost / (position.volume * size)
# 更新仓位冻结数量
if position.direction == Direction.LONG:
position.frozen += data["ShortFrozen"]
else:
position.frozen += data["LongFrozen"]
if last:
for position in self.positions.values():
self.gateway.on_position(position)
self.positions.clear()
def onRspQryTradingAccount(self, data: dict, error: dict, reqid: int, last: bool) -> None:
"""资金查询回报"""
if "AccountID" not in data:
return
account: AccountData = AccountData(
accountid=data["AccountID"],
balance=data["Balance"],
frozen=data["FrozenMargin"] + data["FrozenCash"] + data["FrozenCommission"],
gateway_name=self.gateway_name
)
account.available = data["Available"]
self.gateway.on_account(account)
def onRspQryInstrument(self, data: dict, error: dict, reqid: int, last: bool) -> None:
"""合约查询回报"""
product: Product = PRODUCT_CTP2VT.get(data["ProductClass"], None)
if product:
contract: ContractData = ContractData(
symbol=data["InstrumentID"],
exchange=EXCHANGE_CTP2VT[data["ExchangeID"]],
name=data["InstrumentName"],
product=product,
size=data["VolumeMultiple"],
pricetick=data["PriceTick"],
# hxxjava add start
max_market_order_volume=data["MaxMarketOrderVolume"],
min_market_order_volume=data["MinMarketOrderVolume"],
max_limit_order_volume=data["MaxLimitOrderVolume"],
min_limit_order_volume=data["MinLimitOrderVolume"],
open_date=data["OpenDate"],
expire_date=data["ExpireDate"],
is_trading=data["IsTrading"],
long_margin_ratio=data["LongMarginRatio"],
short_margin_ratio=data["ShortMarginRatio"],
# hxxjava add end
gateway_name=self.gateway_name
)
# 期权相关
if contract.product == Product.OPTION:
# 移除郑商所期权产品名称带有的C/P后缀
if contract.exchange == Exchange.CZCE:
contract.option_portfolio = data["ProductID"][:-1]
else:
contract.option_portfolio = data["ProductID"]
contract.option_underlying = data["UnderlyingInstrID"]
contract.option_type = OPTIONTYPE_CTP2VT.get(data["OptionsType"], None)
contract.option_strike = data["StrikePrice"]
contract.option_index = str(data["StrikePrice"])
contract.option_expiry = datetime.strptime(data["ExpireDate"], "%Y%m%d")
# self.gateway.on_contract(contract)
symbol_contract_map[contract.symbol] = contract
if last:
# 更新所有合约信息
self.gateway.write_log("更新所有合约信息...")
self.gateway.on_contract_end(symbol_contract_map.values())
self.contract_inited = True
self.gateway.write_log("合约信息查询成功")
self.gateway.write_log(f"收到{len(symbol_contract_map)}条合约信息")
self.gateway.write_log(f"提取{len(self.ex_margin_rate_data)}条交易所保证金率信息")
if self.ex_margin_rate_data:
EMRs = []
for data in self.ex_margin_rate_data:
EMRs.append(self.extractExchangeMarginRate(data))
self.gateway.on_exchange_margin_rate_end(EMRs)
self.ex_margin_rate_data.clear()
self.gateway.write_log(f"提取{len(self.status_data)}条状态信息")
if self.status_data:
statuses = []
for data in self.status_data:
statuses.append(self.extractInstrumentStatus(data))
self.gateway.on_status_end(statuses)
self.status_data.clear()
self.gateway.write_log(f"提取{len(self.order_data)}条委托单信息")
for data in self.order_data:
self.onRtnOrder(data)
self.order_data.clear()
self.gateway.write_log(f"提取{len(self.trade_data)}条成交单信息")
for data in self.trade_data:
self.onRtnTrade(data)
self.trade_data.clear()
self.inited = True
def onRtnInstrumentStatus(self,data:dict):
"""
当接收到合约品种状态信息 # hxxjava debug
"""
if not self.contract_inited:
self.status_data.append(data)
return
status = self.extractInstrumentStatus(data)
self.gateway.on_status(status)
def extractInstrumentStatus(self,data:dict): # hxxjava add
""" 提取合约品种状态信息 """
return StatusData(
symbol = data["InstrumentID"],
exchange = EXCHANGE_CTP2VT[data["ExchangeID"]],
settlement_group_id = data["SettlementGroupID"],
instrument_status = INSTRUMENTSTATUS_CTP2VT[data["InstrumentStatus"]],
trading_segment_sn = data["TradingSegmentSN"],
enter_time = data["EnterTime"],
enter_reason = ENTERREASON_CTP2VT[data["EnterReason"]],
exchange_inst_id = data["ExchangeInstID"],
gateway_name=self.gateway_name
)
def onRtnOrder(self, data: dict) -> None:
""" 委托更新推送 """
if not self.contract_inited:
self.order_data.append(data)
return
symbol: str = data["InstrumentID"]
contract: ContractData = symbol_contract_map[symbol]
frontid: int = data["FrontID"]
sessionid: int = data["SessionID"]
order_ref: str = data["OrderRef"]
orderid: str = f"{frontid}_{sessionid}_{order_ref}"
timestamp: str = f"{data['InsertDate']} {data['InsertTime']}"
dt: datetime = datetime.strptime(timestamp, "%Y%m%d %H:%M:%S")
dt = CHINA_TZ.localize(dt)
dt1 = datetime.strptime(f"{data['TradingDay']}", "%Y%m%d") # hxxjava add
tradingday = dt1.date() # hxxjava add
order = OrderData(
symbol=symbol,
exchange=contract.exchange,
orderid=orderid,
type=ORDERTYPE_CTP2VT[data["OrderPriceType"]],
direction=DIRECTION_CTP2VT[data["Direction"]],
offset=OFFSET_CTP2VT[data["CombOffsetFlag"]],
price=data["LimitPrice"],
volume=float(data["VolumeTotalOriginal"]), # hxxjava change
traded=float(data["VolumeTraded"]), # hxxjava change
status=STATUS_CTP2VT[data["OrderStatus"]],
datetime=dt,
tradingday=tradingday, # hxxjava add
gateway_name=self.gateway_name
)
self.gateway.on_order(order)
self.sysid_orderid_map[data["OrderSysID"]] = orderid
def onRtnTrade(self, data: dict) -> None:
""" 成交数据推送 """
if not self.contract_inited:
self.trade_data.append(data)
return
symbol: str = data["InstrumentID"]
contract: ContractData = symbol_contract_map[symbol]
orderid: str = self.sysid_orderid_map[data["OrderSysID"]]
timestamp: str = f"{data['TradeDate']} {data['TradeTime']}"
dt: datetime = datetime.strptime(timestamp, "%Y%m%d %H:%M:%S")
dt = CHINA_TZ.localize(dt)
datestr = f"{data['TradingDay']}" # hxxjava add
dt1 = datetime.strptime(datestr, "%Y%m%d") # hxxjava add
tradingday = dt1.date() # hxxjava add
trade = TradeData(
symbol=symbol,
exchange=contract.exchange,
orderid=orderid,
tradeid=data["TradeID"],
direction=DIRECTION_CTP2VT[data["Direction"]],
offset=OFFSET_CTP2VT[data["OffsetFlag"]],
price=data["Price"],
volume=data["Volume"],
datetime=dt,
tradingday=tradingday, # hxxjava add
gateway_name=self.gateway_name
)
self.gateway.on_trade(trade)
def onRspForQuoteInsert(self, data: dict, error: dict, reqid: int, last: bool) -> None:
"""询价请求回报"""
if not error["ErrorID"]:
symbol: str = data["InstrumentID"]
msg: str = f"{symbol}询价请求发送成功"
self.gateway.write_log(msg)
else:
self.gateway.write_error("询价请求发送失败", error)
def connect(
self,
address: str,
userid: str,
password: str,
brokerid: int,
auth_code: str,
appid: str
) -> None:
"""连接服务器"""
self.userid = userid
self.password = password
self.brokerid = brokerid
self.auth_code = auth_code
self.appid = appid
if not self.connect_status:
path: Path = get_folder_path(self.gateway_name.lower())
self.createFtdcTraderApi((str(path) + "\\Td").encode("GBK"))
self.subscribePrivateTopic(0)
self.subscribePublicTopic(0)
self.registerFront(address)
self.init()
self.connect_status = True
else:
self.authenticate()
def authenticate(self) -> None:
"""发起授权验证"""
ctp_req: dict = {
"UserID": self.userid,
"BrokerID": self.brokerid,
"AuthCode": self.auth_code,
"AppID": self.appid
}
self.reqid += 1
self.reqAuthenticate(ctp_req, self.reqid)
def login(self) -> None:
"""用户登录"""
if self.login_failed:
return
ctp_req: dict = {
"UserID": self.userid,
"Password": self.password,
"BrokerID": self.brokerid,
"AppID": self.appid
}
self.reqid += 1
self.reqUserLogin(ctp_req, self.reqid)
def send_order(self, req: OrderRequest) -> str:
"""委托下单"""
if req.offset not in OFFSET_VT2CTP:
self.gateway.write_log("请选择开平方向")
return ""
if req.type not in ORDERTYPE_VT2CTP:
self.gateway.write_log(f"当前接口不支持该类型的委托{req.type.value}")
return ""
self.order_ref += 1
ctp_req: dict = {
"InstrumentID": req.symbol,
"ExchangeID": req.exchange.value,
"LimitPrice": req.price,
"VolumeTotalOriginal": int(req.volume),
"OrderPriceType": ORDERTYPE_VT2CTP.get(req.type, ""),
"Direction": DIRECTION_VT2CTP.get(req.direction, ""),
"CombOffsetFlag": OFFSET_VT2CTP.get(req.offset, ""),
"OrderRef": str(self.order_ref),
"InvestorID": self.userid,
"UserID": self.userid,
"BrokerID": self.brokerid,
"CombHedgeFlag": THOST_FTDC_HF_Speculation,
"ContingentCondition": THOST_FTDC_CC_Immediately,
"ForceCloseReason": THOST_FTDC_FCC_NotForceClose,
"IsAutoSuspend": 0,
"TimeCondition": THOST_FTDC_TC_GFD,
"VolumeCondition": THOST_FTDC_VC_$,
"MinVolume": 1
}
if req.type == OrderType.FAK:
ctp_req["OrderPriceType"] = THOST_FTDC_OPT_LimitPrice
ctp_req["TimeCondition"] = THOST_FTDC_TC_IOC
ctp_req["VolumeCondition"] = THOST_FTDC_VC_$
elif req.type == OrderType.FOK:
ctp_req["OrderPriceType"] = THOST_FTDC_OPT_LimitPrice
ctp_req["TimeCondition"] = THOST_FTDC_TC_IOC
ctp_req["VolumeCondition"] = THOST_FTDC_VC_CV
self.reqid += 1
ret_val = self.reqOrderInsert(ctp_req, self.reqid)
if ret_val == 0: # hxxjava change
# 报单成功
orderid: str = f"{self.frontid}_{self.sessionid}_{self.order_ref}"
order: OrderData = req.create_order_data(orderid, self.gateway_name)
self.gateway.on_order(order)
return order.vt_orderid
if ret_val in [-2,-3]:
# 违反报单流控,添加到报单函数队列的尾部
self.order_functions.append(Function(func=self.send_order,once=True,param={'req':req}))
return ""
def cancel_order(self, req: CancelRequest) -> None:
"""委托撤单"""
frontid, sessionid, order_ref = req.orderid.split("_")
ctp_req: dict = {
"InstrumentID": req.symbol,
"ExchangeID": req.exchange.value,
"OrderRef": order_ref,
"FrontID": int(frontid),
"SessionID": int(sessionid),
"ActionFlag": THOST_FTDC_AF_Delete,
"BrokerID": self.brokerid,
"InvestorID": self.userid
}
self.reqid += 1
if self.reqOrderAction(ctp_req, self.reqid) in [-2,-3]:
# 违反报单流控,添加到报单函数队列的尾部
self.order_functions.append(Function(func=self.cancel_order,once=True,param={'req':req}))
def send_rfq(self, req: OrderRequest) -> str:
"""询价请求"""
self.order_ref += 1
ctp_req: dict = {
"InstrumentID": req.symbol,
"ExchangeID": req.exchange.value,
"ForQuoteRef": str(self.order_ref),
"BrokerID": self.brokerid,
"InvestorID": self.userid
}
self.reqid += 1
ret_val = self.reqForQuoteInsert(ctp_req, self.reqid)
if ret_val == 0: # hxxjava change
# 成功执行了询价报单
orderid: str = f"{self.frontid}_{self.sessionid}_{self.order_ref}"
vt_orderid: str = f"{self.gateway_name}.{orderid}"
return vt_orderid
if ret_val in [-2,-3]:
# 违反报单流控,添加到报单函数队列的尾部
self.order_functions.append(Function(func=self.send_rfq,once=True,param={'req':req}))
return ""
def query_account(self) -> int:
"""查询资金"""
if not self.connect_status:
return -4
# self.reqid += 1
# self.reqQryTradingAccount({}, self.reqid)
return self.execute_func(func=self.reqQryTradingAccount,req={},prompt="查询资金账户")
def query_position(self) -> int:
"""查询持仓"""
if not self.connect_status:
return -4
if not symbol_contract_map:
return -5
ctp_req: dict = {
"BrokerID": self.brokerid,
"InvestorID": self.userid
}
# self.reqid += 1
# self.reqQryInvestorPosition(ctp_req, self.reqid)
return self.execute_func(func=self.reqQryInvestorPosition,req=ctp_req,prompt="查询投资者持仓")
def close(self) -> None:
"""关闭连接"""
if self.connect_status:
self.exit()
def adjust_price(price: float) -> float:
"""将异常的浮点数最大值(MAX_FLOAT)数据调整为0"""
if price == MAX_FLOAT:
price = 0
return price