绘制K线图表时有两种选择,一种是不显示未结束的临时K线,另外一种是显示未结束的临时K线。
不知道您有没有发现,当您选择显示未结束的临时K线时,麻烦也就来了,计算机显得非常的慢!
什么原因?是因为python不适合做此类的图表显示吗?还是指标添加的太多?
非也,是因为我们图表控件的写作思路出了问题!
答案是:主要的原因处在临时K线的指标的计算上。让我们仔细地分析下......
1. 计算临时K线指标与历史K线指标有什么不同?
无论什么指标,历史K线的指标的计算是一次的,即每根K线只计算一次,这是一定的!
但是计算临时K线的指标却不同,原理上是每个tick都可能改变K线的开、高、低、收和成交量等的值,那么收到一个tick都需要重新计算一下该临时K线的指标,收到的tick越快,显示的指标越多,计算的次数就越多,计算量就越大,这也是一定的!
目前我们的指标在计算历史K线的指标时,是把历史数据一次性的调用talib的相关函数, 把一个巨大数组进行计算一次性地出来,然后把每个指标对应地保存到一个按K线索引为键值对字典中,方便绘制图形的时候查询的。
目前vnpy就没有考虑计算临时K线的指标显示,更不用说考虑如何高效地计算计算临时K线的指标了。我探索了临时K线的显示了,做法是对于没有计算过的K线,查询self._manager中的bar的,将最后的若干bar的需要的属性值查询出来,然后用talib的相关函数来计算出此时临时K线的指标,然后保存到指标字典的对应键值下,方便绘制图形的时候查询的。不过正是这个做法,让临时K线的指标的计算效率极低!
2. 如何提升计算临时K线指标的速度
临时K线的指标的计算效率低,与反复不断地查询和复制self._manager中的bar的属性有关,其实准备数据的时间比调用talib函数的时间长多了!
怎么办?把准备数据的时间分散到平时K线更新的函数中,用一个数组管理器来管理这些数据。也就是用存储空间换取一次性准备大量数据的时间。这样就可以提升临时K线的指标的计算效率了。
3. 实现方法
3.1 扩展vnpy的ArrayManager成为数组数字管理器
vnpy的ArrayManager很好,计算效率也不错。可是它不是为动态变化的临时K线设计的。因为每次调用update_bar它都会整体把数据平移一个位置,这对历史K线是可以的,但对应临时K线就不可以。我们希望它在更新临时K线时不要移动位置,只更新尾部(索引为-1)就OK了。
为此我们需要对其进行扩展,设计一个可以适应临时K线的动态数组管理器,我把它叫做DynaArrayManager。
DynaArrayManager的代码如下:
class DynaArrayManager(ArrayManager):
"""
DynaArrayManager是对ArrayManager的扩展,解决它无法用于临时K线的指标计算问题。
作者:hxxjava
"""
def __init__(self, size: int = 100) -> None:
super().__init__(size)
self.bar_datetimes:List[datetime] = []
def update_bar(self, bar: BarData) -> None:
if not self.bar_datetimes or self.bar_datetimes[-1] < bar.datetime:
self.bar_datetimes.append(bar.datetime)
super().update_bar(bar)
else:
"""
Only Update all arrays in array manager with temporary bar data.
"""
self.open_array[-1] = bar.open_price
self.high_array[-1] = bar.high_price
self.low_array[-1] = bar.low_price
self.close_array[-1] = bar.close_price
self.volume_array[-1] = bar.volume
self.turnover_array[-1] = bar.turnover
self.open_interest_array[-1] = bar.open_interest
def dmi(self,N:int=14,M:int=7,array: bool = False)-> Union[Tuple[np.ndarray, np.ndarray, np.ndarray,np.ndarray], Tuple[float, float, float,float]]:
"""
Directional Movement Indicator:
TR := SUM(MAX(MAX(HIGH-LOW,ABS(HIGH-REF(CLOSE,1))),ABS(LOW-REF(CLOSE,1))),N);
HD := HIGH-REF(HIGH,1); // 创新高量
LD := REF(LOW,1)-LOW; // 创新低量
DMP:= SUM(IFELSE(HD>0 && HD>LD,HD,0),N); // N日创新高量累计,赢家通吃——做多力量
DMM:= SUM(IFELSE(LD>0 && LD>HD,LD,0),N); // N日创新低量累计,赢家通吃——做空力量
PDI: DMP*100/TR,LINETHICK2; //做多力量
MDI: DMM*100/TR,LINETHICK2; //做空力量
ADX: MA(ABS(MDI-PDI)/(MDI+PDI)*100,M),LINETHICK2; // ADX:多空力量差值M日平滑
ADXR:(ADX+REF(ADX,M))/2,LINETHICK2; // ADR:ADX与M日前ADX的均值
"""
TR = SUM(talib.TRANGE(self.high,self.low,self.close),N)
# TR = SUM(MAX(MAX(self.high-self.low,ABS(self.high-REF(self.close,1))),ABS(self.low-REF(self.close,1))),N)
HD = self.high - REF(self.high,1)
LD = REF(self.low,1) - self.low
DMP = SUM(IIF((HD>0)&(HD>LD),HD,0),N)
DMM = SUM(IIF((LD>0)&(LD>HD),LD,0),N)
PDI = DMP*100/TR;
MDI = DMM*100/TR;
ADX = talib.MA(np.abs(MDI-PDI)/(MDI+PDI)*100,M) # ADX:多空力量差值M日平滑
ADXR = (ADX+REF(ADX,M))/2 # ADR:ADX与M日前ADX的均值
if array:
return PDI,MDI,ADX,ADXR
return PDI[-1],MDI[-1],ADX[-1],ADXR[-1]
def macd3(self, fast_period: int, slow_period: int, signal_period: int, array: bool = False) -> Union[Tuple[np.ndarray, np.ndarray, np.ndarray,np.ndarray], Tuple[float, float, float,float]]:
"""
MACD having three lines:(diff,dea,slow_dea) and macd histgram
"""
diff, dea, macd = talib.MACD(
self.close, fast_period, slow_period, signal_period
)
slow_dea = talib.EMA(dea,signal_period)
if array:
return diff, dea, macd, slow_dea
return diff[-1], dea[-1], macd[-1],slow_dea[-1]
3.2 改变CandleItem和ChartItem的实现机制
下面是两个典型的ChartItem的实现方法,其中都用到了DynaArrayManager,并且都重新实现了ChartItem的update_history()和update_bar()两个方法,这两个方法就是为了用存储空间换取一次性准备大量数据的时间。
另外还要注意_get_macd_value()和_get_dmi_value()的后半段,它们的作用就是为了适应临时K线的指标计算的特点而特别这样写的。
3.2.1 MacdItem的修改方法
class Macd3Item(ChartItem):
""" 三根线的MACD """
def __init__(self, manager: BarManager,short_window:int=12,long_window:int=26,M:int=9):
""""""
super().__init__(manager)
self.white_pen: QtGui.QPen = pg.mkPen(color=(255, 255, 255), width=1)
self.yellow_pen: QtGui.QPen = pg.mkPen(color=(255, 255, 0), width=1)
self.red_pen: QtGui.QPen = pg.mkPen(color=(255, 0, 0), width=1)
self.green_pen: QtGui.QPen = pg.mkPen(color=(0, 255, 0), width=1)
self.magetan_pen: QtGui.QPen = pg.mkPen(color=(255, 0,255),width=1,style = QtCore.Qt.DashLine)
self._values_ranges: Dict[Tuple[int, int], Tuple[float, float]] = {}
self.short_window = short_window
self.long_window = long_window
self.M = M
self.dyna_am = DynaArrayManager(max(short_window,long_window) + 2*(M-1))
self.macd_data: Dict[int, List[float,float,float]] = {}
def update_history(self, history: List[BarData]) -> None:
""" reimpliment of update_history """
for bar in history:
self.dyna_am.update_bar(bar)
super().update_history(history)
def update_bar(self, bar: BarData) -> None:
""" reimpliment of update_bar """
self.dyna_am.update_bar(bar)
super().update_bar(bar)
def _get_macd_value(self, ix: int) -> List:
""""""
max_ix = self._manager.get_count() - 1
invalid_value = [np.nan,np.nan,np.nan,np.nan]
if ix < 0 or ix > max_ix:
return invalid_value
# When initialize, calculate all macd value
if not self.macd_data:
bars:List[BarData] = self._manager.get_all_bars()
close_prices = [bar.close_price for bar in bars]
diffs,deas,macds = talib.MACD(np.array(close_prices),
fastperiod=self.short_window,
slowperiod=self.long_window,
signalperiod=self.M)
slow_deas = talib.EMA(deas,self.M)
for n in range(0,len(diffs)):
self.macd_data[n] = [diffs[n],deas[n],macds[n],slow_deas[n]]
# Return if already calcualted
if ix != max_ix and ix in self.macd_data:
return self.macd_data[ix]
if self.dyna_am.inited:
diff,dea,macd,slow_dea = self.dyna_am.macd3(self.short_window,self.long_window,self.M)
self.macd_data[ix] = [diff,dea,macd,slow_dea]
return [diff,dea,macd,slow_dea]
return [np.nan,np.nan,np.nan,np.nan]
def _draw_bar_picture(self, ix: int, bar: BarData) -> QtGui.QPicture:
""""""
# # Create objects
picture = QtGui.QPicture()
painter = QtGui.QPainter(picture)
if ix < 1:
return picture
macd_value = self._get_macd_value(ix)
last_macd_value = self._get_macd_value(ix - 1)
# # Draw macd lines
if np.isnan(macd_value[0]) or np.isnan(last_macd_value[0]):
# print("略过macd lines0")
pass
else:
end_point0 = QtCore.QPointF(ix, macd_value[0])
start_point0 = QtCore.QPointF(ix - 1, last_macd_value[0])
painter.setPen(self.white_pen)
painter.drawLine(start_point0, end_point0)
if np.isnan(macd_value[1]) or np.isnan(last_macd_value[1]):
# print("略过macd lines1")
pass
else:
end_point1 = QtCore.QPointF(ix, macd_value[1])
start_point1 = QtCore.QPointF(ix - 1, last_macd_value[1])
painter.setPen(self.yellow_pen)
painter.drawLine(start_point1, end_point1)
if np.isnan(macd_value[3]) or np.isnan(last_macd_value[3]):
pass
else:
end_point2 = QtCore.QPointF(ix, macd_value[3])
start_point2 = QtCore.QPointF(ix - 1, last_macd_value[3])
painter.setPen(self.magetan_pen)
painter.drawLine(start_point2, end_point2)
if not np.isnan(macd_value[2]):
if (macd_value[2]>0):
painter.setPen(self.red_pen)
painter.setBrush(pg.mkBrush(255,0,0))
else:
painter.setPen(self.green_pen)
painter.setBrush(pg.mkBrush(0,255,0))
painter.drawRect(QtCore.QRectF(ix-0.3,0,0.6,macd_value[2]))
else:
# print("略过macd lines2")
pass
painter.end()
return picture
def boundingRect(self) -> QtCore.QRectF:
""""""
min_y, max_y = self.get_y_range()
# print(f"{self.short_window,self.long_window,self.M} min_y, max_y={min_y, max_y}")
rect = QtCore.QRectF(
0,
min_y,
len(self._bar_picutures),
max_y
)
return rect
def get_y_range(self, min_ix: int = None, max_ix: int = None) -> Tuple[float, float]:
"""
获得4个指标在y轴方向的范围
hxxjava 修改,2022-12-14
当显示范围改变时,min_ix,max_ix的值不为None,当显示范围不变时,min_ix,max_ix的值不为None,
"""
if not self.macd_data:
# 如果Ofcd数据没有计算过
return (-100,100)
this_range = (min_ix,max_ix)
if this_range == (None,None):
# 如果是查询全范围
min_ix,max_ix = self._rect_area # 显示索引范围
max_ix = min(max_ix,len(self.macd_data)) - 1 # 数据索引范围
this_range = min_ix,max_ix
if this_range in self._values_ranges:
# 查询范围已经存在,直接返回已经计算过的y范围值
result = self._values_ranges[this_range]
return result
# 查询范围不存在,重新计算y范围值
macd_list = list(self.macd_data.values())[min_ix:max_ix + 1]
ndarray = np.array(macd_list)
if ndarray.shape[0] == 0:
return (-100,100)
# 求值范围内的的MACD值的最大和最小值
max_price = np.nanmax(ndarray)
min_price = np.nanmin(ndarray)
if np.isnan(max_price) or np.isnan(max_price):
return (-100,100)
# 保存y方向范围,同时返回结果
result = (min_price, max_price)
self._values_ranges[this_range] = result
return result
def get_info_text(self, ix: int) -> str:
""" """
barscount = len(self._manager._bars) # hxxjava debug
if ix in self.macd_data:
[diff,dea,macd,slow_dea] = self.macd_data[ix]
words = [
f"Macd3{(self.short_window,self.long_window,self.M)}:",
f"diff {diff:.3f}",
f"dea {dea:.3f}",
f"slow_dea={slow_dea:.3f}",
f"macd {macd:.3f}",
]
text = "\n".join(words)
else:
text = "diff - \ndea - \nslow_dea - \nmacd -"
return text
3.2.1 DmiItem的修改方法
class DmiItem(ChartItem):
""" """
def __init__(self, manager: BarManager,N:int=14,M:int=7):
""""""
super().__init__(manager)
self.white_pen: QtGui.QPen = pg.mkPen(color=(255, 255, 255), width=1)
self.yellow_pen: QtGui.QPen = pg.mkPen(color=(255, 255, 0), width=1)
self.magenta_pen: QtGui.QPen = pg.mkPen(color=(255, 0, 255), width=1)
self.red_pen: QtGui.QPen = pg.mkPen(color=(255, 0, 0), width=1)
self.green_pen: QtGui.QPen = pg.mkPen(color=(0, 255, 0), width=1)
self.ref_pen: QtGui.QPen = pg.mkPen(color=(127, 127, 127,127), width=1, style = QtCore.Qt.DashLine)
self._values_ranges: Dict[Tuple[int, int], Tuple[float, float]] = {}
self.N = N
self.M = M
self.dyna_am = DynaArrayManager(2*max(N,M))
self.dmi_data: Dict[int, Tuple[float,float,float,float]] = {} # (PDI,MDI,ADX,ADXR)
def update_history(self, history: List[BarData]) -> None:
""" reimpliment of update_history """
for bar in history:
self.dyna_am.update_bar(bar)
super().update_history(history)
def update_bar(self, bar: BarData) -> None:
""" reimpliment of update_bar """
self.dyna_am.update_bar(bar)
super().update_bar(bar)
def _get_dmi_value(self, ix: int) -> Tuple[float,float,float,float]:
""""""
max_ix = self._manager.get_count()-1
invalid_data = (np.nan,np.nan,np.nan,np.nan)
if ix < 0 or ix > max_ix:
print(f"DmiItem{ix},invalid_data1")
return invalid_data
# When initialize, calculate all macd value
if not self.dmi_data:
bars:List[BarData] = self._manager.get_all_bars()
highs = np.array([bar.high_price for bar in bars])
lows = np.array([bar.low_price for bar in bars])
closes = np.array([bar.close_price for bar in bars])
pdi,mdi,adx,adxr = DMI(high=highs,low=lows,close=closes,N=self.N,M=self.M,array=True)
for n in range(0,len(adx)):
self.dmi_data[n] = (pdi[n],mdi[n],adx[n],adxr[n])
# Return if already calcualted
if ix != max_ix and ix in self.dmi_data:
return self.dmi_data[ix]
if self.dyna_am.inited:
pdi,mdi,adx,adxr = self.dyna_am.dmi(N=self.N,M=self.M)
self.dmi_data[ix] = [pdi,mdi,adx,adxr]
return [pdi,mdi,adx,adxr]
return invalid_data
def _draw_bar_picture(self, ix: int, bar: BarData) -> QtGui.QPicture:
""""""
# # Create objects
picture = QtGui.QPicture()
painter = QtGui.QPainter(picture)
if ix > self.N + self.M:
# 画参考线
painter.setPen(self.ref_pen)
for ref in [20.0,50,80]:
painter.drawLine(QtCore.QPointF(ix-0.5,ref),QtCore.QPointF(ix+0.5,ref))
# 画4根线
dmi_value = self._get_dmi_value(ix)
last_dmi_value = self._get_dmi_value(ix - 1)
pens = [self.white_pen,self.yellow_pen,self.magenta_pen,self.green_pen]
for i in range(4):
end_point0 = QtCore.QPointF(ix, dmi_value[i])
start_point0 = QtCore.QPointF(ix - 1, last_dmi_value[i])
painter.setPen(pens[i])
painter.drawLine(start_point0, end_point0)
# 多空颜色标示
pdi,mdi = dmi_value[0],dmi_value[1]
if not(np.isnan(pdi) or np.isnan(mdi)):
if abs(pdi - mdi) > 1e-2:
painter.setPen(pg.mkPen(color=(168, 0, 0) if pdi > mdi else (0, 168, 0),width=3))
painter.drawLine(QtCore.QPointF(ix,pdi),QtCore.QPointF(ix,mdi))
painter.end()
return picture
def boundingRect(self) -> QtCore.QRectF:
""""""
min_y, max_y = self.get_y_range()
rect = QtCore.QRectF(
0,
min_y,
len(self._bar_picutures),
max_y
)
return rect
def get_y_range(self, min_ix: int = None, max_ix: int = None) -> Tuple[float, float]:
""" """
return (0.0,100.0)
def get_info_text(self, ix: int) -> str:
""" """
if ix in self.dmi_data:
[pdi,mdi,adx,adxr] = self.dmi_data.get(ix,None)
words = [
f"DMI{self.N,self.M}:",
f"PDI {pdi:.2f}",
f"MDI {mdi:.2f}",
f"ADX {adx:.2f}",
f"ADXR {adxr:.2f}",
]
text = "\n".join(words)
else:
text = f"DMI{self.N,self.M}:-"
return text
4. 快了,果然快了
这样写作的,再加载您的K线图表时,你会发现即使显示是需要显示临时K线指标,显示的速度也会快多了!!!