vn.py官网
开源量化社区
Member
avatar
加入于:
帖子: 7
声望: 0

class ArrayManager(object):
    """
    For:

    1. time series container of bar data
    2. calculating technical indicator value

    Every indicator method runs the matching TA-Lib function over the
    stored series. With ``array=False`` (the default) only the most
    recent value is returned; with ``array=True`` the full output
    array(s) are returned.
    """

    def __init__(self, size: int = 100):
        """
        Create the fixed-length buffers.

        :param size: number of bars kept in each series; ``inited``
            turns True once this many bars have been pushed.
        """
        self.count: int = 0
        self.size: int = size
        self.inited: bool = False

        self.open_array: np.ndarray = np.zeros(size)
        self.high_array: np.ndarray = np.zeros(size)
        self.low_array: np.ndarray = np.zeros(size)
        self.close_array: np.ndarray = np.zeros(size)
        self.volume_array: np.ndarray = np.zeros(size)
        self.open_interest_array: np.ndarray = np.zeros(size)

    def update_bar(self, bar: "BarData") -> None:
        """
        Update new bar data into array manager.

        Each series is shifted one slot to the left (dropping the oldest
        value) and the fields of ``bar`` are written into the last slot.

        :param bar: object exposing ``open_price``, ``high_price``,
            ``low_price``, ``close_price``, ``volume`` and
            ``open_interest``.
        """
        self.count += 1
        if not self.inited and self.count >= self.size:
            self.inited = True

        # Shift every buffer left by one position.
        self.open_array[:-1] = self.open_array[1:]
        self.high_array[:-1] = self.high_array[1:]
        self.low_array[:-1] = self.low_array[1:]
        self.close_array[:-1] = self.close_array[1:]
        self.volume_array[:-1] = self.volume_array[1:]
        self.open_interest_array[:-1] = self.open_interest_array[1:]

        # Newest bar always lives in the last slot.
        self.open_array[-1] = bar.open_price
        self.high_array[-1] = bar.high_price
        self.low_array[-1] = bar.low_price
        self.close_array[-1] = bar.close_price
        self.volume_array[-1] = bar.volume
        self.open_interest_array[-1] = bar.open_interest

    @property
    def open(self) -> np.ndarray:
        """
        Get open price time series.
        """
        return self.open_array

    @property
    def high(self) -> np.ndarray:
        """
        Get high price time series.
        """
        return self.high_array

    @property
    def low(self) -> np.ndarray:
        """
        Get low price time series.
        """
        return self.low_array

    @property
    def close(self) -> np.ndarray:
        """
        Get close price time series.
        """
        return self.close_array

    @property
    def volume(self) -> np.ndarray:
        """
        Get trading volume time series.
        """
        return self.volume_array

    @property
    def open_interest(self) -> np.ndarray:
        """
        Get open interest time series.
        """
        return self.open_interest_array

    def sma(self, n: int, array: bool = False) -> Union[float, np.ndarray]:
        """
        Simple moving average.
        """
        result = talib.SMA(self.close, n)
        if array:
            return result
        return result[-1]

    def ema(self, n: int, array: bool = False) -> Union[float, np.ndarray]:
        """
        Exponential moving average.
        """
        result = talib.EMA(self.close, n)
        if array:
            return result
        return result[-1]

    def sar(
        self,
        array: bool = False,
        acceleration: float = 0,
        maximum: float = 0
    ) -> Union[float, np.ndarray]:
        """
        Parabolic SAR (talib.SAR) on the high/low series.

        ``acceleration`` and ``maximum`` are forwarded to TA-Lib. Their
        defaults stay at 0 so existing callers get unchanged output.
        NOTE(review): TA-Lib's documented defaults are 0.02 / 0.2; the
        zeros look like placeholders -- confirm the intended values.
        """
        result = talib.SAR(
            self.high, self.low, acceleration=acceleration, maximum=maximum
        )
        if array:
            return result
        return result[-1]

    def t3(
        self,
        n: int,
        array: bool = False,
        vfactor: float = 0
    ) -> Union[float, np.ndarray]:
        """
        T3 moving average (talib.T3).

        NOTE(review): this is computed on the *high* series, unlike the
        other moving averages here which use close -- confirm intent.
        ``vfactor`` is forwarded to TA-Lib; it defaults to 0 to preserve
        the previous hard-coded behaviour.
        """
        result = talib.T3(self.high, n, vfactor=vfactor)
        if array:
            return result
        return result[-1]

    def dema(self, n: int, array: bool = False) -> Union[float, np.ndarray]:
        """
        Double exponential moving average (talib.DEMA).
        """
        result = talib.DEMA(self.close, n)
        if array:
            return result
        return result[-1]

    def tema(self, n: int, array: bool = False) -> Union[float, np.ndarray]:
        """
        Triple exponential moving average (talib.TEMA).
        """
        result = talib.TEMA(self.close, n)
        if array:
            return result
        return result[-1]

    def kama(self, n: int, array: bool = False) -> Union[float, np.ndarray]:
        """
        KAMA.
        """
        result = talib.KAMA(self.close, n)
        if array:
            return result
        return result[-1]

    def wma(self, n: int, array: bool = False) -> Union[float, np.ndarray]:
        """
        WMA.
        """
        result = talib.WMA(self.close, n)
        if array:
            return result
        return result[-1]

    def midpoint(self, n: int, array: bool = False) -> Union[float, np.ndarray]:
        """
        MIDPOINT of the close series over ``n`` bars.
        """
        result = talib.MIDPOINT(self.close, n)
        if array:
            return result
        return result[-1]

    def midprice(self, n: int, array: bool = False) -> Union[float, np.ndarray]:
        """
        MIDPRICE of the high/low series over ``n`` bars.
        """
        result = talib.MIDPRICE(self.high, self.low, n)
        if array:
            return result
        return result[-1]

    def trima(self, n: int, array: bool = False) -> Union[float, np.ndarray]:
        """
        Triangular moving average (talib.TRIMA).
        """
        result = talib.TRIMA(self.close, n)
        if array:
            return result
        return result[-1]

    def ht_dcperiod(self, array: bool = False) -> Union[float, np.ndarray]:
        """
        Hilbert Transform - dominant cycle period (talib.HT_DCPERIOD).
        """
        result = talib.HT_DCPERIOD(self.close)
        if array:
            return result
        return result[-1]

    def ht_dcphase(self, array: bool = False) -> Union[float, np.ndarray]:
        """
        Hilbert Transform - dominant cycle phase (talib.HT_DCPHASE).
        """
        result = talib.HT_DCPHASE(self.close)
        if array:
            return result
        return result[-1]

    def ht_trendmode(self, array: bool = False) -> Union[float, np.ndarray]:
        """
        Hilbert Transform - trend vs cycle mode (talib.HT_TRENDMODE).
        """
        result = talib.HT_TRENDMODE(self.close)
        if array:
            return result
        return result[-1]

    def avgprice(self, array: bool = False) -> Union[float, np.ndarray]:
        """
        Average price (talib.AVGPRICE) of open/high/low/close.
        """
        result = talib.AVGPRICE(self.open, self.high, self.low, self.close)
        if array:
            return result
        return result[-1]

    def medprice(self, array: bool = False) -> Union[float, np.ndarray]:
        """
        Median price (talib.MEDPRICE) of high/low.
        """
        result = talib.MEDPRICE(self.high, self.low)
        if array:
            return result
        return result[-1]

    def typprice(self, array: bool = False) -> Union[float, np.ndarray]:
        """
        Typical price (talib.TYPPRICE) of high/low/close.
        """
        result = talib.TYPPRICE(self.high, self.low, self.close)
        if array:
            return result
        return result[-1]

    def wclprice(self, array: bool = False) -> Union[float, np.ndarray]:
        """
        Weighted close price (talib.WCLPRICE) of high/low/close.
        """
        result = talib.WCLPRICE(self.high, self.low, self.close)
        if array:
            return result
        return result[-1]

    def stoch(
        self,
        fastk_period: int,
        slowk_period: int,
        slowk_matype: int,
        slowd_period: int,
        slowd_matype: int,
        array: bool = False
    ) -> Union[
        Tuple[np.ndarray, np.ndarray],
        Tuple[float, float]
    ]:
        """
        Stochastic oscillator (talib.STOCH).

        Returns ``(slowk, slowd)``. The period / matype arguments are
        forwarded to TA-Lib as given.
        """
        slowk, slowd = talib.STOCH(
            self.high,
            self.low,
            self.close,
            fastk_period=fastk_period,
            slowk_period=slowk_period,
            slowk_matype=slowk_matype,
            slowd_period=slowd_period,
            slowd_matype=slowd_matype
        )

        if array:
            return slowk, slowd
        return slowk[-1], slowd[-1]

    def stochf(
        self,
        fastk_period: int,
        fastd_period: int,
        fastd_matype: int,
        array: bool = False
    ) -> Union[
        Tuple[np.ndarray, np.ndarray],
        Tuple[float, float]
    ]:
        """
        Fast stochastic oscillator (talib.STOCHF).

        Returns ``(fastk, fastd)``. The period / matype arguments are
        forwarded to TA-Lib as given.
        """
        fastk, fastd = talib.STOCHF(
            self.high,
            self.low,
            self.close,
            fastk_period=fastk_period,
            fastd_period=fastd_period,
            fastd_matype=fastd_matype
        )
        if array:
            return fastk, fastd
        return fastk[-1], fastd[-1]

    def beta(self, n: int, array: bool = False) -> Union[float, np.ndarray]:
        """
        BETA (talib.BETA) between the high and low series.
        """
        result = talib.BETA(self.high, self.low, n)
        if array:
            return result
        return result[-1]

    def correl(self, n: int, array: bool = False) -> Union[float, np.ndarray]:
        """
        Correlation coefficient (talib.CORREL) between high and low.
        """
        result = talib.CORREL(self.high, self.low, n)
        if array:
            return result
        return result[-1]

    def linearreg(self, n: int, array: bool = False) -> Union[float, np.ndarray]:
        """
        Linear regression (talib.LINEARREG) of the close series.
        """
        result = talib.LINEARREG(self.close, n)
        if array:
            return result
        return result[-1]

    def linearreg_angle(self, n: int, array: bool = False) -> Union[float, np.ndarray]:
        """
        Linear regression angle (talib.LINEARREG_ANGLE).
        """
        result = talib.LINEARREG_ANGLE(self.close, n)
        if array:
            return result
        return result[-1]

    def linearreg_intercept(self, n: int, array: bool = False) -> Union[float, np.ndarray]:
        """
        Linear regression intercept (talib.LINEARREG_INTERCEPT).
        """
        result = talib.LINEARREG_INTERCEPT(self.close, n)
        if array:
            return result
        return result[-1]

    def linearreg_slope(self, n: int, array: bool = False) -> Union[float, np.ndarray]:
        """
        Linear regression slope (talib.LINEARREG_SLOPE).
        """
        result = talib.LINEARREG_SLOPE(self.close, n)
        if array:
            return result
        return result[-1]

    def tsf(self, n: int, array: bool = False) -> Union[float, np.ndarray]:
        """
        Time series forecast (talib.TSF).
        """
        result = talib.TSF(self.close, n)
        if array:
            return result
        return result[-1]

    def stddev(self, timeperiod: int, nbdev: int, array: bool = False) -> Union[float, np.ndarray]:
        """
        Standard deviation (talib.STDDEV) of the close series.
        """
        result = talib.STDDEV(self.close, timeperiod, nbdev)
        if array:
            return result
        return result[-1]

    def var(self, timeperiod: int, nbdev: int, array: bool = False) -> Union[float, np.ndarray]:
        """
        Variance (talib.VAR) of the close series.
        """
        result = talib.VAR(self.close, timeperiod, nbdev)
        if array:
            return result
        return result[-1]

    def ht_phasor(
        self,
        array: bool = False
    ) -> Union[
        Tuple[np.ndarray, np.ndarray],
        Tuple[float, float]
    ]:
        """
        Hilbert Transform - phasor components (talib.HT_PHASOR).

        Returns ``(inphase, quadrature)``.
        """
        inphase, quadrature = talib.HT_PHASOR(self.close)
        if array:
            return inphase, quadrature
        return inphase[-1], quadrature[-1]

    def ht_sine(
        self,
        array: bool = False
    ) -> Union[
        Tuple[np.ndarray, np.ndarray],
        Tuple[float, float]
    ]:
        """
        Hilbert Transform - sine wave (talib.HT_SINE).

        Returns ``(sine, leadsine)``.
        """
        sine, leadsine = talib.HT_SINE(self.close)
        if array:
            return sine, leadsine
        return sine[-1], leadsine[-1]

    def sarext(
        self,
        startvalue: int,
        offsetonreverse: int,
        accelerationinitlong: int,
        accelerationlong: int,
        accelerationmaxlong: int,
        accelerationinitshort: int,
        accelerationshort: int,
        accelerationmaxshort: int,
        array: bool = False
    ) -> Union[float, np.ndarray]:
        """
        Extended parabolic SAR (talib.SAREXT) on the high/low series.

        All tuning arguments are passed positionally to TA-Lib in the
        order they are declared here.
        """
        result = talib.SAREXT(
            self.high,
            self.low,
            startvalue,
            offsetonreverse,
            accelerationinitlong,
            accelerationlong,
            accelerationmaxlong,
            accelerationinitshort,
            accelerationshort,
            accelerationmaxshort
        )
        if array:
            return result
        return result[-1]

    def apo(
        self,
        fast_period: int,
        slow_period: int,
        matype: int = 0,
        array: bool = False
    ) -> Union[float, np.ndarray]:
        """
        APO.
        """
        result = talib.APO(self.close, fast_period, slow_period, matype)
        if array:
            return result
        return result[-1]

    def cmo(self, n: int, array: bool = False) -> Union[float, np.ndarray]:
        """
        CMO.
        """
        result = talib.CMO(self.close, n)
        if array:
            return result
        return result[-1]

    def mom(self, n: int, array: bool = False) -> Union[float, np.ndarray]:
        """
        MOM.
        """
        result = talib.MOM(self.close, n)
        if array:
            return result
        return result[-1]

    def ppo(
        self,
        fast_period: int,
        slow_period: int,
        matype: int = 0,
        array: bool = False
    ) -> Union[float, np.ndarray]:
        """
        PPO.
        """
        result = talib.PPO(self.close, fast_period, slow_period, matype)
        if array:
            return result
        return result[-1]

    def roc(self, n: int, array: bool = False) -> Union[float, np.ndarray]:
        """
        ROC.
        """
        result = talib.ROC(self.close, n)
        if array:
            return result
        return result[-1]

    def rocr(self, n: int, array: bool = False) -> Union[float, np.ndarray]:
        """
        ROCR.
        """
        result = talib.ROCR(self.close, n)
        if array:
            return result
        return result[-1]

    def rocp(self, n: int, array: bool = False) -> Union[float, np.ndarray]:
        """
        ROCP.
        """
        result = talib.ROCP(self.close, n)
        if array:
            return result
        return result[-1]

    def rocr_100(self, n: int, array: bool = False) -> Union[float, np.ndarray]:
        """
        ROCR100.
        """
        result = talib.ROCR100(self.close, n)
        if array:
            return result
        return result[-1]

    def trix(self, n: int, array: bool = False) -> Union[float, np.ndarray]:
        """
        TRIX.
        """
        result = talib.TRIX(self.close, n)
        if array:
            return result
        return result[-1]

    def std(self, n: int, nbdev: int = 1, array: bool = False) -> Union[float, np.ndarray]:
        """
        Standard deviation.
        """
        result = talib.STDDEV(self.close, n, nbdev)
        if array:
            return result
        return result[-1]

    def obv(self, array: bool = False) -> Union[float, np.ndarray]:
        """
        OBV.
        """
        result = talib.OBV(self.close, self.volume)
        if array:
            return result
        return result[-1]

    def cci(self, n: int, array: bool = False) -> Union[float, np.ndarray]:
        """
        Commodity Channel Index (CCI).
        """
        result = talib.CCI(self.high, self.low, self.close, n)
        if array:
            return result
        return result[-1]

    def atr(self, n: int, array: bool = False) -> Union[float, np.ndarray]:
        """
        Average True Range (ATR).
        """
        result = talib.ATR(self.high, self.low, self.close, n)
        if array:
            return result
        return result[-1]

    def natr(self, n: int, array: bool = False) -> Union[float, np.ndarray]:
        """
        NATR.
        """
        result = talib.NATR(self.high, self.low, self.close, n)
        if array:
            return result
        return result[-1]

    def rsi(self, n: int, array: bool = False) -> Union[float, np.ndarray]:
        """
        Relative Strength Index (RSI).
        """
        result = talib.RSI(self.close, n)
        if array:
            return result
        return result[-1]

    def bbands(
        self,
        array: bool = False
    ) -> Union[
        Tuple[np.ndarray, np.ndarray, np.ndarray],
        Tuple[float, float, float]
    ]:
        """
        Bollinger bands via talib.BBANDS with ``matype=MA_Type.T3``.

        Returns ``(upper, middle, lower)``.
        """
        upper, middle, lower = talib.BBANDS(
            self.close, matype=MA_Type.T3
        )
        if array:
            return upper, middle, lower
        return upper[-1], middle[-1], lower[-1]

    def macd(
        self,
        fast_period: int,
        slow_period: int,
        signal_period: int,
        array: bool = False
    ) -> Union[
        Tuple[np.ndarray, np.ndarray, np.ndarray],
        Tuple[float, float, float]
    ]:
        """
        MACD.

        Returns ``(macd, signal, hist)``.
        """
        macd, signal, hist = talib.MACD(
            self.close, fast_period, slow_period, signal_period
        )
        if array:
            return macd, signal, hist
        return macd[-1], signal[-1], hist[-1]

    def macdext(
        self,
        fast_period: int,
        fast_matype: int,
        slow_period: int,
        slow_matype: int,
        signal_period: int,
        signal_matype: int,
        array: bool = False
    ) -> Union[
        Tuple[np.ndarray, np.ndarray, np.ndarray],
        Tuple[float, float, float]
    ]:
        """
        MACDEXT: MACD with a selectable MA type per leg.

        Returns ``(macd, signal, hist)``.
        """
        macd, signal, hist = talib.MACDEXT(
            self.close,
            fast_period,
            fast_matype,
            slow_period,
            slow_matype,
            signal_period,
            signal_matype
        )
        if array:
            return macd, signal, hist
        return macd[-1], signal[-1], hist[-1]

    def macdfix(
        self,
        signal_period: int,
        array: bool = False
    ) -> Union[
        Tuple[np.ndarray, np.ndarray, np.ndarray],
        Tuple[float, float, float]
    ]:
        """
        MACDFIX: MACD where only the signal period is configurable.

        Returns ``(macd, signal, hist)``.
        """
        macd, signal, hist = talib.MACDFIX(
            self.close, signal_period
        )
        if array:
            return macd, signal, hist
        return macd[-1], signal[-1], hist[-1]

    def adx(self, n: int, array: bool = False) -> Union[float, np.ndarray]:
        """
        ADX.
        """
        result = talib.ADX(self.high, self.low, self.close, n)
        if array:
            return result
        return result[-1]

    def adxr(self, n: int, array: bool = False) -> Union[float, np.ndarray]:
        """
        ADXR.
        """
        result = talib.ADXR(self.high, self.low, self.close, n)
        if array:
            return result
        return result[-1]

    def dx(self, n: int, array: bool = False) -> Union[float, np.ndarray]:
        """
        DX.
        """
        result = talib.DX(self.high, self.low, self.close, n)
        if array:
            return result
        return result[-1]

    def minus_di(self, n: int, array: bool = False) -> Union[float, np.ndarray]:
        """
        MINUS_DI.
        """
        result = talib.MINUS_DI(self.high, self.low, self.close, n)
        if array:
            return result
        return result[-1]

    def plus_di(self, n: int, array: bool = False) -> Union[float, np.ndarray]:
        """
        PLUS_DI.
        """
        result = talib.PLUS_DI(self.high, self.low, self.close, n)
        if array:
            return result
        return result[-1]

    def willr(self, n: int, array: bool = False) -> Union[float, np.ndarray]:
        """
        WILLR.
        """
        result = talib.WILLR(self.high, self.low, self.close, n)
        if array:
            return result
        return result[-1]

    def ultosc(
        self,
        time_period1: int = 7,
        time_period2: int = 14,
        time_period3: int = 28,
        array: bool = False
    ) -> Union[float, np.ndarray]:
        """
        Ultimate Oscillator.
        """
        result = talib.ULTOSC(
            self.high,
            self.low,
            self.close,
            time_period1,
            time_period2,
            time_period3
        )
        if array:
            return result
        return result[-1]

    def trange(self, array: bool = False) -> Union[float, np.ndarray]:
        """
        TRANGE.
        """
        result = talib.TRANGE(self.high, self.low, self.close)
        if array:
            return result
        return result[-1]

    def boll(
        self,
        n: int,
        dev: float,
        array: bool = False
    ) -> Union[
        Tuple[np.ndarray, np.ndarray],
        Tuple[float, float]
    ]:
        """
        Bollinger Channel.

        Returns ``(up, down)`` = SMA(n) +/- ``dev`` * STDDEV(n).
        """
        mid = self.sma(n, array)
        width = self.std(n, 1, array)

        up = mid + width * dev
        down = mid - width * dev

        return up, down

    def keltner(
        self,
        n: int,
        dev: float,
        array: bool = False
    ) -> Union[
        Tuple[np.ndarray, np.ndarray],
        Tuple[float, float]
    ]:
        """
        Keltner Channel.

        Returns ``(up, down)`` = SMA(n) +/- ``dev`` * ATR(n).
        """
        mid = self.sma(n, array)
        width = self.atr(n, array)

        up = mid + width * dev
        down = mid - width * dev

        return up, down

    def donchian(
        self, n: int, array: bool = False
    ) -> Union[
        Tuple[np.ndarray, np.ndarray],
        Tuple[float, float]
    ]:
        """
        Donchian Channel.

        Returns ``(up, down)``: rolling max of high and rolling min of
        low over ``n`` bars.
        """
        up = talib.MAX(self.high, n)
        down = talib.MIN(self.low, n)

        if array:
            return up, down
        return up[-1], down[-1]

    def aroon(
        self,
        n: int,
        array: bool = False
    ) -> Union[
        Tuple[np.ndarray, np.ndarray],
        Tuple[float, float]
    ]:
        """
        Aroon indicator.

        Returns the two outputs of ``talib.AROON`` in the order TA-Lib
        yields them.
        """
        aroon_up, aroon_down = talib.AROON(self.high, self.low, n)

        if array:
            return aroon_up, aroon_down
        return aroon_up[-1], aroon_down[-1]

    def aroonosc(self, n: int, array: bool = False) -> Union[float, np.ndarray]:
        """
        Aroon Oscillator.
        """
        result = talib.AROONOSC(self.high, self.low, n)

        if array:
            return result
        return result[-1]

    def minus_dm(self, n: int, array: bool = False) -> Union[float, np.ndarray]:
        """
        MINUS_DM.
        """
        result = talib.MINUS_DM(self.high, self.low, n)

        if array:
            return result
        return result[-1]

    def plus_dm(self, n: int, array: bool = False) -> Union[float, np.ndarray]:
        """
        PLUS_DM.
        """
        result = talib.PLUS_DM(self.high, self.low, n)

        if array:
            return result
        return result[-1]

    def mfi(self, n: int, array: bool = False) -> Union[float, np.ndarray]:
        """
        Money Flow Index.
        """
        result = talib.MFI(self.high, self.low, self.close, self.volume, n)
        if array:
            return result
        return result[-1]

    def ad(self, array: bool = False) -> Union[float, np.ndarray]:
        """
        AD.
        """
        result = talib.AD(self.high, self.low, self.close, self.volume)
        if array:
            return result
        return result[-1]

    def adosc(
        self,
        fast_period: int,
        slow_period: int,
        array: bool = False
    ) -> Union[float, np.ndarray]:
        """
        ADOSC.
        """
        result = talib.ADOSC(
            self.high,
            self.low,
            self.close,
            self.volume,
            fast_period,
            slow_period
        )
        if array:
            return result
        return result[-1]

    def bop(self, array: bool = False) -> Union[float, np.ndarray]:
        """
        BOP.
        """
        result = talib.BOP(self.open, self.high, self.low, self.close)

        if array:
            return result
        return result[-1]
Member
avatar
加入于:
帖子: 7
声望: 0

注释部分请忽略

Member
avatar
加入于:
帖子: 1
声望: 0

标个中文注释 就好了

© 2015-2019 上海韦纳软件科技有限公司
备案服务号:沪ICP备18006526号-3

沪公网安备 31011502017034号