VeighNa量化社区
你的开源社区量化交易平台 | vn.py | vnpy
Member
avatar
加入于:
帖子: 84
声望: 16

经过好几天的反复,终于完成了。所谓的复盘,就是盘后把行情从新播放一遍,如果使用tick数据,就和真实的盘面一模一样,我这里使用的是1分钟数据复盘,所以简化了很多。

代码如下:

import multiprocessing
import time
from datetime import datetime
from vnpy.trader.constant import Exchange, Interval
from vnpy.trader.database import database_manager
from vnpy.chart import ChartWidget, VolumeItem, CandleItem
from vnpy.trader.ui import create_qapp, QtCore
from vnpy.trader.object import BarData
import os

bar: BarData

def putbardata(q_1m,q_5m,q_30m,q_4h,su):

#从数据库中读取1分钟数据,你的数据库必须有下载好的数据。
    bars = database_manager.load_bar_data(
        symbol="APEUSDT",
        exchange=Exchange.BINANCE,
        interval=Interval.MINUTE,
        start=datetime(2022, 5, 4),
        end=datetime(2025, 1, 1)
    )
    sudu = 0.055

    i = 0
    for bar in bars:
        q_1m.put(bar)
        q_5m.put(bar)
        q_30m.put(bar)
        q_4h.put(bar)
        if i > 1200:                                          #先快速播放一定数量的一分钟bar
            if not su.empty():
                sudu = int(su.get(True))
                print("速度已经设定为:", sudu)
            if i % 10 == 1 :
                os.system("pause")                  #正常播放以后,每10个一分钟bar暂停一下,按任意键继续,不需要这个功能的可以删掉。
        time.sleep(sudu)
        i = i + 1

def MINUTE_5m(q):
    app = create_qapp()
    widget = ChartWidget()
    widget.add_plot("candle", hide_x_axis=True)
    widget.add_plot("volume", maximum_height=180)
    widget.add_item(CandleItem, "candle", "candle")
    widget.add_item(VolumeItem, "volume", "volume")
    widget.add_cursor()
    history : BarData
    history = []
    global i_5
    i_5 = 0
    global bar_

    def update_bar():
        global i_5
        global bar_
        if not q.empty():
            bar = q.get(True)
            if i_5 == 0  :
                bar_ = bar
                i_5 =  1
                history.append(bar_)
            if i_5 == 5  :
                bar_ = bar
                i_5 = 1
                history.append(bar_)
            else :
                bar_.close_price = bar.close_price
                if bar.high_price > bar_.high_price:
                    bar_.high_price = bar.high_price
                if bar.low_price < bar_.low_price:
                    bar_.low_price = bar.low_price
                bar_.volume = bar_.volume + bar.volume
                i_5 = i_5 + 1
                history[-1] = bar_                            #这一段是把一分钟数据形成5分钟数据

            widget.clear_all()
            widget.update_history(history)            #刷新图形数据

    timer = QtCore.QTimer()
    timer.timeout.connect(update_bar)
    timer.start(50)
    widget.setWindowTitle("五分钟")            #设定五分钟窗口的标题和窗口大小以及位置
    widget.setGeometry(0, 0, 900, 550)
    widget.show()
    app.exec_()

def MINUTE_30m(q):                                     #30分钟和5分钟类似
    app = create_qapp()
    widget = ChartWidget()
    widget.add_plot("candle", hide_x_axis=True)
    widget.add_plot("volume", maximum_height=180)
    widget.add_item(CandleItem, "candle", "candle")
    widget.add_item(VolumeItem, "volume", "volume")
    widget.add_cursor()
    history: BarData
    history = []

    global i_30
    i_30 = 0
    global bar_

    def update_bar():
        global i_30
        global bar_
        if not q.empty():
            bar = q.get(True)
            if i_30 == 0  :
                bar_ = bar
                i_30 =  1
                history.append(bar_)
            if i_30 == 30  :
                bar_ = bar
                i_30 = 1
                history.append(bar_)
            else :
                bar_.close_price = bar.close_price
                if bar.high_price > bar_.high_price:
                    bar_.high_price = bar.high_price
                if bar.low_price < bar_.low_price:
                    bar_.low_price = bar.low_price
                bar_.volume = bar_.volume + bar.volume
                i_30 = i_30 + 1
                history[-1] = bar_

            widget.clear_all()
            widget.update_history(history)

    timer = QtCore.QTimer()
    timer.timeout.connect(update_bar)
    timer.start(50)
    widget.setWindowTitle("三十分钟")
    widget.setGeometry(0, 560, 900, 550)
    widget.show()
    app.exec_()

def MINUTE_4h(q):
    app = create_qapp()
    widget = ChartWidget()
    widget.add_plot("candle", hide_x_axis=True)
    widget.add_plot("volume", maximum_height=180)
    widget.add_item(CandleItem, "candle", "candle")
    widget.add_item(VolumeItem, "volume", "volume")
    widget.add_cursor()
    history: BarData
    history = []

    global i_4h
    i_4h = 0
    global bar_

    def update_bar():
        global i_4h
        global bar_
        if not q.empty():
            bar = q.get(True)
            if i_4h == 0  :
                bar_ = bar
                i_4h =  1
                history.append(bar_)
            if i_4h == 240  :
                bar_ = bar
                i_4h = 1
                history.append(bar_)
            else :
                bar_.close_price = bar.close_price
                if bar.high_price > bar_.high_price:
                    bar_.high_price = bar.high_price
                if bar.low_price < bar_.low_price:
                    bar_.low_price = bar.low_price
                bar_.volume = bar_.volume + bar.volume
                i_4h = i_4h + 1
                history[-1] = bar_

            widget.clear_all()
            widget.update_history(history)

    timer = QtCore.QTimer()
    timer.timeout.connect(update_bar)
    timer.start(50)
    widget.setWindowTitle("四小时")
    widget.setGeometry(860, 560, 1050, 530)

    widget.show()
    app.exec_()

def MINUTE(q):                     #一分钟的是最简单的,直接使用就好。
    app = create_qapp()
    widget = ChartWidget()
    widget.add_plot("candle", hide_x_axis=True)
    widget.add_plot("volume", maximum_height=180)
    widget.add_item(CandleItem, "candle", "candle")
    widget.add_item(VolumeItem, "volume", "volume")
    widget.add_cursor()

    def update_bar():
        if not q.empty():
            bar = q.get(True)
            widget.update_bar(bar)

    timer = QtCore.QTimer()
    timer.timeout.connect(update_bar)
    timer.start(50)
    widget.setWindowTitle("一分钟")
    widget.setGeometry(860, 15, 1050, 550)
    widget.show()
    app.exec_()


if __name__ == '__main__':
    manager = multiprocessing.Manager()
    q_1m = manager.Queue()
    q_5m = manager.Queue()
    q_30m = manager.Queue()
    q_4h = manager.Queue()
    su = manager.Queue()

    pw = multiprocessing.Process(target=putbardata, args=(q_1m,q_5m,q_30m,q_4h,su))
    pr_1m = multiprocessing.Process(target=MINUTE, args=(q_1m,))
    pr_5m = multiprocessing.Process(target=MINUTE_5m, args=(q_5m,))
    pr_30m = multiprocessing.Process(target=MINUTE_30m, args=(q_30m,))
    pr_4h = multiprocessing.Process(target=MINUTE_4h, args=(q_4h,))

    pw.start()
    pr_1m.start()
    pr_5m.start()
    pr_30m.start()
    pr_4h.start()

    sudu = input("请输入速度:")
    su.put(sudu)

    time.sleep(1000000)
    print('任务完成')

大概说一下原理,程序设定了5个进程,通过通道交换数据,其中一个进程发送数据,另外4个进程接受数据,接受数据的四个进程就是4个周期的窗口,把接受的一分钟数据变化成3分钟30分钟等,并用图形展示出来。
只要控制发送数据的节奏,就可以动态的把行情从新演示一遍了。

这是盘后复盘用的,可以回忆一下当天到底发生了什么。

国内期货有一个盘立方软件是可以完美复盘的,数字货币没有这个东西,tradingview有这个功能,但是每月要收费90元,而且tradingview也只能使用1分钟数据复盘。

身为程序员,当然不愿意掏钱,因为自己可以写一个。

感谢vnpy提供的ChartWidget,真的很好用。

Member
avatar
加入于:
帖子: 1991
声望: 158

感谢分享!!

Member
avatar
加入于:
帖子: 46
声望: 1

感谢,请问是在哪个文件下进行的修改呢?

Member
avatar
加入于:
帖子: 84
声望: 16

woodlandnight wrote:

感谢,请问是在哪个文件下进行的修改呢?
直接运行就好。

Member
avatar
加入于:
帖子: 33
声望: 1

感谢分享!

Member
avatar
加入于:
帖子: 50
声望: 1

感谢分享!确认一下,这是在3.0下运行的吧

Member
avatar
加入于:
帖子: 13
声望: 0

现在3.0使用好像用不了,首先没有了database_manager,我尝试修改了,都报错了,看看这样修改对不对,谢谢
import multiprocessing
import time
from datetime import datetime
from vnpy.trader.constant import Exchange, Interval

from vnpy.trader.database import database_manager

from vnpy.trader.database import BaseDatabase
from vnpy_datamanager.engine import ManagerEngine

from vnpy.chart import ChartWidget, VolumeItem, CandleItem
from vnpy.trader.ui import create_qapp, QtCore
from vnpy.trader.object import BarData
import os

bar: BarData

def putbardata(q_1m,q_5m,q_30m,q_4h,su):

从数据库中读取1分钟数据,你的数据库必须有下载好的数据。

bars = ManagerEngine.load_bar_data(
    #symbol="APEUSDT",
    **BaseDatabase ,**
    symbol="j99",
    exchange="DCE",
    #exchange=Exchange.BINANCE,
    interval=Interval.MINUTE,
    start=datetime(2019, 5, 4),
    end=datetime(2025, 1, 1)
)
sudu = 0.055
Member
avatar
加入于:
帖子: 2
声望: 0

感谢分享

Member
avatar
加入于:
帖子: 273
声望: 4

description

你好,这是基于vnpy3.5的运行结果,如上图所示报错:BaseDatabase.load_bar_data() missing 1 required positional argument: 'self ',看了一些其他人的写法load_bar_data这么使用好像也没错,还需要怎么修改一下吗?望回复,万分感激'

from vnpy.trader.object import BarData

因为vnpy3.0版本修改了数据库管理器的基类BaseDatabase,把上面数据库引用修改成了下面的这句

from vnpy.trader.database import BaseDatabase
import multiprocessing
import time
from datetime import datetime
from vnpy.trader.constant import Exchange, Interval
from vnpy.chart import ChartWidget, VolumeItem, CandleItem
from vnpy.trader.ui import create_qapp, QtCore
from vnpy.trader.object import BarData
import os
from vnpy.trader.database import BaseDatabase
bar: BarData

def putbardata(q_1m,q_5m,q_30m,q_4h,su):

#从数据库中读取1分钟数据,你的数据库必须有下载好的数据。
    bars = BaseDatabase.load_bar_data(
        symbol="i888",
        exchange=Exchange.DCE,
        interval=Interval.MINUTE,
        start=datetime(2020, 1, 1),
        end=datetime(2025, 1, 1)
    )
    sudu = 0.055

    i = 0
    for bar in bars:
        q_1m.put(bar)
        q_5m.put(bar)
        q_30m.put(bar)
        q_4h.put(bar)
        if i > 1200:                                          #先快速播放一定数量的一分钟bar
            if not su.empty():
                sudu = int(su.get(True))
                print("速度已经设定为:", sudu)
            if i % 10 == 1 :
                os.system("pause")                  #正常播放以后,每10个一分钟bar暂停一下,按任意键继续,不需要这个功能的可以删掉。
        time.sleep(sudu)
        i = i + 1

def MINUTE_5m(q):
    app = create_qapp()
    widget = ChartWidget()
    widget.add_plot("candle", hide_x_axis=True)
    widget.add_plot("volume", maximum_height=180)
    widget.add_item(CandleItem, "candle", "candle")
    widget.add_item(VolumeItem, "volume", "volume")
    widget.add_cursor()
    history : BarData
    history = []
    global i_5
    i_5 = 0
    global bar_

    def update_bar():
        global i_5
        global bar_
        if not q.empty():
            bar = q.get(True)
            if i_5 == 0  :
                bar_ = bar
                i_5 =  1
                history.append(bar_)
            if i_5 == 5  :
                bar_ = bar
                i_5 = 1
                history.append(bar_)
            else :
                bar_.close_price = bar.close_price
                if bar.high_price > bar_.high_price:
                    bar_.high_price = bar.high_price
                if bar.low_price < bar_.low_price:
                    bar_.low_price = bar.low_price
                bar_.volume = bar_.volume + bar.volume
                i_5 = i_5 + 1
                history[-1] = bar_                            #这一段是把一分钟数据形成5分钟数据

            widget.clear_all()
            widget.update_history(history)            #刷新图形数据

    timer = QtCore.QTimer()
    timer.timeout.connect(update_bar)
    timer.start(50)
    widget.setWindowTitle("五分钟")            #设定五分钟窗口的标题和窗口大小以及位置
    widget.setGeometry(0, 0, 900, 550)
    widget.show()
    app.exec_()

def MINUTE_30m(q):                                     #30分钟和5分钟类似
    app = create_qapp()
    widget = ChartWidget()
    widget.add_plot("candle", hide_x_axis=True)
    widget.add_plot("volume", maximum_height=180)
    widget.add_item(CandleItem, "candle", "candle")
    widget.add_item(VolumeItem, "volume", "volume")
    widget.add_cursor()
    history: BarData
    history = []

    global i_30
    i_30 = 0
    global bar_

    def update_bar():
        global i_30
        global bar_
        if not q.empty():
            bar = q.get(True)
            if i_30 == 0  :
                bar_ = bar
                i_30 =  1
                history.append(bar_)
            if i_30 == 30  :
                bar_ = bar
                i_30 = 1
                history.append(bar_)
            else :
                bar_.close_price = bar.close_price
                if bar.high_price > bar_.high_price:
                    bar_.high_price = bar.high_price
                if bar.low_price < bar_.low_price:
                    bar_.low_price = bar.low_price
                bar_.volume = bar_.volume + bar.volume
                i_30 = i_30 + 1
                history[-1] = bar_

            widget.clear_all()
            widget.update_history(history)

    timer = QtCore.QTimer()
    timer.timeout.connect(update_bar)
    timer.start(50)
    widget.setWindowTitle("三十分钟")
    widget.setGeometry(0, 560, 900, 550)
    widget.show()
    app.exec_()

def MINUTE_4h(q):
    app = create_qapp()
    widget = ChartWidget()
    widget.add_plot("candle", hide_x_axis=True)
    widget.add_plot("volume", maximum_height=180)
    widget.add_item(CandleItem, "candle", "candle")
    widget.add_item(VolumeItem, "volume", "volume")
    widget.add_cursor()
    history: BarData
    history = []

    global i_4h
    i_4h = 0
    global bar_

    def update_bar():
        global i_4h
        global bar_
        if not q.empty():
            bar = q.get(True)
            if i_4h == 0  :
                bar_ = bar
                i_4h =  1
                history.append(bar_)
            if i_4h == 240  :
                bar_ = bar
                i_4h = 1
                history.append(bar_)
            else :
                bar_.close_price = bar.close_price
                if bar.high_price > bar_.high_price:
                    bar_.high_price = bar.high_price
                if bar.low_price < bar_.low_price:
                    bar_.low_price = bar.low_price
                bar_.volume = bar_.volume + bar.volume
                i_4h = i_4h + 1
                history[-1] = bar_

            widget.clear_all()
            widget.update_history(history)

    timer = QtCore.QTimer()
    timer.timeout.connect(update_bar)
    timer.start(50)
    widget.setWindowTitle("四小时")
    widget.setGeometry(860, 560, 1050, 530)

    widget.show()
    app.exec_()

def MINUTE(q):                     #一分钟的是最简单的,直接使用就好。
    app = create_qapp()
    widget = ChartWidget()
    widget.add_plot("candle", hide_x_axis=True)
    widget.add_plot("volume", maximum_height=180)
    widget.add_item(CandleItem, "candle", "candle")
    widget.add_item(VolumeItem, "volume", "volume")
    widget.add_cursor()

    def update_bar():
        if not q.empty():
            bar = q.get(True)
            widget.update_bar(bar)

    timer = QtCore.QTimer()
    timer.timeout.connect(update_bar)
    timer.start(50)
    widget.setWindowTitle("一分钟")
    widget.setGeometry(860, 15, 1050, 550)
    widget.show()
    app.exec_()


if __name__ == '__main__':
    manager = multiprocessing.Manager()
    q_1m = manager.Queue()
    q_5m = manager.Queue()
    q_30m = manager.Queue()
    q_4h = manager.Queue()
    su = manager.Queue()

    pw = multiprocessing.Process(target=putbardata, args=(q_1m,q_5m,q_30m,q_4h,su))
    pr_1m = multiprocessing.Process(target=MINUTE, args=(q_1m,))
    pr_5m = multiprocessing.Process(target=MINUTE_5m, args=(q_5m,))
    pr_30m = multiprocessing.Process(target=MINUTE_30m, args=(q_30m,))
    pr_4h = multiprocessing.Process(target=MINUTE_4h, args=(q_4h,))

    pw.start()
    pr_1m.start()
    pr_5m.start()
    pr_30m.start()
    pr_4h.start()

    sudu = input("请输入速度:")
    su.put(sudu)

    time.sleep(1000000)
    print('任务完成')
Member
avatar
加入于:
帖子: 1
声望: 0

支持一下

Member
avatar
加入于:
帖子: 13
声望: 0

description
请问一下 我在单图表运行正常,但在多线程下运行出现图上问题

Member
avatar
加入于:
帖子: 1
声望: 0

求助,从sqlite数据库读取1分钟数据,基于vnpy3.8运行结果报错怎么改,万分谢谢
from datetime import datetime
from vnpy.trader.database import BaseDatabase
from vnpy.trader.object import Interval
from vnpy.trader.constant import Exchange

barDatas=BaseDatabase.load_bar_data(
symbol='c2401',
exchange=Exchange.DCE,
interval = Interval.MINUTE,
start=datetime(2023,1,1),
end=datetime(2023,12,10)
)

TypeError: BaseDatabase.load_bar_data() missing 1 required positional argument: 'self'

Member
avatar
加入于:
帖子: 1991
声望: 158
from datetime import datetime

from vnpy.trader.database import get_database
from vnpy.trader.object import Interval
from vnpy.trader.constant import Exchange

db = get_database()

bars = db.load_bar_data(
    symbol='c2401',
    exchange=Exchange.DCE,
    interval=Interval.MINUTE,
    start=datetime(2023,1,1),
    end=datetime(2023,12,10)
)
Member
avatar
加入于:
帖子: 17
声望: 0

感谢分享

Member
加入于:
帖子: 7
声望: 2

感谢如此优秀的分享,好奇为什么不收费? 为什么不可以作为智力商品出售啊? 平台不允许? 编程技术高的朋友,有偿贡献代码,编程技术低的朋友,提供想法,相互取长补短, 共同提高!

遇见2个问题,恳请楼主或高人解答:
1, 好像30m图形,没有日内对齐, 还是我的个人问题? 30分钟, 起始点应该是晚上9点开始, 下午2点45 到3点应该为1根, 小问题,借用hxxjava 的贡献,估计可以解决。
2, 顺畅运行了一段时间, 显示了4小时6根后, 估计240*6=1440根1分钟k线后, 程序报错, 不知哪里错误,请高人指点,感觉怎么会是数据类型问题?

已解决:
if i > 1200: #先快速播放一定数量的一分钟bar,### 需要修改一下这个参数, 了解一下作者的意图,
if not su.empty():
sudu = int(su.get(True))
print("速度已经设定为:", sudu)
if i % 10 == 1 :
os.system("pause")

请输入速度:0.5
Process Process-2:
Traceback (most recent call last):
File "c:\veighna_studio\lib\multiprocessing\process.py", line 314, in _bootstrap
self.run()
File "c:\veighna_studio\lib\multiprocessing\process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "c:\veighna_studio\Lib\site-packages\vnpy\usertools\mulperiod.py", line 35, in putbardata
sudu = int(su.get(True))
ValueError: invalid literal for int() with base 10: '0.5'

3, 是否有高人 能够提供 在一个窗口内, 显示 多个周期的 代码? 有需要的朋友,可以在这里响应一声, 假如有足够多的朋友需要,我们可以商议一个劳务报酬,给愿意提供帮助的代码高手!

Member
avatar
加入于:
帖子: 10
声望: 0

billwu wrote:

感谢如此优秀的分享,好奇为什么不收费? 为什么不可以作为智力商品出售啊? 平台不允许? 编程技术高的朋友,有偿贡献代码,编程技术低的朋友,提供想法,相互取长补短, 共同提高!

遇见2个问题,恳请楼主或高人解答:
1, 好像30m图形,没有日内对齐, 还是我的个人问题? 30分钟, 起始点应该是晚上9点开始, 下午2点45 到3点应该为1根, 小问题,借用hxxjava 的贡献,估计可以解决。
2, 顺畅运行了一段时间, 显示了4小时6根后, 估计240*6=1440根1分钟k线后, 程序报错, 不知哪里错误,请高人指点,感觉怎么会是数据类型问题?

已解决:
if i > 1200: #先快速播放一定数量的一分钟bar,### 需要修改一下这个参数, 了解一下作者的意图,
if not su.empty():
sudu = int(su.get(True))
print("速度已经设定为:", sudu)
if i % 10 == 1 :
os.system("pause")

请输入速度:0.5
Process Process-2:
Traceback (most recent call last):
File "c:\veighna_studio\lib\multiprocessing\process.py", line 314, in _bootstrap
self.run()
File "c:\veighna_studio\lib\multiprocessing\process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "c:\veighna_studio\Lib\site-packages\vnpy\usertools\mulperiod.py", line 35, in putbardata
sudu = int(su.get(True))
ValueError: invalid literal for int() with base 10: '0.5'

3, 是否有高人 能够提供 在一个窗口内, 显示 多个周期的 代码? 有需要的朋友,可以在这里响应一声, 假如有足够多的朋友需要,我们可以商议一个劳务报酬,给愿意提供帮助的代码高手!
其实官方可以开一个这样的板块。。。提供一个付费写策略的板块,让更多程序员有积极性去学习vnpy框架,为框架共享自己的代码。

Member
avatar
加入于:
帖子: 1
声望: 0

感谢楼主,更新了一下代码,AI写的,完善了部分功能。

import multiprocessing
import time
from datetime import datetime
import numpy as np

from vnpy.trader.constant import Exchange, Interval
from vnpy.trader.database import get_database
from vnpy.chart import ChartWidget, VolumeItem, CandleItem
from vnpy.trader.ui import create_qapp, QtCore, QtGui, QtWidgets
from vnpy.trader.object import BarData
from vnpy.chart.manager import BarManager
from vnpy.chart.item import ChartItem

import pyqtgraph as pg

────────────── 占位 Item ──────────────

class EmptyItem(ChartItem):
def init(self, manager: BarManager):
super().init(manager)
self._y_range: tuple = (0, 1)
self._path = QtGui.QPainterPath()
self._path.addRect(QtCore.QRectF())

def set_y_range(self, y_min: float, y_max: float) -> None:
    self._y_range = (y_min, y_max)

def get_y_range(self, min_ix: int = None, max_ix: int = None) -> tuple:
    return self._y_range

def update_history(self, history: list) -> None:
    pass

def update_bar(self, bar: BarData) -> None:
    pass

def clear_all(self) -> None:
    pass

def get_info_text(self, ix: int) -> str:
    return ""

def boundingRect(self) -> QtCore.QRectF:
    return QtCore.QRectF()


────────────── 数据播放 ──────────────

def putbardata(q_1m, q_5m, q_30m, q_4h, su, control, paused_flag, history_count=200):
bars = get_database().load_bar_data(
symbol="000001",
exchange=Exchange.SSE,
interval=Interval.MINUTE,
start=datetime(2021, 12, 20),
end=datetime(2022, 12, 31)
)
sudu = 0.055

if len(bars) <= history_count:
    history_bars = bars
    new_bars = []
else:
    history_bars = bars[:history_count]
    new_bars = bars[history_count:]

# 发送历史数据包
for q in (q_1m, q_5m, q_30m, q_4h):
    q.put(("history", list(history_bars)))

# 逐根播放新数据
for bar in new_bars:
    # 速度调节
    if not su.empty():
        try:
            new_sudu = float(su.get(False))
            sudu = new_sudu
            print(f"速度已设定为:{sudu} 秒/根")
        except (ValueError, TypeError):
            pass

    # 检查暂停/恢复命令
    while not control.empty():
        cmd = control.get(False)
        if cmd == "pause":
            paused_flag.value = True
            print("⏸️  数据播放已暂停")
        elif cmd == "resume":
            paused_flag.value = False
            print("▶️  数据播放继续")

    # 暂停循环
    while paused_flag.value:
        if not control.empty():
            cmd = control.get(False)
            if cmd == "resume":
                paused_flag.value = False
                print("▶️  数据播放继续")
        else:
            time.sleep(0.1)

    q_1m.put(bar)
    q_5m.put(bar)
    q_30m.put(bar)
    q_4h.put(bar)

    time.sleep(sudu)

print("数据播放完毕")
for q in (q_1m, q_5m, q_30m, q_4h):
    q.put(None)


────────────── 指标计算 ──────────────

def sma(arr, period):
if len(arr) < period:
return [np.nan] len(arr)
kernel = np.ones(period) / period
return [np.nan]
(period - 1) + list(np.convolve(arr, kernel, mode='valid'))

def macd(arr, fast=12, slow=26, signal=9):
arr = np.asarray(arr, dtype=float)
ema_fast = ema(arr, fast)
ema_slow = ema(arr, slow)
dif = ema_fast - ema_slow
dea = ema(dif, signal)
bar = 2 * (dif - dea)
return dif, dea, bar

def ema(arr, period):
alpha = 2.0 / (period + 1)
res = np.empty_like(arr)
res[0] = arr[0]
for i in range(1, len(arr)):
res[i] = alpha arr[i] + (1 - alpha) res[i - 1]
return res

────────────── 增强图表控件 ──────────────

class DrawableChartWidget(ChartWidget):
def init(self):
super().init()
self.draw_mode = None
self._start_pos = None
self._rubber_band = None
self._user_shapes = []
self._indicator_items = []
self._user_items = []
self._macd_items = []
self.setFocusPolicy(QtCore.Qt.StrongFocus)
print("快捷键: A=箭头线, L=直线, R=矩形, C=清空, Esc=取消")

def keyPressEvent(self, event):
    key = event.key()
    if key == QtCore.Qt.Key_A:
        self.draw_mode = "arrow"; self.setCursor(QtCore.Qt.CrossCursor)
        print("→ 箭头线模式")
    elif key == QtCore.Qt.Key_L:
        self.draw_mode = "line"; self.setCursor(QtCore.Qt.CrossCursor)
        print("→ 直线模式")
    elif key == QtCore.Qt.Key_R:
        self.draw_mode = "rect"; self.setCursor(QtCore.Qt.CrossCursor)
        print("→ 矩形模式")
    elif key == QtCore.Qt.Key_C:
        self.clear_user_shapes(); print("已清空绘图")
    elif key == QtCore.Qt.Key_Escape:
        self.draw_mode = None; self.setCursor(QtCore.Qt.ArrowCursor)
        print("取消绘图模式")
    else:
        super().keyPressEvent(event)

def clear_user_shapes(self):
    self._user_shapes.clear()
    self._redraw_user_shapes()

def mousePressEvent(self, event):
    if self.draw_mode and event.button() == QtCore.Qt.LeftButton:
        self._start_pos = self.mapToScene(event.pos())
        self._rubber_band = None
        event.accept()
    else:
        super().mousePressEvent(event)

def mouseMoveEvent(self, event):
    if self.draw_mode and self._start_pos:
        scene = self.scene()
        scene_pos = self.mapToScene(event.pos())
        if self._rubber_band:
            scene.removeItem(self._rubber_band)
        if self.draw_mode == "arrow":
            self._rubber_band = self._make_arrow_scene(self._start_pos, scene_pos, color=QtGui.QColor(255,100,100))
        elif self.draw_mode == "line":
            self._rubber_band = scene.addLine(QtCore.QLineF(self._start_pos, scene_pos), QtGui.QPen(QtGui.QColor(100,200,255)))
        elif self.draw_mode == "rect":
            self._rubber_band = scene.addRect(QtCore.QRectF(self._start_pos, scene_pos), pen=QtGui.QPen(QtGui.QColor(100,255,100)))
        event.accept()
    else:
        super().mouseMoveEvent(event)

def mouseReleaseEvent(self, event):
    if self.draw_mode and self._start_pos and event.button() == QtCore.Qt.LeftButton:
        scene_pos = self.mapToScene(event.pos())
        if self._rubber_band:
            self.scene().removeItem(self._rubber_band)
            self._rubber_band = None
        vb = self._plots["candle"].vb
        start_data = vb.mapSceneToView(self._start_pos)
        end_data = vb.mapSceneToView(scene_pos)
        shape = {
            "type": self.draw_mode,
            "start": (start_data.x(), start_data.y()),
            "end": (end_data.x(), end_data.y()),
            "color": QtGui.QColor(255,0,0) if self.draw_mode == "arrow" else
                     QtGui.QColor(0,100,255) if self.draw_mode == "line" else QtGui.QColor(0,200,0)
        }
        self._user_shapes.append(shape)
        self._redraw_user_shapes()
        event.accept()
    else:
        super().mouseReleaseEvent(event)

def _make_arrow_scene(self, start, end, color):
    scene = self.scene()
    pen = QtGui.QPen(color, 1)
    line = scene.addLine(QtCore.QLineF(start, end), pen)
    sz = 8
    ang = np.arctan2(end.y() - start.y(), end.x() - start.x())
    p1 = end - QtCore.QPointF(sz * np.cos(ang - np.pi/6), sz * np.sin(ang - np.pi/6))
    p2 = end - QtCore.QPointF(sz * np.cos(ang + np.pi/6), sz * np.sin(ang + np.pi/6))
    head = scene.addPolygon(QtGui.QPolygonF([end, p1, p2]), pen, QtGui.QColor(color.red(), color.green(), color.blue(), 150))
    return scene.createItemGroup([line, head])

def _make_arrow_data(self, start_data, end_data, color):
    vb = self._plots["candle"].vb
    start = vb.mapViewToScene(QtCore.QPointF(*start_data))
    end = vb.mapViewToScene(QtCore.QPointF(*end_data))
    return self._make_arrow_scene(start, end, color)

def _redraw_user_shapes(self):
    scene = self.scene()
    for item in self._user_items:
        scene.removeItem(item)
    self._user_items.clear()
    if not self._user_shapes:
        return
    vb = self._plots["candle"].vb
    for shape in self._user_shapes:
        if shape["type"] == "arrow":
            item = self._make_arrow_data(shape["start"], shape["end"], shape["color"])
        elif shape["type"] == "line":
            start = vb.mapViewToScene(QtCore.QPointF(*shape["start"]))
            end = vb.mapViewToScene(QtCore.QPointF(*shape["end"]))
            item = scene.addLine(QtCore.QLineF(start, end), QtGui.QPen(shape["color"]))
        elif shape["type"] == "rect":
            start = vb.mapViewToScene(QtCore.QPointF(*shape["start"]))
            end = vb.mapViewToScene(QtCore.QPointF(*shape["end"]))
            rect = QtCore.QRectF(start, end)
            item = scene.addRect(rect, pen=QtGui.QPen(shape["color"]))
        if item:
            self._user_items.append(item)

def update_indicators(self, history):
    scene = self.scene()
    for item in self._indicator_items:
        scene.removeItem(item)
    self._indicator_items.clear()

    macd_plot = self._plots.get("macd")
    empty_item = self._items.get("empty")
    if macd_plot:
        for item in self._macd_items:
            macd_plot.removeItem(item)
        self._macd_items.clear()

    self._redraw_user_shapes()

    if len(history) < 5:
        return

    candle_plot = self._plots["candle"]
    vb = candle_plot.vb
    x_idx = list(range(len(history)))
    closes = np.array([b.close_price for b in history], dtype=float)

    sma5 = sma(closes, 5)
    sma10 = sma(closes, 10)
    sma30 = sma(closes, 30)

    def to_scene(x_idx, price):
        return vb.mapViewToScene(QtCore.QPointF(x_idx, price))

    def add_line(points, color, width=1):
        path = QtGui.QPainterPath()
        if points:
            path.moveTo(points[0])
            for p in points[1:]:
                path.lineTo(p)
        item = scene.addPath(path, QtGui.QPen(color, width))
        self._indicator_items.append(item)

    for sma_data, col in zip([sma5, sma10, sma30],
                             [QtGui.QColor(255,255,0), QtGui.QColor(255,165,0), QtGui.QColor(255,0,255)]):
        points = []
        for x, y in zip(x_idx, sma_data):
            if not np.isnan(y):
                points.append(to_scene(x, y))
        if points:
            add_line(points, col)

    if len(history) >= 26 and macd_plot:
        dif, dea, macd_bar = macd(closes)
        all_vals = np.concatenate([dif, dea, macd_bar])
        y_min = np.nanmin(all_vals)
        y_max = np.nanmax(all_vals)
        margin = (y_max - y_min) * 0.2 if y_max != y_min else 1.0
        if empty_item:
            empty_item.set_y_range(y_min - margin, y_max + margin)
            self._update_plot_limits()

        dif_curve = macd_plot.plot(x_idx, dif, pen=pg.mkPen(color=(255,255,255)))
        self._macd_items.append(dif_curve)
        dea_curve = macd_plot.plot(x_idx, dea, pen=pg.mkPen(color=(255,255,0)))
        self._macd_items.append(dea_curve)

        pos_mask = macd_bar >= 0
        neg_mask = ~pos_mask
        if pos_mask.any():
            bar_pos = pg.BarGraphItem(
                x=np.array(x_idx)[pos_mask],
                height=macd_bar[pos_mask],
                width=0.8,
                brush=pg.mkBrush(255, 0, 0)
            )
            macd_plot.addItem(bar_pos)
            self._macd_items.append(bar_pos)
        if neg_mask.any():
            bar_neg = pg.BarGraphItem(
                x=np.array(x_idx)[neg_mask],
                height=macd_bar[neg_mask],
                width=0.8,
                brush=pg.mkBrush(0, 255, 0)
            )
            macd_plot.addItem(bar_neg)
            self._macd_items.append(bar_neg)


────────────── 周期图表窗口 ──────────────

def create_period_chart(q, period_name, minutes_per_bar, control_queue, su_queue, paused_flag):
app = create_qapp()
window = QtWidgets.QWidget()
window.setWindowTitle(period_name)

# 工具栏
toolbar = QtWidgets.QHBoxLayout()
btn_pause = QtWidgets.QPushButton("⏸ 暂停")
btn_resume = QtWidgets.QPushButton("▶ 继续")
toolbar.addWidget(btn_pause)
toolbar.addWidget(btn_resume)

toolbar.addWidget(QtWidgets.QLabel("速度:"))
combo_speed = QtWidgets.QComboBox()
speeds = ["0.01", "0.05", "0.1", "0.5", "1.0"]
combo_speed.addItems(speeds)
combo_speed.setCurrentText("0.05")
toolbar.addWidget(combo_speed)
toolbar.addStretch()

widget = DrawableChartWidget()
widget.add_plot("candle", hide_x_axis=True)
widget.add_plot("volume", maximum_height=150)
widget.add_plot("macd", maximum_height=150)
widget.add_item(CandleItem, "candle", "candle")
widget.add_item(VolumeItem, "volume", "volume")
widget.add_item(EmptyItem, "empty", "macd")
widget.add_cursor()

layout = QtWidgets.QVBoxLayout()
layout.addLayout(toolbar)
layout.addWidget(widget)
window.setLayout(layout)

btn_pause.clicked.connect(lambda: control_queue.put("pause"))
btn_resume.clicked.connect(lambda: control_queue.put("resume"))
combo_speed.currentTextChanged.connect(
    lambda txt: su_queue.put(float(txt))
)

history = []
i_counter = 0
current_bar = None

def process_bar(bar):
    nonlocal i_counter, current_bar, history
    if minutes_per_bar == 1:
        history.append(bar)
        return True
    else:
        if i_counter == 0:
            current_bar = bar
            i_counter = 1
            history.append(current_bar)
            return True
        elif i_counter == minutes_per_bar:
            current_bar = bar
            i_counter = 1
            history.append(current_bar)
            return True
        else:
            current_bar.close_price = bar.close_price
            current_bar.high_price = max(current_bar.high_price, bar.high_price)
            current_bar.low_price = min(current_bar.low_price, bar.low_price)
            current_bar.volume += bar.volume
            i_counter += 1
            history[-1] = current_bar
            return False

def update_bar():
    nonlocal i_counter, current_bar, history

    # 暂停时不处理,但不清空队列,保证数据完整
    if paused_flag.value:
        return

    # 一次性处理队列中所有数据,确保多个周期窗口恢复后瞬间同步
    data_changed = False
    while not q.empty():
        item = q.get(False)
        if item is None:                     # 结束信号
            timer.stop()
            window.close()
            return

        # 历史数据包
        if isinstance(item, tuple) and len(item) == 2 and item[0] == "history":
            history_bars = item[1]
            history.clear()
            i_counter = 0
            current_bar = None
            for bar in history_bars:
                process_bar(bar)             # 批量合成
            # 视图显示全部历史数据
            widget._bar_count = max(len(history), 100)
            widget._update_x_range()
            data_changed = True
            continue

        # 正常 1 分钟 bar
        if process_bar(item):
            data_changed = True

    # 只在有数据变化时更新图表,减少不必要的刷新
    if data_changed:
        widget.clear_all()
        widget.update_history(history)
        widget.update_indicators(history)

timer = QtCore.QTimer()
timer.timeout.connect(update_bar)
timer.start(50)   # 50ms 检查一次,批量处理使恢复瞬间完成

positions = {
    "一分钟": (860, 15, 1050, 550),
    "五分钟": (0, 0, 900, 550),
    "三十分钟": (0, 560, 900, 550),
    "四小时": (860, 560, 1050, 530),
}
x, y, w, h = positions.get(period_name, (0, 0, 900, 550))
window.setGeometry(x, y, w, h)
window.show()
app.exec_()


────────────── 入口函数 ──────────────

def MINUTE(q, control, su, paused_flag):
return create_period_chart(q, "一分钟", 1, control, su, paused_flag)

def MINUTE_5m(q, control, su, paused_flag):
return create_period_chart(q, "五分钟", 5, control, su, paused_flag)

def MINUTE_30m(q, control, su, paused_flag):
return create_period_chart(q, "三十分钟", 30, control, su, paused_flag)

def MINUTE_4h(q, control, su, paused_flag):
return create_period_chart(q, "四小时", 240, control, su, paused_flag)

────────────── 主程序 ──────────────

if name == 'main':
multiprocessing.freeze_support()
manager = multiprocessing.Manager()
q_1m = manager.Queue()
q_5m = manager.Queue()
q_30m = manager.Queue()
q_4h = manager.Queue()
su = manager.Queue()
control = manager.Queue()
paused_flag = multiprocessing.Value('b', False)

# 所有窗口共享相同的历史数据量,保证时间范围一致
history_count = 1000

pw = multiprocessing.Process(target=putbardata,
                             args=(q_1m, q_5m, q_30m, q_4h, su, control, paused_flag, history_count))
pr_1m = multiprocessing.Process(target=MINUTE, args=(q_1m, control, su, paused_flag))
pr_5m = multiprocessing.Process(target=MINUTE_5m, args=(q_5m, control, su, paused_flag))
pr_30m = multiprocessing.Process(target=MINUTE_30m, args=(q_30m, control, su, paused_flag))
pr_4h = multiprocessing.Process(target=MINUTE_4h, args=(q_4h, control, su, paused_flag))

pw.start()
pr_1m.start()
pr_5m.start()
pr_30m.start()
pr_4h.start()

print("命令: pause / resume / quit(也可使用窗口按钮或下拉调速)")
while True:
    cmd = input().strip().lower()
    if cmd == "pause":
        control.put("pause")
    elif cmd == "resume":
        control.put("resume")
    elif cmd == "quit":
        break
    else:
        print("未知命令")

pw.join(); pr_1m.join(); pr_5m.join(); pr_30m.join(); pr_4h.join()
manager.shutdown()
print("所有进程已退出")
© 2015-2022 上海韦纳软件科技有限公司
备案服务号:沪ICP备18006526号

沪公网安备 31011502017034号

【用户协议】
【隐私政策】
【免责条款】