1. 升级原因:
由于vnpy系统升级至最新的3.0版本,Python底层的对象继承机制发生变化,导致原来的一部分绘图部件因为多继承而初始化失败,无法使用,必须升级。
近期不少vnpy的会员朋友不断地私信我,反映这些绘图部件用不了了,因为本人最近忙于交易策略的开发,无暇顾及,实在是抽不出时间,请大家谅解!
现在问题已经解决,可以放心使用。
2. 升级代码:
2.1 修改BarManager
修改vnpy\chart\manager.py中的BarManager,为它添加一个函数:
def get_bar_idx(self, trade_dt: datetime) -> int:  # hxxjava add
    """
    Return the index of the bar that a trade/order time falls into.

    The bar chosen is the last one whose datetime key in
    ``self._datetime_index_map`` is less than or equal to ``trade_dt``.

    Args:
        trade_dt: time to look up; it must be comparable with the map's
            datetime keys.

    Returns:
        -1 when ``trade_dt`` is earlier than every bar (or there are no
        bars at all), otherwise the 0-based position of the bar among the
        sorted datetimes, as a plain Python ``int``.
    """
    # No bars loaded yet: nothing can contain the given time.
    if not self._datetime_index_map:
        return -1

    bar_times = np.array(sorted(self._datetime_index_map.keys()))
    # Counting the bars that start at or before trade_dt gives (index + 1).
    # int() converts the numpy integer so the declared return type holds.
    return int(np.sum(bar_times <= trade_dt)) - 1
当然别忘了在该文件的引用部分添加下面的语句
import numpy as np # hxxjava add
2.2 绘图部件代码
from datetime import datetime
from typing import List, Tuple, Dict
from vnpy.trader.ui import create_qapp, QtCore, QtGui, QtWidgets
from pyqtgraph import ScatterPlotItem
import pyqtgraph as pg
import numpy as np
import talib
import copy
from vnpy.chart import ChartWidget, VolumeItem, CandleItem
from vnpy.chart.item import ChartItem
from vnpy.chart.manager import BarManager
from vnpy.trader.object import (
BarData,
OrderData,
TradeData
)
from vnpy.trader.object import Direction, Exchange, Interval, Offset, Status, Product, OptionType, OrderType
class BarItem(CandleItem):
    """
    American-style OHLC bar item.

    Each bar is a vertical high-low line with a short tick to the left at
    the open price and a short tick to the right at the close price.
    """

    # Horizontal length of the open/close ticks, in bar-index units.
    BAR_WIDTH = 0.3

    def __init__(self, manager: BarManager):
        """Create the item and the white pen/brush used for every bar."""
        super().__init__(manager)

        self.bar_pen: QtGui.QPen = pg.mkPen(color="w", width=2)
        self.bar_brush: QtGui.QBrush = pg.mkBrush(color="w")

    def _draw_bar_picture(self, ix: int, bar: BarData) -> QtGui.QPicture:
        """
        Record the drawing commands for one bar.

        Args:
            ix: x position (bar index) of the bar.
            bar: bar whose open/high/low/close prices are drawn.

        Returns:
            A QPicture holding the three line segments of the bar.
        """
        picture = QtGui.QPicture()
        painter = QtGui.QPainter(picture)
        painter.setPen(self.bar_pen)
        painter.setBrush(self.bar_brush)

        tick = self.BAR_WIDTH
        # (start, end) pairs: open tick, high-low stem, close tick.
        segments = (
            ((ix - tick, bar.open_price), (ix, bar.open_price)),
            ((ix, bar.high_price), (ix, bar.low_price)),
            ((ix + tick, bar.close_price), (ix, bar.close_price)),
        )
        for (x_from, y_from), (x_to, y_to) in segments:
            painter.drawLine(
                QtCore.QPointF(x_from, y_from),
                QtCore.QPointF(x_to, y_to),
            )

        painter.end()
        return picture
class LineItem(CandleItem):
    """Close-price line item: joins consecutive closes with a white line."""

    def __init__(self, manager: BarManager):
        """Create the item and the pen used for the close line."""
        super().__init__(manager)

        self.white_pen: QtGui.QPen = pg.mkPen(color=(255, 255, 255), width=1)

    def _draw_bar_picture(self, ix: int, bar: BarData) -> QtGui.QPicture:
        """
        Record the segment from the previous bar's close to this bar's close.

        Args:
            ix: x position (bar index) of the bar.
            bar: bar whose close price ends the segment.

        Returns:
            A QPicture holding the single line segment.
        """
        previous = self._manager.get_bar(ix - 1)

        picture = QtGui.QPicture()
        painter = QtGui.QPainter(picture)
        painter.setPen(self.white_pen)

        current_point = QtCore.QPointF(ix, bar.close_price)
        # Without a previous bar the segment collapses to a single point.
        previous_point = (
            QtCore.QPointF(ix - 1, previous.close_price)
            if previous
            else current_point
        )
        painter.drawLine(previous_point, current_point)

        painter.end()
        return picture

    def get_info_text(self, ix: int) -> str:
        """Return the close price text for bar ``ix``, or "" if there is no bar."""
        bar = self._manager.get_bar(ix)
        return f"Close:{bar.close_price}" if bar else ""
class SmaItem(CandleItem):
    """
    Simple moving average (SMA) of close prices, drawn as a line.

    Values are computed with ``talib.SMA`` and cached per bar index in
    ``sma_data``.
    """

    def __init__(self, manager: BarManager):
        """Create the item with a default pen and a 10-bar window."""
        super().__init__(manager)

        self.line_pen: QtGui.QPen = pg.mkPen(color=(100, 100, 255), width=2)

        self.sma_window = 10
        # Cache of computed SMA values: bar index -> SMA (may be NaN).
        self.sma_data: Dict[int, float] = {}

    def set_pen(self, pen: QtGui.QPen):
        """Set the pen used to draw the SMA line."""
        self.line_pen = pen

    def set_sma_window(self, sma_window: int):
        """
        Set the SMA window length.

        Cached values belong to the previous window, so they are dropped
        when the window actually changes.
        """
        if sma_window != self.sma_window:
            self.sma_data.clear()
        self.sma_window = sma_window

    def get_sma_value(self, ix: int) -> float:
        """
        Return the SMA value at bar index ``ix``.

        Args:
            ix: bar index.

        Returns:
            0 for a negative index; otherwise the cached or newly computed
            SMA, which is NaN while there is not enough history.
        """
        if ix < 0:
            return 0

        # First use: calculate the SMA of every bar currently loaded.
        if not self.sma_data:
            closes = [bar.close_price for bar in self._manager.get_all_bars()]
            if closes:
                sma_array = talib.SMA(np.array(closes), self.sma_window)
                self.sma_data.update(enumerate(sma_array))

        # Return the value if it was already calculated.
        if ix in self.sma_data:
            return self.sma_data[ix]

        # Otherwise calculate it from the bars that end at ix. Indexes with
        # no bar (get_bar returns None) are skipped instead of crashing.
        window_bars = (
            self._manager.get_bar(n)
            for n in range(ix - self.sma_window, ix + 1)
        )
        closes = [bar.close_price for bar in window_bars if bar is not None]

        if closes:
            sma_value = talib.SMA(np.array(closes), self.sma_window)[-1]
        else:
            sma_value = float("nan")

        self.sma_data[ix] = sma_value
        return sma_value

    def _draw_bar_picture(self, ix: int, bar: BarData) -> QtGui.QPicture:
        """
        Record the SMA segment from bar ``ix - 1`` to bar ``ix``.

        Nothing is drawn while either end of the segment is NaN.
        """
        sma_value = self.get_sma_value(ix)
        last_sma_value = self.get_sma_value(ix - 1)

        picture = QtGui.QPicture()
        painter = QtGui.QPainter(picture)
        painter.setPen(self.line_pen)

        # Skip undefined values, as the RSI and MACD items do.
        if not (np.isnan(sma_value) or np.isnan(last_sma_value)):
            start_point = QtCore.QPointF(ix - 1, last_sma_value)
            end_point = QtCore.QPointF(ix, sma_value)
            painter.drawLine(start_point, end_point)

        painter.end()
        return picture

    def get_info_text(self, ix: int) -> str:
        """Return the cursor text for bar ``ix``; "-" when no value exists."""
        sma_value = self.sma_data.get(ix)
        if sma_value is None or np.isnan(sma_value):
            return f"SMA{self.sma_window} -"
        return f"SMA{self.sma_window} {sma_value:.1f}"
class RsiItem(ChartItem):
    """
    RSI indicator item for a sub plot with a fixed 0-100 y axis.

    Draws the RSI line plus horizontal reference lines at 70 and 30.
    Values are computed with ``talib.RSI`` and cached per bar index in
    ``rsi_data``.
    """

    def __init__(self, manager: BarManager):
        """Create the item with its pens and a 14-bar RSI window."""
        super().__init__(manager)

        self.white_pen: QtGui.QPen = pg.mkPen(color=(255, 255, 255), width=1)
        self.yellow_pen: QtGui.QPen = pg.mkPen(color=(255, 255, 0), width=2)

        self.rsi_window = 14
        # Cache of computed RSI values: bar index -> RSI (may be NaN).
        self.rsi_data: Dict[int, float] = {}

    def get_rsi_value(self, ix: int) -> float:
        """
        Return the RSI value at bar index ``ix``.

        Args:
            ix: bar index.

        Returns:
            50 for a negative index; otherwise the cached or newly computed
            RSI, which is NaN while there is not enough history.
        """
        if ix < 0:
            return 50

        # First use: calculate the RSI of every bar currently loaded.
        if not self.rsi_data:
            closes = [bar.close_price for bar in self._manager.get_all_bars()]
            if closes:
                rsi_array = talib.RSI(np.array(closes), self.rsi_window)
                self.rsi_data.update(enumerate(rsi_array))

        # Return the value if it was already calculated.
        if ix in self.rsi_data:
            return self.rsi_data[ix]

        # Otherwise calculate it from the bars that end at ix. Indexes with
        # no bar (get_bar returns None) are skipped instead of crashing.
        window_bars = (
            self._manager.get_bar(n)
            for n in range(ix - self.rsi_window, ix + 1)
        )
        closes = [bar.close_price for bar in window_bars if bar is not None]

        if closes:
            rsi_value = talib.RSI(np.array(closes), self.rsi_window)[-1]
        else:
            rsi_value = float("nan")

        self.rsi_data[ix] = rsi_value
        return rsi_value

    def _draw_bar_picture(self, ix: int, bar: BarData) -> QtGui.QPicture:
        """
        Record the RSI segment and the 70/30 reference lines for bar ``ix``.

        The RSI segment is skipped while either of its ends is NaN; the
        reference lines are always drawn.
        """
        rsi_value = self.get_rsi_value(ix)
        last_rsi_value = self.get_rsi_value(ix - 1)

        picture = QtGui.QPicture()
        painter = QtGui.QPainter(picture)

        # RSI line
        painter.setPen(self.yellow_pen)
        if not (np.isnan(last_rsi_value) or np.isnan(rsi_value)):
            end_point = QtCore.QPointF(ix, rsi_value)
            start_point = QtCore.QPointF(ix - 1, last_rsi_value)
            painter.drawLine(start_point, end_point)

        # Overbought (70) and oversold (30) reference lines
        painter.setPen(self.white_pen)
        for level in (70, 30):
            painter.drawLine(
                QtCore.QPointF(ix, level),
                QtCore.QPointF(ix - 1, level),
            )

        painter.end()
        return picture

    def boundingRect(self) -> QtCore.QRectF:
        """Return the item area: one unit per drawn bar wide, 0 to 100 high."""
        rect = QtCore.QRectF(
            0,
            0,
            len(self._bar_picutures),
            100
        )
        return rect

    def get_y_range(self, min_ix: int = None, max_ix: int = None) -> Tuple[float, float]:
        """Return the fixed y range (0, 100), whatever bar range is visible."""
        return 0, 100

    def get_info_text(self, ix: int) -> str:
        """Return the cursor text for bar ``ix``; "-" when no value exists."""
        rsi_value = self.rsi_data.get(ix)
        if rsi_value is None or np.isnan(rsi_value):
            return "RSI -"
        return f"RSI {rsi_value:.1f}"
def to_int(value: float) -> int:
    """Round ``value`` to zero decimal places and return it as an ``int``."""
    rounded = round(value, 0)
    return int(rounded)
def adjust_range(in_range: Tuple[float, float]) -> Tuple[float, float]:
    """
    Widen a y-axis display range to 1.1 times its span.

    The first value is lowered and the second value is raised by 5% of the
    absolute difference between the two.
    """
    lower, upper = in_range[0], in_range[1]
    span = abs(lower - upper)
    return (lower - span * 0.05, upper + span * 0.05)
class MacdItem(ChartItem):
    """
    MACD indicator item for a sub plot.

    Draws the DIFF line (white), the DEA line (yellow) and the MACD
    histogram (red above zero, green otherwise). Values are computed with
    ``talib.MACD`` and cached per bar index in ``macd_data``; the y ranges
    of visited bar-index ranges are cached per instance in
    ``_values_ranges``.
    """

    # Per-instance cache, created in __init__:
    # (min_ix, max_ix) -> (min value, max value) over that bar range.
    _values_ranges: Dict[Tuple[int, int], Tuple[float, float]]
    # Bar-index range most recently requested for display.
    last_range: Tuple[int, int] = (-1, -1)
    # y range returned while no usable MACD values exist.
    _DEFAULT_Y_RANGE = (-100, 100)

    def __init__(self, manager: BarManager):
        """Create the item with its pens and the 12/26/9 MACD parameters."""
        super().__init__(manager)

        self.white_pen: QtGui.QPen = pg.mkPen(color=(255, 255, 255), width=1)
        self.yellow_pen: QtGui.QPen = pg.mkPen(color=(255, 255, 0), width=1)
        self.red_pen: QtGui.QPen = pg.mkPen(color=(255, 0, 0), width=1)
        self.green_pen: QtGui.QPen = pg.mkPen(color=(0, 255, 0), width=1)

        self.short_window = 12
        self.long_window = 26
        self.M = 9

        # Cache of computed values: bar index -> (diff, dea, macd).
        self.macd_data: Dict[int, Tuple[float, float, float]] = {}

        # The range cache must belong to this instance: a class-level dict
        # would be shared by every MacdItem and mix up different charts.
        self._values_ranges = {}
        self.last_range = (-1, -1)

    def get_macd_value(self, ix: int) -> Tuple[float, float, float]:
        """
        Return ``(diff, dea, macd)`` at bar index ``ix``.

        Args:
            ix: bar index.

        Returns:
            (0.0, 0.0, 0.0) for a negative index; otherwise the cached or
            newly computed triple, whose members are NaN while there is
            not enough history.
        """
        if ix < 0:
            return (0.0, 0.0, 0.0)

        # First use: calculate the MACD of every bar currently loaded.
        if not self.macd_data:
            closes = [bar.close_price for bar in self._manager.get_all_bars()]
            if closes:
                diffs, deas, macds = talib.MACD(
                    np.array(closes),
                    fastperiod=self.short_window,
                    slowperiod=self.long_window,
                    signalperiod=self.M,
                )
                self.macd_data.update(enumerate(zip(diffs, deas, macds)))

        # Return the value if it was already calculated.
        if ix in self.macd_data:
            return self.macd_data[ix]

        # Otherwise calculate it from the bars that end at ix. Indexes with
        # no bar (get_bar returns None) are skipped instead of crashing.
        first_ix = ix - self.long_window - self.M + 1
        window_bars = (self._manager.get_bar(n) for n in range(first_ix, ix + 1))
        closes = [bar.close_price for bar in window_bars if bar is not None]

        if closes:
            diffs, deas, macds = talib.MACD(
                np.array(closes),
                fastperiod=self.short_window,
                slowperiod=self.long_window,
                signalperiod=self.M,
            )
            value = (diffs[-1], deas[-1], macds[-1])
        else:
            nan = float("nan")
            value = (nan, nan, nan)

        self.macd_data[ix] = value
        return value

    def _draw_bar_picture(self, ix: int, bar: BarData) -> QtGui.QPicture:
        """
        Record the DIFF/DEA segments and the histogram bar for bar ``ix``.

        Any part whose value (or previous value, for the lines) is NaN is
        left out.
        """
        macd_value = self.get_macd_value(ix)
        last_macd_value = self.get_macd_value(ix - 1)

        picture = QtGui.QPicture()
        painter = QtGui.QPainter(picture)

        # DIFF line (element 0, white) and DEA line (element 1, yellow).
        for pos, pen in ((0, self.white_pen), (1, self.yellow_pen)):
            if np.isnan(macd_value[pos]) or np.isnan(last_macd_value[pos]):
                continue
            end_point = QtCore.QPointF(ix, macd_value[pos])
            start_point = QtCore.QPointF(ix - 1, last_macd_value[pos])
            painter.setPen(pen)
            painter.drawLine(start_point, end_point)

        # MACD histogram: a 0.6 wide rectangle from 0 to the MACD value.
        if not np.isnan(macd_value[2]):
            if macd_value[2] > 0:
                painter.setPen(self.red_pen)
                painter.setBrush(pg.mkBrush(255, 0, 0))
            else:
                painter.setPen(self.green_pen)
                painter.setBrush(pg.mkBrush(0, 255, 0))
            painter.drawRect(QtCore.QRectF(ix - 0.3, 0, 0.6, macd_value[2]))

        painter.end()
        return picture

    def boundingRect(self) -> QtCore.QRectF:
        """Return the item area: all drawn bars wide, current y range high."""
        min_y, max_y = self.get_y_range()
        # QRectF takes (x, y, width, height), so the fourth argument is the
        # height of the range, not its upper bound.
        rect = QtCore.QRectF(
            0,
            min_y,
            len(self._bar_picutures),
            max_y - min_y
        )
        return rect

    def _calc_value_range(self, min_ix, max_ix):
        """
        Return (min, max) over all three MACD series for bars min_ix..max_ix.

        A bound of None means the start / end of the cached data. Returns
        None when the range holds no non-NaN value.
        """
        first = 0 if min_ix is None else min_ix
        last = len(self.macd_data) - 1 if max_ix is None else max_ix

        rows = [
            self.macd_data[n]
            for n in range(first, last + 1)
            if n in self.macd_data
        ]
        if not rows:
            return None

        values = np.array(rows, dtype=float)
        if np.isnan(values).all():
            return None
        return (np.nanmin(values), np.nanmax(values))

    def get_y_range(self, min_ix: int = None, max_ix: int = None) -> Tuple[float, float]:
        """
        Return the y range covering DIFF, DEA and MACD for the given bars.

        hxxjava change, 2020-6-29: ``min_ix``/``max_ix`` are not None when
        the displayed range changes and are None when it does not; in the
        latter case the range of the last displayed bars is reused.

        Args:
            min_ix: first visible bar index, or None.
            max_ix: last visible bar index, or None.

        Returns:
            The (min, max) of the values widened by ``adjust_range``, or
            (-100, 100) when no usable values exist for the request.
        """
        offset = max(self.short_window, self.long_window) + self.M - 1

        if not self.macd_data or len(self.macd_data) < offset:
            # hxxjava change, 2021-5-8: this used to return 0, 1; it was
            # changed because upgrading vnpy also upgraded pyqtgraph.
            return self._DEFAULT_Y_RANGE

        if min_ix is not None:
            # Start no earlier than `offset`, the warm-up length above.
            min_ix = max(min_ix, offset)
        if max_ix is not None:
            # Never look past the last calculated value.
            max_ix = min(max_ix, len(self.macd_data) - 1)

        requested = (min_ix, max_ix)

        if requested == (None, None):
            # Display range unchanged: reuse the last displayed range.
            if self.last_range in self._values_ranges:
                return adjust_range(self._values_ranges[self.last_range])
            # Nothing stored for it yet: fall back to all calculated bars.
            requested = (0, len(self.macd_data) - 1)

        if requested not in self._values_ranges:
            computed = self._calc_value_range(*requested)
            if computed is None:
                # E.g. the visible bars all precede the warm-up offset.
                return self._DEFAULT_Y_RANGE
            self._values_ranges[requested] = computed

        # Remember this range, also on a cache hit, so that a following
        # call without arguments refers to what is currently displayed.
        self.last_range = requested
        return adjust_range(self._values_ranges[requested])

    def get_info_text(self, ix: int) -> str:
        """Return the cursor text for bar ``ix``; "-" lines when no value exists."""
        if ix not in self.macd_data:
            return "diff - \ndea - \nmacd -"

        barscount = len(self._manager._bars)  # hxxjava debug
        diff, dea, macd = self.macd_data[ix]
        words = [
            f"diff {diff:.3f}", " ",
            f"dea {dea:.3f}", " ",
            f"macd {macd:.3f}",
            f"barscount={ix,barscount}"
        ]
        return "\n".join(words)
def tip_func(x, y, data):
    """Build the hover tool-tip text for a spot: only its data, as text."""
    return "{}".format(data)
class BaseScatter(pg.ScatterPlotItem):
    """
    Scatter item bound to a plot and a bar manager.

    On construction it adds itself to ``plot`` and switches hovering on;
    hovered spots show their ``data`` as the view box tool tip.
    """

    def __init__(self, plot: pg.PlotItem, manager: BarManager, *args, **kargs):
        """
        Args:
            plot: plot the scatter item is added to.
            manager: bar manager kept as ``self.manager`` for subclasses.
            *args, **kargs: forwarded unchanged to ``pg.ScatterPlotItem``.
        """
        # Unpack the extra arguments. Passing them as the two keywords
        # `args=` and `kargs=` would hand the base class a pair of unknown
        # options, so size/pen/brush/pxMode would never be applied.
        super().__init__(*args, **kargs)

        self.plot = plot
        self.manager = manager
        self.plot.addItem(self)
        self.opts['hoverable'] = True

    def hoverEvent(self, ev):
        """
        Update the hovered spots and show their tool tip.

        NOTE(review): this relies on private ScatterPlotItem members
        (`_maskAt`, `_hasHoverStyle`, the `data` record fields) and looks
        adapted from the base class implementation; confirm it still matches
        the installed pyqtgraph version. The difference visible here is that
        the tip text always comes from ``tip_func``.
        """
        if self.opts['hoverable']:
            old = self.data['hovered']

            if ev.exit:
                new = np.zeros_like(self.data['hovered'])
            else:
                new = self._maskAt(ev.pos())

            if self._hasHoverStyle():
                # Reset 'sourceRect' for the spots whose hover state changed.
                self.data['sourceRect'][old ^ new] = 0
                self.data['hovered'] = new
                self.updateSpots()

            points = self.points()[new][::-1]

            # Show information about hovered points in a tool tip
            vb = self.getViewBox()
            if vb is not None and self.opts['tip'] is not None:
                cutoff = 10
                tip = [tip_func(x=pt.pos().x(), y=pt.pos().y(), data=pt.data()) for pt in points[:cutoff]]
                if len(points) > cutoff:
                    tip.append('({} others...)'.format(len(points) - cutoff))
                vb.setToolTip('\n\n'.join(tip))

            self.sigHovered.emit(self, points, ev)
class TradeItem(BaseScatter):
    """
    Scatter item that marks trades on a chart.

    Trades with LONG direction are drawn as a 't1' symbol below the low of
    their bar, all other trades as a 't' symbol above the high. The spot's
    data text (shown as the hover tool tip) describes the trade.
    """

    # Spot colour per (direction, offset).
    TRADE_COLOR_MAP = {
        (Direction.LONG, Offset.OPEN): 'red',
        (Direction.LONG, Offset.CLOSE): 'magenta',
        (Direction.LONG, Offset.CLOSETODAY): 'magenta',
        (Direction.LONG, Offset.CLOSEYESTERDAY): 'magenta',
        (Direction.SHORT, Offset.OPEN): 'green',
        (Direction.SHORT, Offset.CLOSE): 'yellow',
        (Direction.SHORT, Offset.CLOSETODAY): 'yellow',
        (Direction.SHORT, Offset.CLOSEYESTERDAY): 'yellow',
    }
    # Instruction text per (direction, offset), used in the tool tip.
    TRADE_COMMAND_MAP = {
        (Direction.LONG, Offset.OPEN): '买开',
        (Direction.LONG, Offset.CLOSE): '买平',
        (Direction.LONG, Offset.CLOSETODAY): '买平今',
        (Direction.LONG, Offset.CLOSEYESTERDAY): '买平昨',
        (Direction.SHORT, Offset.OPEN): '卖开',
        (Direction.SHORT, Offset.CLOSE): '卖平',
        (Direction.SHORT, Offset.CLOSETODAY): '卖平今',
        (Direction.SHORT, Offset.CLOSEYESTERDAY): '卖平昨',
    }
    # Colour for (direction, offset) pairs missing from TRADE_COLOR_MAP.
    _FALLBACK_COLOR = 'white'

    def __init__(self, plot: pg.PlotItem, manager: BarManager):
        """Create the item on ``plot``; ``manager`` maps trade times to bars."""
        super().__init__(plot=plot, manager=manager, size=15, pxMode=True, pen=pg.mkPen(None), brush=pg.mkBrush(255, 255, 255, 120))
        # Every trade added so far; all of them are redrawn on each change.
        self.trades: List[TradeData] = []

    def _to_scatter_data(self, trade: TradeData):
        """
        Convert a trade into a spot dict for ``setData``.

        Returns:
            The spot dict, or {} when the trade has no time or no bar can
            be found for it.
        """
        if not trade.datetime:
            return {}

        idx = self.manager.get_bar_idx(trade.datetime)
        if idx == -1:
            return {}

        bar: BarData = self.manager.get_bar(idx)
        if bar is None:
            return {}

        key = (trade.direction, trade.offset)
        # Pairs that are not in the maps get a neutral colour and a text
        # built from the two values, instead of raising KeyError.
        color = self.TRADE_COLOR_MAP.get(key, self._FALLBACK_COLOR)
        command = self.TRADE_COMMAND_MAP.get(key)
        if command is None:
            command = "".join(str(getattr(part, "value", part)) for part in key)

        size = 10

        # Keep the marker clear of the bar: 1% of the manager's price range.
        LL, HH = self.manager.get_price_range()
        y_adjustment = (HH - LL) * 0.01

        if trade.direction == Direction.LONG:
            symbol = 't1'
            y = bar.low_price - y_adjustment
        else:
            symbol = 't'
            y = bar.high_price + y_adjustment

        scatter_data = {
            "pos": (idx, y),
            "size": size,
            "pen": color,
            "brush": color,
            "symbol": symbol,
            "data": "成交单:{},单号:{},指令:{},价格:{},手数:{},时间:{}".format(
                trade.vt_symbol,
                trade.vt_tradeid,
                command,
                trade.price, trade.volume,
                trade.datetime.strftime('%Y-%m-%d %H:%M:%S')
            )
        }
        return scatter_data

    def _redraw(self):
        """Rebuild the spots of every stored trade and hand them to setData."""
        spots = [spot for spot in map(self._to_scatter_data, self.trades) if spot]
        self.setData(spots, hoverable=True)

    def add_trades(self, trades: List[TradeData]):
        """Store several trades and redraw all trade markers."""
        self.trades.extend(trades)
        self._redraw()

    def add_trade(self, trade: TradeData):
        """Store one trade and redraw all trade markers."""
        self.trades.append(trade)
        self._redraw()
class OrderItem(BaseScatter):
    """
    Scatter item that marks orders on a chart.

    Only orders that have a time and a traded volume above zero are kept.
    Each is drawn as an 'o' symbol: below the low of its bar for LONG
    direction, above the high otherwise. The spot's data text (shown as
    the hover tool tip) describes the order.
    """

    # Spot colour per (direction, offset).
    ORDER_COLOR_MAP = {
        (Direction.LONG, Offset.OPEN): 'red',
        (Direction.LONG, Offset.CLOSE): 'magenta',
        (Direction.LONG, Offset.CLOSETODAY): 'magenta',
        (Direction.LONG, Offset.CLOSEYESTERDAY): 'magenta',
        (Direction.SHORT, Offset.OPEN): 'green',
        (Direction.SHORT, Offset.CLOSE): 'yellow',
        (Direction.SHORT, Offset.CLOSETODAY): 'yellow',
        (Direction.SHORT, Offset.CLOSEYESTERDAY): 'yellow',
    }
    # Instruction text per (direction, offset), used in the tool tip.
    ORDER_COMMAND_MAP = {
        (Direction.LONG, Offset.OPEN): '买开',
        (Direction.LONG, Offset.CLOSE): '买平',
        (Direction.LONG, Offset.CLOSETODAY): '买平今',
        (Direction.LONG, Offset.CLOSEYESTERDAY): '买平昨',
        (Direction.SHORT, Offset.OPEN): '卖开',
        (Direction.SHORT, Offset.CLOSE): '卖平',
        (Direction.SHORT, Offset.CLOSETODAY): '卖平今',
        (Direction.SHORT, Offset.CLOSEYESTERDAY): '卖平昨',
    }
    # Colour for (direction, offset) pairs missing from ORDER_COLOR_MAP.
    _FALLBACK_COLOR = 'white'

    def __init__(self, plot: pg.PlotItem, manager: BarManager):
        """Create the item on ``plot``; ``manager`` maps order times to bars."""
        super().__init__(plot=plot, manager=manager, size=15, pxMode=True, pen=pg.mkPen(None), brush=pg.mkBrush(255, 255, 255, 120))
        # Every order accepted so far; all of them are redrawn on each change.
        self.orders: List[OrderData] = []

    @staticmethod
    def _is_drawable(order: OrderData) -> bool:
        """Return True for orders that have a time and a traded volume > 0."""
        return order.datetime is not None and order.traded > 0

    def _to_scatter_data(self, order: OrderData):
        """
        Convert an order into a spot dict for ``setData``.

        Returns:
            The spot dict, or {} when the order has no time or no bar can
            be found for it.
        """
        if not order.datetime:
            return {}

        idx = self.manager.get_bar_idx(order.datetime)
        if idx == -1:
            return {}

        bar: BarData = self.manager.get_bar(idx)
        if bar is None:
            return {}

        key = (order.direction, order.offset)
        # Pairs that are not in the maps get a neutral colour and a text
        # built from the two values, instead of raising KeyError.
        color = self.ORDER_COLOR_MAP.get(key, self._FALLBACK_COLOR)
        command = self.ORDER_COMMAND_MAP.get(key)
        if command is None:
            command = "".join(str(getattr(part, "value", part)) for part in key)

        size = 10
        symbol = 'o'

        # Keep the marker clear of the bar: 2% of the manager's price range.
        LL, HH = self.manager.get_price_range()
        y_adjustment = (HH - LL) * 0.02

        if order.direction == Direction.LONG:
            y = bar.low_price - y_adjustment
        else:
            y = bar.high_price + y_adjustment

        scatter_data = {
            "pos": (idx, y),
            "size": size,
            "pen": color,
            "brush": color,
            "symbol": symbol,
            "data": "委托单:{},单号:{},指令:{},价格:{},手数:{},时间:{}".format(
                order.vt_symbol,
                order.vt_orderid,
                command,
                order.price, order.volume,
                order.datetime.strftime('%Y-%m-%d %H:%M:%S')
            )
        }
        return scatter_data

    def _redraw(self):
        """Rebuild the spots of every stored order and hand them to setData."""
        spots = [spot for spot in map(self._to_scatter_data, self.orders) if spot]
        self.setData(spots, hoverable=True)

    def add_orders(self, orders: List[OrderData]):
        """Store the drawable orders among ``orders`` and redraw; no-op if none."""
        accepted = [order for order in orders if self._is_drawable(order)]
        if not accepted:
            return

        self.orders.extend(accepted)
        self._redraw()

    def add_order(self, order: OrderData):
        """Store ``order`` if it is drawable and redraw; otherwise do nothing."""
        if not self._is_drawable(order):
            return

        self.orders.append(order)
        self._redraw()
3. OrderItem和TradeItem使用注意事项
创建OrderItem和TradeItem时,必须传递主图或者附图的plot和bar管理器BarManager,示例代码如下:
candle_plot = self.chart.get_plot('candle')
manager = self.chart._manager
self.trade_item:TradeItem = TradeItem(plot=candle_plot,manager=manager)
4. 移动到成交单图标上会有提示
当十字光标移动到成交单图标时,如果当根K线上发生过多次成交,你可能只看见一个图标,但其实是有多个图标被绘制的,这反映在图中的光标提示中,如图所示:
5. 再增加一个美国线指标BarItem
看效果图: