VeighNa量化社区
你的开源社区量化交易平台
Member
avatar
加入于:
帖子: 419
声望: 170

绘制K线图表时有两种选择,一种是不显示未结束的临时K线,另外一种是显示未结束的临时K线。
不知道您有没有发现,当您选择显示未结束的临时K线时,麻烦也就来了,计算机显得非常的慢!
什么原因?是因为python不适合做此类的图表显示吗?还是指标添加的太多?
非也,是因为我们图表控件的写作思路出了问题!
答案是:主要的原因处在临时K线的指标的计算上。让我们仔细地分析下......

1. 计算临时K线指标与历史K线指标有什么不同?

无论什么指标,历史K线的指标的计算是一次的,即每根K线只计算一次,这是一定的!
但是计算临时K线的指标却不同,原理上是每个tick都可能改变K线的开、高、低、收和成交量等的值,那么收到一个tick都需要重新计算一下该临时K线的指标,收到的tick越快,显示的指标越多,计算的次数就越多,计算量就越大,这也是一定的!
目前我们的指标在计算历史K线的指标时,是把历史数据一次性的调用talib的相关函数, 把一个巨大数组进行计算一次性地出来,然后把每个指标对应地保存到一个按K线索引为键值对字典中,方便绘制图形的时候查询的。
目前vnpy就没有考虑计算临时K线的指标显示,更不用说考虑如何高效地计算计算临时K线的指标了。我探索了临时K线的显示了,做法是对于没有计算过的K线,查询self._manager中的bar的,将最后的若干bar的需要的属性值查询出来,然后用talib的相关函数来计算出此时临时K线的指标,然后保存到指标字典的对应键值下,方便绘制图形的时候查询的。不过正是这个做法,让临时K线的指标的计算效率极低!

2. 如何提升计算临时K线指标的速度

临时K线的指标的计算效率低,与反复不断地查询和复制self._manager中的bar的属性有关,其实准备数据的时间比调用talib函数的时间长多了!
怎么办?把准备数据的时间分散到平时K线更新的函数中,用一个数组管理器来管理这些数据。也就是用存储空间换取一次性准备大量数据的时间。这样就可以提升临时K线的指标的计算效率了。

3. 实现方法

3.1 扩展vnpy的ArrayManager成为数组数字管理器

vnpy的ArrayManager很好,计算效率也不错。可是它不是为动态变化的临时K线设计的。因为每次调用update_bar它都会整体把数据平移一个位置,这对历史K线是可以的,但对应临时K线就不可以。我们希望它在更新临时K线时不要移动位置,只更新尾部(索引为-1)就OK了。
为此我们需要对其进行扩展,设计一个可以适应临时K线的动态数组管理器,我把它叫做DynaArrayManager。
DynaArrayManager的代码如下:

class DynaArrayManager(ArrayManager):
    """ 
    DynaArrayManager是对ArrayManager的扩展,解决它无法用于临时K线的指标计算问题。 
    作者:hxxjava
    """
    def __init__(self, size: int = 100) -> None:
        super().__init__(size)

        self.bar_datetimes:List[datetime] = []

    def update_bar(self, bar: BarData) -> None:
        if not self.bar_datetimes or self.bar_datetimes[-1] < bar.datetime:
            self.bar_datetimes.append(bar.datetime)
            super().update_bar(bar)        

        else:
            """
            Only Update all arrays in array manager with temporary bar data.
            """
            self.open_array[-1] = bar.open_price
            self.high_array[-1] = bar.high_price
            self.low_array[-1] = bar.low_price
            self.close_array[-1] = bar.close_price
            self.volume_array[-1] = bar.volume
            self.turnover_array[-1] = bar.turnover
            self.open_interest_array[-1] = bar.open_interest

    def dmi(self,N:int=14,M:int=7,array: bool = False)-> Union[Tuple[np.ndarray, np.ndarray, np.ndarray,np.ndarray], Tuple[float, float, float,float]]:
        """ 
        Directional Movement Indicator:
        TR := SUM(MAX(MAX(HIGH-LOW,ABS(HIGH-REF(CLOSE,1))),ABS(LOW-REF(CLOSE,1))),N);
        HD := HIGH-REF(HIGH,1); // 创新高量
        LD := REF(LOW,1)-LOW;   // 创新低量
        DMP:= SUM(IFELSE(HD>0 && HD>LD,HD,0),N); // N日创新高量累计,赢家通吃——做多力量
        DMM:= SUM(IFELSE(LD>0 && LD>HD,LD,0),N); // N日创新低量累计,赢家通吃——做空力量
        PDI: DMP*100/TR,LINETHICK2; //做多力量
        MDI: DMM*100/TR,LINETHICK2; //做空力量
        ADX: MA(ABS(MDI-PDI)/(MDI+PDI)*100,M),LINETHICK2; // ADX:多空力量差值M日平滑
        ADXR:(ADX+REF(ADX,M))/2,LINETHICK2;  // ADR:ADX与M日前ADX的均值        
        """
        TR = SUM(talib.TRANGE(self.high,self.low,self.close),N)
        # TR = SUM(MAX(MAX(self.high-self.low,ABS(self.high-REF(self.close,1))),ABS(self.low-REF(self.close,1))),N)
        HD = self.high - REF(self.high,1)
        LD = REF(self.low,1) - self.low 

        DMP = SUM(IIF((HD>0)&(HD>LD),HD,0),N)
        DMM = SUM(IIF((LD>0)&(LD>HD),LD,0),N)

        PDI = DMP*100/TR;
        MDI = DMM*100/TR;

        ADX = talib.MA(np.abs(MDI-PDI)/(MDI+PDI)*100,M)  # ADX:多空力量差值M日平滑
        ADXR = (ADX+REF(ADX,M))/2                         # ADR:ADX与M日前ADX的均值     

        if array:
            return PDI,MDI,ADX,ADXR

        return PDI[-1],MDI[-1],ADX[-1],ADXR[-1]

    def macd3(self, fast_period: int, slow_period: int, signal_period: int, array: bool = False) -> Union[Tuple[np.ndarray, np.ndarray, np.ndarray,np.ndarray], Tuple[float, float, float,float]]:
        """
        MACD having three lines:(diff,dea,slow_dea) and macd histgram
        """
        diff, dea, macd = talib.MACD(
            self.close, fast_period, slow_period, signal_period
        )
        slow_dea = talib.EMA(dea,signal_period)

        if array:
            return diff, dea, macd, slow_dea
        return diff[-1], dea[-1], macd[-1],slow_dea[-1]

3.2 改变CandleItem和ChartItem的实现机制

下面是两个典型的ChartItem的实现方法,其中都用到了DynaArrayManager,并且都重新实现了ChartItem的update_history()和update_bar()两个方法,这两个方法就是为了用存储空间换取一次性准备大量数据的时间。
另外还要注意_get_macd_value()和_get_dmi_value()的后半段,它们的作用就是为了适应临时K线的指标计算的特点而特别这样写的。

3.2.1 MacdItem的修改方法

class Macd3Item(ChartItem):
    """ 三根线的MACD """
    def __init__(self, manager: BarManager,short_window:int=12,long_window:int=26,M:int=9):
        """"""
        super().__init__(manager)

        self.white_pen: QtGui.QPen = pg.mkPen(color=(255, 255, 255), width=1)
        self.yellow_pen: QtGui.QPen = pg.mkPen(color=(255, 255, 0), width=1)
        self.red_pen: QtGui.QPen = pg.mkPen(color=(255, 0, 0), width=1)
        self.green_pen: QtGui.QPen = pg.mkPen(color=(0, 255, 0), width=1)
        self.magetan_pen: QtGui.QPen = pg.mkPen(color=(255, 0,255),width=1,style = QtCore.Qt.DashLine)

        self._values_ranges: Dict[Tuple[int, int], Tuple[float, float]] = {}

        self.short_window = short_window
        self.long_window = long_window
        self.M = M

        self.dyna_am = DynaArrayManager(max(short_window,long_window) + 2*(M-1))
        self.macd_data: Dict[int, List[float,float,float]] = {}

    def update_history(self, history: List[BarData]) -> None:
        """ reimpliment of update_history """
        for bar in history:
            self.dyna_am.update_bar(bar)
        super().update_history(history)

    def update_bar(self, bar: BarData) -> None:
        """ reimpliment of update_bar """
        self.dyna_am.update_bar(bar)
        super().update_bar(bar)

    def _get_macd_value(self, ix: int) -> List:
        """"""
        max_ix = self._manager.get_count() - 1
        invalid_value = [np.nan,np.nan,np.nan,np.nan]
        if ix < 0 or ix > max_ix:
            return invalid_value

        # When initialize, calculate all macd value
        if not self.macd_data:
            bars:List[BarData] = self._manager.get_all_bars()
            close_prices = [bar.close_price for bar in bars]

            diffs,deas,macds = talib.MACD(np.array(close_prices), 
                                    fastperiod=self.short_window, 
                                    slowperiod=self.long_window, 
                                    signalperiod=self.M)
            slow_deas = talib.EMA(deas,self.M)

            for n in range(0,len(diffs)):
                self.macd_data[n] = [diffs[n],deas[n],macds[n],slow_deas[n]]

        # Return if already calcualted
        if ix != max_ix and ix in self.macd_data:
            return self.macd_data[ix]

        if self.dyna_am.inited:
            diff,dea,macd,slow_dea = self.dyna_am.macd3(self.short_window,self.long_window,self.M)
            self.macd_data[ix] = [diff,dea,macd,slow_dea]
            return [diff,dea,macd,slow_dea]

        return [np.nan,np.nan,np.nan,np.nan]

    def _draw_bar_picture(self, ix: int, bar: BarData) -> QtGui.QPicture:
        """"""
        # # Create objects
        picture = QtGui.QPicture()
        painter = QtGui.QPainter(picture)

        if ix < 1:
            return picture

        macd_value = self._get_macd_value(ix)
        last_macd_value = self._get_macd_value(ix - 1)

        # # Draw macd lines
        if np.isnan(macd_value[0]) or np.isnan(last_macd_value[0]):
            # print("略过macd lines0")
            pass
        else:
            end_point0 = QtCore.QPointF(ix, macd_value[0])
            start_point0 = QtCore.QPointF(ix - 1, last_macd_value[0])
            painter.setPen(self.white_pen)
            painter.drawLine(start_point0, end_point0)

        if np.isnan(macd_value[1]) or np.isnan(last_macd_value[1]):
            # print("略过macd lines1")
            pass
        else:
            end_point1 = QtCore.QPointF(ix, macd_value[1])
            start_point1 = QtCore.QPointF(ix - 1, last_macd_value[1])
            painter.setPen(self.yellow_pen)
            painter.drawLine(start_point1, end_point1)

        if np.isnan(macd_value[3]) or np.isnan(last_macd_value[3]):
            pass
        else:
            end_point2 = QtCore.QPointF(ix, macd_value[3])
            start_point2 = QtCore.QPointF(ix - 1, last_macd_value[3])
            painter.setPen(self.magetan_pen)
            painter.drawLine(start_point2, end_point2)

        if not np.isnan(macd_value[2]):
            if (macd_value[2]>0):
                painter.setPen(self.red_pen)
                painter.setBrush(pg.mkBrush(255,0,0))
            else:
                painter.setPen(self.green_pen)
                painter.setBrush(pg.mkBrush(0,255,0))
            painter.drawRect(QtCore.QRectF(ix-0.3,0,0.6,macd_value[2]))
        else:
            # print("略过macd lines2")
            pass

        painter.end()
        return picture

    def boundingRect(self) -> QtCore.QRectF:
        """"""
        min_y, max_y = self.get_y_range()
        # print(f"{self.short_window,self.long_window,self.M} min_y, max_y={min_y, max_y}")
        rect = QtCore.QRectF(
            0,
            min_y,
            len(self._bar_picutures),
            max_y
        )
        return rect

    def get_y_range(self, min_ix: int = None, max_ix: int = None) -> Tuple[float, float]:
        """ 
        获得4个指标在y轴方向的范围 
        hxxjava 修改,2022-12-14
        当显示范围改变时,min_ix,max_ix的值不为None,当显示范围不变时,min_ix,max_ix的值不为None,
        """  
        if not self.macd_data:
            # 如果Ofcd数据没有计算过
            return (-100,100)

        this_range = (min_ix,max_ix)       
        if this_range == (None,None):
            # 如果是查询全范围
            min_ix,max_ix = self._rect_area                 # 显示索引范围
            max_ix = min(max_ix,len(self.macd_data)) - 1    # 数据索引范围
            this_range = min_ix,max_ix

        if this_range in self._values_ranges:
            # 查询范围已经存在,直接返回已经计算过的y范围值
            result = self._values_ranges[this_range]
            return result

        # 查询范围不存在,重新计算y范围值
        macd_list = list(self.macd_data.values())[min_ix:max_ix + 1]
        ndarray = np.array(macd_list)  
        if ndarray.shape[0] == 0:
            return (-100,100)

        # 求值范围内的的MACD值的最大和最小值
        max_price = np.nanmax(ndarray)
        min_price = np.nanmin(ndarray)
        if np.isnan(max_price) or np.isnan(max_price):
            return (-100,100)

        # 保存y方向范围,同时返回结果
        result = (min_price, max_price)
        self._values_ranges[this_range] = result
        return result

    def get_info_text(self, ix: int) -> str:
        """ """
        barscount = len(self._manager._bars) # hxxjava debug
        if ix in self.macd_data:
            [diff,dea,macd,slow_dea] = self.macd_data[ix]
            words = [
                f"Macd3{(self.short_window,self.long_window,self.M)}:",
                f"diff {diff:.3f}",
                f"dea {dea:.3f}",
                f"slow_dea={slow_dea:.3f}",
                f"macd {macd:.3f}",
                ]
            text = "\n".join(words)
        else:
            text = "diff - \ndea - \nslow_dea - \nmacd -"

        return text

3.2.1 DmiItem的修改方法

class DmiItem(ChartItem): 
    """  """
    def __init__(self, manager: BarManager,N:int=14,M:int=7):
        """"""
        super().__init__(manager)

        self.white_pen: QtGui.QPen = pg.mkPen(color=(255, 255, 255), width=1)
        self.yellow_pen: QtGui.QPen = pg.mkPen(color=(255, 255, 0), width=1)
        self.magenta_pen: QtGui.QPen = pg.mkPen(color=(255, 0, 255), width=1)
        self.red_pen: QtGui.QPen = pg.mkPen(color=(255, 0, 0), width=1)
        self.green_pen: QtGui.QPen = pg.mkPen(color=(0, 255, 0), width=1)
        self.ref_pen: QtGui.QPen = pg.mkPen(color=(127, 127, 127,127), width=1, style = QtCore.Qt.DashLine)

        self._values_ranges: Dict[Tuple[int, int], Tuple[float, float]] = {}

        self.N = N
        self.M = M

        self.dyna_am = DynaArrayManager(2*max(N,M))

        self.dmi_data: Dict[int, Tuple[float,float,float,float]] = {} # (PDI,MDI,ADX,ADXR)

    def update_history(self, history: List[BarData]) -> None:
        """ reimpliment of update_history """
        for bar in history:
            self.dyna_am.update_bar(bar)
        super().update_history(history)

    def update_bar(self, bar: BarData) -> None:
        """ reimpliment of update_bar """
        self.dyna_am.update_bar(bar)
        super().update_bar(bar)

    def _get_dmi_value(self, ix: int) -> Tuple[float,float,float,float]:
        """"""
        max_ix = self._manager.get_count()-1
        invalid_data = (np.nan,np.nan,np.nan,np.nan)
        if ix < 0 or ix > max_ix:
            print(f"DmiItem{ix},invalid_data1")
            return invalid_data

        # When initialize, calculate all macd value
        if not self.dmi_data:
            bars:List[BarData] = self._manager.get_all_bars()
            highs = np.array([bar.high_price for bar in bars])
            lows = np.array([bar.low_price for bar in bars])
            closes = np.array([bar.close_price for bar in bars])

            pdi,mdi,adx,adxr = DMI(high=highs,low=lows,close=closes,N=self.N,M=self.M,array=True)

            for n in range(0,len(adx)):
                self.dmi_data[n] = (pdi[n],mdi[n],adx[n],adxr[n])


        # Return if already calcualted
        if ix != max_ix and ix in self.dmi_data:
            return self.dmi_data[ix]

        if self.dyna_am.inited:
            pdi,mdi,adx,adxr = self.dyna_am.dmi(N=self.N,M=self.M)
            self.dmi_data[ix] = [pdi,mdi,adx,adxr]
            return [pdi,mdi,adx,adxr]

        return invalid_data

    def _draw_bar_picture(self, ix: int, bar: BarData) -> QtGui.QPicture:
        """"""
        # # Create objects
        picture = QtGui.QPicture()
        painter = QtGui.QPainter(picture)

        if ix > self.N + self.M:
            # 画参考线
            painter.setPen(self.ref_pen)
            for ref in [20.0,50,80]:
                painter.drawLine(QtCore.QPointF(ix-0.5,ref),QtCore.QPointF(ix+0.5,ref))  

            # 画4根线
            dmi_value = self._get_dmi_value(ix)
            last_dmi_value = self._get_dmi_value(ix - 1)
            pens = [self.white_pen,self.yellow_pen,self.magenta_pen,self.green_pen]
            for i in range(4):
                end_point0 = QtCore.QPointF(ix, dmi_value[i])
                start_point0 = QtCore.QPointF(ix - 1, last_dmi_value[i])
                painter.setPen(pens[i])
                painter.drawLine(start_point0, end_point0)        

            # 多空颜色标示
            pdi,mdi = dmi_value[0],dmi_value[1] 
            if not(np.isnan(pdi) or np.isnan(mdi)):
                if abs(pdi - mdi) > 1e-2:
                    painter.setPen(pg.mkPen(color=(168, 0, 0) if pdi > mdi else (0, 168, 0),width=3))
                    painter.drawLine(QtCore.QPointF(ix,pdi),QtCore.QPointF(ix,mdi))  

        painter.end()
        return picture

    def boundingRect(self) -> QtCore.QRectF:
        """"""
        min_y, max_y = self.get_y_range()
        rect = QtCore.QRectF(
            0,
            min_y,
            len(self._bar_picutures),
            max_y
        )
        return rect

    def get_y_range(self, min_ix: int = None, max_ix: int = None) -> Tuple[float, float]:
        """  """
        return (0.0,100.0)  

    def get_info_text(self, ix: int) -> str:
        """ """
        if ix in self.dmi_data:
            [pdi,mdi,adx,adxr] = self.dmi_data.get(ix,None)
            words = [
                f"DMI{self.N,self.M}:",
                f"PDI {pdi:.2f}",
                f"MDI {mdi:.2f}",
                f"ADX {adx:.2f}",
                f"ADXR {adxr:.2f}",
                ]
            text = "\n".join(words)
        else:
            text = f"DMI{self.N,self.M}:-"

        return text

4. 快了,果然快了

这样写作的,再加载您的K线图表时,你会发现即使显示是需要显示临时K线指标,显示的速度也会快多了!!!

Member
avatar
加入于:
帖子: 1446
声望: 102

感谢分享!

Member
avatar
加入于:
帖子: 7
声望: 0

感谢大牛的慷慨分享!在GPT帮助下,对代码做些许更改后成功实现了。太感谢了!

Member
avatar
加入于:
帖子: 1
声望: 0

大神好,请问pdi,mdi,adx,adxr = DMI(high=highs,low=lows,close=closes,N=self.N,M=self.M,array=True)中的DMI是哪里的函数?

Member
avatar
加入于:
帖子: 1446
声望: 102

talib里面的

© 2015-2022 上海韦纳软件科技有限公司
备案服务号:沪ICP备18006526号

沪公网安备 31011502017034号

【用户协议】
【隐私政策】
【免责条款】