vn.py Quant Community
By Traders, For Traders.
Administrator

Step 7: Calculating Technical Indicators

 

In this step, we call the relevant functions from the talib library to calculate the indicators.

 

Because technical indicators differ in how many inputs they take and how many outputs they return, we roughly divide them into 5 categories:

 

  • index_1to1: takes the close price and returns 1 result, e.g. the simple moving average SMA;
  • index_2to1: takes the high and low prices and returns 1 result, e.g. the AROONOSC indicator;
  • index_2to2: takes the high and low prices and returns 2 results, e.g. the AROON indicator;
  • index_3to1: takes the high, low and close prices and returns 1 result, e.g. the ATR indicator;
  • index_4to1: takes the open, high, low and close prices and returns 1 result, e.g. the BOP indicator.

 

The idea is that we fill these 5 lists with the indicator names we want; the program detects them, performs the corresponding calculations, and finally adds the results to the DataFrame.

 

  1. First check the 5 lists; if a list is empty, do nothing for it;
  2. If a list is not empty, iterate over its indicator name strings with a for loop;
  3. Use getattr() to look up the corresponding function in the talib library, referred to here as func;
  4. Call func() with the specified inputs to obtain the indicator values;
  5. Once all indicators are calculated, return the updated DataFrame.

 

self.index_3to1 = ["ATR", "ADX", "CCI"]
self.index_1to1 = ["STDDEV", "SMA"]
self.index_2to2 = ["AROON"]
self.index_2to1 = ["AROONOSC"]
self.index_4to1 = ["BOP"]

    def calculate_index(self, df: DataFrame = None):
        """"""
        output("第七步:计算相关技术指标,返回DataFrame\n")

        if self.index_1to1:
            for i in self.index_1to1:
                func = getattr(talib, i)
                df[i] = func(
                    np.array(df["close"]), 
                    self.window_index
                )

        if self.index_3to1:
            for i in self.index_3to1:
                func = getattr(talib, i)
                df[i] = func(        
                    np.array(df["high"]),
                    np.array(df["low"]),
                    np.array(df["close"]),
                    self.window_index
                )

        if self.index_2to2:
            for i in self.index_2to2:
                func = getattr(talib, i)
                result_down, result_up = func(
                    np.array(df["high"]),
                    np.array(df["low"]),
                    self.window_index
                )
                up = i + "_UP"
                down = i + "_DOWN"
                df[up] = result_up
                df[down] = result_down

        if self.index_2to1:
            for i in self.index_2to1:
                func = getattr(talib, i)
                df[i] = func(
                    np.array(df["high"]),
                    np.array(df["low"]),
                    self.window_index
                )

        if self.index_4to1:
            for i in self.index_4to1:
                func = getattr(talib, i)
                df[i] = func(  
                    np.array(df["open"]),      
                    np.array(df["high"]),
                    np.array(df["low"]),
                    np.array(df["close"]),
                )             
        return df

 


 

Although talib itself provides more than 100 technical indicators, in practice that is still far from enough for real strategy development. However, from the basic indicators talib provides we can easily compose more complex ones, such as channel indicators like Bollinger Bands and Keltner Channels.
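
For instance, a Keltner Channel can be composed in the same way from talib's EMA and ATR outputs. The sketch below is only an illustration and is not part of the original tool class; the function name, the column names and the channel multiplier are assumptions:

import numpy as np
import talib

def add_keltner(df, window: int = 20, multiplier: float = 2.0):
    """Append hypothetical Keltner Channel columns to an OHLC DataFrame."""
    mid = talib.EMA(np.array(df["close"]), window)
    atr = talib.ATR(
        np.array(df["high"]),
        np.array(df["low"]),
        np.array(df["close"]),
        window,
    )
    df["kelt_mid"] = mid
    df["kelt_up"] = mid + multiplier * atr      # upper channel line
    df["kelt_down"] = mid - multiplier * atr    # lower channel line
    return df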

 

In the example above we already have the moving average SMA and the standard deviation STDDEV, and the Bollinger Bands can be computed from these two. The indicator names we define ourselves are all written in lowercase:

 

  • boll_up = SMA + width * STDDEV
  • boll_down = SMA - width * STDDEV

 

We can then take a rough look at how effective the Bollinger Band entry signals are:

 

  • When the close price crosses upward through the upper band, mark it as a long entry signal;
  • When the close price crosses downward through the lower band, mark it as a short entry signal.

 

We then plot the buy and sell signals, and can also plot other indicators such as CCI and ATR to observe how they relate.

 

Sample code is shown below:

 

  def show_chart(self, data, boll_wide):
      """"""      
      data["boll_up"] = data["SMA"] + data["STDDEV"] * boll_wide
      data["boll_down"] = data["SMA"] - data["STDDEV"] * boll_wide

      up_signal = []
      down_signal = []
      len_data = len(data["close"]) 
      for i in range(1, len_data):
          if data.iloc[i]["close"] > data.iloc[i]["boll_up"] and data.iloc[i - 1]["close"] < data.iloc[i - 1]["boll_up"]:
              up_signal.append(i)

          elif data.iloc[i]["close"] < data.iloc[i]["boll_down"] and data.iloc[i-1]["close"] > data.iloc[i - 1]["boll_down"]:
              down_signal.append(i)

      fig = plt.figure(figsize=(20, 8))
      close = data["close"]
      plt.plot(close, lw=1)
      plt.plot(close, '^', markersize=5, color='r', label='UP signal', markevery=up_signal)
      plt.plot(close, 'v', markersize=5, color='g', label='DOWN signal', markevery=down_signal)
      plt.plot(data["boll_up"], lw=0.5, color="r")
      plt.plot(data["boll_down"], lw=0.5, color="g")
      plt.legend()
      plt.show()

      data["ATR"].plot(figsize=(20, 3), title="ATR")
      plt.show()

 

[Chart: close price with Bollinger Bands and buy/sell signal markers, followed by the ATR plot]

 

Step 8: Multi-Timeframe Analysis

 

The analysis so far has been based on 1-minute bar data. Many quant beginners like to blindly chase the highest-frequency data available (for example tick data for domestic Chinese futures) for their research, claiming it lets them capture the smallest price movements, but for many quantitative strategies this is effectively a dead end.

 

Bars of a larger period, while discarding the intra-bar price path (keeping only the four OHLC prices), also filter out market noise to different degrees, which lets the various time-series techniques pick out useful signals more effectively.

 

Next we synthesize N-minute data, for example 5-minute, 15-minute, 30-minute and 1-hour (60-minute) bars, obtain new DataFrames, and repeat steps 1-7 above to observe the statistical characteristics of the different time series:
 

  1. Define a list intervals containing the periods to synthesize;
  2. Loop over the periods with a for loop, create a new DataFrame, and call resample() to aggregate the OHLC data into the larger period;
  3. Pass the new DataFrame to the base_analysis() function, which automatically performs the step 1-7 analysis.

 

intervals = ["5min","15min","30min","1h","2h","4h"]

def multi_time_frame_analysis(self, intervals: list = None, df: DataFrame = None):
    """"""
    if not intervals:
        output("请输入K线合成周期")
        return

    if df is None:
        df = self.orignal

    if df is None:
        output("请先加载数据")
        return

    for interval in intervals: 
        output("------------------------------------------------")  
        output(f"合成{interval}周期K先并开始数据分析")

        data = pd.DataFrame()
        data["open"] = df["open"].resample(interval, how="first")
        data["high"] = df["high"].resample(interval, how="max")
        data["low"] = df["low"].resample(interval, how="min")
        data["close"] = df["close"].resample(interval, how="last")

        result = self.base_analysis(data)
        self.results[interval] = result

 

 

Application in Jupyter Notebook

 
All the functions used in the 8-step data analysis workflow above are wrapped into a standard utility class, which makes later use much easier:

 

  1. Import the data analysis tool and the related libraries;
  2. Load the historical data and the relevant indicator parameters;
  3. Call the base_analysis function to analyze the single-period data series (a minimal usage sketch follows after this list).
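
A minimal usage sketch is shown below; the module name, contract symbol, dates and parameter values are illustrative assumptions rather than values from the original notebook:

from datetime import datetime

from vnpy.trader.constant import Exchange, Interval
from data_analysis import DataAnalysis   # assumed module name for the appendix class

tool = DataAnalysis()
tool.load_history(
    symbol="rb888",                      # hypothetical contract symbol
    exchange=Exchange.SHFE,
    interval=Interval.MINUTE,
    start=datetime(2019, 1, 1),
    end=datetime(2019, 12, 31),
    rate=0.0001,                         # assumed commission rate
    index_1to1=["STDDEV", "SMA"],
    index_2to1=["AROONOSC"],
    index_2to2=["AROON"],
    index_3to1=["ATR", "ADX", "CCI"],
    index_4to1=["BOP"],
    window_index=20,
    window_volatility=20,
)

# Steps 1-7 on the loaded 1-minute data
df = tool.base_analysis()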

 

[Screenshot: Jupyter Notebook running the single-period analysis]

 

After the single-period analysis is done, call the multi_time_frame_analysis function to start the multi-timeframe analysis:
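
Continuing the sketch above, the call might look like this (the interval list mirrors the one shown in Step 8):

intervals = ["5min", "15min", "30min", "1h", "2h", "4h"]
tool.multi_time_frame_analysis(intervals=intervals)

# One analyzed DataFrame per synthesized period
results = tool.results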

 

[Screenshot: Jupyter Notebook running the multi-timeframe analysis]

 

 

Appendix

 

Finally, as is the custom in the vn.py community, here is the complete source code of the packaged data analysis utility class:
 

from datetime import datetime
import warnings

import numpy as np
import pandas as pd
from pandas import DataFrame
import matplotlib.pyplot as plt
from statsmodels.stats.diagnostic import acorr_ljungbox
from statsmodels.tsa.stattools import adfuller as ADF
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
import talib

from vnpy.trader.constant import Exchange, Interval
from vnpy.trader.database import database_manager
warnings.filterwarnings("ignore")


class DataAnalysis:

    def __init__(self):
        """"""
        self.symbol = ""
        self.exchange = None
        self.interval = None
        self.start = None
        self.end = None
        self.rate = 0.0

        self.window_volatility = 20
        self.window_index = 20

        self.orignal = pd.DataFrame()

        self.index_1to1 = []
        self.index_2to2 = []   
        self.index_3to1 = []
        self.index_2to1 = []
        self.index_4to1 = []
        self.intervals = []

        self.results = {}

    def load_history(
        self,
        symbol: str, 
        exchange: Exchange, 
        interval: Interval, 
        start: datetime, 
        end: datetime,
        rate: float = 0.0,
        index_1to1: list = None,
        index_2to2: list = None,
        index_3to1: list = None,
        index_2to1: list = None,
        index_4to1: list = None,
        window_index: int = 20,
        window_volatility: int = 20,

    ):
        """""" 
        output("开始加载历史数据")

        self.window_volatility = window_volatility
        self.window_index = window_index
        self.rate = rate
        self.index_1to1 = index_1to1
        self.index_2to2 = index_2to2
        self.index_3to1 = index_3to1
        self.index_2to1 = index_2to1
        self.index_4to1 = index_4to1

        # Load history data from database  
        bars = database_manager.load_bar_data(    
            symbol=symbol, 
            exchange=exchange, 
            interval=interval, 
            start=start, 
            end=end,

        )

        output(f"历史数据加载完成,数据量:{len(bars)}")

        # Generate history data in DataFrame
        t = []
        o = []
        h = []
        l = []
        c = []
        v = []

        for bar in bars:
            time = bar.datetime
            open_price = bar.open_price
            high_price = bar.high_price
            low_price = bar.low_price
            close_price = bar.close_price
            volume = bar.volume

            t.append(time)
            o.append(open_price)
            h.append(high_price)
            l.append(low_price)
            c.append(close_price)  
            v.append(volume)

        self.orignal["open"] = o
        self.orignal["high"] = h
        self.orignal["low"] = l
        self.orignal["close"] = c
        self.orignal["volume"] = v
        self.orignal.index = t

    def base_analysis(self, df: DataFrame = None):
        """"""
        if df is None:
            df = self.orignal

        if df is None:
            output("数据为空,请输入数据")
            return

        close_price = df["close"]

        output("第一步:画出行情图,检查数据断点")

        close_price.plot(figsize=(20, 8), title="close_price")
        plt.show()

        random_test(close_price)
        stability_test(close_price)
        autocorrelation_test(close_price)

        self.relative_volatility_analysis(df)
        self.growth_analysis(df)

        self.calculate_index(df)

        return df

    def relative_volatility_analysis(self, df: DataFrame = None):
        """
        相对波动率
        """
        output("第五步:相对波动率分析")
        df["volatility"] = talib.ATR(
            np.array(df["high"]),
            np.array(df["low"]),
            np.array(df["close"]),
            self.window_volatility
        )

        df["fixed_cost"] = df["close"] * self.rate
        df["relative_vol"] = df["volatility"] - df["fixed_cost"]

        df["relative_vol"].plot(figsize=(20, 6), title="relative volatility")
        plt.show()

        df["relative_vol"].hist(bins=200, figsize=(20, 6), grid=False)
        plt.show()

        statitstic_info(df["relative_vol"])  

    def growth_analysis(self, df: DataFrame = None):
        """
        百分比K线变化率
        """
        output("第六步:变化率分析")
        df["pre_close"] = df["close"].shift(1).fillna(0)
        df["g%"] = 100 * (df["close"] - df["pre_close"]) / df["close"]

        df["g%"].plot(figsize=(20, 6), title="growth", ylim=(-5, 5))
        plt.show()

        df["g%"].hist(bins=200, figsize=(20, 6), grid=False)
        plt.show()

        statitstic_info(df["g%"])  

    def calculate_index(self, df: DataFrame = None):
        """"""
        output("第七步:计算相关技术指标,返回DataFrame\n")

        if self.index_1to1:
            for i in self.index_1to1:
                func = getattr(talib, i)
                df[i] = func(
                    np.array(df["close"]), 
                    self.window_index
                )

        if self.index_3to1:
            for i in self.index_3to1:
                func = getattr(talib, i)
                df[i] = func(        
                    np.array(df["high"]),
                    np.array(df["low"]),
                    np.array(df["close"]),
                    self.window_index
                )

        if self.index_2to2:
            for i in self.index_2to2:
                func = getattr(talib, i)
                result_down, result_up = func(
                    np.array(df["high"]),
                    np.array(df["low"]),
                    self.window_index
                )
                up = i + "_UP"
                down = i + "_DOWN"
                df[up] = result_up
                df[down] = result_down

        if self.index_2to1:
            for i in self.index_2to1:
                func = getattr(talib, i)
                df[i] = func(
                    np.array(df["high"]),
                    np.array(df["low"]),
                    self.window_index
                )

        if self.index_4to1:
            for i in self.index_4to1:
                func = getattr(talib, i)
                df[i] = func(  
                    np.array(df["open"]),      
                    np.array(df["high"]),
                    np.array(df["low"]),
                    np.array(df["close"]),
                )
        return df

    def multi_time_frame_analysis(self, intervals: list = None, df: DataFrame = None):
        """"""
        if not intervals:
            output("请输入K线合成周期")
            return

        if df is None:
            df = self.orignal

        if df is None:
            output("请先加载数据")
            return

        for interval in intervals: 
            output("------------------------------------------------")  
            output(f"合成{interval}周期K先并开始数据分析")

            data = pd.DataFrame()
            data["open"] = df["open"].resample(interval, how="first")
            data["high"] = df["high"].resample(interval, how="max")
            data["low"] = df["low"].resample(interval, how="min")
            data["close"] = df["close"].resample(interval, how="last")
            data["volume"] = df["volume"].resample(interval, how="sum")

            result = self.base_analysis(data)
            self.results[interval] = result

    def show_chart(self, data, boll_wide):
        """"""      
        data["boll_up"] = data["SMA"] + data["STDDEV"] * boll_wide
        data["boll_down"] = data["SMA"] - data["STDDEV"] * boll_wide

        up_signal = []
        down_signal = []
        len_data = len(data["close"]) 
        for i in range(1, len_data):
            if data.iloc[i]["close"] > data.iloc[i]["boll_up"] and data.iloc[i - 1]["close"] < data.iloc[i - 1]["boll_up"]:
                up_signal.append(i)

            elif data.iloc[i]["close"] < data.iloc[i]["boll_down"] and data.iloc[i-1]["close"] > data.iloc[i - 1]["boll_down"]:
                down_signal.append(i)

        fig = plt.figure(figsize=(20, 8))
        close = data["close"]
        plt.plot(close, lw=1)
        plt.plot(close, '^', markersize=5, color='r', label='UP signal', markevery=up_signal)
        plt.plot(close, 'v', markersize=5, color='g', label='DOWN signal', markevery=down_signal)
        plt.plot(data["boll_up"], lw=0.5, color="r")
        plt.plot(data["boll_down"], lw=0.5, color="g")
        plt.legend()
        plt.show()

        data["ATR"].plot(figsize=(20, 3), title="ATR")
        plt.show()


def random_test(close_price):
    """"""
    acorr_result = acorr_ljungbox(close_price, lags=1)
    p_value = acorr_result[1]
    if p_value < 0.05:
        output("第二步:随机性检验:非纯随机性")
    else:
        output("第二步:随机性检验:纯随机性")
    output(f"白噪声检验结果:{acorr_result}\n")


def stability_test(close_price):
    """"""
    statitstic = ADF(close_price)
    t_s = statitstic[1]
    t_c = statitstic[4]["10%"]

    if t_s > t_c:
        output("第三步:平稳性检验:存在单位根,时间序列不平稳")
    else:
        output("第三步:平稳性检验:不存在单位根,时间序列平稳")

    output(f"ADF检验结果:{statitstic}\n")


def autocorrelation_test(close_price):
    """"""
    output("第四步:画出自相关性图,观察自相关特性")

    plot_acf(close_price, lags=60)
    plt.show()

    plot_pacf(close_price, lags=60).show()
    plt.show()


def statitstic_info(df):
    """"""
    mean = round(df.mean(), 4)
    median = round(df.median(), 4)    
    output(f"样本平均数:{mean}, 中位数: {median}")

    skew = round(df.skew(), 4)
    kurt = round(df.kurt(), 4)

    if skew == 0:
        skew_attribute = "对称分布"
    elif skew > 0:
        skew_attribute = "分布偏左"
    else:
        skew_attribute = "分布偏右"

    if kurt == 0:
        kurt_attribute = "正态分布"
    elif kurt > 0:
        kurt_attribute = "分布陡峭"
    else:
        kurt_attribute = "分布平缓"

    output(f"偏度为:{skew},属于{skew_attribute};峰度为:{kurt},属于{kurt_attribute}\n")


def output(msg):
    """
    Output message of backtesting engine.
    """
    print(f"{datetime.now()}\t{msg}")

 

To learn more, follow the vn.py community WeChat official account.

Member

First!

Member

from vnpy.trader.database import database_manager
Was this changed in version 2.0.9?

Member

When using Jupyter to run the multi-timeframe analysis on the data, I get the error below.
It shows that the acorr_result value in the random_test function is NaN. Is this related to the randomness test result?

2020-02-21 11:32:49.868343 ------------------------------------------------
2020-02-21 11:32:49.868343 合成5min周期K先并开始数据分析
2020-02-21 11:32:49.883959 第一步:画出行情图,检查数据断点

2020-02-21 11:32:50.149438 第二步:随机性检验:纯随机性
2020-02-21 11:32:50.149438 白噪声检验结果:(array([nan]), array([nan]))


MissingDataError                          Traceback (most recent call last)

<ipython-input-12-7b0b58a21f2f> in <module>
      2 #intervals = ["5min","15min","30min","1h","2h","4h"]
      3 intervals = ["5min"]
----> 4 herramiento.multi_time_frame_analysis(intervals=intervals)

~\myvnpy\data_analysis.py in multi_time_frame_analysis(self, intervals, df)
    257             data["volume"] = df["volume"].resample(interval, how="sum")
    258 
--> 259             result = self.base_analysis(data)
    260             self.results[interval] = result
    261 

~\myvnpy\data_analysis.py in base_analysis(self, df)
    128 
    129         random_test(close_price)
--> 130         stability_test(close_price)
    131         autocorrelation_test(close_price)
    132 

~\myvnpy\data_analysis.py in stability_test(close_price)
    304 def stability_test(close_price):
    305     """"""
--> 306     statitstic = ADF(close_price)
    307     t_s = statitstic[1]
    308     t_c = statitstic[4]["10%"]

c:\vnstudio\lib\site-packages\statsmodels\tsa\stattools.py in adfuller(x, maxlag, regression, autolag, store, regresults)
    266     if not regresults:
    267         icbest, bestlag = _autolag(OLS, xdshort, fullRHS, startlag,
--> 268                                    maxlag, autolag)
    269     else:
    270         icbest, bestlag, alres = _autolag(OLS, xdshort, fullRHS, startlag,

c:\vnstudio\lib\site-packages\statsmodels\tsa\stattools.py in _autolag(mod, endog, exog, startlag, maxlag, method, modargs, fitargs, regresults)
     93     method = method.lower()
     94     for lag in range(startlag, startlag + maxlag + 1):
---> 95         mod_instance = mod(endog, exog[:, :lag], *modargs)
     96         results[lag] = mod_instance.fit()
     97 

c:\vnstudio\lib\site-packages\statsmodels\regression\linear_model.py in __init__(self, endog, exog, missing, hasconst, **kwargs)
    858         super(OLS, self).__init__(endog, exog, missing=missing,
--> 859                                   hasconst=hasconst, **kwargs)
    860         if "weights" in self._init_keys:
    861             self._init_keys.remove("weights")

c:\vnstudio\lib\site-packages\statsmodels\regression\linear_model.py in __init__(self, endog, exog, weights, missing, hasconst, **kwargs)
    700             weights = weights.squeeze()
    701         super(WLS, self).__init__(endog, exog, missing=missing,
--> 702                                   weights=weights, hasconst=hasconst, **kwargs)
    703         nobs = self.exog.shape[0]
    704         weights = self.weights

c:\vnstudio\lib\site-packages\statsmodels\regression\linear_model.py in __init__(self, endog, exog, **kwargs)
    189     def __init__(self, endog, exog, **kwargs):
--> 190         super(RegressionModel, self).__init__(endog, exog, **kwargs)
    191         self._data_attr.extend(['pinv_wexog', 'wendog', 'wexog', 'weights'])

c:\vnstudio\lib\site-packages\statsmodels\base\model.py in __init__(self, endog, exog, **kwargs)
    235     def __init__(self, endog, exog=None, **kwargs):
--> 236         super(LikelihoodModel, self).__init__(endog, exog, **kwargs)
    237         self.initialize()

c:\vnstudio\lib\site-packages\statsmodels\base\model.py in __init__(self, endog, exog, **kwargs)
     75         hasconst = kwargs.pop('hasconst', None)
     76         self.data = self._handle_data(endog, exog, missing, hasconst,
---> 77                                       **kwargs)
     78         self.k_constant = self.data.k_constant
     79         self.exog = self.data.exog

c:\vnstudio\lib\site-packages\statsmodels\base\model.py in _handle_data(self, endog, exog, missing, hasconst, **kwargs)
     99     def _handle_data(self, endog, exog, missing, hasconst, **kwargs):
--> 100         data = handle_data(endog, exog, missing, hasconst, **kwargs)
    101         # kwargs arrays could have changed, easier to just attach here
    102         for key in kwargs:

c:\vnstudio\lib\site-packages\statsmodels\base\data.py in handle_data(endog, exog, missing, hasconst, **kwargs)
    670     klass = handle_data_class_factory(endog, exog)
    671     return klass(endog, exog=exog, missing=missing, hasconst=hasconst,
--> 672                  **kwargs)

c:\vnstudio\lib\site-packages\statsmodels\base\data.py in __init__(self, endog, exog, missing, hasconst, **kwargs)
     85         self.const_idx = None
     86         self.k_constant = 0
---> 87         self._handle_constant(hasconst)
     88         self._check_integrity()
     89         self._cache = {}

c:\vnstudio\lib\site-packages\statsmodels\base\data.py in _handle_constant(self, hasconst)
    131             exog_max = np.max(self.exog, axis=0)
    132             if not np.isfinite(exog_max).all():
--> 133                 raise MissingDataError('exog contains inf or nans')
    134             exog_min = np.min(self.exog, axis=0)
    135             const_idx = np.where(exog_max == exog_min)[0].squeeze()

MissingDataError: exog contains inf or nans

Member


Regarding the problem above: when running the multi_time_frame_analysis module, the data before 5-minute resampling is shown in the first screenshot below and the data after resampling in the second (top to bottom). Why are there so many NaN values?


[Screenshot 1: 1-minute data before resampling]

[Screenshot 2: 5-minute data after resampling, showing NaN rows]

Member

Found the problem: it is a data issue. Data is missing around the NaN positions, which produces NaN values after resampling.

Member

Many thanks for the community's open-source spirit. I bought today's 14:00 course and am waiting for the live stream.

Member

eπi10 wrote:

Found the problem: it is a data issue. Data is missing around the NaN positions, which produces NaN values after resampling.

Yes, I ran into the same problem at this step. How did you handle it? I suspect it is caused by the missing market data on non-trading days. Do we really have to stitch the rest-day data back in ourselves?
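
One possible workaround, sketched below as an assumption rather than a fix from the original author: resample() emits an empty (NaN) bar for every period that has no underlying 1-minute data (nights, weekends, holidays), and those rows can simply be dropped before base_analysis() is called:

data = pd.DataFrame()
data["open"] = df["open"].resample(interval).first()
data["high"] = df["high"].resample(interval).max()
data["low"] = df["low"].resample(interval).min()
data["close"] = df["close"].resample(interval).last()
data["volume"] = df["volume"].resample(interval).sum()

# Drop the bars that contain no trades, so ADF / Ljung-Box no longer see NaN
data.dropna(inplace=True)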
