jingsiaosing wrote:
1.行情建议直接用期货公司给的实盘前置,不用输入用户名密码可以直接使用的,比simnow稳定
2.不太可能是带宽不够,不过无法排除网络不稳定的原因。建议直接贴出你的代码
已经实盘用了很久,没有发现你所说的丢tick情况
我用的就是期货公司的实盘前置,simnow只是用于交易端验证登录的,不用于接受行情。
我把代码贴一下,我自己把ctp_gateway整个全改了,把里面MdApi和TdApi中涉及到上层消息队列相关的逻辑全部去掉了,然后CtpGateway也不再是继承自抽象类BaseGateway了,而是直接作为MdApi和TdApi的wrapper类用于方面我在run里面直接测试。剩余的逻辑变动都很简单,run函数直接调用CtpGateway中相关的接口就行,不需要import eventengine之类的任何类。
先是run函数,代码量较少
import multiprocessing
from time import sleep
from datetime import datetime, time
from logging import INFO
from vnpy.trader.setting import SETTINGS
from vnpy_ctp import CtpGateway
from vnpy.trader.constant import Product
from vnpy.trader.object import ContractData, SubscribeRequest
from vnpy_ctp import CtpGateway
SETTINGS["log.active"] = True
SETTINGS["log.level"] = INFO
SETTINGS["log.console"] = True
ctp_setting = {
"用户名": "",
"密码": "",
"经纪商代码": "9999",
"交易服务器": "180.168.146.187:10201",
"行情服务器": "tcp://122.224.243.56:41313",
"产品名称": "simnow_client_test",
"授权编码": "0000000000000000",
"产品信息": ""
}
# Chinese futures market trading period (day/night)
DAY_START = time(8, 35)
DAY_END = time(15, 0)
# NIGHT_START = time(20, 45)
NIGHT_START = time(20, 45)
NIGHT_END = time(2, 45)
def check_trading_period():
""""""
current_time = datetime.now().time()
trading = False
if (
(current_time >= DAY_START and current_time <= DAY_END)
or (current_time >= NIGHT_START)
or (current_time <= NIGHT_END)
):
trading = True
return trading
def run_child():
"""
Running in the child process.
"""
SETTINGS["log.file"] = True
gateway = CtpGateway('CTP')
gateway.connect(ctp_setting)
sleep(10)
print(len(gateway.total_contracts))
total_contracts: list[ContractData] = gateway.total_contracts
future_contracts = [contract for contract in total_contracts if contract.product == Product.FUTURES]
# 只订阅单品种
for contract in future_contracts:
if contract.symbol == 'rb2301':
req: SubscribeRequest = SubscribeRequest(
symbol=contract.symbol,
exchange=contract.exchange
)
gateway.subscribe(req)
#订阅全市场
# for contract in future_contracts:
# req: SubscribeRequest = SubscribeRequest(
# symbol=contract.symbol,
# exchange=contract.exchange
# )
# gateway.subscribe(req)
try:
while True:
sleep(10)
except KeyboardInterrupt:
print('收到退出指令')
gateway.close()
def run_parent():
"""
Running in the parent process.
"""
print("启动CTA策略守护父进程")
child_process = None
while True:
trading = check_trading_period()
# Start child process in trading period
if trading and child_process is None:
print("启动子进程")
child_process = multiprocessing.Process(target=run_child)
child_process.start()
print("子进程启动成功")
# 非记录时间则退出子进程
if not trading and child_process is not None:
if not child_process.is_alive():
child_process = None
print("子进程关闭成功")
sleep(5)
if __name__ == "__main__":
run_parent()
然后是我自己修改的ctpgateway:
import sys
import pytz
from datetime import datetime
from time import sleep
from typing import Dict, List, Tuple
from pathlib import Path
from vnpy.event import EventEngine
from vnpy.trader.constant import (
Direction,
Offset,
Exchange,
OrderType,
Product,
Status,
OptionType
)
from vnpy.trader.gateway import BaseGateway
from vnpy.trader.object import (
TickData,
OrderData,
TradeData,
PositionData,
AccountData,
ContractData,
OrderRequest,
CancelRequest,
SubscribeRequest,
)
from vnpy.trader.utility import get_folder_path
from vnpy.trader.event import EVENT_TIMER
from ..api import (
MdApi,
TdApi,
THOST_FTDC_OST_NoTradeQueueing,
THOST_FTDC_OST_PartTradedQueueing,
THOST_FTDC_OST_AllTraded,
THOST_FTDC_OST_Canceled,
THOST_FTDC_OST_Unknown,
THOST_FTDC_D_Buy,
THOST_FTDC_D_Sell,
THOST_FTDC_PD_Long,
THOST_FTDC_PD_Short,
THOST_FTDC_OPT_LimitPrice,
THOST_FTDC_OPT_AnyPrice,
THOST_FTDC_OF_Open,
THOST_FTDC_OFEN_Close,
THOST_FTDC_OFEN_CloseYesterday,
THOST_FTDC_OFEN_CloseToday,
THOST_FTDC_PC_Futures,
THOST_FTDC_PC_Options,
THOST_FTDC_PC_SpotOption,
THOST_FTDC_PC_Combination,
THOST_FTDC_CP_CallOptions,
THOST_FTDC_CP_PutOptions,
THOST_FTDC_HF_Speculation,
THOST_FTDC_CC_Immediately,
THOST_FTDC_FCC_NotForceClose,
THOST_FTDC_TC_GFD,
THOST_FTDC_VC_$,
THOST_FTDC_TC_IOC,
THOST_FTDC_VC_CV,
THOST_FTDC_AF_Delete
)
# 委托状态映射
STATUS_CTP2VT: Dict[str, Status] = {
THOST_FTDC_OST_NoTradeQueueing: Status.NOTTRADED,
THOST_FTDC_OST_PartTradedQueueing: Status.PARTTRADED,
THOST_FTDC_OST_AllTraded: Status.ALLTRADED,
THOST_FTDC_OST_Canceled: Status.CANCELLED,
THOST_FTDC_OST_Unknown: Status.SUBMITTING
}
# 多空方向映射
DIRECTION_VT2CTP: Dict[Direction, str] = {
Direction.LONG: THOST_FTDC_D_Buy,
Direction.SHORT: THOST_FTDC_D_Sell
}
DIRECTION_CTP2VT: Dict[str, Direction] = {v: k for k, v in DIRECTION_VT2CTP.items()}
DIRECTION_CTP2VT[THOST_FTDC_PD_Long] = Direction.LONG
DIRECTION_CTP2VT[THOST_FTDC_PD_Short] = Direction.SHORT
# 委托类型映射
ORDERTYPE_VT2CTP: Dict[OrderType, tuple] = {
OrderType.LIMIT: (THOST_FTDC_OPT_LimitPrice, THOST_FTDC_TC_GFD, THOST_FTDC_VC_$),
OrderType.MARKET: (THOST_FTDC_OPT_AnyPrice, THOST_FTDC_TC_GFD, THOST_FTDC_VC_$),
OrderType.FAK: (THOST_FTDC_OPT_LimitPrice, THOST_FTDC_TC_IOC, THOST_FTDC_VC_$),
OrderType.FOK: (THOST_FTDC_OPT_LimitPrice, THOST_FTDC_TC_IOC, THOST_FTDC_VC_CV),
}
ORDERTYPE_CTP2VT: Dict[Tuple, OrderType] = {v: k for k, v in ORDERTYPE_VT2CTP.items()}
# 开平方向映射
OFFSET_VT2CTP: Dict[Offset, str] = {
Offset.OPEN: THOST_FTDC_OF_Open,
Offset.CLOSE: THOST_FTDC_OFEN_Close,
Offset.CLOSETODAY: THOST_FTDC_OFEN_CloseToday,
Offset.CLOSEYESTERDAY: THOST_FTDC_OFEN_CloseYesterday,
}
OFFSET_CTP2VT: Dict[str, Offset] = {v: k for k, v in OFFSET_VT2CTP.items()}
# 交易所映射
EXCHANGE_CTP2VT: Dict[str, Exchange] = {
"CFFEX": Exchange.CFFEX,
"SHFE": Exchange.SHFE,
"CZCE": Exchange.CZCE,
"DCE": Exchange.DCE,
"INE": Exchange.INE,
"GFEX": Exchange.GFEX
}
# 产品类型映射
PRODUCT_CTP2VT: Dict[str, Product] = {
THOST_FTDC_PC_Futures: Product.FUTURES,
THOST_FTDC_PC_Options: Product.OPTION,
THOST_FTDC_PC_SpotOption: Product.OPTION,
THOST_FTDC_PC_Combination: Product.SPREAD
}
# 期权类型映射
OPTIONTYPE_CTP2VT: Dict[str, OptionType] = {
THOST_FTDC_CP_CallOptions: OptionType.CALL,
THOST_FTDC_CP_PutOptions: OptionType.PUT
}
# 其他常量
MAX_FLOAT = sys.float_info.max # 浮点数极限值
CHINA_TZ = pytz.timezone("Asia/Shanghai") # 中国时区
# 合约数据全局缓存字典
symbol_contract_map: Dict[str, ContractData] = {}
class CtpGateway:
"""
VeighNa用于对接期货CTP柜台的交易接口。
"""
default_name: str = "CTP"
default_setting: Dict[str, str] = {
"用户名": "",
"密码": "",
"经纪商代码": "",
"交易服务器": "",
"行情服务器": "",
"产品名称": "",
"授权编码": ""
}
exchanges: List[str] = list(EXCHANGE_CTP2VT.values())
def __init__(self, gateway_name: str) -> None:
"""构造函数"""
# super().__init__(event_engine, gateway_name)
self.gateway_name = gateway_name
self.td_api: "CtpTdApi" = CtpTdApi(self)
self.md_api: "CtpMdApi" = CtpMdApi(self)
self.total_contracts = []
def connect(self, setting: dict) -> None:
"""连接交易接口"""
userid: str = setting["用户名"]
password: str = setting["密码"]
brokerid: str = setting["经纪商代码"]
td_address: str = setting["交易服务器"]
md_address: str = setting["行情服务器"]
appid: str = setting["产品名称"]
auth_code: str = setting["授权编码"]
if (
(not td_address.startswith("tcp://"))
and (not td_address.startswith("ssl://"))
and (not td_address.startswith("socks"))
):
td_address = "tcp://" + td_address
if (
(not md_address.startswith("tcp://"))
and (not md_address.startswith("ssl://"))
and (not md_address.startswith("socks"))
):
md_address = "tcp://" + md_address
self.td_api.connect(td_address, userid, password, brokerid, auth_code, appid)
self.md_api.connect(md_address, userid, password, brokerid)
self.init_query()
def subscribe(self, req: SubscribeRequest) -> None:
"""订阅行情"""
self.md_api.subscribe(req)
def send_order(self, req: OrderRequest) -> str:
"""委托下单"""
return self.td_api.send_order(req)
def cancel_order(self, req: CancelRequest) -> None:
"""委托撤单"""
self.td_api.cancel_order(req)
def query_account(self) -> None:
"""查询资金"""
self.td_api.query_account()
def query_position(self) -> None:
"""查询持仓"""
self.td_api.query_position()
def close(self) -> None:
"""关闭接口"""
self.td_api.close()
self.md_api.close()
def write_error(self, msg: str, error: dict) -> None:
"""输出错误信息日志"""
error_id: int = error["ErrorID"]
error_msg: str = error["ErrorMsg"]
msg: str = f"{msg},代码:{error_id},信息:{error_msg}"
print(msg)
# self.write_log(msg)
def process_timer_event(self, event) -> None:
"""定时事件处理"""
self.count += 1
if self.count < 2:
return
self.count = 0
func = self.query_functions.pop(0)
func()
self.query_functions.append(func)
self.md_api.update_date()
def init_query(self) -> None:
"""初始化查询任务"""
self.count: int = 0
self.query_functions: list = [self.query_account, self.query_position]
# self.event_engine.register(EVENT_TIMER, self.process_timer_event)
class CtpMdApi(MdApi):
""""""
def __init__(self, gateway: CtpGateway) -> None:
"""构造函数"""
super().__init__()
self.gateway: CtpGateway = gateway
self.gateway_name: str = gateway.gateway_name
self.reqid: int = 0
self.connect_status: bool = False
self.login_status: bool = False
self.subscribed: set = set()
self.userid: str = ""
self.password: str = ""
self.brokerid: str = ""
self.current_date: str = datetime.now().strftime("%Y%m%d")
self.pretime = datetime.now()
self.i = 0
def onFrontConnected(self) -> None:
"""服务器连接成功回报"""
print("行情服务器连接成功")
self.login()
def onFrontDisconnected(self, reason: int) -> None:
"""服务器连接断开回报"""
self.login_status = False
print(f"行情服务器连接断开,原因{reason}")
def onRspUserLogin(self, data: dict, error: dict, reqid: int, last: bool) -> None:
"""用户登录请求回报"""
if not error["ErrorID"]:
self.login_status = True
print("行情服务器登录成功")
for symbol in self.subscribed:
self.subscribeMarketData(symbol)
else:
print("行情服务器登录失败", error)
def onRspError(self, error: dict, reqid: int, last: bool) -> None:
"""请求报错回报"""
print("行情接口报错", error)
def onRspSubMarketData(self, data: dict, error: dict, reqid: int, last: bool) -> None:
"""订阅行情回报"""
if not error or not error["ErrorID"]:
return
print("行情订阅失败", error)
def onRtnDepthMarketData(self, data: dict) -> None:
"""行情数据推送"""
symbol: str = data["InstrumentID"]
contract: ContractData = symbol_contract_map.get(symbol, None)
date_str: str = self.current_date
timestamp: str = f"{date_str} {data['UpdateTime']}.{data['UpdateMillisec']}"
dt: datetime = datetime.strptime(timestamp, "%Y%m%d %H:%M:%S.%f")
if data["InstrumentID"] == 'rb2301':
self.i += 1
timediff = (dt - self.pretime).microseconds + (dt - self.pretime).seconds * 1000_000
if timediff > 500_000:
infoo = f"合约:{data['InstrumentID']},时间:{data['UpdateTime']}, 毫秒:{data['UpdateMillisec']:<3}, 最新价:{data['LastPrice']}, 订阅数量:{len(self.subscribed)}, 行情数量:{self.i:<4},和上一笔时间差:{timediff/1000_000:0.2f}s!!!!!"
else:
infoo = f"合约:{data['InstrumentID']}, 时间:{data['UpdateTime']}, 毫秒:{data['UpdateMillisec']:<3}, 最新价:{data['LastPrice']}, 订阅数量:{len(self.subscribed)}, 行情数量:{self.i:<4}, 和上一笔时间差:{float(timediff/1000_000):0.2f}s"
self.pretime = dt
print(infoo, flush = True)
def connect(self, address: str, userid: str, password: str, brokerid: str) -> None:
"""连接服务器"""
self.userid = userid
self.password = password
self.brokerid = brokerid
# 禁止重复发起连接,会导致异常崩溃
if not self.connect_status:
path: Path = get_folder_path(self.gateway_name.lower())
self.createFtdcMdApi((str(path) + "\\Md").encode("GBK"))
self.registerFront(address)
self.init()
self.connect_status = True
def login(self) -> None:
"""用户登录"""
ctp_req: dict = {
"UserID": self.userid,
"Password": self.password,
"BrokerID": self.brokerid
}
self.reqid += 1
self.reqUserLogin(ctp_req, self.reqid)
def subscribe(self, req: SubscribeRequest) -> None:
"""订阅行情"""
if self.login_status:
self.subscribeMarketData(req.symbol)
self.subscribed.add(req.symbol)
def close(self) -> None:
"""关闭连接"""
if self.connect_status:
self.exit()
def update_date(self) -> None:
"""更新当前日期"""
self.current_date = datetime.now().strftime("%Y%m%d")
class CtpTdApi(TdApi):
""""""
def __init__(self, gateway: CtpGateway) -> None:
"""构造函数"""
super().__init__()
self.gateway: CtpGateway = gateway
self.gateway_name: str = gateway.gateway_name
self.reqid: int = 0
self.order_ref: int = 0
self.connect_status: bool = False
self.login_status: bool = False
self.auth_status: bool = False
self.login_failed: bool = False
self.auth_failed: bool = False
self.contract_inited: bool = False
self.userid: str = ""
self.password: str = ""
self.brokerid: str = ""
self.auth_code: str = ""
self.appid: str = ""
self.frontid: int = 0
self.sessionid: int = 0
self.order_data: List[dict] = []
self.trade_data: List[dict] = []
self.positions: Dict[str, PositionData] = {}
self.sysid_orderid_map: Dict[str, str] = {}
def onFrontConnected(self) -> None:
"""服务器连接成功回报"""
print("交易服务器连接成功")
if self.auth_code:
self.authenticate()
else:
self.login()
def onFrontDisconnected(self, reason: int) -> None:
"""服务器连接断开回报"""
self.login_status = False
print(f"交易服务器连接断开,原因{reason}")
def onRspAuthenticate(self, data: dict, error: dict, reqid: int, last: bool) -> None:
"""用户授权验证回报"""
if not error['ErrorID']:
self.auth_status = True
print("交易服务器授权验证成功")
self.login()
else:
self.auth_failed = True
print("交易服务器授权验证失败", error)
def onRspUserLogin(self, data: dict, error: dict, reqid: int, last: bool) -> None:
"""用户登录请求回报"""
if not error["ErrorID"]:
self.frontid = data["FrontID"]
self.sessionid = data["SessionID"]
self.login_status = True
print("交易服务器登录成功")
# 自动确认结算单
ctp_req: dict = {
"BrokerID": self.brokerid,
"InvestorID": self.userid
}
self.reqid += 1
self.reqSettlementInfoConfirm(ctp_req, self.reqid)
else:
self.login_failed = True
print("交易服务器登录失败", error)
def onRspOrderInsert(self, data: dict, error: dict, reqid: int, last: bool) -> None:
"""委托下单失败回报"""
order_ref: str = data["OrderRef"]
orderid: str = f"{self.frontid}_{self.sessionid}_{order_ref}"
symbol: str = data["InstrumentID"]
contract: ContractData = symbol_contract_map[symbol]
order: OrderData = OrderData(
symbol=symbol,
exchange=contract.exchange,
orderid=orderid,
direction=DIRECTION_CTP2VT[data["Direction"]],
offset=OFFSET_CTP2VT.get(data["CombOffsetFlag"], Offset.NONE),
price=data["LimitPrice"],
volume=data["VolumeTotalOriginal"],
status=Status.REJECTED,
gateway_name=self.gateway_name
)
# self.gateway.on_order(order)
print("交易委托失败", error)
def onRspOrderAction(self, data: dict, error: dict, reqid: int, last: bool) -> None:
"""委托撤单失败回报"""
print("交易撤单失败", error)
def onRspSettlementInfoConfirm(self, data: dict, error: dict, reqid: int, last: bool) -> None:
"""确认结算单回报"""
print("结算信息确认成功")
# 由于流控,单次查询可能失败,通过while循环持续尝试,直到成功发出请求
while True:
self.reqid += 1
n: int = self.reqQryInstrument({}, self.reqid)
if not n:
break
else:
sleep(1)
def onRspQryInvestorPosition(self, data: dict, error: dict, reqid: int, last: bool) -> None:
"""持仓查询回报"""
if not data:
return
# 必须已经收到了合约信息后才能处理
symbol: str = data["InstrumentID"]
contract: ContractData = symbol_contract_map.get(symbol, None)
if contract:
# 获取之前缓存的持仓数据缓存
key: str = f"{data['InstrumentID'], data['PosiDirection']}"
position: PositionData = self.positions.get(key, None)
if not position:
position = PositionData(
symbol=data["InstrumentID"],
exchange=contract.exchange,
direction=DIRECTION_CTP2VT[data["PosiDirection"]],
gateway_name=self.gateway_name
)
self.positions[key] = position
# 对于上期所昨仓需要特殊处理
if position.exchange in {Exchange.SHFE, Exchange.INE}:
if data["YdPosition"] and not data["TodayPosition"]:
position.yd_volume = data["Position"]
# 对于其他交易所昨仓的计算
else:
position.yd_volume = data["Position"] - data["TodayPosition"]
# 获取合约的乘数信息
size: int = contract.size
# 计算之前已有仓位的持仓总成本
cost: float = position.price * position.volume * size
# 累加更新持仓数量和盈亏
position.volume += data["Position"]
position.pnl += data["PositionProfit"]
# 计算更新后的持仓总成本和均价
if position.volume and size:
cost += data["PositionCost"]
position.price = cost / (position.volume * size)
# 更新仓位冻结数量
if position.direction == Direction.LONG:
position.frozen += data["ShortFrozen"]
else:
position.frozen += data["LongFrozen"]
if last:
for position in self.positions.values():
self.gateway.on_position(position)
self.positions.clear()
def onRspQryTradingAccount(self, data: dict, error: dict, reqid: int, last: bool) -> None:
"""资金查询回报"""
if "AccountID" not in data:
return
account: AccountData = AccountData(
accountid=data["AccountID"],
balance=data["Balance"],
frozen=data["FrozenMargin"] + data["FrozenCash"] + data["FrozenCommission"],
gateway_name=self.gateway_name
)
account.available = data["Available"]
# self.gateway.on_account(account)
def onRspQryInstrument(self, data: dict, error: dict, reqid: int, last: bool) -> None:
"""合约查询回报"""
product: Product = PRODUCT_CTP2VT.get(data["ProductClass"], None)
if product:
# 这边测试了一下从底层ctp api接口中获取到的期货合约信息格式
# 可以看到这边得到的合约信息是齐全的,也就是说如果需要的话后续可以对
# 合约信息格式做修改添加
# if product != Product.OPTION:
# print(data)
contract: ContractData = ContractData(
symbol=data["InstrumentID"],
exchange=EXCHANGE_CTP2VT[data["ExchangeID"]],
name=data["InstrumentName"],
product=product,
size=data["VolumeMultiple"],
pricetick=data["PriceTick"],
gateway_name=self.gateway_name
)
# 期权相关
if contract.product == Product.OPTION:
# 移除郑商所期权产品名称带有的C/P后缀
if contract.exchange == Exchange.CZCE:
contract.option_portfolio = data["ProductID"][:-1]
else:
contract.option_portfolio = data["ProductID"]
contract.option_underlying = data["UnderlyingInstrID"]
contract.option_type = OPTIONTYPE_CTP2VT.get(data["OptionsType"], None)
contract.option_strike = data["StrikePrice"]
contract.option_index = str(data["StrikePrice"])
contract.option_listed = datetime.strptime(data["OpenDate"], "%Y%m%d")
contract.option_expiry = datetime.strptime(data["ExpireDate"], "%Y%m%d")
# self.gateway.on_contract(contract)
symbol_contract_map[contract.symbol] = contract
self.gateway.total_contracts.append(contract)
if last:
self.contract_inited = True
print("合约信息查询成功")
for data in self.order_data:
self.onRtnOrder(data)
self.order_data.clear()
for data in self.trade_data:
self.onRtnTrade(data)
self.trade_data.clear()
def onRtnOrder(self, data: dict) -> None:
"""委托更新推送"""
if not self.contract_inited:
self.order_data.append(data)
return
symbol: str = data["InstrumentID"]
contract: ContractData = symbol_contract_map[symbol]
frontid: int = data["FrontID"]
sessionid: int = data["SessionID"]
order_ref: str = data["OrderRef"]
orderid: str = f"{frontid}_{sessionid}_{order_ref}"
timestamp: str = f"{data['InsertDate']} {data['InsertTime']}"
dt: datetime = datetime.strptime(timestamp, "%Y%m%d %H:%M:%S")
dt: datetime = CHINA_TZ.localize(dt)
tp: tuple = (data["OrderPriceType"], data["TimeCondition"], data["VolumeCondition"])
order: OrderData = OrderData(
symbol=symbol,
exchange=contract.exchange,
orderid=orderid,
type=ORDERTYPE_CTP2VT[tp],
direction=DIRECTION_CTP2VT[data["Direction"]],
offset=OFFSET_CTP2VT[data["CombOffsetFlag"]],
price=data["LimitPrice"],
volume=data["VolumeTotalOriginal"],
traded=data["VolumeTraded"],
status=STATUS_CTP2VT[data["OrderStatus"]],
datetime=dt,
gateway_name=self.gateway_name
)
# self.gateway.on_order(order)
self.sysid_orderid_map[data["OrderSysID"]] = orderid
def onRtnTrade(self, data: dict) -> None:
"""成交数据推送"""
if not self.contract_inited:
self.trade_data.append(data)
return
symbol: str = data["InstrumentID"]
contract: ContractData = symbol_contract_map[symbol]
orderid: str = self.sysid_orderid_map[data["OrderSysID"]]
timestamp: str = f"{data['TradeDate']} {data['TradeTime']}"
dt: datetime = datetime.strptime(timestamp, "%Y%m%d %H:%M:%S")
dt: datetime = CHINA_TZ.localize(dt)
trade: TradeData = TradeData(
symbol=symbol,
exchange=contract.exchange,
orderid=orderid,
tradeid=data["TradeID"],
direction=DIRECTION_CTP2VT[data["Direction"]],
offset=OFFSET_CTP2VT[data["OffsetFlag"]],
price=data["Price"],
volume=data["Volume"],
datetime=dt,
gateway_name=self.gateway_name
)
# self.gateway.on_trade(trade)
def connect(
self,
address: str,
userid: str,
password: str,
brokerid: str,
auth_code: str,
appid: str
) -> None:
"""连接服务器"""
self.userid = userid
self.password = password
self.brokerid = brokerid
self.auth_code = auth_code
self.appid = appid
if not self.connect_status:
path: Path = get_folder_path(self.gateway_name.lower())
self.createFtdcTraderApi((str(path) + "\\Td").encode("GBK"))
self.subscribePrivateTopic(0)
self.subscribePublicTopic(0)
self.registerFront(address)
self.init()
self.connect_status = True
else:
self.authenticate()
def authenticate(self) -> None:
"""发起授权验证"""
if self.auth_failed:
return
ctp_req: dict = {
"UserID": self.userid,
"BrokerID": self.brokerid,
"AuthCode": self.auth_code,
"AppID": self.appid
}
self.reqid += 1
self.reqAuthenticate(ctp_req, self.reqid)
def login(self) -> None:
"""用户登录"""
if self.login_failed:
return
ctp_req: dict = {
"UserID": self.userid,
"Password": self.password,
"BrokerID": self.brokerid,
"AppID": self.appid
}
self.reqid += 1
self.reqUserLogin(ctp_req, self.reqid)
def send_order(self, req: OrderRequest) -> str:
"""委托下单"""
if req.offset not in OFFSET_VT2CTP:
print("请选择开平方向")
return ""
if req.type not in ORDERTYPE_VT2CTP:
print(f"当前接口不支持该类型的委托{req.type.value}")
return ""
self.order_ref += 1
tp: tuple = ORDERTYPE_VT2CTP[req.type]
price_type, time_condition, volume_condition = tp
ctp_req: dict = {
"InstrumentID": req.symbol,
"ExchangeID": req.exchange.value,
"LimitPrice": req.price,
"VolumeTotalOriginal": int(req.volume),
"OrderPriceType": price_type,
"Direction": DIRECTION_VT2CTP.get(req.direction, ""),
"CombOffsetFlag": OFFSET_VT2CTP.get(req.offset, ""),
"OrderRef": str(self.order_ref),
"InvestorID": self.userid,
"UserID": self.userid,
"BrokerID": self.brokerid,
"CombHedgeFlag": THOST_FTDC_HF_Speculation,
"ContingentCondition": THOST_FTDC_CC_Immediately,
"ForceCloseReason": THOST_FTDC_FCC_NotForceClose,
"IsAutoSuspend": 0,
"TimeCondition": time_condition,
"VolumeCondition": volume_condition,
"MinVolume": 1
}
self.reqid += 1
n: int = self.reqOrderInsert(ctp_req, self.reqid)
if n:
print(f"委托请求发送失败,错误代码:{n}")
return ""
orderid: str = f"{self.frontid}_{self.sessionid}_{self.order_ref}"
order: OrderData = req.create_order_data(orderid, self.gateway_name)
# self.gateway.on_order(order)
return order.vt_orderid
def cancel_order(self, req: CancelRequest) -> None:
"""委托撤单"""
frontid, sessionid, order_ref = req.orderid.split("_")
ctp_req: dict = {
"InstrumentID": req.symbol,
"ExchangeID": req.exchange.value,
"OrderRef": order_ref,
"FrontID": int(frontid),
"SessionID": int(sessionid),
"ActionFlag": THOST_FTDC_AF_Delete,
"BrokerID": self.brokerid,
"InvestorID": self.userid
}
self.reqid += 1
self.reqOrderAction(ctp_req, self.reqid)
def query_account(self) -> None:
"""查询资金"""
self.reqid += 1
self.reqQryTradingAccount({}, self.reqid)
def query_position(self) -> None:
"""查询持仓"""
if not symbol_contract_map:
return
ctp_req: dict = {
"BrokerID": self.brokerid,
"InvestorID": self.userid
}
self.reqid += 1
self.reqQryInvestorPosition(ctp_req, self.reqid)
def close(self) -> None:
"""关闭连接"""
if self.connect_status:
self.exit()
def adjust_price(price: float) -> float:
"""将异常的浮点数最大值(MAX_FLOAT)数据调整为0"""
if price == MAX_FLOAT:
price = 0
return price