VeighNa量化社区
你的开源社区量化交易平台
Member
avatar
加入于:
帖子: 3
声望: 0

"""
period.py
除年、季外,支持月、周、日、分钟的任意周期K线的合成
本模块位于vnpy\trader\目录下,并且要求在同目录下object.py中的TickData、BarData中增加成员 tradingday: datetime
"""

from datetime import time, datetime, timedelta
from operator import le
from vnpy.trader.object import BarData, TickData
from enum import Enum

"""
字典key是期货品种的全小写代码,value是一个元组,分别表示:区分大小写的期货品种代码,品种中文名称,所属交易所,倍率,单位跳价,交易时段编号
其中的交易时段编号,为G_TRADESEG元组的元素编号,编号从0开始
"""
G_PRODUCT: dict = {
"hw": ("HW", "强麦", "CZCE", 20, 1, 0),
"pm": ("PM", "普麦", "CZCE", 50, 1, 0),
"cf": ("CF", "棉花", "CZCE", 5, 5, 1),
"cy": ("CY", "棉纱", "CZCE", 5, 5, 1),
"sr": ("SR", "白糖", "CZCE", 10, 1, 1),
"ta": ("TA", "PTA", "CZCE", 5, 2, 1),
"oi": ("OI", "菜籽油", "CZCE", 10, 2, 1),
"ri": ("RI", "早籼稻", "CZCE", 20, 1, 0),
"ma": ("MA", "甲醇", "CZCE", 10, 1, 1),
"fg": ("FG", "玻璃", "CZCE", 20, 1, 1),
"rs": ("RS", "油菜籽", "CZCE", 10, 1, 0),
"rm": ("RM", "菜粕", "CZCE", 10, 1, 1),
"zc": ("ZC", "动力煤", "CZCE", 100, 0.2, 1),
"jr": ("JR", "粳稻", "CZCE", 20, 1, 0),
"lr": ("LR", "晚籼稻", "CZCE", 20, 1, 0),
"sf": ("SF", "硅铁", "CZCE", 5, 2, 0),
"sm": ("SM", "锰硅", "CZCE", 5, 2, 0),
"ap": ("AP", "苹果", "CZCE", 10, 1, 0),
"cj": ("CJ", "红枣", "CZCE", 5, 5, 0),
"pk": ("PK", "花生", "CZCE", 5, 2, 0),
"ur": ("UR", "尿素", "CZCE", 20, 1, 0),
"sa": ("SA", "纯碱", "CZCE", 20, 1, 1),
"c": ("c", "玉米", "DCE", 10, 1, 1),
"cs": ("cs", "淀粉", "DCE", 10, 1, 1),
"a": ("a", "大豆一号", "DCE", 10, 1, 1),
"b": ("b", "大豆二号", "DCE", 10, 1, 1),
"m": ("m", "豆粕", "DCE", 10, 1, 1),
"y": ("y", "豆油", "DCE", 10, 2, 1),
"p": ("p", "棕榈油", "DCE", 10, 2, 1),
"lh": ("lh", "生猪", "DCE", 16, 5, 0),
"jd": ("jd", "鸡蛋", "DCE", 10, 1, 0),
"bb": ("bb", "胶合板", "DCE", 500, 0.05, 0),
"fb": ("fb", "纤维板", "DCE", 500, 0.05, 0),
"l": ("l", "塑料", "DCE", 5, 1, 1),
"v": ("v", "PVC", "DCE", 5, 1, 1),
"pp": ("pp", "聚丙烯", "DCE", 5, 1, 1),
"eg": ("eg", "乙二醇", "DCE", 10, 1, 1),
"eb": ("eb", "苯乙烯", "DCE", 5, 1, 1),
"pg": ("pg", "LPG", "DCE", 20, 1, 1),
"j": ("j", "焦炭", "DCE", 100, 0.5, 1),
"jm": ("jm", "焦煤", "DCE", 60, 0.5, 1),
"i": ("i", "铁矿石", "DCE", 100, 0.5, 1),
"rr": ("rr", "粳米", "DCE", 10, 1, 1),
"cu": ("cu", "铜", "SHFE", 5, 10, 3),
"al": ("al", "铝", "SHFE", 5, 5, 3),
"zn": ("zn", "锌", "SHFE", 5, 5, 3),
"pb": ("pb", "铅", "SHFE", 5, 5, 3),
"ni": ("ni", "镍", "SHFE", 1, 10, 3),
"sn": ("sn", "锡", "SHFE", 1, 10, 3),
"ss": ("ss", "不锈钢", "SHFE", 5, 5, 3),
"au": ("au", "黄金", "SHFE", 1000, 0.05, 4),
"ag": ("ag", "白银", "SHFE", 15, 1, 4),
"rb": ("rb", "螺纹钢", "SHFE", 10, 1, 1),
"wr": ("wr", "线材", "SHFE", 10, 1, 0),
"hc": ("hc", "热卷", "SHFE", 10, 1, 1),
"fu": ("fu", "燃油", "SHFE", 10, 1, 1),
"bu": ("bu", "沥青", "SHFE", 10, 2, 1),
"ru": ("ru", "橡胶", "SHFE", 10, 5, 1),
"sp": ("sp", "纸浆", "SHFE", 10, 2, 1),
"if": ("IF", "沪深300", "CFFEX", 300, 0.2, 5),
"ih": ("IH", "上证50", "CFFEX", 300, 0.2, 5),
"ic": ("IC", "中证500", "CFFEX", 200, 0.2, 5),
"ts": ("TS", "二债", "CFFEX", 10000, 0.005, 6),
"tf": ("TF", "五年国债", "CFFEX", 10000, 0.005, 6),
"t": ("T", "十年国债", "CFFEX", 10000, 0.005, 6),
"sc": ("sc", "原油", "INE", 1000, 0.1, 4),
"lu": ("lu", "低硫燃油", "INE", 10, 1, 1),
"nr": ("nr", "20号胶", "INE", 10, 5, 1)
}
"""
交易时段
"""
G_TRADESEG: tuple = (((time(9, 0), time(10, 15)), (time(10, 30), time(11, 30)), (time(13, 30), time(15, 0))),

                 # 期货日盘交易时段

                 ((time(21, 0), time(23, 0)), (time(9, 0), time(10, 15)),
                  (time(10, 30), time(11, 30)), (time(13, 30), time(15, 0))),
                 # 期货夜盘交易时段

                 ((time(21, 0), time(23, 30)), (time(9, 0), time(10, 15)),
                  (time(10, 30), time(11, 30)), (time(13, 30), time(15, 0))),
                 # 期货夜盘交易时段,曾经使用过,目前国内已经没有交易所采用

                 ((time(21, 0), time(1, 0)), (time(9, 0), time(10, 15)),
                  (time(10, 30), time(11, 30)), (time(13, 30), time(15, 0))),
                 # 有色金属交易时段

                 ((time(21, 0), time(2, 30)), (time(9, 0), time(10, 15)),
                  (time(10, 30), time(11, 30)), (time(13, 30), time(15, 0))),
                 # 贵金属交易时段

                 ((time(9, 30), time(11, 30)), (time(13, 0), time(15, 0))),
                 # 股票、股指交易时段

                 ((time(9, 15), time(11, 30)), (time(13, 0), time(15, 15))),
                 # 国债期货交易时段

                 ((time(0, 0), time(0, 0)),)
                 # 24小时连续交易,目前国内没有
                 )


def MinuteGap(StartTime: time, EndTime: time) -> int:
"""计算两个时间相隔的分钟数,如果开始时间大于结束时间,则表示隔夜了,如果开始时间等于结束时间则表示相差一整天"""
tmp: int = EndTime.hour
gap: int = 0
if EndTime < StartTime:
tmp = EndTime.hour + 24
if EndTime == StartTime:
gap = 2460
else:
gap = (tmp
60+EndTime.minute) - (StartTime.hour*60+StartTime.minute)
return gap

def nMinuteLater(point: time, m: int) -> time:
"""从给定的时间point开始之后的m分钟,取值必须为正整数,返回time。如果超过24:00则自动接续到凌晨"""
totalmin: int = point.hour60+point.minute
m = m % (24
60)
totalmin += m
if totalmin >= 2460:
totalmin -= 24
60
return time(int(totalmin/60), totalmin % 60)

def GetTimeGapSerial(tradeseg: tuple, gap: int) -> list:
"""给定交易时段以及K线的分钟周期,返回该周期下的时间分段序列。只适用自定义分钟周期
tradeseg为G_TRADESEG中的元素,其中gap为分钟周期,因此不得大于等于全天交易的总分钟时长
如GetTimeGapSerial(G_TRADESEG[1], 60)则返回如下的60分钟K线的分段时间序列:
[21:00:00 , 22:00:00],
[22:00:00 , 23:00:00],
[09:00:00 , 10:00:00],
[10:00:00 , 11:15:00],
[11:15:00 , 14:15:00],
[14:15:00 , 15:00:00]]
"""
totalminute: int = 0
tgs: list = []
for t in tradeseg:
totalminute += MinuteGap(t[0], t[1]) # 计算交易总时长分钟数
if gap >= totalminute:
return tgs # K线分钟周期大于等于当日交易总时长,直接返回
gapremain: int = gap
for t in tradeseg:
subseg: int = MinuteGap(t[0], t[1])
if gapremain == gap:
tgs.append([t[0], t[1]])

    if gapremain > subseg:
        gapremain -= subseg
        tgs[-1][1] = t[1]
    elif gapremain == subseg:
        gapremain = gap
        tgs[-1][1] = t[1]
    else:
        postime: time = nMinuteLater(t[0], gapremain)
        tgs[-1][1] = postime
        tgs.append([postime, t[1]])
        subsegremain: int = subseg-gapremain
        gapremain = gap
        while subsegremain > 0:
            if gapremain <= subsegremain:
                postime = nMinuteLater(tgs[-1][0], gapremain)
                tgs[-1][1] = postime
                if gapremain < subsegremain:
                    tgs.append([postime, t[1]])
                subsegremain -= gapremain
                gapremain = gap
            else:
                gapremain -= subsegremain
                subsegremain = 0
return tgs


def isSameWeek(d1: datetime, d2: datetime) -> bool:
"""判断两个日期是否在同一周内"""
return (d1-timedelta(d1.weekday())).date() == (d2-timedelta(d2.weekday())).date()

def isSameMonth(d1: datetime, d2: datetime) -> bool:
"""判断两个日期是否在同一个月内"""
return (d1.year == d2.year) and (d1.month == d2.month)

def isInSameTimeSeg(timeseg: list, dt: datetime, tradeseg: tuple = ()) -> bool:
"""
判断给定的时间dt是否属于timeseg时段内
任意分钟K线的分段,可能会隔夜横跨两个日期,因此判断一个时间是否属于当前K线时间段内需要专门处理
"""
ret: bool = False
isrightborder: bool = False
for seg in tradeseg:
if seg[1] == dt.time():
isrightborder = True # 为了将各交易时间分段的最后一个tick数据正确归入K线
break
if timeseg[1] > timeseg[0]:
if isrightborder:
if dt.time() >= timeseg[0] and dt.time() <= timeseg[1]:
ret = True
else:
if dt.time() >= timeseg[0] and dt.time() < timeseg[1]:
ret = True
else:
if isrightborder:
if (dt.time() >= timeseg[0] and dt.time() >= timeseg[1]) or\
(dt.time() <= timeseg[0] and dt.time() <= timeseg[1]):
ret = True
else:
if (dt.time() >= timeseg[0] and dt.time() > timeseg[1]) or\
(dt.time() <= timeseg[0] and dt.time() < timeseg[1]):
ret = True
return ret

def GetSeg(tradesplitseg: list, dt: datetime) -> list:
"""给定时间分段序列以及时间点,返回该时间点所属的时段,用于自定义分钟周期的当前时段确定"""
for timeseg in tradesplitseg:
if isInSameTimeSeg(timeseg, dt):
return timeseg
return []

def isSameSeason(d1: datetime, d2: datetime) -> bool:
"""判断两个日期是否在同一季节内"""
rt: bool = False
if d1.year == d2.year:
if d1.month >= 1 and d1.month <= 3 and d2.month >= 1 and d2.month <= 3:
rt = True
elif d1.month >= 4 and d1.month <= 6 and d2.month >= 4 and d2.month <= 6:
rt = True
elif d1.month >= 7 and d1.month <= 9 and d2.month >= 7 and d2.month <= 9:
rt = True
elif d1.month >= 10 and d1.month <= 12 and d2.month >= 10 and d2.month <= 12:
rt = True
return rt

class Level(Enum):
"""
K线合并的周期级别:年、季、月、周、日、分钟
"""
YEAR = "year"
SEASON = "season"
MONTH = "month"
WEEK = "week"
DAY = "day"
MINUTE = "minute"

class MergedBarData:
level: Level
num: int
count: int = 1
bar: BarData
startdatetime: datetime
enddatetime: datetime

class MergeBar():
"""K线合并"""

def __init__(self, vt_product: str, level: Level,  num: int = 1) -> None:
    """
    参数vt_product为期货品种代码
    参数level为要合并的K线的周期基数,参数num为level基数下周期数量,
    如level=Level.MINUTE, num=12,表示合成12分钟线
    如level=Level.MINUTE, num=60,表示合成60分钟线,即1小时线
    如level=Level.DAY, num=10,表示合成10日线
    其中 月、周、日、分钟支持任意数量的K线合并;年、季则只支持num=1的K线合并
    """
    self.vt_product: str = vt_product.lower()
    self.productinfo: tuple = ()
    self.tradeseg: tuple = ()
    self.tradesplitseg: list = []
    self.bars: list(MergedBarData) = []
    self.level: Level = level
    self.num: int = num

    if self.vt_product in G_PRODUCT:
        self.productinfo = G_PRODUCT[self.vt_product]
        self.tradeseg = G_TRADESEG[self.productinfo[5]]
        self.tradesplitseg = GetTimeGapSerial(self.tradeseg, self.num)
    else:
        self.tradesplitseg = GetTimeGapSerial(
            G_TRADESEG[1], self.num)  # 默认夜盘

def MergeFromTick(self, tick: TickData):
    """实盘行情推送时调用,或者有tick历史数据的回测也可以调用"""
    bar: BarData = BarData()

    bar.gateway_name = tick.gateway_name
    bar.extra = tick.extra
    bar.symbol = tick.symbol
    bar.exchange = tick.exchange
    bar.datetime = tick.datetime
    bar.tradingday = tick.tradingday
    # bar.interval
    bar.volume = tick.volume
    bar.turnover = tick.turnover
    bar.open_interest = tick.open_interest
    bar.open_price = tick.last_price
    bar.high_price = tick.last_price
    bar.low_price = tick.last_price
    bar.close_price = tick.last_price

    self.MergeFromBar(bar)

def MergeFromBar(self, bar: BarData):
    """没有tick数据时调用,如历史数据回测,K线必须是1分钟的K线,否则无法合成任意分钟的K线"""
    if self.level == Level.YEAR:
        self._MergeYear(bar)
    elif self.level == Level.SEASON:
        self._MergeSeason(bar)
    elif self.level == Level.MONTH:
        self._MergeMonth(bar)
    elif self.level == Level.WEEK:
        self._MergeWeek(bar)
    elif self.level == Level.DAY:
        self._MergeDay(bar)
    elif self.level == Level.MINUTE:
        self._MergeMinute(bar)

def MergeFromDayBar(self, bar: BarData):
    if self.level == Level.YEAR:
        self._MergeYear(bar)
    elif self.level == Level.SEASON:
        self._MergeSeason(bar)
    elif self.level == Level.MONTH:
        self._MergeMonth(bar)
    elif self.level == Level.WEEK:
        self._MergeWeek(bar)
    elif self.level == Level.DAY:
        self._MergeDay(bar)

def _SuckIn(self, suckbar: BarData, foodbar: BarData) -> BarData:
    suckbar.high_price = max(suckbar.high_price, foodbar.high_price)
    suckbar.low_price = min(suckbar.low_price, foodbar.low_price)
    suckbar.close_price = foodbar.close_price
    suckbar.tradingday = foodbar.tradingday
    suckbar.open_interest = foodbar.open_interest
    suckbar.turnover += foodbar.turnover
    suckbar.volume += foodbar.volume
    return suckbar

def _SuckNew(self, foodbar: BarData) -> MergedBarData:
    mergebar = MergedBarData()
    mergebar.level = self.level
    mergebar.num = self.num
    mergebar.count = 1
    mergebar.bar = foodbar
    return mergebar

def _MergeYear(self, bar: BarData):
    """由于只支持1年的K线合并,因此不对self.num做判断"""
    # self.num = 1
    new: bool = True
    if len(self.bars) > 0:
        if self.bars[-1].startdatetime.year == bar.tradingday.year:
            # 同一年
            self.bars[-1].enddatetime = bar.tradingday
            self.bars[-1].bar = self._SuckIn(self.bars[-1].bar, bar)
            new = False
    if new:
        mergebar = self._SuckNew(bar)
        mergebar.startdatetime = bar.tradingday
        mergebar.enddatetime = bar.tradingday
        self.bars.append(mergebar)

def _MergeSeason(self, bar: BarData):
    """由于只支持1季的K线合并,因此不对self.num做判断"""
    # self.num = 1
    new: bool = True
    if len(self.bars) > 0:
        if isSameSeason(self.bars[-1].startdatetime, bar.tradingday):
            # 同一季
            self.bars[-1].enddatetime = bar.tradingday
            self.bars[-1].bar = self._SuckIn(self.bars[-1].bar, bar)
            new = False
    if new:
        mergebar = self._SuckNew(bar)
        mergebar.startdatetime = bar.tradingday
        mergebar.enddatetime = bar.tradingday
        self.bars.append(mergebar)

def _MergeMonth(self, bar: BarData):
    new: bool = True
    if len(self.bars) > 0:
        if self.bars[-1].startdatetime.year == bar.tradingday.year:
            if (self.num == self.bars[-1].count) and\
                    (self.bars[-1].enddatetime.month == bar.tradingday.month):
                self.bars[-1].enddatetime = bar.tradingday
                self.bars[-1].bar = self._SuckIn(self.bars[-1].bar, bar)
                new = False
            if (self.num > self.bars[-1].count):
                if self.bars[-1].enddatetime.month != bar.tradingday.month:
                    self.bars[-1].count += 1
                self.bars[-1].enddatetime = bar.tradingday
                self.bars[-1].bar = self._SuckIn(self.bars[-1].bar, bar)
                new = False
    if new:
        mergebar = self._SuckNew(bar)
        mergebar.startdatetime = bar.tradingday
        mergebar.enddatetime = bar.tradingday
        self.bars.append(mergebar)

def _MergeWeek(self, bar: BarData):
    new: bool = True
    if len(self.bars) > 0:
        if self.bars[-1].startdatetime.year == bar.tradingday.year:
            if (self.num == self.bars[-1].count) and\
                    isSameWeek(self.bars[-1].enddatetime, bar.tradingday):
                self.bars[-1].enddatetime = bar.tradingday
                self.bars[-1].bar = self._SuckIn(self.bars[-1].bar, bar)
                new = False
            if (self.num > self.bars[-1].count):
                if not isSameWeek(self.bars[-1].enddatetime, bar.tradingday):
                    self.bars[-1].count += 1
                self.bars[-1].enddatetime = bar.tradingday
                self.bars[-1].bar = self._SuckIn(self.bars[-1].bar, bar)
                new = False
    if new:
        mergebar = self._SuckNew(bar)
        mergebar.startdatetime = bar.tradingday
        mergebar.enddatetime = bar.tradingday
        self.bars.append(mergebar)

def _MergeDay(self, bar: BarData):
    new: bool = True
    if len(self.bars) > 0:
        if self.bars[-1].startdatetime.year == bar.tradingday.year:
            if (self.num == self.bars[-1].count) and\
                    (self.bars[-1].enddatetime.date() == bar.tradingday.date()):
                self.bars[-1].enddatetime = bar.tradingday
                self.bars[-1].bar = self._SuckIn(self.bars[-1].bar, bar)
                new = False
            if (self.num > self.bars[-1].count):
                if self.bars[-1].enddatetime.date() != bar.tradingday.date():
                    self.bars[-1].count += 1
                self.bars[-1].enddatetime = bar.tradingday
                self.bars[-1].bar = self._SuckIn(self.bars[-1].bar, bar)
                new = False
    if new:
        mergebar = self._SuckNew(bar)
        mergebar.startdatetime = bar.tradingday
        mergebar.enddatetime = bar.tradingday
        self.bars.append(mergebar)

def _MergeMinute(self, bar: BarData):
    new: bool = True
    if len(self.bars) > 0:
        if self.bars[-1].bar.tradingday.date() == bar.tradingday.date():
            if isInSameTimeSeg([self.bars[-1].startdatetime.time(),
                                self.bars[-1].enddatetime.time()],
                               bar.datetime,
                               self.tradeseg):
                self.bars[-1].enddatetime = self.bars[-1].enddatetime.replace(
                    year=bar.datetime.year,
                    month=bar.datetime.month,
                    day=bar.datetime.day)
                self.bars[-1].bar = self._SuckIn(self.bars[-1].bar, bar)
                new = False
    if new:
        ts = GetSeg(self.tradesplitseg, bar.datetime)
        if ts:
            mergebar = self._SuckNew(bar)
            mergebar.startdatetime = bar.datetime
            mergebar.enddatetime = bar.datetime

            mergebar.startdatetime = mergebar.startdatetime.replace(
                hour=ts[0].hour, minute=ts[0].minute)
            mergebar.enddatetime = mergebar.enddatetime.replace(
                hour=ts[1].hour, minute=ts[1].minute)
            self.bars.append(mergebar)
Member
avatar
加入于:
帖子: 3
声望: 0

def MinuteGap(StartTime: time, EndTime: time) -> int:
def nMinuteLater(point: time, m: int) -> time:
两个函数中 2460中间的乘号丢失,应是 24 * 60

Member
avatar
加入于:
帖子: 6
声望: 0

马克

© 2015-2022 上海韦纳软件科技有限公司
备案服务号:沪ICP备18006526号

沪公网安备 31011502017034号

【用户协议】
【隐私政策】
【免责条款】