一个等交易时长、固定位置的K线生成器FixedBarGenerator
以下代码保存在vnpy\usertools\my_strategy_tool.py文件中
from typing import Callable,List,Dict, Tuple, Union
import copy
import numpy as np
import talib
import vnpy.usertools.mylib as mylib
from vnpy.app.cta_strategy import (
    BarGenerator,
    ArrayManager
)
from vnpy.trader.object import (
    BarData,
    TickData,
    TradeData
)
from vnpy.trader.constant import Interval
from enum import Enum
import datetime 
import rqdatac as rq
from rqdatac.utils import to_date
import pytz
CHINA_TZ = pytz.timezone("Asia/Shanghai")
def get_listed_date(symbol:str):
    """
    Return the listing date of a contract.

    Looks the contract up with ``rq.instruments`` and converts its
    ``listed_date`` field with ``to_date``.

    symbol: contract code understood by rqdatac.
    """
    info = rq.instruments(symbol)
    return to_date(info.listed_date)
def get_de_listed_date(symbol:str):
    """
    Return the de-listing (delivery) date of a contract.

    Looks the contract up with ``rq.instruments`` and converts its
    ``de_listed_date`` field with ``to_date``.

    symbol: contract code understood by rqdatac.
    """
    info = rq.instruments(symbol)
    return to_date(info.de_listed_date)
class Timeunit(Enum):
    """ Unit in which an elapsed trading time is reported. """
    # The values are short textual tags; TradeHours.get_trade_time_inday
    # selects its return unit by comparing against these members.
    SECOND = "1s"
    MINUTE = "1m"
    HOUR = "1h"
class TradeHours(object):
    """
    Trading-session calendar of one contract.

    On construction the class queries rqdatac for the contract's trading
    dates and for the trading sessions of its listing date, then builds:

    * ``trade_index_date``: trading-day index (0 = first date returned by
      ``rq.get_trading_dates``) -> trading date;
    * ``trade_date_index``: trading date -> ``(index, sessions)`` where
      ``sessions`` is a list of timezone-aware ``(start, stop)`` datetimes;
    * ``time_dn_pairs``: the session pattern of the listing date, expressed
      as times of day plus day offsets (see ``_get_trading_times_dn``).

    The session pattern of the listing date is reused for every later
    trading day.
    """

    def __init__(self, symbol: str):
        self.symbol = symbol
        self.init()

    @staticmethod
    def _localize(dt: datetime.datetime) -> datetime.datetime:
        """
        Return ``dt`` as wall-clock time in CHINA_TZ.

        pytz time zones must be attached with ``localize``; passing them as
        ``tzinfo`` (``replace`` / ``combine``) selects the zone's first
        historical offset (local mean time) instead of the current one.
        Any tzinfo already present on ``dt`` is discarded first, which keeps
        the "overwrite the tzinfo" meaning of the previous ``replace`` call.
        """
        return CHINA_TZ.localize(dt.replace(tzinfo=None))

    def init(self):
        """ Build the trading-day dictionaries and the per-day session lists. """
        self.listed_date = get_listed_date(self.symbol)
        self.de_listed_date = get_de_listed_date(self.symbol)

        # every trading date of the contract, indexed from 0
        trade_dates = rq.get_trading_dates(self.listed_date, self.de_listed_date)
        self.trade_index_date = dict(enumerate(trade_dates))   # index -> date
        self.trade_date_index = {}                             # date -> (index, sessions)

        trading_hours = rq.get_trading_hours(
            self.symbol,
            date=self.listed_date,
            frequency='tick',
            expected_fmt='datetime',
        )
        self.time_dn_pairs = self._get_trading_times_dn(trading_hours)

        # The listing date keeps exactly the sessions reported by rqdatac.
        self.trade_date_index[self.listed_date] = (
            0,
            [(self._localize(start), self._localize(stop)) for start, stop in trading_hours],
        )

        # Every later trading day is derived from the listing-date pattern.
        for day in range(1, len(self.trade_index_date)):
            td = self.trade_index_date[day]
            sessions = []
            for (start, dn1), (stop, dn2) in self.time_dn_pairs:
                # start: opening time of day, dn1: trading-day offset (<= 0)
                #        of the day on which the session opens
                # stop : closing time of day, dn2: calendar days to add to the
                #        opening date to reach the closing date
                base = self.trade_index_date[day + dn1]
                opening = datetime.datetime.combine(base, start)
                closing = datetime.datetime.combine(base, stop) + datetime.timedelta(days=dn2)
                sessions.append((self._localize(opening), self._localize(closing)))
            self.trade_date_index[td] = (day, sessions)

    def _get_trading_times_dn(self, trading_hours: List[Tuple[datetime.datetime, datetime.datetime]]):
        """
        Convert one day's sessions into times of day plus day offsets.
        Internal helper, not meant for external use.

        Result (same order as the input):
            [((start1,dn11),(stop1,dn21)), ..., ((startN,dn1N),(stopN,dn2N))]
        where
            startN: opening time of day, dn1N: trading-day offset of the
                    opening day relative to the trading day (0 or negative);
            stopN : closing time of day, dn2N: 1 when the session closes on
                    the calendar day after it opened, otherwise 0.
        """
        if not trading_hours:
            return []

        pairs = []
        dn1 = 0
        later_start = None      # opening time of the session that follows
        # Walk backwards from the last session of the day: whenever an
        # earlier session opens at a later time of day than the one after
        # it, it must have opened on the previous trading day.
        for start_dt, stop_dt in reversed(list(trading_hours)):
            start, stop = start_dt.time(), stop_dt.time()
            if later_start is not None and start > later_start:
                dn1 -= 1
            dn2 = 1 if start > stop else 0
            pairs.append(((start, dn1), (stop, dn2)))
            later_start = start
        pairs.reverse()
        return pairs

    def get_date_tradetimes(self, date: datetime.date):
        """
        Return ``(index, sessions)`` of trading date ``date``,
        or ``(None, [])`` when ``date`` is not in the index.
        """
        idx, trade_times = self.trade_date_index.get(date, (None, []))
        return idx, trade_times

    def get_trade_datetimes(self, dt: datetime.datetime, allday: bool = False):
        """
        Find the trading day and session that contain ``dt``.

        Returns ``(index, sessions)``: with ``allday=True`` ``sessions`` is
        the full session list of the trading day, otherwise a one-element
        list holding only the session that contains ``dt``.
        Returns ``(None, [])`` when ``dt`` is outside every session.
        """
        # Nothing can be earlier than the first session of the listing date.
        _, first_sessions = self.get_date_tradetimes(self.listed_date)
        if not first_sessions or dt < first_sessions[0][0]:
            return None, []

        # Find the index of the first trading date on or after dt's calendar
        # date. Index 0 is a valid result, so test against None explicitly,
        # and include the de-listing date itself because the index was built
        # from a range that ends on it.
        date, offset, days = dt.date(), 0, None
        while date <= self.de_listed_date:
            days, _ = self.trade_date_index.get(date, (None, []))
            if days is not None:
                break
            offset += 1
            date = (dt + datetime.timedelta(days=offset)).date()
        # No trading date up to the de-listing date: not a valid trading time.
        if days is None:
            return None, []

        # A session may open before the calendar date of its trading day
        # (negative dn1), so the neighbouring trading days are checked too.
        for day in (days, days + 1, days - 1):
            date = self.trade_index_date.get(day)
            if date is None:
                continue
            idx, trade_dts = self.get_date_tradetimes(date)
            if not trade_dts:
                continue
            day_open, day_close = trade_dts[0][0], trade_dts[-1][1]
            if dt < day_open or dt > day_close:
                continue
            for start, stop in trade_dts:
                if start <= dt < stop:
                    return (idx, trade_dts) if allday else (idx, [(start, stop)])
        return None, []

    def get_trade_time_perday(self):
        """ Return the total session length of one trading day, in minutes. """
        total = datetime.timedelta()
        # Only durations matter here, so any fixed calendar date will do.
        ref = datetime.date(2000, 1, 3)
        for (start, _dn1), (stop, dn2) in self.time_dn_pairs:
            # dn2 is relative to the opening date, so dn1 cancels out.
            opening = datetime.datetime.combine(ref, start)
            closing = datetime.datetime.combine(ref, stop) + datetime.timedelta(days=dn2)
            total += closing - opening
        return int(total.total_seconds() // 60)

    def get_trade_time_inday(self, dt: datetime.datetime, unit: Timeunit = Timeunit.MINUTE):
        """
        Return the trading time elapsed inside dt's trading day up to ``dt``.

        unit: Timeunit.SECOND, Timeunit.MINUTE or Timeunit.HOUR select an
              integer count of that unit; any other value returns the raw
              ``timedelta``.
        Returns None when ``dt`` is not inside a trading session.
        """
        elapsed = datetime.timedelta()
        _, trade_times = self.get_trade_datetimes(dt, allday=True)
        if not trade_times:
            return None

        for start, stop in trade_times:
            if dt > stop:
                # session completely in the past
                elapsed += stop - start
                continue
            if dt > start:
                # session in progress
                elapsed += dt - start
            break

        seconds = int(elapsed.total_seconds())
        if unit == Timeunit.SECOND:
            return seconds
        if unit == Timeunit.MINUTE:
            return seconds // 60
        if unit == Timeunit.HOUR:
            return seconds // 3600
        return elapsed

    def convet_to_datetime(self, day: int, minutes: int):
        """
        Convert "``minutes`` of trading time into trading day ``day``" to a
        datetime.

        Returns None when ``day`` is not indexed or when ``minutes`` is not
        smaller than the total session length of that day.
        """
        date = self.trade_index_date.get(day)
        if date is None:
            return None
        _, trade_times = self.trade_date_index.get(date, (None, []))

        remaining = minutes
        for start, stop in trade_times:
            length = int((stop - start).total_seconds() // 60)
            if remaining < length:
                return start + datetime.timedelta(minutes=remaining)
            remaining -= length
        return None

    def get_bar_window(self, dt: datetime.datetime, window: int, interval: Interval = Interval.MINUTE):
        """
        Return ``(start, stop)`` of the fixed-position K-line containing ``dt``.

        K-lines are laid out back to back on the contract's trading-time
        axis (minutes of trading time counted from trading day 0), so their
        position depends only on ``window`` and ``interval``.

        Returns ``(None, None)`` when ``dt`` is outside every trading
        session, when ``interval`` is not MINUTE/HOUR/DAILY/WEEKLY, or when
        the resulting K-line width is not positive.
        """
        no_window = (None, None)
        day, trade_times = self.get_trade_datetimes(dt, allday=True)
        if not trade_times:
            return no_window

        # trading minutes per day, and trading minutes elapsed up to dt
        minutes_per_day = self.get_trade_time_perday()
        minutes_in_day = self.get_trade_time_inday(dt, unit=Timeunit.MINUTE)
        if minutes_per_day <= 0 or minutes_in_day is None:
            return no_window

        # K-line width in trading minutes
        if interval == Interval.MINUTE:
            bar_width = window
        elif interval == Interval.HOUR:
            bar_width = 60 * window
        elif interval == Interval.DAILY:
            bar_width = minutes_per_day * window
        elif interval == Interval.WEEKLY:
            bar_width = minutes_per_day * window * 5
        else:
            return no_window
        if bar_width <= 0:
            # e.g. window == 0: there is no K-line grid to snap to
            return no_window

        # position of dt on the trading-time axis, snapped to the grid
        total_minutes = day * minutes_per_day + minutes_in_day
        start_m = (total_minutes // bar_width) * bar_width
        stop_m = start_m + bar_width

        # back from (day, minute-of-day) to datetimes
        start_dt = self.convet_to_datetime(*divmod(start_m, minutes_per_day))
        if start_dt is None:
            return no_window
        stop_dt = self.convet_to_datetime(*divmod(stop_m, minutes_per_day))
        if stop_dt is None and self.trade_index_date:
            # The K-line runs past the last indexed trading day: close it at
            # the end of that day's final session so callers never receive a
            # half-defined window.
            last_date = self.trade_index_date[max(self.trade_index_date)]
            _, last_sessions = self.trade_date_index.get(last_date, (None, []))
            if last_sessions:
                stop_dt = last_sessions[-1][1]
        if stop_dt is None:
            return no_window
        return start_dt, stop_dt
class FixedBarGenerator(BarGenerator):
    """
    Fixed-position K-line generator.

    Unlike counting bars from wherever the stream happens to start, every
    window bar covers the ``[kx_start, kx_stop)`` interval that
    ``TradeHours.get_bar_window`` reports for the incoming bar's timestamp.
    """

    def __init__(
        self,
        on_bar: Callable,
        window: int = 0,
        on_window_bar: Callable = None,
        interval: Interval = Interval.MINUTE,
        symbol: str = ''
    ):
        """
        on_bar, window, on_window_bar, interval: forwarded to BarGenerator.
        symbol: contract code used to build the TradeHours calendar.
        """
        super().__init__(on_bar, window, on_window_bar, interval)
        self.trade_hours = TradeHours(symbol)
        # bounds of the K-line currently being assembled
        self.kx_start, self.kx_stop = None, None

    def _locate_window(self, bar: BarData) -> bool:
        """ Look up the K-line bounds for ``bar``; return False if there are none. """
        self.kx_start, self.kx_stop = self.trade_hours.get_bar_window(
            bar.datetime, self.window, self.interval)
        if self.kx_start is None or self.kx_stop is None:
            # A half-defined window cannot be compared against bar times,
            # so it is treated the same as "no window".
            self.kx_start, self.kx_stop = None, None
            return False
        return True

    def _open_window_bar(self, bar: BarData) -> BarData:
        """ Create a window bar stamped with ``kx_start`` and seeded from ``bar``. """
        return BarData(
            symbol=bar.symbol,
            exchange=bar.exchange,
            datetime=self.kx_start,
            gateway_name=bar.gateway_name,
            open_price=bar.open_price,
            high_price=bar.high_price,
            low_price=bar.low_price,
        )

    def update_bar(self, bar: BarData) -> None:
        """
        Update 1 minute bar into generator
        """
        # Drop a bar that is older than the window bar being assembled.
        if self.window_bar and bar.datetime < self.window_bar.datetime:
            return

        if self.kx_start is None or self.kx_stop is None:
            if not self._locate_window(bar):
                return

        if not self.window_bar:
            # first bar of a K-line
            self.window_bar = self._open_window_bar(bar)
        elif self.kx_start <= bar.datetime < self.kx_stop:
            # the 1 minute bar belongs to the current K-line
            self.window_bar.high_price = max(
                self.window_bar.high_price, bar.high_price)
            self.window_bar.low_price = min(
                self.window_bar.low_price, bar.low_price)
        elif bar.datetime >= self.kx_stop:
            # the current K-line is complete: publish it, then start the
            # K-line that contains this bar
            self.on_window_bar(self.window_bar)
            self.window_bar = None
            if not self._locate_window(bar):
                # not inside a trading session
                return
            self.window_bar = self._open_window_bar(bar)

        # Update close price/volume into window bar
        self.window_bar.close_price = bar.close_price
        self.window_bar.volume += int(bar.volume)
        self.window_bar.open_interest = bar.open_interest
创建一个可显示K线的策略FixedKxStrategy
以下代码为 [用户目录]\strategies\FixedKxStrategy.py的内容
from typing import Any,List,Dict,Tuple
import copy
from vnpy.app.cta_strategy import (
    CtaTemplate,
    BarGenerator,
    ArrayManager,
    StopOrder,
    Direction
)
from vnpy.trader.engine import MainEngine,EventEngine
from vnpy.app.cta_strategy.engine import CtaEngine
from vnpy.event.engine import Event
from vnpy.trader.object import (
    LogData,
    TickData,
    BarData,
    TradeData,
    OrderData,
)
from vnpy.app.cta_strategy import StopOrder
from vnpy.app.cta_strategy.base import EngineType
from vnpy.trader.constant import Interval
from vnpy.app.cta_strategy.base import (
    APP_NAME,
    EVENT_CTA_LOG,
    EVENT_CTA_TICK,
    EVENT_CTA_HISTORY_BAR,
    EVENT_CTA_BAR,
    EVENT_CTA_ORDER,
    EVENT_CTA_TRADE,    
    EVENT_CTA_STOPORDER,
    EVENT_CTA_STRATEGY,
)
from vnpy.usertools.kx_chart import (   # hxxjava add
    NewChartWidget,
    CandleItem,
    VolumeItem, 
    LineItem,
    SmaItem,
    RsiItem,
    MacdItem,
)
from vnpy.usertools.my_strategy_tool import FixedBarGenerator
class FixedKxStrategy(CtaTemplate):
    """
    Demo strategy that only builds fixed-position K-lines and publishes them
    (together with ticks, orders and trades) as events for a chart widget.
    It places no orders itself.
    """
    author = "hxxjava"

    kx_interval = 1
    show_chart = False  # publish chart events when True
    parameters = [
        "kx_interval",
        "show_chart"
    ]

    kx_count:int = 0
    cta_manager = None
    variables = ["kx_count"]

    def __init__(
        self,
        cta_engine: Any,
        strategy_name: str,
        vt_symbol: str,
        setting: dict,
    ):
        """ Create the bar generator and the per-instance bookkeeping. """
        super().__init__(cta_engine, strategy_name, vt_symbol, setting)

        symbol, exchange = self.vt_symbol.split('.')
        self.bg = FixedBarGenerator(
            self.on_bar,
            self.kx_interval,
            self.on_Nmin_bar,
            symbol=symbol.upper(),
        )
        self.am = ArrayManager()

        engine: CtaEngine = self.cta_engine
        self.engine_type = engine.engine_type
        self.even_engine = engine.main_engine.event_engine

        # Declared here because they are per-instance state.
        self.all_bars: List[BarData] = []   # every completed window bar
        self.cur_window_bar = None          # latest provisional window bar, or None
        self.bar_updated = False            # True once on_bar ran for the current tick

    def on_init(self):
        """
        Callback when strategy is inited.
        """
        self.load_bar(20)
        if len(self.all_bars) > 0:
            self.send_event(EVENT_CTA_HISTORY_BAR, self.all_bars)

    def on_start(self):
        """ Callback when strategy is started. """
        self.write_log("已开始")

    def on_stop(self):
        """ Callback when strategy is stopped. """
        self.write_log("_kx_strategy 已停止")

    def on_tick(self, tick: TickData):
        """
        Callback of new tick data update.
        """
        self.bar_updated = False
        self.current_tick = tick    # remember the latest tick

        # Feed the tick first; this may complete a 1 minute bar (on_bar) and
        # in turn a window bar (on_Nmin_bar).
        self.bg.update_tick(tick)

        if self.inited:
            # Build and publish the provisional window bar, then the tick.
            self.cur_window_bar = self.get_cur_window_bar()
            if self.cur_window_bar:
                self.send_event(EVENT_CTA_BAR, self.cur_window_bar)
            self.send_event(EVENT_CTA_TICK, tick)

    def on_bar(self, bar: BarData):
        """
        Callback of new bar data update.
        """
        if self.inited:
            self.write_log(f"I got a 1min BarData at {bar.datetime}")
        self.bg.update_bar(bar)
        self.bar_updated = True

    def on_Nmin_bar(self, bar: BarData):
        """
        Callback of new bar data update.
        """
        self.all_bars.append(bar)
        self.kx_count = len(self.all_bars)
        if self.inited:
            self.write_log(f"I got a {self.kx_interval}min BarData at {bar.datetime}")
            self.send_event(EVENT_CTA_BAR, bar)
        self.put_event()

    def on_trade(self, trade: TradeData):
        """
        Callback of new trade data update.
        """
        self.send_event(EVENT_CTA_TRADE, trade)

    def on_order(self, order: OrderData):
        """
        Callback of new order data update.
        """
        self.send_event(EVENT_CTA_ORDER, order)

    def on_stop_order(self, stop_order: StopOrder):
        """
        Callback of stop order update.
        """
        self.send_event(EVENT_CTA_STOPORDER, stop_order)

    def get_cur_window_bar(self):
        """
        Return a copy of the generator's window bar merged with the 1 minute
        bar that is still being built, or None when no window bar exists.

        The generator's own objects are never modified.
        """
        if not self.bg.window_bar:
            # a window bar has just been published; nothing in progress
            return None
        window_bar = copy.deepcopy(self.bg.window_bar)

        bar = self.bg.bar
        if self.bar_updated or bar is None:
            # Either the window bar was refreshed by on_bar during this tick,
            # or the generator holds no in-progress 1 minute bar (the window
            # bar can be filled through update_bar alone); in both cases
            # there is nothing to merge.
            return window_bar

        # Merge the in-progress 1 minute bar into the copy.
        window_bar.high_price = max(window_bar.high_price, bar.high_price)
        window_bar.low_price = min(window_bar.low_price, bar.low_price)
        # Update close price/volume into window bar
        window_bar.close_price = bar.close_price
        window_bar.volume += int(bar.volume)
        window_bar.open_interest = bar.open_interest
        return window_bar

    def send_event(self, event_type: str, data: Any):
        """
        Put ``(strategy_name, data)`` on the event engine, but only when
        running in the live engine with ``show_chart`` enabled.
        """
        if self.engine_type == EngineType.LIVE and self.show_chart:
            self.even_engine.put(Event(event_type, (self.strategy_name, data)))

    def init_kx_chart(self, kx_chart: NewChartWidget = None):    # hxxjava add ----- called from outside
        """ Add this strategy's plots and chart items to ``kx_chart``, if one is given. """
        if not kx_chart:
            return

        kx_chart.add_plot("candle", hide_x_axis=True)
        kx_chart.add_plot("volume", maximum_height=150)
        kx_chart.add_plot("rsi", maximum_height=150)
        kx_chart.add_plot("macd", maximum_height=150)

        kx_chart.add_item(CandleItem, "candle", "candle")
        kx_chart.add_item(VolumeItem, "volume", "volume")
        kx_chart.add_item(LineItem, "line", "candle")
        kx_chart.add_item(SmaItem, "sma", "candle")
        kx_chart.add_item(RsiItem, "rsi", "rsi")
        kx_chart.add_item(MacdItem, "macd", "macd")

        kx_chart.add_last_price_line()
        kx_chart.add_cursor()
运行策略FixedKxStrategy
具体操作方法以及缺少的NewChartWidget等组件参见:
https://www.vnpy.com/forum/topic/3920-wei-kxian-tu-biao-tian-zhuan-jia-wa-rang-ctace-lue-de-yun-xing-kan-de-jian-1
设置方法
按下图的步骤创建一个可以显示K线图的用户策略

运行效果
以下为ag2012.SHFE的30分钟K线图,它显示的K线是等交易时长,位置也是固定的。
