VeighNa量化社区
你的开源社区量化交易平台 | vn.py | vnpy
Member
avatar
加入于:
帖子: 1
声望: 0

如下图,在k线图上画文字发现结果是反向的,代码是第二张图,原因应该是画k线后整个坐标系左下角是(0, 0),但是qt画文字时认为左上角的坐标时(0,0)导致的
道理我都懂,但是怎么能正常显示文字呀,有没有大佬知道,求指点

description

description

Member
avatar
加入于:
帖子: 2
声望: 0

description
main.py:
from vnpy.trader.database import get_database
from datetime import datetime
import pyqtgraph as pg

from vnpy.trader.ui import create_qapp, QtCore, QtGui, QtWidgets
from vnpy.trader.constant import Exchange, Interval
from vnpy.chart import ChartWidget, VolumeItem, CandleItem

class NewChartWidget(ChartWidget):
""""""
MIN_BAR_COUNT = 1000

def __init__(self, parent: QtWidgets.QWidget = None):
    """"""
    super().__init__(parent)

    self.last_price_line: pg.InfiniteLine = None


if name == "main":
app = create_qapp()

symbol = "bu2503"
exchange = Exchange.SHFE
interval = Interval.MINUTE
start = datetime(2024, 1, 19)
end = datetime(2025, 1, 23)

dynamic = False  # 是否动态演示
n = 500       # 缓冲K线根数#10000

bars = get_database().load_bar_data(
    # self=None,
    symbol=symbol,
    exchange=exchange,
    interval=interval,
    start=start,
    end=end
)

widget = NewChartWidget()
widget.setWindowTitle(f"K线图表——{symbol}.{exchange.value},{interval},{start}-{end}")
widget.add_plot("candle", hide_x_axis=True)
widget.add_plot("volume", maximum_height=150)
widget.add_item(CandleItem, "candle", "candle")
widget.add_item(VolumeItem, "volume", "volume")
from vnpy_chartwizard.ui.chan_chart_wizard import CLItem
widget.add_item(CLItem, "clitem", "candle")
widget.add_cursor()

if dynamic:
    history = bars[:n]      # 先取得最早的n根bar作为历史
    new_data = bars[n:]     # 其它留着演示
else:
    history = bars[-n:]     # 先取得最新的n根bar作为历史
    new_data = []           # 演示的为空
widget.update_history(history)
def update_bar():
    if new_data:
        bar = new_data.pop(0)
        widget.update_bar(bar)

timer = QtCore.QTimer()
timer.timeout.connect(update_bar)
if dynamic:
    timer.start(100)

widget.show()
app.exec()

代码:chan_chart_wizard.py

from zoneinfo import ZoneInfo
import pytz
from PySide6.QtCore import QPointF, Qt
from tzlocal import get_localzone_name

from chan_py.ChanConfig import CChanConfig
from chan_py.KLine.KLine_Unit import CKLine_Unit
from chan_py.Common.CTime import CTime
from vnpy.chart import ChartWidget, CandleItem, VolumeItem

from vnpy.chart.item import LineItem, MarkerItem

from vnpy.event import EventEngine, Event
from vnpy.trader.engine import MainEngine
from vnpy.trader.ui import QtWidgets, QtCore, QtGui
from vnpy.trader.object import BarData, TickData, ContractData
from vnpy.trader.utility import BarGenerator
from vnpy.trader.constant import Interval
from chan_py.Chan import CChan
from chan_py.Plot.PlotDriver_origin import CPlotDriver as CPlotlyDriver_origin

from vnpy.chart import ChartWidget, CandleItem, VolumeItem

import pyqtgraph as pg
from vnpy_chartwizard.ui.widget import ChartWizardWidget
from chan_py.Common.CEnum import AUTYPE, BSP_TYPE, DATA_SRC, FX_TYPE, KL_TYPE, DATA_FIELD, KLINE_DIR
from chan_py.Plot.PlotMeta import CChanPlotMeta
from chan_py.Common.ChanException import CChanException, ErrCode
from typing import Dict, List, Literal, Optional, Tuple, Union

import datetime
from datetime import datetime, timedelta, timezone
from dateutil.parser import parse
from vnpy.trader.object import ContractData, TickData, BarData, SubscribeRequest
from vnpy.chart.item import ChartItem
from vnpy.chart.manager import BarManager
from vnpy.chart.base import BAR_WIDTH, NORMAL_FONT

class CPlotlyDriver:
def init(self, chan: CChan, plot_config: Union[str, dict, list] = '', plot_para=None):
if plot_para is None:
plot_para = {}
self.figure_config: dict = plot_para.get('figure', {})

    self.plot_config = self.parse_plot_config(plot_config, chan.lv_list)
    self.plot_metas = self.GetPlotMeta(chan, self.figure_config)
    self.lv_lst = chan.lv_list[:len(self.plot_metas)]
    # self.chan = chan

def GetPlotMeta(self, chan: CChan, figure_config) -> List[CChanPlotMeta]:
    plot_metas = [CChanPlotMeta(chan[kl_type]) for kl_type in chan.lv_list]
    if figure_config.get("only_top_lv", False):
        plot_metas = [plot_metas[0]]
    return plot_metas

def reformat_plot_config(self, plot_config: Dict[str, bool]):
    """
    兼容不填写`plot_`前缀的情况
    """

    def _format(s):
        return s if s.startswith("plot_") else f"plot_{s}"

    return {_format(k): v for k, v in plot_config.items()}

def parse_single_lv_plot_config(self, plot_config: Union[str, dict, list]) -> Dict[str, bool]:
    """
    返回单一级别的plot_config配置
    """
    if isinstance(plot_config, dict):
        return self.reformat_plot_config(plot_config)
    elif isinstance(plot_config, str):
        return self.reformat_plot_config(dict([(k.strip().lower(), True) for k in plot_config.split(",")]))
    elif isinstance(plot_config, list):
        return self.reformat_plot_config(dict([(k.strip().lower(), True) for k in plot_config]))
    else:
        raise CChanException("plot_config only support list/str/dict", ErrCode.PLOT_ERR)

def parse_plot_config(self, plot_config: Union[str, dict, list], lv_list: List[KL_TYPE]) -> Dict[
    KL_TYPE, Dict[str, bool]]:
    """
    支持:
        - 传入字典
        - 传入字符串,逗号分割
        - 传入数组,元素为各个需要画的笔的元素
        - 传入key为各个级别的字典
        - 传入key为各个级别的字符串
        - 传入key为各个级别的数组
    """
    if isinstance(plot_config, dict):
        if all(isinstance(_key, str) for _key in plot_config.keys()):  # 单层字典
            return {lv: self.parse_single_lv_plot_config(plot_config) for lv in lv_list}
        elif all(isinstance(_key, KL_TYPE) for _key in plot_config.keys()):  # key为KL_TYPE
            for lv in lv_list:
                assert lv in plot_config
            return {lv: self.parse_single_lv_plot_config(plot_config[lv]) for lv in lv_list}
        else:
            raise CChanException("plot_config if is dict, key must be str/KL_TYPE", ErrCode.PLOT_ERR)
    return {lv: self.parse_single_lv_plot_config(plot_config) for lv in lv_list}


class CLItem(CandleItem, pg.ScatterPlotItem):
def init(self, manager: BarManager):
"""缠论笔"""
super().init(manager)
self.klc_list = None
self.bi_list = None
self.duan_list = None
self.zs_list = None
self.sell_buy_point_list = None
self.cchan: CChan = None
self.pictures_bar_range = None
self.config = CChanConfig({
"bi_strict": True,
"trigger_step": False,
"skip_step": 0,
"divergence_rate": float("inf"),
"bsp2_follow_1": False,
"bsp3_follow_1": False,
"min_zs_cnt": 1,
"bs1_peak": False,
"macd_algo": "peak",
"bs_type": '1,2,3a,1p,2s,3b',
"print_warning": True,
"zs_algo": "normal",
"cal_rsi": False,
"cal_kdj": False,
})
self.plot_config = {
"plot_kline": True,
"plot_kline_combine": True,
"plot_bi": False,
"plot_seg": False,
"plot_eigen": False,
"plot_zs": False,
"plot_segzs": False,
"plot_macd": False,
"plot_mean": False,
"plot_channel": False,
"plot_boll": False,
"plot_bsp": False,
"plot_extrainfo": False,
"plot_demark": False,
"plot_marker": False,
"plot_rsi": False,
"plot_kdj": False,
}

    self.plot_para = {
        "seg": {
            # "plot_trendline": True,
        },
        "bi": {
            # "show_num": True,
            # "disp_end": True,
        },
        "figure": {
            "x_range": 800,
        },
        "marker": {
            # "markers": {  # text, position, color
            #     '2023/06/01': ('marker here', 'up', 'red'),
            #     '2023/06/08': ('marker here', 'down')
            # },
        }
    }
    self.start = None
    self.end = None
    self.lv_list = None
    self.lv_map = {
        Interval.MINUTE: KL_TYPE.K_1M,
        Interval.HOUR: KL_TYPE.K_60M,
        Interval.DAILY: KL_TYPE.K_DAY,
        Interval.WEEKLY: KL_TYPE.K_WEEK,
        Interval.MONTHLY: KL_TYPE.K_MON,
    }

def get_info_text(self, ix: int) -> str:
    return ''


def _draw_item_picture(self, min_ix: int, max_ix: int) -> None:
    self._item_picuture = QtGui.QPicture()
    # get all data in bars
    all_bars = self._manager.get_all_bars()
    if all_bars is None or len(all_bars) < 2:
        return
    if self.pictures_bar_range:
        if self.pictures_bar_range[1].datetime > all_bars[-1].datetime:
            return
        else:
            self.pictures_bar_range[1] = all_bars[-1]
    self._draw_picture(all_bars)

def _draw_picture(self, all_bars: List[BarData]):
    self.klc_list, self.bi_list, self.duan_list, self.zs_list, self.sell_buy_point_list = self.get_chanlun_data(
        all_bars)
    painter: QtGui.QPainter = QtGui.QPainter(self._item_picuture)
    # 清空画布
    painter.eraseRect(self._item_picuture.boundingRect())

    # Create objects
    magenta_pen = pg.mkPen(color=(255, 0, 255, 255), width=1)
    green_pen = pg.mkPen(color=(0, 255, 0, 255), width=1)
    blue_pen = pg.mkPen(color=(0, 0, 255, 255), width=1)
    for klc in self.klc_list:
        color_pen = {FX_TYPE.TOP: magenta_pen, FX_TYPE.BOTTOM: blue_pen, KLINE_DIR.UP: green_pen,
                     KLINE_DIR.DOWN: green_pen}

        type = klc[4]
        pen = color_pen[type]
        # (item.low, item.high, begin, end, item.fx,item.dir)
        start_idx = self._manager.get_index(klc[2])
        end_idx = self._manager.get_index(klc[3])
        if start_idx == end_idx:
            continue
        painter.setPen(pen)
        # 左上角,右下角
        painter.drawRect(
            QtCore.QRectF(QPointF(start_idx - BAR_WIDTH, klc[1]), QPointF(end_idx + BAR_WIDTH, klc[0])))

    for bi in self.bi_list:
        # item.get_begin_val(), item.get_end_val(), begin, end, item.is_sure, item.dir,
        start_idx = self._manager.get_index(bi[2])
        end_idx = self._manager.get_index(bi[3])
        start_price = bi[0]
        end_price = bi[1]
        if bi[4]:
            pen = pg.mkPen('b', width=2)
        else:
            pen = pg.mkPen('b', width=2, style=QtCore.Qt.DashLine)
        painter.setPen(pen)
        start_point = QtCore.QPointF(start_idx, start_price)
        end_point = QtCore.QPointF(end_idx, end_price)
        painter.drawLine(start_point, end_point)
    for duan in self.duan_list:
        # (item.start_bi.start_bi.get_begin_val(), item.end_bi.end_bi.get_end_val(), begin,
        # end, item.is_sure, item.dir)
        start_idx = self._manager.get_index(duan[2])
        end_idx = self._manager.get_index(duan[3])
        if duan[4]:
            pen = pg.mkPen('m', width=2)
        else:
            pen = pg.mkPen('m', width=2, style=QtCore.Qt.DashLine)
        painter.setPen(pen)
        start_point = QtCore.QPointF(start_idx, duan[0])
        end_point = QtCore.QPointF(end_idx, duan[1])
        painter.drawLine(start_point, end_point)

    for zs in self.zs_list:
        # (item.low, item.high, begin, end, item.is_sure)
        if zs[4]:
            pen = pg.mkPen('orange', width=2)
        else:
            pen = pg.mkPen('orange', width=2, style=QtCore.Qt.DashLine)

        start_idx = self._manager.get_index(zs[2])
        end_idx = self._manager.get_index(zs[3])
        painter.setPen(pen)
        painter.drawRect(QtCore.QRectF(QPointF(start_idx, zs[1]), QPointF(end_idx, zs[0])))
    for bsp in self.sell_buy_point_list:
        # time,y,is_buy,type
        arrow_l = 0.15
        arrow_h = 1  # 增加箭头高度
        arrow_w = 2
        offset = 2  # 添加额外偏移量
        line_length = 10  # 箭头尾部线段长度
        bs_time, y, is_buy, type = bsp
        x = self._manager.get_index(bs_time)

        color = QtGui.QColor("red") if is_buy else QtGui.QColor("green")
        painter.setPen(QtGui.QPen(color, 0.5, Qt.SolidLine))
        painter.setBrush(color)
        # 在箭头末端显示 type 值
        text_offset = 0  # 文本偏移量
        font = painter.font()
        font.setFamily("Arial")  # 强制使用标准字体
        font.setPointSize(4)  # 可选:调整字体大小
        painter.setFont(font)
        # 画箭头尾部线段
        if is_buy:
            # print(x, y, is_buy, type)
            start_y = y - offset - line_length
            end_y = y - offset
            painter.drawLine(QtCore.QPointF(x, start_y), QtCore.QPointF(x, end_y))

            # 画三角形箭头头部
            painter.drawPolygon(
                QtGui.QPolygonF([
                    QtCore.QPointF(x, y + arrow_h - offset),  # 向上箭头,减少偏移
                    QtCore.QPointF(x - arrow_w, y - arrow_h - offset),
                    QtCore.QPointF(x + arrow_w, y - arrow_h - offset)
                ])
            )
        else:
            # 画箭头尾部线段
            start_y = y + offset
            end_y = y + offset + line_length
            painter.drawLine(QtCore.QPointF(x, start_y), QtCore.QPointF(x, end_y))

            # 画三角形箭头头部
            painter.drawPolygon(
                QtGui.QPolygonF([
                    QtCore.QPointF(x, y - arrow_h + offset),  # 向下箭头,增加偏移
                    QtCore.QPointF(x - arrow_w, y + arrow_h + offset),
                    QtCore.QPointF(x + arrow_w, y + arrow_h + offset)
                ])
            )
        # painter.save()  # 保存当前绘图状态
        # painter.resetTransform()  # 重置所有变换
        if is_buy:
            start_y = y - offset - line_length
            end_y = y - offset
            text = pg.TextItem(type, color=QtGui.QColor("red"))

            painter.drawText(QtCore.QPointF(x - text_offset, start_y), type)
        else:

            painter.drawText(QtCore.QPointF(x - text_offset, y + line_length + offset), type)

    # painter.restore()
    painter.end()

def get_chanlun_data(self, bars):
    shanghai_tz = ZoneInfo('Asia/Shanghai')
    self.start = self._manager.get_datetime(0)
    self.end = None
    self.lv_list = [self.lv_map[bars[0].interval]]

    need_init = False
    data_list = None
    trigger_load_data = None
    if self.pictures_bar_range is None:
        need_init = True
        data_list = bars[:-1]
        self.pictures_bar_range = [data_list[0], data_list[-1]]
    else:
        picture_start_time = self.pictures_bar_range[0].datetime
        picture_end_time = self.pictures_bar_range[-1].datetime
        if picture_start_time > bars[0].datetime:
            need_init = True
            data_list = bars[:-1]
        elif picture_end_time < bars[-1].datetime:
            need_init = False
            start_idx = self._manager.get_index(picture_end_time)
            trigger_load_data = bars[start_idx + 1:-1]
        else:
            return self.klc_list, self.bi_list, self.duan_list, self.zs_list, self.sell_buy_point_list

    if self.cchan is None or need_init:
        self.cchan = CChan(
            code=bars[0].vt_symbol,
            begin_time=self.start,
            end_time=self.end,
            data_src='custom:vnpy_bar_api.VNPY_API',
            lv_list=self.lv_list,
            config=self.config,
            autype=AUTYPE.QFQ,
            data_list=data_list,
        )
    else:
        self.cchan.trigger_load({self.lv_list[0]: trigger_load_data})
    plot_driver = CPlotlyDriver(
        self.cchan,
        plot_config=self.plot_config,
        plot_para=self.plot_para,
    )
    # origin_plot = CPlotlyDriver_origin(
    #     self.cchan,
    #     plot_config=plot_config,
    #     plot_para=plot_para,
    # )
    # origin_plot.figure.show()
    # # plot_driver.figure.write_html("./test.png")
    # origin_plot.save2img("/tmp/label.png")

    plot_metas = plot_driver.plot_metas[0]
    klc_list = []
    bi_list = []
    duan_list = []
    zs_list = []
    sell_buy_point_list = []
    print(f'cklstart:{datetime.now()}')
    for klc_meta in plot_metas.klc_list:
        time_begin = klc_meta.klu_list[0].time
        time_end = klc_meta.klu_list[-1].time
        begin = datetime(time_begin.year, time_begin.month, time_begin.day, time_begin.hour, time_begin.minute,
                         time_begin.second, tzinfo=shanghai_tz)
        end = datetime(time_end.year, time_end.month, time_end.day, time_end.hour, time_end.minute, time_end.second,
                       tzinfo=shanghai_tz)
        low = klc_meta.low
        high = klc_meta.high
        type = klc_meta.type

        # 低值,高值,开始时间,结束时间,方向
        klc_list.append((low, high, begin, end, type))
    print(f'cklend:{datetime.now()}')
    #     # 开始值,结束值,开始klu值-x,结束klu值-x,是否确定,方向
    for bi in plot_metas.bi_list:
        begin_y = bi.begin_y
        end_y = bi.end_y
        begin_x = bi.begin_x
        begin = parse(plot_metas.datetick[begin_x]).astimezone(shanghai_tz)
        end_x = bi.end_x
        end = parse(plot_metas.datetick[end_x]).astimezone(shanghai_tz)
        bi_list.append((begin_y, end_y, begin, end, bi.is_sure, bi.dir))
    print(f'bi end:{datetime.now()}')
    #     # 开始y,结束y,开始时间,结束时间,是否确定,方向
    for duan in plot_metas.seg_list:
        begin_y = duan.begin_y
        end_y = duan.end_y
        begin_x = duan.begin_x
        begin = parse(plot_metas.datetick[begin_x]).astimezone(shanghai_tz)
        end_x = duan.end_x
        end = parse(plot_metas.datetick[end_x]).astimezone(shanghai_tz)
        duan_list.append((begin_y, end_y, begin, end, duan.is_sure, duan.dir))
    # # zs
    #     # 开始值,结束值,开始klu值-x,结束klu值-x,是否确定,方向
    for zs in plot_metas.zs_lst:
        begin_x = zs.begin
        begin_ctime = plot_metas.datetick[begin_x]
        # begin = datetime(begin_ctime.year, begin_ctime.month, begin_ctime.day, begin_ctime.hour, begin_ctime.minute,
        #                  begin_ctime.second,
        #                  tzinfo=shanghai_tz)
        begin = parse(begin_ctime).astimezone(shanghai_tz)
        end_x = zs.end
        end_ctime = plot_metas.datetick[end_x]
        end = parse(end_ctime).astimezone(shanghai_tz)
        low = zs.low
        high = zs.high
        zs_list.append((low, high, begin, end, zs.is_sure))
    print(f'zhongshu end:{datetime.now()}')
    for bs in plot_metas.bs_point_lst:
        is_buy = bs.is_buy
        type = bs.desc()
        bs_time_idx = bs.x
        bs_ctime = plot_metas.datetick[bs_time_idx]
        bs_time = parse(bs_ctime).astimezone(shanghai_tz)
        sell_buy_point_list.append((bs_time, bs.y, is_buy, type))
    return klc_list, bi_list, duan_list, zs_list, sell_buy_point_list
© 2015-2022 上海韦纳软件科技有限公司
备案服务号:沪ICP备18006526号

沪公网安备 31011502017034号

【用户协议】
【隐私政策】
【免责条款】