VeighNa量化社区
你的开源社区量化交易平台
Member
avatar
加入于:
帖子: 419
声望: 170

VNPY系统CTA策略中的BarGenerater的问题:

当K线周期为1,5,15分钟或者1日,1周时,它是没有问题的
当K线周期为10,20,30分钟或者一些自定义分钟周期如7分钟,还有1、2,4小时,由于合约的交易时段的不规则,导致某些K线的周期于其时间发生的交易时间不想等。
你开启CTA策略的时机是随机的,这导致self.laod_bar(20)的执行也是随机的,它从米筐或者数据库加载的1分钟数据也是随机,最终导致你所产生的上述K线也是随机的。那你已经这样的K线数据计算出来的指标在某种程度上可能也随机的。

问题出在合约的交易时间上

不赘述原因了,举例吧:
RB2010交易时间段:'21:01-23:00,09:01-10:15,10:31-11:30,13:31-15:00'
假如你的K线每天从21:01开始计算K线,
如果K线的周期为10分钟,那么在10:10-10:20的那个K线其实只交易了5分钟
如果K线的周期为20分钟,那么在10:00-10:20的那个K线其实只交易了15分钟,在10:20-10:40的那个K线其实只交易了10分钟
如果K线的周期为30分钟,那么在10:00-10:30的那个K线其实只交易了15分钟
如果K线的周期为60分钟,那么在10:00-11:00的那个K线其实只交易了45分钟
如果K线的周期为120分钟,那么在10:00-14:00的那个K线其实只交易了105分钟

K线应该是从初始化策略之时往前计算,还是从上市日期开始计算?

如果BarGenerator采用等交易时长产生K线,策略初始化时通过load_bar(n),读取1分钟历史K线,目前BarGenerator时是从n日之前的第一个1分钟K线区合约其他周期的K线的。
这导某些周期K线随n值不同,K线的起止时间会变化。而如果采用从上市日期开始计算等交易时长的K线位置,则无论何时初始化策略,K线的起止时间都是一样的。

你希望那种K线:

1 等自然时长K线——无需考虑交易时段
2 等交易时长K线——需要考虑交易时段
3 从上市日起算K线——起止位置固定
4 从策略初始化时起算K线——起止位置不固定

第6帖子已经实现目标——固定交易时长位置固定的K线图

description

Member
avatar
加入于:
帖子: 419
声望: 170

如何确定一个Tick属于哪一根K线

合约的交易时段与K线周期的关系

description

LD:合约上市日期(Listed date)
LTD:最新交易日(Lastest trade date)
ULD:合约最后交易日(Unlisted date)
TTPD:每日交易时间(Trade time per day)
KW:K线周期(Kindle witdth)
STTD:交易日开始时间(从LD起算)
STTK:K线日开始时间(从LD起算)

Days = LTD到LD之间到交易日数(去掉节假日)
STTD = TTPD×Days

每日交易时间TTPD的计算

不同合约的交易时间段可能是不一样的,它们可以利用rqdatac的instruments(symbol)函数读取,返回结果中包含 trading_hours字段。如:
RB2010交易时间段:
trading_hours='21:01-23:00,09:01-10:15,10:31-11:30,13:31-15:00'
AG2012交易时间段:
trading_hours='21:01-02:30,09:01-10:15,10:31-11:30,13:31-15:00'
MA2010交易时间段:
trading_hours='21:01-23:00,09:01-10:15,10:31-11:30,13:31-15:00'
trading_hours的形式:start1-stop1,start2-stop2,...,startN-stopN

description

注意把start和stop的单位换算为日内的分钟数量

计算Tick所在K线的索引

已知tick中包含时间datetime,K线的周期为KW,那么从上市日算起的各个时间:
利用tick.datetime求出所在交易日,从而求出STTD
TickTD:tick在交易日内的分钟数
TickLD:tick从上市开始的分钟数=STTD+TickTD
IndexK:tick所在K线的所有=int(TickLD/KW)
STTK:K线开始时间=IndexK*KW

Member
avatar
加入于:
帖子: 419
声望: 170

rqdatac可以提供的功能

获取合约信息

rq.instruments(symbol)

返回值:

Instrument(order_book_id='RB2010', symbol='螺纹钢2010', round_lot=1.0, contract_multiplier=10.0, underlying_order_book_id='null', underlying_symbol='RB', maturity_date='2020-10-15', type='Future', exchange='SHFE', listed_date='2019-10-16', de_listed_date='2020-10-15', margin_rate=0.09, trading_hours='21:01-23:00,09:01-10:15,10:31-11:30,13:31-15:00', market_tplus=0, industry_name='焦煤钢矿', product='Commodity')

其中:

listed_date——上市日期
de_listed_date——交割日期
trading_hours——交易时间段

求最新交易日

 rq.get_latest_trading_date()

求两个日期之间的交易日

rq.get_trading_dates(listed_date,last_trade_date)

求下n个交易日

rq.get_next_trading_date(listed_date,n=days)

获取交易时间段

rq.get_trading_hours(symbol,frequency='tick',expected_fmt='str')
rq.get_trading_hours(symbol,frequency='1m',expected_fmt='datetime')
rq.get_trading_hours(symbol,frequency='1m',expected_fmt='time')
Member
avatar
加入于:
帖子: 419
声望: 170

实现方法:

import datetime

import rqdatac as rq
from rqdatac.utils import to_date

# RQDatac初始化
rq.init('xxxxxx','*****',("rqdatad-pro.ricequant.com",16011))

''' 获得上市日期 '''
def get_listed_date(symbol:str):
    info = rq.instruments(symbol)
    return to_date(info.listed_date)

''' 获得交割日期 '''
def get_de_listed_date(symbol:str):
    info = rq.instruments(symbol)
    return to_date(info.de_listed_date)

symbol = 'RB2010'

listed_date = get_listed_date(symbol)   # 上市日期
de_listed_date = get_de_listed_date(symbol) # 交割日期
trade_dates = rq.get_trading_dates(listed_date,de_listed_date) # 合约的所有的交易日

trade_date_index = {}   # 合约的交易日索引字典
trade_index_date = {}   # 交易天数与交易日字典
days = 0
for td in trade_dates:
    trade_date_index[td] = days
    trade_index_date[days] = td
    days += 1

last_date = rq.get_latest_trading_date()
print(f"{symbol} {last_date}'d index: {trade_date_index[last_date]}")   

trading_hours = rq.get_trading_hours(symbol,date=last_date,frequency='1m',expected_fmt='datetime')

TTPD = datetime.timedelta(0,0,0)
for start,stop in trading_hours:
    timedelta = stop - start + datetime.timedelta(minutes=1)
    TTPD = TTPD + timedelta
    print(f"{symbol}'s start:{start}-stop:{stop},timedelta={timedelta}") 

print(f"{symbol}'s TTPD={TTPD}") 

STTD = TTPD * trade_date_index[last_date]
print(f"{symbol}'s STTD={STTD}") 

# 已知tick时间为最新交易日的14:37,K线周期为30分钟,求K线的开始时间STTK
TickTime = datetime.datetime.combine(last_date,datetime.time(14,37))
#求tick的日内时间
tt_in_day = datetime.timedelta(0,0,0)   
for start,stop in trading_hours:
    if start <= TickTime and TickTime < stop:
        tt_in_day += (TickTime-start)+datetime.timedelta(minutes=1)
    else:
        tt_in_day += (stop-start)+datetime.timedelta(minutes=1)
# 求tick从上市开始的总时间
tt_total = TTPD * trade_date_index[TickTime.date()] + tt_in_day

# K线周期
KW = datetime.timedelta(minutes=30) 
# K线的开始时间
STTK = int(tt_total/KW)*KW

print(f"TickTime={TickTime},tt_total={tt_total},STTK={STTK}")

执行结果

RB2010 2020-07-28'd index: 191
RB2010's start:2020-07-27 21:01:00-stop:2020-07-27 23:00:00,timedelta=2:00:00
RB2010's start:2020-07-28 09:01:00-stop:2020-07-28 10:15:00,timedelta=1:15:00
RB2010's start:2020-07-28 10:31:00-stop:2020-07-28 11:30:00,timedelta=1:00:00
RB2010's start:2020-07-28 13:31:00-stop:2020-07-28 15:00:00,timedelta=1:30:00
RB2010's TTPD=5:45:00
RB2010's STTD=45 days, 18:15:00
TickTime=2020-07-28 14:37:00,tt_total=45 days, 23:37:00,STTK=45 days, 23:30:00
Member
avatar
加入于:
帖子: 419
声望: 170

合约的交易时间段

之前发布的交易时间段有错误,经过细致修改终于把各种特殊情况都修改,现在重新上传:
代码保存文件:vnpy\user_tools\my_strategy_tool.py中

from typing import Callable,List,Dict, Tuple, Union

import copy
import numpy as np
import talib
import vnpy.usertools.mylib as mylib

from vnpy.app.cta_strategy import (
    BarGenerator,
    ArrayManager
)

from vnpy.trader.object import (
    BarData,
    TickData,
    TradeData
)

from vnpy.trader.constant import Interval
from enum import Enum

import datetime 
import rqdatac as rq
from rqdatac.utils import to_date

import pytz
CHINA_TZ = pytz.timezone("Asia/Shanghai")


''' 获得上市日期 '''
def get_listed_date(symbol:str):
    info = rq.instruments(symbol)
    return to_date(info.listed_date)

''' 获得交割日期 '''
def get_de_listed_date(symbol:str):
    info = rq.instruments(symbol)
    return to_date(info.de_listed_date)


class Timeunit(Enum):
    """ 时间单位 """
    SECOND = '1s'
    MINUTE = '1m'
    HOUR = '1h'


class TradeHours(object):
    """ 合约交易时间段 """
    def __init__(self,symbol:str):
        self.symbol = symbol
        self.init()

    def init(self):
        """ 初始化交易日字典及交易时间段数据列表 """
        self.listed_date = get_listed_date(self.symbol)
        self.de_listed_date = get_de_listed_date(self.symbol)

        self.trade_date_index = {}   # 合约的交易日索引字典
        self.trade_index_date = {}   # 交易天数与交易日字典

        trade_dates = rq.get_trading_dates(self.listed_date,self.de_listed_date) # 合约的所有的交易日
        days = 0
        for td in trade_dates:
            self.trade_date_index[td] = days
            self.trade_index_date[days] = td
            days += 1

        trading_hours = rq.get_trading_hours(self.symbol,date=self.listed_date,frequency='tick',expected_fmt='datetime')

        self.time_dn_pairs = self._get_trading_times_dn(trading_hours)

        trading_hours0 = [(start.replace(tzinfo=CHINA_TZ),stop.replace(tzinfo=CHINA_TZ)) for start,stop in trading_hours]
        self.trade_date_index[self.listed_date] = (0,trading_hours0)
        for day in range(1,days):
            td = self.trade_index_date[day]
            trade_datetimes = []
            for (start,dn1),(stop,dn2) in self.time_dn_pairs:
                #start:开始时间,dn1:相对交易日前推天数,
                #stop :开始时间,dn2:相对开始时间后推天数     
                d = self.trade_index_date[day+dn1]
                trade_datetimes.append((
                    datetime.datetime.combine(d,start,tzinfo=CHINA_TZ),
                    datetime.datetime.combine(d,stop,tzinfo=CHINA_TZ)+datetime.timedelta(days=dn2)))
            self.trade_date_index[td] = (day,trade_datetimes)

    def _get_trading_times_dn(self,trading_hours:List[Tuple[datetime.datetime,datetime.datetime]]): 
        """ 
        交易时间跨天处理,不推荐外部使用 。
        产生的结果:[((start1,dn11),(stop1,dn21)),((start2,dn12),(stop2,dn22)),...,((startN,dn1N),(stopN,dn2N))]
        其中:
            startN:开始时间,dn1N:相对交易日前推天数,
            stopN:开始时间,dn2N:相对开始时间后推天数      
        """
        ilen = len(trading_hours)
        if ilen == 0:
            return []
        start_stops = []
        for start,stop in trading_hours:
            start_stops.insert(0,(start.time(),stop.time()))

        pre_start,pre_stop = start_stops[0]
        dn1 = 0
        dn2 = 1 if pre_start > pre_stop else 0
        time_dn_pairs = [((pre_start,dn1),(pre_stop,dn2))]
        for start,stop in start_stops[1:]:
            if start > pre_start:
                dn1 -= 1
            dn2 = 1 if start > stop else 0
            time_dn_pairs.insert(0,((start,dn1),(stop,dn2)))
            pre_start,pre_stop = start,stop

        return time_dn_pairs

    def get_date_tradetimes(self,date:datetime.date):
        """ 得到合约date日期的交易时间段 """
        idx,trade_times = self.trade_date_index.get(date,(None,[]))
        return idx,trade_times

    def get_trade_datetimes(self,dt:datetime,allday:bool=False):
        """ 得到合约date日期的交易时间段 """

        # 得到最早的交易时间
        idx0,trade_times0 = self.get_date_tradetimes(self.listed_date)
        start0,stop0 = trade_times0[0]
        if dt < start0:
            return None,[]

        # 首先找到dt日期自上市以来的交易天数
        date,dn = dt.date(),0
        days = None
        while date < self.de_listed_date:
            days,ths = self.trade_date_index.get(date,(None,[]))
            if not days:
                dn += 1
                date = (dt+datetime.timedelta(days=dn)).date()
            else:
                break
        # 如果超出交割日也没有找到,那这就不是一个有效的交易时间
        if days is None:
            return (None,[])

        index_3 = [days,days+1,days-1]  # 前后三天的

        date_3d = []
        for day in index_3: 
            date = self.trade_index_date.get(day,None)
            date_3d.append(date)

        # print(date_3d)

        for date in date_3d:
            if not date:
                # print(f"{date} is not trade date")
                continue

            idx,trade_dts = self.get_date_tradetimes(date)
            # print(f"{date} tradetimes {trade_dts}")
            ilen = len(trade_dts)
            if ilen > 0:
                start0,stop = trade_dts[0]      # start0 是date交易日的开始时间
                start,stop0 = trade_dts[-1]
            if dt<start0 or dt>stop0:
                continue

            for start,stop in trade_dts:
                if dt>=start and dt < stop:
                    if allday:
                        return idx,trade_dts
                    else:
                        return idx,[(start,stop)]

        return None,[]

    def get_trade_time_perday(self):
        """ 计算每日的交易总时长(单位:分钟) """
        TTPD = datetime.timedelta(0,0,0)

        datetimes = []
        today = datetime.datetime.now().date()

        for (start,dn1),(stop,dn2) in self.time_dn_pairs:
            start_dt = datetime.datetime.combine(today,start,tzinfo=CHINA_TZ) + datetime.timedelta(days=dn1)
            stop_dt = datetime.datetime.combine(today,stop,tzinfo=CHINA_TZ) + datetime.timedelta(days=dn2)
            time_delta = stop_dt - start_dt
            TTPD = TTPD + time_delta
        return int(TTPD.seconds/60)

    def get_trade_time_inday(self,dt:datetime,unit:Timeunit=Timeunit.MINUTE):
        """ 
        计算dt在交易日内的分钟数 
        unit: '1s':second;'1m':minute;'1h';1h
        """
        TTID = datetime.timedelta(0,0,0)

        day,trade_times = self.get_trade_datetimes(dt,allday=True)
        if not trade_times:
            return None

        for start,stop in trade_times:
            if dt > stop:
                time_delta = stop - start
                TTID += time_delta
            elif dt > start:
                time_delta = dt - start
                TTID += time_delta     
                break
            else:
                break          

        if unit == Timeunit.SECOND:
            return TTID.seconds
        elif unit == Timeunit.MINUTE:
            return int(TTID.seconds/60) 
        elif unit == Timeunit.HOUR:
            return int(TTID.seconds/3600) 
        else:
            return TTID

    def convet_to_datetime(self,day:int,minutes:int):
        """ 计算minutes在第day交易日内的datetime形式的时间 """
        date = self.trade_index_date.get(day,None)
        if date is None:
            return None
        idx,trade_times = self.trade_date_index.get(date,(None,[]))
        if not trade_times:     # 不一定必要
            return None
        for (start,stop) in trade_times:
            timedelta = stop - start 
            if minutes < int(timedelta.seconds/60):
                return start + datetime.timedelta(minutes=minutes)
            else:
                minutes -= int(timedelta.seconds/60)
        return None

    def get_bar_window(self,dt:datetime,window:int,interval:Interval=Interval.MINUTE):
        """ 计算dt所在K线的起止时间 """
        bar_windows = (None,None)

        day,trade_times = self.get_trade_datetimes(dt,allday=True)
        if not trade_times:
            # print(f"day={day} trade_times={trade_times}")
            return bar_windows


        # 求每个交易日的交易时间分钟数
        TTPD = self.get_trade_time_perday()

        # 求dt在交易日内的分钟数
        TTID = self.get_trade_time_inday(dt,unit=Timeunit.MINUTE)

        # 得到dt时刻K线的起止时间 
        total_minites = day*TTPD + TTID

        # 计算K线宽度(分钟数)
        if interval == Interval.MINUTE:
            bar_width = window
        elif interval == Interval.HOUR:
            bar_width = 60*window
        elif interval == Interval.DAILY:
            bar_width = TTPD*window
        elif interval == Interval.WEEKLY:
            bar_width = TTPD*window*5
        else:
            return bar_windows

        # 求K线的开始时间的和结束的分钟形式
        start_m = int(total_minites/bar_width)*bar_width
        stop_m = start_m + bar_width

        # 计算K开始时间的datetime形式
        start_d = int(start_m / TTPD)
        minites = start_m % TTPD
        start_dt = self.convet_to_datetime(start_d,minites)
        # print(f"start_d={start_d} minites={minites}---->{start_dt}")

        # 计算K结束时间的datetime形式
        stop_d = int(stop_m / TTPD)
        minites = stop_m % TTPD
        stop_dt = self.convet_to_datetime(stop_d,minites)
        # print(f"stop_d={stop_d} minites={minites}---->{stop_dt}")

        return start_dt,stop_dt
Member
avatar
加入于:
帖子: 419
声望: 170

一个等交易长度、固定位置的K线产生器FixedBarGenerator

以下代码保存在vnpy\user_tools\my_strategy_tool.py文件中

from typing import Callable,List,Dict, Tuple, Union

import copy
import numpy as np
import talib
import vnpy.usertools.mylib as mylib

from vnpy.app.cta_strategy import (
    BarGenerator,
    ArrayManager
)

from vnpy.trader.object import (
    BarData,
    TickData,
    TradeData
)

from vnpy.trader.constant import Interval
from enum import Enum

import datetime 
import rqdatac as rq
from rqdatac.utils import to_date

import pytz
CHINA_TZ = pytz.timezone("Asia/Shanghai")


''' 获得上市日期 '''
def get_listed_date(symbol:str):
    info = rq.instruments(symbol)
    return to_date(info.listed_date)

''' 获得交割日期 '''
def get_de_listed_date(symbol:str):
    info = rq.instruments(symbol)
    return to_date(info.de_listed_date)


class Timeunit(Enum):
    """ 时间单位 """
    SECOND = '1s'
    MINUTE = '1m'
    HOUR = '1h'


class TradeHours(object):
    """ 合约交易时间段 """
    def __init__(self,symbol:str):
        self.symbol = symbol
        self.init()

    def init(self):
        """ 初始化交易日字典及交易时间段数据列表 """
        self.listed_date = get_listed_date(self.symbol)
        self.de_listed_date = get_de_listed_date(self.symbol)

        self.trade_date_index = {}   # 合约的交易日索引字典
        self.trade_index_date = {}   # 交易天数与交易日字典

        trade_dates = rq.get_trading_dates(self.listed_date,self.de_listed_date) # 合约的所有的交易日
        days = 0
        for td in trade_dates:
            self.trade_date_index[td] = days
            self.trade_index_date[days] = td
            days += 1

        trading_hours = rq.get_trading_hours(self.symbol,date=self.listed_date,frequency='tick',expected_fmt='datetime')

        self.time_dn_pairs = self._get_trading_times_dn(trading_hours)

        trading_hours0 = [(start.replace(tzinfo=CHINA_TZ),stop.replace(tzinfo=CHINA_TZ)) for start,stop in trading_hours]
        self.trade_date_index[self.listed_date] = (0,trading_hours0)
        for day in range(1,days):
            td = self.trade_index_date[day]
            trade_datetimes = []
            for (start,dn1),(stop,dn2) in self.time_dn_pairs:
                #start:开始时间,dn1:相对交易日前推天数,
                #stop :开始时间,dn2:相对开始时间后推天数     
                d = self.trade_index_date[day+dn1]
                trade_datetimes.append((
                    datetime.datetime.combine(d,start,tzinfo=CHINA_TZ),
                    datetime.datetime.combine(d,stop,tzinfo=CHINA_TZ)+datetime.timedelta(days=dn2)))
            self.trade_date_index[td] = (day,trade_datetimes)

    def _get_trading_times_dn(self,trading_hours:List[Tuple[datetime.datetime,datetime.datetime]]): 
        """ 
        交易时间跨天处理,不推荐外部使用 。
        产生的结果:[((start1,dn11),(stop1,dn21)),((start2,dn12),(stop2,dn22)),...,((startN,dn1N),(stopN,dn2N))]
        其中:
            startN:开始时间,dn1N:相对交易日前推天数,
            stopN:开始时间,dn2N:相对开始时间后推天数      
        """
        ilen = len(trading_hours)
        if ilen == 0:
            return []
        start_stops = []
        for start,stop in trading_hours:
            start_stops.insert(0,(start.time(),stop.time()))

        pre_start,pre_stop = start_stops[0]
        dn1 = 0
        dn2 = 1 if pre_start > pre_stop else 0
        time_dn_pairs = [((pre_start,dn1),(pre_stop,dn2))]
        for start,stop in start_stops[1:]:
            if start > pre_start:
                dn1 -= 1
            dn2 = 1 if start > stop else 0
            time_dn_pairs.insert(0,((start,dn1),(stop,dn2)))
            pre_start,pre_stop = start,stop

        return time_dn_pairs

    def get_date_tradetimes(self,date:datetime.date):
        """ 得到合约date日期的交易时间段 """
        idx,trade_times = self.trade_date_index.get(date,(None,[]))
        return idx,trade_times

    def get_trade_datetimes(self,dt:datetime,allday:bool=False):
        """ 得到合约date日期的交易时间段 """

        # 得到最早的交易时间
        idx0,trade_times0 = self.get_date_tradetimes(self.listed_date)
        start0,stop0 = trade_times0[0]
        if dt < start0:
            return None,[]

        # 首先找到dt日期自上市以来的交易天数
        date,dn = dt.date(),0
        days = None
        while date < self.de_listed_date:
            days,ths = self.trade_date_index.get(date,(None,[]))
            if not days:
                dn += 1
                date = (dt+datetime.timedelta(days=dn)).date()
            else:
                break
        # 如果超出交割日也没有找到,那这就不是一个有效的交易时间
        if days is None:
            return (None,[])

        index_3 = [days,days+1,days-1]  # 前后三天的

        date_3d = []
        for day in index_3: 
            date = self.trade_index_date.get(day,None)
            date_3d.append(date)

        # print(date_3d)

        for date in date_3d:
            if not date:
                # print(f"{date} is not trade date")
                continue

            idx,trade_dts = self.get_date_tradetimes(date)
            # print(f"{date} tradetimes {trade_dts}")
            ilen = len(trade_dts)
            if ilen > 0:
                start0,stop = trade_dts[0]      # start0 是date交易日的开始时间
                start,stop0 = trade_dts[-1]
            if dt<start0 or dt>stop0:
                continue

            for start,stop in trade_dts:
                if dt>=start and dt < stop:
                    if allday:
                        return idx,trade_dts
                    else:
                        return idx,[(start,stop)]

        return None,[]

    def get_trade_time_perday(self):
        """ 计算每日的交易总时长(单位:分钟) """
        TTPD = datetime.timedelta(0,0,0)

        datetimes = []
        today = datetime.datetime.now().date()

        for (start,dn1),(stop,dn2) in self.time_dn_pairs:
            start_dt = datetime.datetime.combine(today,start,tzinfo=CHINA_TZ) + datetime.timedelta(days=dn1)
            stop_dt = datetime.datetime.combine(today,stop,tzinfo=CHINA_TZ) + datetime.timedelta(days=dn2)
            time_delta = stop_dt - start_dt
            TTPD = TTPD + time_delta
        return int(TTPD.seconds/60)

    def get_trade_time_inday(self,dt:datetime,unit:Timeunit=Timeunit.MINUTE):
        """ 
        计算dt在交易日内的分钟数 
        unit: '1s':second;'1m':minute;'1h';1h
        """
        TTID = datetime.timedelta(0,0,0)

        day,trade_times = self.get_trade_datetimes(dt,allday=True)
        if not trade_times:
            return None

        for start,stop in trade_times:
            if dt > stop:
                time_delta = stop - start
                TTID += time_delta
            elif dt > start:
                time_delta = dt - start
                TTID += time_delta     
                break
            else:
                break          

        if unit == Timeunit.SECOND:
            return TTID.seconds
        elif unit == Timeunit.MINUTE:
            return int(TTID.seconds/60) 
        elif unit == Timeunit.HOUR:
            return int(TTID.seconds/3600) 
        else:
            return TTID

    def convet_to_datetime(self,day:int,minutes:int):
        """ 计算minutes在第day交易日内的datetime形式的时间 """
        date = self.trade_index_date.get(day,None)
        if date is None:
            return None
        idx,trade_times = self.trade_date_index.get(date,(None,[]))
        if not trade_times:     # 不一定必要
            return None
        for (start,stop) in trade_times:
            timedelta = stop - start 
            if minutes < int(timedelta.seconds/60):
                return start + datetime.timedelta(minutes=minutes)
            else:
                minutes -= int(timedelta.seconds/60)
        return None

    def get_bar_window(self,dt:datetime,window:int,interval:Interval=Interval.MINUTE):
        """ 计算dt所在K线的起止时间 """
        bar_windows = (None,None)

        day,trade_times = self.get_trade_datetimes(dt,allday=True)
        if not trade_times:
            # print(f"day={day} trade_times={trade_times}")
            return bar_windows


        # 求每个交易日的交易时间分钟数
        TTPD = self.get_trade_time_perday()

        # 求dt在交易日内的分钟数
        TTID = self.get_trade_time_inday(dt,unit=Timeunit.MINUTE)

        # 得到dt时刻K线的起止时间 
        total_minites = day*TTPD + TTID

        # 计算K线宽度(分钟数)
        if interval == Interval.MINUTE:
            bar_width = window
        elif interval == Interval.HOUR:
            bar_width = 60*window
        elif interval == Interval.DAILY:
            bar_width = TTPD*window
        elif interval == Interval.WEEKLY:
            bar_width = TTPD*window*5
        else:
            return bar_windows

        # 求K线的开始时间的和结束的分钟形式
        start_m = int(total_minites/bar_width)*bar_width
        stop_m = start_m + bar_width

        # 计算K开始时间的datetime形式
        start_d = int(start_m / TTPD)
        minites = start_m % TTPD
        start_dt = self.convet_to_datetime(start_d,minites)
        # print(f"start_d={start_d} minites={minites}---->{start_dt}")

        # 计算K结束时间的datetime形式
        stop_d = int(stop_m / TTPD)
        minites = stop_m % TTPD
        stop_dt = self.convet_to_datetime(stop_d,minites)
        # print(f"stop_d={stop_d} minites={minites}---->{stop_dt}")

        return start_dt,stop_dt

class FixedBarGenerator(BarGenerator):
    """ 固定位置K线生成器 """ 
    def __init__(
        self,
        on_bar: Callable,
        window: int = 0,
        on_window_bar: Callable = None,
        interval: Interval = Interval.MINUTE,
        symbol:str=''
    ):
        super().__init__(on_bar,window,on_window_bar,interval)
        self.trade_hours = TradeHours(symbol)

        self.kx_start,self.kx_stop = None,None

    def update_bar(self,bar:BarData) -> None:
        """
        Update 1 minute bar into generator
        """

        # 如果bar的时间戳笔windows的时间戳还早,丢弃bar
        if self.window_bar and bar.datetime < self.window_bar.datetime:
            return

        if (self.kx_start,self.kx_stop) == (None,None):
            self.kx_start,self.kx_stop = self.trade_hours.get_bar_window(bar.datetime,self.window,self.interval)
            if (self.kx_start,self.kx_stop) == (None,None):
                return

        # If not inited, creaate window bar object
        if (not self.window_bar):
            # 获得K线的交易起止时间
            self.window_bar = BarData(
                symbol=bar.symbol,
                exchange=bar.exchange,
                datetime=self.kx_start,
                gateway_name=bar.gateway_name,
                open_price=bar.open_price,
                high_price=bar.high_price,
                low_price=bar.low_price,
            )         

        elif self.kx_start <= bar.datetime and bar.datetime < self.kx_stop:
            # 1分钟K线属于当前K线
            self.window_bar.high_price = max(
                self.window_bar.high_price, bar.high_price)
            self.window_bar.low_price = min(
                self.window_bar.low_price, bar.low_price)

        elif bar.datetime >= self.kx_stop:       # Check if window bar completed
            self.on_window_bar(self.window_bar)
            self.window_bar = None

            self.kx_start,self.kx_stop = self.trade_hours.get_bar_window(bar.datetime,self.window,self.interval)
            if (self.kx_start,self.kx_stop) == (None,None): 
                # 不在交易时段
                return

            self.window_bar = BarData(
                symbol=bar.symbol,
                exchange=bar.exchange,
                datetime=self.kx_start,
                gateway_name=bar.gateway_name,
                open_price=bar.open_price,
                high_price=bar.high_price,
                low_price=bar.low_price,
            )       

        # Update close price/volume into window bar
        self.window_bar.close_price = bar.close_price
        self.window_bar.volume += int(bar.volume)
        self.window_bar.open_interest = bar.open_interest

创建一个可显示K线的策略FixedKxStrategy

以下代码为 [用户目录]\strategies\FixedKxStrategy.py的内容

from typing import Any,List,Dict,Tuple
import copy

from vnpy.app.cta_strategy import (
    CtaTemplate,
    BarGenerator,
    ArrayManager,
    StopOrder,
    Direction
)

from vnpy.trader.engine import MainEngine,EventEngine
from vnpy.app.cta_strategy.engine import CtaEngine
from vnpy.event.engine import Event

from vnpy.trader.object import (
    LogData,
    TickData,
    BarData,
    TradeData,
    OrderData,
)

from vnpy.app.cta_strategy import StopOrder
from vnpy.app.cta_strategy.base import EngineType
from vnpy.trader.constant import Interval

from vnpy.app.cta_strategy.base import (
    APP_NAME,
    EVENT_CTA_LOG,
    EVENT_CTA_TICK,
    EVENT_CTA_HISTORY_BAR,
    EVENT_CTA_BAR,
    EVENT_CTA_ORDER,
    EVENT_CTA_TRADE,    
    EVENT_CTA_STOPORDER,
    EVENT_CTA_STRATEGY,
)

from vnpy.usertools.kx_chart import (   # hxxjava add
    NewChartWidget,
    CandleItem,
    VolumeItem, 
    LineItem,
    SmaItem,
    RsiItem,
    MacdItem,
)

from vnpy.usertools.my_strategy_tool import FixedBarGenerator

class FixedKxStrategy(CtaTemplate):
    """"""

    author = "hxxjava"
    kx_interval = 1
    show_chart = False  # 显示K线图表 

    parameters = [
        "kx_interval",
        "show_chart"
    ]

    kx_count:int = 0
    cta_manager = None

    variables = ["kx_count"]

    def __init__(
        self,
        cta_engine: Any,
        strategy_name: str,
        vt_symbol: str,
        setting: dict,
    ):
        super().__init__(cta_engine,strategy_name,vt_symbol,setting)
        symbol,exchange = self.vt_symbol.split('.')

        self.bg = FixedBarGenerator(self.on_bar,self.kx_interval,self.on_Nmin_bar,symbol=symbol.upper())

        self.am = ArrayManager()

        cta_engine:CtaEngine = self.cta_engine
        self.engine_type = cta_engine.engine_type
        self.even_engine = cta_engine.main_engine.event_engine

        # 必须在这里声明,因为它们是实例变量
        self.all_bars:List[BarData] = [] 
        self.cur_window_bar:[BarData] = None
        self.bar_updated = False

    def on_init(self):
        """
        Callback when strategy is inited.
        """
        self.load_bar(20)

        if len(self.all_bars)>0:
            self.send_event(EVENT_CTA_HISTORY_BAR,self.all_bars)

    def on_start(self):
        """ """
        self.write_log("已开始")

    def on_stop(self):
        """"""
        self.write_log("_kx_strategy 已停止")

    def on_tick(self, tick: TickData):
        """
        Callback of new tick data update.
        """
        self.bar_updated = False
        self.current_tick = tick    # 记录最新tick 

        # 再更新tick,产生1分钟K线乃至N 分钟线
        self.bg.update_tick(tick)

        if self.inited:     
            # 先产生当前临时K线
            self.cur_window_bar = self.get_cur_window_bar()  
            if self.cur_window_bar:
                # 发送当前临时K线更新消息
                self.send_event(EVENT_CTA_BAR,self.cur_window_bar)           

            self.send_event(EVENT_CTA_TICK,tick)  

    def on_bar(self, bar: BarData):
        """
        Callback of new bar data update.
        """
        if self.inited:   
            self.write_log(f"I got a 1min BarData at {bar.datetime}")

        self.bg.update_bar(bar)
        self.bar_updated = True

    def on_Nmin_bar(self, bar: BarData):
        """
        Callback of new bar data update.
        """
        self.all_bars.append(bar)
        self.kx_count = len(self.all_bars)

        if self.inited:
            self.write_log(f"I got a {self.kx_interval}min BarData at {bar.datetime}")
            self.send_event(EVENT_CTA_BAR,bar)

        self.put_event()

    def on_trade(self, trade: TradeData):
        """
        Callback of new trade data update.
        """
        self.send_event(EVENT_CTA_TRADE,trade)

    def on_order(self, order: OrderData):
        """
        Callback of new order data update.
        """
        self.send_event(EVENT_CTA_ORDER,order)

    def on_stop_order(self, stop_order: StopOrder):
        """
        Callback of stop order update.
        """
        self.send_event(EVENT_CTA_STOPORDER,stop_order)

    def get_cur_window_bar(self):
        window_bar = copy.deepcopy(self.bg.window_bar)
        bar = self.bg.bar

        if not(window_bar): # 刚产生过window_bar
            return None

        if self.bar_updated: # 刚更新过window_bar
            return window_bar

        # 上一分钟window_bar和当前bar合成出临时window bar
        window_bar.high_price = max(window_bar.high_price, bar.high_price)
        window_bar.low_price = min(window_bar.low_price, bar.low_price)

        # Update close price/volume into window bar
        window_bar.close_price = bar.close_price
        window_bar.volume += int(bar.volume)
        window_bar.open_interest = bar.open_interest
        return window_bar

    def send_event(self,event_type:str,data:Any):   
        """
        只在实盘引擎并且配置为显示K线图表的情况下发送小线
        """     
        if self.engine_type==EngineType.LIVE and self.show_chart:     # "如果显示K线图表"
            self.even_engine.put(Event(event_type,(self.strategy_name,data)))

    def init_kx_chart(self,kx_chart:NewChartWidget=None):    # hxxjava add ----- 提供给外部调用
        # self.write_log("init_kx_chart executed !!!")
        if kx_chart:
            kx_chart.add_plot("candle", hide_x_axis=True)
            kx_chart.add_plot("volume", maximum_height=150)
            kx_chart.add_plot("rsi", maximum_height=150)
            kx_chart.add_plot("macd", maximum_height=150)
            kx_chart.add_item(CandleItem, "candle", "candle")
            kx_chart.add_item(VolumeItem, "volume", "volume")

            kx_chart.add_item(LineItem, "line", "candle")
            kx_chart.add_item(SmaItem, "sma", "candle")
            kx_chart.add_item(RsiItem, "rsi", "rsi")
            kx_chart.add_item(MacdItem, "macd", "macd")
            kx_chart.add_last_price_line()
            kx_chart.add_cursor()

运行策略FixedKxStrategy

具体操作方法以及缺少的NewChartWidget等组件参见:
https://www.vnpy.com/forum/topic/3920-wei-kxian-tu-biao-tian-zhuan-jia-wa-rang-ctace-lue-de-yun-xing-kan-de-jian-1

设置方法

按下图的步骤创建一个可以显示K线图的用户策略

description

运行效果

以下为ag2012.SHFE的30分钟K线图,它显示的K线是等交易时长,位置也是固定的。

description

Member
avatar
加入于:
帖子: 419
声望: 170

K线种类总结:

K线的种类可以这样划分:
按K线周期长度:

  • 1 常规周期K线
  • 2 自定义K周期线

按K线参考起始时间划分:

  • 1 等自然时长K线
  • 2 等交易时长K线
    • 1)参考交易日起点的等交易时长K线
    • 2)参考日盘起点和夜盘起点的等交易时长K线
    • 3)参考合约上市日起点的等交易时长K线

一、按照K线周期划分

  • 1 常规周期K线:如1,5,10,15,20,30分钟K线,1,2,4小时K线,日K线,周K线、月K线,年K线等。它们通常在常见的金融终端的GUI界面都会出现。因为是常规周期K线,为了加快软件的加载、显示和速度,常常把它们直接在服务器的数据库中做存储。这样做的好处是K线数据的处理和加载简单,显示速度快,缺点是需要为每个用户可能用到的周都准备一个表。
  • 2 自定义周期K线:在常规周期之外,用户自定义时间周期的K线,如7,19分钟周期K线。

VNPY对常规周期K线和自定义周期K线的处理是一视同仁的,统一使用BarGenerator()就可以了,它只使用1分钟K线数据,根据需要产生N分钟K线,至于产生出来是常规周期K线,还是自定义周期K线,主要看N是多少了。这样做的好处是自由!缺点是处理复杂些、速度慢些(不过反正不用手工处理,让计算机做,无所谓)。

二、按K线起始时间的参考起点划分

1 等自然时长K线

它是按照自然时间来计算一根K线是否结束的。如对于30分钟K线,rb2010合约在每个交易日都会产生这样的一组K线:
起止时间 交易时长
21:00-21:30——30分钟
21:30-22:00——30分钟
22:00-22:30——30分钟
22:30-23:00——30分钟
23:00-23:30——30分钟
09:00-09:30——30分钟
09:30-10:00——30分钟
10:00-10:30——15分钟(?)
10:30-11:00——30分钟
11:00-11:30——30分钟
13:30-14:00——30分钟
14:00-14:30——30分钟
14:30-15:00——30分钟
目前vnpy的BarGenerator()就是这种K线参考机制。优点点是实现简单,无需考虑交易时段;缺点是明天都会有特别30分钟K是个另类:10:00-10:30的K线,其实只交易了15分钟!

2 等交易时长K线

熟悉文华财经8.3的应该熟悉下图的设置:
description

1)参考交易日起点

以合约的日交易时间的起点为参考,然后以等交易时间宽度产生K线。如文华财经的常规周期K线划分中的 “交易时间机制”就是这种情况。
如RB2010交易时间段:'21:00-23:00,09:00-10:15,10:30-11:30,13:30-15:00'。以交易日起点方式的30分钟K线分别是
起止时间 交易时长
21:00-21:30——30分钟
21:30-22:00——30分钟
22:00-22:30——30分钟
22:30-23:00——30分钟
23:00-23:30——30分钟
09:00-09:30——30分钟
09:30-10:00——30分钟
10:00-10:45——30分钟
10:45-11:15——30分钟
11:15-11:45——30分钟
13:45-14:15——30分钟
14:15-14:45——30分钟
14:45-15:00——15分钟(?)

这种K线时间参考机制的优点是不会模糊每日开市时的跳空行情,缺点是每日都可能产生一根交易时间不足30分钟的K线。

2)参考日盘起点和夜盘起点

分别以合约的日盘交易时间起点和夜盘交易时间起点为参考,然后以等交易时间宽度产生K线。如文华财经的常规周期K线划分中的 “交易时间机制改进型”就是这种情况。
如RB2010交易时间段:夜盘:'21:00-02:30, 日盘:09:00-10:15,10:30-11:30,13:30-15:00'。以交易日起点方式的60分钟K线分别是
夜盘K线:
起止时间 交易时长
21:00-22:00——60分钟
22:00-23:00——60分钟
23:00-00:00——60分钟
00:00-01:00——60分钟
01:00-02:00——60分钟
02:00-02:30——30分钟(?)
日盘K线:
起止时间 交易时长
09:00-10:00——60分钟
10:00-11:15——60分钟
11:15-11:15——60分钟
11:15-14:15——60分钟
14:15-15:00——45分钟(?)
这种K线时间参考机制的优点是不会模糊各种日盘和夜盘开市时的跳空行情,缺点是每日都可能产生交易时长不足60分钟的K线。

3)参考合约上市日起点

以合约上市日的交易时间起点为参考,取出节假日,考虑交易时段,然后以等交易时间宽度产生K线。如FixedBarGenerator对N分钟K线的处理就是这种情况。FixedBarGenerator的原理参见具体实现代码参见在前面的帖子里面已经描述得比较清楚了,这里不再重复。

参考合约上市日起点、等交易时间的好处是:所有的K线的交易时长都是想等的,它们的成交量是可类比的。因为交易时长不等的K线产生的所谓“地量”或者“天量”,有多大意义是可想而知的。这种K线产生机制的缺点是:可能产生跨交易日,跨日盘和夜盘的K线,这样可能造成观察不到交易日开盘跳空,日盘和夜盘开市时的跳空现象,因为可能那根K线还没有结束。

总结:

鱼和熊掌不可兼得,每种K线产生机制都有优点和缺点,至于您选择哪一种来产生你的K线,就看你的取舍了。明白与不明白个中的道理却是大相径庭的,所谓胸有成竹,才能够临危不乱。谨以本贴分享本人的一点浅薄之见,希望能够帮助到您!

Member
avatar
加入于:
帖子: 9
声望: 0

对于我这种新手来说 合成日线真的是苦不堪言,要考虑的东西太多了

Member
avatar
加入于:
帖子: 2
声望: 0

奇怪,5分钟K线应该没有这个问题吧,为什么合同后的也和文华的不一样

Member
avatar
加入于:
帖子: 419
声望: 170

请看看这个帖子:不要轻易说你生成的K线错误了 !!!

Member
avatar
加入于:
帖子: 23
声望: 0

vnpy\user_tools\my_strategy_tool.py没找到6楼说的这个路径啊

Member
avatar
加入于:
帖子: 69
声望: 0

mark

Member
avatar
加入于:
帖子: 11
声望: 2

如果tick数据不全,比如这一分钟有100条数据,正常应该是120条 1秒2跳才对,那么vnpy合成的1分钟会用100条数据还是120条数据?

Member
avatar
加入于:
帖子: 419
声望: 170

五彩城 wrote:

如果tick数据不全,比如这一分钟有100条数据,正常应该是120条 1秒2跳才对,那么vnpy合成的1分钟会用100条数据还是120条数据?

1分钟里有多少条用多少条。

比如只有3个tick,就用3条。
为什么?因当盘口中买1价总是低于卖1价,当没有人愿意出价比买1价高的价格买入,
也没有人愿意出比卖1价低的价格卖出时,盘口就处于安静状态,此时没有任何该合约
的tick推送给的客户端,那么可能这一分钟就不足120个tick,当下一分钟的tick被收
到时,就算是只收到了3个tick,当前的这一分钟K线也必须结束了。

Member
avatar
加入于:
帖子: 103
声望: 7

楼主,你好,我发现在vnpy原有的K线合成逻辑,在期货中10:15-10:30这段以后合成出来的K线总是错的,和期货app上的K线对应不上,这种情况怎么处理?

Member
avatar
加入于:
帖子: 419
声望: 170

李春宝 wrote:

楼主,你好,我发现在vnpy原有的K线合成逻辑,在期货中10:15-10:30这段以后合成出来的K线总是错的,和期货app上的K线对应不上,这种情况怎么处理?

答复:

不能说vnpy产生的K线就是错误的,可能是两边产生的规则不一样导致的的。
我不知道你使用的什么期货软件,但是每个软件在产生K线的时候都不一样的,有时候这是允许你设置的。
比如文华的期货客户端,有等自然时长、等交易时长和增强型K线之分。
对齐方式还有分为对齐上市时刻、对齐黑/白盘、对齐日盘等之分。
具体参阅K线种类总结

Member
avatar
加入于:
帖子: 7
声望: 0

mark

© 2015-2022 上海韦纳软件科技有限公司
备案服务号:沪ICP备18006526号

沪公网安备 31011502017034号

【用户协议】
【隐私政策】
【免责条款】