# 使用方式参考 (usage reference):
# https://www.vnpy.com/forum/topic/4621-wei-kxian-tu-biao-tian-zhuan-jia-wa-yi-ge-wan-zheng-de-kxian-tu-biao
from vnpy_mysql.mysql_database import MysqlDatabase
from datetime import datetime
from typing import List, Tuple, Dict
import numpy as np
import pyqtgraph as pg
import talib
import copy
import logging
from vnpy.trader.ui import create_qapp, QtCore, QtGui, QtWidgets
from vnpy.trader.constant import Exchange, Interval
from vnpy.trader.object import BaseData, BarData
from vnpy.chart import ChartWidget, VolumeItem, CandleItem
from vnpy.chart.item import ChartItem
from vnpy.chart.manager import BarManager
from vnpy.chart.base import NORMAL_FONT
from vnpy.usertools.chart_items import SmaItem, BollItem, MacdItem, LineItem, RsiItem
from vnpy_ctabacktester.ui.widget import ConvertBar
# Root logger setup. Without ``filename`` the records are printed to the
# console; once ``filename`` is given they go to that file and the console
# no longer shows them.
logging.basicConfig(
    filename='first_test.log',  # log file name
    filemode='w',               # 'w' truncates the file, 'a' appends to it
    level=logging.DEBUG,        # lowest level that is recorded
    # record layout
    format='%(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s',
)
# logging.debug('debug级别,最低级别,一般开发人员用来打印一些调试信息')
# logging.info('info级别,正常输出信息,一般用来打印一些正常的操作')
# logging.warning('warning级别,一般用来打印警告信息')
# logging.error('error级别,一般用来打印一些错误信息')
# logging.critical('critical级别,一般用来打印一些致命的错误信息')
class CLpenItem(CandleItem):
    """Chart item that draws Chan-theory ("缠论") pens over the candle plot.

    On every ``_draw_bar_picture`` call the item
      1. folds the bar into ``noIncludtionList`` (bars without an inclusion
         relationship to their neighbour),
      2. looks for a top / bottom fractal ("typing") in the last three
         merged bars, and
      3. draws a line between consecutive fractals of opposite type.

    NOTE(review): all state lives on the instance and is advanced once per
    ``_draw_bar_picture`` call, so the result presumably depends on bars
    being drawn in ascending ``ix`` order - confirm against how the chart
    schedules drawing.
    """

    # Sentinel for "no bottom-fractal low recorded yet". Infinity keeps the
    # comparisons valid for instruments that trade above any fixed constant.
    _NO_LOW: float = float("inf")
    # Maximum number of entries kept in noIncludtionList / noIncludtionIx.
    _MAX_MERGED_BARS: int = 10
    _MAX_MERGED_IX: int = 100

    def __init__(self, manager: BarManager):
        """Create the pens and the empty fractal / merged-bar state."""
        super().__init__(manager)

        self.white_pen: QtGui.QPen = pg.mkPen(color=(255, 255, 255), width=1)
        self.yellow_pen: QtGui.QPen = pg.mkPen(color=(255, 255, 0), width=1)
        self.blue_pen: QtGui.QPen = pg.mkPen(color=(100, 100, 255), width=1)
        self.red_pen: QtGui.QPen = pg.mkPen(color=(255, 0, 0), width=1)
        self.green_pen: QtGui.QPen = pg.mkPen(color=(0, 255, 0), width=1)

        # Merged bars as {"ix", "high_price", "low_price"} dicts, plus the
        # bare ix values (kept longer, used to measure fractal distance).
        self.noIncludtionList: list = []
        self.noIncludtionIx: list = []

        # Fractals carry no direction; they are told apart by
        # "type": "top" / "type": "bottom".
        self.typingList: list = []

        # Highest top-fractal high / lowest bottom-fractal low seen since the
        # last fractal of the opposite type was accepted.
        self.upHighest = 0
        self.downLowest = self._NO_LOW

        self.hasStart = False      # True once the first pen has been drawn
        self.lastTypingIx = 0      # ix of the most recently accepted fractal
        self.start = None          # end point of the most recently drawn pen

    def _draw_bar_picture(self, ix: int, bar: BarData) -> QtGui.QPicture:
        """Advance the fractal state with bar ``ix`` and draw any finished pen.

        Returns the picture for this bar; it is empty unless a pen was
        completed while processing this bar.
        """
        logging.debug("----------------start----------------ix:%s", ix)
        logging.debug("self.hasStart:%s", self.hasStart)

        picture = QtGui.QPicture()
        painter = QtGui.QPainter(picture)
        painter.setPen(self.white_pen)

        # The current direction is the opposite of the last fractal; with no
        # fractal yet there is no direction.
        if not self.typingList:
            direction = ""
        elif self.typingList[-1]["type"] == "top":
            direction = "down"
        else:
            direction = "up"

        # Resolve the inclusion relationship for this bar.
        self.handle_bars_relationship(direction, ix, -1)

        # Three merged bars are enough to start looking for fractals.
        if len(self.noIncludtionList) > 2:
            self.looking_for_typing()

        typings = self.typingList
        alternating = len(typings) >= 2 and typings[-2]["type"] != typings[-1]["type"]

        # First pen: connect bar 0 with the first fractal.
        if len(typings) == 2 and alternating and not self.hasStart:
            first = typings[0]
            first_bar = self._manager.get_bar(0)
            # A bottom fractal is reached from bar 0's high, a top from its low.
            if first["type"] == "bottom":
                anchor_price = first_bar.high_price
            else:
                anchor_price = first_bar.low_price
            self._draw_pen(painter, 0, anchor_price, first["ix"], first["price"])
            self.hasStart = True

        # Later pens: connect the two oldest fractals, then drop the oldest.
        if len(typings) > 2 and alternating:
            self._draw_pen(
                painter,
                typings[0]["ix"], typings[0]["price"],
                typings[1]["ix"], typings[1]["price"],
            )
            del typings[0]

        painter.end()
        return picture

    def _draw_pen(self, painter, start_ix, start_price, end_ix, end_price) -> None:
        """Draw one pen segment and remember its end point in ``self.start``."""
        start_point = QtCore.QPointF(start_ix, start_price)
        end_point = QtCore.QPointF(end_ix, end_price)
        if start_point and end_point:
            painter.drawLine(start_point, end_point)
        self.start = {"ix": end_ix, "price": end_price}

    def looking_for_typing(self) -> None:
        """Test the last three merged bars for a top or bottom fractal.

        Requires at least three entries in ``noIncludtionList``. Accepted
        fractals are appended to ``typingList`` by the two judge methods.
        """
        one = self.noIncludtionList[-3]
        two = self.noIncludtionList[-2]
        three = self.noIncludtionList[-1]

        oneh, onel = one["high_price"], one["low_price"]
        twoh, twol, twoix = two["high_price"], two["low_price"], two["ix"]
        threeh, threel = three["high_price"], three["low_price"]

        # Distance, in merged bars, between the candidate and the last fractal.
        index = self.get_index_for_ix(twoix)
        if self.typingList:
            lastIndex = self.get_index_for_ix(self.typingList[-1]["ix"])
        else:
            lastIndex = 0
        diff = index - lastIndex
        logging.debug("typing-index:%s", index)
        logging.debug("typing-diff:%s", diff)

        # Top fractal
        self.judege_top_typing(oneh, onel, twoh, twol, twoix, threeh, threel, index, diff)
        # Bottom fractal
        self.judege_bottom_typing(oneh, onel, twoh, twol, twoix, threeh, threel, index, diff)
        logging.debug("self.typingList:%s", self.typingList)

    def judege_top_typing(self, oneh: float, onel: float, twoh: float, twol: float, twoix: float, threeh: float, threel: float, index: int, diff: int) -> None:
        """Record the middle bar as a top fractal when it qualifies.

        Two cases are handled differently: the previous fractal is a bottom
        (or there is none), or the previous fractal is itself a top.
        """
        if not (twoh > oneh and twoh > threeh and twol > onel and twol > threel):
            return

        logging.debug("self.upHighest before change:%s", self.upHighest)
        logging.debug("twoh:%s", twoh)
        # Keep track of the highest top seen so far.
        if twoh > self.upHighest:
            self.upHighest = twoh
        logging.debug("self.upHighest after change:%s", self.upHighest)

        last_type = self.typingList[-1]["type"] if self.typingList else None
        is_new_high = twoh >= self.upHighest

        if diff > 3 and is_new_high and last_type in (None, "bottom"):
            self.typingList.append({"price": twoh, "index": index, "diff": diff, "ix": twoix, "type": "top"})
            # Accepting a top resets the low-side extreme.
            self.downLowest = self._NO_LOW
            self.lastTypingIx = twoix
            logging.info("appended a new top first and reset self.downLowest")
        elif last_type == "top" and is_new_high and twoix != self.lastTypingIx:
            self.typingList.append({"price": twoh, "index": index, "diff": diff, "ix": twoix, "type": "top"})
            self.downLowest = self._NO_LOW
            self.lastTypingIx = twoix
            logging.info("appended a new top again")
            # Two tops in a row: keep only the higher one.
            if len(self.typingList) > 1:
                if self.typingList[-2]["price"] > self.typingList[-1]["price"]:
                    del self.typingList[-1]
                    self.lastTypingIx = self.typingList[-1]["ix"]
                    logging.info("delete the last top typing")
                else:
                    del self.typingList[-2]
                    logging.info("delete the last but one top typing")

    def judege_bottom_typing(self, oneh: float, onel: float, twoh: float, twol: float, twoix: float, threeh: float, threel: float, index: int, diff: int) -> None:
        """Record the middle bar as a bottom fractal when it qualifies.

        Mirror image of ``judege_top_typing``.
        """
        if not (twol < onel and twol < threel and twoh < oneh and twoh < threeh):
            return

        logging.debug("self.downLowest before change:%s", self.downLowest)
        logging.debug("twol:%s", twol)
        # Keep track of the lowest bottom seen so far.
        if twol < self.downLowest:
            self.downLowest = twol
        logging.debug("self.downLowest after change:%s", self.downLowest)
        logging.debug("diff-before-judge:%s", diff)
        if self.typingList:
            logging.debug("self.typingList[-1]:%s", self.typingList[-1])

        last_type = self.typingList[-1]["type"] if self.typingList else None
        is_new_low = twol <= self.downLowest

        if diff > 3 and is_new_low and last_type in (None, "top"):
            self.typingList.append({"price": twol, "index": index, "diff": diff, "ix": twoix, "type": "bottom"})
            # Accepting a bottom resets the high-side extreme.
            self.upHighest = 0
            self.lastTypingIx = twoix
            logging.info("appended a new bottom first and reset self.upHighest")
        elif last_type == "bottom" and is_new_low and twoix != self.lastTypingIx:
            self.typingList.append({"price": twol, "index": index, "diff": diff, "ix": twoix, "type": "bottom"})
            self.upHighest = 0
            self.lastTypingIx = twoix
            logging.info("appended-a-new-bottom-again")
            logging.debug("self.typingList-after-append:%s", self.typingList)
            # Two bottoms in a row: keep only the lower one.
            if len(self.typingList) > 1:
                if self.typingList[-2]["price"] < self.typingList[-1]["price"]:
                    del self.typingList[-1]
                    self.lastTypingIx = self.typingList[-1]["ix"]
                    logging.info("delete the last bottom typing")
                else:
                    del self.typingList[-2]
                    logging.info("delete the last but one bottom typing")

    def get_index_for_ix(self, ix: int) -> int:
        """Return the position of ``ix`` in ``noIncludtionIx``.

        Returns the last matching position, or 0 when ``ix`` is not present.
        """
        index = 0
        for position, known_ix in enumerate(self.noIncludtionIx):
            if known_ix == ix:
                index = position
        return index

    def handle_bars_relationship(self, direction: str, nix: int, index: int) -> None:
        """Fold bar ``nix`` into the list of bars without inclusion.

        The bar is compared with ``noIncludtionList[index]`` (or with bar 0
        while the list is empty):
          * the previous bar contains it   -> the bar is ignored
            (bar 0 itself is still stored when there is no direction);
          * it contains the previous bar   -> the previous bar is removed,
            together with a fractal recorded on it, and the comparison is
            repeated against the new last bar;
          * otherwise                      -> the bar is appended.

        ``direction`` is "" (no fractal yet), "up" or "down"; any other
        value only runs the length trimming at the end.
        """
        if direction in ("", "up", "down"):
            while True:
                if self.noIncludtionList:
                    previous = self.noIncludtionList[index]
                    lh = previous["high_price"]
                    ll = previous["low_price"]
                else:
                    first_bar = self._manager.get_bar(0)
                    lh = first_bar.high_price
                    ll = first_bar.low_price

                current = self._manager.get_bar(nix)
                nh = current.high_price
                nl = current.low_price

                # The previous bar contains the current bar.
                if nh <= lh and nl >= ll:
                    if direction == "" and nix == 0:
                        self._append_merged_bar(nix, nh, nl)
                    break

                # The current bar contains the previous bar: drop the
                # previous bar and compare again. With nothing left to drop
                # the bar is simply stored, which also ends the loop.
                if nh >= lh and nl <= ll and self.noIncludtionList:
                    self._discard_typing_at(self.noIncludtionList[-1]["ix"], direction)
                    self.noIncludtionList.pop()
                    if self.noIncludtionIx:
                        self.noIncludtionIx.pop()
                    continue

                self._append_merged_bar(nix, nh, nl)
                break

        if len(self.noIncludtionList) > self._MAX_MERGED_BARS:
            del self.noIncludtionList[0]
        if len(self.noIncludtionIx) > self._MAX_MERGED_IX:
            del self.noIncludtionIx[0]
        logging.debug("self.noIncludtionList:%s", self.noIncludtionList)
        logging.debug("self.noIncludtionIx:%s", self.noIncludtionIx)

    def _append_merged_bar(self, ix: int, high: float, low: float) -> None:
        """Store one bar in both merged-bar lists."""
        self.noIncludtionList.append({"ix": ix, "high_price": high, "low_price": low})
        self.noIncludtionIx.append(ix)

    def _discard_typing_at(self, ix: int, direction: str) -> None:
        """Remove a fractal recorded on bar ``ix`` before that bar is dropped.

        With a direction the fractal is only removed when its price is
        inside the current extreme (below ``upHighest`` when going up, above
        ``downLowest`` when going down).
        """
        def should_drop(typing: dict) -> bool:
            if typing["ix"] != ix:
                return False
            if direction == "up":
                return typing["price"] < self.upHighest
            if direction == "down":
                return typing["price"] > self.downLowest
            return True

        # Rebuild in place instead of deleting while indexing the list.
        self.typingList[:] = [t for t in self.typingList if not should_drop(t)]

    def get_info_text(self, ix: int) -> str:
        """
        Get information text to show by cursor.
        """
        bar: BarData = self._manager.get_bar(ix)
        if not bar:
            return ""
        return "\n".join(["ix:", str(ix)])
class NewChartWidget(ChartWidget):
    """ChartWidget that can also show a horizontal line at the last close."""

    # NOTE(review): presumably overrides a ChartWidget class attribute of
    # the same name - confirm against vnpy.chart.
    MIN_BAR_COUNT = 1000

    def __init__(self, parent: QtWidgets.QWidget = None):
        """Create the widget; the price line is added on request only."""
        super().__init__(parent)
        # Set by add_last_price_line(); None until then.
        self.last_price_line: pg.InfiniteLine = None

    def add_last_price_line(self):
        """Add a white horizontal line, labelled with its value, to the first plot.

        Raises IndexError when no plot has been added yet.
        """
        plot = list(self._plots.values())[0]
        color = (255, 255, 255)

        self.last_price_line = pg.InfiniteLine(
            angle=0,
            movable=False,
            label="{value:.1f}",
            pen=pg.mkPen(color, width=1),
            labelOpts={
                "color": color,
                "position": 1,
                "anchors": [(1, 1), (1, 1)]
            }
        )
        self.last_price_line.label.setFont(NORMAL_FONT)
        plot.addItem(self.last_price_line)

    def update_history(self, history: List[BarData]) -> None:
        """
        Update a list of bar data.

        An empty history is accepted; the price line is then left untouched.
        """
        self._manager.update_history(history)

        for item in self._items.values():
            item.update_history(history)

        self._update_plot_limits()
        self.move_to_right()

        # history[-1] would raise IndexError when no bars were loaded.
        if len(history) > 0:
            self.update_last_price_line(history[-1])

    def update_bar(self, bar: BarData) -> None:
        """
        Update single bar data.
        """
        self._manager.update_bar(bar)

        for item in self._items.values():
            item.update_bar(bar)

        self._update_plot_limits()

        # Keep following the newest bar only while the view is near the right edge.
        if self._right_ix >= (self._manager.get_count() - self._bar_count / 2):
            self.move_to_right()

        self.update_last_price_line(bar)

    def update_last_price_line(self, bar: BarData) -> None:
        """Move the price line, if one was added, to the close of ``bar``."""
        if self.last_price_line:
            self.last_price_line.setValue(bar.close_price)
if __name__ == "__main__":
    # Demo: load bars from MySQL and show them with the Chan-theory pen item.
    app = create_qapp()

    # Earlier variant of the query, kept for reference:
    # bars = database_manager.load_bar_data(
    #     "IF888",
    #     Exchange.CFFEX,
    #     interval=Interval.MINUTE,
    #     start=datetime(2019, 7, 1),
    #     end=datetime(2019, 7, 17)
    # )

    # Query parameters.
    symbol = "rb2305"
    exchange = Exchange.SHFE
    interval = Interval.MINUTE
    start = datetime(2022, 10, 1)
    end = datetime(2022, 11, 15)

    dynamic = False  # replay the remaining bars one by one when True
    n = 10000        # number of bars loaded as history

    # NOTE(review): the method is called on the class with self=None, so no
    # MysqlDatabase instance is created - confirm this is intended.
    bars = MysqlDatabase.load_bar_data(
        self=None,
        symbol=symbol,
        exchange=exchange,
        interval=interval,
        start=start,
        end=end
    )

    widget = NewChartWidget()
    widget.setWindowTitle(f"K线图表——{symbol}.{exchange.value},{interval},{start}-{end}")

    # Plots, top to bottom. Optional ones are left commented out.
    for plot_name, plot_options in (
        ("candle", {"hide_x_axis": True}),
        # ("volume", {"maximum_height": 150}),
        # ("rsi", {"maximum_height": 150}),
        ("macd", {"maximum_height": 150}),
    ):
        widget.add_plot(plot_name, **plot_options)

    # Items as (class, item name, plot name).
    for item_class, item_name, plot_name in (
        (CandleItem, "candle", "candle"),
        # (VolumeItem, "volume", "volume"),
        # (LineItem, "line", "candle"),
        # (SmaItem, "sma", "candle"),
        (CLpenItem, "clpen", "candle"),
        # (RsiItem, "rsi", "rsi"),
        (MacdItem, "macd", "macd"),
    ):
        widget.add_item(item_class, item_name, plot_name)

    widget.add_last_price_line()
    widget.add_cursor()

    if dynamic:
        # The oldest n bars are the history, the rest is replayed.
        history, new_data = bars[:n], bars[n:]
    else:
        # The newest n bars are the history, nothing is replayed.
        history, new_data = bars[-n:], []

    newhistory = ConvertBar(history, 15)
    widget.update_history(newhistory)

    def update_bar():
        """Feed the next pending bar, if any, to the chart."""
        if not new_data:
            return
        widget.update_bar(new_data.pop(0))

    timer = QtCore.QTimer()
    timer.timeout.connect(update_bar)
    if dynamic:
        timer.start(100)

    widget.show()
    app.exec()