VeighNa量化社区
你的开源社区量化交易平台 | vn.py | vnpy
Member
avatar
加入于:
帖子: 2
声望: 0

当前的算法交易部分提供的几个twap,最优限价等等交易方法看起来都需要手动执行。请问如果我希望价差策略给出开仓信号以后,我的挂单委托自动使用算法交易部分提供的算法该如何执行。

Member
avatar
加入于:
帖子: 2
声望: 0

或者说,我是不是应该通过修改vnpy_spreadtrading这个文件夹下的algo.py里的_send_payup_order(self, vt_symbol: str, leg_volume: float) -> None:这个函数完成我的需求

Super Moderator
avatar
加入于:
帖子: 128
声望: 14

我来帮你分析这个问题。这涉及到 vnpy_spreadtrading(价差交易)和 vnpy_algotrading(算法交易)两个模块的协作。我先查询一下相关文档和源码。

让我继续查看 algo.py 的源码以及算法交易模块的结构:

让我继续查看算法交易引擎和价差策略模板的源码:

根据源码分析,我来为你详细解答这个问题。

核心结论

你的理解方向是对的,但具体实现需要调整。

vnpy_spreadtradingvnpy_algotrading 是两个独立的算法体系

模块 算法类型 处理对象 默认算法
vnpy_spreadtrading 价差执行算法 价差合约(多腿对冲) SpreadTakerAlgo(主动对价)
vnpy_algotrading 单合约执行算法 单一合约 TWAP、BestLimit、Iceberg 等

关键问题vnpy_algotrading 的 TWAP、BestLimit 等算法是针对单一合约设计的,而价差交易需要同时处理多条腿的对冲


正确的实现方案

方案一:自定义价差算法类(推荐)

继承 SpreadTakerAlgo,重写 send_leg_order 方法,在发送单腿委托时使用 AlgoEngine 启动 TWAP/BestLimit 算法。

# 文件:custom_spread_algo.py
from vnpy_spreadtrading.algo import SpreadTakerAlgo
from vnpy_spreadtrading.base import LegData
from vnpy.trader.object import TickData, ContractData
from vnpy.trader.constant import Direction

class SpreadTwapAlgo(SpreadTakerAlgo):
    """使用TWAP算法执行价差的各条腿"""
    algo_name: str = "SpreadTwap"

    def __init__(
        self,
        algo_engine: "SpreadAlgoEngine",
        algoid: str,
        spread: "SpreadData",
        direction: Direction,
        price: float,
        volume: float,
        payup: int,
        interval: int,
        lock: bool,
        extra: dict
    ) -> None:
        """"""
        super().__init__(
            algo_engine, algoid, spread, direction,
            price, volume, payup, interval, lock, extra
        )

        # 从extra获取TWAP参数
        self.algo_setting: dict = extra.get("algo_setting", {})
        self.algo_template: str = extra.get("algo_template", "TwapAlgo")

        # 存储已启动的算法交易实例
        self.leg_algos: dict[str, str] = {}  # vt_symbol -> algo_name

    def send_leg_order(
        self,
        vt_symbol: str,
        price: float,
        volume: float,
        direction: Direction,
        fak: bool = False
    ) -> None:
        """"""
        # 获取算法交易引擎
        main_engine = self.algo_engine.main_engine
        algo_engine = main_engine.get_engine("AlgoTrading")

        if not algo_engine:
            # 如果算法交易引擎未启动,回退到默认行为
            super().send_leg_order(vt_symbol, price, volume, direction, fak)
            return

        leg: LegData = self.spread.legs[vt_symbol]
        volume = round(volume / leg.min_volume) * leg.min_volume
        if not volume:
            return

        # 计算委托价格
        tick: TickData | None = self.get_tick(vt_symbol)
        leg_contract: ContractData | None = self.get_contract(vt_symbol)

        if tick and leg_contract:
            if direction == Direction.LONG:
                price = tick.ask_price_1 + leg_contract.pricetick * self.payup
            else:
                price = tick.bid_price_1 - leg_contract.pricetick * self.payup

        # 检查涨跌停
        if tick:
            if direction == Direction.LONG and tick.limit_up:
                price = min(price, tick.limit_up)
            elif direction == Direction.SHORT and tick.limit_down:
                price = max(price, tick.limit_down)

        # 启动TWAP/BestLimit算法
        from vnpy.trader.constant import Offset

        offset: Offset = Offset.OPEN  # 根据实际需求调整开平

        algo_name: str = algo_engine.start_algo(
            template_name=self.algo_template,
            vt_symbol=vt_symbol,
            direction=direction,
            offset=offset,
            price=price,
            volume=int(abs(volume)),
            setting=self.algo_setting
        )

        if algo_name:
            self.leg_algos[vt_symbol] = algo_name
            self.write_log(f"启动{self.algo_template}算法: {algo_name} for {vt_symbol}")

            # 监听算法完成事件(需要额外实现)
            # 这里简化处理,实际应该通过事件机制监听算法完成

方案二:在策略层直接调用算法引擎

在策略的 on_spread_data 中,不调用 start_long_algo,而是直接调用 AlgoEngine 为各条腿启动算法:

# 文件:custom_strategy.py
from vnpy_spreadtrading.template import SpreadStrategyTemplate
from vnpy.trader.constant import Direction

class CustomStrategy(SpreadStrategyTemplate):
    """"""
    author = "Your Name"

    parameters = ["target_pos", "payup", "interval"]

    target_pos: int = 0
    payup: int = 0
    interval: int = 5

    def __init__(self, strategy_engine, strategy_name, spread, setting):
        """"""
        super().__init__(strategy_engine, strategy_name, spread, setting)

        self.leg_algos: dict[str, str] = {}  # 记录各腿算法实例

    def on_spread_data(self):
        """"""
        # 获取算法交易引擎
        algo_engine = self.strategy_engine.main_engine.get_engine("AlgoTrading")
        if not algo_engine:
            self.write_log("算法交易引擎未启动")
            return

        # 计算目标价差仓位
        current_pos = self.get_spread_pos()

        if current_pos < self.target_pos:
            # 需要开多价差
            self.execute_spread_with_algo(
                Direction.LONG, 
                self.target_pos - current_pos,
                algo_engine
            )
        elif current_pos > self.target_pos:
            # 需要开空价差
            self.execute_spread_with_algo(
                Direction.SHORT,
                current_pos - self.target_pos,
                algo_engine
            )

    def execute_spread_with_algo(
        self,
        direction: Direction,
        volume: float,
        algo_engine
    ) -> None:
        """使用算法交易执行价差"""
        from vnpy.trader.constant import Offset

        # 获取主动腿
        active_leg = self.spread.active_leg

        # 计算主动腿委托量
        active_volume = self.spread.calculate_leg_volume(
            active_leg.vt_symbol,
            volume if direction == Direction.LONG else -volume
        )

        # 为主动腿启动TWAP算法
        tick = self.get_leg_tick(active_leg.vt_symbol)
        if not tick:
            return

        price = tick.ask_price_1 if direction == Direction.LONG else tick.bid_price_1

        algo_name = algo_engine.start_algo(
            template_name="TwapAlgo",
            vt_symbol=active_leg.vt_symbol,
            direction=direction,
            offset=Offset.OPEN,
            price=price,
            volume=int(abs(active_volume)),
            setting={"time_interval": self.interval, "volume_percent": 0.1}
        )

        if algo_name:
            self.leg_algos[active_leg.vt_symbol] = algo_name
            self.write_log(f"为主动腿{active_leg.vt_symbol}启动TWAP算法: {algo_name}")

        # 注意:被动腿的对冲需要在主动腿成交后处理
        # 这需要监听算法交易的成交事件,实现较为复杂

关键注意事项

1. 关于 _send_payup_order 方法

你提到的 _send_payup_order 方法不存在SpreadTakerAlgo 类中。实际的方法是:

  • send_leg_order(self, vt_symbol, price, volume, direction, fak):发送单腿委托
  • take_active_leg(self):执行主动腿
  • hedge_passive_legs(self):对冲被动腿

2. 价差算法的特殊性

价差交易的核心逻辑是:

  1. 主动腿先成交 → 产生单边敞口
  2. 立即对冲被动腿 → 恢复价差头寸

如果直接使用 vnpy_algotrading 的算法,需要确保:

  • 主动腿算法完成后,再启动被动腿算法
  • 处理主动腿部分成交的情况
  • 监控整体价差头寸,避免过度对冲

3. 推荐做法

方案一(自定义算法类) 是更优雅的实现方式,因为:

  • 保持价差交易的核心逻辑(主动腿→被动腿)
  • 只在委托执行层替换为TWAP/BestLimit
  • 可以利用 SpreadAlgoEngine 的事件机制

4. 需要修改的配置

vnpy_spreadtrading/engine.py 中,修改 SpreadAlgoEngine 的算法类:

class SpreadAlgoEngine:
    """"""
    # 修改这里,使用自定义算法类
    algo_class: type[SpreadTakerAlgo] = SpreadTwapAlgo

或者在策略中通过 extra 参数传递算法配置,在自定义算法中读取。


总结

你的需求不能简单地通过修改 _send_payup_order 实现,因为该方法不存在。正确的做法是:

  1. 理解架构vnpy_spreadtrading 的算法是价差级别的,vnpy_algotrading 的算法是合约级别的
  2. 选择方案:推荐方案一(自定义 SpreadTakerAlgo 子类)
  3. 处理对冲:确保主动腿成交后再对冲被动腿,这是价差交易的核心
© 2015-2022 上海韦纳软件科技有限公司
备案服务号:沪ICP备18006526号

沪公网安备 31011502017034号

【用户协议】
【隐私政策】
【免责条款】