VeighNa量化社区
你的开源社区量化交易平台
Member
avatar
加入于:
帖子: 419
声望: 170

1. 典型的绘图部件

保存文件:vnpy\usertools\chart_items.py
其中包含:

  • LineItem
  • RsiItem
  • SmaItem
  • MacdItem
  • TradeItem
  • OrderItem
from datetime import datetime
from typing import List, Tuple, Dict

from vnpy.trader.ui import create_qapp, QtCore, QtGui, QtWidgets
from pyqtgraph import ScatterPlotItem
import pyqtgraph as pg
import numpy as np
import talib
import copy

from vnpy.chart import ChartWidget, VolumeItem, CandleItem
from vnpy.chart.item import ChartItem
from vnpy.chart.manager import BarManager

from vnpy.trader.object import (
    BarData,
    OrderData,
    TradeData
)

from vnpy.trader.object import Direction, Exchange, Interval, Offset, Status, Product, OptionType, OrderType

from collections import OrderedDict
import pytz
CHINA_TZ = pytz.timezone("Asia/Shanghai")


class LineItem(CandleItem):
    """Chart item that draws bar close prices as a connected white line."""

    def __init__(self, manager: BarManager):
        """Initialise the base item and create the pen used for the line."""
        super().__init__(manager)

        self.white_pen: QtGui.QPen = pg.mkPen(color=(255, 255, 255), width=1)

    def _draw_bar_picture(self, ix: int, bar: BarData) -> QtGui.QPicture:
        """Record the line segment that ends at bar ``ix``.

        The segment runs from the close of the bar at ``ix - 1`` to the close
        of ``bar``. When the manager returns no bar for ``ix - 1`` both ends
        are the same point.
        """
        previous_bar = self._manager.get_bar(ix - 1)

        current_point = QtCore.QPointF(ix, bar.close_price)
        previous_point = (
            QtCore.QPointF(ix - 1, previous_bar.close_price)
            if previous_bar
            else current_point
        )

        # Record the drawing commands into a picture
        picture = QtGui.QPicture()
        painter = QtGui.QPainter(picture)
        painter.setPen(self.white_pen)
        painter.drawLine(previous_point, current_point)
        painter.end()

        return picture

    def get_info_text(self, ix: int) -> str:
        """Return ``"Close:<price>"`` for bar ``ix``, or ``""`` if there is no bar."""
        bar = self._manager.get_bar(ix)
        if not bar:
            return ""
        return f"Close:{bar.close_price}"

class SmaItem(CandleItem):
    """Chart item that overlays a simple moving average of close prices.

    SMA values are computed with ``talib.SMA`` over ``sma_window`` bars and
    cached per bar index in ``sma_data``.
    """

    def __init__(self, manager: BarManager):
        """Initialise the base item, the line pen and an empty SMA cache."""
        super().__init__(manager)

        self.blue_pen: QtGui.QPen = pg.mkPen(color=(100, 100, 255), width=2)

        self.sma_window = 10
        self.sma_data: Dict[int, float] = {}

    def get_sma_value(self, ix: int) -> float:
        """Return the SMA at bar index ``ix``.

        Returns ``0`` for a negative index. The result is NaN where talib
        reports no value, or where fewer than ``sma_window`` bars are
        available for an index computed incrementally.
        """
        if ix < 0:
            return 0

        # On first use, calculate the SMA for every bar held by the manager
        if not self.sma_data:
            bars = self._manager.get_all_bars()
            close_data = [bar.close_price for bar in bars]
            # talib only accepts float64 input, so force the dtype
            sma_array = talib.SMA(np.array(close_data, dtype=float), self.sma_window)

            for n, value in enumerate(sma_array):
                self.sma_data[n] = value

        # Return if already calculated
        if ix in self.sma_data:
            return self.sma_data[ix]

        # Otherwise calculate the value for this index from the bars ending at it.
        # The manager may have no bar for some indexes in the window (e.g.
        # negative ones); those are skipped instead of being dereferenced.
        close_data = []
        for n in range(ix - self.sma_window, ix + 1):
            bar = self._manager.get_bar(n)
            if bar:
                close_data.append(bar.close_price)

        if len(close_data) < self.sma_window:
            sma_value = np.nan
        else:
            sma_array = talib.SMA(np.array(close_data, dtype=float), self.sma_window)
            sma_value = sma_array[-1]

        self.sma_data[ix] = sma_value
        return sma_value

    def _draw_bar_picture(self, ix: int, bar: BarData) -> QtGui.QPicture:
        """Record the SMA line segment that ends at bar ``ix``.

        Nothing is drawn for the first bar or while either end of the segment
        is NaN, so no segment is drawn to or from an undefined point.
        """
        sma_value = self.get_sma_value(ix)
        last_sma_value = self.get_sma_value(ix - 1)

        # Create objects
        picture = QtGui.QPicture()
        painter = QtGui.QPainter(picture)

        # Set painter color
        painter.setPen(self.blue_pen)

        # Draw the segment only when both ends are real values. For ix == 0
        # the "previous" value is the 0 placeholder returned for negative
        # indexes, which must not be plotted.
        if ix > 0 and not (np.isnan(sma_value) or np.isnan(last_sma_value)):
            start_point = QtCore.QPointF(ix - 1, last_sma_value)
            end_point = QtCore.QPointF(ix, sma_value)
            painter.drawLine(start_point, end_point)

        # Finish
        painter.end()
        return picture

    def get_info_text(self, ix: int) -> str:
        """Return ``"SMA <value>"`` for a cached index, else ``"SMA -"``."""
        if ix in self.sma_data:
            sma_value = self.sma_data[ix]
            text = f"SMA {sma_value:.1f}"
        else:
            text = "SMA -"

        return text

class RsiItem(ChartItem):
    """Chart item that draws an RSI line with fixed 30/70 reference lines.

    RSI values are computed with ``talib.RSI`` over ``rsi_window`` bars and
    cached per bar index in ``rsi_data``. The item always reports a y-range
    of 0 to 100.
    """

    def __init__(self, manager: BarManager):
        """Initialise the base item, the pens and an empty RSI cache."""
        super().__init__(manager)

        self.white_pen: QtGui.QPen = pg.mkPen(color=(255, 255, 255), width=1)
        self.yellow_pen: QtGui.QPen = pg.mkPen(color=(255, 255, 0), width=2)

        self.rsi_window = 14
        self.rsi_data: Dict[int, float] = {}

    def get_rsi_value(self, ix: int) -> float:
        """Return the RSI at bar index ``ix``.

        Returns ``50`` for a negative index. The result is NaN where talib
        reports no value, or where fewer than ``rsi_window + 1`` bars are
        available for an index computed incrementally.
        """
        if ix < 0:
            return 50

        # On first use, calculate the RSI for every bar held by the manager
        if not self.rsi_data:
            bars = self._manager.get_all_bars()
            close_data = [bar.close_price for bar in bars]
            # talib only accepts float64 input, so force the dtype
            rsi_array = talib.RSI(np.array(close_data, dtype=float), self.rsi_window)

            for n, value in enumerate(rsi_array):
                self.rsi_data[n] = value

        # Return if already calculated
        if ix in self.rsi_data:
            return self.rsi_data[ix]

        # Otherwise calculate the value for this index from the bars ending at it.
        # The manager may have no bar for some indexes in the window (e.g.
        # negative ones); those are skipped instead of being dereferenced.
        close_data = []
        for n in range(ix - self.rsi_window, ix + 1):
            bar = self._manager.get_bar(n)
            if bar:
                close_data.append(bar.close_price)

        # The window above spans rsi_window + 1 closes; with fewer than that
        # no value is computed.
        if len(close_data) < self.rsi_window + 1:
            rsi_value = np.nan
        else:
            rsi_array = talib.RSI(np.array(close_data, dtype=float), self.rsi_window)
            rsi_value = rsi_array[-1]

        self.rsi_data[ix] = rsi_value
        return rsi_value

    def _draw_bar_picture(self, ix: int, bar: BarData) -> QtGui.QPicture:
        """Record the RSI segment ending at bar ``ix`` plus the 70/30 lines."""
        rsi_value = self.get_rsi_value(ix)
        last_rsi_value = self.get_rsi_value(ix - 1)

        # Create objects
        picture = QtGui.QPicture()
        painter = QtGui.QPainter(picture)

        # Draw RSI line, skipping segments with an undefined end
        painter.setPen(self.yellow_pen)

        if not (np.isnan(last_rsi_value) or np.isnan(rsi_value)):
            end_point = QtCore.QPointF(ix, rsi_value)
            start_point = QtCore.QPointF(ix - 1, last_rsi_value)
            painter.drawLine(start_point, end_point)

        # Draw oversold/overbought line
        painter.setPen(self.white_pen)

        painter.drawLine(
            QtCore.QPointF(ix, 70),
            QtCore.QPointF(ix - 1, 70),
        )

        painter.drawLine(
            QtCore.QPointF(ix, 30),
            QtCore.QPointF(ix - 1, 30),
        )

        # Finish
        painter.end()
        return picture

    def boundingRect(self) -> QtCore.QRectF:
        """Return a rectangle spanning all bar pictures in x and 0-100 in y."""
        rect = QtCore.QRectF(
            0,
            0,
            len(self._bar_picutures),
            100
        )
        return rect

    def get_y_range( self, min_ix: int = None, max_ix: int = None) -> Tuple[float, float]:
        """Return the fixed y-range ``(0, 100)``; both arguments are ignored."""
        return 0, 100

    def get_info_text(self, ix: int) -> str:
        """Return ``"RSI <value>"`` for a cached index, else ``"RSI -"``."""
        if ix in self.rsi_data:
            rsi_value = self.rsi_data[ix]
            text = f"RSI {rsi_value:.1f}"
        else:
            text = "RSI -"

        return text


def to_int(value: float) -> int:
    """Round ``value`` with the built-in ``round`` and return it as an ``int``."""
    rounded = round(value, 0)
    return int(rounded)

""" 将y方向的显示范围扩大到1.1 """
def adjust_range(in_range:Tuple[float, float])->Tuple[float, float]:
    """Widen a y-range by 5% of its span on each side (1.1x overall).

    The first element is lowered and the second is raised by the same margin.
    """
    first, second = in_range[0], in_range[1]
    margin = abs(first - second) * 0.05
    return (first - margin, second + margin)

class MacdItem(ChartItem):
    """Chart item that draws MACD: the DIFF and DEA lines plus the histogram.

    Values are computed with ``talib.MACD`` from bar close prices and cached
    per bar index in ``macd_data`` as ``(diff, dea, macd)`` tuples. Computed
    y-ranges are cached per requested index range.
    """

    # Cache of y-ranges keyed by (min_ix, max_ix). Only declared here; the
    # dict is created per instance in __init__ so that separate items do not
    # share (and pollute) one cache.
    _values_ranges: Dict[Tuple[int, int], Tuple[float, float]]

    # Most recently requested bar index range
    last_range:Tuple[int, int] = (-1,-1)

    def __init__(self, manager: BarManager):
        """Initialise the base item, pens, brushes, MACD parameters and caches."""
        super().__init__(manager)

        self.white_pen: QtGui.QPen = pg.mkPen(color=(255, 255, 255), width=1)
        self.yellow_pen: QtGui.QPen = pg.mkPen(color=(255, 255, 0), width=1)
        self.red_pen: QtGui.QPen = pg.mkPen(color=(255, 0, 0), width=1)
        self.green_pen: QtGui.QPen = pg.mkPen(color=(0, 255, 0), width=1)

        # Histogram fill brushes, created once instead of once per bar
        self.red_brush: QtGui.QBrush = pg.mkBrush(255, 0, 0)
        self.green_brush: QtGui.QBrush = pg.mkBrush(0, 255, 0)

        self.short_window = 12
        self.long_window = 26
        self.M = 9

        self.macd_data: Dict[int, Tuple[float,float,float]] = {}

        self._values_ranges = {}
        self.last_range = (-1, -1)

    def _calculate_macd(self, close_data: List[float]):
        """Run ``talib.MACD`` on the closes with this item's three periods."""
        return talib.MACD(
            np.array(close_data, dtype=float),  # talib only accepts float64
            fastperiod=self.short_window,
            slowperiod=self.long_window,
            signalperiod=self.M,
        )

    def get_macd_value(self, ix: int) -> Tuple[float,float,float]:
        """Return ``(diff, dea, macd)`` at bar index ``ix``.

        Returns ``(0.0, 0.0, 0.0)`` for a negative index. Components are NaN
        where talib reports no value, or where too few bars are available for
        an index computed incrementally.
        """
        if ix < 0:
            return (0.0,0.0,0.0)

        # On first use, calculate MACD for every bar held by the manager
        if not self.macd_data:
            bars = self._manager.get_all_bars()
            close_data = [bar.close_price for bar in bars]

            diffs, deas, macds = self._calculate_macd(close_data)

            for n, values in enumerate(zip(diffs, deas, macds)):
                self.macd_data[n] = values

        # Return if already calculated
        if ix in self.macd_data:
            return self.macd_data[ix]

        # Otherwise calculate the value for this index from the bars ending at it.
        # The manager may have no bar for some indexes in the window (e.g.
        # negative ones); those are skipped instead of being dereferenced.
        # NOTE(review): this uses only a short trailing window, so the result
        # may differ slightly from a full-history calculation — confirm that
        # is acceptable.
        close_data = []
        for n in range(ix-self.long_window-self.M+1, ix + 1):
            bar = self._manager.get_bar(n)
            if bar:
                close_data.append(bar.close_price)

        needed = max(self.short_window, self.long_window) + self.M - 1
        if len(close_data) < needed:
            value = (np.nan, np.nan, np.nan)
        else:
            diffs, deas, macds = self._calculate_macd(close_data)
            value = (diffs[-1], deas[-1], macds[-1])

        self.macd_data[ix] = value
        return value

    def _draw_bar_picture(self, ix: int, bar: BarData) -> QtGui.QPicture:
        """Record the DIFF/DEA segments ending at bar ``ix`` and its histogram bar."""
        macd_value = self.get_macd_value(ix)
        last_macd_value = self.get_macd_value(ix - 1)

        # Create objects
        picture = QtGui.QPicture()
        painter = QtGui.QPainter(picture)

        # Draw the DIFF (white) and DEA (yellow) segments, skipping any
        # segment with an undefined end
        for pos, pen in ((0, self.white_pen), (1, self.yellow_pen)):
            if np.isnan(macd_value[pos]) or np.isnan(last_macd_value[pos]):
                continue
            painter.setPen(pen)
            painter.drawLine(
                QtCore.QPointF(ix - 1, last_macd_value[pos]),
                QtCore.QPointF(ix, macd_value[pos]),
            )

        # Draw the histogram bar: red above zero, green otherwise
        if not np.isnan(macd_value[2]):
            if macd_value[2] > 0:
                painter.setPen(self.red_pen)
                painter.setBrush(self.red_brush)
            else:
                painter.setPen(self.green_pen)
                painter.setBrush(self.green_brush)
            painter.drawRect(QtCore.QRectF(ix-0.3,0,0.6,macd_value[2]))

        painter.end()
        return picture

    def boundingRect(self) -> QtCore.QRectF:
        """Return a rectangle spanning all bar pictures in x and the current y-range."""
        min_y, max_y = self.get_y_range()
        # QRectF takes (x, y, width, height), so the last argument is the
        # height of the range rather than its upper bound.
        rect = QtCore.QRectF(
            0,
            min_y,
            len(self._bar_picutures),
            max_y - min_y
        )
        return rect

    def get_y_range(self, min_ix: int = None, max_ix: int = None) -> Tuple[float, float]:
        """Return the widened y-range of the three MACD series.

        With both arguments ``None`` (display range unchanged) the range of
        the most recently requested index range is reused; if none has been
        cached yet, the range over all cached values is computed. Otherwise
        the range is computed for the clamped ``[min_ix, max_ix]`` window and
        cached. Returns ``(0.0, 1.0)`` while there is not enough data, or when
        the window holds no defined value.
        """
        offset = max(self.short_window,self.long_window) + self.M - 1

        if not self.macd_data or len(self.macd_data) < offset:
            return 0.0, 1.0

        last_ix = len(self.macd_data) - 1

        if min_ix is None and max_ix is None:
            # Display range unchanged: reuse the last requested range if cached
            if self.last_range in self._values_ranges:
                return adjust_range(self._values_ranges[self.last_range])
            key = (0, last_ix)
        else:
            # Display range changed: clamp the requested bounds. A bound that
            # was not supplied falls back to the corresponding end of the data.
            key = (
                0 if min_ix is None else max(min_ix, offset),
                last_ix if max_ix is None else min(max_ix, last_ix),
            )

        # Remember the latest requested range, also when it is served from
        # the cache, so a later call without arguments refers to it.
        self.last_range = key

        cached = self._values_ranges.get(key)
        if cached is not None:
            return adjust_range(cached)

        # Not cached yet: compute the range from the cached MACD values
        low_ix, high_ix = key
        macd_list = [
            self.macd_data[i]
            for i in range(low_ix, high_ix + 1)
            if i in self.macd_data
        ]
        ndarray = np.array(macd_list, dtype=float)

        # An empty window (e.g. the view lies entirely before `offset`) or an
        # all-NaN window has no min/max; fall back to the default range.
        if ndarray.size == 0 or np.isnan(ndarray).all():
            return 0.0, 1.0

        result = (np.nanmin(ndarray), np.nanmax(ndarray))
        self._values_ranges[key] = result
        return adjust_range(result)

    def get_info_text(self, ix: int) -> str:
        """Return the diff/dea/macd text for a cached index, else placeholders."""
        if ix in self.macd_data:
            diff,dea,macd = self.macd_data[ix]
            words = [
                f"diff {diff:.3f}"," ",
                f"dea {dea:.3f}"," ",
                f"macd {macd:.3f}"
                ]
            text = "\n".join(words)
        else:
            text = "diff - \ndea - \nmacd -"

        return text


class TradeItem(ScatterPlotItem,CandleItem):
    """
    Scatter-plot chart item that marks trades on the chart.

    Trades are stored in ``trades`` as ``{bar index: {tradeid: trade}}`` and
    drawn as triangles: pointing up for long, down otherwise; yellow for
    opening trades, blue otherwise.
    """
    def __init__(self, manager: BarManager):
        """Initialise both bases and an empty trade store."""
        ScatterPlotItem.__init__(self)
        # NOTE(review): super(CandleItem, self) starts the method lookup
        # *after* CandleItem in the MRO, so CandleItem.__init__ itself is not
        # run here — confirm that skipping it is intended.
        super(CandleItem,self).__init__(manager)

        self.blue_pen: QtGui.QPen = pg.mkPen(color=(100, 100, 255), width=2)

        self.trades : Dict[int,Dict[str,TradeData]] = {} # {ix:{tradeid:trade}}

    def _bar_times_newest_first(self) -> List[Tuple[datetime, int]]:
        """Return the manager's (bar datetime, bar index) pairs, newest first."""
        return sorted(
            self._manager._datetime_index_map.items(),
            key=lambda t: t[0],
            reverse=True,
        )

    def _store_trade(self, trade: TradeData, bar_times: List[Tuple[datetime, int]]) -> None:
        """File ``trade`` under the newest bar that starts at or before it."""
        # Fallback when no bar qualifies: the last bar index
        idx = self._manager.get_count() - 1

        # Newest first, because live trades usually fall into the latest bar
        for dt, ix in bar_times:
            # Re-stamp the bar time as China time before comparing
            dt1 = CHINA_TZ.localize(datetime.combine(dt.date(),dt.time()))
            if dt1 <= trade.datetime:
                idx = ix
                break

        # Several trades may happen within one bar
        self.trades.setdefault(idx, {})[trade.tradeid] = trade

    def add_trades(self,trades:List[TradeData]):
        """Add a list of trades, then redraw once."""
        # Sort the bar times once for the whole batch rather than per trade
        bar_times = self._bar_times_newest_first()
        for trade in trades:
            self._store_trade(trade, bar_times)

        self.set_scatter_data()
        self.update()

    def add_trade(self,trade:TradeData,draw:bool=False):
        """Add one trade; redraw immediately only when ``draw`` is true."""
        self._store_trade(trade, self._bar_times_newest_first())

        if draw:
            self.set_scatter_data()
            self.update()

    def set_scatter_data(self):
        """Rebuild the scatter spots from every stored trade."""
        scatter_datas = []
        for ix, trades in self.trades.items():
            for trade in trades.values():
                long_side = trade.direction == Direction.LONG
                opening = trade.offset == Offset.OPEN

                scatter_datas.append({
                    "pos" : (ix, trade.price),
                    "data": 1,
                    "size": 14,
                    "pen": pg.mkPen((255, 255, 255)),
                    # "t1" is the up arrow, "t" the down arrow
                    "symbol": "t1" if long_side else "t",
                    # Yellow for open, blue otherwise
                    "brush": pg.mkBrush((255, 255, 0)) if opening else pg.mkBrush((0, 0, 255)),
                })

        self.setData(scatter_datas)

    def get_info_text(self, ix: int) -> str:
        """Return one line per trade stored for bar ``ix``, or a placeholder."""
        if ix not in self.trades:
            return "成交:-"

        lines = ["成交:"]
        for trade in self.trades[ix].values():
            lines.append(f"{trade.price}{trade.direction.value}{trade.offset.value}{trade.volume}手")

        return "\n".join(lines)


class OrderItem(ScatterPlotItem,CandleItem):
    """
    Scatter-plot chart item that marks orders on the chart.

    Orders are stored in ``orders`` as ``{bar index: {orderid: order}}`` and
    drawn as triangles: pointing up for long, down otherwise. An order priced
    outside the range reported by ``get_y_range()`` is drawn just inside it.
    """

    # How far inside the y-range an out-of-range order price is drawn
    PRICE_CLAMP_OFFSET = 7

    def __init__(self, manager: BarManager):
        """Initialise both bases and an empty order store."""
        ScatterPlotItem.__init__(self)
        # NOTE(review): super(CandleItem, self) starts the method lookup
        # *after* CandleItem in the MRO, so CandleItem.__init__ itself is not
        # run here — confirm that skipping it is intended.
        super(CandleItem,self).__init__(manager)

        self.orders : Dict[int,Dict[str,OrderData]] = {} # {ix:{orderid:order}}

    def _bar_times_newest_first(self) -> List[Tuple[datetime, int]]:
        """Return the manager's (bar datetime, bar index) pairs, newest first."""
        return sorted(
            self._manager._datetime_index_map.items(),
            key=lambda t: t[0],
            reverse=True,
        )

    def _store_order(self, order: OrderData, bar_times: List[Tuple[datetime, int]]) -> None:
        """File ``order`` under the newest bar that starts at or before it."""
        # Fallback when no bar qualifies: the last bar index
        idx = self._manager.get_count() - 1

        # Newest first, because live orders usually fall into the latest bar
        for dt, ix in bar_times:
            # Re-stamp the bar time as China time before comparing
            dt1 = CHINA_TZ.localize(datetime.combine(dt.date(),dt.time()))
            if dt1 <= order.datetime:
                idx = ix
                break

        # Several orders may be placed within one bar
        self.orders.setdefault(idx, {})[order.orderid] = order

    def add_orders(self,orders:List[OrderData]):
        """Add a list of orders (those without a datetime are skipped), then redraw once."""
        # Sort the bar times once for the whole batch rather than per order
        bar_times = self._bar_times_newest_first()
        for order in orders:
            if order.datetime:
                self._store_order(order, bar_times)

        self.set_scatter_data()
        self.update()

    def add_order(self,order:OrderData,draw:bool=False):
        """Add one order; redraw immediately only when ``draw`` is true.

        An order without a datetime cannot be matched to a bar and is
        ignored, as in ``add_orders``.
        """
        if not order.datetime:
            return

        self._store_order(order, self._bar_times_newest_first())

        if draw:
            self.set_scatter_data()
            self.update()

    def set_scatter_data(self):
        """Rebuild the scatter spots from every stored order."""
        scatter_datas = []

        # The y-range does not depend on the bar index, so fetch it once
        if self.orders:
            lowest, highest = self.get_y_range()

        for ix, orders in self.orders.items():
            for order in orders.values():
                # Keep orders priced outside the range visible by drawing
                # them just inside the nearest bound
                if order.price > highest:
                    show_price = highest - self.PRICE_CLAMP_OFFSET
                elif order.price < lowest:
                    show_price = lowest + self.PRICE_CLAMP_OFFSET
                else:
                    show_price = order.price

                long_side = order.direction == Direction.LONG
                opening = order.offset == Offset.OPEN

                scatter_datas.append({
                    "pos" : (ix, show_price),
                    "data": 1,
                    "size": 14,
                    "pen": pg.mkPen((255, 255, 255)),
                    # "t1" is the up arrow, "t" the down arrow
                    "symbol": "t1" if long_side else "t",
                    # Teal (0, 128, 128) for open, olive (128, 128, 0) otherwise
                    "brush": pg.mkBrush((0, 128, 128)) if opening else pg.mkBrush((128, 128, 0)),
                })

        self.setData(scatter_datas)

    def get_info_text(self, ix: int) -> str:
        """Return one line per order stored for bar ``ix``, or a placeholder."""
        if ix not in self.orders:
            return "委托:-"

        lines = ["委托:"]
        for order in self.orders[ix].values():
            lines.append(f"{order.price}{order.direction.value}{order.offset.value}{order.volume}手")

        return "\n".join(lines)

2. 修改vnpy\chart\widget.py

为ChartWidget类增加下面的函数:

    def get_item(self,item_name:str):   # hxxjava add
        """
        Look up a chart item by name.

        Returns the entry stored in ``self._items`` under ``item_name``,
        or ``None`` when there is no such entry.
        """
        registered = self._items
        return registered.get(item_name)

3. K线图表——各绘图部件使用方法演示

保存文件:vnpy\usertools\kx_chart.py

from datetime import datetime
from typing import List, Tuple, Dict

import numpy as np
import pyqtgraph as pg
import talib
import copy

from vnpy.trader.ui import create_qapp, QtCore, QtGui, QtWidgets
from vnpy.trader.database import database_manager
from vnpy.trader.constant import Exchange, Interval
from vnpy.trader.object import BarData

from vnpy.chart import ChartWidget, VolumeItem, CandleItem
from vnpy.chart.item import ChartItem
from vnpy.chart.manager import BarManager
from vnpy.chart.base import NORMAL_FONT

from vnpy.trader.engine import MainEngine
from vnpy.event import Event, EventEngine

from vnpy.trader.event import (
    EVENT_TICK,
    EVENT_TRADE,
    EVENT_ORDER,
    EVENT_POSITION,
    EVENT_ACCOUNT,
    EVENT_LOG
)

from vnpy.app.cta_strategy.base import (   
    EVENT_CTA_TICK,     
    EVENT_CTA_BAR,      
    EVENT_CTA_ORDER,    
    EVENT_CTA_TRADE,     
    EVENT_CTA_HISTORY_BAR
)

from vnpy.trader.object import (
    Direction, 
    Exchange, 
    Interval, 
    Offset, 
    Status, 
    Product, 
    OptionType, 
    OrderType,
    OrderData,
    TradeData,
)

from vnpy.usertools.chart_items import (
    LineItem,
    RsiItem,
    SmaItem,
    MacdItem,
    TradeItem,
    OrderItem,
)


class NewChartWidget(ChartWidget):
    """
    Candlestick chart built on top of ChartWidget.

    The main "candle" plot stacks the candle, close-line, SMA, order and
    trade items; volume, RSI and MACD each get a dedicated sub-plot. Data
    can be pushed directly through update_history() / update_bar(), or via
    CTA events after register_event() has been called.
    """
    # NOTE(review): presumably overrides a ChartWidget setting of the same
    # name (minimum number of visible bars) - confirm against the base class.
    MIN_BAR_COUNT = 100

    # Signals that relay event-engine callbacks to the process_* slots below.
    signal_cta_history_bar: QtCore.pyqtSignal = QtCore.pyqtSignal(Event)
    signal_cta_tick: QtCore.pyqtSignal = QtCore.pyqtSignal(Event)
    signal_cta_bar: QtCore.pyqtSignal = QtCore.pyqtSignal(Event)

    def __init__(self, parent: QtWidgets.QWidget = None, event_engine: EventEngine = None, strategy_name: str = ""):
        """
        Create every plot and chart item of the widget.

        parent: optional parent widget, forwarded to ChartWidget.
        event_engine: engine used by register_event(); may stay None when
            the chart is fed directly.
        strategy_name: only events whose payload carries this strategy
            name are drawn by the process_* handlers.
        """
        super().__init__(parent)
        self.strategy_name = strategy_name
        self.event_engine = event_engine

        # Main candle plot and the items drawn on it
        self.add_plot("candle", hide_x_axis=True)
        self.add_item(CandleItem, "candle", "candle")
        self.add_item(LineItem, "line", "candle")
        self.add_item(SmaItem, "sma", "candle")
        self.add_item(OrderItem, "order", "candle")
        self.add_item(TradeItem, "trade", "candle")

        # Volume sub-plot
        self.add_plot("volume", maximum_height=150)
        self.add_item(VolumeItem, "volume", "volume")

        # RSI sub-plot
        self.add_plot("rsi", maximum_height=150)
        self.add_item(RsiItem, "rsi", "rsi")

        # MACD sub-plot
        self.add_plot("macd", maximum_height=150)
        self.add_item(MacdItem, "macd", "macd")

        # Last-price line and cursor
        self.add_last_price_line()
        self.add_cursor()
        self.setWindowTitle(self._make_window_title())

        # Orders keyed by orderid
        self.orders: Dict[str, OrderData] = {}
        # Trades keyed by tradeid
        self.trades: Dict[str, TradeData] = {}

        # Event-driven updates are opt-in: call register_event() and start
        # the event engine explicitly when they are wanted.

    def _make_window_title(self) -> str:
        """
        Return the window title.

        The demo script at the bottom of this file defines the module-level
        names symbol / exchange / interval / start / end; when they exist
        the title shows them. When the class is imported from elsewhere
        they are missing, so fall back to a title based on the strategy
        name instead of raising NameError.
        """
        try:
            return f"K线图表——{symbol}.{exchange.value},{interval},{start}-{end}"
        except NameError:
            if self.strategy_name:
                return f"K线图表——{self.strategy_name}"
            return "K线图表"

    def register_event(self) -> None:
        """
        Connect the CTA events to this widget.

        Each event type is registered with the emit of a Qt signal, and the
        signal is connected to the matching process_* handler. Does nothing
        when the widget was created without an event engine.
        """
        if self.event_engine is None:
            return

        self.signal_cta_history_bar.connect(self.process_cta_history_bar)
        self.event_engine.register(EVENT_CTA_HISTORY_BAR, self.signal_cta_history_bar.emit)

        self.signal_cta_tick.connect(self.process_tick_event)
        self.event_engine.register(EVENT_CTA_TICK, self.signal_cta_tick.emit)

        self.signal_cta_bar.connect(self.process_cta_bar)
        self.event_engine.register(EVENT_CTA_BAR, self.signal_cta_bar.emit)

    def process_cta_history_bar(self, event: Event) -> None:
        """
        Handle a pushed list of history bars.

        event.data is unpacked as (strategy_name, history_bars); pushes for
        other strategies are ignored.
        """
        strategy_name, history_bars = event.data
        if strategy_name == self.strategy_name:
            self.update_history(history_bars)

    def process_tick_event(self, event: Event) -> None:
        """
        Handle a pushed tick by moving the last-price line to its price.

        event.data is unpacked as (strategy_name, tick); pushes for other
        strategies are ignored.
        """
        strategy_name, tick = event.data
        if strategy_name == self.strategy_name:
            if self.last_price_line:
                self.last_price_line.setValue(tick.last_price)

    def process_cta_bar(self, event: Event) -> None:
        """
        Handle a pushed bar.

        event.data is unpacked as (strategy_name, bar); pushes for other
        strategies are ignored.
        """
        strategy_name, bar = event.data
        if strategy_name == self.strategy_name:
            self.update_bar(bar)

    def add_last_price_line(self):
        """
        Add a horizontal, non-movable white line with a value label to the
        first plot and keep it in self.last_price_line.
        """
        plot = list(self._plots.values())[0]
        color = (255, 255, 255)

        self.last_price_line = pg.InfiniteLine(
            angle=0,
            movable=False,
            label="{value:.1f}",
            pen=pg.mkPen(color, width=1),
            labelOpts={
                "color": color,
                "position": 1,
                "anchors": [(1, 1), (1, 1)]
            }
        )
        self.last_price_line.label.setFont(NORMAL_FONT)
        plot.addItem(self.last_price_line)

    def update_history(self, history: List[BarData]) -> None:
        """
        Load a list of bars into the manager and every chart item, refresh
        the plot limits, scroll to the right edge and move the last-price
        line to the close of the final bar.

        An empty list is accepted; the last-price line is then left as is.
        """
        self._manager.update_history(history)

        for item in self._items.values():
            item.update_history(history)

        self._update_plot_limits()

        self.move_to_right()

        # history[-1] would raise IndexError on an empty list
        if history:
            self.update_last_price_line(history[-1])

    def update_bar(self, bar: BarData) -> None:
        """
        Push a single bar into the manager and every chart item, refresh
        the plot limits and move the last-price line to the bar's close.
        """
        self._manager.update_bar(bar)

        for item in self._items.values():
            item.update_bar(bar)

        self._update_plot_limits()

        # Only follow the newest bar while the view is already near the
        # right end, so a user who scrolled back is not pulled away.
        if self._right_ix >= (self._manager.get_count() - self._bar_count / 2):
            self.move_to_right()

        self.update_last_price_line(bar)

    def update_last_price_line(self, bar: BarData) -> None:
        """Move the last-price line to the close price of bar."""
        if self.last_price_line:
            self.last_price_line.setValue(bar.close_price)

    def add_orders(self, orders: List[OrderData]) -> None:
        """
        Merge orders into self.orders (a later entry with the same orderid
        replaces the earlier one) and hand all stored orders to the chart
        item registered as 'order'.
        """
        for order in orders:
            self.orders[order.orderid] = order

        order_item: OrderItem = self.get_item('order')
        if order_item:
            # Pass a real list, matching the List[OrderData] style of this API
            order_item.add_orders(list(self.orders.values()))

    def add_trades(self, trades: List[TradeData]) -> None:
        """
        Merge trades into self.trades (keyed by tradeid) and hand all
        stored trades to the chart item registered as 'trade'.
        """
        for trade in trades:
            self.trades[trade.tradeid] = trade

        trade_item: TradeItem = self.get_item('trade')
        if trade_item:
            # Pass a real list, matching the List[TradeData] style of this API
            trade_item.add_trades(list(self.trades.values()))



################################################################
# 以下为测试代码
if __name__ == "__main__":
    def make_trades():
        """
        Build the five hard-coded ag2012 trades drawn by the demo.

        Returns a list of TradeData whose datetimes are localized to
        Asia/Shanghai.
        """
        import pytz
        from vnpy.trader.object import Direction, Exchange, Offset, TradeData

        tz = pytz.timezone("Asia/Shanghai")

        # (orderid, tradeid, direction, offset, price, volume, naive time)
        rows = [
            ('3_753490688_1', '         455', Direction.LONG, Offset.OPEN, 6131.0, 3, datetime(2020, 8, 13, 21, 0, 1)),
            ('3_753490688_2', '       12738', Direction.LONG, Offset.OPEN, 6142.0, 3, datetime(2020, 8, 13, 21, 14, 46)),
            ('3_753490688_3', '       16233', Direction.LONG, Offset.OPEN, 6158.0, 3, datetime(2020, 8, 13, 21, 21, 59)),
            ('3_753490688_4', '       22815', Direction.LONG, Offset.OPEN, 6180.0, 3, datetime(2020, 8, 13, 21, 39, 53)),
            ('3_1962356227_1', '       67570', Direction.SHORT, Offset.CLOSEYESTERDAY, 6400.0, 12, datetime(2020, 8, 14, 1, 44, 35)),
        ]

        return [
            TradeData(
                gateway_name='CTP',
                symbol='ag2012',
                exchange=Exchange.SHFE,
                orderid=orderid,
                tradeid=tradeid,
                direction=direction,
                offset=offset,
                price=price,
                volume=volume,
                datetime=tz.localize(when),
            )
            for orderid, tradeid, direction, offset, price, volume, when in rows
        ]

    def make_orders():
        """
        Build the hard-coded order snapshots drawn by the demo.

        Each of the five ag2012 orders contributes four snapshots, in this
        order: a SUBMITTING snapshot without timestamp that carries the
        reference 'TTS-ag2012' and a float volume, two timestamped
        SUBMITTING snapshots, and a final ALLTRADED snapshot whose traded
        amount equals the volume.
        """
        import pytz

        tz = pytz.timezone("Asia/Shanghai")

        # (direction, offset, price, volume) shared by the snapshots of an order
        long_open = (Direction.LONG, Offset.OPEN, 6494.0, 3)
        short_close = (Direction.SHORT, Offset.CLOSEYESTERDAY, 5870.0, 12)

        # (orderid, terms, naive times of the three timestamped snapshots)
        lifecycles = [
            ('3_753490688_1', long_open, [(2020, 8, 13, 21, 0, 1)] * 3),
            ('3_753490688_2', long_open, [(2020, 8, 13, 21, 14, 46)] * 3),
            ('3_753490688_3', long_open, [(2020, 8, 13, 21, 21, 59)] * 3),
            ('3_753490688_4', long_open, [(2020, 8, 13, 21, 39, 53)] * 3),
            ('3_1962356227_1', short_close, [(2020, 8, 14, 1, 44, 20), (2020, 8, 14, 1, 44, 25), (2020, 8, 14, 1, 44, 35)]),
        ]

        orders = []
        for orderid, (direction, offset, price, volume), stamps in lifecycles:
            shared = dict(
                gateway_name='CTP',
                symbol='ag2012',
                exchange=Exchange.SHFE,
                orderid=orderid,
                type=OrderType.LIMIT,
                direction=direction,
                offset=offset,
                price=price,
            )
            first, second, last = (tz.localize(datetime(*stamp)) for stamp in stamps)

            # The untimestamped snapshot is the only one with a float volume
            orders.append(OrderData(volume=float(volume), traded=0, status=Status.SUBMITTING, datetime=None, reference='TTS-ag2012', **shared))
            orders.append(OrderData(volume=volume, traded=0, status=Status.SUBMITTING, datetime=first, reference='', **shared))
            orders.append(OrderData(volume=volume, traded=0, status=Status.SUBMITTING, datetime=second, reference='', **shared))
            orders.append(OrderData(volume=volume, traded=volume, status=Status.ALLTRADED, datetime=last, reference='', **shared))

        return orders


    # Demo entry point: load bars from the database, draw them together
    # with the sample orders and trades, then run the Qt event loop.
    app = create_qapp()

    # NOTE: NewChartWidget.__init__ reads these module-level names to build
    # the window title, so they must exist before the widget is created.
    symbol = "ag2012"
    exchange = Exchange.SHFE
    interval = Interval.MINUTE
    start = datetime(2020, 8, 13)
    end = datetime(2020, 8, 15)

    dynamic = False   # True: replay the bars after the first n one by one
    n = 1000          # number of bars preloaded as history in dynamic mode

    bars = database_manager.load_bar_data(
        symbol=symbol,
        exchange=exchange,
        interval=interval,
        start=start,
        end=end
    )

    print(f"一共读取{len(bars)}根K线")

    # Nothing to draw without bars: leave with a hint instead of failing
    # later while the chart is being filled.
    if not bars:
        raise SystemExit("未读取到K线数据,请先下载所需的历史数据")

    event_engine = EventEngine()

    widget = NewChartWidget(event_engine=event_engine)

    if dynamic:
        history = bars[:n]      # the earliest n bars are shown immediately
        new_data = bars[n:]     # the remaining bars are replayed by the timer
    else:
        history = bars          # show every loaded bar at once
        new_data = []           # nothing left to replay

    # Draw the history bars on the main plot and all sub-plots
    widget.update_history(history)

    # Draw the sample orders on the main plot
    orders = make_orders()
    widget.add_orders(orders)

    # Draw the sample trades on the main plot
    trades = make_trades()
    widget.add_trades(trades)

    pending = iter(new_data)

    def update_bar():
        """Feed the next pending bar to the chart; stop the timer when done."""
        bar = next(pending, None)
        if bar is None:
            timer.stop()
            return
        widget.update_bar(bar)

    timer = QtCore.QTimer()
    timer.timeout.connect(update_bar)
    if dynamic:
        timer.start(100)

    widget.show()

    event_engine.start()
    try:
        app.exec_()
    finally:
        # Stop the engine that was started above once the Qt loop is over,
        # so the process is not left with a running event engine.
        event_engine.stop()

4. 测试效果

kx_chart.py中自带测试代码,直接用VSCode打开该文件即可运行。

4.1 测试准备

在vnpy中使用数据管理模块,从米筐下载ag2012.SHFE的1分钟历史数据,数据必须包含2020年8月13日~8月15日。

4.2 测试结果如下

description

Member
avatar
加入于:
帖子: 16
声望: 0

牛鼻

Member
avatar
加入于:
帖子: 1
声望: 0

提交merge啊, 这些功能合入到主干。

Member
avatar
加入于:
帖子: 2
声望: 0

楼主属实牛逼啊

Member
avatar
加入于:
帖子: 24
声望: 0

厉害!支持大神! 奥利给!

Member
avatar
加入于:
帖子: 24
声望: 0

厉害!支持大神! 奥利给!

Member
avatar
加入于:
帖子: 24
声望: 0

厉害!支持大神! 奥利给!

Member
avatar
加入于:
帖子: 6
声望: 0

求大神的提交合入

Member
avatar
加入于:
帖子: 4
声望: 0

膜拜大佬!

Member
avatar
加入于:
帖子: 50
声望: 1

能在同一笔交易的开平仓之间画条虚线吗?
不然太难找了

Member
avatar
加入于:
帖子: 50
声望: 1

楼主,点个赞。
只是显示信息的小窗口,现在是固定在左上角,如果数据开始就有交易信号,就会被遮挡住了,能否改成可自动找最合适显示位置的?

Member
avatar
加入于:
帖子: 419
声望: 170

晴空 wrote:

楼主,点个赞。
只是显示信息的小窗口,现在是固定在左上角,如果数据开始就有交易信号,就会被遮挡住了,能否改成可自动找最合适显示位置的?

想法不错,你自己试试改下吧,从ChartWidget的代码入手。
这涉及到主图中哪个区域有K线,哪个区域没有K线的计算。这样做出来的效果是,只要主图K线一动,指标提示子窗口就会不停移动,那样会造成你更加不舒服。
比较好的方法可能是,增加一个设置:增加显示/隐藏指标提示子窗口的开关和快捷键,如果觉得有遮挡主图K线就隐藏上面的指标提示子窗口,如果想看具体的指标值就显示指标提示子窗口。

Member
avatar
加入于:
帖子: 50
声望: 1

嗯,用一个快捷键控制信息子窗口的显示隐藏,这个办法更好一些,也好实现。

Member
avatar
加入于:
帖子: 7
声望: 0

牛逼哄哄

Member
avatar
加入于:
帖子: 6
声望: 0

楼主,你好。我用的是2.1.4版本的vnstudio,运行这个程序的时候下面这一部分import在源程序是没有的
from vnpy.app.cta_strategy.base import (
EVENT_CTA_TICK,
EVENT_CTA_BAR,
EVENT_CTA_ORDER,
EVENT_CTA_TRADE,
EVENT_CTA_HISTORY_BAR
)

ImportError: cannot import name 'EVENT_CTA_TICK' from 'vnpy.app.cta_strategy.base'
是base文件做过修改吗?

Member
avatar
加入于:
帖子: 419
声望: 170

summerkey wrote:

楼主,你好。我用的是2.1.4版本的vnstudio,运行这个程序的时候下面这一部分import在源程序是没有的
from vnpy.app.cta_strategy.base import (
EVENT_CTA_TICK,
EVENT_CTA_BAR,
EVENT_CTA_ORDER,
EVENT_CTA_TRADE,
EVENT_CTA_HISTORY_BAR
)

ImportError: cannot import name 'EVENT_CTA_TICK' from 'vnpy.app.cta_strategy.base'
是base文件做过修改吗?

是的,在base.py中增加了这些消息:

# Custom CTA event types added to base.py by the thread author (hxxjava);
# kx_chart.py imports them from vnpy.app.cta_strategy.base.
# Tick push; the chart handler unpacks event.data as (strategy_name, tick).
EVENT_CTA_TICK = "eCtaTick"                     # hxxjava add
# History-bar push; unpacked as (strategy_name, history_bars) by the chart.
EVENT_CTA_HISTORY_BAR = "eCtaHistoryBar"        # hxxjava add
# Single-bar push; unpacked as (strategy_name, bar) by the chart.
EVENT_CTA_BAR = "eCtaBar"                       # hxxjava add
# NOTE(review): presumably order / trade pushes; their payload is not shown
# in this thread - confirm before relying on it.
EVENT_CTA_ORDER = "eCtaOrder"                   # hxxjava add
EVENT_CTA_TRADE = "eCtaTrade"                   # hxxjava add
Member
avatar
加入于:
帖子: 6
声望: 0

老师您好。
这个文件 中的 from vnpy.trader.object import OrderData 有一个 'reference' 原来的程序里面没有,运行会报错,浏览你之前的帖子也没有找到这个修改的源码,再次求助一下,谢谢
OrderData(gateway_name='CTP', symbol='ag2012', exchange=Exchange.SHFE, orderid='3_753490688_1', type=OrderType.LIMIT, direction=Direction.LONG, offset=Offset.OPEN, price=6494.0, volume=3.0, traded=0, status=Status.SUBMITTING, datetime=None, reference='TTS-ag2012'

Member
avatar
加入于:
帖子: 419
声望: 170

summerkey wrote:

老师您好。
这个文件 中的 from vnpy.trader.object import OrderData 有一个 'reference' 原来的程序里面没有,运行会报错,浏览你之前的帖子也没有找到这个修改的源码,再次求助一下,谢谢
OrderData(gateway_name='CTP', symbol='ag2012', exchange=Exchange.SHFE, orderid='3_753490688_1', type=OrderType.LIMIT, direction=Direction.LONG, offset=Offset.OPEN, price=6494.0, volume=3.0, traded=0, status=Status.SUBMITTING, datetime=None, reference='TTS-ag2012'

查看这个帖子:https://www.vnpy.com/forum/topic/4733-ru-he-qu-fen-ctace-lue-wei-tuo-dan-cheng-jiao-dan-shi-shu-yu-na-ge-ce-lue-de
不过要注意,我这个帖子是在这次升级之前发的。其中的reference被官方采纳以后,它的含义已经比我原来定义的更广。我目前还没有升级到最新版,如果有问题请自己尝试解决。

Member
avatar
加入于:
帖子: 4
声望: 0

老师厉害,按照您的方法把MACD副图运行起来了。感谢

Member
avatar
加入于:
帖子: 42
声望: 4

请问下楼主,怎么让K线、成交量等图形不要顶着视图的上下边框绘制(如K线视图,最高价最低价都是顶着上下边框绘制的)?一来有些指标绘制会超过视图内所有K线的最低最高价,二来也很不美观。对Qt的图形编程完全是门外汉,看了看代码,暂时没找出修改的地方。
另外,少量K线时,每根K线显示得非常大,按缩放键也不起作用。看代码似乎最小缩放比例跟载入的K线数量有关,如果想让少量K线也能缩放到合适观看的尺寸,需要怎么修改?

© 2015-2022 上海韦纳软件科技有限公司
备案服务号:沪ICP备18006526号

沪公网安备 31011502017034号

【用户协议】
【隐私政策】
【免责条款】