1. 价差策略模块中自带的策略StatisticalArbitrageStrategy
这个价差策略的大致意思是:
- 当价差的数据发生变化是,特到价差的tick,然后利用BarGenerator来合成出1分钟价差K线;
- 得到1分钟价差K线后再更新一个ArrayManager self.am,self.am初始化成功后就可以计算BOLL通道的中轨、上轨和下轨;
- 当1分钟K线的收盘价与BOLL通道的中轨、上轨和下轨的关系来进行价差的开仓和平仓交易。
2. 价差策略StatisticalArbitrageStrategy的代码及错误
说明:错误的地方我已知注释了。
from vnpy.trader.utility import BarGenerator, ArrayManager
from vnpy.app.spread_trading import (
SpreadStrategyTemplate,
SpreadAlgoTemplate,
SpreadData,
OrderData,
TradeData,
TickData,
BarData
)
class StatisticalArbitrageStrategy(SpreadStrategyTemplate):
""""""
author = "用Python的交易员"
boll_window = 20
boll_dev = 2
max_pos = 10
payup = 10
interval = 5
spread_pos = 0.0
boll_up = 0.0
boll_down = 0.0
boll_mid = 0.0
parameters = [
"boll_window",
"boll_dev",
"max_pos",
"payup",
"interval"
]
variables = [
"spread_pos",
"boll_up",
"boll_down",
"boll_mid"
]
def __init__(
self,
strategy_engine,
strategy_name: str,
spread: SpreadData,
setting: dict
):
""""""
super().__init__(
strategy_engine, strategy_name, spread, setting
)
self.bg = BarGenerator(self.on_spread_bar)
self.am = ArrayManager()
def on_init(self):
"""
Callback when strategy is inited.
"""
self.write_log("策略初始化")
self.load_bar(10)
def on_start(self):
"""
Callback when strategy is started.
"""
self.write_log("策略启动")
def on_stop(self):
"""
Callback when strategy is stopped.
"""
self.write_log("策略停止")
self.put_event()
def on_spread_data(self):
"""
Callback when spread price is updated.
"""
tick = self.get_spread_tick()
self.on_spread_tick(tick)
def on_spread_tick(self, tick: TickData):
"""
Callback when new spread tick data is generated.
"""
self.bg.update_tick(tick) # 这里有兼容性错误,BarGenerator处理不了最新价为0的tick
def on_spread_bar(self, bar: BarData):
"""
Callback when spread bar data is generated.
"""
self.stop_all_algos()
self.am.update_bar(bar)
if not self.am.inited:
return
self.boll_mid = self.am.sma(self.boll_window)
self.boll_up, self.boll_down = self.am.boll(
self.boll_window, self.boll_dev)
if not self.spread_pos:
if bar.close_price >= self.boll_up:
self.start_short_algo(
bar.close_price - 10,
self.max_pos,
payup=self.payup,
interval=self.interval
)
elif bar.close_price <= self.boll_down:
self.start_long_algo(
bar.close_price + 10,
self.max_pos,
payup=self.payup,
interval=self.interval
)
elif self.spread_pos < 0:
if bar.close_price <= self.boll_mid:
self.start_long_algo(
bar.close_price + 10,
abs(self.spread_pos),
payup=self.payup,
interval=self.interval
)
else:
if bar.close_price >= self.boll_mid:
self.start_short_algo(
bar.close_price - 10,
abs(self.spread_pos),
payup=self.payup,
interval=self.interval
)
self.put_event()
def on_spread_pos(self):
"""
Callback when spread position is updated.
"""
self.spread_pos = self.get_spread_pos()
self.put_event()
def on_spread_algo(self, algo: SpreadAlgoTemplate):
"""
Callback when algo status is updated.
"""
pass
def on_order(self, order: OrderData):
"""
Callback when order status is updated.
"""
pass
def on_trade(self, trade: TradeData):
"""
Callback when new trade data is received.
"""
pass
def stop_open_algos(self):
""""""
if self.buy_algoid:
self.stop_algo(self.buy_algoid)
if self.short_algoid:
self.stop_algo(self.short_algoid)
def stop_close_algos(self):
""""""
if self.sell_algoid: # self.sell_algoid没有定义
self.stop_algo(self.sell_algoid)
if self.cover_algoid: # self.cover_algoid没有定义
self.stop_algo(self.cover_algoid)
3. 这里BarGenerator与价差存在兼容性错误:处理不了最新价为0的tick
下面是BarGenerator的update_tick()函数,错误的地方我已知注释了:
def update_tick(self, tick: TickData) -> None:
"""
Update new tick data into generator.
"""
new_minute = False
# Filter tick data with 0 last price
if not tick.last_price: # 这个过滤条件有点想当然了
return
# Filter tick data with older timestamp
if self.last_tick and tick.datetime < self.last_tick.datetime:
return
if not self.bar:
new_minute = True
elif (
(self.bar.datetime.minute != tick.datetime.minute)
or (self.bar.datetime.hour != tick.datetime.hour)
):
self.bar.datetime = self.bar.datetime.replace(
second=0, microsecond=0
)
self.on_bar(self.bar)
new_minute = True
if new_minute:
self.bar = BarData(
symbol=tick.symbol,
exchange=tick.exchange,
interval=Interval.MINUTE,
datetime=tick.datetime,
gateway_name=tick.gateway_name,
open_price=tick.last_price,
high_price=tick.last_price,
low_price=tick.last_price,
close_price=tick.last_price,
open_interest=tick.open_interest
)
else:
self.bar.high_price = max(self.bar.high_price, tick.last_price)
if tick.high_price > self.last_tick.high_price:
self.bar.high_price = max(self.bar.high_price, tick.high_price)
self.bar.low_price = min(self.bar.low_price, tick.last_price)
if tick.low_price < self.last_tick.low_price:
self.bar.low_price = min(self.bar.low_price, tick.low_price)
self.bar.close_price = tick.last_price
self.bar.open_interest = tick.open_interest
self.bar.datetime = tick.datetime
if self.last_tick:
volume_change = tick.volume - self.last_tick.volume
self.bar.volume += max(volume_change, 0)
self.last_tick = tick
4. 如何修改此错误?
4.1 修改BarGenerator的update_tick()函数
把我标识的错误的过滤条件改成下面的代码,把它注释掉:
# if not tick.last_price: # 这个过滤条件有点想当然了
# return
4.2 修改价差策略StatisticalArbitrageStrategy
修改之处见代码中的注释:
from vnpy.trader.utility import BarGenerator, ArrayManager
from vnpy.app.spread_trading import (
SpreadStrategyTemplate,
SpreadAlgoTemplate,
SpreadData,
OrderData,
TradeData,
TickData,
BarData
)
class StatisticalArbitrageStrategy(SpreadStrategyTemplate):
""""""
author = "用Python的交易员"
boll_window = 20
boll_dev = 2
max_pos = 10
payup = 10
interval = 5
spread_pos = 0.0
boll_up = 0.0
boll_down = 0.0
boll_mid = 0.0
parameters = [
"boll_window",
"boll_dev",
"max_pos",
"payup",
"interval"
]
variables = [
"spread_pos",
"boll_up",
"boll_down",
"boll_mid"
]
def __init__(
self,
strategy_engine,
strategy_name: str,
spread: SpreadData,
setting: dict
):
""""""
super().__init__(
strategy_engine, strategy_name, spread, setting
)
self.bg = BarGenerator(self.on_spread_bar)
self.am = ArrayManager()
self.buy_algoid:str = "" # hxxjava add
self.short_algoid:str = "" # hxxjava add
self.sell_algoid = "" # hxxjava add
self.cover_algoid = "" # hxxjava add
def on_init(self):
"""
Callback when strategy is inited.
"""
self.write_log("策略初始化")
self.load_bar(10)
def on_start(self):
"""
Callback when strategy is started.
"""
self.write_log("策略启动")
def on_stop(self):
"""
Callback when strategy is stopped.
"""
self.write_log("策略停止")
self.put_event()
def on_spread_data(self):
"""
Callback when spread price is updated.
"""
tick = self.get_spread_tick()
self.on_spread_tick(tick)
def on_spread_tick(self, tick: TickData):
"""
Callback when new spread tick data is generated.
"""
self.bg.update_tick(tick)
def on_spread_bar(self, bar: BarData):
"""
Callback when spread bar data is generated.
"""
self.stop_all_algos()
self.am.update_bar(bar)
if not self.am.inited:
return
self.boll_mid = self.am.sma(self.boll_window)
self.boll_up, self.boll_down = self.am.boll(
self.boll_window, self.boll_dev)
if not self.spread_pos:
if bar.close_price >= self.boll_up:
self.buy_algoid = self.start_short_algo( # hxxjava change
bar.close_price - 10,
self.max_pos,
payup=self.payup,
interval=self.interval
)
elif bar.close_price <= self.boll_down:
self.short_algoid = self.start_long_algo( # hxxjava change
bar.close_price + 10,
self.max_pos,
payup=self.payup,
interval=self.interval
)
elif self.spread_pos < 0:
if bar.close_price <= self.boll_mid:
self.sell_algoid = self.start_long_algo( # hxxjava change
bar.close_price + 10,
abs(self.spread_pos),
payup=self.payup,
interval=self.interval
)
else:
if bar.close_price >= self.boll_mid:
self.cover_algoid = self.start_short_algo( # hxxjava change
bar.close_price - 10,
abs(self.spread_pos),
payup=self.payup,
interval=self.interval
)
self.put_event()
def on_spread_pos(self):
"""
Callback when spread position is updated.
"""
self.spread_pos = self.get_spread_pos()
self.put_event()
def on_spread_algo(self, algo: SpreadAlgoTemplate):
"""
Callback when algo status is updated.
"""
if not algo.is_active(): # hxxjava add
if self.buy_algoid == algo.algoid:
self.buy_algoid = ""
elif self.sell_algoid == algo.algoid:
self.sell_algoid = ""
elif self.short_algoid == algo.algoid:
self.short_algoid = ""
else:
self.cover_algoid = ""
def on_order(self, order: OrderData):
"""
Callback when order status is updated.
"""
pass
def on_trade(self, trade: TradeData):
"""
Callback when new trade data is received.
"""
pass
def stop_open_algos(self):
""""""
if self.buy_algoid:
self.stop_algo(self.buy_algoid)
if self.short_algoid:
self.stop_algo(self.short_algoid)
def stop_close_algos(self):
""""""
if self.sell_algoid:
self.stop_algo(self.sell_algoid)
if self.cover_algoid:
self.stop_algo(self.cover_algoid)