VeighNa量化社区
你的开源社区量化交易平台
Member
avatar
加入于:
帖子: 419
声望: 171

先厘清大思路,后面逐步完成。

1.搞量化交易,一个好用的K线生成器是最基本的要求!

vnpy系统自带了一个BarGenerator,它可以帮助我们生成1分钟,n分钟,n小时,日周期的K线,也叫bar。可是除了1分钟比较完美之外,有很多问题。它在读取历史数据、回测的时候多K线的处理和实盘却有不一样的效果。具体的问题我已经在解决vnpy 2.9.0版本的BarGenerator产生30分钟Bar的错误!这个帖子中做过尝试,但也不是很成功。因为系统的BarGenerator靠时间窗口与1分钟bar的时间分钟关系来决定是否该新建和结束一个bar,这个有问题。于是我改用对1分钟bar进行计数来决定是否该新建和结束一个bar,这也是有不可靠的问题,遇到行情比较清淡的时候,可能有的分钟就没有1分钟bar产生,这是完全有可能的!
K线几乎是绝大部分交易策略分析的基础,除非你从事的是极高频交易,否则你就得用它。可是如果你连生成一个稳健可靠的K线都不能够保证,那么运行在K线基础上的指标及由此产生的交易信号就无从谈起,K线错了,它们就是错误的,以此为依据做出点交易指令有可能是南辕北辙,所以必须解决它!

2.日内对齐等交易时长K线需要什么参数

2.1 日内对齐等交易时长K线是最常用的

K线不是交易所发布的,它有很多种产生机制。其对齐方式、表现形式多种多样。关于K线的分类本人在以往的帖子中做出过比较详细的说明,有兴趣的读者可以去我以往的帖子中查看,这里就不再赘述。
市面上的绝大部分软件如通达信、大智慧、文华财经等软件,除非用户特别设定,他们最常提供给用户的K线多是日内对齐等交易时长K线。常用是一定是有道理的,因为它们已经为广大用户和投资者所接受。

2.2 日内对齐等交易时长K线需要什么参数

1)什么是日内对齐等交易时长K线?
它具有这些特点:以每日开盘为起点,每根K线都包含相同交易时间的数据,跳过中间的休市时间,直至当前交易日的收盘,收盘不足n分钟也就是K线。实盘中,每日的第一个n分钟K线含集合竞价的那个tick数据。
2)为什么这种K线能够被普遍接受?
为它尽可能地保证一个交易日内的所有K线所表达的意义内容上是一致的,它们包含相等的交易时长。这非常重要,因为你把一个5分钟时长的K线与一个30分钟时长的K线放在一起谈论是没有意义的。但是如果为了保证K线在交易时长上的一致性,让n分钟K线跨日的话也是不太合理,因为这跨日,跨周末时间太长,这中间会发生什么意外事情,可能会产生出非常巨大的幅度大K线,掩盖了隔日跳空的行情变化,这对解读行情是不利的。当然n日的K线日跨日的,但是它是n个交易日的K线融合而成的,不过其融合的每个日K线也是对齐各自的日开盘的。
另外日内对齐等交易时长K线还有一个好处,那就是你以任何之前的时间为起点,在读取历史数据重新生成该日的n分钟K线的时候,得到的改日的K线是一致的。举个例子,如果我们的CTA策略在init()中常常是这么一句:

self.load_bar(20)  # 先加载20日历史1分钟数据

这么简单的一句,包含着很多你意识不到的变化——你今天运行策略和明天运行你的策略,其中的历史数据的范围发生了变化,也就是说加载数据的起点变了。如果我们合成的K线的对齐方式不采用日内对齐的话,而采用对齐加载时间起点的话,你今天、明天加载出来之前的某日的K线就可能完全是不同的。而采用日内对齐等交易时长的K线则不存在这个问题。

3)需要知道合约的交易时间段
既然要对齐每日开盘,还有跳过各个休市时间,还要知道收市时间,那么我们就知道生成这种K线必须知道其所表达合约或对象的交易时间段,交易时间段中包含了这些信息,不知道这些信息,BarGenerator就不知道如何生成这种bar。这是必须的!

3. 如何获取合约的交易时间段

3.1 vnpy系统和CTP接口找不到交易时间段信息

目前vnpy系统中的是没有合约的交易时间段的。到哪里获取合约的交易时间段的呢?
1) 它与合约相关,应该到保存合约的数据类ContractData中去找,没有找到。
2) 是否可以提供接口,从交易所获得,这个也是比较基础的数据。于是到CTP接口中(我使用的是CTP接口,您也许不一样) ,在最新版本的CTP接口文档中也没有找到任何与交易时间段相关的信息,绝望!

3.2 有两种方法可以得到交易时间段信息

  1. 米筐数据接口中有,其中有个得到市场中所有合约的函数all_instruments(),它的返回值中就包含交易时间段信息trading_hours,还有另外一些如get_trading_hours()也可以直接获得这些交易时间段信息,好!
  2. 实在没有的话,咱们手工创建一个文件,按照一定格式,把我们需要创建K线的合约准备好交易时间段信息,这也是可行的。

3.3 直接从米筐数据接口获取交易时间段信息的问题

  1. 你必须购买过米筐数,否则无从获得
  2. 直接使用从米筐数据接口获取交易时间段信息,会有效率问题。本人试过,运行get_trading_hours()常会用卡顿,其目前是在65000多个合约中为你寻找一个合约的交易时间段信息,在K线合成这种使用非常频繁的地方,效率是必须的!况且米筐对每个用户的流量也是有限制的,如果超过了也是会限流的!
  3. 对于有些米筐没有的品种难道我们就不能使用K线了吗?

解决方法:

  1. 基于以上这些原因,采用解耦具体数据提供商的方法会更好!把这些信息保存到一个文件或者数据库中,只要您能够办法获得这些信息,按照规定的格式存储,哪怕是手工输入也是可以的。
  2. 新的K线生成器只需要对规定格式的交易时间段信息进行处理,按照一定的规则就可以进行K线生成了!

3.4 从米筐获取交易时间段信息

3.4.1 扩展DataFeed

打开vnpy.trader.datafeed.py文件为Datafeed的基类BaseDatafeed扩展下面的接口

class BaseDatafeed(ABC):
    """
    Abstract datafeed class for connecting to different datafeed.
    """

    def init(self) -> bool:
        """
        Initialize datafeed service connection.
        """
        pass

    def update_all_trading_hours(self) -> bool:     # hxxjava add 
        """ 更新所有合约的交易时间段到trading_hours.json文件中 """
        pass

    def load_all_trading_hours(self) -> dict:       # hxxjava add 
        """ 从trading_hours.json文件中读取所有合约的交易时间段 """
        pass

    def query_bar_history(self, req: HistoryRequest) -> Optional[List[BarData]]:
        """
        Query history bar data.
        """
        pass

    def query_tick_history(self, req: HistoryRequest) -> Optional[List[TickData]]:
        """
        Query history tick data.
        """
        pass

其中的trading_hours.json文件我会在后面的文章中做详细的介绍。有了它我们才能展开其他的设计。

3.4.2 扩展RqdataDataFeed

在vnpy_rqdata\rqdata_datafeed.py中增加下面的代码

  • 引用部分增加:
from datetime import timedelta,date # hxxjava add
  • 在class RqdataDatafeed(BaseDatafeed)中增加下面的代码 :

    def update_all_trading_hours(self) -> bool:     # hxxjava add 
        """ 更新所有合约的交易时间段到trading_hours.json文件中 """

        if not self.inited:
            self.init()

        if not self.inited:
            return False

        ths_dict = load_json(self.trading_hours_file)

        # instruments = all_instruments(type=['Future','Stock','Index','Spot'])

        trade_hours = {}

        for stype in ['Future','Stock','Index','Fund','Spot']:
            instruments = all_instruments(type=[stype])
            # print(f"{stype} instruments count={len(instruments)}")

            for idx,inst in instruments.iterrows():
                # 获取每个最新发布的合约的建议时间段
                if ('trading_hours' not in inst) or not(isinstance(inst.trading_hours,str)):
                    # 跳过没有交易时间段或者交易时间段无效的合约
                    continue

                inst_name = inst.trading_code if stype == 'Future' else inst.order_book_id 
                inst_name = inst_name.upper() 
                if inst_name.find('.') < 0:
                    inst_name += '.' + inst.exchange

                if inst_name not in ths_dict:
                    str_trading_hours = inst.trading_hours

                    # 把'01-'或'31-'者替换成'00-'或'30-'
                    suffix_pair = [('1-','0-'),('6-','5-')]
                    for s1,s2 in suffix_pair:
                        str_trading_hours = str_trading_hours.replace(s1,s2)

                    # 如果原来没有,提取出来
                    trade_hours[inst_name] = {"name": inst.symbol,"trading_hours": str_trading_hours}

        # print(f"trade_hours old count {len(ths_dict)},append count={len(trade_hours)}")
        if trade_hours:
            ths_dict.update(trade_hours)
            save_json(self.trading_hours_file,ths_dict)

        return True

    def load_all_trading_hours(self) -> dict:       # hxxjava add 
        """ 从trading_hours.json文件中读取所有合约的交易时间段 """
        json_file = get_file_path(self.trading_hours_file)
        if not json_file.exists():
            return {}
        else:
            return load_json(self.trading_hours_file)

3.4.3 为main_engine安装一个可以获取交易时间段信息的接口

在vnpy\trader\engine.py中:

  • 该文件的引用部分:
from .datafeed import get_datafeed                  # hxxjava add
  • 为MainEngine类增加下面函数

    def get_trading_hours(self,vt_symbol:str) -> str:   # hxxjava add
        """ get vt_symbol's trading hours """
        ths = self.all_trading_hours.get(vt_symbol.upper(),"")       
        return ths["trading_hours"] if ths else ""

为什么在MainEngine类增加可以获取交易时间段信息的接口?

因为无论你运行vnpy中的哪个app,你都会启动main_engine,无需绕弯子就可以得到这些信息,而我们的用户策略中都包含各自策略的引擎,这样就方便获取交易时间段信息。

如CTA策略中包含cta_engine,而cta_engine它的成员就包含main_engine。那么在策略中执行类似下面的语句就可以获取您交易品种的交易时间段信息:

       trading_hours = self.cta_engine.main_engine.get_trading_hours(selt.vt_symbol)

如PortFolioStrategy策略中包含strategy_engine,而strategy_engine它的成员就包含main_engine。那么在策略中执行类似下面的语句就可以获取多个交易品种的交易时间段信息:

       trading_hours_list = [self.cta_engine.main_engine.get_trading_hours(vt_symbol) for vt_symbol in self.vt_symbols]

是不是很方便呢?

3.4.4 在系统的投研中执行更新所有品种(含期货、股票、指数和基金)的交易时间段

vnpy 3.0的启动界面中已经集成了一个叫“投研”的功能,其实它是jupyter lab,启动之后输入下面的代码:

# 测试update_all_trading_hours()函数和load_all_trading_hours()
from vnpy.trader.datafeed import get_datafeed

df = get_datafeed()
df.init()

df.update_all_trading_hours()   # 更新所有合约的交易时间段到本地文件中

ths = df.load_all_trading_hours() # 从本地文件中读取所有合约的交易时间段

当然您可以在vnpy的trader中主界面的菜单中增加一项,方便您在需要的时候执行下面语句。不过这个更新交易时间段的功能并不需要频繁执行,手动也就够了,记得就好。

3.4.5 你还可以手工打开trading_hours.json,直接输入

经过上面步骤3.4.4,您就在本地得到了一个trading_hours.json文件,该文件在您的用户目录下的.vntrader\中,其内容如下:

{
    "A0303.DCE": {
        "name": "豆一0303",
        "trading_hours": "21:00-23:00,09:00-10:15,10:30-11:30,13:30-15:00"
    },
    "A0305.DCE": {
        "name": "豆一0305",
        "trading_hours": "21:00-23:00,09:00-10:15,10:30-11:30,13:30-15:00"
    },
    "A0307.DCE": {
        "name": "豆一0307",
        "trading_hours": "21:00-23:00,09:00-10:15,10:30-11:30,13:30-15:00"
    },
    "A0309.DCE": {
        "name": "豆一0309",
        "trading_hours": "21:00-23:00,09:00-10:15,10:30-11:30,13:30-15:00"
    },
    "A0311.DCE": {
        "name": "豆一0311",
        "trading_hours": "21:00-23:00,09:00-10:15,10:30-11:30,13:30-15:00"
    },
    "A0401.DCE": {
        "name": "豆一0401",
        "trading_hours": "21:00-23:00,09:00-10:15,10:30-11:30,13:30-15:00"
    },
   ... ...
}

观察其格式,在你没有米筐数据接口或者这里没有的合约,您也可以手动输入合约交易时间段信息。

按照程序中算法,这个文件文件中一共包含约16500多个合约的交易时间段信息。可以覆盖国内金融市场几乎全部都产品,但是不包括金融二次衍生品期权。
为什么没有期权交易时间段信息,因为不需要。期权合约有其对应的标的物,从其名称和编号就可以解析出来。期权合约的交易时间段其和标的物的交易时间段是完全相同的,因此不需要保存到该文件中。

Member
avatar
加入于:
帖子: 419
声望: 171

文章太长,再分一贴吧。

4. 交易时间段的处理

4.1 交易时间段处理的复杂性

一个合约的交易时间段信息,就包含在一个字符串中。通常看起来是这样的:

"21:00-23:00,09:00-10:15,10:30-11:30,13:30-15:00"

它看似简单,实则非常复杂!简单在于它只是一个字符串,其实它能够表达非常复杂的交易时间规定。例如交易时间可以少到只有1段,也可以4到5个段,可跨日,也可以跨多日,如遇到周末或者长假。但是长假太难处理了,我们这也不处理各种各样的假日规定,因为那个太复杂了!不过好在时下很多软件,著名的和非著名的软件,几乎都不处理跨长假的问题,不处理的原因也是和我分析的一样,不过这也没有影响他们多软件被广大用户接受的程度。所以我们也不处理跨长假的问题。
当然想处理跨长假也不成,条件不具备呀。因为毕竟我们不是交易所,不知道各种各样的休假规定,不同市场,不同国家的节假日,千奇百怪,太难处理了。而且我们也不能说不处理哪个市场或者国家的投资品种吧?绝大部分软件都不处理长假对K线对齐方式的影响,原因就在于此,没有什么别的说辞!

4.2 交易时间段处理具有的功能

  • 从交易时间段字符串中提取出各段的起止时间(天内的秒数) 列表
  • 给定一个时间,可以得到其交易日及日期时间格式的交易时间段列表,无效交易时间返回空
  • 给定一个时间,得到其在日内的交易时间、所在窗口的索引、窗口开始时间和截止时间

4.3 交易时间段处理的实现

在vnpy\usertools下创建一个名称为trading_hours.py,其代码如下:

"""
本文件主要实现合约的交易时间段:TradingHours
作者:hxxjava
日期:2022-03-28

修改:2022-06-09  修改内容:TradingHours的get_intraday_window()处理时间段错误
"""

from calendar import month
from typing import Callable,List,Dict, Tuple, Union
from enum import Enum

from datetime import datetime,date,timedelta, tzinfo 
import numpy as np

import pytz
CHINA_TZ = pytz.timezone("Asia/Shanghai")

from vnpy.trader.constant import Interval


def to_china_tz(dt: datetime) -> datetime:
    """
    Convert a datetime object to a CHINA_TZ localize datetime object
    """
    return CHINA_TZ.localize(dt.replace(tzinfo=None))

INTERVAL_MAP = {
    Interval.MINUTE:60,
    Interval.HOUR:3600,
    Interval.DAILY:3600*24,
    Interval.WEEKLY:3600*24*7,
}

def get_time_segments(trading_hours:str) -> List:
    """ 
    从交易时间段字符串中提取出各段的起止时间(天内的秒数) 列表

    """
    time_sepments = []

    # 提取各段
    str_segments = trading_hours.split(',')

    pre_start,day_offset = None,0

    for s in reversed(str_segments):  # 反向遍历各段
        # 提取段的起止时间

        start,stop = s.split('-')
        # 计算开始时间天内秒
        hh,mm = start.split(':')
        start_s = int(hh)*3600+int(mm)*60
        # 计算截止时间天内秒
        hh,mm = stop.split(':')
        stop_s = int(hh)*3600+int(mm)*60

        if pre_start and start > pre_start:
            day_offset -= 1
        pre_start = start

        # 加入列表
        time_sepments.insert(0,(day_offset,start_s,stop_s))

    return time_sepments

def in_segments(trade_segments:List,trade_dt:datetime):
    """ 判断一个时间是否在一个交易时间段列表中 """
    trade_dt = to_china_tz(trade_dt)
    for start,stop in trade_segments:
        if start <= trade_dt < stop:
            return True

    return False

class TradingHours(object):
    """ 
    交易时间段处理
    """
    def __init__(self,trading_hours:str):
        """ 
        初始化函数 。 
        参数说明:
            trading_hours:交易时间段字符串,例如:"21:00-23:00,09:00-10:15,10:30-11:30,13:30-15:00"
            pre_open: 集合竞价时段长度,单位分钟。例如:国内期货pre_open=5
            after_close: 交易日收盘后结算时长。例如国内期货持续到15:20,那么after_close=20
        """

        self.time_segments:List[Tuple[int,int,int]] = get_time_segments(trading_hours)   

    def day_trade_time(self,interval:Interval) -> int:
        """ 
        一个交易日的交易时长,单位由interval 规定,不足的部分+1
        """
        seconds = 0.0
        for _,start,stop in self.time_segments:
            seconds += stop - start + (0 if start < stop else INTERVAL_MAP[Interval.DAILY])

        if not interval:
            return seconds
        else:
            return np.ceil(seconds/INTERVAL_MAP[interval])

    def get_auction_closes_segments(self,trade_dt:datetime) -> Tuple[date,List]:
        """ 
        得到一个交易时间所在的交易日及集合竞价时间段和所有休市时段的列表 
        """
        if not self.auction_closes:
            return (None,[])

        trade_dt = to_china_tz(trade_dt)

        dates = [trade_dt.date()+timedelta(days=i) for i in range(-3,4)]

        # 根据 self.auction_closes 构造出一周内的日期时间格式的非连续交易时间段字典
        week_seqments = {
            dt:
            [(to_china_tz(datetime(dt.year,dt.month,dt.day))+timedelta(days=days-(2 if days == -1 and dt.weekday()==0 else 0),seconds=start),
            to_china_tz(datetime(dt.year,dt.month,dt.day))+timedelta(days=days-(2 if days == -1 and dt.weekday()==0 else 0)+(1 if start>stop else 0),seconds=stop)) 
                for days,start,stop in self.auction_closes] 
            for dt in dates if dt.weekday() not in [5,6]   
        }

        # 在非交易时间段字典中查找trade_dt所在集合竞价时间段,确定所属交易日
        for dt,datetime_segments in week_seqments.items():
            # 遍历一周中的每日
            if in_segments(datetime_segments,trade_dt):
                return (dt,datetime_segments)

        return (None,[])

    def get_trade_hours(self,trade_dt:datetime) -> Tuple[date,List[Tuple[datetime,datetime]]]:
        """ 
        得到一个时间的交易日及日期时间格式的交易时间段列表,无效交易时间返回空 
        """
        # 构造trade_dt加前后三天共7的日期
        trade_dt = to_china_tz(trade_dt)
        dates = [trade_dt.date()+timedelta(days=i) for i in range(-3,4)]

        # 根据 self.time_segments 构造出一周内的日期时间格式的交易时间段字典
        week_seqments = {
            dt:
            [(to_china_tz(datetime(dt.year,dt.month,dt.day))+timedelta(days=days-(2 if days == -1 and dt.weekday()==0 else 0),seconds=start),
            to_china_tz(datetime(dt.year,dt.month,dt.day))+timedelta(days=days-(2 if days == -1 and dt.weekday()==0 else 0)+(1 if start>stop else 0),seconds=stop)) 
                for days,start,stop in self.time_segments] 
            for dt in dates if dt.weekday() not in [5,6]   
        }

        trade_day,trading_segments = None,[]
        # 在交易时间段字典中查找trade_dt所在交易时间段,确定所属交易日
        for dt,datetime_segments in week_seqments.items():
            # 遍历一周中的每日
            for start,stop in datetime_segments:
                # 遍历一日中的每个交易时间段
                if start <= trade_dt < stop:
                    # 找到了,确定dt为trade_dt的交易日
                    trade_day = dt
                    break

            if trade_day:
                # 已经找到,停止
                trading_segments = datetime_segments
                break

        return (trade_day,trading_segments)

    def get_trading_segments(self,tradeday:date): # List[Tuple[datetime,datetime]]
        """
        得到某个交易日的就要时间段。注:只考虑周末,不考虑法定假
        """
        segments = []
        weekday = tradeday.weekday()
        if weekday not in [5,6]:
            # 周一至周五
            # 周一跨日需插入2天
            insert_days = -2 if tradeday.weekday() == 0 else 0 

            y,m,d = tradeday.year,tradeday.month,tradeday.day

            for day,start,stop in self.time_segments:
                days = insert_days + day if day < 0 else day
                start_dt = datetime(y,m,d,0,0,0) + timedelta(days=days,seconds=start)
                stop_dt = datetime(y,m,d,0,0,0) + timedelta(days=days+(0 if start < stop else 1),seconds=stop)
                segments.append((start_dt,stop_dt))

        return segments

    def get_intraday_window(self,trade_dt:datetime,window:int) -> Tuple[date,List[Tuple[datetime,datetime]]]:
        """ 
        得到一个时间的日内交易时间、窗口索引、窗口开始时间和截止时间 
        """
        trade_dt = to_china_tz(trade_dt)
        interval = Interval.MINUTE
        oneday_minutes = self.day_trade_time(interval)
        if window > oneday_minutes:
            raise f"In day window can't exceed {oneday_minutes} minutes !"

        result = (None,[])
        if window == 0:
            # window==0 无意义
            return result

        # 求dt的交易日
        trade_day,segment_datetimes = self.get_trade_hours(trade_dt)
        if not trade_day:
            # 无效的交易日
            return result

        if np.sum([start <= trade_dt < stop for start,stop in segment_datetimes]) == 0:
            # 如果dt不在各个交易时间段内为无效的交易时间
            return result

        # 交易日的开盘时间
        t0 = segment_datetimes[0][0]

        # 构造各个交易时间段的起止数组
        starts = np.array([(seg_dt[0]-t0).seconds*1.0 for seg_dt in segment_datetimes])
        stops = np.array([(seg_dt[1]-t0).seconds*1.0 for seg_dt in segment_datetimes])

        # 求dt在交易日中的自然时间
        nature_t = (trade_dt - t0).seconds

        # 求dt已经走过的交易时间
        traded_t = np.sum(nature_t - starts[starts<=nature_t]) - np.sum(nature_t-stops[stops<nature_t])
        if traded_t < 0:
            # 开盘之前的为无效交易时间
            return result

        # 求当前所在窗口的宽度、索引、开始交易时间及截止时间
        window_width = window * INTERVAL_MAP[interval]
        window_idx = np.floor(traded_t/window_width)
        window_start = window_idx * window_width
        window_stop = window_start + window_width

        # 求各个交易时间段的宽度
        segment_widths = stops - starts
        # print("!!!3",window_start,window_stop,segment_widths)

        # 求各个交易时间段累计日内交易时间
        sums = [np.sum(segment_widths[:(i+1)]) for i in range(len(segment_widths))]
        if window_stop > sums[-1]:
            # 不可以跨日处理
            window_stop = sums[-1]

        # 累计日内交易时间数组
        seg_sum = np.array(sums)    
        # 每段开始累计日内交易时间数组
        seg_start_sum = np.array([0] + sums)   

        # 求窗口开始和截止时间的时间段索引
        s1,s2 = seg_sum - window_start,seg_sum - window_stop
        start_idx,stop_idx = np.sum(s1 <= 0),np.sum(s2<0)

        # 求窗口开始和截止时间的在其时间段中的偏移量
        start_offset = (window_start-seg_start_sum)[start_idx]
        stop_offset = (window_stop-seg_start_sum)[stop_idx]

        # 求窗口包含的时间片段列表
        window_segments = []
        for idx in range(start_idx,stop_idx+1):
            start,stop = segment_datetimes[idx]
            t1 = start + timedelta(seconds=start_offset) if idx == start_idx else start
            t2 = start + timedelta(seconds=stop_offset) if idx == stop_idx else stop
            window_segments.append((t1,t2))

        # 窗口所属交易日及包含的时间片段列表
        result = (trade_day,window_segments)
        return result

    def get_week_tradedays(self,trade_dt:datetime) -> List[date]:
        """ 得到一个交易时间所在周的交易日 """
        trade_dt = to_china_tz(trade_dt)
        trade_day,trade_segments = self.get_trade_hours(trade_dt)
        if not trade_day:
            return []

        monday = trade_dt.date() - timedelta(days=trade_dt.weekday())
        week_dates = [monday + timedelta(days=i) for i in range(5)]

        if trade_day not in week_dates:
            next_7days = [(trade_dt + timedelta(days=i+1)) for i in range(7)]
            week_dates = [day.date() for day in next_7days if day.weekday() not in [5,6]]

        return week_dates

    def get_month_tradedays(self,trade_dt:datetime) -> List[date]:
        """ 得到一个交易时间所在月的交易日 """
        trade_dt = to_china_tz(trade_dt)
        trade_day,trade_segments = self.get_trade_hours(trade_dt)
        if not trade_day:
            return []

        first_day = date(year=trade_day.year,month=trade_day.month,day=1)
        this_month = trade_day.month
        days32 = [first_day + timedelta(days = i) for i in range(32)]

        month_dates = [day for day in days32 if day.weekday() not in [5,6] and day.month==this_month]
        return month_dates

    def get_year_tradedays(self,trade_dt:datetime) -> List[date]:
        """ 得到一个交易时间所在年的交易日 """
        trade_dt = to_china_tz(trade_dt)
        trade_day,trade_segments = self.get_trade_hours(trade_dt)
        if not trade_day:
            return []

        new_years_day = date(year=trade_day.year,month=1,day=1)
        this_year = trade_day.year
        days366 = [new_years_day + timedelta(days = i) for i in range(366)]
        trade_dates = [day for day in days366 if day.weekday() not in [5,6] and day.year==this_year]

        return trade_dates

    def has_night_tradetime(self) -> bool:
        """ 有夜盘交易时间吗? """
        for (days,start,stop) in self.time_segments:
            if start >= 18*INTERVAL_MAP(Interval.HOUR):
                return True
        return False

    def has_day_tradetime(self) -> bool:
        """ 有日盘交易时间吗 ? """
        for (days,start,stop) in self.time_segments:
            if start < 18*INTERVAL_MAP(Interval.HOUR):
                return True
        return False

5. 日内对齐等交易时长K线生成器的实现

5.1 确定K线生成器MyBarGenerator的生成规则

5.1.1 一步到位地解决问题

先给它取个名称,就叫MyBarGenerator吧,它是对BarGenerator的扩展。
不过在构思MyBarGenerator的时候,我发现它其实不应该叫“日内对齐等交易时长K线生成器”。因为我们不应该只局限于日内的n分钟K线生成器,难道vnpy系统就不应该、不能够或者不使用日线以上的K线了吗?我们只能够使用日内K线进行量化交易吗?难道大家都没有过这方面的需求吗?我想答案是否定的。
那好,所幸就设计一个全功能的K线生成器:MyBarGenerator。
为此我们需要扩展Interval的定义,因为Interval是表示K线周期的常量,可是它的格局不够,最大只能到周一级WEEKLY。也就是说您用目前的Interval是没有办法表达月和年这样的周期的。

class Interval(Enum):
    """
    Interval of bar data.
    """
    MINUTE = "1m"
    HOUR = "1h"
    DAILY = "d"
    WEEKLY = "w"
    TICK = "tick"
    MONTHLY = "month"   # hxxjava add
    YEARLY = "year"     # hxxjava add

顺便在这里吐槽一下BarGenerator:

  • 目前的BarData中包含了一个interval字段的,可是它在BarGenerator的时候根本就没有使用过,而使用它本是信手拈来的事情,但是没有却没有使用。如果不信,你可以去看看用它产生出来的bar的内容。
  • 另外本来还应该增加一个秒单位(SECONDLY = "1s")的,这个单位其实对高频交易也是很有需求的,可是现在却没有。不知道大家对此有什么看法。

5.1.2 按周期对K线分类

在系统且并详细分析之后,把K线分类为:日内K线、日K线,周K线、月K线及年K线等周期K线五类。
1)日内K线包括1~n分钟K线,如1分钟、n分钟两类,其中n小于正常交易日的最大交易分钟数。日内K线取消对小时周期单位支持,因为可以通过n分钟的方式来实现。如:

  • 1小时K线可以通过60分钟来表达
  • 2小时K线可以通过120分钟来表达
  • 4小时K线可以通过240分钟来表达
    这么做的好处是:非常容易地实现90分钟的日内K线,而这是系统自带BarGenerator无法做到的。

2)日K线:每个交易日产生一个,它包含一到多个交易时间段。根据是否包含夜盘交易时间段,又可以分为跨日K线和不跨日K线。
3)周K线:由周一至周五中所有交易日的交易数据合成得到,它其实是一种特殊的n日K线,只是n<=5而已。
4)月K线:由每月1日至月末最后一个交易日的交易数据合成得到,除去所有周末,它最多包含23个交易日,遇到本月有长假日,其所包含的交易日会更少。
5)年K线:由每年1月1日至12月31日中的所有交易日的交易数据合成得到,除去所有周末。它可以理解为由一年中的所有交易日数据合成的,也可以理解为由一年中的12个月的交易日数据合成的。

5.1.3 确定K线生成规则:

1)日内K线(包括1~n分钟K线)生成规则:

  • K线对齐交易日的开盘
  • 等交易时长生成
  • 忽略中间休市时间
  • 不跨日生成,遇收市强行截止
  • 周期单位必须为分钟,n小于日交易最大分钟数

2)日K线生成规则:

  • 对齐其交易日的开盘时间
  • 休市时间收到的数据为非法数据
  • 交易日收盘时间生成或者在收到下收到大于收盘时间交易数据时生成

3)周K线生成规则:

  • 对齐周一开盘时间
  • 收到周一或者第一个交易日的日K线时创建
  • 收到周二到四等交易日日K线时继续合成
  • 收到周五或者下周交易日日K线时生成

4)月K线生成规则:

  • 对齐当月1日的开盘时间,去除所有周末构成本月可能的交易日期
  • 收到当月第一个交易日的日K线时创建
  • 月K线创建后在未收到本月可能的交易日期的日K线时继续合成
  • 收到月可能的交易日期的最后一个交易日K线或者下个月的第一交易日日K线时生成

5)年K线生成规则:
年K线可以由两种方式进行合成:一种是用日K线合成,另一种是用月K线合成。我们这里选择用日K线来合成。

  • 年K线对齐每年的1月1日,从1月1日至12月31日,去除所有周末,构成所有的可能的交易日
  • 遇到当年的第一个日K线时创建年K线
  • 年K线创建后,在收到日K线的交易日期未到最后一个可能的交易日时继续合成
  • 收到日K线的交易日为本年可能的交易日期或者下一年的交易日时生成

5.2 MyBargenerator的实现

在vnpy\usertools\utility.py中加入如下面的两个部分:

5.2.1 加入引用部分:

from copy import deepcopy
from typing import List,Dict,Tuple,Optional,Sequence,Callable
from datetime import date,datetime,timedelta

from vnpy.trader.constant import Interval
from vnpy.trader.object import TickData,BarData
from vnpy.trader.utility import extract_vt_symbol
from vnpy.usertools.trading_hours import TradingHours,in_segments

from vnpy.usertools.trade_hours import CHINA_TZ

def generate_temp_bar(small:BarData,big:BarData,interval:Interval):
    """ get temp intra day small_bar """
    small_bar:BarData = deepcopy(small)    # 1 minute small_bar
    big_bar:BarData = deepcopy(big)

    if big_bar and small_bar:
        big_bar.high_price = max(big_bar.high_price,small_bar.high_price)
        big_bar.low_price = min(big_bar.low_price,small_bar.low_price)
        big_bar.close_price = small_bar.close_price
        big_bar.open_interest = small_bar.open_interest
        big_bar.volume += small_bar.volume
        big_bar.turnover += small_bar.turnover

    elif not big_bar and small_bar:
        big_bar = BarData(
            symbol=small_bar.symbol,
            exchange=small_bar.exchange,
            interval=interval,
            datetime=small_bar.datetime,
            gateway_name=small_bar.gateway_name,
            open_price=small_bar.open_price,
            high_price=small_bar.high_price,
            low_price=small_bar.low_price,
            close_price = small_bar.close_price,
            open_interest = small_bar.open_interest,
            volume = small_bar.volume,
            turnover = small_bar.turnover
        )

    return big_bar

5.2.2 MyBarGenerator的完整代码

class MyBarGenerator():
    """ 
    An align bar generator.
    Comment's for parameters:
        on_bar : callback function on 1 minute bar is generated.
        window : window bar's width.
        on_window_bar : callback function on x interval bar is generated.
        interval : window bar's unit.
        trading_hours: trading hours with which the window bar can be generated. 
    """

    def __init__(
        self,
        on_bar: Callable,
        window: int = 0,
        on_window_bar: Callable = None,
        interval: Interval = Interval.MINUTE,
        trading_hours:str = ""      
    ):
        """ Constructor """
        self.bar: BarData = None
        self.on_bar: Callable = on_bar

        self.interval: Interval = interval
        self.interval_count: int = 0

        self.intra_day_bar: BarData = None
        self.day_bar: BarData = None
        self.week_bar: BarData = None
        self.month_bar: BarData = None
        self.year_bar: BarData = None

        self.day_bar_cnt:int = 0        # 日K线的1分钟K线计数
        self.week_daybar_cnt:int = 0    # 周K线的日K线计数

        self.window: int = window
        self.on_window_bar: Callable = on_window_bar

        self.last_tick: TickData = None

        if interval not in [Interval.MINUTE,Interval.DAILY,Interval.WEEKLY,Interval.MONTHLY,Interval.YEARLY]:
            raise ValueError(f"MyBarGenerator support MINUTE,DAILY,WEEKLY,MONTHLY and YEARLY bar generation only , please check it !")

        if not trading_hours:
            raise ValueError(f"MyBarGenerator need trading hours setting , please check it !")

        # trading hours object
        self.trading_hours = TradingHours(trading_hours)

        self.day_total_minutes = int(self.trading_hours.day_trade_time(Interval.MINUTE))

        self.tick_windows = (None,[])

        # current intraday window bar's contains trading day and time segments list
        self.intraday_bar_window = (None,[]) # (trade_day,[])
        # current daily bar's window containts trading day and time segment list 
        self.daily_bar_window = (None,[])
        # current weekly bar's window containts all trade days 
        self.weekly_bar_window = []
        # current monthly bar's window containts all trade days 
        self.monthly_bar_window = []
        # current yearly bar's window containts all trade days
        self.yearly_bar_window = []

    def update_tick(self, tick: TickData) -> None:
        """
        Update new tick data into generator.
        """
        new_minute = False

        # Filter tick data with 0 last price
        if not tick.last_price:
            return

        # Filter tick data with older timestamp
        if self.last_tick and tick.datetime < self.last_tick.datetime:
            print(f"特别tick【{tick}】!")
            return

        if self.tick_windows == (None,[]) or not in_segments(self.tick_windows[1],tick.datetime):
            # 判断tick是否在连续交易时间段或者集合竞价时间段中
            self.tick_windows = self.trading_hours.get_trade_hours(tick.datetime)
            if self.tick_windows == (None,[]):
                # 不在连续交易时间段
                print(f"特别tick【{tick}】")
                return

        if not self.bar:
            new_minute = True
        elif (
            (self.bar.datetime.minute != tick.datetime.minute)
            or (self.bar.datetime.hour != tick.datetime.hour)
        ):
            self.bar.datetime = self.bar.datetime.replace(
                second=0, microsecond=0
            )
            self.on_bar(self.bar)

            new_minute = True

        if new_minute:
            self.bar = BarData(
                symbol=tick.symbol,
                exchange=tick.exchange,
                interval=Interval.MINUTE,
                datetime=to_china_tz(tick.datetime),
                gateway_name=tick.gateway_name,
                open_price=tick.last_price,
                high_price=tick.last_price,
                low_price=tick.last_price,
                close_price=tick.last_price,
                open_interest=tick.open_interest
            )
        else:
            self.bar.high_price = max(self.bar.high_price, tick.last_price)
            if tick.high_price > self.last_tick.high_price:
                self.bar.high_price = max(self.bar.high_price, tick.high_price)

            self.bar.low_price = min(self.bar.low_price, tick.last_price)
            if tick.low_price < self.last_tick.low_price:
                self.bar.low_price = min(self.bar.low_price, tick.low_price)

            self.bar.close_price = tick.last_price
            self.bar.open_interest = tick.open_interest
            self.bar.datetime = to_china_tz(tick.datetime)

        if self.last_tick:
            volume_change = tick.volume - self.last_tick.volume
            self.bar.volume += max(volume_change, 0)

            turnover_change = tick.turnover - self.last_tick.turnover
            self.bar.turnover += max(turnover_change, 0)

        self.last_tick = tick

    def update_bar(self, bar: BarData) -> None:
        """
        Update 1 minute bar into generator
        """
        if self.interval == Interval.MINUTE and self.window > 0:
            # update inday bar
            self.update_intraday_bar(bar)

        elif self.interval in [Interval.DAILY,Interval.WEEKLY,Interval.MONTHLY,Interval.YEARLY]:
            # update daily,weekly,monthly or yearly bar
            self.update_daily_bar(bar)

    def update_intraday_bar(self, bar: BarData) -> None:
        """ update intra day x window bar """
        if bar:
            bar.datetime = to_china_tz(bar.datetime)

        if self.interval != Interval.MINUTE or self.window <= 1:
            return 

        if self.intraday_bar_window == (None,[]):
            # 首次调用日内K线更新函数
            trade_day,time_segments = self.trading_hours.get_intraday_window(bar.datetime,self.window)
            if (trade_day,time_segments) == (None,[]):
                # 无效的1分钟K线
                return 

            # 更新当前日内K线交易时间
            self.intraday_bar_window = (trade_day,time_segments)
            # 创建新的日内K线
            self.intra_day_bar = BarData(
                symbol=bar.symbol,
                exchange=bar.exchange,
                interval=Interval.MINUTE,
                datetime=bar.datetime,
                gateway_name=bar.gateway_name,
                open_price=bar.open_price,
                high_price=bar.high_price,
                low_price=bar.low_price,
            )         

        elif not in_segments(self.intraday_bar_window[1],bar.datetime):
            # 1分钟K线不属于当前日内K线
            str1 = f"bar.datetime={bar.datetime}\nintraday_bar_window:{self.intraday_bar_window}"
            trade_day,time_segments = self.trading_hours.get_intraday_window(bar.datetime,self.window)
            if (trade_day,time_segments) == (None,[]):
                # 无效的1分钟K线
                return 

            # 当前日内K线已经生成,推送当前日内K线
            if self.on_window_bar:
                self.on_window_bar(self.intra_day_bar)

            # 更新当前日内K线交易时间
            self.intraday_bar_window = (trade_day,time_segments)
            str1 += f"\nintraday_bar_window:{self.intraday_bar_window}"
            print(str1)
            # 创建新的日内K线
            self.intra_day_bar = BarData(
                symbol=bar.symbol,
                exchange=bar.exchange,
                interval=Interval.MINUTE,
                datetime=bar.datetime,
                gateway_name=bar.gateway_name,
                open_price=bar.open_price,
                high_price=bar.high_price,
                low_price=bar.low_price,
            )      

        # 1分钟K线属于当前日内K线
        # 更新当前日内K线
        self.intra_day_bar.high_price = max(self.intra_day_bar.high_price,bar.high_price)
        self.intra_day_bar.low_price = min(self.intra_day_bar.low_price,bar.low_price)
        self.intra_day_bar.close_price = bar.close_price
        self.intra_day_bar.open_interest = bar.open_interest
        self.intra_day_bar.volume += bar.volume
        self.intra_day_bar.turnover += bar.turnover

        # 判断当前日内K线是否结束
        close_time = self.intraday_bar_window[1][-1][1]
        next_minute_dt = bar.datetime + timedelta(minutes=1)
        if close_time <= next_minute_dt:
            # 当前日K内线已经结束

            # 当前日内K线已经生成,推送之
            if self.on_window_bar:
                print(f"close_time={close_time},next_minute_dt={next_minute_dt}")
                self.on_window_bar(self.intra_day_bar)

            self.intraday_bar_window = (None,[])
            self.intra_day_bar = None

    def update_daily_bar(self, bar: BarData) -> bool:
        """ update daily bar using 1 minute bar """
        result = False
        if bar:
            bar.datetime = to_china_tz(bar.datetime)

        if self.daily_bar_window == (None,[]):
            # 首次调用日K线更新函数
            trade_day,trade_segments = self.trading_hours.get_trade_hours(bar.datetime)
            if (trade_day,trade_segments) == (None,[]):
                # 无效的1分钟K线
                return result

            # 更新当前日K线交易时间
            self.daily_bar_window = (trade_day,trade_segments)
            # 创建新的日K线
            self.day_bar = BarData(
                symbol=bar.symbol,
                exchange=bar.exchange,
                interval=Interval.DAILY,
                datetime=bar.datetime,
                gateway_name=bar.gateway_name,
                open_price=bar.open_price,
                high_price=bar.high_price,
                low_price=bar.low_price,
            )          
            self.day_bar_cnt = 1          


        if not in_segments(self.daily_bar_window[1],bar.datetime):
            # 1分钟K线不属于当前日K线
            trade_day,trade_segments = self.trading_hours.get_trade_hours(bar.datetime)
            if (trade_day,trade_segments) == (None,[]):
                # 无效的1分钟K线
                return

            # 当前日K线已经生成
            if self.interval == Interval.DAILY:
                # 推送当前日K线
                if self.on_window_bar:
                    self.on_window_bar(self.day_bar)
                self.day_bar_cnt = 0

            else:
                # 更新更大周期K线
                if self.update_weekly_bar(self.day_bar):
                    self.week_daybar_cnt += 1
                self.update_monthly_bar(self.day_bar)
                self.update_yearly_bar(self.day_bar)

            # 更新当前日K线交易时间
            self.daily_bar_window = (trade_day,trade_segments)
            # 创建新的日K线
            self.day_bar = BarData(
                symbol=bar.symbol,
                exchange=bar.exchange,
                interval=Interval.DAILY,
                datetime=bar.datetime,
                gateway_name=bar.gateway_name,
                open_price=bar.open_price,
                high_price=bar.high_price,
                low_price=bar.low_price,
            )                  

        # 1分钟K线属于当前交易日
        # 更新当前日K线
        self.day_bar.high_price = max(self.day_bar.high_price,bar.high_price)
        self.day_bar.low_price = min(self.day_bar.low_price,bar.low_price)
        self.day_bar.close_price = bar.close_price
        self.day_bar.open_interest = bar.open_interest
        self.day_bar.volume += bar.volume
        self.day_bar.turnover += bar.turnover
        result = True

        # 判断当前日K线是否结束
        close_time = self.daily_bar_window[1][-1][1]
        next_minute_dt = bar.datetime + timedelta(minutes=1)
        if close_time <= next_minute_dt or self.day_total_minutes == self.day_bar_cnt:
            # 当前日K线已经结束
            # 当前日K线已经生成
            if self.interval == Interval.DAILY:
                # 推送当前日K线
                if self.on_window_bar:
                    self.on_window_bar(self.day_bar)

            else:
                # 更新更大周期K线
                if self.update_weekly_bar(self.day_bar):
                    self.week_daybar_cnt += 1
                self.update_monthly_bar(self.day_bar)
                self.update_yearly_bar(self.day_bar)

            self.daily_bar_window = (None,[])
            self.day_bar = None
            self.day_bar_cnt = 0

        return result

    def update_weekly_bar(self, bar: BarData) -> bool:        
        """ update weekly bar using a daily bar """
        result = False
        if bar:
            bar.datetime = to_china_tz(bar.datetime)

        if self.interval != Interval.WEEKLY:
            # 设定周期单位不是周,不处理
            return result

        if not self.weekly_bar_window:
            # 首次调用周K线更新函数
            week_tradedays = self.trading_hours.get_week_tradedays(bar.datetime)
            if not week_tradedays:
                # 无效的日K线
                return result

            # 更新当前周K线交易日列表
            self.weekly_bar_window = week_tradedays
            # 创建新的周K线
            self.week_bar = BarData(
                symbol=bar.symbol,
                exchange=bar.exchange,
                interval=Interval.WEEKLY,
                datetime=bar.datetime,
                gateway_name=bar.gateway_name,
                open_price=bar.open_price,
                high_price=bar.high_price,
                low_price=bar.low_price,
            )                    


        if bar.datetime not in self.weekly_bar_window:
            # 日线不属于当前周K线
            week_tradedays = self.trading_hours.get_week_tradedays(bar.datetime)
            if not week_tradedays:
                # 无效的日K线
                return result

            # 当前周K线已经生成,推送
            if self.on_window_bar:
                self.on_window_bar(self.week_bar)
            self.week_daybar_cnt = 0

            # 更新当前周K线交易日列表
            self.weekly_bar_window = week_tradedays
            # 创建新的周K线
            self.week_bar = BarData(
                symbol=bar.symbol,
                exchange=bar.exchange,
                interval=Interval.WEEKLY,
                datetime=bar.datetime,
                gateway_name=bar.gateway_name,
                open_price=bar.open_price,
                high_price=bar.high_price,
                low_price=bar.low_price,
            )                  


        # 更新当前周K线
        self.week_bar.high_price = max(self.week_bar.high_price,bar.high_price)
        self.week_bar.low_price = min(self.week_bar.low_price,bar.low_price)
        self.week_bar.close_price = bar.close_price
        self.week_bar.open_interest = bar.open_interest
        self.week_bar.volume += bar.volume
        self.week_bar.turnover += bar.turnover
        result = True

        # 判断当前周K线是否结束
        trade_day,_ = self.trading_hours.get_trade_hours(bar.datetime)
        if trade_day >= self.weekly_bar_window[-1] or self.week_daybar_cnt == 5:
            # 当前周K线已经结束,推送当前周K线
            if self.on_window_bar:
                self.on_window_bar(self.week_bar)
            self.week_daybar_cnt = 0

            # 复位当前周交易日列表及周K线
            self.weekly_bar_window = []
            self.week_bar = None

        return result

    def update_monthly_bar(self, bar: BarData) -> bool:
        """ update monthly bar using a daily bar """
        result = False
        if bar:
            bar.datetime = to_china_tz(bar.datetime)

        if self.interval != Interval.MONTHLY:
            # 设定周期单位不是月,不处理
            return result

        if not self.monthly_bar_window:
            # 首次调用月K线更新函数
            month_tradedays = self.trading_hours.get_month_tradedays(bar.datetime)
            if not month_tradedays:
                # 无效的日K线
                return result

            # 更新当前月K线交易日列表
            self.monthly_bar_window = month_tradedays
            # 创建新的月K线
            self.month_bar = BarData(
                symbol=bar.symbol,
                exchange=bar.exchange,
                interval=Interval.MONTHLY,
                datetime=bar.datetime,
                gateway_name=bar.gateway_name,
                open_price=bar.open_price,
                high_price=bar.high_price,
                low_price=bar.low_price,
            )                    


        if bar.datetime not in self.monthly_bar_window:
            # 日线不属于当前月K线
            month_tradedays = self.trading_hours.get_month_tradedays(bar.datetime)
            if not month_tradedays:
                # 无效的日K线
                return result

            # 当前月K线已经生成,推送
            if self.on_window_bar:
                self.on_window_bar(self.month_bar)

            # 更新当前月交易日列表
            self.monthly_bar_window = month_tradedays
            # 创建新的月K线
            self.month_bar = BarData(
                symbol=bar.symbol,
                exchange=bar.exchange,
                interval=Interval.MONTHLY,
                datetime=bar.datetime,
                gateway_name=bar.gateway_name,
                open_price=bar.open_price,
                high_price=bar.high_price,
                low_price=bar.low_price,
            )                  


        # 更新当前月K线
        self.month_bar.high_price = max(self.month_bar.high_price,bar.high_price)
        self.month_bar.low_price = min(self.month_bar.low_price,bar.low_price)
        self.month_bar.close_price = bar.close_price
        self.month_bar.open_interest = bar.open_interest
        self.month_bar.volume += bar.volume
        self.month_bar.turnover += bar.turnover
        result = True

        # 判断当前月K线是否结束
        trade_day,_ = self.trading_hours.get_trade_hours(bar.datetime)
        if trade_day >= self.monthly_bar_window[-1]:
            # 当前月K线已经结束,推送当前月K线
            if self.on_window_bar:
                self.on_window_bar(self.month_bar)

            # 复位当前月交易日列表及月K线
            self.monthly_bar_window = []
            self.month_bar = None

        return result

    def update_yearly_bar(self, bar: BarData) -> bool:
        """ update yearly bar using a daily bar """
        result = False
        if bar:
            bar.datetime = to_china_tz(bar.datetime)

        if self.interval != Interval.YEARLY:
            # 设定周期单位不是年,不处理
            return result

        if not self.yearly_bar_window:
            # 首次调用年K线更新函数
            year_tradedays = self.trading_hours.get_year_tradedays(bar.datetime)
            if not year_tradedays:
                # 无效的日K线
                return result

            # 更新当前年K线交易日列表
            self.yearly_bar_window = year_tradedays
            # 创建新的年K线
            self.year_bar = BarData(
                symbol=bar.symbol,
                exchange=bar.exchange,
                interval=Interval.YEARLY,
                datetime=bar.datetime,
                gateway_name=bar.gateway_name,
                open_price=bar.open_price,
                high_price=bar.high_price,
                low_price=bar.low_price,
            )                    

        if bar.datetime not in self.yearly_bar_window:
            # 日线不属于当前年K线
            year_tradedays = self.trading_hours.get_year_tradedays(bar.datetime)
            if not year_tradedays:
                # 无效的日K线
                return result

            # 当前年K线已经生成,推送
            if self.on_window_bar:
                self.on_window_bar(self.year_bar)

            # 更新当前年交易日列表
            self.yearly_bar_window = year_tradedays
            # 创建新的年K线
            self.year_bar = BarData(
                symbol=bar.symbol,
                exchange=bar.exchange,
                interval=Interval.YEARLY,
                datetime=bar.datetime,
                gateway_name=bar.gateway_name,
                open_price=bar.open_price,
                high_price=bar.high_price,
                low_price=bar.low_price,
            )                  

        # 更新当前年K线
        self.year_bar.high_price = max(self.year_bar.high_price,bar.high_price)
        self.year_bar.low_price = min(self.year_bar.low_price,bar.low_price)
        self.year_bar.close_price = bar.close_price
        self.year_bar.open_interest = bar.open_interest
        self.year_bar.volume += bar.volume
        self.year_bar.turnover += bar.turnover
        result = True

        # 判断当前年K线是否结束
        trade_day,_ = self.trading_hours.get_trade_hours(bar.datetime)
        if trade_day >= self.yearly_bar_window[-1]:
            # 当前年K线已经结束,推送当前年K线
            if self.on_window_bar:
                self.on_window_bar(self.year_bar)

            # 复位当前年交易日列表及年K线
            self.yearly_bar_window = []
            self.year_bar = None

        return result

    def get_temp_bar(self) -> BarData:
        """ 返回临时1分钟K线 """
        bar = deepcopy(self.bar)
        if bar:
            bar.datetime = bar.datetime.replace(second=0,microsecond=0)
        return bar

    def get_temp_window_bar(self,bar:BarData = None) -> BarData:
        """ 
        返回临时窗口K线 
        """
        temp_bar:BarData = None
        if not bar:
            # 如果没有传入1分钟K线,取当前生成器的1分钟K线
            bar = self.bar

        if self.interval == Interval.MINUTE:
            if self.window == 0:
                temp_bar = deepcopy(self.bar)
            else:
                temp_bar = generate_temp_bar(bar,self.intra_day_bar,Interval.MINUTE)

        elif self.interval == Interval.DAILY:
            temp_bar = generate_temp_bar(bar,self.day_bar,Interval.DAILY)

        elif self.interval == Interval.WEEKLY:
            day_bar = generate_temp_bar(bar,self.day_bar,Interval.DAILY)
            temp_bar = generate_temp_bar(day_bar,self.week_bar,Interval.WEEKLY)

        elif self.interval == Interval.MONTHLY:
            day_bar = generate_temp_bar(bar,self.day_bar,Interval.DAILY)
            temp_bar = generate_temp_bar(day_bar,self.month_bar,Interval.MONTHLY)

        elif self.interval == Interval.YEARLY:
            day_bar = generate_temp_bar(bar,self.day_bar,Interval.DAILY)
            temp_bar = generate_temp_bar(day_bar,self.year_bar,Interval.YEARLY)

        return temp_bar


    def generate(self) -> Optional[BarData]:
        """
        Generate the bar data and call callback immediately.
        """
        bar = self.bar

        if self.bar:
            bar.datetime = bar.datetime.replace(second=0, microsecond=0)
            self.on_bar(bar)

        self.bar = None
        return bar

6. 对集合竞价tick和休市期间收到的tick的特别处理

交易时间段是交易所对一个合约连续交易时间的规定,它只规定了在哪些时间段内市场是可以连续交易的,也就是说投资者交易开仓、平仓和撤单的。
但是交易时间段不包括一个合约交易的所有交易时间的规定,例如集合竞价时间段、日内中间休市时间段和交易日收盘休市时间段这三类时间段的规定。

6.1 集合竞价时间段

集合竞价时间段在交易日的开盘时间之前。能够该时间段的参与的投资者可能有资格的限制,就是说可能不是市场的参与者都有资格能够在在集合竞价时段中进行交易的。
而且不同市场,不同合约的集合竞价时间段的长度是不一样的,不同的交易日也可能不同,例如:
1)国内市场

  • 国内期货,期权,有夜盘品种是20:55-21:00,遇有长假则位于日盘的第一个交易时段前5分钟;只有日盘品种是8:55-9:00。
  • 国内股票,9:25-9:30,因为A股全部是日盘,所以没有长假带来的问题集合竞价发生变化的问题。
  • 国内市场的集合竞价通常包括前4分钟为撮合成交,在开盘前1分钟推送一个包括集合竞价tick,它的last_price就是该交易日的开盘价。

2)国外市场

  • WTI原油期货的Pre-Open时间,其实就是集合竞价时段。它更加复制,周日的盘前议价期为开盘前1小时,其他交易日的盘前议价期为开盘前15分钟。在此期间,客户可以输入、修改和撤销报单,但报单在该时段不会被撮合成交。此外,在盘前议价期快结束时,即开盘前30秒,不可以进行修改和撤销报单,但是可以下新的报单。所有报单在开盘后的连续交易时段才会被撮合成交。

description

总之,集合竞价时段变化多端,非常复杂,在K线时长上需要特别关注和处理,否则您生成的是什么K线,正确与否是无从谈起的。没准您多了个莫名其妙的K线都不知道。

6.2 集合竞价和日内的休市时间段对K线合成处理的影响

6.2.1 在一个交易日中,用户接口收到的交易所中的tick为4类:

  • 上一交易日结算时间之后~本交易日集合竞价开始之前收到的tick,为垃圾无效tick数据;
  • 本交易日集合竞价期间收到唯一个包含本交易日开盘价的tick,为集合竞价tick数据;
  • 在各个连续竞价时间段收到的tick,为连续竞价tick数据;
  • 在日内的各个休市时间段收到tick,为休市tick数据。

6.2.2 以上4类tick的在K线合成方面的不同处理

  • 收到无效tick,直接做丢弃处理
  • 收到连续竞价tick,进行正常K线合成处理
  • 收到集合竞价tick,将其时间修改为集合竞价时间段的截止时间,之后与连续竞价tick一样处理
  • 收到休市tick,将其时间修改为所在休市时间段的开始时间减去1毫秒,之后与连续竞价tick一样处理

6.2.3 也可以参考利用合约交易状态信息来处理集合竞价tick

这种特别处理请参考:分析一下盘中启动CTA策略带来的第一根$K线错误

7. 该K线生成器的使用

7.1 解决回测中缺少合约信息和交易时间段信息

修改vnpy_ctastrategy\backtesting.py,修改后全部内容如下:

from collections import defaultdict
from datetime import date, datetime, timedelta
import imp
from pipes import Template
from typing import Callable, List
from functools import lru_cache, partial
import traceback

import numpy as np
from pandas import DataFrame
import plotly.graph_objects as go
from plotly.subplots import make_subplots

from vnpy.trader.constant import (Direction, Offset, Exchange,
                                  Interval, Status)
from vnpy.trader.database import get_database
from vnpy.trader.object import OrderData, TradeData, BarData, TickData, ContractData    # hxxjava add ContractData
from vnpy.trader.utility import round_to
from vnpy.trader.optimize import (
    OptimizationSetting,
    check_optimization_setting,
    run_bf_optimization,
    run_ga_optimization
)

from .base import (
    BacktestingMode,
    EngineType,
    STOPORDER_PREFIX,
    StopOrder,
    StopOrderStatus,
    INTERVAL_DELTA_MAP
)
from .template import CtaTemplate

class BacktestingEngine:
    """"""

    engine_type = EngineType.BACKTESTING
    gateway_name = "BACKTESTING"

    def __init__(self):
        """"""
        self.vt_symbol = ""
        self.symbol = ""
        self.exchange = None
        self.start = None
        self.end = None
        self.rate = 0
        self.slippage = 0
        self.size = 1
        self.pricetick = 0
        self.capital = 1_000_000
        self.risk_free: float = 0
        self.annual_days: int = 240
        self.mode = BacktestingMode.BAR
        self.inverse = False

        self.strategy_class = None
        self.strategy = None
        self.tick: TickData
        self.bar: BarData
        self.datetime = None

        self.interval = None
        self.days = 0
        self.callback = None
        self.history_data = []

        self.stop_order_count = 0
        self.stop_orders = {}
        self.active_stop_orders = {}

        self.limit_order_count = 0
        self.limit_orders = {}
        self.active_limit_orders = {}

        self.trade_count = 0
        self.trades = {}

        self.logs = []

        self.daily_results = {}
        self.daily_df = None

        self.load_all_trading_hours()   # hxxjava add
        self.load_contracts()           # hxxjava add

    def clear_data(self):
        """
        Clear all data of last backtesting.
        """
        self.strategy = None
        self.tick = None
        self.bar = None
        self.datetime = None

        self.stop_order_count = 0
        self.stop_orders.clear()
        self.active_stop_orders.clear()

        self.limit_order_count = 0
        self.limit_orders.clear()
        self.active_limit_orders.clear()

        self.trade_count = 0
        self.trades.clear()

        self.logs.clear()
        self.daily_results.clear()

    def set_parameters(
        self,
        vt_symbol: str,
        interval: Interval,
        start: datetime,
        rate: float,
        slippage: float,
        size: float,
        pricetick: float,
        capital: int = 0,
        end: datetime = None,
        mode: BacktestingMode = BacktestingMode.BAR,
        inverse: bool = False,
        risk_free: float = 0,
        annual_days: int = 240
    ):
        """"""
        self.mode = mode
        self.vt_symbol = vt_symbol
        self.interval = Interval(interval)
        self.rate = rate
        self.slippage = slippage
        self.size = size
        self.pricetick = pricetick
        self.start = start

        self.symbol, exchange_str = self.vt_symbol.split(".")
        self.exchange = Exchange(exchange_str)

        self.capital = capital
        self.end = end
        self.mode = mode
        self.inverse = inverse
        self.risk_free = risk_free
        self.annual_days = annual_days

    def load_all_trading_hours(self) -> None: # hxxjava add end
        """  """
        from vnpy.trader.datafeed import get_datafeed

        df = get_datafeed()
        if not df.inited:
            df.init() 
        self.all_trading_hours = df.load_all_trading_hours() 
        print(f"BachtestingEngine.all_trading_hours len={len(self.all_trading_hours)}")

    def load_contracts(self) -> None: # hxxjava add end
        """  """
        database = get_database()

        contracts:List[ContractData] = database.load_contract_data()
        self.contracts = {}
        for c in contracts:
            self.contracts[c.vt_symbol] = c

        print(f"BachtestingEngine.contracts len={len(self.contracts)}")

    def get_trading_hours(self,strategy:CtaTemplate) -> str:   # hxxjava add
        """ 
        get vt_symbol's trading hours 
        """
        ths = self.all_trading_hours.get(strategy.vt_symbol.upper(),"")      

        return ths["trading_hours"] if ths else ""

    def get_contract(self, strategy:CtaTemplate) :# -> Optional[ContractData]:
        """
        Get contract data by vt_symbol.
        """
        return self.contracts.get(strategy.vt_symbol,None)

    def add_strategy(self, strategy_class: type, setting: dict):
        """"""
        self.strategy_class = strategy_class
        self.strategy = strategy_class(
            self, strategy_class.__name__, self.vt_symbol, setting
        )

    def load_data(self):
        """"""
        self.output("开始加载历史数据")

        if not self.end:
            self.end = datetime.now()

        if self.start >= self.end:
            self.output("起始日期必须小于结束日期")
            return

        self.history_data.clear()       # Clear previously loaded history data

        # Load 30 days of data each time and allow for progress update
        total_days = (self.end - self.start).days
        progress_days = max(int(total_days / 10), 1)
        progress_delta = timedelta(days=progress_days)
        interval_delta = INTERVAL_DELTA_MAP[self.interval]

        start = self.start
        end = self.start + progress_delta
        progress = 0

        while start < self.end:
            progress_bar = "#" * int(progress * 10 + 1)
            self.output(f"加载进度:{progress_bar} [{progress:.0%}]")

            end = min(end, self.end)  # Make sure end time stays within set range

            if self.mode == BacktestingMode.BAR:
                data = load_bar_data(
                    self.symbol,
                    self.exchange,
                    self.interval,
                    start,
                    end
                )
            else:
                data = load_tick_data(
                    self.symbol,
                    self.exchange,
                    start,
                    end
                )

            self.history_data.extend(data)

            progress += progress_days / total_days
            progress = min(progress, 1)

            start = end + interval_delta
            end += progress_delta

        self.output(f"历史数据加载完成,数据量:{len(self.history_data)}")

    def run_backtesting(self):
        """"""
        if self.mode == BacktestingMode.BAR:
            func = self.new_bar
        else:
            func = self.new_tick

        self.strategy.on_init()

        # Use the first [days] of history data for initializing strategy
        day_count = 0
        ix = 0

        for ix, data in enumerate(self.history_data):
            if self.datetime and data.datetime.day != self.datetime.day:
                day_count += 1
                if day_count >= self.days:
                    break

            self.datetime = data.datetime

            try:
                self.callback(data)
            except Exception:
                self.output("触发异常,回测终止")
                self.output(traceback.format_exc())
                return

        self.strategy.inited = True
        self.output("策略初始化完成")

        self.strategy.on_start()
        self.strategy.trading = True
        self.output("开始回放历史数据")

        # Use the rest of history data for running backtesting
        backtesting_data = self.history_data[ix:]
        if len(backtesting_data) <= 1:
            self.output("历史数据不足,回测终止")
            return

        total_size = len(backtesting_data)
        batch_size = max(int(total_size / 10), 1)

        for ix, i in enumerate(range(0, total_size, batch_size)):
            batch_data = backtesting_data[i: i + batch_size]
            for data in batch_data:
                try:
                    func(data)
                except Exception:
                    self.output("触发异常,回测终止")
                    self.output(traceback.format_exc())
                    return

            progress = min(ix / 10, 1)
            progress_bar = "=" * (ix + 1)
            self.output(f"回放进度:{progress_bar} [{progress:.0%}]")

        self.strategy.on_stop()
        self.output("历史数据回放结束")

    def calculate_result(self):
        """"""
        self.output("开始计算逐日盯市盈亏")

        if not self.trades:
            self.output("成交记录为空,无法计算")
            return

        # Add trade data into daily reuslt.
        for trade in self.trades.values():
            d = trade.datetime.date()
            daily_result = self.daily_results[d]
            daily_result.add_trade(trade)

        # Calculate daily result by iteration.
        pre_close = 0
        start_pos = 0

        for daily_result in self.daily_results.values():
            daily_result.calculate_pnl(
                pre_close,
                start_pos,
                self.size,
                self.rate,
                self.slippage,
                self.inverse
            )

            pre_close = daily_result.close_price
            start_pos = daily_result.end_pos

        # Generate dataframe
        results = defaultdict(list)

        for daily_result in self.daily_results.values():
            for key, value in daily_result.__dict__.items():
                results[key].append(value)

        self.daily_df = DataFrame.from_dict(results).set_index("date")

        self.output("逐日盯市盈亏计算完成")
        return self.daily_df

    def calculate_statistics(self, df: DataFrame = None, output=True):
        """"""
        self.output("开始计算策略统计指标")

        # Check DataFrame input exterior
        if df is None:
            df = self.daily_df

        # Check for init DataFrame
        if df is None:
            # Set all statistics to 0 if no trade.
            start_date = ""
            end_date = ""
            total_days = 0
            profit_days = 0
            loss_days = 0
            end_balance = 0
            max_drawdown = 0
            max_ddpercent = 0
            max_drawdown_duration = 0
            total_net_pnl = 0
            daily_net_pnl = 0
            total_commission = 0
            daily_commission = 0
            total_slippage = 0
            daily_slippage = 0
            total_turnover = 0
            daily_turnover = 0
            total_trade_count = 0
            daily_trade_count = 0
            total_return = 0
            annual_return = 0
            daily_return = 0
            return_std = 0
            sharpe_ratio = 0
            return_drawdown_ratio = 0
        else:
            # Calculate balance related time series data
            df["balance"] = df["net_pnl"].cumsum() + self.capital

            # When balance falls below 0, set daily return to 0
            pre_balance = df["balance"].shift(1)
            pre_balance.iloc[0] = self.capital
            x = df["balance"] / pre_balance
            x[x <= 0] = np.nan
            df["return"] = np.log(x).fillna(0)

            df["highlevel"] = (
                df["balance"].rolling(
                    min_periods=1, window=len(df), center=False).max()
            )
            df["drawdown"] = df["balance"] - df["highlevel"]
            df["ddpercent"] = df["drawdown"] / df["highlevel"] * 100

            # Calculate statistics value
            start_date = df.index[0]
            end_date = df.index[-1]

            total_days = len(df)
            profit_days = len(df[df["net_pnl"] > 0])
            loss_days = len(df[df["net_pnl"] < 0])

            end_balance = df["balance"].iloc[-1]
            max_drawdown = df["drawdown"].min()
            max_ddpercent = df["ddpercent"].min()
            max_drawdown_end = df["drawdown"].idxmin()

            if isinstance(max_drawdown_end, date):
                max_drawdown_start = df["balance"][:max_drawdown_end].idxmax()
                max_drawdown_duration = (max_drawdown_end - max_drawdown_start).days
            else:
                max_drawdown_duration = 0

            total_net_pnl = df["net_pnl"].sum()
            daily_net_pnl = total_net_pnl / total_days

            total_commission = df["commission"].sum()
            daily_commission = total_commission / total_days

            total_slippage = df["slippage"].sum()
            daily_slippage = total_slippage / total_days

            total_turnover = df["turnover"].sum()
            daily_turnover = total_turnover / total_days

            total_trade_count = df["trade_count"].sum()
            daily_trade_count = total_trade_count / total_days

            total_return = (end_balance / self.capital - 1) * 100
            annual_return = total_return / total_days * self.annual_days
            daily_return = df["return"].mean() * 100
            return_std = df["return"].std() * 100

            if return_std:
                daily_risk_free = self.risk_free / np.sqrt(self.annual_days)
                sharpe_ratio = (daily_return - daily_risk_free) / return_std * np.sqrt(self.annual_days)
            else:
                sharpe_ratio = 0

            return_drawdown_ratio = -total_return / max_ddpercent

        # Output
        if output:
            self.output("-" * 30)
            self.output(f"首个交易日:\t{start_date}")
            self.output(f"最后交易日:\t{end_date}")

            self.output(f"总交易日:\t{total_days}")
            self.output(f"盈利交易日:\t{profit_days}")
            self.output(f"亏损交易日:\t{loss_days}")

            self.output(f"起始资金:\t{self.capital:,.2f}")
            self.output(f"结束资金:\t{end_balance:,.2f}")

            self.output(f"总收益率:\t{total_return:,.2f}%")
            self.output(f"年化收益:\t{annual_return:,.2f}%")
            self.output(f"最大回撤: \t{max_drawdown:,.2f}")
            self.output(f"百分比最大回撤: {max_ddpercent:,.2f}%")
            self.output(f"最长回撤天数: \t{max_drawdown_duration}")

            self.output(f"总盈亏:\t{total_net_pnl:,.2f}")
            self.output(f"总手续费:\t{total_commission:,.2f}")
            self.output(f"总滑点:\t{total_slippage:,.2f}")
            self.output(f"总成交金额:\t{total_turnover:,.2f}")
            self.output(f"总成交笔数:\t{total_trade_count}")

            self.output(f"日均盈亏:\t{daily_net_pnl:,.2f}")
            self.output(f"日均手续费:\t{daily_commission:,.2f}")
            self.output(f"日均滑点:\t{daily_slippage:,.2f}")
            self.output(f"日均成交金额:\t{daily_turnover:,.2f}")
            self.output(f"日均成交笔数:\t{daily_trade_count}")

            self.output(f"日均收益率:\t{daily_return:,.2f}%")
            self.output(f"收益标准差:\t{return_std:,.2f}%")
            self.output(f"Sharpe Ratio:\t{sharpe_ratio:,.2f}")
            self.output(f"收益回撤比:\t{return_drawdown_ratio:,.2f}")

        statistics = {
            "start_date": start_date,
            "end_date": end_date,
            "total_days": total_days,
            "profit_days": profit_days,
            "loss_days": loss_days,
            "capital": self.capital,
            "end_balance": end_balance,
            "max_drawdown": max_drawdown,
            "max_ddpercent": max_ddpercent,
            "max_drawdown_duration": max_drawdown_duration,
            "total_net_pnl": total_net_pnl,
            "daily_net_pnl": daily_net_pnl,
            "total_commission": total_commission,
            "daily_commission": daily_commission,
            "total_slippage": total_slippage,
            "daily_slippage": daily_slippage,
            "total_turnover": total_turnover,
            "daily_turnover": daily_turnover,
            "total_trade_count": total_trade_count,
            "daily_trade_count": daily_trade_count,
            "total_return": total_return,
            "annual_return": annual_return,
            "daily_return": daily_return,
            "return_std": return_std,
            "sharpe_ratio": sharpe_ratio,
            "return_drawdown_ratio": return_drawdown_ratio,
        }

        # Filter potential error infinite value
        for key, value in statistics.items():
            if value in (np.inf, -np.inf):
                value = 0
            statistics[key] = np.nan_to_num(value)

        self.output("策略统计指标计算完成")
        return statistics

    def show_chart(self, df: DataFrame = None):
        """"""
        # Check DataFrame input exterior
        if df is None:
            df = self.daily_df

        # Check for init DataFrame
        if df is None:
            return

        fig = make_subplots(
            rows=4,
            cols=1,
            subplot_titles=["Balance", "Drawdown", "Daily Pnl", "Pnl Distribution"],
            vertical_spacing=0.06
        )

        balance_line = go.Scatter(
            x=df.index,
            y=df["balance"],
            mode="lines",
            name="Balance"
        )
        drawdown_scatter = go.Scatter(
            x=df.index,
            y=df["drawdown"],
            fillcolor="red",
            fill='tozeroy',
            mode="lines",
            name="Drawdown"
        )
        pnl_bar = go.Bar(y=df["net_pnl"], name="Daily Pnl")
        pnl_histogram = go.Histogram(x=df["net_pnl"], nbinsx=100, name="Days")

        fig.add_trace(balance_line, row=1, col=1)
        fig.add_trace(drawdown_scatter, row=2, col=1)
        fig.add_trace(pnl_bar, row=3, col=1)
        fig.add_trace(pnl_histogram, row=4, col=1)

        fig.update_layout(height=1000, width=1000)
        fig.show()

    def run_bf_optimization(self, optimization_setting: OptimizationSetting, output=True):
        """"""
        if not check_optimization_setting(optimization_setting):
            return

        evaluate_func: callable = wrap_evaluate(self, optimization_setting.target_name)
        results = run_bf_optimization(
            evaluate_func,
            optimization_setting,
            get_target_value,
            output=self.output
        )

        if output:
            for result in results:
                msg: str = f"参数:{result[0]}, 目标:{result[1]}"
                self.output(msg)

        return results

    run_optimization = run_bf_optimization

    def run_ga_optimization(self, optimization_setting: OptimizationSetting, output=True):
        """"""
        if not check_optimization_setting(optimization_setting):
            return

        evaluate_func: callable = wrap_evaluate(self, optimization_setting.target_name)
        results = run_ga_optimization(
            evaluate_func,
            optimization_setting,
            get_target_value,
            output=self.output
        )

        if output:
            for result in results:
                msg: str = f"参数:{result[0]}, 目标:{result[1]}"
                self.output(msg)

        return results

    def update_daily_close(self, price: float):
        """"""
        d = self.datetime.date()

        daily_result = self.daily_results.get(d, None)
        if daily_result:
            daily_result.close_price = price
        else:
            self.daily_results[d] = DailyResult(d, price)

    def new_bar(self, bar: BarData):
        """"""
        self.bar = bar
        self.datetime = bar.datetime

        self.cross_limit_order()
        self.cross_stop_order()
        self.strategy.on_bar(bar)

        self.update_daily_close(bar.close_price)

    def new_tick(self, tick: TickData):
        """"""
        self.tick = tick
        self.datetime = tick.datetime

        self.cross_limit_order()
        self.cross_stop_order()
        self.strategy.on_tick(tick)

        self.update_daily_close(tick.last_price)

    def cross_limit_order(self):
        """
        Cross limit order with last bar/tick data.
        """
        if self.mode == BacktestingMode.BAR:
            long_cross_price = self.bar.low_price
            short_cross_price = self.bar.high_price
            long_best_price = self.bar.open_price
            short_best_price = self.bar.open_price
        else:
            long_cross_price = self.tick.ask_price_1
            short_cross_price = self.tick.bid_price_1
            long_best_price = long_cross_price
            short_best_price = short_cross_price

        for order in list(self.active_limit_orders.values()):
            # Push order update with status "not traded" (pending).
            if order.status == Status.SUBMITTING:
                order.status = Status.NOTTRADED
                self.strategy.on_order(order)

            # Check whether limit orders can be filled.
            long_cross = (
                order.direction == Direction.LONG
                and order.price >= long_cross_price
                and long_cross_price > 0
            )

            short_cross = (
                order.direction == Direction.SHORT
                and order.price <= short_cross_price
                and short_cross_price > 0
            )

            if not long_cross and not short_cross:
                continue

            # Push order udpate with status "all traded" (filled).
            order.traded = order.volume
            order.status = Status.ALLTRADED
            self.strategy.on_order(order)

            if order.vt_orderid in self.active_limit_orders:
                self.active_limit_orders.pop(order.vt_orderid)

            # Push trade update
            self.trade_count += 1

            if long_cross:
                trade_price = min(order.price, long_best_price)
                pos_change = order.volume
            else:
                trade_price = max(order.price, short_best_price)
                pos_change = -order.volume

            trade = TradeData(
                symbol=order.symbol,
                exchange=order.exchange,
                orderid=order.orderid,
                tradeid=str(self.trade_count),
                direction=order.direction,
                offset=order.offset,
                price=trade_price,
                volume=order.volume,
                datetime=self.datetime,
                gateway_name=self.gateway_name,
            )

            self.strategy.pos += pos_change
            self.strategy.on_trade(trade)

            self.trades[trade.vt_tradeid] = trade

    def cross_stop_order(self):
        """
        Cross stop order with last bar/tick data.
        """
        if self.mode == BacktestingMode.BAR:
            long_cross_price = self.bar.high_price
            short_cross_price = self.bar.low_price
            long_best_price = self.bar.open_price
            short_best_price = self.bar.open_price
        else:
            long_cross_price = self.tick.last_price
            short_cross_price = self.tick.last_price
            long_best_price = long_cross_price
            short_best_price = short_cross_price

        for stop_order in list(self.active_stop_orders.values()):
            # Check whether stop order can be triggered.
            long_cross = (
                stop_order.direction == Direction.LONG
                and stop_order.price <= long_cross_price
            )

            short_cross = (
                stop_order.direction == Direction.SHORT
                and stop_order.price >= short_cross_price
            )

            if not long_cross and not short_cross:
                continue

            # Create order data.
            self.limit_order_count += 1

            order = OrderData(
                symbol=self.symbol,
                exchange=self.exchange,
                orderid=str(self.limit_order_count),
                direction=stop_order.direction,
                offset=stop_order.offset,
                price=stop_order.price,
                volume=stop_order.volume,
                traded=stop_order.volume,
                status=Status.ALLTRADED,
                gateway_name=self.gateway_name,
                datetime=self.datetime
            )

            self.limit_orders[order.vt_orderid] = order

            # Create trade data.
            if long_cross:
                trade_price = max(stop_order.price, long_best_price)
                pos_change = order.volume
            else:
                trade_price = min(stop_order.price, short_best_price)
                pos_change = -order.volume

            self.trade_count += 1

            trade = TradeData(
                symbol=order.symbol,
                exchange=order.exchange,
                orderid=order.orderid,
                tradeid=str(self.trade_count),
                direction=order.direction,
                offset=order.offset,
                price=trade_price,
                volume=order.volume,
                datetime=self.datetime,
                gateway_name=self.gateway_name,
            )

            self.trades[trade.vt_tradeid] = trade

            # Update stop order.
            stop_order.vt_orderids.append(order.vt_orderid)
            stop_order.status = StopOrderStatus.TRIGGERED

            if stop_order.stop_orderid in self.active_stop_orders:
                self.active_stop_orders.pop(stop_order.stop_orderid)

            # Push update to strategy.
            self.strategy.on_stop_order(stop_order)
            self.strategy.on_order(order)

            self.strategy.pos += pos_change
            self.strategy.on_trade(trade)

    def load_bar(
        self,
        vt_symbol: str,
        days: int,
        interval: Interval,
        callback: Callable,
        use_database: bool
    ) -> List[BarData]:
        """"""
        self.days = days
        self.callback = callback
        return []

    def load_tick(self, vt_symbol: str, days: int, callback: Callable) -> List[TickData]:
        """"""
        self.days = days
        self.callback = callback
        return []

    def send_order(
        self,
        strategy: CtaTemplate,
        direction: Direction,
        offset: Offset,
        price: float,
        volume: float,
        stop: bool,
        lock: bool,
        net: bool
    ):
        """"""
        price = round_to(price, self.pricetick)
        if stop:
            vt_orderid = self.send_stop_order(direction, offset, price, volume)
        else:
            vt_orderid = self.send_limit_order(direction, offset, price, volume)
        return [vt_orderid]

    def send_stop_order(
        self,
        direction: Direction,
        offset: Offset,
        price: float,
        volume: float
    ):
        """"""
        self.stop_order_count += 1

        stop_order = StopOrder(
            vt_symbol=self.vt_symbol,
            direction=direction,
            offset=offset,
            price=price,
            volume=volume,
            datetime=self.datetime,
            stop_orderid=f"{STOPORDER_PREFIX}.{self.stop_order_count}",
            strategy_name=self.strategy.strategy_name,
        )

        self.active_stop_orders[stop_order.stop_orderid] = stop_order
        self.stop_orders[stop_order.stop_orderid] = stop_order

        return stop_order.stop_orderid

    def send_limit_order(
        self,
        direction: Direction,
        offset: Offset,
        price: float,
        volume: float
    ):
        """"""
        self.limit_order_count += 1

        order = OrderData(
            symbol=self.symbol,
            exchange=self.exchange,
            orderid=str(self.limit_order_count),
            direction=direction,
            offset=offset,
            price=price,
            volume=volume,
            status=Status.SUBMITTING,
            gateway_name=self.gateway_name,
            datetime=self.datetime
        )

        self.active_limit_orders[order.vt_orderid] = order
        self.limit_orders[order.vt_orderid] = order

        return order.vt_orderid

    def cancel_order(self, strategy: CtaTemplate, vt_orderid: str):
        """
        Cancel order by vt_orderid.
        """
        if vt_orderid.startswith(STOPORDER_PREFIX):
            self.cancel_stop_order(strategy, vt_orderid)
        else:
            self.cancel_limit_order(strategy, vt_orderid)

    def cancel_stop_order(self, strategy: CtaTemplate, vt_orderid: str):
        """"""
        if vt_orderid not in self.active_stop_orders:
            return
        stop_order = self.active_stop_orders.pop(vt_orderid)

        stop_order.status = StopOrderStatus.CANCELLED
        self.strategy.on_stop_order(stop_order)

    def cancel_limit_order(self, strategy: CtaTemplate, vt_orderid: str):
        """"""
        if vt_orderid not in self.active_limit_orders:
            return
        order = self.active_limit_orders.pop(vt_orderid)

        order.status = Status.CANCELLED
        self.strategy.on_order(order)

    def cancel_all(self, strategy: CtaTemplate):
        """
        Cancel all orders, both limit and stop.
        """
        vt_orderids = list(self.active_limit_orders.keys())
        for vt_orderid in vt_orderids:
            self.cancel_limit_order(strategy, vt_orderid)

        stop_orderids = list(self.active_stop_orders.keys())
        for vt_orderid in stop_orderids:
            self.cancel_stop_order(strategy, vt_orderid)

    def write_log(self, msg: str, strategy: CtaTemplate = None):
        """
        Write log message.
        """
        msg = f"{self.datetime}\t{msg}"
        self.logs.append(msg)

    def send_email(self, msg: str, strategy: CtaTemplate = None):
        """
        Send email to default receiver.
        """
        pass

    def sync_strategy_data(self, strategy: CtaTemplate):
        """
        Sync strategy data into json file.
        """
        pass

    def get_engine_type(self):
        """
        Return engine type.
        """
        return self.engine_type

    def get_pricetick(self, strategy: CtaTemplate):
        """
        Return contract pricetick data.
        """
        return self.pricetick

    def put_strategy_event(self, strategy: CtaTemplate):
        """
        Put an event to update strategy status.
        """
        pass

    def output(self, msg):
        """
        Output message of backtesting engine.
        """
        print(f"{datetime.now()}\t{msg}")

    def get_all_trades(self):
        """
        Return all trade data of current backtesting result.
        """
        return list(self.trades.values())

    def get_all_orders(self):
        """
        Return all limit order data of current backtesting result.
        """
        return list(self.limit_orders.values())

    def get_all_daily_results(self):
        """
        Return all daily result data.
        """
        return list(self.daily_results.values())


class DailyResult:
    """"""

    def __init__(self, date: date, close_price: float):
        """"""
        self.date = date
        self.close_price = close_price
        self.pre_close = 0

        self.trades = []
        self.trade_count = 0

        self.start_pos = 0
        self.end_pos = 0

        self.turnover = 0
        self.commission = 0
        self.slippage = 0

        self.trading_pnl = 0
        self.holding_pnl = 0
        self.total_pnl = 0
        self.net_pnl = 0

    def add_trade(self, trade: TradeData):
        """"""
        self.trades.append(trade)

    def calculate_pnl(
        self,
        pre_close: float,
        start_pos: float,
        size: int,
        rate: float,
        slippage: float,
        inverse: bool
    ):
        """"""
        # If no pre_close provided on the first day,
        # use value 1 to avoid zero division error
        if pre_close:
            self.pre_close = pre_close
        else:
            self.pre_close = 1

        # Holding pnl is the pnl from holding position at day start
        self.start_pos = start_pos
        self.end_pos = start_pos

        if not inverse:     # For normal contract
            self.holding_pnl = self.start_pos * \
                (self.close_price - self.pre_close) * size
        else:               # For crypto currency inverse contract
            self.holding_pnl = self.start_pos * \
                (1 / self.pre_close - 1 / self.close_price) * size

        # Trading pnl is the pnl from new trade during the day
        self.trade_count = len(self.trades)

        for trade in self.trades:
            if trade.direction == Direction.LONG:
                pos_change = trade.volume
            else:
                pos_change = -trade.volume

            self.end_pos += pos_change

            # For normal contract
            if not inverse:
                turnover = trade.volume * size * trade.price
                self.trading_pnl += pos_change * \
                    (self.close_price - trade.price) * size
                self.slippage += trade.volume * size * slippage
            # For crypto currency inverse contract
            else:
                turnover = trade.volume * size / trade.price
                self.trading_pnl += pos_change * \
                    (1 / trade.price - 1 / self.close_price) * size
                self.slippage += trade.volume * size * slippage / (trade.price ** 2)

            self.turnover += turnover
            self.commission += turnover * rate

        # Net pnl takes account of commission and slippage cost
        self.total_pnl = self.trading_pnl + self.holding_pnl
        self.net_pnl = self.total_pnl - self.commission - self.slippage


@lru_cache(maxsize=999)
def load_bar_data(
    symbol: str,
    exchange: Exchange,
    interval: Interval,
    start: datetime,
    end: datetime
):
    """"""
    database = get_database()

    return database.load_bar_data(
        symbol, exchange, interval, start, end
    )


@lru_cache(maxsize=999)
def load_tick_data(
    symbol: str,
    exchange: Exchange,
    start: datetime,
    end: datetime
):
    """"""
    database = get_database()

    return database.load_tick_data(
        symbol, exchange, start, end
    )


def evaluate(
    target_name: str,
    strategy_class: CtaTemplate,
    vt_symbol: str,
    interval: Interval,
    start: datetime,
    rate: float,
    slippage: float,
    size: float,
    pricetick: float,
    capital: int,
    end: datetime,
    mode: BacktestingMode,
    inverse: bool,
    setting: dict
):
    """
    Function for running in multiprocessing.pool
    """
    engine = BacktestingEngine()

    engine.set_parameters(
        vt_symbol=vt_symbol,
        interval=interval,
        start=start,
        rate=rate,
        slippage=slippage,
        size=size,
        pricetick=pricetick,
        capital=capital,
        end=end,
        mode=mode,
        inverse=inverse
    )

    engine.add_strategy(strategy_class, setting)
    engine.load_data()
    engine.run_backtesting()
    engine.calculate_result()
    statistics = engine.calculate_statistics(output=False)

    target_value = statistics[target_name]
    return (str(setting), target_value, statistics)


def wrap_evaluate(engine: BacktestingEngine, target_name: str) -> callable:
    """
    Wrap evaluate function with given setting from backtesting engine.
    """
    func: callable = partial(
        evaluate,
        target_name,
        engine.strategy_class,
        engine.vt_symbol,
        engine.interval,
        engine.start,
        engine.rate,
        engine.slippage,
        engine.size,
        engine.pricetick,
        engine.capital,
        engine.end,
        engine.mode,
        engine.inverse
    )
    return func


def get_target_value(result: list) -> float:
    """
    Get target value for sorting optimization results.
    """
    return result[1]
Member
avatar
加入于:
帖子: 1448
声望: 102

大佬讲解的太详细了

Member
avatar
加入于:
帖子: 4622
声望: 284

感谢楼主的贡献,尝试复现了一下,需要成功使用还有些补充的地方:
 
1楼已贴代码补充:
 
(1). 3.4.2: vnpy_rqdata\rqdata_datafeed.py里class RqdataDatafeed下需要加一行

trading_hours_file = "trading_hours.json"

 
(2). 3.4.3: vnpy\trader\engine.py里顶部添加from .utility import load_json,然后main_engine的init函数下也需要加一句

self.all_trading_hours = load_json("trading_hours.json")

 
如需通过CTA策略模块使用MyBarGenerator,还需对vnpy_ctastrategy做出相应调整:
 
(1). vnpy_ctastrategy.template的CtaTemplate类下添加get_trading_hours函数:

    def get_trading_hours(self):
        """
        Return trading_hours of trading contract.
        """
        return self.cta_engine.get_trading_hours(self)

 
(2). vnpy_ctastrategy.engine下添加get_trading_hours函数:

    def get_trading_hours(self, strategy: CtaTemplate):
        """
        Return contract trading hours.
        """
        trading_hours: str = self.main_engine.get_trading_hours(strategy.vt_symbol)
        return trading_hours

 
(3). vnpy_ctastrategy.backtesting的BacktestingEngine类的init函数下添加self.trading_hours: str = ""定义、在set_parameters函数中添加trading_hours入参以及添加get_trading_hours函数:

    def set_parameters(
        self,
        vt_symbol: str,
        interval: Interval,
        start: datetime,
        rate: float,
        slippage: float,
        size: float,
        pricetick: float,
        capital: int = 0,
        end: datetime = None,
        mode: BacktestingMode = BacktestingMode.BAR,
        inverse: bool = False,
        risk_free: float = 0,
        annual_days: int = 240,
        trading_hours: str = ""    # add
    ):
        """"""
        self.mode = mode
        self.vt_symbol = vt_symbol
        self.interval = Interval(interval)
        self.rate = rate
        self.slippage = slippage
        self.size = size
        self.pricetick = pricetick
        self.start = start

        self.symbol, exchange_str = self.vt_symbol.split(".")
        self.exchange = Exchange(exchange_str)

        self.capital = capital
        self.end = end
        self.mode = mode
        self.inverse = inverse
        self.risk_free = risk_free
        self.annual_days = annual_days

        self.trading_hours = trading_hours    # add

 

    def get_trading_hours(self, strategy: CtaTemplate):
        """
        Return contract trading hours.
        """
        return self.trading_hours

 

策略内使用方法:
 
1分钟K线:
策略顶部导入MyBarGeratorfrom vnpy.trader.utility import MyBarGenerator,策略init函数初始化MyBarGerator:

    def __init__(self, cta_engine, strategy_name, vt_symbol, setting):
        """"""
        super().__init__(cta_engine, strategy_name, vt_symbol, setting)
        trading_hours: str = self.get_trading_hours()

        self.bg = MyBarGenerator(self.on_bar, trading_hours=trading_hours)
        self.am = ArrayManager()

 

 
n分钟K线:
策略顶部导入MyBarGeneratorfrom vnpy.trader.utility import MyBarGenerator, Interval,策略init函数初始化MyBarGenerator:

    def __init__(self, cta_engine, strategy_name, vt_symbol, setting):
        """"""
        super().__init__(cta_engine, strategy_name, vt_symbol, setting)
        trading_hours: str = self.get_trading_hours()

        self.bg = MyBarGenerator(self.on_bar, 15, self.on_15min_bar, interval=Interval.DAILY, trading_hours=trading_hours)
        self.am = ArrayManager()

 
请注意,如需使用MyBarGenerator进行CTA策略回测,回测脚本调用engine.engine.set_parameters函数的时候不要忘记传入trading_hours参数
 

Member
avatar
加入于:
帖子: 24
声望: 0

请注意,如需使用MyBarGenerator进行CTA策略回测,回测脚本调用engine.engine.set_parameters函数的时候不要忘记传入trading_hours参数
回测时,trading_hours参数如何传入,能给出详细的代码吗?

Member
avatar
加入于:
帖子: 24
声望: 0

急呀 哪位兄弟解答一下!连日k线都合成不准确,后面的量化指标意义吗?

Member
avatar
加入于:
帖子: 419
声望: 171

龙津 wrote:

请注意,如需使用MyBarGenerator进行CTA策略回测,回测脚本调用engine.engine.set_parameters函数的时候不要忘记传入trading_hours参数
回测时,trading_hours参数如何传入,能给出详细的代码吗?

MyBarGenerator进行CTA策略回测的修改已经放在7.1节中给出来,仔细修改下。

Member
avatar
加入于:
帖子: 24
声望: 0

To hxxjava: 多谢大神的无私分享!点赞。。。

Member
avatar
加入于:
帖子: 3
声望: 0

复现过程中遇到两个报错,解决不了,请大佬指点,拜托!!!
一、加载策略时遇到如下报错:
File "D:\softwork\python3.10\lib\site-packages\vnpy\trader\ui\mainwindow.py", line 268, in open_widget
widget = widget_class(self.main_engine, self.event_engine)
File "D:\softwork\python3.10\lib\site-packages\vnpy_ctastrategy\ui\widget.py", line 38, in init
self.cta_engine.init_engine()
File "D:\softwork\python3.10\lib\site-packages\vnpy_ctastrategy\engine.py", line 117, in init_engine
self.load_strategy_setting()
File "D:\softwork\python3.10\lib\site-packages\vnpy_ctastrategy\engine.py", line 907, in load_strategy_setting
self.add_strategy(
File "D:\softwork\python3.10\lib\site-packages\vnpy_ctastrategy\engine.py", line 662, in add_strategy strategy: CtaTemplate = strategy_class(self, strategy_name, vt_symbol, setting)
File "C:\Users\Administrator\strategies\demo009_MA20903.py", line 94, in init
self.bg_x = MyBarGenerator(self.on_bar, self.x_min, self.on_x_bar, trading_hours=trading_hours)
File "D:\softwork\python3.10\lib\site-packages\vnpy\usertools\utility.py", line 52, in init
raise ValueError(f"MyBarGenerator need trading hours setting , please check it !")
ValueError: MyBarGenerator need trading hours setting , please check it !

二、回测时遇到如下报错:
Traceback (most recent call last):
File "D:\softwork\python3.10\lib\site-packages\vnpy\trader\ui\mainwindow.py", line 268, in open_widget
widget = widget_class(self.main_engine, self.event_engine)
File "D:\softwork\python3.10\lib\site-packages\vnpy_ctabacktester\ui\widget.py", line 55, in init
self.backtester_engine.init_engine()
File "D:\softwork\python3.10\lib\site-packages\vnpy_ctabacktester\engine.py", line 63, in init_engine
self.backtesting_engine = BacktestingEngine()
File "D:\softwork\python3.10\lib\site-packages\vnpy_ctastrategy\backtesting.py", line 87, in init
self.load_contracts() # hxxjava add
File "D:\softwork\python3.10\lib\site-packages\vnpy_ctastrategy\backtesting.py", line 170, in load_contracts
contracts:List[ContractData] = database.load_contract_data()
AttributeError: 'SqliteDatabase' object has no attribute 'load_contract_data'

Member
avatar
加入于:
帖子: 419
声望: 171

王中锋 wrote:

复现过程中遇到两个报错,解决不了,请大佬指点,拜托!!!
一、加载策略时遇到如下报错:
File "D:\softwork\python3.10\lib\site-packages\vnpy\trader\ui\mainwindow.py", line 268, in open_widget
widget = widget_class(self.main_engine, self.event_engine)
File "D:\softwork\python3.10\lib\site-packages\vnpy_ctastrategy\ui\widget.py", line 38, in init
self.cta_engine.init_engine()
File "D:\softwork\python3.10\lib\site-packages\vnpy_ctastrategy\engine.py", line 117, in init_engine
self.load_strategy_setting()
File "D:\softwork\python3.10\lib\site-packages\vnpy_ctastrategy\engine.py", line 907, in load_strategy_setting
self.add_strategy(
File "D:\softwork\python3.10\lib\site-packages\vnpy_ctastrategy\engine.py", line 662, in add_strategy strategy: CtaTemplate = strategy_class(self, strategy_name, vt_symbol, setting)
File "C:\Users\Administrator\strategies\demo009_MA20903.py", line 94, in init
self.bg_x = MyBarGenerator(self.on_bar, self.x_min, self.on_x_bar, trading_hours=trading_hours)
File "D:\softwork\python3.10\lib\site-packages\vnpy\usertools\utility.py", line 52, in init
raise ValueError(f"MyBarGenerator need trading hours setting , please check it !")
ValueError: MyBarGenerator need trading hours setting , please check it !

二、回测时遇到如下报错:
Traceback (most recent call last):
File "D:\softwork\python3.10\lib\site-packages\vnpy\trader\ui\mainwindow.py", line 268, in open_widget
widget = widget_class(self.main_engine, self.event_engine)
File "D:\softwork\python3.10\lib\site-packages\vnpy_ctabacktester\ui\widget.py", line 55, in init
self.backtester_engine.init_engine()
File "D:\softwork\python3.10\lib\site-packages\vnpy_ctabacktester\engine.py", line 63, in init_engine
self.backtesting_engine = BacktestingEngine()
File "D:\softwork\python3.10\lib\site-packages\vnpy_ctastrategy\backtesting.py", line 87, in init
self.load_contracts() # hxxjava add
File "D:\softwork\python3.10\lib\site-packages\vnpy_ctastrategy\backtesting.py", line 170, in load_contracts
contracts:List[ContractData] = database.load_contract_data()
AttributeError: 'SqliteDatabase' object has no attribute 'load_contract_data'

你可能不仔细按照我的帖子中去做,楼上“龙津”已经用了照着做,代码是没有问题的。你可以私信他,他应该会告诉你经验。

Member
avatar
加入于:
帖子: 24
声望: 0

在实盘过程中,发现日内对齐等交易时长的K线生成器的一个隐藏很深的BUG?当要合成日内“n分钟”,n较大时,如超过3小时=180分钟,4小时=240分钟等。这时合成的n分钟K线是不对的。主要原因是:分钟较大时,需要获取多个交易时间段。如以下合成240分钟K线:
(datetime.date(2022, 2, 18), [(datetime.datetime(2022, 2, 17, 21, 0, tzinfo=<DstTzInfo 'Asia/Shanghai' LMT+8:06:00 STD>), datetime.datetime(2022, 2, 17, 23, 0, tzinfo=<DstTzInfo 'Asia/Shanghai' LMT+8:06:00 STD>)), (datetime.datetime(2022, 2, 18, 10, 30, tzinfo=<DstTzInfo 'Asia/Shanghai' LMT+8:06:00 STD>), datetime.datetime(2022, 2, 18, 11, 15, tzinfo=<DstTzInfo 'Asia/Shanghai' LMT+8:06:00 STD>))])
我们发现交易时间段中的上午9:00至上午10:15这一段缺失了。因此需要修改一个地方:trading_hours.py里面的 def get_intraday_window(self,trade_dt:datetime,window:int) -> Tuple[date,List[Tuple[datetime,datetime]]]:

    window_segments = [(start if start > start_time else start_time,stop if stop < stop_time else stop_time) 
                        for start,stop in segment_datetimes 

if (start_time < stop) and (start < stop_time )] #srj add

                        # if (start <= start_time < stop) or (start < stop_time <= stop)]       #原来的
Member
avatar
加入于:
帖子: 419
声望: 171

答复:

前天在你给我私信里,已经给你回复了,你没有看到吗?
这个问题已经修改了,因为我看到你贴出来的代码是原来有问题的代码,往二楼查看下。
需要更新下面的几个class:

  • TradingHours
  • MyBarGenerator

你看不到回复是吗?可能是系统提示我的私信邮箱已经超了,发不了回信吧。

Member
avatar
加入于:
帖子: 24
声望: 0

To hxxjava: 兄弟你好!我没收到你的私信,以为你忙没有看到,以为楼上代码没更正?

Member
avatar
加入于:
帖子: 3
声望: 0

大佬 真的厉害,把代码写的那么详细还很有耐心!感谢!!!

Member
avatar
加入于:
帖子: 3
声望: 0

复现过程中遇到两个报错,解决不了,请大佬指点,拜托!!!
def load_contracts(self) -> None: # hxxjava add end
""" 装载合同 """
database = get_database()
print(f"{database}")
contracts: List[ContractData] = database.load_contract_data()
self.contracts = {}
for c in contracts:
self.contracts[c.vt_symbol] = c

    print(f"BachtestingEngine.contracts len={len(self.contracts)}")

contracts: List[ContractData] = database.load_contract_data()
AttributeError: 'MysqlDatabase' object has no attribute 'load_contract_data'

Member
avatar
加入于:
帖子: 7
声望: 0

时隔多年,才看到大佬的帖子,非常感谢大佬的无私分享,感觉K线合成好难啊!
运行了下大佬的代码,发现后续特别tick的处理部分没有实现,实际运行了一下结果就是丢了最后一个K线,不知怎么解决。

调试了下5分钟的k线,
22:55时收到22:50~22:55的k线正确。
2024-03-05 22:55:00,134 INFO: BarData(gateway_name='CTP', symbol='rb2405', exchange=<Exchange.SHFE: 'SHFE'>, datetime=datetime.datetime(2024, 3, 5, 22, 50, tzinfo=<DstTzInfo 'Asia/Shanghai' CST+8:00:00 STD>), interval=<Interval.MINUTE: '1m'>, volume=15366, turnover=0, open_interest=2157315.0, open_price=3710.0000000000005, high_price=3712.0000000000005, low_price=3708.0000000000005, close_price=3711.0000000000005)

23:00时只收到了tick,没有合成k线
特别tick【TickData(gateway_name='CTP', symbol='rb2405', exchange=<Exchange.SHFE: 'SHFE'>, datetime=datetime.datetime(2024, 3, 5, 23, 0), name='rb2405', volume=428859, turnover=0, open_interest=2150992.0, last_price=3712.0000000000005, last_volume=0, limit_up=3924.0, limit_down=3551.0, open_price=3715.0, high_price=3722.0000000000005, low_price=3705.0000000000005, pre_close=3723.0, bid_price_1=3712.0000000000005, bid_price_2=0, bid_price_3=0, bid_price_4=0, bid_price_5=0, ask_price_1=3713.0000000000005, ask_price_2=0, ask_price_3=0, ask_price_4=0, ask_price_5=0, bid_volume_1=177, bid_volume_2=0, bid_volume_3=0, bid_volume_4=0, bid_volume_5=0, ask_volume_1=173, ask_volume_2=0, ask_volume_3=0, ask_volume_4=0, ask_volume_5=0, avg_price=37129.607446736576, PreSettlementPrice=3738.0, lastprice=3712.0000000000005)】

Member
avatar
加入于:
帖子: 8
声望: 0

非常感谢!大佬

© 2015-2022 上海韦纳软件科技有限公司
备案服务号:沪ICP备18006526号

沪公网安备 31011502017034号

【用户协议】
【隐私政策】
【免责条款】