如下图,在K线图上画文字时发现文字是上下颠倒的(代码见第二张图)。原因应该是:画K线后整个坐标系以左下角为 (0, 0),而 Qt 画文字时认为左上角的坐标是 (0, 0)。
道理我都懂,但是怎么能正常显示文字呀,有没有大佬知道,求指点
如下图,在K线图上画文字时发现文字是上下颠倒的(代码见第二张图)。原因应该是:画K线后整个坐标系以左下角为 (0, 0),而 Qt 画文字时认为左上角的坐标是 (0, 0)。
道理我都懂,但是怎么能正常显示文字呀,有没有大佬知道,求指点
main.py:
from vnpy.trader.database import get_database
from datetime import datetime
import pyqtgraph as pg
from vnpy.trader.ui import create_qapp, QtCore, QtGui, QtWidgets
from vnpy.trader.constant import Exchange, Interval
from vnpy.chart import ChartWidget, VolumeItem, CandleItem
class NewChartWidget(ChartWidget):
    """ChartWidget subclass that adds a slot for a last-price line."""

    # Class-level setting; presumably overrides ChartWidget.MIN_BAR_COUNT
    # (the base class is not visible here -- verify).
    MIN_BAR_COUNT = 1000

    def __init__(self, parent: QtWidgets.QWidget = None):
        """Initialise the base chart widget with the given parent.

        Args:
            parent: Optional parent widget forwarded to ``ChartWidget``.
        """
        super().__init__(parent)

        # Starts out empty; nothing in this class assigns an actual line.
        self.last_price_line: pg.InfiniteLine = None
if __name__ == "__main__":
    # Script entry point: load bars from the configured database and show
    # them in a chart window with candle, volume and Chan-theory overlays.
    app = create_qapp()

    symbol = "bu2503"
    exchange = Exchange.SHFE
    interval = Interval.MINUTE
    start = datetime(2024, 1, 19)
    end = datetime(2025, 1, 23)

    dynamic = False  # replay the remaining bars one by one when True
    n = 500  # number of bars loaded as initial history (e.g. 10000)

    bars = get_database().load_bar_data(
        symbol=symbol,
        exchange=exchange,
        interval=interval,
        start=start,
        end=end,
    )

    widget = NewChartWidget()
    widget.setWindowTitle(f"K线图表——{symbol}.{exchange.value},{interval},{start}-{end}")
    widget.add_plot("candle", hide_x_axis=True)
    widget.add_plot("volume", maximum_height=150)
    widget.add_item(CandleItem, "candle", "candle")
    widget.add_item(VolumeItem, "volume", "volume")

    # Imported here, as in the original script, rather than at module top.
    from vnpy_chartwizard.ui.chan_chart_wizard import CLItem

    widget.add_item(CLItem, "clitem", "candle")
    widget.add_cursor()

    if dynamic:
        history = bars[:n]  # the earliest n bars form the initial history
        new_data = bars[n:]  # the rest is replayed by the timer below
    else:
        history = bars[-n:]  # the latest n bars form the history
        new_data = []  # nothing to replay

    widget.update_history(history)

    def update_bar():
        """Feed the next pending bar, if any, into the chart."""
        if new_data:
            bar = new_data.pop(0)
            widget.update_bar(bar)

    timer = QtCore.QTimer()
    timer.timeout.connect(update_bar)
    if dynamic:
        timer.start(100)  # interval in milliseconds

    widget.show()
    app.exec()
代码:chan_chart_wizard.py
from zoneinfo import ZoneInfo
import pytz
from PySide6.QtCore import QPointF, Qt
from tzlocal import get_localzone_name
from chan_py.ChanConfig import CChanConfig
from chan_py.KLine.KLine_Unit import CKLine_Unit
from chan_py.Common.CTime import CTime
from vnpy.chart import ChartWidget, CandleItem, VolumeItem
from vnpy.event import EventEngine, Event
from vnpy.trader.engine import MainEngine
from vnpy.trader.ui import QtWidgets, QtCore, QtGui
from vnpy.trader.object import BarData, TickData, ContractData
from vnpy.trader.utility import BarGenerator
from vnpy.trader.constant import Interval
from chan_py.Chan import CChan
from chan_py.Plot.PlotDriver_origin import CPlotDriver as CPlotlyDriver_origin
import pyqtgraph as pg
from vnpy_chartwizard.ui.widget import ChartWizardWidget
from chan_py.Common.CEnum import AUTYPE, BSP_TYPE, DATA_SRC, FX_TYPE, KL_TYPE, DATA_FIELD, KLINE_DIR
from chan_py.Plot.PlotMeta import CChanPlotMeta
from chan_py.Common.ChanException import CChanException, ErrCode
from typing import Dict, List, Literal, Optional, Tuple, Union
import datetime
from datetime import datetime, timedelta, timezone
from dateutil.parser import parse
from vnpy.trader.object import ContractData, TickData, BarData, SubscribeRequest
from vnpy.chart.item import ChartItem
from vnpy.chart.manager import BarManager
from vnpy.chart.base import BAR_WIDTH, NORMAL_FONT
class CPlotlyDriver:
    """Hold the normalised plot configuration and plot metadata of a ``CChan``."""

    def __init__(self, chan: CChan, plot_config: Union[str, dict, list] = '', plot_para=None):
        """Parse the plot configuration and build one plot meta per level.

        Args:
            chan: Analysis object; its ``lv_list`` and per-level items are read.
            plot_config: Plot switches as a dict, a comma separated string or
                a list of names (see ``parse_plot_config``).
            plot_para: Optional nested parameter dict; only its ``'figure'``
                entry is read here.
        """
        if plot_para is None:
            plot_para = {}
        self.figure_config: dict = plot_para.get('figure', {})
        self.plot_config = self.parse_plot_config(plot_config, chan.lv_list)
        self.plot_metas = self.GetPlotMeta(chan, self.figure_config)
        # Keep only as many levels as there are plot metas.
        self.lv_lst = chan.lv_list[:len(self.plot_metas)]

    def GetPlotMeta(self, chan: CChan, figure_config) -> List[CChanPlotMeta]:
        """Return one ``CChanPlotMeta`` per level of ``chan``.

        When ``figure_config['only_top_lv']`` is truthy, only the first
        level's meta is returned.
        """
        plot_metas = [CChanPlotMeta(chan[kl_type]) for kl_type in chan.lv_list]
        if figure_config.get("only_top_lv", False):
            plot_metas = [plot_metas[0]]
        return plot_metas

    def reformat_plot_config(self, plot_config: Dict[str, bool]):
        """Return a copy of ``plot_config`` whose keys all start with ``plot_``.

        Keys that already carry the prefix are kept unchanged.
        """
        def _format(s):
            return s if s.startswith("plot_") else f"plot_{s}"

        return {_format(k): v for k, v in plot_config.items()}

    def parse_single_lv_plot_config(self, plot_config: Union[str, dict, list]) -> Dict[str, bool]:
        """Normalise the plot configuration of a single level.

        A dict is only re-prefixed; a comma separated string or a list of
        names is stripped, lower-cased and mapped to ``True``.

        Raises:
            CChanException: If ``plot_config`` is not a dict, str or list.
        """
        if isinstance(plot_config, dict):
            return self.reformat_plot_config(plot_config)
        if isinstance(plot_config, str):
            return self.reformat_plot_config({k.strip().lower(): True for k in plot_config.split(",")})
        if isinstance(plot_config, list):
            return self.reformat_plot_config({k.strip().lower(): True for k in plot_config})
        raise CChanException("plot_config only support list/str/dict", ErrCode.PLOT_ERR)

    def parse_plot_config(self, plot_config: Union[str, dict, list], lv_list: List[KL_TYPE]) -> Dict[
            KL_TYPE, Dict[str, bool]]:
        """Build the per-level plot configuration for every level in ``lv_list``.

        Supported inputs:
        - a dict with string keys, applied to every level
        - a comma separated string, applied to every level
        - a list of element names, applied to every level
        - a dict keyed by ``KL_TYPE`` whose values are any of the above

        Raises:
            CChanException: If a dict mixes key types or uses other key types.
            AssertionError: If a ``KL_TYPE``-keyed dict lacks a level of ``lv_list``.
        """
        if isinstance(plot_config, dict):
            if all(isinstance(_key, str) for _key in plot_config.keys()):
                # Flat dict: the same switches apply to every level.
                return {lv: self.parse_single_lv_plot_config(plot_config) for lv in lv_list}
            elif all(isinstance(_key, KL_TYPE) for _key in plot_config.keys()):
                # Per-level dict: every requested level must be configured.
                for lv in lv_list:
                    assert lv in plot_config
                return {lv: self.parse_single_lv_plot_config(plot_config[lv]) for lv in lv_list}
            else:
                raise CChanException("plot_config if is dict, key must be str/KL_TYPE", ErrCode.PLOT_ERR)
        # str / list (or an unsupported type, rejected by the per-level parser).
        return {lv: self.parse_single_lv_plot_config(plot_config) for lv in lv_list}
class CLItem(CandleItem, pg.ScatterPlotItem):
    """Chart item that paints Chan-theory structures over the candle plot.

    Merged candles, strokes (bi), segments (duan), pivot zones (zs) and
    buy/sell points are computed with ``CChan`` from the bars held by the
    bar manager and painted into ``self._item_picuture``.
    """

    def __init__(self, manager: BarManager):
        """Initialise cached results and the CChan / plotting configuration."""
        super().__init__(manager)

        # Results of the last analysis, reused when no recomputation is needed.
        self.klc_list = None
        self.bi_list = None
        self.duan_list = None
        self.zs_list = None
        self.sell_buy_point_list = None

        self.cchan: CChan = None
        # [first bar, last bar] recorded by the analysis; None until first run.
        self.pictures_bar_range = None

        self.config = CChanConfig({
            "bi_strict": True,
            "trigger_step": False,
            "skip_step": 0,
            "divergence_rate": float("inf"),
            "bsp2_follow_1": False,
            "bsp3_follow_1": False,
            "min_zs_cnt": 1,
            "bs1_peak": False,
            "macd_algo": "peak",
            "bs_type": '1,2,3a,1p,2s,3b',
            "print_warning": True,
            "zs_algo": "normal",
            "cal_rsi": False,
            "cal_kdj": False,
        })
        self.plot_config = {
            "plot_kline": True,
            "plot_kline_combine": True,
            "plot_bi": False,
            "plot_seg": False,
            "plot_eigen": False,
            "plot_zs": False,
            "plot_segzs": False,
            "plot_macd": False,
            "plot_mean": False,
            "plot_channel": False,
            "plot_boll": False,
            "plot_bsp": False,
            "plot_extrainfo": False,
            "plot_demark": False,
            "plot_marker": False,
            "plot_rsi": False,
            "plot_kdj": False,
        }
        self.plot_para = {
            "seg": {},
            "bi": {},
            "figure": {
                "x_range": 800,
            },
            "marker": {},
        }

        self.start = None
        self.end = None
        self.lv_list = None
        # Bar interval -> CChan k-line level.
        self.lv_map = {
            Interval.MINUTE: KL_TYPE.K_1M,
            Interval.HOUR: KL_TYPE.K_60M,
            Interval.DAILY: KL_TYPE.K_DAY,
            Interval.WEEKLY: KL_TYPE.K_WEEK,
            Interval.MONTHLY: KL_TYPE.K_MON,
        }

    def get_info_text(self, ix: int) -> str:
        """Return the cursor info text for bar ``ix``; this item shows none."""
        return ''

    def _draw_item_picture(self, min_ix: int, max_ix: int) -> None:
        """Rebuild the item picture from all bars held by the manager.

        ``min_ix`` and ``max_ix`` are not used; the whole series is drawn.
        The picture is left empty when fewer than two bars exist or when the
        recorded range already ends after the newest bar.
        """
        self._item_picuture = QtGui.QPicture()

        all_bars = self._manager.get_all_bars()
        if all_bars is None or len(all_bars) < 2:
            return

        if self.pictures_bar_range:
            if self.pictures_bar_range[1].datetime > all_bars[-1].datetime:
                return
            # NOTE(review): the range end is moved to the newest bar before
            # get_chanlun_data() compares against it, so its incremental
            # ``trigger_load`` branch looks unreachable -- confirm intent.
            self.pictures_bar_range[1] = all_bars[-1]
        self._draw_picture(all_bars)

    @staticmethod
    def _draw_upright_text(painter: QtGui.QPainter, anchor: QtCore.QPointF, text: str, above: bool) -> None:
        """Draw ``text`` at ``anchor`` so that it reads upright in a y-up view.

        The picture is painted in data coordinates where y grows upwards,
        while Qt lays out glyphs for a y-down system, so text drawn directly
        comes out vertically mirrored. Flipping the y axis locally around the
        anchor cancels that mirroring.

        Args:
            painter: Active painter recording into the item picture.
            anchor: Left end of the label, in data coordinates.
            text: Label to draw.
            above: Place the label above the anchor when True, below otherwise.
        """
        painter.save()
        painter.translate(anchor)
        painter.scale(1.0, -1.0)
        metrics = QtGui.QFontMetricsF(painter.font())
        # After the flip, local +y points down on screen: a baseline at
        # +ascent hangs the label below the anchor, -descent lifts it above.
        baseline_y = -metrics.descent() if above else metrics.ascent()
        painter.drawText(QtCore.QPointF(0.0, baseline_y), text)
        painter.restore()
        # NOTE(review): the label is still sized in data units, so it will
        # presumably scale/stretch with the view; a pg.TextItem child would
        # keep a constant pixel size if that is preferred.

    def _draw_picture(self, all_bars: List[BarData]):
        """Paint every Chan-theory structure for ``all_bars`` into the picture."""
        (self.klc_list, self.bi_list, self.duan_list, self.zs_list,
         self.sell_buy_point_list) = self.get_chanlun_data(all_bars)

        painter: QtGui.QPainter = QtGui.QPainter(self._item_picuture)
        # Clear the canvas.
        painter.eraseRect(self._item_picuture.boundingRect())

        get_index = self._manager.get_index

        # --- merged candles: (low, high, begin, end, fx type / direction) ---
        magenta_pen = pg.mkPen(color=(255, 0, 255, 255), width=1)
        green_pen = pg.mkPen(color=(0, 255, 0, 255), width=1)
        blue_pen = pg.mkPen(color=(0, 0, 255, 255), width=1)
        klc_pens = {
            FX_TYPE.TOP: magenta_pen,
            FX_TYPE.BOTTOM: blue_pen,
            KLINE_DIR.UP: green_pen,
            KLINE_DIR.DOWN: green_pen,
        }
        for low, high, begin, end, klc_type in self.klc_list:
            pen = klc_pens[klc_type]
            start_idx = get_index(begin)
            end_idx = get_index(end)
            if start_idx == end_idx:
                # A merged candle covering a single bar gets no box.
                continue
            painter.setPen(pen)
            # Corners given as (left, high) and (right, low).
            painter.drawRect(QtCore.QRectF(
                QPointF(start_idx - BAR_WIDTH, high),
                QPointF(end_idx + BAR_WIDTH, low),
            ))

        # --- strokes: (begin value, end value, begin, end, is_sure, dir) ---
        bi_sure_pen = pg.mkPen('b', width=2)
        bi_unsure_pen = pg.mkPen('b', width=2, style=QtCore.Qt.DashLine)
        for bi in self.bi_list:
            start_idx = get_index(bi[2])
            end_idx = get_index(bi[3])
            painter.setPen(bi_sure_pen if bi[4] else bi_unsure_pen)
            painter.drawLine(QtCore.QPointF(start_idx, bi[0]), QtCore.QPointF(end_idx, bi[1]))

        # --- segments: (begin value, end value, begin, end, is_sure, dir) ---
        duan_sure_pen = pg.mkPen('m', width=2)
        duan_unsure_pen = pg.mkPen('m', width=2, style=QtCore.Qt.DashLine)
        for duan in self.duan_list:
            start_idx = get_index(duan[2])
            end_idx = get_index(duan[3])
            painter.setPen(duan_sure_pen if duan[4] else duan_unsure_pen)
            painter.drawLine(QtCore.QPointF(start_idx, duan[0]), QtCore.QPointF(end_idx, duan[1]))

        # --- pivot zones: (low, high, begin, end, is_sure) ---
        zs_sure_pen = pg.mkPen('orange', width=2)
        zs_unsure_pen = pg.mkPen('orange', width=2, style=QtCore.Qt.DashLine)
        for zs in self.zs_list:
            start_idx = get_index(zs[2])
            end_idx = get_index(zs[3])
            painter.setPen(zs_sure_pen if zs[4] else zs_unsure_pen)
            painter.drawRect(QtCore.QRectF(QPointF(start_idx, zs[1]), QPointF(end_idx, zs[0])))

        # --- buy/sell points: (time, price, is_buy, description) ---
        arrow_h = 1  # half height of the arrow head
        arrow_w = 2  # half width of the arrow head
        offset = 2  # gap between the price and the arrow
        line_length = 10  # length of the arrow tail
        text_offset = 0  # horizontal shift of the label

        font = painter.font()
        font.setFamily("Arial")  # force a standard font
        font.setPointSize(4)
        painter.setFont(font)

        for bs_time, y, is_buy, desc in self.sell_buy_point_list:
            x = get_index(bs_time)
            color = QtGui.QColor("red") if is_buy else QtGui.QColor("green")
            painter.setPen(QtGui.QPen(color, 0.5, Qt.SolidLine))
            painter.setBrush(color)

            if is_buy:
                # Upward arrow below the price: tail first, then the head.
                tail_y = y - offset - line_length
                painter.drawLine(QtCore.QPointF(x, tail_y), QtCore.QPointF(x, y - offset))
                painter.drawPolygon(QtGui.QPolygonF([
                    QtCore.QPointF(x, y + arrow_h - offset),
                    QtCore.QPointF(x - arrow_w, y - arrow_h - offset),
                    QtCore.QPointF(x + arrow_w, y - arrow_h - offset),
                ]))
            else:
                # Downward arrow above the price.
                tail_y = y + offset + line_length
                painter.drawLine(QtCore.QPointF(x, y + offset), QtCore.QPointF(x, tail_y))
                painter.drawPolygon(QtGui.QPolygonF([
                    QtCore.QPointF(x, y - arrow_h + offset),
                    QtCore.QPointF(x - arrow_w, y + arrow_h + offset),
                    QtCore.QPointF(x + arrow_w, y + arrow_h + offset),
                ]))

            # Label at the far end of the tail: below it for buy points,
            # above it for sell points, so it does not cover the arrow.
            self._draw_upright_text(
                painter,
                QtCore.QPointF(x - text_offset, tail_y),
                desc,
                above=not is_buy,
            )

        painter.end()

    def get_chanlun_data(self, bars: List[BarData]):
        """Run (or reuse) the CChan analysis and return drawable tuples.

        The newest bar is never fed to CChan (``bars[:-1]``) -- presumably
        because it may still be forming; confirm.

        Args:
            bars: All bars currently held by the manager, oldest first.

        Returns:
            A 5-tuple ``(klc_list, bi_list, duan_list, zs_list,
            sell_buy_point_list)``:
            - klc: ``(low, high, begin, end, type)``
            - bi / duan: ``(begin_y, end_y, begin, end, is_sure, dir)``
            - zs: ``(low, high, begin, end, is_sure)``
            - buy/sell point: ``(time, y, is_buy, description)``
        """
        shanghai_tz = ZoneInfo('Asia/Shanghai')
        self.start = self._manager.get_datetime(0)
        self.end = None
        self.lv_list = [self.lv_map[bars[0].interval]]

        need_init = False
        data_list = None
        trigger_load_data = None
        if self.pictures_bar_range is None:
            # First run: analyse everything except the newest bar.
            need_init = True
            data_list = bars[:-1]
            self.pictures_bar_range = [data_list[0], data_list[-1]]
        else:
            picture_start_time = self.pictures_bar_range[0].datetime
            picture_end_time = self.pictures_bar_range[-1].datetime
            if picture_start_time > bars[0].datetime:
                # Earlier history appeared: rebuild from scratch.
                # NOTE(review): pictures_bar_range is not refreshed on this
                # path -- confirm whether that is intended.
                need_init = True
                data_list = bars[:-1]
            elif picture_end_time < bars[-1].datetime:
                # Newer bars appeared: feed only the ones after the range end.
                start_idx = self._manager.get_index(picture_end_time)
                trigger_load_data = bars[start_idx + 1:-1]
            else:
                # Nothing new: reuse the cached results.
                return self.klc_list, self.bi_list, self.duan_list, self.zs_list, self.sell_buy_point_list

        if self.cchan is None or need_init:
            self.cchan = CChan(
                code=bars[0].vt_symbol,
                begin_time=self.start,
                end_time=self.end,
                data_src='custom:vnpy_bar_api.VNPY_API',
                lv_list=self.lv_list,
                config=self.config,
                autype=AUTYPE.QFQ,
                data_list=data_list,
            )
        else:
            self.cchan.trigger_load({self.lv_list[0]: trigger_load_data})

        plot_driver = CPlotlyDriver(
            self.cchan,
            plot_config=self.plot_config,
            plot_para=self.plot_para,
        )
        plot_metas = plot_driver.plot_metas[0]

        def _from_ctime(t) -> datetime:
            """Build a Shanghai-zoned datetime from an object with date/time fields."""
            return datetime(t.year, t.month, t.day, t.hour, t.minute, t.second, tzinfo=shanghai_tz)

        def _from_tick(ix) -> datetime:
            """Parse ``plot_metas.datetick[ix]`` and convert it to Shanghai time."""
            # NOTE(review): if the parsed value is naive, astimezone() treats
            # it as local time, unlike _from_ctime which attaches the zone
            # directly; the two agree only when the local zone is
            # Asia/Shanghai -- confirm.
            return parse(plot_metas.datetick[ix]).astimezone(shanghai_tz)

        klc_list = []
        bi_list = []
        duan_list = []
        zs_list = []
        sell_buy_point_list = []

        print(f'cklstart:{datetime.now()}')
        # Merged candles: low, high, begin time, end time, type.
        for klc_meta in plot_metas.klc_list:
            begin = _from_ctime(klc_meta.klu_list[0].time)
            end = _from_ctime(klc_meta.klu_list[-1].time)
            klc_list.append((klc_meta.low, klc_meta.high, begin, end, klc_meta.type))
        print(f'cklend:{datetime.now()}')

        # Strokes: begin value, end value, begin time, end time, is_sure, dir.
        for bi in plot_metas.bi_list:
            bi_list.append((
                bi.begin_y, bi.end_y,
                _from_tick(bi.begin_x), _from_tick(bi.end_x),
                bi.is_sure, bi.dir,
            ))
        print(f'bi end:{datetime.now()}')

        # Segments: begin value, end value, begin time, end time, is_sure, dir.
        for duan in plot_metas.seg_list:
            duan_list.append((
                duan.begin_y, duan.end_y,
                _from_tick(duan.begin_x), _from_tick(duan.end_x),
                duan.is_sure, duan.dir,
            ))

        # Pivot zones: low, high, begin time, end time, is_sure.
        for zs in plot_metas.zs_lst:
            zs_list.append((zs.low, zs.high, _from_tick(zs.begin), _from_tick(zs.end), zs.is_sure))
        print(f'zhongshu end:{datetime.now()}')

        # Buy/sell points: time, price, is_buy, description.
        for bs in plot_metas.bs_point_lst:
            is_buy = bs.is_buy
            desc = bs.desc()
            sell_buy_point_list.append((_from_tick(bs.x), bs.y, is_buy, desc))

        return klc_list, bi_list, duan_list, zs_list, sell_buy_point_list