from datetime import time
from typing import List, Dict
import pandas as pd
from vnpy_portfoliostrategy import (
StrategyEngine,
StrategyTemplate,
TickData,
BarData,
TradeData,
BarGenerator,
ArrayManager,
)
from vnpy_portfoliostrategy.utility import PortfolioBarGenerator
from generator import DailyBarGenerator
from vnpy.trader.constant import Direction,Interval
from generator import DailyBarGenerator
from datetime import datetime
class qushiStrategy(StrategyTemplate):
    """Multi-contract trend strategy evaluated on 3-minute bars.

    Entry signals compare two EMAs and require a breakout of the two
    previous 3-minute bars; stop levels are checked on every raw bar pushed
    to ``on_bars``. Only contracts flagged in ``openif`` by
    ``calculate_targets`` may open a new position, and only while none of
    the contracts in the current bar set holds a position.
    """

    author = "action"

    # Strategy parameters.
    # NOTE(review): fast_window (26) is longer than slow_window (20) — confirm
    # that this is intended and not a swapped pair.
    ema_window = 20
    slow_window = 20
    fast_window = 26
    boll_dev = 2
    add_price = 60
    limt_price = 22
    fixed_size = 1
    price_add: int = 1
    capital: int = 40_000

    # Every entry names an attribute defined above. The original list was
    # missing a comma between "price_add" and "capital" (which silently
    # merged them into one bogus name) and listed two names that do not exist
    # on the class.
    parameters = [
        "ema_window",
        "slow_window",
        "fast_window",
        "boll_dev",
        "fixed_size",
        "add_price",
        "limt_price",
        "price_add",
        "capital",
    ]
    variables = []

    def __init__(
        self,
        strategy_engine: "StrategyEngine",
        strategy_name: str,
        vt_symbols: List[str],
        setting: dict
    ):
        """Set up per-contract state containers and the bar generator.

        Reads ``index_contract.csv`` from the working directory; it must
        provide ``vt_symbol`` and ``contract_size`` columns.
        """
        super().__init__(strategy_engine, strategy_name, vt_symbols, setting)

        # Contract multipliers keyed by vt_symbol.
        df: pd.DataFrame = pd.read_csv("index_contract.csv")
        self.contracts_sizes: Dict[str, int] = dict(
            zip(df["vt_symbol"], df["contract_size"])
        )

        self.factors: Dict[str, "DailyFactor"] = {}
        self.targets: Dict[str, int] = {}
        self.sizes: Dict[str, int] = {}

        # Latest factor values per contract and the rolling lists of the
        # contracts that ranked lowest on each factor.
        self.tjds: Dict[str, float] = {}
        self.zdfds: Dict[str, float] = {}
        self.tjdlist = []
        self.zdfdlist = []

        self.ams: Dict[str, ArrayManager] = {}
        self.femas: Dict[str, float] = {}
        self.semas: Dict[str, float] = {}
        self.emas: Dict[str, float] = {}

        # 1 when a contract is allowed to open a new position, else 0.
        self.openif: Dict[str, int] = {}
        # Number of contracts holding a position at the last 3-minute bar.
        self.allopen: int = 0

        self.long_stops: Dict[str, float] = {}
        self.short_stops: Dict[str, float] = {}
        self.short_entry: Dict[str, float] = {}
        self.long_entry: Dict[str, float] = {}

        self.pbg = PortfolioBarGenerator(
            self.on_bars, 3, self.on_3minute_bars, Interval.MINUTE
        )

        for vt_symbol in vt_symbols:
            self.factors[vt_symbol] = DailyFactor(vt_symbol)
            self.ams[vt_symbol] = ArrayManager()
            self.targets[vt_symbol] = 0

    def on_init(self):
        """Log the initialization message and call ``load_bars(10)``."""
        self.write_log("策略初始化")
        self.load_bars(10)

    def on_start(self):
        """Start callback; nothing to do."""
        pass

    def on_stop(self):
        """Stop callback; nothing to do."""
        pass

    def on_tick(self, tick: TickData):
        """Forward each tick to the portfolio bar generator."""
        self.pbg.update_tick(tick)

    def on_bars(self, bars: Dict[str, BarData]):
        """Handle raw bars: refresh factors, apply stops, send orders."""
        # Cancel everything still working from the previous bar.
        self.cancel_all()

        # May synchronously trigger on_3minute_bars when a window completes.
        self.pbg.update_bars(bars)

        self.calculate_targets(bars)

        for vt_symbol, bar in bars.items():
            am: ArrayManager = self.ams[vt_symbol]
            # Do nothing (including order sending) until every contract in
            # this bar set has a fully initialized 3-minute series.
            if not am.inited:
                return

            current_pos = self.get_pos(vt_symbol)
            if current_pos > 0:
                # A position without a recorded stop has nothing to check
                # against; the original lookup raised KeyError here.
                long_stop = self.long_stops.get(vt_symbol)
                if long_stop is not None and bar.close_price < long_stop:
                    self.targets[vt_symbol] = 0
            elif current_pos < 0:
                short_stop = self.short_stops.get(vt_symbol)
                if short_stop is not None and bar.close_price > short_stop:
                    self.targets[vt_symbol] = 0

        self.send_orders(bars)

    def on_3minute_bars(self, bars: Dict[str, BarData]) -> None:
        """Handle 3-minute bars: update EMAs, entry signals and trailing stops."""
        self.cancel_all()

        for vt_symbol, bar in bars.items():
            self.ams[vt_symbol].update_bar(bar)

        for vt_symbol in bars:
            am: ArrayManager = self.ams[vt_symbol]
            if not am.inited:
                return
            self.femas[vt_symbol] = am.ema(self.fast_window)
            self.semas[vt_symbol] = am.ema(self.slow_window)

        # Count contracts that currently hold a position. The original loop
        # re-tested one stale position value for every symbol instead of
        # querying each symbol's own position.
        self.allopen = sum(
            1 for vt_symbol in bars if self.get_pos(vt_symbol) != 0
        )

        for vt_symbol, bar in bars.items():
            am = self.ams[vt_symbol]
            current_pos = self.get_pos(vt_symbol)

            if current_pos == 0:
                # New entries only while the whole bar set is flat and the
                # contract has been selected by calculate_targets.
                if self.allopen == 0 and self.openif.get(vt_symbol, 0) == 1:
                    self._evaluate_entry(vt_symbol, bar, am)
            elif current_pos > 0:
                # Two consecutive higher lows: trail the long stop upward.
                if am.low[-1] > am.low[-2] and am.low[-2] > am.low[-3]:
                    self.long_stops[vt_symbol] = am.low[-2]
            else:
                # Two consecutive lower highs: trail the short stop downward.
                if am.high[-1] < am.high[-2] and am.high[-2] < am.high[-3]:
                    self.short_stops[vt_symbol] = am.high[-2]

    def _evaluate_entry(self, vt_symbol: str, bar: BarData, am: ArrayManager) -> None:
        """Set target, stop and entry price when a flat contract breaks out."""
        fast = self.femas[vt_symbol]
        slow = self.semas[vt_symbol]

        if fast > slow:
            # Long: latest close at or above the highs of the two prior bars.
            if am.close[-1] >= am.high[-2] and am.close[-1] >= am.high[-3]:
                self.targets[vt_symbol] = self.sizes[vt_symbol]
                self.long_stops[vt_symbol] = min(am.low[-1], am.low[-2], am.low[-3])
                self.long_entry[vt_symbol] = bar.close_price
        elif fast < slow:
            # Short: latest close at or below the lows of the two prior bars.
            # The original tested ``close >= low[-3]``, which is not the
            # mirror image of the long rule above.
            if am.close[-1] <= am.low[-2] and am.close[-1] <= am.low[-3]:
                self.targets[vt_symbol] = -self.sizes[vt_symbol]
                self.short_stops[vt_symbol] = max(am.high[-1], am.high[-2], am.high[-3])
                self.short_entry[vt_symbol] = bar.close_price

    def calculate_targets(self, bars: Dict[str, BarData]) -> None:
        """Update each contract's lot size and factors, then refresh ``openif``."""
        for vt_symbol, bar in bars.items():
            factor: DailyFactor = self.factors[vt_symbol]
            factor.on_bar(bar)

            # Lots affordable with five times the capital at the current
            # close; a non-positive notional yields zero lots instead of a
            # ZeroDivisionError.
            notional = bar.close_price * self.contracts_sizes[vt_symbol]
            if notional > 0:
                self.sizes[vt_symbol] = int((self.capital * 5) // notional)
            else:
                self.sizes[vt_symbol] = 0

            # The getters must be called: storing the bound methods (as the
            # original did) makes the rankings below unsortable.
            self.tjds[vt_symbol] = factor.get_tjd()
            self.zdfds[vt_symbol] = factor.get_zdfd()

        self._roll_ranking(self.tjdlist, self.tjds)
        self._roll_ranking(self.zdfdlist, self.zdfds)

        for vt_symbol in bars:
            selected = vt_symbol in self.tjdlist or vt_symbol in self.zdfdlist
            self.openif[vt_symbol] = 1 if selected else 0

    @staticmethod
    def _roll_ranking(history: list, scores: Dict[str, float]) -> None:
        """Append the two lowest-scoring symbols and keep the last three entries."""
        ranked = sorted(scores.items(), key=lambda item: item[1])
        history.extend(symbol for symbol, _ in ranked[:2])
        del history[:-3]

    def send_orders(self, bars: Dict[str, BarData]) -> None:
        """Send orders that move each contract from its position to its target."""
        for vt_symbol, bar in bars.items():
            target = self.targets[vt_symbol]
            pos = self.get_pos(vt_symbol)
            diff: int = target - pos
            if diff == 0:
                continue

            volume = abs(diff)

            # Positions always return to zero before reversing, so each
            # order is either a pure open or a pure close.
            # NOTE(review): closing orders are priced at the recorded stop
            # level, yet on_bars triggers the exit only after the close has
            # already crossed that level, so the limit price sits on the far
            # side of the market — confirm that this is intended.
            # When no level was recorded, fall back to the close adjusted by
            # price_add instead of raising KeyError.
            if diff > 0:
                fallback = bar.close_price + self.price_add
                if pos < 0:
                    self.cover(vt_symbol, self.short_stops.get(vt_symbol, fallback), volume)
                else:
                    self.buy(vt_symbol, self.long_entry.get(vt_symbol, fallback), volume)
            else:
                fallback = bar.close_price - self.price_add
                if pos > 0:
                    self.sell(vt_symbol, self.long_stops.get(vt_symbol, fallback), volume)
                else:
                    self.short(vt_symbol, self.short_entry.get(vt_symbol, fallback), volume)
class DailyFactor:
    """Per-contract daily factors: speculation degree and amplitude.

    Raw bars are fed through ``on_bar``; a daily bar generator calls
    ``on_daily_bar``, which recomputes:

    * ``tjd``  - latest daily volume divided by latest open interest
    * ``zdfd`` - latest daily (high - low) divided by the daily open

    Both values stay at 0 until the internal ``ArrayManager`` reports
    ``inited``.
    """

    def __init__(
        self,
        vt_symbol: str
    ) -> None:
        """Create the factor tracker for ``vt_symbol``."""
        self.vt_symbol: str = vt_symbol
        self.tjd: float = 0
        self.zdfd: float = 0

        # Helpers: daily series buffer and the generator producing daily
        # bars (constructed with a 14:59 time argument).
        self.am = ArrayManager()
        self.bg = DailyBarGenerator(self.on_daily_bar, time(14, 59))

    def on_bar(self, bar: "BarData") -> None:
        """Feed a raw bar into the daily bar generator."""
        self.bg.update_bar(bar)

    def on_daily_bar(self, bar: "BarData") -> None:
        """Cache the daily bar and recompute both factors from it."""
        self.am.update_bar(bar)
        if not self.am.inited:
            return

        # Use the most recent daily values. The original divided the whole
        # series, which produced arrays rather than the scalar factors that
        # are later ranked against each other.
        volume = self.am.volume[-1]
        open_interest = self.am.open_interest[-1]
        day_high = self.am.high[-1]
        day_low = self.am.low[-1]
        day_open = self.am.open[-1]

        # A zero denominator gives a factor of 0 instead of inf/nan.
        self.tjd = float(volume / open_interest) if open_interest else 0.0
        self.zdfd = float((day_high - day_low) / day_open) if day_open else 0.0

    def get_tjd(self) -> float:
        """Return the speculation degree (volume / open interest)."""
        return self.tjd

    def get_zdfd(self) -> float:
        """Return the amplitude ((high - low) / open)."""
        return self.zdfd