CtpTdApi 未提供数据,需要修改onRspQryInvestorPosition接口实现
实现方案:
- 新增CtpTdApiVNPY类,继承CtpTdApi, 重载onRspQryInvestorPosition,在onRspQryInvestorPosition中计算开仓均价和逐笔浮盈
- 新增CtpVNPY类,继承CtpGateway,重载init接口,使用CtpTdApiVNPY初始化td_api
- 修改程序初始化,使用CtpVNPY类初始化。
具体代码:
新增CtpTdApiVNPY类
class CtpTdApiVNPY(CtpTdApi): def __init__(self, gateway: CtpGateway) -> None: """构造函数""" super().__init__(gateway) def onRspQryInvestorPosition(self, data: dict, error: dict, reqid: int, last: bool) -> None: """持仓查询回报""" if not data: return # 必须已经收到了合约信息后才能处理 symbol: str = data["InstrumentID"] contract = symbol_contract_map.get(symbol, None) if contract: # 获取之前缓存的持仓数据缓存 key: str = f"{data['InstrumentID'], data['PosiDirection']}" position = self.positions.get(key, None) if not position: position = PositionData( symbol=data["InstrumentID"], exchange=contract.exchange, direction=DIRECTION_CTP2VT[data["PosiDirection"]], gateway_name=self.gateway_name ) self.positions[key] = position # 对于上期所昨仓需要特殊处理 if position.exchange in {Exchange.SHFE, Exchange.INE}: if data["YdPosition"] and not data["TodayPosition"]: position.yd_volume = data["Position"] # 对于其他交易所昨仓的计算 else: position.yd_volume = data["Position"] - data["TodayPosition"] # 获取合约的乘数信息 size: int = contract.size # 计算之前已有仓位的持仓总成本 cost: float = position.price * position.volume * size position.opencost = data["OpenCost"] position.openprice = 0 # 累加更新持仓数量和盈亏 position.volume += data["Position"] position.pnl += data["PositionProfit"] # 计算更新后的持仓总成本和均价 if position.volume and size: cost += data["PositionCost"] position.price = cost / (position.volume * size) # 计算开仓均价 position.openprice = position.opencost / (position.volume * size) # 更新仓位冻结数量 if position.direction == Direction.LONG: position.frozen += data["ShortFrozen"] else: position.frozen += data["LongFrozen"] # 计算浮盈 position.settlementPrice = data["SettlementPrice"] floatprofit = position.settlementPrice * position.volume * size - position.opencost if position.direction == Direction.LONG: position.floatProfit = floatprofit else: position.floatProfit = -floatprofit if last: for position in self.positions.values(): self.gateway.on_position(position) self.positions.clear()
新增CtpVNPY类
class CtpVNPY(CtpGateway): def __init__(self, event_engine: EventEngine, gateway_name: str) -> None: super().__init__(event_engine, gateway_name) self.td_api: "CtpTdApi" = CtpTdApiVNPY(self)
- 修改程序初始化
init_cli_trading([CtpVNPY])