一个等交易长度、固定位置的K线产生器FixedBarGenerator
以下代码保存在vnpy\user_tools\my_strategy_tool.py文件中
from typing import Callable,List,Dict, Tuple, Union
import copy
import numpy as np
import talib
import vnpy.usertools.mylib as mylib
from vnpy.app.cta_strategy import (
BarGenerator,
ArrayManager
)
from vnpy.trader.object import (
BarData,
TickData,
TradeData
)
from vnpy.trader.constant import Interval
from enum import Enum
import datetime
import rqdatac as rq
from rqdatac.utils import to_date
import pytz
CHINA_TZ = pytz.timezone("Asia/Shanghai")
''' 获得上市日期 '''
def get_listed_date(symbol:str):
info = rq.instruments(symbol)
return to_date(info.listed_date)
''' 获得交割日期 '''
def get_de_listed_date(symbol:str):
info = rq.instruments(symbol)
return to_date(info.de_listed_date)
class Timeunit(Enum):
""" 时间单位 """
SECOND = '1s'
MINUTE = '1m'
HOUR = '1h'
class TradeHours(object):
""" 合约交易时间段 """
def __init__(self,symbol:str):
self.symbol = symbol
self.init()
def init(self):
""" 初始化交易日字典及交易时间段数据列表 """
self.listed_date = get_listed_date(self.symbol)
self.de_listed_date = get_de_listed_date(self.symbol)
self.trade_date_index = {} # 合约的交易日索引字典
self.trade_index_date = {} # 交易天数与交易日字典
trade_dates = rq.get_trading_dates(self.listed_date,self.de_listed_date) # 合约的所有的交易日
days = 0
for td in trade_dates:
self.trade_date_index[td] = days
self.trade_index_date[days] = td
days += 1
trading_hours = rq.get_trading_hours(self.symbol,date=self.listed_date,frequency='tick',expected_fmt='datetime')
self.time_dn_pairs = self._get_trading_times_dn(trading_hours)
trading_hours0 = [(start.replace(tzinfo=CHINA_TZ),stop.replace(tzinfo=CHINA_TZ)) for start,stop in trading_hours]
self.trade_date_index[self.listed_date] = (0,trading_hours0)
for day in range(1,days):
td = self.trade_index_date[day]
trade_datetimes = []
for (start,dn1),(stop,dn2) in self.time_dn_pairs:
#start:开始时间,dn1:相对交易日前推天数,
#stop :开始时间,dn2:相对开始时间后推天数
d = self.trade_index_date[day+dn1]
trade_datetimes.append((
datetime.datetime.combine(d,start,tzinfo=CHINA_TZ),
datetime.datetime.combine(d,stop,tzinfo=CHINA_TZ)+datetime.timedelta(days=dn2)))
self.trade_date_index[td] = (day,trade_datetimes)
def _get_trading_times_dn(self,trading_hours:List[Tuple[datetime.datetime,datetime.datetime]]):
"""
交易时间跨天处理,不推荐外部使用 。
产生的结果:[((start1,dn11),(stop1,dn21)),((start2,dn12),(stop2,dn22)),...,((startN,dn1N),(stopN,dn2N))]
其中:
startN:开始时间,dn1N:相对交易日前推天数,
stopN:开始时间,dn2N:相对开始时间后推天数
"""
ilen = len(trading_hours)
if ilen == 0:
return []
start_stops = []
for start,stop in trading_hours:
start_stops.insert(0,(start.time(),stop.time()))
pre_start,pre_stop = start_stops[0]
dn1 = 0
dn2 = 1 if pre_start > pre_stop else 0
time_dn_pairs = [((pre_start,dn1),(pre_stop,dn2))]
for start,stop in start_stops[1:]:
if start > pre_start:
dn1 -= 1
dn2 = 1 if start > stop else 0
time_dn_pairs.insert(0,((start,dn1),(stop,dn2)))
pre_start,pre_stop = start,stop
return time_dn_pairs
def get_date_tradetimes(self,date:datetime.date):
""" 得到合约date日期的交易时间段 """
idx,trade_times = self.trade_date_index.get(date,(None,[]))
return idx,trade_times
def get_trade_datetimes(self,dt:datetime,allday:bool=False):
""" 得到合约date日期的交易时间段 """
# 得到最早的交易时间
idx0,trade_times0 = self.get_date_tradetimes(self.listed_date)
start0,stop0 = trade_times0[0]
if dt < start0:
return None,[]
# 首先找到dt日期自上市以来的交易天数
date,dn = dt.date(),0
days = None
while date < self.de_listed_date:
days,ths = self.trade_date_index.get(date,(None,[]))
if not days:
dn += 1
date = (dt+datetime.timedelta(days=dn)).date()
else:
break
# 如果超出交割日也没有找到,那这就不是一个有效的交易时间
if days is None:
return (None,[])
index_3 = [days,days+1,days-1] # 前后三天的
date_3d = []
for day in index_3:
date = self.trade_index_date.get(day,None)
date_3d.append(date)
# print(date_3d)
for date in date_3d:
if not date:
# print(f"{date} is not trade date")
continue
idx,trade_dts = self.get_date_tradetimes(date)
# print(f"{date} tradetimes {trade_dts}")
ilen = len(trade_dts)
if ilen > 0:
start0,stop = trade_dts[0] # start0 是date交易日的开始时间
start,stop0 = trade_dts[-1]
if dt<start0 or dt>stop0:
continue
for start,stop in trade_dts:
if dt>=start and dt < stop:
if allday:
return idx,trade_dts
else:
return idx,[(start,stop)]
return None,[]
def get_trade_time_perday(self):
""" 计算每日的交易总时长(单位:分钟) """
TTPD = datetime.timedelta(0,0,0)
datetimes = []
today = datetime.datetime.now().date()
for (start,dn1),(stop,dn2) in self.time_dn_pairs:
start_dt = datetime.datetime.combine(today,start,tzinfo=CHINA_TZ) + datetime.timedelta(days=dn1)
stop_dt = datetime.datetime.combine(today,stop,tzinfo=CHINA_TZ) + datetime.timedelta(days=dn2)
time_delta = stop_dt - start_dt
TTPD = TTPD + time_delta
return int(TTPD.seconds/60)
def get_trade_time_inday(self,dt:datetime,unit:Timeunit=Timeunit.MINUTE):
"""
计算dt在交易日内的分钟数
unit: '1s':second;'1m':minute;'1h';1h
"""
TTID = datetime.timedelta(0,0,0)
day,trade_times = self.get_trade_datetimes(dt,allday=True)
if not trade_times:
return None
for start,stop in trade_times:
if dt > stop:
time_delta = stop - start
TTID += time_delta
elif dt > start:
time_delta = dt - start
TTID += time_delta
break
else:
break
if unit == Timeunit.SECOND:
return TTID.seconds
elif unit == Timeunit.MINUTE:
return int(TTID.seconds/60)
elif unit == Timeunit.HOUR:
return int(TTID.seconds/3600)
else:
return TTID
def convet_to_datetime(self,day:int,minutes:int):
""" 计算minutes在第day交易日内的datetime形式的时间 """
date = self.trade_index_date.get(day,None)
if date is None:
return None
idx,trade_times = self.trade_date_index.get(date,(None,[]))
if not trade_times: # 不一定必要
return None
for (start,stop) in trade_times:
timedelta = stop - start
if minutes < int(timedelta.seconds/60):
return start + datetime.timedelta(minutes=minutes)
else:
minutes -= int(timedelta.seconds/60)
return None
def get_bar_window(self,dt:datetime,window:int,interval:Interval=Interval.MINUTE):
""" 计算dt所在K线的起止时间 """
bar_windows = (None,None)
day,trade_times = self.get_trade_datetimes(dt,allday=True)
if not trade_times:
# print(f"day={day} trade_times={trade_times}")
return bar_windows
# 求每个交易日的交易时间分钟数
TTPD = self.get_trade_time_perday()
# 求dt在交易日内的分钟数
TTID = self.get_trade_time_inday(dt,unit=Timeunit.MINUTE)
# 得到dt时刻K线的起止时间
total_minites = day*TTPD + TTID
# 计算K线宽度(分钟数)
if interval == Interval.MINUTE:
bar_width = window
elif interval == Interval.HOUR:
bar_width = 60*window
elif interval == Interval.DAILY:
bar_width = TTPD*window
elif interval == Interval.WEEKLY:
bar_width = TTPD*window*5
else:
return bar_windows
# 求K线的开始时间的和结束的分钟形式
start_m = int(total_minites/bar_width)*bar_width
stop_m = start_m + bar_width
# 计算K开始时间的datetime形式
start_d = int(start_m / TTPD)
minites = start_m % TTPD
start_dt = self.convet_to_datetime(start_d,minites)
# print(f"start_d={start_d} minites={minites}---->{start_dt}")
# 计算K结束时间的datetime形式
stop_d = int(stop_m / TTPD)
minites = stop_m % TTPD
stop_dt = self.convet_to_datetime(stop_d,minites)
# print(f"stop_d={stop_d} minites={minites}---->{stop_dt}")
return start_dt,stop_dt
class FixedBarGenerator(BarGenerator):
""" 固定位置K线生成器 """
def __init__(
self,
on_bar: Callable,
window: int = 0,
on_window_bar: Callable = None,
interval: Interval = Interval.MINUTE,
symbol:str=''
):
super().__init__(on_bar,window,on_window_bar,interval)
self.trade_hours = TradeHours(symbol)
self.kx_start,self.kx_stop = None,None
def update_bar(self,bar:BarData) -> None:
"""
Update 1 minute bar into generator
"""
# 如果bar的时间戳笔windows的时间戳还早,丢弃bar
if self.window_bar and bar.datetime < self.window_bar.datetime:
return
if (self.kx_start,self.kx_stop) == (None,None):
self.kx_start,self.kx_stop = self.trade_hours.get_bar_window(bar.datetime,self.window,self.interval)
if (self.kx_start,self.kx_stop) == (None,None):
return
# If not inited, creaate window bar object
if (not self.window_bar):
# 获得K线的交易起止时间
self.window_bar = BarData(
symbol=bar.symbol,
exchange=bar.exchange,
datetime=self.kx_start,
gateway_name=bar.gateway_name,
open_price=bar.open_price,
high_price=bar.high_price,
low_price=bar.low_price,
)
elif self.kx_start <= bar.datetime and bar.datetime < self.kx_stop:
# 1分钟K线属于当前K线
self.window_bar.high_price = max(
self.window_bar.high_price, bar.high_price)
self.window_bar.low_price = min(
self.window_bar.low_price, bar.low_price)
elif bar.datetime >= self.kx_stop: # Check if window bar completed
self.on_window_bar(self.window_bar)
self.window_bar = None
self.kx_start,self.kx_stop = self.trade_hours.get_bar_window(bar.datetime,self.window,self.interval)
if (self.kx_start,self.kx_stop) == (None,None):
# 不在交易时段
return
self.window_bar = BarData(
symbol=bar.symbol,
exchange=bar.exchange,
datetime=self.kx_start,
gateway_name=bar.gateway_name,
open_price=bar.open_price,
high_price=bar.high_price,
low_price=bar.low_price,
)
# Update close price/volume into window bar
self.window_bar.close_price = bar.close_price
self.window_bar.volume += int(bar.volume)
self.window_bar.open_interest = bar.open_interest
创建一个可显示K线的策略FixedKxStrategy
以下代码为 [用户目录]\strategies\FixedKxStrategy.py的内容
from typing import Any,List,Dict,Tuple
import copy
from vnpy.app.cta_strategy import (
CtaTemplate,
BarGenerator,
ArrayManager,
StopOrder,
Direction
)
from vnpy.trader.engine import MainEngine,EventEngine
from vnpy.app.cta_strategy.engine import CtaEngine
from vnpy.event.engine import Event
from vnpy.trader.object import (
LogData,
TickData,
BarData,
TradeData,
OrderData,
)
from vnpy.app.cta_strategy import StopOrder
from vnpy.app.cta_strategy.base import EngineType
from vnpy.trader.constant import Interval
from vnpy.app.cta_strategy.base import (
APP_NAME,
EVENT_CTA_LOG,
EVENT_CTA_TICK,
EVENT_CTA_HISTORY_BAR,
EVENT_CTA_BAR,
EVENT_CTA_ORDER,
EVENT_CTA_TRADE,
EVENT_CTA_STOPORDER,
EVENT_CTA_STRATEGY,
)
from vnpy.usertools.kx_chart import ( # hxxjava add
NewChartWidget,
CandleItem,
VolumeItem,
LineItem,
SmaItem,
RsiItem,
MacdItem,
)
from vnpy.usertools.my_strategy_tool import FixedBarGenerator
class FixedKxStrategy(CtaTemplate):
""""""
author = "hxxjava"
kx_interval = 1
show_chart = False # 显示K线图表
parameters = [
"kx_interval",
"show_chart"
]
kx_count:int = 0
cta_manager = None
variables = ["kx_count"]
def __init__(
self,
cta_engine: Any,
strategy_name: str,
vt_symbol: str,
setting: dict,
):
super().__init__(cta_engine,strategy_name,vt_symbol,setting)
symbol,exchange = self.vt_symbol.split('.')
self.bg = FixedBarGenerator(self.on_bar,self.kx_interval,self.on_Nmin_bar,symbol=symbol.upper())
self.am = ArrayManager()
cta_engine:CtaEngine = self.cta_engine
self.engine_type = cta_engine.engine_type
self.even_engine = cta_engine.main_engine.event_engine
# 必须在这里声明,因为它们是实例变量
self.all_bars:List[BarData] = []
self.cur_window_bar:[BarData] = None
self.bar_updated = False
def on_init(self):
"""
Callback when strategy is inited.
"""
self.load_bar(20)
if len(self.all_bars)>0:
self.send_event(EVENT_CTA_HISTORY_BAR,self.all_bars)
def on_start(self):
""" """
self.write_log("已开始")
def on_stop(self):
""""""
self.write_log("_kx_strategy 已停止")
def on_tick(self, tick: TickData):
"""
Callback of new tick data update.
"""
self.bar_updated = False
self.current_tick = tick # 记录最新tick
# 再更新tick,产生1分钟K线乃至N 分钟线
self.bg.update_tick(tick)
if self.inited:
# 先产生当前临时K线
self.cur_window_bar = self.get_cur_window_bar()
if self.cur_window_bar:
# 发送当前临时K线更新消息
self.send_event(EVENT_CTA_BAR,self.cur_window_bar)
self.send_event(EVENT_CTA_TICK,tick)
def on_bar(self, bar: BarData):
"""
Callback of new bar data update.
"""
if self.inited:
self.write_log(f"I got a 1min BarData at {bar.datetime}")
self.bg.update_bar(bar)
self.bar_updated = True
def on_Nmin_bar(self, bar: BarData):
"""
Callback of new bar data update.
"""
self.all_bars.append(bar)
self.kx_count = len(self.all_bars)
if self.inited:
self.write_log(f"I got a {self.kx_interval}min BarData at {bar.datetime}")
self.send_event(EVENT_CTA_BAR,bar)
self.put_event()
def on_trade(self, trade: TradeData):
"""
Callback of new trade data update.
"""
self.send_event(EVENT_CTA_TRADE,trade)
def on_order(self, order: OrderData):
"""
Callback of new order data update.
"""
self.send_event(EVENT_CTA_ORDER,order)
def on_stop_order(self, stop_order: StopOrder):
"""
Callback of stop order update.
"""
self.send_event(EVENT_CTA_STOPORDER,stop_order)
def get_cur_window_bar(self):
window_bar = copy.deepcopy(self.bg.window_bar)
bar = self.bg.bar
if not(window_bar): # 刚产生过window_bar
return None
if self.bar_updated: # 刚更新过window_bar
return window_bar
# 上一分钟window_bar和当前bar合成出临时window bar
window_bar.high_price = max(window_bar.high_price, bar.high_price)
window_bar.low_price = min(window_bar.low_price, bar.low_price)
# Update close price/volume into window bar
window_bar.close_price = bar.close_price
window_bar.volume += int(bar.volume)
window_bar.open_interest = bar.open_interest
return window_bar
def send_event(self,event_type:str,data:Any):
"""
只在实盘引擎并且配置为显示K线图表的情况下发送小线
"""
if self.engine_type==EngineType.LIVE and self.show_chart: # "如果显示K线图表"
self.even_engine.put(Event(event_type,(self.strategy_name,data)))
def init_kx_chart(self,kx_chart:NewChartWidget=None): # hxxjava add ----- 提供给外部调用
# self.write_log("init_kx_chart executed !!!")
if kx_chart:
kx_chart.add_plot("candle", hide_x_axis=True)
kx_chart.add_plot("volume", maximum_height=150)
kx_chart.add_plot("rsi", maximum_height=150)
kx_chart.add_plot("macd", maximum_height=150)
kx_chart.add_item(CandleItem, "candle", "candle")
kx_chart.add_item(VolumeItem, "volume", "volume")
kx_chart.add_item(LineItem, "line", "candle")
kx_chart.add_item(SmaItem, "sma", "candle")
kx_chart.add_item(RsiItem, "rsi", "rsi")
kx_chart.add_item(MacdItem, "macd", "macd")
kx_chart.add_last_price_line()
kx_chart.add_cursor()
运行策略FixedKxStrategy
具体操作方法以及缺少的NewChartWidget等组件参见:
https://www.vnpy.com/forum/topic/3920-wei-kxian-tu-biao-tian-zhuan-jia-wa-rang-ctace-lue-de-yun-xing-kan-de-jian-1
设置方法
按下图的步骤创建一个可以显示K线图的用户策略

运行效果
以下为ag2012.SHFE的30分钟K线图,它显示的K线是等交易时长,位置也是固定的。
