from vnpy_ctastrategy import (
CtaTemplate,
StopOrder,
Direction,
TickData,
BarData,
TradeData,
OrderData,
BarGenerator,
ArrayManager,
)
from vnpy.trader.constant import Interval
class TurtleSignalStrategyPro_hour(CtaTemplate):
    """
    Turtle-style channel breakout strategy evaluated on hourly bars.

    Incoming ticks and 1-minute bars are fed to a ``BarGenerator`` configured
    with ``Interval.HOUR``; all trading decisions are taken in
    ``on_hour_bar``.

    * Flat: stop orders are laddered above the upper entry channel (long) and
      below the lower entry channel (short), spaced by fractions of the ATR.
    * In a position: either an RSI extreme closes the position with a stop
      order at the bar close, or the ladder is refreshed together with a
      protective stop derived from the exit channel and the 2 * ATR stop
      recorded at the last fill.
    """

    author = "用Python的交易员"

    # --- Tunable parameters (listed in ``parameters``) ---------------------
    entry_window = 20   # look-back passed to donchian() for the entry channel
    exit_window = 10    # look-back passed to donchian() for the exit channel
    atr_window = 20     # look-back passed to atr()
    fixed_size = 1      # base order volume of one ladder rung
    rsi_lenth = 7       # look-back passed to rsi(); spelling (sic) is part of the parameter name
    rsi_up = 80         # long positions are closed when RSI >= this value
    rsi_down = 20       # short positions are closed when RSI <= this value

    # --- Runtime state (listed in ``variables``) ---------------------------
    entry_up = 0
    entry_down = 0
    exit_up = 0
    exit_down = 0
    atr_value = 0
    long_entry = 0
    short_entry = 0
    long_stop = 0
    short_stop = 0

    parameters = [
        "entry_window",
        "exit_window",
        "atr_window",
        "fixed_size",
        "rsi_lenth",
        "rsi_up",
        "rsi_down",
    ]
    variables = [
        "entry_up",
        "entry_down",
        "exit_up",
        "exit_down",
        "atr_value",
        "long_entry",
        "short_entry",
        "long_stop",
        "short_stop",
    ]

    def __init__(self, cta_engine, strategy_name: str, vt_symbol: str, setting: dict):
        """Build the strategy plus its hourly bar generator and indicator buffer."""
        super().__init__(cta_engine, strategy_name, vt_symbol, setting)

        # on_bar receives the generated minute bars, on_hour_bar the 1-hour bars.
        self.bg = BarGenerator(self.on_bar, 1, self.on_hour_bar, Interval.HOUR)
        self.am = ArrayManager()

    def on_init(self) -> None:
        """Log initialisation and request 5 days of history via ``load_bar``."""
        self.write_log("策略初始化")
        self.load_bar(5)

    def on_start(self) -> None:
        """Log that the strategy has been started."""
        self.write_log("策略启动")

    def on_stop(self) -> None:
        """Log that the strategy has been stopped."""
        self.write_log("策略停止")

    def on_tick(self, tick: TickData) -> None:
        """Forward every tick to the bar generator."""
        self.bg.update_tick(tick)

    def on_bar(self, bar: BarData) -> None:
        """Forward every bar to the bar generator for hourly aggregation."""
        self.bg.update_bar(bar)

    def on_hour_bar(self, bar: BarData) -> None:
        """
        Re-evaluate the strategy on a completed hourly bar.

        All outstanding orders are cancelled first and then rebuilt according
        to the current position. Nothing is traded until the indicator buffer
        reports ``inited``.
        """
        self.cancel_all()

        indicators = self.am
        indicators.update_bar(bar)
        if not indicators.inited:
            return

        is_flat = not self.pos

        # The entry channel is frozen while a position is open; the exit
        # channel keeps following the market on every bar.
        if is_flat:
            self.entry_up, self.entry_down = indicators.donchian(self.entry_window)
        self.exit_up, self.exit_down = indicators.donchian(self.exit_window)

        if is_flat:
            self._arm_entries()
        elif self.pos > 0:
            self._manage_long(bar)
        elif self.pos < 0:
            self._manage_short(bar)

        self.put_event()

    def _arm_entries(self) -> None:
        """Refresh the ATR, clear fill bookkeeping and place both entry ladders."""
        self.atr_value = self.am.atr(self.atr_window)

        self.long_entry = self.short_entry = 0
        self.long_stop = self.short_stop = 0

        self.send_buy_orders(self.entry_up)
        self.send_short_orders(self.entry_down)

    def _manage_long(self, bar: BarData) -> None:
        """Place the orders for an open long position."""
        if self.am.rsi(self.rsi_lenth) >= self.rsi_up:
            # RSI exit: only a closing stop order at the bar close is sent.
            self.sell(bar.close_price, abs(self.pos), stop=True)
            return

        self.send_buy_orders(self.entry_up)
        # Use whichever protective level is higher: ATR stop or exit channel.
        protective_stop = max(self.long_stop, self.exit_down)
        self.sell(protective_stop, abs(self.pos), stop=True)

    def _manage_short(self, bar: BarData) -> None:
        """Place the orders for an open short position."""
        if self.am.rsi(self.rsi_lenth) <= self.rsi_down:
            # RSI exit: only a closing stop order at the bar close is sent.
            self.cover(bar.close_price, abs(self.pos), stop=True)
            return

        self.send_short_orders(self.entry_down)
        # Use whichever protective level is lower: ATR stop or exit channel.
        protective_stop = min(self.short_stop, self.exit_up)
        self.cover(protective_stop, abs(self.pos), stop=True)

    def on_trade(self, trade: TradeData) -> None:
        """
        Record the latest fill price and derive a stop 2 * ATR away from it.

        NOTE(review): every trade whose direction is not LONG takes the short
        path, which includes a sell that closes a long position — confirm
        this is intended.
        """
        stop_distance = 2 * self.atr_value

        if trade.direction == Direction.LONG:
            self.long_entry = trade.price
            self.long_stop = self.long_entry - stop_distance
            return

        self.short_entry = trade.price
        self.short_stop = self.short_entry + stop_distance

    def on_order(self, order: OrderData) -> None:
        """Order updates are not used by this strategy."""
        pass

    def on_stop_order(self, stop_order: StopOrder) -> None:
        """Stop-order updates are not used by this strategy."""
        pass

    def send_buy_orders(self, price) -> None:
        """
        Place the long stop-order ladder starting at ``price``.

        A rung is sent while ``pos / fixed_size`` is below its threshold.
        Rungs sit at ``price``, ``price + 0.5 * ATR`` and ``price + ATR`` with
        volumes of 1x, 2x and 1x ``fixed_size``.

        NOTE(review): after the 2x rung fills the ratio reaches 3, so the
        third rung (threshold 3) is not re-sent on later bars — confirm the
        thresholds 1/2/3 are intended given the 1/2/1 sizing.
        """
        units_held = self.pos / self.fixed_size
        atr = self.atr_value
        unit = self.fixed_size

        ladder = (
            (1, price, unit),
            (2, price + atr * 0.5, unit * 2),
            (3, price + atr, unit),
        )
        for threshold, trigger, volume in ladder:
            if units_held < threshold:
                self.buy(trigger, volume, stop=True)

    def send_short_orders(self, price) -> None:
        """
        Place the short stop-order ladder starting at ``price``.

        A rung is sent while ``pos / fixed_size`` is above its (negative)
        threshold. Rungs sit at ``price``, ``price - 0.5 * ATR`` and
        ``price - ATR`` with volumes of 1x, 2x and 1x ``fixed_size``.

        NOTE(review): after the 2x rung fills the ratio reaches -3, so the
        third rung (threshold -3) is not re-sent on later bars — confirm the
        thresholds are intended given the 1/2/1 sizing.
        """
        units_held = self.pos / self.fixed_size
        atr = self.atr_value
        unit = self.fixed_size

        ladder = (
            (-1, price, unit),
            (-2, price - atr * 0.5, unit * 2),
            (-3, price - atr, unit),
        )
        for threshold, trigger, volume in ladder:
            if units_held > threshold:
                self.short(trigger, volume, stop=True)