最近在做日线和周线合成的时候,遇到不少坑,特意做了一下改进,直接上代码:
def the_end_window_datetime(dt: datetime,
                            window: int = 1,
                            interval: Interval = Interval.MINUTE,
                            delta_tuple: tuple = None) -> datetime:
    """
    Return the end timestamp of the aggregation window that contains ``dt``.

    The position of ``dt`` is expressed in seconds relative to a reference
    point that depends on ``interval`` and is then rounded UP to the next
    multiple of the window length:

    * MINUTE - seconds elapsed since the start of the hour
    * HOUR   - seconds elapsed since the start of the day
    * DAILY  - ``dt.day * 86400`` plus the seconds elapsed since midnight
    * WEEKLY - ``dt.weekday() * 86400`` plus the seconds since midnight

    A ``dt`` that already lies exactly on a window boundary is returned
    unchanged (the computed advance is zero).

    Args:
        dt: Timestamp to locate.
        window: Number of ``interval`` units that make up one window.
        interval: Unit of the window. Only MINUTE, HOUR, DAILY and WEEKLY
            are handled.
        delta_tuple: Optional ``(days, hours, minutes, seconds)`` offset by
            which the window boundaries are shifted. A falsy value means
            no shift.

    Returns:
        ``dt`` advanced to the end of its window.

    Raises:
        ValueError: If ``interval`` is not one of the handled units.
    """
    # Integer parts are summed first and the fractional second is added
    # last, so boundary timestamps (microsecond == 0) stay exact.
    fraction = dt.microsecond * 1e-6
    second_of_hour = dt.minute * 60 + dt.second
    second_of_day = dt.hour * 3600 + second_of_hour

    if interval == Interval.MINUTE:
        nsecs = second_of_hour + fraction
        window_sec = 60 * window
    elif interval == Interval.HOUR:
        nsecs = second_of_day + fraction
        window_sec = 3600 * window
    elif interval == Interval.DAILY:
        # NOTE(review): dt.day is 1-based, so multi-day windows are aligned
        # to even multiples of the day-of-month - confirm this is intended.
        nsecs = dt.day * 86400 + second_of_day + fraction
        window_sec = 86400 * window
    elif interval == Interval.WEEKLY:
        # datetime.weekday() counts Monday as 0.
        nsecs = dt.weekday() * 86400 + second_of_day + fraction
        window_sec = 604800 * window
    else:
        # Previously this fell through and failed later with an
        # UnboundLocalError that did not name the real cause.
        raise ValueError("Unsupported interval: {!r}".format(interval))

    shift = 0
    if delta_tuple:
        days, hours, minutes, seconds = delta_tuple
        shift = days * 86400 + hours * 3600 + minutes * 60 + seconds

    # Round the shifted position up to the next window multiple, then
    # convert back into "seconds still to go until the window ends".
    delta = math.ceil((nsecs - shift) / window_sec) * window_sec + shift - nsecs
    return dt + timedelta(seconds=delta)
#----------------------------------------------------------------------
# 自定义 BarGenerator
#----------------------------------------------------------------------
class AzBarGenerator(BarGenerator):
    """
    General-purpose bar generator that stamps bars with their window END time.

    It cannot be guaranteed that data exists exactly at the end of a window,
    so completion is checked before merging. Example: Chinese futures are
    suspended from 10:15 to 10:30 in the morning, so a 30-minute window is
    not guaranteed to have data at its end time. A window is therefore only
    treated as finished once the next piece of data lies beyond the window's
    end time.

    NOTE(review): the attributes used below (``bar``, ``window_bar``,
    ``last_tick``, ``last_bar``, ``on_bar``, ``on_window_bar``, ``window``,
    ``interval``) are presumably initialised by ``BarGenerator.__init__`` -
    confirm against the base class.
    """

    def __init__(self,
                 on_bar: Callable,
                 window: int = 1,
                 on_window_bar: Callable = None,
                 interval: Interval = Interval.MINUTE):
        """Forward all arguments unchanged to ``BarGenerator.__init__``."""
        super(AzBarGenerator, self).__init__(
            on_bar, window, on_window_bar, interval)

    # Bars carry the END timestamp of their window, which simplifies the
    # completion checks further down.
    def update_tick(self, tick: TickData) -> None:
        """
        Update new tick data into generator.

        Ticks with a falsy ``last_price`` are ignored. A tick later than the
        current bar's end timestamp first pushes the current bar to
        ``on_bar`` and then opens a new bar; any other tick is merged into
        the current bar.
        """
        # Filter tick data with 0 last price
        if not tick.last_price:
            return

        start_new_bar = False
        if not self.bar:
            start_new_bar = True
        elif tick.datetime > self.bar.datetime:
            # The tick no longer belongs to the current window: emit the bar.
            self.on_bar(self.bar)
            start_new_bar = True

        if start_new_bar:
            self.bar = BarData(
                symbol=tick.symbol,
                exchange=tick.exchange,
                interval=Interval.MINUTE,
                # Stamp the bar with the end timestamp of its window.
                datetime=the_end_window_datetime(tick.datetime),
                gateway_name=tick.gateway_name,
                open_price=tick.last_price,
                high_price=tick.last_price,
                low_price=tick.last_price,
                close_price=tick.last_price,
                open_interest=tick.open_interest)
        else:
            self.bar.high_price = max(self.bar.high_price, tick.last_price)
            self.bar.low_price = min(self.bar.low_price, tick.last_price)
            self.bar.close_price = tick.last_price
            self.bar.open_interest = tick.open_interest
            # self.bar.datetime is deliberately NOT overwritten with the
            # tick time: it holds the window end used by the check above.
            # Overwriting it made every later tick look like it had crossed
            # the window, so bars were emitted after only two ticks.

        if self.last_tick:
            # Only a non-negative change in tick.volume is accumulated.
            volume_change = tick.volume - self.last_tick.volume
            self.bar.volume += max(volume_change, 0)

        self.last_tick = tick

    def _update_window_bar(self, bar: BarData) -> None:
        """Merge ``bar`` into the current window bar, creating it if needed."""
        if not self.window_bar:
            # The window bar is stamped with the end of the window that
            # contains the first merged bar.
            dt = self.normalize_window_bar_datetime(bar.datetime)
            self.window_bar = BarData(
                symbol=bar.symbol,
                exchange=bar.exchange,
                datetime=dt,
                gateway_name=bar.gateway_name,
                open_price=bar.open_price,
                high_price=bar.high_price,
                low_price=bar.low_price)
        # Otherwise, update high/low price into window bar
        else:
            self.window_bar.high_price = max(
                self.window_bar.high_price, bar.high_price)
            self.window_bar.low_price = min(
                self.window_bar.low_price, bar.low_price)

        # Update close price/volume into window bar
        self.window_bar.close_price = bar.close_price
        self.window_bar.volume += int(bar.volume)
        self.window_bar.open_interest = bar.open_interest

    def update_bar(self, bar: BarData) -> None:
        """
        Update bar into generator.

        If ``bar`` lies beyond the end of the current window bar, the window
        bar is pushed to ``on_window_bar`` and reset before ``bar`` is merged
        into a fresh window.
        """
        # Check if window bar completed
        if self.check_window_bar_finished(bar):
            # on_window_bar defaults to None in __init__; skip the callback
            # instead of failing with "'NoneType' object is not callable".
            if self.on_window_bar is not None:
                self.on_window_bar(self.window_bar)
            self.window_bar = None

        # Merge the bar into the (possibly new) window bar
        self._update_window_bar(bar)

        # Cache last bar object
        self.last_bar = bar

    def check_window_bar_finished(self, bar: BarData) -> bool:
        """
        Return True when a window bar exists and ``bar`` lies past its end.

        NOTE(review): this compares ``bar.datetime`` with the window end, so
        it presumably expects end-stamped bars such as the ones produced by
        ``update_tick`` - confirm for bars coming from other sources.
        """
        return bool(self.window_bar) and bar.datetime > self.window_bar.datetime

    def normalize_window_bar_datetime(self, dt: datetime) -> datetime:
        """Return the end timestamp of the ``window`` x ``interval`` window containing ``dt``."""
        return the_end_window_datetime(dt, self.window, self.interval)