文章太长,再分一贴吧。
4. 交易时间段的处理
4.1 交易时间段处理的复杂性
一个合约的交易时间段信息,就包含在一个字符串中。通常看起来是这样的:
"21:00-23:00,09:00-10:15,10:30-11:30,13:30-15:00"
它看似简单,实则非常复杂!简单在于它只是一个字符串,其实它能够表达非常复杂的交易时间规定。例如交易时间可以少到只有1段,也可以4到5个段,可跨日,也可以跨多日,如遇到周末或者长假。但是长假太难处理了,我们这也不处理各种各样的假日规定,因为那个太复杂了!不过好在时下很多软件,著名的和非著名的软件,几乎都不处理跨长假的问题,不处理的原因也是和我分析的一样,不过这也没有影响他们多软件被广大用户接受的程度。所以我们也不处理跨长假的问题。
当然想处理跨长假也不成,条件不具备呀。因为毕竟我们不是交易所,不知道各种各样的休假规定,不同市场,不同国家的节假日,千奇百怪,太难处理了。而且我们也不能说不处理哪个市场或者国家的投资品种吧?绝大部分软件都不处理长假对K线对齐方式的影响,原因就在于此,没有什么别的说辞!
4.2 交易时间段处理具有的功能
- 从交易时间段字符串中提取出各段的起止时间(天内的秒数) 列表
- 给定一个时间,可以得到其交易日及日期时间格式的交易时间段列表,无效交易时间返回空
- 给定一个时间,得到其在日内的交易时间、所在窗口的索引、窗口开始时间和截止时间
4.3 交易时间段处理的实现
在vnpy\usertools下创建一个名称为trading_hours.py,其代码如下:
"""
本文件主要实现合约的交易时间段:TradingHours
作者:hxxjava
日期:2022-03-28
"""
from typing import Callable,List,Dict, Tuple, Union
from enum import Enum
from datetime import datetime,date,timedelta, tzinfo
import numpy as np
import pytz
CHINA_TZ = pytz.timezone("Asia/Shanghai")
from vnpy.trader.constant import Interval
INTERVAL_MAP = {
Interval.MINUTE:60,
Interval.HOUR:3600,
Interval.DAILY:3600*24,
Interval.WEEKLY:3600*24*7,
}
def get_time_segments(trading_hours:str) -> List:
"""
从交易时间段字符串中提取出各段的起止时间(天内的秒数) 列表
"""
time_sepments = []
# 提取各段
str_segments = trading_hours.split(',')
pre_start,day_offset = None,0
for s in reversed(str_segments): # 反向遍历各段
# 提取段的起止时间
start,stop = s.split('-')
# 计算开始时间天内秒
hh,mm = start.split(':')
start_s = int(hh)*3600+int(mm)*60
# 计算截止时间天内秒
hh,mm = stop.split(':')
stop_s = int(hh)*3600+int(mm)*60
if pre_start and start > pre_start:
day_offset -= 1
pre_start = start
# 加入列表
time_sepments.insert(0,(day_offset,start_s,stop_s))
return time_sepments
def in_segments(trade_segments:List,trade_dt:datetime):
""" 判断一个时间是否在一个交易时间段列表中 """
trade_dt = trade_dt.replace(tzinfo=CHINA_TZ)
for start,stop in trade_segments:
if start <= trade_dt < stop:
return True
return False
class TradingHours(object):
"""
交易时间段处理
"""
def __init__(self,trading_hours:str):
""" 初始化函数 """
self.time_segments:List[Tuple[int,int,int]] = get_time_segments(trading_hours)
def day_trade_time(self,interval:Interval) -> int:
"""
一个交易日的交易时长,单位由interval 规定,不足的部分+1
"""
seconds = 0.0
for _,start,stop in self.time_segments:
seconds += stop - start + (0 if start < stop else INTERVAL_MAP[Interval.DAILY])
if not interval:
return seconds
else:
return np.ceil(seconds/INTERVAL_MAP[interval])
def get_trade_hours(self,trade_dt:datetime) -> Tuple[date,List[Tuple[datetime,datetime]]]:
"""
得到一个时间的交易日及日期时间格式的交易时间段列表,无效交易时间返回空
"""
# 构造trade_dt加前后三天共7的日期
trade_dt = trade_dt.replace(tzinfo=CHINA_TZ)
dates = [trade_dt.date()+timedelta(days=i) for i in range(-3,4)]
# 根据 self.time_segments 构造出一周内的日期时间格式的交易时间段字典
week_seqments = {
dt:
[(datetime(dt.year,dt.month,dt.day,tzinfo=CHINA_TZ)+timedelta(days=days-(2 if days == -1 and dt.weekday()==0 else 0),seconds=start),
datetime(dt.year,dt.month,dt.day,tzinfo=CHINA_TZ)+timedelta(days=days-(2 if days == -1 and dt.weekday()==0 else 0)+(1 if start>stop else 0),seconds=stop))
for days,start,stop in self.time_segments]
for dt in dates if dt.weekday() not in [5,6]
}
trade_day,trading_segments = None,[]
# 在交易时间段字典中查找trade_dt所在交易时间段,确定所属交易日
for dt,datetime_segments in week_seqments.items():
# 遍历一周中的每日
for start,stop in datetime_segments:
# 遍历一日中的每个交易时间段
if start <= trade_dt < stop:
# 找到了,确定dt为trade_dt的交易日
trade_day = dt
break
if trade_day:
# 已经找到,停止
trading_segments = datetime_segments
break
return (trade_day,trading_segments)
def get_intraday_window(self,trade_dt:datetime,window:int) -> Tuple[date,List[Tuple[datetime,datetime]]]:
"""
得到一个时间的日内交易时间、窗口索引、窗口开始时间和截止时间
"""
trade_dt = trade_dt.replace(tzinfo=CHINA_TZ)
interval = Interval.MINUTE
oneday_minutes = self.day_trade_time(interval)
if window > oneday_minutes:
raise f"In day window can't exceed {oneday_minutes} minutes !"
result = (None,[])
if window == 0:
# window==0 无意义
return result
# 求dt的交易日
trade_day,segment_datetimes = self.get_trade_hours(trade_dt)
if not trade_day:
# 无效的交易日
return result
if np.sum([start <= trade_dt < stop for start,stop in segment_datetimes]) == 0:
# 如果dt不在各个交易时间段内为无效的交易时间
return result
# 交易日的开盘时间
t0 = segment_datetimes[0][0]
# 构造各个交易时间段的起止数组
starts = np.array([(seg_dt[0]-t0).seconds*1.0 for seg_dt in segment_datetimes])
stops = np.array([(seg_dt[1]-t0).seconds*1.0 for seg_dt in segment_datetimes])
# 求dt在交易日中的自然时间
nature_t = (trade_dt - t0).seconds
# 求dt已经走过的交易时间
traded_t = np.sum(nature_t - starts[starts<=nature_t]) - np.sum(nature_t-stops[stops<nature_t])
if traded_t < 0:
# 开盘之前的为无效交易时间
return result
# 求当前所在窗口的宽度、索引、开始交易时间及截止时间
window_width = window * INTERVAL_MAP[interval]
window_idx = np.floor(traded_t/window_width)
window_start = window_idx * window_width
window_stop = window_start + window_width
# 求各个交易时间段的宽度
segment_widths = stops - starts
# 求各个交易时间段累计日内交易时间
seg_sum =[np.sum(segment_widths[:(i+1)]) for i in range(len(segment_widths))]
if window_stop > seg_sum[-1]:
# 不可以跨日处理
window_stop = seg_sum[-1]
# 求当前窗口所在交易时间段的索引
start_in_seg = np.sum([seg_sum<=window_start])
stop_in_seg = np.sum([seg_sum<window_stop])
# 当前窗口的开始和截止时间
seconds1 = window_start - seg_sum[start_in_seg-1] if start_in_seg > 0 else window_start
seconds2 = window_stop - seg_sum[stop_in_seg-1] if stop_in_seg > 0 else window_stop
# print(f"seconds2={seconds2} stop_in_seg={stop_in_seg} window_stop={window_stop}")
start_time = segment_datetimes[start_in_seg][0] + timedelta(seconds = seconds1)
stop_time = segment_datetimes[stop_in_seg][0] + timedelta(seconds = seconds2)
window_segments = [(start if start > start_time else start_time,stop if stop < stop_time else stop_time)
for start,stop in segment_datetimes
if (start <= start_time < stop) or (start < stop_time <= stop)]
result = (trade_day,window_segments)
return result
def get_week_tradedays(self,trade_dt:datetime) -> List[date]:
""" 得到一个交易时间所在周的交易日 """
trade_dt = trade_dt.replace(tzinfo=CHINA_TZ)
trade_day,trade_segments = self.get_trade_hours(trade_dt)
if not trade_day:
return []
monday = trade_dt.date() - timedelta(days=trade_dt.weekday())
week_dates = [monday + timedelta(days=i) for i in range(5)]
if trade_day not in week_dates:
next_7days = [(trade_dt + timedelta(days=i+1)) for i in range(7)]
week_dates = [day.date() for day in next_7days if day.weekday() not in [5,6]]
return week_dates
def get_month_tradedays(self,trade_dt:datetime) -> List[date]:
""" 得到一个交易时间所在月的交易日 """
trade_dt = trade_dt.replace(tzinfo=CHINA_TZ)
trade_day,trade_segments = self.get_trade_hours(trade_dt)
if not trade_day:
return []
first_day = date(year=trade_day.year,month=trade_day.month,day=1)
this_month = trade_day.month
days32 = [first_day + timedelta(days = i) for i in range(32)]
month_dates = [day for day in days32 if day.weekday() not in [5,6] and day.month==this_month]
return month_dates
def get_year_tradedays(self,trade_dt:datetime) -> List[date]:
""" 得到一个交易时间所在年的交易日 """
trade_dt = trade_dt.replace(tzinfo=CHINA_TZ)
trade_day,trade_segments = self.get_trade_hours(trade_dt)
if not trade_day:
return []
new_years_day = date(year=trade_day.year,month=1,day=1)
this_year = trade_day.year
days366 = [new_years_day + timedelta(days = i) for i in range(366)]
trade_dates = [day for day in days366 if day.weekday() not in [5,6] and day.year==this_year]
return trade_dates
def has_night_tradetime(self) -> bool:
""" 有夜盘交易时间吗? """
for (days,start,stop) in self.time_segments:
if start >= 18*INTERVAL_MAP(Interval.HOUR):
return True
return False
def has_day_tradetime(self) -> bool:
""" 有日盘交易时间吗 ? """
for (days,start,stop) in self.time_segments:
if start < 18*INTERVAL_MAP(Interval.HOUR):
return True
return False
5. 日内对齐等交易时长K线生成器的实现
5.1 确定K线生成器MyBarGenerator的生成规则
5.1.1 一步到位地解决问题
先给它取个名称,就叫MyBarGenerator吧,它是对BarGenerator的扩展。
不过在构思MyBarGenerator的时候,我发现它其实不应该叫“日内对齐等交易时长K线生成器”。因为我们不应该只局限于日内的n分钟K线生成器,难道vnpy系统就不应该、不能够或者不使用日线以上的K线了吗?我们只能够使用日内K线进行量化交易吗?难道大家都没有过这方面的需求吗?我想答案是否定的。
那好,所幸就设计一个全功能的K线生成器:MyBarGenerator。
为此我们需要扩展Interval的定义,因为Interval是表示K线周期的常量,可是它的格局不够,最大只能到周一级WEEKLY。也就是说您用目前的Interval是没有办法表达月和年这样的周期的。
class Interval(Enum):
"""
Interval of bar data.
"""
MINUTE = "1m"
HOUR = "1h"
DAILY = "d"
WEEKLY = "w"
TICK = "tick"
MONTHLY = "month" # hxxjava add
YEARLY = "year" # hxxjava add
顺便在这里吐槽一下BarGenerator:
- 目前的BarData中包含了一个interval字段的,可是它在BarGenerator的时候根本就没有使用过,而使用它本是信手拈来的事情,但是没有却没有使用。如果不信,你可以去看看用它产生出来的bar的内容。
- 另外本来还应该增加一个秒单位(SECONDLY = "1s")的,这个单位其实对高频交易也是很有需求的,可是现在却没有。不知道大家对此有什么看法。
5.1.2 按周期对K线分类
在系统且并详细分析之后,把K线分类为:日内K线、日K线,周K线、月K线及年K线等周期K线五类。
1)日内K线包括1~n分钟K线,如1分钟、n分钟两类,其中n小于正常交易日的最大交易分钟数。日内K线取消对小时周期单位支持,因为可以通过n分钟的方式来实现。如:
- 1小时K线可以通过60分钟来表达
- 2小时K线可以通过120分钟来表达
- 4小时K线可以通过240分钟来表达
这么做的好处是:非常容易地实现90分钟的日内K线,而这是系统自带BarGenerator无法做到的。
2)日K线:每个交易日产生一个,它包含一到多个交易时间段。根据是否包含夜盘交易时间段,又可以分为跨日K线和不跨日K线。
3)周K线:由周一至周五中所有交易日的交易数据合成得到,它其实是一种特殊的n日K线,只是n<=5而已。
4)月K线:由每月1日至月末最后一个交易日的交易数据合成得到,除去所有周末,它最多包含23个交易日,遇到本月有长假日,其所包含的交易日会更少。
5)年K线:由每年1月1日至12月31日中的所有交易日的交易数据合成得到,除去所有周末。它可以理解为由一年中的所有交易日数据合成的,也可以理解为由一年中的12个月的交易日数据合成的。
5.1.3 确定K线生成规则:
1)日内K线(包括1~n分钟K线)生成规则:
- K线对齐交易日的开盘
- 等交易时长生成
- 忽略中间休市时间
- 不跨日生成,遇收市强行截止
- 周期单位必须为分钟,n小于日交易最大分钟数
2)日K线生成规则:
- 对齐其交易日的开盘时间
- 休市时间收到的数据为非法数据
- 交易日收盘时间生成或者在收到下收到大于收盘时间交易数据时生成
3)周K线生成规则:
- 对齐周一开盘时间
- 收到周一或者第一个交易日的日K线时创建
- 收到周二到四等交易日日K线时继续合成
- 收到周五或者下周交易日日K线时生成
4)月K线生成规则:
- 对齐当月1日的开盘时间,去除所有周末构成本月可能的交易日期
- 收到当月第一个交易日的日K线时创建
- 月K线创建后在未收到本月可能的交易日期的日K线时继续合成
- 收到月可能的交易日期的最后一个交易日K线或者下个月的第一交易日日K线时生成
5)年K线生成规则:
年K线可以由两种方式进行合成:一种是用日K线合成,另一种是用月K线合成。我们这里选择用日K线来合成。
- 年K线对齐每年的1月1日,从1月1日至12月31日,去除所有周末,构成所有的可能的交易日
- 遇到当年的第一个日K线时创建年K线
- 年K线创建后,在收到日K线的交易日期未到最后一个可能的交易日时继续合成
- 收到日K线的交易日为本年可能的交易日期或者下一年的交易日时生成
5.2 MyBargenerator的实现
在vnpy\usertools\utility.py中加入如下面的两个部分:
5.2.1 加入引用部分:
from typing import List,Dict,Tuple,Optional,Sequence,Callable
from datetime import date,datetime,timedelta
from vnpy.trader.constant import Interval
from vnpy.trader.object import TickData,BarData
from vnpy.trader.utility import extract_vt_symbol
from vnpy.usertools.trading_hours import TradingHours,in_segments
from vnpy.usertools.trade_hours import CHINA_TZ
5.2.2 MyBarGenerator的完整代码
class MyBarGenerator():
"""
An align bar generator.
Comment's for parameters:
on_bar : callback function on 1 minute bar is generated.
window : window bar's width.
on_window_bar : callback function on x interval bar is generated.
interval : window bar's unit.
trading_hours: trading hours with which the window bar can be generated.
"""
def __init__(
self,
on_bar: Callable,
window: int = 0,
on_window_bar: Callable = None,
interval: Interval = Interval.MINUTE,
trading_hours:str = ""
):
""" Constructor """
self.bar: BarData = None
self.on_bar: Callable = on_bar
self.interval: Interval = interval
self.interval_count: int = 0
self.intra_day_bar: BarData = None
self.day_bar: BarData = None
self.week_bar: BarData = None
self.month_bar: BarData = None
self.year_bar: BarData = None
self.window: int = window
self.on_window_bar: Callable = on_window_bar
self.last_tick: TickData = None
if interval not in [Interval.MINUTE,Interval.DAILY,Interval.WEEKLY,Interval.MONTHLY,Interval.YEARLY]:
raise ValueError(f"MyBarGenerator support MINUTE,DAILY,WEEKLY,MONTHLY and YEARLY bar generation only , please check it !")
if not trading_hours:
raise ValueError(f"MyBarGenerator need trading hours setting , please check it !")
# trading hours object
self.trading_hours = TradingHours(trading_hours)
# current intraday window bar's contains trading day and time segments list
self.intraday_bar_window = (None,[]) # (trade_day,[])
# current daily bar's window containts trading day and time segment list
self.daily_bar_window = (None,[])
# current weekly bar's window containts all trade days
self.weekly_bar_window = []
# current monthly bar's window containts all trade days
self.monthly_bar_window = []
# current yearly bar's window containts all trade days
self.yearly_bar_window = []
def update_tick(self, tick: TickData) -> None:
"""
Update new tick data into generator.
"""
new_minute = False
# Filter tick data with 0 last price
if not tick.last_price:
return
# Filter tick data with older timestamp
if self.last_tick and tick.datetime < self.last_tick.datetime:
return
if not self.bar:
new_minute = True
elif (
(self.bar.datetime.minute != tick.datetime.minute)
or (self.bar.datetime.hour != tick.datetime.hour)
):
self.bar.datetime = self.bar.datetime.replace(
second=0, microsecond=0
)
self.on_bar(self.bar)
new_minute = True
if new_minute:
self.bar = BarData(
symbol=tick.symbol,
exchange=tick.exchange,
interval=Interval.MINUTE,
datetime=tick.datetime.replace(tzinfo=CHINA_TZ),
gateway_name=tick.gateway_name,
open_price=tick.last_price,
high_price=tick.last_price,
low_price=tick.last_price,
close_price=tick.last_price,
open_interest=tick.open_interest
)
else:
self.bar.high_price = max(self.bar.high_price, tick.last_price)
if tick.high_price > self.last_tick.high_price:
self.bar.high_price = max(self.bar.high_price, tick.high_price)
self.bar.low_price = min(self.bar.low_price, tick.last_price)
if tick.low_price < self.last_tick.low_price:
self.bar.low_price = min(self.bar.low_price, tick.low_price)
self.bar.close_price = tick.last_price
self.bar.open_interest = tick.open_interest
self.bar.datetime = tick.datetime
if self.last_tick:
volume_change = tick.volume - self.last_tick.volume
self.bar.volume += max(volume_change, 0)
turnover_change = tick.turnover - self.last_tick.turnover
self.bar.turnover += max(turnover_change, 0)
self.last_tick = tick
def update_bar(self, bar: BarData) -> None:
"""
Update 1 minute bar into generator
"""
if bar:
bar.datetime = bar.datetime.replace(tzinfo=CHINA_TZ)
if self.interval == Interval.MINUTE and self.window > 0:
# update inday bar
self.update_intraday_bar(bar)
elif self.interval in [Interval.DAILY,Interval.WEEKLY,Interval.MONTHLY,Interval.YEARLY]:
# update daily,weekly,monthly or yearly bar
self.update_daily_bar(bar)
def update_intraday_bar(self, bar: BarData) -> None:
""" update intra day x window bar """
if bar:
bar.datetime = bar.datetime.replace(tzinfo=CHINA_TZ)
if self.interval != Interval.MINUTE or self.window <= 1:
return
if self.intraday_bar_window == (None,[]):
# 首次调用日内K线更新函数
trade_day,time_segments = self.trading_hours.get_intraday_window(bar.datetime,self.window)
if (trade_day,time_segments) == (None,[]):
# 无效的1分钟K线
return
# 更新当前日内K线交易时间
self.intraday_bar_window = (trade_day,time_segments )
# 创建新的日内K线
self.intra_day_bar = BarData(
symbol=bar.symbol,
exchange=bar.exchange,
interval=Interval.MINUTE,
datetime=bar.datetime.replace(tzinfo=CHINA_TZ),
gateway_name=bar.gateway_name,
open_price=bar.open_price,
high_price=bar.high_price,
low_price=bar.low_price,
)
if not in_segments(self.intraday_bar_window[1],bar.datetime):
# 1分钟K线不属于当前日内K线
trade_day,time_segments = self.trading_hours.get_intraday_window(bar.datetime,self.window)
if (trade_day,time_segments) == (None,[]):
# 无效的1分钟K线
return
# 当前日内K线已经生成,推送当前日内K线
if self.on_window_bar:
self.on_window_bar(self.intra_day_bar)
# 更新当前日内K线交易时间
self.intra_day_bar_window = (trade_day,time_segments)
# 创建新的日内K线
self.intra_day_bar = BarData(
symbol=bar.symbol,
exchange=bar.exchange,
interval=Interval.MINUTE,
datetime=bar.datetime.replace(tzinfo=CHINA_TZ),
gateway_name=bar.gateway_name,
open_price=bar.open_price,
high_price=bar.high_price,
low_price=bar.low_price,
)
# 1分钟K线属于当前日内K线
# 更新当前日内K线
self.intra_day_bar.high_price = max(self.intra_day_bar.high_price,bar.high_price)
self.intra_day_bar.low_price = min(self.intra_day_bar.low_price,bar.low_price)
self.intra_day_bar.close_price = bar.close_price
self.intra_day_bar.open_interest = bar.open_interest
self.intra_day_bar.volume += bar.volume
self.intra_day_bar.turnover += bar.turnover
# 判断当前日内K线是否结束
close_time = self.intraday_bar_window[1][-1][1]
next_minute_dt = bar.datetime + timedelta(minutes=1)
if close_time <= next_minute_dt:
# 当前日K内线已经结束
# 当前日内K线已经生成,推送之
if self.on_window_bar:
self.on_window_bar(self.intra_day_bar)
self.intraday_bar_window = (None,[])
self.intra_day_bar = None
def update_daily_bar(self, bar: BarData) : # -> None:
""" update daily bar using 1 minute bar """
if bar:
bar.datetime = bar.datetime.replace(tzinfo=CHINA_TZ)
if self.daily_bar_window == (None,[]):
# 首次调用日K线更新函数
trade_day,trade_segments = self.trading_hours.get_trade_hours(bar.datetime)
if (trade_day,trade_segments) == (None,[]):
# 无效的1分钟K线
return
# 更新当前日K线交易时间
self.daily_bar_window = (trade_day,trade_segments)
# 创建新的日K线
self.day_bar = BarData(
symbol=bar.symbol,
exchange=bar.exchange,
interval=Interval.DAILY,
datetime=bar.datetime.replace(tzinfo=CHINA_TZ),
gateway_name=bar.gateway_name,
open_price=bar.open_price,
high_price=bar.high_price,
low_price=bar.low_price,
)
if not in_segments(self.daily_bar_window[1],bar.datetime):
# 1分钟K线不属于当前日K线
trade_day,trade_segments = self.trading_hours.get_trade_hours(bar.datetime)
if (trade_day,trade_segments) == (None,[]):
# 无效的1分钟K线
return
# 当前日K线已经生成
if self.interval == Interval.DAILY:
# 推送当前日K线
if self.on_window_bar:
self.on_window_bar(self.day_bar)
else:
# 更新更大周期K线
self.update_weekly_bar(self.day_bar)
self.update_monthly_bar(self.day_bar)
self.update_yearly_bar(self.day_bar)
# 更新当前日K线交易时间
self.daily_bar_window = (trade_day,trade_segments)
# 创建新的日K线
self.day_bar = BarData(
symbol=bar.symbol,
exchange=bar.exchange,
interval=Interval.DAILY,
datetime=bar.datetime.replace(tzinfo=CHINA_TZ),
gateway_name=bar.gateway_name,
open_price=bar.open_price,
high_price=bar.high_price,
low_price=bar.low_price,
)
# 1分钟K线属于当前交易日
# 更新当前日K线
self.day_bar.high_price = max(self.day_bar.high_price,bar.high_price)
self.day_bar.low_price = min(self.day_bar.low_price,bar.low_price)
self.day_bar.close_price = bar.close_price
self.day_bar.open_interest = bar.open_interest
self.day_bar.volume += bar.volume
self.day_bar.turnover += bar.turnover
# 判断当前日K线是否结束
close_time = self.daily_bar_window[1][-1][1]
next_minute_dt = bar.datetime + timedelta(minutes=1)
if close_time <= next_minute_dt:
# 当前日K线已经结束
# 当前日K线已经生成
if self.interval == Interval.DAILY:
# 推送当前日K线
if self.on_window_bar:
self.on_window_bar(self.day_bar)
else:
# 更新更大周期K线
self.update_weekly_bar(self.day_bar)
self.update_monthly_bar(self.day_bar)
self.update_yearly_bar(self.day_bar)
self.daily_bar_window = (None,[])
self.day_bar = None
def update_weekly_bar(self, bar: BarData) -> None:
""" update weekly bar using a daily bar """
if bar:
bar.datetime = bar.datetime.replace(tzinfo=CHINA_TZ)
if self.interval != Interval.WEEKLY:
# 设定周期单位不是周,不处理
return
if not self.weekly_bar_window:
# 首次调用周K线更新函数
week_tradedays = self.trading_hours.get_week_tradedays(bar.datetime)
if not week_tradedays:
# 无效的日K线
return
# 更新当前周K线交易日列表
self.weekly_bar_window = week_tradedays
# 创建新的周K线
self.week_bar = BarData(
symbol=bar.symbol,
exchange=bar.exchange,
interval=Interval.WEEKLY,
datetime=bar.datetime.replace(tzinfo=CHINA_TZ),
gateway_name=bar.gateway_name,
open_price=bar.open_price,
high_price=bar.high_price,
low_price=bar.low_price,
)
if bar.datetime not in self.weekly_bar_window:
# 日线不属于当前周K线
week_tradedays = self.trading_hours.get_week_tradedays(bar.datetime)
if not week_tradedays:
# 无效的日K线
return
# 当前周K线已经生成,推送
if self.on_window_bar:
self.on_window_bar(self.week_bar)
# 更新当前周K线交易日列表
self.weekly_bar_window = week_tradedays
# 创建新的周K线
self.week_bar = BarData(
symbol=bar.symbol,
exchange=bar.exchange,
interval=Interval.WEEKLY,
datetime=bar.datetime.replace(tzinfo=CHINA_TZ),
gateway_name=bar.gateway_name,
open_price=bar.open_price,
high_price=bar.high_price,
low_price=bar.low_price,
)
# 更新当前周K线
self.week_bar.high_price = max(self.week_bar.high_price,bar.high_price)
self.week_bar.low_price = min(self.week_bar.low_price,bar.low_price)
self.week_bar.close_price = bar.close_price
self.week_bar.open_interest = bar.open_interest
self.week_bar.volume += bar.volume
self.week_bar.turnover += bar.turnover
# 判断当前周K线是否结束
next_day_dt = bar.datetime + timedelta(days=1)
if next_day_dt > self.weekly_bar_window[-1]:
# 当前周K线已经结束,推送当前周K线
if self.on_window_bar:
self.on_window_bar(self.week_bar)
# 复位当前周交易日列表及周K线
self.weekly_bar_window = []
self.week_bar = None
def update_monthly_bar(self, bar: BarData) -> None:
""" update monthly bar using a daily bar """
if bar:
bar.datetime = bar.datetime.replace(tzinfo=CHINA_TZ)
if self.interval != Interval.MONTHLY:
# 设定周期单位不是月,不处理
return
if not self.monthly_bar_window:
# 首次调用月K线更新函数
month_tradedays = self.trading_hours.get_month_tradedays(bar.datetime)
if not month_tradedays:
# 无效的日K线
return
# 更新当前月K线交易日列表
self.monthly_bar_window = month_tradedays
# 创建新的月K线
self.month_bar = BarData(
symbol=bar.symbol,
exchange=bar.exchange,
interval=Interval.MONTHLY,
datetime=bar.datetime.replace(tzinfo=CHINA_TZ),
gateway_name=bar.gateway_name,
open_price=bar.open_price,
high_price=bar.high_price,
low_price=bar.low_price,
)
if bar.datetime not in self.monthly_bar_window:
# 日线不属于当前月K线
month_tradedays = self.trading_hours.get_month_tradedays(bar.datetime)
if not month_tradedays:
# 无效的日K线
return
# 当前月K线已经生成,推送
if self.on_window_bar:
self.on_window_bar(self.month_bar)
# 更新当前月交易日列表
self.monthly_bar_window = month_tradedays
# 创建新的月K线
self.month_bar = BarData(
symbol=bar.symbol,
exchange=bar.exchange,
interval=Interval.MONTHLY,
datetime=bar.datetime.replace(tzinfo=CHINA_TZ),
gateway_name=bar.gateway_name,
open_price=bar.open_price,
high_price=bar.high_price,
low_price=bar.low_price,
)
# 更新当前月K线
self.month_bar.high_price = max(self.month_bar.high_price,bar.high_price)
self.month_bar.low_price = min(self.month_bar.low_price,bar.low_price)
self.month_bar.close_price = bar.close_price
self.month_bar.open_interest = bar.open_interest
self.month_bar.volume += bar.volume
self.month_bar.turnover += bar.turnover
# 判断当前月K线是否结束
next_day_dt = bar.datetime + timedelta(days=1)
if next_day_dt > self.monthly_bar_window[-1]:
# 当前月K线已经结束,推送当前月K线
if self.on_window_bar:
self.on_window_bar(self.month_bar)
# 复位当前月交易日列表及月K线
self.monthly_bar_window = []
self.month_bar = None
def update_yearly_bar(self, bar: BarData) -> None:
""" update yearly bar using a daily bar """
if bar:
bar.datetime = bar.datetime.replace(tzinfo=CHINA_TZ)
if self.interval != Interval.YEARLY:
# 设定周期单位不是年,不处理
return
if not self.yearly_bar_window:
# 首次调用年K线更新函数
year_tradedays = self.trading_hours.get_year_tradedays(bar.datetime)
if not year_tradedays:
# 无效的日K线
return
# 更新当前年K线交易日列表
self.yearly_bar_window = year_tradedays
# 创建新的年K线
self.year_bar = BarData(
symbol=bar.symbol,
exchange=bar.exchange,
interval=Interval.YEARLY,
datetime=bar.datetime.replace(tzinfo=CHINA_TZ),
gateway_name=bar.gateway_name,
open_price=bar.open_price,
high_price=bar.high_price,
low_price=bar.low_price,
)
if bar.datetime not in self.yearly_bar_window:
# 日线不属于当前年K线
year_tradedays = self.trading_hours.get_year_tradedays(bar.datetime)
if not year_tradedays:
# 无效的日K线
return
# 当前年K线已经生成,推送
if self.on_window_bar:
self.on_window_bar(self.year_bar)
# 更新当前年交易日列表
self.yearly_bar_window = year_tradedays
# 创建新的年K线
self.year_bar = BarData(
symbol=bar.symbol,
exchange=bar.exchange,
interval=Interval.YEARLY,
datetime=bar.datetime.replace(tzinfo=CHINA_TZ),
gateway_name=bar.gateway_name,
open_price=bar.open_price,
high_price=bar.high_price,
low_price=bar.low_price,
)
# 更新当前年K线
self.year_bar.high_price = max(self.year_bar.high_price,bar.high_price)
self.year_bar.low_price = min(self.year_bar.low_price,bar.low_price)
self.year_bar.close_price = bar.close_price
self.year_bar.open_interest = bar.open_interest
self.year_bar.volume += bar.volume
self.year_bar.turnover += bar.turnover
# 判断当前年K线是否结束
next_day_dt = bar.datetime + timedelta(days=1)
if next_day_dt > self.yearly_bar_window[-1]:
# 当前年K线已经结束,推送当前年K线
self.on_window_bar(self.year_bar)
# 复位当前年交易日列表及年K线
self.yearly_bar_window = []
self.year_bar = None
def generate(self) -> Optional[BarData]:
"""
Generate the bar data and call callback immediately.
"""
bar = self.bar
if self.bar:
bar.datetime = bar.datetime.replace(second=0, microsecond=0)
self.on_bar(bar)
self.bar = None
return bar
6. 对集合竞价tick和休市期间收到的tick的特别处理
交易时间段是交易所对一个合约连续交易时间的规定,它只规定了在哪些时间段内市场是可以连续交易的,也就是说投资者交易开仓、平仓和撤单的。
但是交易时间段不包括一个合约交易的所有交易时间的规定,例如集合竞价时间段、日内中间休市时间段和交易日收盘休市时间段这三类时间段的规定。
6.1 集合竞价时间段
集合竞价时间段在交易日的开盘时间之前。能够该时间段的参与的投资者可能有资格的限制,就是说可能不是市场的参与者都有资格能够在在集合竞价时段中进行交易的。
而且不同市场,不同合约的集合竞价时间段的长度是不一样的,不同的交易日也可能不同,例如:
1)国内市场
- 国内期货,期权,有夜盘品种是20:55-21:00,遇有长假则位于日盘的第一个交易时段前5分钟;只有日盘品种是8:55-9:00。
- 国内股票,9:25-9:30,因为A股全部是日盘,所以没有长假带来的问题集合竞价发生变化的问题。
- 国内市场的集合竞价通常包括前4分钟为撮合成交,在开盘前1分钟推送一个包括集合竞价tick,它的last_price就是该交易日的开盘价。
2)国外市场
- WTI原油期货的Pre-Open时间,其实就是集合竞价时段。它更加复制,周日的盘前议价期为开盘前1小时,其他交易日的盘前议价期为开盘前15分钟。在此期间,客户可以输入、修改和撤销报单,但报单在该时段不会被撮合成交。此外,在盘前议价期快结束时,即开盘前30秒,不可以进行修改和撤销报单,但是可以下新的报单。所有报单在开盘后的连续交易时段才会被撮合成交。

总之,集合竞价时段变化多端,非常复杂,在K线时长上需要特别关注和处理,否则您生成的是什么K线,正确与否是无从谈起的。没准您多了个莫名其妙的K线都不知道。
6.2 集合竞价和日内的休市时间段对K线合成处理的影响
6.2.1 在一个交易日中,用户接口收到的交易所中的tick为4类:
- 上一交易日结算时间之后~本交易日集合竞价开始之前收到的tick,为垃圾无效tick数据;
- 本交易日集合竞价期间收到唯一个包含本交易日开盘价的tick,为集合竞价tick数据;
- 在各个连续竞价时间段收到的tick,为连续竞价tick数据;
- 在日内的各个休市时间段收到tick,为休市tick数据。
6.2.2 以上4类tick的在K线合成方面的不同处理
- 收到无效tick,直接做丢弃处理
- 收到连续竞价tick,进行正常K线合成处理
- 收到集合竞价tick,将其时间修改为集合竞价时间段的截止时间,之后与连续竞价tick一样处理
- 收到休市tick,将其时间修改为所在休市时间段的开始时间减去1毫秒,之后与连续竞价tick一样处理
6.2.3 也可以参考利用合约交易状态信息来处理集合竞价tick
这种特别处理请参考:分析一下盘中启动CTA策略带来的第一根$K线错误
7. 该K线生成器的使用
待续...