vn.py官网
你的开源社区量化交易平台
Member
avatar
加入于:
帖子: 307
声望: 90

1. 交易线的代码

把下面的代码保持为vnpy\usertools\trade_line.py:

'''
Author: hxxjava
Date: 2021-06-24 19:57:32
LastEditTime: 2021-07-05 21:23:04
LastEditors: Please set LastEditors
Description: 
'''
from dataclasses import dataclass
from typing import Callable
from vnpy.trader.constant import Direction,Offset
from vnpy.event import EventEngine,Event
from copy import deepcopy

EVENT_PRICE_CHANGE = "ePriceChange."    # 价格变化消息

@dataclass
class PriceData:
    """
    价格变化数据定义
    """    
    sponsor:str     # 发起者
    price_name:str  # 价格名称
    price:float     # 价格


@dataclass
class TradeCommand:
    """
    交易指令数据定义
    """    
    trade_line_name:str
    sponsor:str       # 发起者  
    price_name:str    # 价格名称
    direction:Direction
    offset:Offset
    price:float
    payup:float
    trade_num:int


class TradeLine():
    """
    名称:交易线。按照设定的条件,发出交易指令。相当于预警下单线。
    """

    def __init__(self,event_engine:EventEngine,tc:TradeCommand,condition:str,on_trade_command: Callable,excuted:bool=False):
        self.event_engine = event_engine
        self.tc = tc
        self.condition = condition
        self.price = None
        self.excuted = excuted
        self.on_trade_command = on_trade_command

        self.register_event()

    def register_event(self):
        """ 注册消息 """
        self.event_engine.register(EVENT_PRICE_CHANGE,self.process_pricechange_event)

    def process_pricechange_event(self,event:Event):
        """ """
        pc:PriceData = event.data
        if self.tc.sponsor == pc.sponsor and self.tc.price_name == pc.price_name:
            self.check_price(pc.price)

    def check_price(self,price:float):
        """ """
        if self.excuted:
            self.price = price
            return

        if self.condition == ">":
            # 大于触发价
            if price > self.tc.price:
                self.send_trade_event(price)

        elif self.condition == "<":
            # 大于触发价
            if price < self.tc.price:
                self.send_trade_event(price)

        elif self.condition == ">=":
            # 大于触发价
            if price >= self.tc.price:
                self.send_trade_event(price)

        elif self.condition == "<=":
            # 大于触发价
            if price <= self.tc.price:
                self.send_trade_event(price)        

        elif self.condition == "^":
            # 上穿触发价
            if self.price is not None:
                if self.price <= self.tc.price < price:
                    self.send_trade_event(price)    

        elif self.condition == "v":
            # 下穿触发价
            if self.price is not None:
                if self.price >= self.tc.price > price:
                    self.send_trade_event(price)  

        self.price = price             

    def send_trade_event(self,price:float):
        """ """
        if self.excuted:
            return

        tc = deepcopy(self.tc) # 简单,不模棱两可
        # tc.price=price  # 注释掉,直接用交易线的价格

        self.excuted = True
        # 执行交易指令
        self.on_trade_command(tc)

    def set_emit_price(self,price:float):
        """ 设置交易线触发价格 """
        self.tc.price = price

    def set_excuted(self,excuted:bool):
        """ 设置交易线是否交易过标志 """
        self.excuted = excuted 

    def reset(self):
        """ 复位交易线 """
        self.price = None
        self.excuted = False 

    def __repr__(self) -> str:
        out_str = "TradeLine=({},condition={},on_trade_command={},price={},excuted={})".\
            format(
                self.tc,
                self.condition,
                self.on_trade_command,
                self.price,
                self.excuted
            )
        return out_str

2. 如何使用交易线

下面是一个测试用的价差交易的策略例子,只为可以演示交易线的创建、触发价格更新和最新价格的更新和交易动作的执行,并不表示该策略可以盈利。
把下面代码保存为boll_arbitrage_strategy.py,保存到vnpy_spreadtrading\stategies目录下,就可以创建价差策略来观察trade_line的运作机制。

""" """
from datetime import datetime
from typing import List,Dict,Tuple,Union
from vnpy.event.engine import Event, EventEngine
from os import close, name, write
from typing import Dict
from vnpy.trader.constant import Direction, Offset,InstrumentStatus
from numpy import nan
import talib
from vnpy.trader.utility import BarGenerator, ArrayManager
from vnpy_spreadtrading.base import LegData
from vnpy_spreadtrading import (
    SpreadStrategyTemplate,
    SpreadAlgoTemplate,
    SpreadData,
    OrderData,
    TradeData,
    TickData,
    BarData
)

from vnpy.usertools.trade_line import (
    EVENT_PRICE_CHANGE,
    PriceData,
    TradeCommand,
    TradeLine
)


class BollArbitrageStrategy(SpreadStrategyTemplate):
    """
    利用Keltner通道进行套利的一种价差交易
    """

    author = "hxxjava"

    bar_window = 5      # K线周期
    boll_window = 26    # BOLL参数1
    max_steps = 4       # 最大开仓次数
    min_step = 2         # 最小开仓距离
    step_pos = 1        # 每步开仓数量     
    payup = 10          
    interval = 5

    spread_pos = 0.0      # 当前价差数量
    boll_mid = nan     # BOLL中轨   
    step = nan         # 开仓步长

    parameters = [
        "bar_window",
        "boll_window",
        "max_steps",
        "min_step",
        "step_pos",
        "payup",
        "interval"
    ]

    variables = [
        "boll_mid",
        "step",
        "spread_pos",
    ]

    def __init__(
        self,
        strategy_engine,
        strategy_name: str,
        spread: SpreadData,
        setting: dict
    ):
        """"""
        super().__init__(
            strategy_engine, strategy_name, spread, setting
        )
        self.event_engine:EventEngine = self.strategy_engine.event_engine

        self.bg = BarGenerator(self.on_spread_bar,self.bar_window,self.on_xmin_spread_bar)
        self.am = ArrayManager(size=60)

        self.pre_spread_pos = self.spread_pos

        self.trade_lines:Dict[str,TradeLine] = {}
        self.init_trade_lines()

        self.bars:List[BarData] = []

    def on_init(self):
        """
        Callback when strategy is inited.
        """
        self.write_log("策略初始化")

        self.load_bar(days=10,callback=self.on_spread_bar)
        # self.load_tick(days=1)

    def on_start(self):
        """
        Callback when strategy is started.
        """
        # 开启响应的所有交易线的交易功能
        self.write_log("策略启动")
        self.adjust_trade_lines_emitprice()
        self.set_trade_lines_excuted("all_lines",False)
        self.adjust_trade_lines_excute()

    def on_stop(self):
        """
        Callback when strategy is stopped.
        """
        self.write_log("策略停止")

        self.put_event()

    def on_spread_data(self):
        """
        Callback when spread price is updated.
        """
        tick = self.get_spread_tick()
        self.on_spread_tick(tick)

    def on_spread_tick(self, tick: TickData):
        """
        Callback when new spread tick data is generated.
        """
        self.bg.update_tick(tick)
        if self.trading:
            pc = PriceData(sponsor=self.strategy_name,price_name=self.spread_name,price=tick.last_price)
            self.event_engine.put(Event(EVENT_PRICE_CHANGE,data=pc))

    def on_spread_bar(self,bar:BarData):
        """
        Callback when 1 min spread bar data is generated.
        """
        # print(f"on_spread_bar bar={bar}")
        self.bg.update_bar(bar)

    def on_xmin_spread_bar(self, bar: BarData):
        """
        Callback when x min spread bar data is generated.
        """
        self.bars.append(bar)
        if len(self.bars) > 100:
            self.bars.pop(0)

        self.am.update_bar(bar)
        if not self.am.inited:
            return

        # self.boll_mid = self.am.ma(self.boll_window)
        self.boll_mid = talib.MA(self.am.close,self.boll_window)[-1]

        # BOLL通道
        std = self.am.std(self.boll_window,array=True) 
        temp = talib.MA(std,5)
        self.step = 2*temp[-1]

        self.adjust_trade_lines_emitprice()

        print(f"boll_mid,step=({self.boll_mid,self.step})")

        self.put_event()

    def on_spread_pos(self):
        """
        Callback when spread position is updated.
        """
        self.spread_pos = self.get_spread_pos()

        if self.spread_pos == 0:
            if self.pre_spread_pos>0:
                # 复位做多交易线和空平交易线交易标志
                self.set_trade_lines_excuted("below_lines",False)

            if self.pre_spread_pos<0:
                # 复位做多交易线和空平交易线交易标志
                self.set_trade_lines_excuted("above_lines",False)

        if self.pre_spread_pos == 0:
            if self.spread_pos>0:
                # 当持有多仓时,使能平多仓交易线
                self.set_trade_lines_excuted("above_mid",False)

            if self.spread_pos<0:
                # 当持有空仓时,使能平多仓交易线
                self.set_trade_lines_excuted("below_mid",False)

        self.pre_spread_pos = self.spread_pos
        self.put_event()

    def init_trade_lines(self):
        """
        init all trade lines
        """
        for step in range(self.min_step,self.min_step+self.max_steps):
            line_name = f"trade_line{step}"
            tc = TradeCommand(trade_line_name=line_name,
                    sponsor=self.strategy_name,
                    price_name=self.spread_name,
                    direction=Direction.SHORT,
                    offset=Offset.OPEN,
                    price=0.0,trade_num=self.step_pos,payup=self.payup)
            trade_line = TradeLine(
                    event_engine = self.event_engine,
                    condition=">=",
                    tc = tc,on_trade_command=self.on_trade_command,
                    excuted=True)
            self.trade_lines[line_name] = trade_line

            line_name = f"trade_line{-step}"
            tc = TradeCommand(trade_line_name=line_name,
                    sponsor=self.strategy_name,
                    price_name=self.spread_name,
                    direction=Direction.LONG,
                    offset=Offset.OPEN,
                    price=0.0,trade_num=self.step_pos,payup=self.payup)
            trade_line = TradeLine(
                    event_engine = self.event_engine,
                    condition="<=",
                    tc = tc,on_trade_command=self.on_trade_command,
                    excuted=True)  
            self.trade_lines[line_name] = trade_line    

        line_name = f"below_mid"
        tc = TradeCommand(trade_line_name=line_name,
                sponsor=self.strategy_name,
                price_name=self.spread_name,
                direction=Direction.LONG,
                offset=Offset.CLOSE,
                price=0.0,trade_num=-1,payup=self.payup)
        trade_line = TradeLine(
                event_engine = self.event_engine,
                condition="v", # 中轨之下
                tc = tc,on_trade_command=self.on_trade_command,
                excuted=False if self.spread_pos < 0 else True)  
        self.trade_lines[line_name] = trade_line  

        line_name = f"above_mid"
        tc = TradeCommand(trade_line_name=line_name,
                sponsor=self.strategy_name,
                price_name=self.spread_name,
                direction=Direction.SHORT,
                offset=Offset.CLOSE,
                price=0.0,trade_num=-1,payup=self.payup)
        trade_line = TradeLine(
                event_engine = self.event_engine,
                condition="^",  # 中轨之上
                tc = tc,on_trade_command=self.on_trade_command,
                excuted=False if self.spread_pos > 0 else True)  
        self.trade_lines[line_name] = trade_line  

    def on_trade_command(self,tc:TradeCommand):
        """
        process trade command
        """ 
        if not self.trading:
            return

        # print(f"excute trade command :{self.get_spread_tick()} {tc}")
        if nan in [self.boll_mid,self.step]:
            return    

        if tc.direction == Direction.LONG: 
            volume = tc.trade_num
            if tc.trade_line_name == "below_mid":  
                # 中轨下全部清空仓
                if not self.spread_pos < 0:
                    self.write_log("无空仓可平!")
                    return
                volume = abs(self.spread_pos) 

            if volume == 0:
                self.write_log("做多0$错误!")
                return

            next_vol = abs(self.spread_pos+volume)
            if not next_vol <= self.max_steps*self.step_pos:
                self.write_log("再做多将超过最大仓位")
                return

            algoid = self.start_long_algo(
                        price=tc.price,
                        volume=volume,
                        payup=tc.payup,
                        offset=tc.offset,
                        interval=self.interval)  

            print(f"executed start_long_algo : {algoid}")

        elif tc.direction == Direction.SHORT:
            volume = tc.trade_num
            if tc.trade_line_name == "above_mid": 
                # 中轨下全部清多仓 
                if not self.spread_pos > 0:
                    self.write_log("无空仓可平!")
                    return
                volume = abs(self.spread_pos) 

            if volume == 0:
                self.write_log("做空0$错误!")
                return

            next_vol = abs(self.spread_pos-volume)
            if not next_vol <= self.max_steps*self.step_pos:
                self.write_log("再做空将超过最大仓位")
                return

            algoid = self.start_short_algo(
                        price=tc.price,
                        volume=volume,
                        payup=tc.payup,
                        offset=tc.offset,
                        interval=self.interval)  

            print(f"executed start_short_algo : {algoid}")

    def adjust_trade_lines_emitprice(self):
        """ 调整各个交易线的触发价格 """
        for step in range(self.min_step,self.min_step+self.max_steps):
            # 调整做空交易线的触发价格 
            line_name = f"trade_line{step}"
            trade_line = self.trade_lines.get(line_name,None)
            if trade_line:
                trade_line.set_emit_price(self.boll_mid+step*self.step)
            # 调整做多交易线的触发价格
            line_name = f"trade_line{-step}"
            trade_line = self.trade_lines.get(line_name,None)
            if trade_line:
                trade_line.set_emit_price(self.boll_mid-step*self.step)

        # 调整上中轨线和下中轨线的触发价格
        for line_name in ["above_mid","below_mid"]:
            trade_line = self.trade_lines.get(line_name,None)
            if trade_line:
                trade_line.set_emit_price(self.boll_mid)        

    def set_trade_lines_excuted(self,select:str,excuted:bool):
        """ 
        设置两套多交易线的执行标志 
        select = "all_lines":设置所有交易线的执行标志
                 "above_lines":设置所有做空交易线的执行标志
                 "below_lines":设置所有做多交易线的执行标志
                 "below_mid":设置平空仓交易线的执行标志
                 "above_mid":设置平多仓交易线的执行标志
        excuted = True:已执行;False:未执行
        """
        for step in range(self.min_step,self.min_step+self.max_steps):
            if select in ["above_lines","all_lines"]:
                # 做空交易线
                line_name = f"trade_line{step}"
                trade_line = self.trade_lines.get(line_name,None)
                if trade_line:
                    trade_line.set_excuted(excuted)

            if select in ["below_lines","all_lines"]:
                # 做多交易线
                line_name = f"trade_line{-step}"
                trade_line = self.trade_lines.get(line_name,None)
                if trade_line:
                    trade_line.set_excuted(excuted)

        if select in ["below_mid","all_lines"]:
            # 下中轨线
            trade_line = self.trade_lines.get("below_mid",None)
            if trade_line:
                trade_line.set_excuted(excuted)   

        if select in ["above_mid","all_lines"]:
            # 上中轨线
            trade_line = self.trade_lines.get("above_mid",None)
            if trade_line:
                trade_line.set_excuted(excuted)   

    def adjust_trade_lines_excute(self):
        """ 调整各个交易线的执行标志 """
        if self.spread_pos == 0:
            self.set_trade_lines_excuted("above_lines",False)
            self.set_trade_lines_excuted("below_lines",False)
            self.set_trade_lines_excuted("above_mid",True)
            self.set_trade_lines_excuted("below_mid",True)
        elif self.spread_pos > 0:
            self.set_trade_lines_excuted("above_lines",True)
            self.set_trade_lines_excuted("above_mid",True)
            self.set_trade_lines_excuted("below_mid",False)
        elif self.spread_pos < 0:
            self.set_trade_lines_excuted("below_lines",True)
            self.set_trade_lines_excuted("above_mid",False)
            self.set_trade_lines_excuted("below_mid",True)      

    def on_spread_algo(self, algo: SpreadAlgoTemplate):
        """
        Callback when algo status is updated.
        """
        if not algo.is_active():
            print(f"algoid = {algo.algoid}")

    def on_order(self, order: OrderData):
        """
        Callback when order status is updated.
        """
        print(f"{self.spread_name} {order}")

    def on_trade(self, trade: TradeData):
        """
        Callback when new trade data is received.
        """
        print(f"{self.spread_name} {trade}")

3 交易线的优点

  • vnpy停止单只能在vnpy的CTA策略模块中使用,而交易线可以在几乎所有的vnpy模块的策略中使用;
  • 交易线的触发交易的价格是可变的,它可以让您的指标具备交易功能,这个在上面的例子中已经做了演示;
  • 和vnpy停止单一样,交易线也是本地的条件单,但它只有符合条件才出发交易动作。

慢体会它用法,您也许会有惊喜发现!

Member
avatar
加入于:
帖子: 23
声望: 0

感谢大佬分享!
mark!

Administrator
avatar
加入于:
帖子: 4490
声望: 302

感谢分享!精华送上

Member
avatar
加入于:
帖子: 4
声望: 0

self.event_engine:EventEngine = self.strategy_engine.event_engine
这行在价差回测引擎执行的时候报错,strategy_engine:BacktestingEngine 是没有event_einge的
是不是说有事件信号的,回测引擎都暂时不支持?

Member
avatar
加入于:
帖子: 1
声望: 0

感谢大佬分分享,比stoporder更符合实际使用

Member
avatar
加入于:
帖子: 307
声望: 90

morgan66b5694ca1a94a98 wrote:

self.event_engine:EventEngine = self.strategy_engine.event_engine
这行在价差回测引擎执行的时候报错,strategy_engine:BacktestingEngine 是没有event_einge的
是不是说有事件信号的,回测引擎都暂时不支持?

印象中价差回测是没有现成的测试界面,你是如何编写回测代码的呢?
总之交易线需要和策略引擎使用同一个消息引擎就可以了。
本来合约价格是可以通过订阅获得的,就是考虑到回测时无法订阅价格,
所以改成了由策略通知交易线价格发生了变化的。

    def on_spread_tick(self, tick: TickData):
        """
        Callback when new spread tick data is generated.
        """
        self.bg.update_tick(tick)
        if self.trading:
            pc = PriceData(sponsor=self.strategy_name,price_name=self.spread_name,price=tick.last_price)
            self.event_engine.put(Event(EVENT_PRICE_CHANGE,data=pc))

当然如果你的米筐账号没有tick数据(通常贵多了),那么你也可以把通知价格更新的语句放在on_bar()中,
tick.last_price用bar.close代替,当然这也是没有办法的事情。

Member
avatar
加入于:
帖子: 4
声望: 0

hxxjava wrote:

morgan66b5694ca1a94a98 wrote:

self.event_engine:EventEngine = self.strategy_engine.event_engine
这行在价差回测引擎执行的时候报错,strategy_engine:BacktestingEngine 是没有event_einge的
是不是说有事件信号的,回测引擎都暂时不支持?

印象中价差回测是没有现成的测试界面,你是如何编写回测代码的呢?
总之交易线需要和策略引擎使用同一个消息引擎就可以了。
本来合约价格是可以通过订阅获得的,就是考虑到回测时无法订阅价格,
所以改成了由策略通知交易线价格发生了变化的。

    def on_spread_tick(self, tick: TickData):
        """
        Callback when new spread tick data is generated.
        """
        self.bg.update_tick(tick)
        if self.trading:
            pc = PriceData(sponsor=self.strategy_name,price_name=self.spread_name,price=tick.last_price)
            self.event_engine.put(Event(EVENT_PRICE_CHANGE,data=pc))

当然如果你的米筐账号没有tick数据(通常贵多了),那么你也可以把通知价格更新的语句放在on_bar()中,
tick.last_price用bar.close代替,当然这也是没有办法的事情。

感谢大佬!
我没用界面回测,直接写个vnpy.vnpy_spreadtrading.backtesting.BacktestEngine实例,设置参数跑回测的。
只需要在策略初始化处,加判断若它是回测引擎,则新建一个EventEgine(),并手工给它start() ,最后在run_backtesting轮播bar数据时,要sleep(0.01)一下,毕竟是不同的线程在做,防止交易线处理得慢。单这样做回测起来确实慢很多。

Member
avatar
加入于:
帖子: 21
声望: 2

感谢大佬分享

Member
avatar
加入于:
帖子: 1
声望: 0

谢谢分享,新手还在学习中。

© 2015-2019 上海韦纳软件科技有限公司
备案服务号:沪ICP备18006526号-3

沪公网安备 31011502017034号