1. 交易线的代码
把下面的代码保存为vnpy\usertools\trade_line.py:
'''
Author: hxxjava
Date: 2021-06-24 19:57:32
LastEditTime: 2021-07-05 21:23:04
LastEditors: Please set LastEditors
Description:
'''
from dataclasses import dataclass
from typing import Callable
from vnpy.trader.constant import Direction,Offset
from vnpy.event import EventEngine,Event
from copy import deepcopy
EVENT_PRICE_CHANGE = "ePriceChange." # Event type for price-change notifications
@dataclass
class PriceData:
    """
    Payload of a price-change event.

    A TradeLine only reacts to a PriceData whose ``sponsor`` and
    ``price_name`` both match those of its trade command.
    """

    sponsor: str     # who published the price
    price_name: str  # name of the price being reported
    price: float     # latest value of that price
@dataclass
class TradeCommand:
    """
    Trade instruction carried by a trade line.

    ``sponsor`` and ``price_name`` select which price-change events the
    owning trade line listens to; ``price`` is the trigger price the
    latest price is compared against.
    """

    trade_line_name: str  # name of the trade line that owns this command
    sponsor: str          # who publishes the watched price
    price_name: str       # name of the watched price
    direction: Direction  # order direction
    offset: Offset        # open / close
    price: float          # trigger price of the trade line
    payup: float          # handed to the order algorithm as its payup
    trade_num: int        # volume to trade
class TradeLine():
    """
    Trade line: a local conditional-order ("alert order") line.

    The line listens to EVENT_PRICE_CHANGE events. For events whose sponsor
    and price name match its trade command, it compares the latest price
    with the trigger price ``tc.price`` and, the first time the condition is
    met, calls ``on_trade_command`` with a copy of the trade command.

    Supported ``condition`` values:
        ">"  : latest price is greater than the trigger price
        "<"  : latest price is less than the trigger price
        ">=" : latest price is greater than or equal to the trigger price
        "<=" : latest price is less than or equal to the trigger price
        "^"  : price crosses up   (previous <= trigger < latest)
        "v"  : price crosses down (previous >= trigger > latest)
    Any other value never triggers.

    The misspelled name ``excuted`` ("executed") is part of the public
    interface and is kept for compatibility.
    """

    # Conditions that only need the latest price and the trigger price.
    _THRESHOLD_TESTS = {
        ">": lambda price, trigger: price > trigger,
        "<": lambda price, trigger: price < trigger,
        ">=": lambda price, trigger: price >= trigger,
        "<=": lambda price, trigger: price <= trigger,
    }

    def __init__(
        self,
        event_engine: "EventEngine",
        tc: "TradeCommand",
        condition: str,
        on_trade_command: Callable,
        excuted: bool = False
    ):
        """
        Store the configuration and subscribe to price-change events.

        event_engine     : engine the price-change handler is registered on
        tc               : trade command; ``tc.price`` is the trigger price
        condition        : one of the condition strings listed on the class
        on_trade_command : callback invoked with a copy of ``tc`` on trigger
        excuted          : True creates the line disarmed (already executed)
        """
        self.event_engine = event_engine
        self.tc = tc
        self.condition = condition
        self.price = None  # last price seen; None until the first update
        self.excuted = excuted
        self.on_trade_command = on_trade_command
        self.register_event()

    def register_event(self):
        """ Subscribe the price-change handler on the event engine. """
        self.event_engine.register(
            EVENT_PRICE_CHANGE, self.process_pricechange_event
        )

    def unregister_event(self):
        """
        Remove the price-change handler from the event engine.

        Call this when the line is discarded; otherwise the engine keeps a
        reference to the handler and goes on delivering events to it.
        """
        self.event_engine.unregister(
            EVENT_PRICE_CHANGE, self.process_pricechange_event
        )

    def process_pricechange_event(self, event: "Event"):
        """ Feed the price to check_price() if the event is for this line. """
        pc: PriceData = event.data
        if self.tc.sponsor == pc.sponsor and self.tc.price_name == pc.price_name:
            self.check_price(pc.price)

    def check_price(self, price: float):
        """
        Evaluate the condition against ``price`` and fire if it is met.

        The last seen price is always recorded, even while the line is
        disarmed, so crossing conditions have a previous price to compare
        with once the line is re-armed.
        """
        if not self.excuted and self._is_triggered(price):
            self.send_trade_event(price)
        self.price = price

    def _is_triggered(self, price: float) -> bool:
        """ Return True if ``price`` satisfies the line's condition. """
        trigger = self.tc.price

        threshold_test = self._THRESHOLD_TESTS.get(self.condition)
        if threshold_test is not None:
            return threshold_test(price, trigger)

        # Crossing conditions need a previous price to compare against.
        if self.price is None:
            return False
        if self.condition == "^":
            return self.price <= trigger < price
        if self.condition == "v":
            return self.price >= trigger > price

        # Unknown condition: never triggers.
        return False

    def send_trade_event(self, price: float):
        """
        Mark the line as executed and run the trade command callback.

        ``price`` (the price that triggered the line) is intentionally not
        written into the command: the callback receives the trigger price
        of the line itself.
        """
        if self.excuted:
            return

        # Hand out a copy so the callback cannot modify the line's command.
        tc = deepcopy(self.tc)

        # Disarm before calling back so the line fires only once.
        self.excuted = True
        self.on_trade_command(tc)

    def set_emit_price(self, price: float):
        """ Set the trigger price of the trade line. """
        self.tc.price = price

    def set_excuted(self, excuted: bool):
        """ Set the executed flag (True = disarmed, False = armed). """
        self.excuted = excuted

    def reset(self):
        """ Reset the line: forget the last price and re-arm it. """
        self.price = None
        self.excuted = False

    def __repr__(self) -> str:
        return (
            f"TradeLine=({self.tc},condition={self.condition},"
            f"on_trade_command={self.on_trade_command},"
            f"price={self.price},excuted={self.excuted})"
        )
2. 如何使用交易线
下面是一个测试用的价差交易策略例子,只是为了演示交易线的创建、触发价格的更新、最新价格的更新以及交易动作的执行,并不表示该策略可以盈利。
把下面代码保存为boll_arbitrage_strategy.py,保存到vnpy_spreadtrading\strategies目录下,就可以创建价差策略来观察trade_line的运作机制。
""" """
from datetime import datetime
from math import isnan
from os import close, name, write
from typing import List,Dict,Tuple,Union
from typing import Dict

from numpy import nan
import talib

from vnpy.event.engine import Event, EventEngine
from vnpy.trader.constant import Direction, Offset,InstrumentStatus
from vnpy.trader.utility import BarGenerator, ArrayManager
from vnpy_spreadtrading.base import LegData
from vnpy_spreadtrading import (
    SpreadStrategyTemplate,
    SpreadAlgoTemplate,
    SpreadData,
    OrderData,
    TradeData,
    TickData,
    BarData
)
from vnpy.usertools.trade_line import (
    EVENT_PRICE_CHANGE,
    PriceData,
    TradeCommand,
    TradeLine
)
class BollArbitrageStrategy(SpreadStrategyTemplate):
    """
    Demo spread strategy that trades a moving-average / standard-deviation
    band with trade lines.

    The middle line ``boll_mid`` is a moving average of the spread close and
    ``step`` is twice a 5-bar moving average of its rolling standard
    deviation. Trade lines are placed at ``boll_mid +/- k * step``:

    * ``trade_line{k}``  (k > 0): open short when price >= mid + k * step
    * ``trade_line{-k}``        : open long  when price <= mid - k * step
    * ``above_mid``             : close long  when price crosses up the mid
    * ``below_mid``             : close short when price crosses down the mid

    The strategy only demonstrates how trade lines are created, re-priced
    and executed; it is not meant to be profitable.
    """

    author = "hxxjava"

    bar_window = 5      # bar period, in minutes
    boll_window = 26    # window of the band's moving average / std
    max_steps = 4       # maximum number of opening steps
    min_step = 2        # distance of the first opening line, in steps
    step_pos = 1        # volume opened at each step
    payup = 10          # passed to the spread algo as ``payup``
    interval = 5        # passed to the spread algo as ``interval``

    spread_pos = 0.0    # current spread position
    boll_mid = nan      # middle line of the band
    step = nan          # distance between adjacent opening lines

    parameters = [
        "bar_window",
        "boll_window",
        "max_steps",
        "min_step",
        "step_pos",
        "payup",
        "interval"
    ]

    variables = [
        "boll_mid",
        "step",
        "spread_pos",
    ]

    def __init__(
        self,
        strategy_engine,
        strategy_name: str,
        spread: SpreadData,
        setting: dict
    ):
        """Create the bar tools and all trade lines (initially priced at 0)."""
        super().__init__(
            strategy_engine, strategy_name, spread, setting
        )

        self.event_engine: EventEngine = self.strategy_engine.event_engine

        self.bg = BarGenerator(
            self.on_spread_bar, self.bar_window, self.on_xmin_spread_bar
        )
        self.am = ArrayManager(size=60)

        self.pre_spread_pos = self.spread_pos

        self.trade_lines: Dict[str, TradeLine] = {}
        self.init_trade_lines()

        self.bars: List[BarData] = []

    def on_init(self):
        """
        Callback when strategy is inited.
        """
        self.write_log("策略初始化")
        self.load_bar(days=10, callback=self.on_spread_bar)
        # self.load_tick(days=1)

    def on_start(self):
        """
        Callback when strategy is started.
        """
        self.write_log("策略启动")

        # Re-price every line, arm them all, then disarm the ones that do
        # not apply to the current position.
        self.adjust_trade_lines_emitprice()
        self.set_trade_lines_excuted("all_lines", False)
        self.adjust_trade_lines_excute()

    def on_stop(self):
        """
        Callback when strategy is stopped.
        """
        self.write_log("策略停止")
        self.put_event()

    def on_spread_data(self):
        """
        Callback when spread price is updated.
        """
        tick = self.get_spread_tick()
        self.on_spread_tick(tick)

    def on_spread_tick(self, tick: TickData):
        """
        Callback when new spread tick data is generated.
        """
        self.bg.update_tick(tick)

        # Publish the latest spread price so the trade lines can check it.
        if self.trading:
            pc = PriceData(
                sponsor=self.strategy_name,
                price_name=self.spread_name,
                price=tick.last_price
            )
            self.event_engine.put(Event(EVENT_PRICE_CHANGE, data=pc))

    def on_spread_bar(self, bar: BarData):
        """
        Callback when 1 min spread bar data is generated.
        """
        self.bg.update_bar(bar)

    def on_xmin_spread_bar(self, bar: BarData):
        """
        Callback when x min spread bar data is generated.
        """
        # Keep at most the 100 most recent bars.
        self.bars.append(bar)
        if len(self.bars) > 100:
            self.bars.pop(0)

        self.am.update_bar(bar)
        if not self.am.inited:
            return

        # Middle line: moving average of the close.
        self.boll_mid = talib.MA(self.am.close, self.boll_window)[-1]

        # Step: twice the 5-bar moving average of the rolling std.
        std = self.am.std(self.boll_window, array=True)
        temp = talib.MA(std, 5)
        self.step = 2*temp[-1]

        # Move every trade line to follow the new band.
        self.adjust_trade_lines_emitprice()

        print(f"boll_mid,step=({self.boll_mid,self.step})")

        self.put_event()

    def on_spread_pos(self):
        """
        Callback when spread position is updated.
        """
        self.spread_pos = self.get_spread_pos()

        if self.spread_pos == 0:
            if self.pre_spread_pos > 0:
                # Long position fully closed: re-arm the long-opening lines.
                self.set_trade_lines_excuted("below_lines", False)
            if self.pre_spread_pos < 0:
                # Short position fully closed: re-arm the short-opening lines.
                self.set_trade_lines_excuted("above_lines", False)

        if self.pre_spread_pos == 0:
            if self.spread_pos > 0:
                # A long position was opened: arm the close-long line.
                self.set_trade_lines_excuted("above_mid", False)
            if self.spread_pos < 0:
                # A short position was opened: arm the close-short line.
                self.set_trade_lines_excuted("below_mid", False)

        self.pre_spread_pos = self.spread_pos

        self.put_event()

    def _steps(self) -> range:
        """Step multipliers of the opening lines: min_step .. min_step+max_steps-1."""
        return range(self.min_step, self.min_step + self.max_steps)

    def _add_trade_line(
        self,
        line_name: str,
        direction: Direction,
        offset: Offset,
        trade_num: int,
        condition: str,
        excuted: bool
    ):
        """Create one trade line with trigger price 0.0 and store it by name."""
        tc = TradeCommand(
            trade_line_name=line_name,
            sponsor=self.strategy_name,
            price_name=self.spread_name,
            direction=direction,
            offset=offset,
            price=0.0,
            payup=self.payup,
            trade_num=trade_num
        )
        self.trade_lines[line_name] = TradeLine(
            event_engine=self.event_engine,
            tc=tc,
            condition=condition,
            on_trade_command=self.on_trade_command,
            excuted=excuted
        )

    def init_trade_lines(self):
        """
        init all trade lines
        """
        # Opening lines are created disarmed; on_start() arms them.
        for step in self._steps():
            self._add_trade_line(
                f"trade_line{step}",
                Direction.SHORT, Offset.OPEN, self.step_pos, ">=", True
            )
            self._add_trade_line(
                f"trade_line{-step}",
                Direction.LONG, Offset.OPEN, self.step_pos, "<=", True
            )

        # Closing lines trade the whole position, so trade_num is a
        # placeholder (-1) that on_trade_command() replaces.
        # Close short: price crosses down through the middle line.
        self._add_trade_line(
            "below_mid",
            Direction.LONG, Offset.CLOSE, -1, "v",
            not self.spread_pos < 0
        )
        # Close long: price crosses up through the middle line.
        self._add_trade_line(
            "above_mid",
            Direction.SHORT, Offset.CLOSE, -1, "^",
            not self.spread_pos > 0
        )

    def on_trade_command(self, tc: TradeCommand):
        """
        process trade command

        NOTE: the trade line has already marked itself as executed when this
        callback runs, so every early return below consumes the line without
        placing an order.
        """
        if not self.trading:
            return

        # The band is not available yet. isnan() is required here: a NaN
        # produced by a calculation is not found by ``nan in [...]``.
        if isnan(self.boll_mid) or isnan(self.step):
            return

        if tc.direction == Direction.LONG:
            volume = tc.trade_num
            if tc.trade_line_name == "below_mid":
                # Below the middle line: close the whole short position.
                if not self.spread_pos < 0:
                    self.write_log("无空仓可平!")
                    return
                volume = abs(self.spread_pos)

            if volume == 0:
                self.write_log("做多0$错误!")
                return

            next_vol = abs(self.spread_pos+volume)
            if not next_vol <= self.max_steps*self.step_pos:
                self.write_log("再做多将超过最大仓位")
                return

            algoid = self.start_long_algo(
                price=tc.price,
                volume=volume,
                payup=tc.payup,
                offset=tc.offset,
                interval=self.interval
            )
            print(f"executed start_long_algo : {algoid}")

        elif tc.direction == Direction.SHORT:
            volume = tc.trade_num
            if tc.trade_line_name == "above_mid":
                # Above the middle line: close the whole long position.
                if not self.spread_pos > 0:
                    self.write_log("无多仓可平!")
                    return
                volume = abs(self.spread_pos)

            if volume == 0:
                self.write_log("做空0$错误!")
                return

            next_vol = abs(self.spread_pos-volume)
            if not next_vol <= self.max_steps*self.step_pos:
                self.write_log("再做空将超过最大仓位")
                return

            algoid = self.start_short_algo(
                price=tc.price,
                volume=volume,
                payup=tc.payup,
                offset=tc.offset,
                interval=self.interval
            )
            print(f"executed start_short_algo : {algoid}")

    def adjust_trade_lines_emitprice(self):
        """ Move the trigger price of every trade line to follow the band. """
        for step in self._steps():
            # Short-opening line: above the middle line.
            trade_line = self.trade_lines.get(f"trade_line{step}")
            if trade_line is not None:
                trade_line.set_emit_price(self.boll_mid+step*self.step)

            # Long-opening line: below the middle line.
            trade_line = self.trade_lines.get(f"trade_line{-step}")
            if trade_line is not None:
                trade_line.set_emit_price(self.boll_mid-step*self.step)

        # Both closing lines sit on the middle line.
        for line_name in ["above_mid", "below_mid"]:
            trade_line = self.trade_lines.get(line_name)
            if trade_line is not None:
                trade_line.set_emit_price(self.boll_mid)

    def set_trade_lines_excuted(self, select: str, excuted: bool):
        """
        Set the executed flag of a group of trade lines.

        select = "all_lines"   : every trade line
                 "above_lines" : all short-opening lines
                 "below_lines" : all long-opening lines
                 "below_mid"   : the close-short line
                 "above_mid"   : the close-long line
        excuted = True: executed (disarmed); False: not executed (armed)
        """
        line_names: List[str] = []

        for step in self._steps():
            if select in ("above_lines", "all_lines"):
                line_names.append(f"trade_line{step}")
            if select in ("below_lines", "all_lines"):
                line_names.append(f"trade_line{-step}")

        if select in ("below_mid", "all_lines"):
            line_names.append("below_mid")
        if select in ("above_mid", "all_lines"):
            line_names.append("above_mid")

        for line_name in line_names:
            trade_line = self.trade_lines.get(line_name)
            if trade_line is not None:
                trade_line.set_excuted(excuted)

    def adjust_trade_lines_excute(self):
        """ Arm or disarm the trade lines according to the current position. """
        if self.spread_pos == 0:
            # Flat: both opening sides armed, nothing to close.
            self.set_trade_lines_excuted("above_lines", False)
            self.set_trade_lines_excuted("below_lines", False)
            self.set_trade_lines_excuted("above_mid", True)
            self.set_trade_lines_excuted("below_mid", True)
        elif self.spread_pos > 0:
            # Long: no short opening; arm the close-long line (above_mid),
            # consistent with init_trade_lines() and on_spread_pos().
            self.set_trade_lines_excuted("above_lines", True)
            self.set_trade_lines_excuted("above_mid", False)
            self.set_trade_lines_excuted("below_mid", True)
        elif self.spread_pos < 0:
            # Short: no long opening; arm the close-short line (below_mid).
            self.set_trade_lines_excuted("below_lines", True)
            self.set_trade_lines_excuted("above_mid", True)
            self.set_trade_lines_excuted("below_mid", False)

    def on_spread_algo(self, algo: SpreadAlgoTemplate):
        """
        Callback when algo status is updated.
        """
        if not algo.is_active():
            print(f"algoid = {algo.algoid}")

    def on_order(self, order: OrderData):
        """
        Callback when order status is updated.
        """
        print(f"{self.spread_name} {order}")

    def on_trade(self, trade: TradeData):
        """
        Callback when new trade data is received.
        """
        print(f"{self.spread_name} {trade}")
3 交易线的优点
- vnpy停止单只能在vnpy的CTA策略模块中使用,而交易线可以在几乎所有的vnpy模块的策略中使用;
- 交易线的触发交易的价格是可变的,它可以让您的指标具备交易功能,这个在上面的例子中已经做了演示;
- 和vnpy停止单一样,交易线也是本地的条件单,但它只有符合条件才触发交易动作。
慢慢体会它的用法,您也许会有惊喜的发现!