1. 等自然时长机制的K线生成器说明
交易日内等自然时长K线生成器,可以实现周以下的周期单位的时间就间隔:
x=window,interval的取:
Interval.MINUTE :x分钟宽度的K线,对齐交易日开始时间,不可以超过日交易时长
Interval.HOUR : x小时宽度的K线,对齐交易日开始时间,不可以超过日交易时长
Interval.DAILY : x日宽度的K线,对齐上市交易日开始时间开始的整x日,x可以任意值
Interval.WEEKLY : x周宽度的K线,对齐上市交易日开始时间整x周,x可以任意值
再策略中的应用举例:
- 等自然时长机制的30分钟K线创建:
self.bg = ENTBarGenerator(self.on_bar,30,self.on_30m_bar,interval=Interval.MINUTE,self.vt_symbol) - 等自然时长机制的2小时K线创建:
self.bg = ENTBarGenerator(self.on_bar,120,self.on_2h_bar,interval=Interval.MINUTE,self.vt_symbol)
或者:
self.bg = ENTBarGenerator(self.on_bar,2,self.on_2h_bar,interval=Interval.HOUR,self.vt_symbol) - 等自然时长机制的1日K线创建:
self.bg = ENTBarGenerator(self.on_bar,1,self.on_1d_bar,interval=Interval.DAILY,self.vt_symbol) - 等自然时长机制的1周K线创建:
self.bg = ENTBarGenerator(self.on_bar,5,self.on_1w_bar,interval=Interval.DAILY,self.vt_symbol)
或者:
self.bg = ENTBarGenerator(self.on_bar,1,self.on_1w_bar,interval=Interval.WEEKLY,self.vt_symbol)
2. 先实现等自然时长机制的K线生成器
保存文件:vnpy\usertools\equal_nature_time.py
内容如下:
"""
实现一个等自然时长机制的K线生成器
作者:hxxjava
日期:2020-9-2
"""
from typing import Callable,List,Dict, Tuple, Union
from vnpy.app.cta_strategy import BarGenerator
from vnpy.trader.constant import Interval
from vnpy.trader.utility import extract_vt_symbol
from vnpy.trader.object import BarData
import pytz
CHINA_TZ = pytz.timezone("Asia/Shanghai")
from vnpy.usertools.trade_hour import (
TradeHours,
Timeunit,
get_listed_date,
get_de_listed_date,
)
import datetime
import rqdatac as rq
class MyError(Exception):
def __init__(self,ErrorInfo):
super().__init__(self) #初始化父类
self.errorinfo=ErrorInfo
def __str__(self):
return self.errorinfo
class ENTBarGenerator(BarGenerator):
"""
交易日内等自然时长K线生成器,可以实现周以下的周期单位的时间就间隔:
x=window,interval的取:
Interval.MINUTE :x分钟宽度的K线,对齐交易日开始时间,不可以超过日交易时长
Interval.HOUR : x小时宽度的K线,对齐交易日开始时间,不可以超过日交易时长
Interval.DAILY : x日宽度的K线,对齐上市交易日开始时间开始的整x日,x可以任意值
Interval.WEEKLY : x周宽度的K线,对齐上市交易日开始时间整x周,x可以任意值
ENT:equal nature time ———— 等自然时长的缩写
"""
def __init__(
self,
on_bar: Callable,
window: int = 0,
on_window_bar: Callable = None,
interval: Interval = Interval.MINUTE,
vt_symbol:str=''
):
super().__init__(on_bar,window,on_window_bar,interval)
symbol,exchange = extract_vt_symbol(vt_symbol)
self.trade_hours = TradeHours(symbol.upper())
self.trade_start,self.trade_stop = None,None
self.bar_width = self.get_bar_witdh()
self.bar_index = None
if self.interval in [Interval.DAILY,Interval.WEEKLY]:
self.kx_start,self.kx_stop = (None,None)
def get_bar_witdh(self):
"""
求window_bar的宽度
"""
# 求每日的交易时长
TTPD = self.trade_hours.get_trade_time_perday()
try:
if self.interval == Interval.MINUTE:
# 以分钟作为单位
bar_width = self.window
if bar_width>TTPD:
raise MyError(f'window_bar宽度不可以大于1日的交易总时长')
elif self.interval == Interval.HOUR:
# 以小时作为单位
bar_width = self.window*60
if bar_width>TTPD:
raise MyError(f'window_bar宽度不可以大于1日的交易总时长')
elif self.interval == Interval.DAILY:
# 以日作为单位
bar_width = self.window * TTPD
elif self.interval == Interval.WEEKLY:
# 以周作为单位
bar_width = self.window * TTPD * 5
else:
raise MyError(f'不可以使用{self.interval}作为interval参数')
return bar_width
except MyError as e:
print(e)
def get_bar_index(self,bar:BarData):
"""
计算当前分钟bar所属window_bar的日内索引
"""
time_diff = bar.datetime - self.trade_start
days = time_diff.days
sec = time_diff.seconds
us = time_diff.microseconds
minutes = (days*24*3600 +sec+us*0.000001)/60.0
index = int(minutes/self.bar_width)
return index
def update_bar(self,bar:BarData) -> None:
if self.interval in [Interval.MINUTE,Interval.HOUR]:
self.update_bar1(bar)
elif self.interval in [Interval.DAILY,Interval.WEEKLY]:
self.update_bar2(bar)
def update_bar1(self,bar:BarData) -> None:
"""
Update 1 minute bar into generator
"""
# 如果bar的时间戳笔windows的时间戳还早,丢弃bar
if self.window_bar and bar.datetime < self.window_bar.datetime:
return
in_trade,(trade_start,trade_stop) = self.trade_hours.get_date_start_stop(bar.datetime)
if not in_trade:
# 无效K线,不可以处理
# print(f"bar.datetime= {bar.datetime} 无效K线,不可以处理")
return
if (self.trade_start,self.trade_stop) == (None,None):
# 计算当日的开始和停止交易时间
self.trade_start,self.trade_stop = trade_start,trade_stop
elif trade_start != self.trade_start:
# 判断是否跨日,更新当日的开始和停止交易时间
self.trade_start,self.trade_stop = trade_start,trade_stop
# print(f"!1 {self.trade_start,self.trade_stop}")
# 计算当前window_bar的日内索引
bar_index = self.get_bar_index(bar)
if self.bar_index is None or bar_index != self.bar_index:
# 产生新window_barK线
self.bar_index = bar_index
if self.window_bar:
# 推送已经走完的window_bar
self.on_window_bar(self.window_bar)
self.window_bar = None
# 计算K线的交易起止时间
bar_datetime = self.trade_start + datetime.timedelta(minutes=self.bar_index*self.bar_width)
# 生成新的window_bar
self.window_bar = BarData(
symbol=bar.symbol,
exchange=bar.exchange,
datetime= bar_datetime,
gateway_name=bar.gateway_name,
open_price=bar.open_price,
high_price=bar.high_price,
low_price=bar.low_price,
)
# 更新window_bar的high和low
self.window_bar.high_price = max(
self.window_bar.high_price, bar.high_price)
self.window_bar.low_price = min(
self.window_bar.low_price, bar.low_price)
# 更新 close price/volume到window bar
self.window_bar.close_price = bar.close_price
self.window_bar.volume += int(bar.volume)
self.window_bar.open_interest = bar.open_interest
def update_bar2(self,bar:BarData) -> None:
"""
Update 1 minute bar into generator
"""
# 如果bar的时间戳笔windows的时间戳还早,丢弃bar
if self.window_bar and bar.datetime < self.window_bar.datetime:
return
if (self.kx_start,self.kx_stop) == (None,None):
self.kx_start,self.kx_stop = self.trade_hours.get_bar_window(bar.datetime,self.window,self.interval)
if (self.kx_start,self.kx_stop) == (None,None):
return
# If not inited, creaate window bar object
if (not self.window_bar):
# 获得K线的交易起止时间
self.window_bar = BarData(
symbol=bar.symbol,
exchange=bar.exchange,
datetime=self.kx_start,
gateway_name=bar.gateway_name,
open_price=bar.open_price,
high_price=bar.high_price,
low_price=bar.low_price,
)
elif self.kx_start <= bar.datetime and bar.datetime < self.kx_stop:
# 1分钟K线属于当前K线
self.window_bar.high_price = max(
self.window_bar.high_price, bar.high_price)
self.window_bar.low_price = min(
self.window_bar.low_price, bar.low_price)
elif bar.datetime >= self.kx_stop: # Check if window bar completed
self.on_window_bar(self.window_bar)
self.window_bar = None
self.kx_start,self.kx_stop = self.trade_hours.get_bar_window(bar.datetime,self.window,self.interval)
if (self.kx_start,self.kx_stop) == (None,None):
# 不在交易时段
return
self.window_bar = BarData(
symbol=bar.symbol,
exchange=bar.exchange,
datetime=self.kx_start,
gateway_name=bar.gateway_name,
open_price=bar.open_price,
high_price=bar.high_price,
low_price=bar.low_price,
)
# Update close price/volume into window bar
self.window_bar.close_price = bar.close_price
self.window_bar.volume += int(bar.volume)
self.window_bar.open_interest = bar.open_interest
# 下面是ENTBarGenerator类的测试代码
def test():
from vnpy.trader.constant import Exchange
import pytz
CHINA_TZ = pytz.timezone("Asia/Shanghai")
def on_bar(bar: BarData):
pass
def on_xbar_bar(bar: BarData):
print(bar)
bar_gen = ENTBarGenerator(on_bar,30,on_xbar_bar,Interval.MINUTE,vt_symbol='rb2010.SHFE')
dt0 = CHINA_TZ.localize(datetime.datetime(2020,8,31,9,20,1))
for i in range(60*24):
# print(f"{[dt0 + datetime.timedelta(minutes=i)]}")
bar = BarData(
gateway_name = 'CTP',
symbol = 'rb2010',
exchange = Exchange.SHFE,
datetime = dt0 + datetime.timedelta(minutes=i),
interval=Interval.MINUTE,
volume=2,
open_price=3560.0,
high_price= 3565.0,
close_price=3558.0,
low_price=3555.0,
open_interest=10,
)
bar_gen.update_bar(bar)
if __name__ == "__main__":
# RQDatac初始化
rq.init('xxxxxx','******',("rqdatad-pro.ricequant.com",16011))
test()