VeighNa量化社区
你的开源社区量化交易平台 | vn.py | vnpy
Member
avatar
加入于:
帖子: 5
声望: 0

您好,本人是CTP新人,尝试通过vnpy的接口去发单和接收回报。现在能够正常发单,但是关于接收到的回报有很多不懂得地方,希望大家能够帮一帮:

  1. 发单后,需要等待回报,请问怎么从engine或者gateway那边拿到回报?目前只能看到terminal上面打印的log回报
  2. 请问vnpy里面是否有类似EventListener的东西,用于接收返回的消息?如果有请问在哪里查看
  3. 请大神们分享一些文档或者连接,让我学习一下

谢谢各位

Super Moderator
avatar
加入于:
帖子: 107
声望: 12

我来帮你了解 VeighNa 中如何接收交易回报和事件处理机制。

根据你的问题,我来详细解答 VeighNa 中如何接收交易回报和事件处理机制。

VeighNa 的事件回调机制

VeighNa 采用的是回调函数(Callback Function)机制,而不是传统的 EventListener 模式。当你发送委托后,系统会自动调用相应的回调函数来通知你订单状态的变化。

主要的回调函数

在 CTA 策略中,有以下几个关键的回调函数:

1. on_order - 委托状态更新

def on_order(self, order: OrderData):
    """
    收到委托回报时被调用
    """
    self.write_log(f"委托更新: {order.vt_orderid}, 状态: {order.status}")
    # 你可以在这里处理各种委托状态
    if order.status == Status.ALLTRADED:
        self.write_log("委托已全部成交")
    elif order.status == Status.CANCELLED:
        self.write_log("委托已撤销")

2. on_trade - 成交回报

def on_trade(self, trade: TradeData):
    """
    收到成交回报时被调用
    """
    self.write_log(f"成交: {trade.vt_orderid}, 价格: {trade.price}, 数量: {trade.volume}")
    # 处理具体的成交逻辑

3. on_stop_order - 停止单回报

def on_stop_order(self, stop_order: StopOrder):
    """
    收到停止单回报时被调用
    """
    self.write_log(f"停止单: {stop_order.stop_orderid}, 状态: {stop_order.status}")

完整的策略示例

from vnpy_ctastrategy import (
    CtaTemplate,
    StopOrder,
    TickData,
    BarData,
    TradeData,
    OrderData,
    BarGenerator,
    ArrayManager,
)
from vnpy.trader.constant import Status, Direction, Offset

class MyStrategy(CtaTemplate):
    """"""
    author = "Your Name"

    def __init__(self, cta_engine, strategy_name, vt_symbol, setting):
        super().__init__(cta_engine, strategy_name, vt_symbol, setting)
        self.bg = BarGenerator(self.on_bar)
        self.am = ArrayManager()

        # 用于跟踪活动订单
        self.active_orderids = set()

    def on_init(self):
        """策略初始化"""
        self.write_log("策略初始化")
        self.load_bar(10)

    def on_start(self):
        """策略启动"""
        self.write_log("策略启动")

    def on_stop(self):
        """策略停止"""
        self.write_log("策略停止")

    def on_tick(self, tick: TickData):
        """行情更新"""
        self.bg.update_tick(tick)

    def on_bar(self, bar: BarData):
        """K线更新"""
        # 这里可以发出交易信号
        if self.pos == 0:
            # 买入信号
            orderid = self.buy(bar.close_price, 1)
            if orderid:
                self.active_orderids.update(orderid)

    def on_order(self, order: OrderData):
        """委托状态更新"""
        self.write_log(
            f"委托更新 - ID: {order.vt_orderid}, "
            f"状态: {order.status}, "
            f"价格: {order.price}, "
            f"数量: {order.volume}, "
            f"已成交: {order.traded}"
        )

        # 如果委托完成(全部成交或已撤销),从活动订单中移除
        if order.status in [Status.ALLTRADED, Status.CANCELLED, Status.REJECTED]:
            if order.vt_orderid in self.active_orderids:
                self.active_orderids.remove(order.vt_orderid)

    def on_trade(self, trade: TradeData):
        """成交回报"""
        self.write_log(
            f"成交 - ID: {trade.vt_orderid}, "
            f"方向: {trade.direction}, "
            f"开平: {trade.offset}, "
            f"价格: {trade.price}, "
            f"数量: {trade.volume}"
        )

    def on_stop_order(self, stop_order: StopOrder):
        """停止单回报"""
        self.write_log(f"停止单: {stop_order.stop_orderid}, 状态: {stop_order.status}")

如何获取回报数据

  1. 不需要主动查询:VeighNa 会自动将回报数据通过回调函数传递给你
  2. 数据结构
    • OrderData: 包含委托的完整信息(状态、价格、数量、已成交数量等)
    • TradeData: 包含成交的详细信息(成交价格、数量、时间等)
  3. 实时性:所有回调都是实时触发的,你不需要轮询或等待

在不同模块中的差异

  • CTA策略模块:支持 on_orderon_tradeon_stop_order 回调
  • 组合策略模块:不支持回调,需要通过 get_order()get_pos() 主动查询
  • 价差交易模块:支持 on_orderon_tradeon_spread_pos 等回调

调试技巧

如果你只能看到 terminal 的日志但想在代码中处理数据,可以:

  1. 在回调函数中添加日志

    def on_order(self, order: OrderData):
     self.write_log(f"收到委托回报: {order.__dict__}")
    
  2. 保存到策略变量
    ```python
    def init(self, ...):
    self.last_order = None
    self.trades = []

def on_order(self, order: OrderData):
self.last_order = order

def on_trade(self, trade: TradeData):
self.trades.append(trade)


3. **使用 `put_event()` 刷新界面**:
```python
def on_order(self, order: OrderData):
    # 处理逻辑
    self.put_event()  # 通知界面刷新

这样你就可以在策略中完整地接收和处理所有交易回报,而不仅仅是看到日志。回调函数就是 VeighNa 的"EventListener"机制。

Member
avatar
加入于:
帖子: 5
声望: 0

谢谢你的回复,对我帮助非常的大。我这边使用了下面的代码去获取order的更新,但是遇到拒单的时候,没有办法获得拒单的报错代码,比如:“CTP | 委托 3_xxxxxx_1 状态更新,26:已撤单报单被拒绝SHFE:当前状态禁止此项操作”,想要获取这个报错代码26,请问该怎么做。我目前是使用下面的办法去获得order最新消息,代码如下

from vnpy_scripttrader import init_cli_trading
from vnpy.trader.gateway import BaseGateway
from vnpy.trader.constant import Exchange, Product, OrderType, Direction, Offset, Status
from vnpy.trader.event import EVENT_ORDER
from vnpy.trader.object import (AccountData, OrderData, TradeData, PositionData, 
                               TickData, BarData, ContractData, LogData, 
                               SubscribeRequest, OrderRequest, CancelRequest)
from vnpy_ctp import CtpGateway

class CTPAPI:
    def __init__(self):
        self.gateway_name = 'CTP'
        # 使用init_cli_trading初始化交易引擎,传递CTP网关类
        self.main_engine = init_cli_trading([CtpGateway])
        self.config = self.load_config()
        self.connected = False

        # 订单回调管理
        self.orders = {}
        # 注册订单事件监听器
        self._register_event_listeners()

    def load_config(self):
        config_path = os.path.join(dirname(dirname(dirname(__file__))),  'etc', 'config.json')
        with open(config_path, 'r', encoding='utf-8') as f:
            return json.load(f)['ctp']

    def _register_event_listeners(self):
        """注册事件监听器"""
        # 获取事件引擎并注册订单事件监听器
        event_engine = self.main_engine.event_engine
        event_engine.register(EVENT_ORDER, self._on_order)

    def _on_order(self, event):
        """处理订单事件"""
        order = event.data
        self.orders[order.vt_orderid] = order
        print(f"[{self.gateway_name}] 订单回报:")
        print(f"  订单号: {order.vt_orderid}")
        print(f"  合约: {order.vt_symbol}")
        print(f"  方向: {order.direction.value}")
        print(f"  状态: {order.status.value}")
        print(f"  价格: {order.price}")
        print(f"  数量: {order.volume}")
        print(f"  成交: {order.traded}")
        print(f"  引用: {order.reference}")
        print(f"  开平: {order.offset.value}")

目前只能收到order被撤销,接收不到order被拒的信息。如何修改代码才可以获得reject error code? 谢谢

Super Moderator
avatar
加入于:
帖子: 107
声望: 12

我来帮你解决如何获取拒单错误代码的问题。

首先让我查询一下 VeighNa 中关于错误信息处理的相关文档和代码结构。

现在让我查看一下 VeighNa 的数据结构定义,特别是 OrderData 和错误信息相关的字段。

让我查看 object.py 文件,这里定义了 OrderData 和其他数据结构。

现在让我查看 CTP 网关的具体实现,看看错误信息是如何处理的。

让我查看 CTP 网关的实现代码,特别是错误处理部分。

现在让我查看一下事件引擎和日志相关的代码,看看错误信息是如何传递的。

现在让我查看事件引擎的代码,了解 LogData 的结构。

让我查看一下事件常量的定义。

让我使用正确的路径来查看这些文件。

现在让我查看一下日志事件的处理。根据我分析的代码,我发现问题的关键点:

  1. 拒单错误信息的传递方式:在 CTP 网关中,当委托被拒绝时,会通过 onRspOrderInsert 方法处理,这个方法会创建一个 Status.REJECTED 状态的 OrderData 对象,并调用 gateway.on_order(order)

  2. 错误代码的获取:错误信息实际上是通过 error 参数传递的,但在 onRspOrderInsert 中,错误信息只被用于日志输出(self.gateway.write_error("交易委托失败", error)),而没有被存储到 OrderData 对象中。

  3. 解决方案:VeighNa 的 OrderData 类中没有直接存储错误代码的字段,但错误信息会被记录在日志中,并通过 EVENT_LOG 事件传递。

让我为你提供完整的解决方案:

问题分析

在你的代码中,你只能接收到 OrderData 对象,但 OrderData 中没有包含错误代码字段。错误代码实际上是在 CTP 网关的 onRspOrderInsert 方法中通过 error 参数接收的,然后被记录到日志中。

解决方案

方案1:监听日志事件(推荐)

你可以同时监听 EVENT_LOG 事件来获取错误信息:

from vnpy_scripttrader import init_cli_trading
from vnpy.trader.gateway import BaseGateway
from vnpy.trader.constant import Exchange, Product, OrderType, Direction, Offset, Status
from vnpy.trader.event import EVENT_ORDER, EVENT_LOG
from vnpy.trader.object import (AccountData, OrderData, TradeData, PositionData, 
                               TickData, BarData, ContractData, LogData, 
                               SubscribeRequest, OrderRequest, CancelRequest)
from vnpy_ctp import CtpGateway
import os
import json
from os.path import dirname

class CTPAPI:
    def __init__(self):
        self.gateway_name = 'CTP'
        self.main_engine = init_cli_trading([CtpGateway])
        self.config = self.load_config()
        self.connected = False

        # 订单回调管理
        self.orders = {}
        # 错误信息存储
        self.error_logs = []

        # 注册事件监听器
        self._register_event_listeners()

    def load_config(self):
        config_path = os.path.join(dirname(dirname(dirname(__file__))), 'etc', 'config.json')
        with open(config_path, 'r', encoding='utf-8') as f:
            return json.load(f)['ctp']

    def _register_event_listeners(self):
        """注册事件监听器"""
        event_engine = self.main_engine.event_engine
        event_engine.register(EVENT_ORDER, self._on_order)
        event_engine.register(EVENT_LOG, self._on_log)  # 新增日志监听

    def _on_order(self, event):
        """处理订单事件"""
        order = event.data
        self.orders[order.vt_orderid] = order

        print(f"[{self.gateway_name}] 订单回报:")
        print(f"  订单号: {order.vt_orderid}")
        print(f"  合约: {order.vt_symbol}")
        print(f"  方向: {order.direction.value}")
        print(f"  状态: {order.status.value}")
        print(f"  价格: {order.price}")
        print(f"  数量: {order.volume}")
        print(f"  成交: {order.traded}")
        print(f"  引用: {order.reference}")
        print(f"  开平: {order.offset.value}")

        # 检查是否为拒单
        if order.status == Status.REJECTED:
            print(f"  *** 订单被拒绝 ***")
            # 这里可以关联最近的日志错误信息
            self._check_recent_errors(order.vt_orderid)

    def _on_log(self, event):
        """处理日志事件"""
        log_data = event.data
        if isinstance(log_data, LogData):
            log_msg = log_data.msg
            self.error_logs.append({
                'time': log_data.time,
                'msg': log_msg
            })

            # 检查是否包含错误信息
            if "交易委托失败" in log_msg or "代码:" in log_msg:
                print(f"[错误日志] {log_msg}")

                # 提取错误代码
                if "代码:" in log_msg:
                    try:
                        # 从日志中提取错误代码
                        # 格式: "交易委托失败,代码:26,信息:CTP:..."
                        parts = log_msg.split(",代码:")
                        if len(parts) > 1:
                            error_code_part = parts[1]
                            error_code = error_code_part.split(",")[0]
                            print(f"  *** 提取到错误代码: {error_code} ***")
                    except Exception as e:
                        print(f"解析错误代码失败: {e}")

    def _check_recent_errors(self, orderid):
        """检查最近的错误日志,关联到订单"""
        if self.error_logs:
            # 获取最近的错误日志
            recent_error = self.error_logs[-1]
            print(f"  关联的错误信息: {recent_error['msg']}")

    def send_order(self, symbol, exchange, direction, offset, price, volume, order_type=OrderType.LIMIT):
        """发送订单"""
        req = OrderRequest(
            symbol=symbol,
            exchange=exchange,
            direction=direction,
            offset=offset,
            price=price,
            volume=volume,
            type=order_type
        )
        return self.main_engine.send_order(req, self.gateway_name)

方案2:修改 OrderData 类(高级方案)

如果你需要更直接的错误代码访问,可以考虑扩展 OrderData 类,但这需要修改底层代码。

方案3:使用策略模板的错误处理

如果你使用 CTA 策略模板,可以在 on_order 中结合日志来处理:

def on_order(self, order: OrderData):
    """委托状态更新"""
    if order.status == Status.REJECTED:
        self.write_log(f"订单被拒绝: {order.vt_orderid}")
        # 错误代码会自动在日志中显示,因为 CTP 网关会自动记录
        # 你可以在终端或日志文件中看到完整的错误信息

关键要点

  1. 错误代码来源:CTP 网关在 onRspOrderInsert 中接收到的 error 字典包含 ErrorIDErrorMsg 字段。

  2. 日志记录:错误信息通过 gateway.write_error() 方法被记录为日志事件。

  3. 事件监听:通过监听 EVENT_LOG 事件,你可以捕获所有的错误日志,包括错误代码。

  4. 错误代码26:在你的例子中,错误代码26对应的是"当前状态禁止此项操作",这是 CTP 柜台返回的标准错误。

使用建议

  • 推荐方案1:同时监听订单事件和日志事件,这样既能获取订单状态,又能获取详细的错误信息。
  • 错误关联:由于订单拒绝和错误日志在时间上是紧密关联的,你可以通过时间戳来关联它们。
  • 日志解析:从日志消息中提取错误代码,格式通常是"交易委托失败,代码:{error_id},信息:{error_msg}"。

这样你就可以完整地获取到拒单的错误代码26以及相关的错误信息了。
[警告: 达到最大工具调用次数限制]

© 2015-2022 上海韦纳软件科技有限公司
备案服务号:沪ICP备18006526号

沪公网安备 31011502017034号

【用户协议】
【隐私政策】
【免责条款】