VeighNa量化社区
你的开源社区量化交易平台

置顶主题

米筐答疑版块上线,欢迎选择 RQData 数据服务

熟悉我们的老用户可能知道,米筐自 2017 年开始与 VN.PY 形成战略合作伙伴关系。米筐的“数据”和“策略”解决方案 VN.PY 的“交易”服务互补,两年多来,我们已经联合为多位用户提供了高性价比的量化投研与交易服务。感兴趣的新朋友可以回顾我们的老帖子《米筐科技和 VN.PY 的战略合作重磅升级》

为了给各位用户带来更好的产品体验,RQData 相关的答疑功能来了。在这个版块中,我们将提供更好的售前答疑和售后服务,帮助您解决从数据购买到产品应用的技术性问题。欢迎大家在本版块中多多交流~

若您在投研过程中有金融数据需求,米筐的 RQData 产品可为您提供对应的数据服务。RQData 是一个基于 Python 的金融数据工具包,提供丰富整齐的历史数据以及简单高效的 API 接口,免除了您进行数据搜索、清洗的烦恼。

针对期货、商品期权,我们提供了对应的方案,欢迎您点击此处进行体验和购买

更多下单疑问,您可以在本帖中进行交流。感谢大家的关注~



喜欢PyCharm的同学,请收好这份开发指南!

发布于vn.py社区公众号【vnpy-community】
 
原文作者:何若楠 | 发布时间:2023-08-02
 
社区有不少新接触VeighNa的同学咨询如何上手学习,这里推荐下官方团队推出的小鹅通线上课程:《零基础入门系列》覆盖【Python基础】【数据分析】【GUI开发】三阶段,适合有金融背景的编程小白快速建立自己的Python开发知识体系;《全实战进阶系列》则针对具体的量化策略应用,适合有开发背景的金融小白通过实践来迅速掌握量化投研能力,包括【CTA实战】【超越海龟】【期权入门】【投组策略】

 

PyCharm是由JetBrains公司推出针对Python语言的IDE,其内置一整套可以帮助用户在使用Python语言开发时提高其效率的工具。本文意在为用户提供通过PyCharm开发使用VeighNa的方案以供参考。

本中的内容基于Windows系统编写,但对于Linux和Mac系统大部分也都适用。

当前时点,VeighNa适用的Windows系统包括:

  • Windows 10/11
  • Windows Server 2019/2022

    其他版本的Windows系统安装时可能遇到各种依赖库问题,不推荐使用。

在Windows系统上使用VeighNa,推荐安装官方推出的【VeighNa Studio】Python发行版,尤其是初次接触Python开发的新手用户

 

PyCharm安装

 

首先从PyCharm官网下载PyCharm Community安装包:

description

下载完成后,双击安装包则可进入PyCharm安装向导:

description

如果想对安装选项进行设置,可以在PyCharm Community Edition Setup页面对相关选项进行勾选:

description

安装完成后,会跳转到安装成功页面:

description

如果前面勾选了Create Desktop Shortcut选项来创建桌面快捷方式的话,此时桌面上会出现PyCharm的图标,双击图标即可运行PyCharm。

 

VeighNa开发

 

创建项目

启动PyCharm之后,在弹出的欢迎界面中点击【New Project】创建新项目,如下图所示:

description

在弹出的新项目窗口中,首先需要选择存放项目的文件夹路径【Location】,然后勾选Python解释器选项中的【Previously configured interpreter】选项(即当前系统中已经安装的Python环境):

description

点击右侧Add Interpreter下拉框中的【Add Local Interpreter】,在弹出的对话框中点击左侧的【System Interpreter】标签,并在右侧出现的下拉框中选择VeighNa Studio自带Python解释器所在的路径:

description

点击底部的【OK】按钮保存解释器配置,回到新项目窗口中,点击右下方的【Create】按钮来完成新项目的创建:

description

创建成功的项目窗口如下图所示:

description

此时点击左上方的【External Libraries】,即可看到项目中可以调用的外部库:

description

点击site_packages文件夹,往下滚动就能找到VeighNa Studio中的vnpy核心框架包以及vnpy_前缀的插件模块包。此时可以通过点击对应图标来查看每个包中的文件源码,如下图所示:

description

把鼠标光标移到代码上方,会自动弹出对应代码的文档信息:

description

若按住Ctrl键的同时用鼠标左键点击代码,则会跳转到代码的声明部分:

description

点击窗口右下角的【Python 3.10】按钮,会弹出【Settings】项目配置窗口,可以看到当前解释器环境下安装的包名称和版本号信息。带有升级符号(向上箭头)的包,说明当前版本有更新版本,点击升级符号即可自动升级。

请注意:由于VeighNa对于部分依赖库有严格的版本要求,不建议用户手动升级安装的包,可能会出现版本冲突。

description

 

运行程序

从Github代码仓库下载VeighNa Trader启动脚本文件run.py,并将其放置于trader文件夹下,即可在窗口左侧的项目导航栏中看见run.py文件:

description

若部分代码下方可以看见绿色波浪线显示(变量名称英文词语检查),可以点击项目名称左方的主菜单按钮 -【File】-【Settings】-【Editor】-【Inspections】-【Proofreading】,取消【Typo】的勾选后点击【OK】确认。再回到主窗口,可以发现绿色波浪线已经消失:

description

点击鼠标右键,选择【Run 'run'】,即可开始运行run.py脚本:

description

此时在界面底部的终端内容输出区域中,可以看到程序运行时的打印信息:

description

与此同时,VeighNa Trader的主窗口也会自动弹出显示:

description

回到PyCharm,可以看到项目界面右上角已经有run脚本的运行记录了,后续直接点击三角形运行按钮也可运行脚本,如下图所示:

description

 

断点调试

PyCharm的断点调试功能十分强大,这里使用一个VeighNa的策略历史回测脚本来演示。

在左侧项目导航栏中点击鼠标右键,选择【New】-【File】, 在弹出的对话框中创建backtest.py

description

然后在文件中编写简单的策略回测代码(具体可参考Github仓库中的回测示例),在想要调试的地方打上断点,如下图所示:

description

点击鼠标右键选择【Debug 'backtest'】, 即可开始调试脚本:

description

此时项目界面右上角已经可以看到backtest.py的运行记录,后续也可以通过点击这里的按钮直接启动调试任务:

description

启动调试后,可以看到主界面底部的Debug窗口开始输出程序运行信息,并且程序会暂停运行在第一个断点处。左侧显示的是线程信息,右侧则是当前上下文中的变量信息:

description

点击类似播放键的【Resume Program】即可继续运行调试,直到下一个断点处再次暂停:

description

此时可以看到底部右侧监控窗口中,当前上下文中的变量发生了变化:

description

后续重复上述步骤,点击【Resume Program】直到调试结束,可以看到Debug窗口的相应输出:

description

调试完之后,点击【Rerun 'backtest'】即可重新调试:

description

在调试过程中,点击【Step Into】可以进入函数的内部查看运行时的细节状态:

description

description

点击【Step Out】则可跳出当前函数,查看外层调用栈的状态:

description

点击【Step Over】可越过子函数(子函数会执行):

description

description

点击【Stop 'backtest'】则会直接停止当前程序的运行:

description

description

 

指定程序的运行目录

在PyCharm中新建项目时,默认是在当前目录下运行程序。若需要指定程序运行的目录,可以点击项目界面右上角的【Edit】进入【Run/Debug Configurations】界面:

description

修改程序启动时的目录【Working directory】即可:

description

 

C++回调断点调试

通常情况下,PyCharm只能在Python解释器中启动的线程里进行代码断点调试。之前有部分用户反馈过尝试在C++回调函数(如CTP API接口、PySide图形库等)中打断点但无法起效的问题。针对这种情况,可以通过在代码中设置断点的方式,来实现对非Python线程(即C++线程)的断点调试。

在项目左侧导航栏中点击鼠标右键,选择【New】-【File】, 创建geteway_test.py。

在创建成功的geteway_test.py中添加一段脚本策略的代码(可参考该文件),然后按住Ctrl同时用鼠标左键点击代码中的CtpGateway,跳转至ctp_gateway.py的源码中,在想要调试的回调函数内打上断点(注意不要打在函数定义的def那一行),如下图所示:

description

回到gateway_test.py,点击鼠标右键选择【Debug 'gateway_test'】开始调试:

description

请注意,如果用load_json函数读取connect_ctp.json,请确保读取对应.vntrader文件夹的json文件中配置了CTP账户登录信息。

此时可观察到并没有进入之前设定的断点,如下图所示:

description

终止调试后,找到之前在ctp_gateway.py中设定的断点处,在回调函数内的断点之前添加以下代码:

import pydevd
pydevd.settrace(suspend=False, trace_only_current_thread=True)

请注意:

  • pydevd是PyCharm自带的调试插件,没有安装在Python解释器所在的Python环境里;
  • suspend参数设置为True之后,调试会在这一句代码运行完之后暂停,而不是停在断点处。trace_only_current_thread参数设置为True之后,调试过程中只会监控当前线程;
  • 调试结束之后不要忘记删掉这段代码。

然后再次运行调试gateway_test.py脚本:

description

此时可以看到底部的调试窗口中开始输出相关信息,同时程序暂停在了之前设置的断点处。左侧显示的是线程信息(可以看到多了一个Dummy线程显示),右侧显示的是变量信息(可以看到回调函数的入参):

description

 

对比VS Code

 

  1. 在PyCharm中,每个项目都需要对Python环境进行配置。在VS Code中,默认通过窗口右下角的Python解释器来选择全局的Python环境(针对所有打开的文件);
  2. PyCharm的Community版仅对Jupyter提供了只读支持,需要Professional版才能编辑和运行。VS Code仅需安装功能插件,就可以使用和Jupyter相关全部功能(包括读取、编辑、运行)。

 



基于迅投研的量化数据自运维下载更新

发布于veighna社区公众号【vnpy-community】
 
原文作者:用Python的交易员 | 发布时间:2024-01-20
 

K线历史时序数据,不管对于期货CTA、股票多因子还是期权波动率等量化策略的开发来说,都是不可或缺的原材料。在大多数Quant的日常工作流程中,固定一块内容就是对每日增量数据的下载更新。

这块数据更新工作如果通过手动的方式执行各项步骤无疑会相当繁琐(而且容易误操作),所以本文中将会分享一套基于迅投研的量化数据自运维下载更新方案。

关于迅投研数据服务的详细介绍可以参考之前的这篇VeighNa发布v3.9.0 - 迅投研数据服务!》。

 

前期准备

 

试用申请

迅投研数据服务为VeighNa社区用户提供了14天免费试用,点击以下专属链接即可申请:

https://xuntou.net/#/signup?utm_source=vnpy

目前只需要提供手机号验证即可申请,试用账号中包括了股票、指数、期货、期权、基金等数据权限。

试用申请完成后,回到首页(https://xuntou.net/)登录账号,然后点击右上角的用户中心:

description

在弹出页面中底部即可找到【接口TOKEN】,点击【复制】按钮即可快速复制token内容,该token将用于后续VeighNa Trader中的数据服务配置:

description

同时在页面左下方【行情服务】的到期日,可以看到试用权限的到期时间。

 

模块安装

VeighNa Studio的3.9.0版中已经包含了迅投研相关的功能模块。使用其他Python环境的用户可以运行下述命令安装:

pip install xtquant vnpy_xt --index=https://pypi.vnpy.com

 

账号配置

启动VeighNa Trader,点击顶部菜单栏的【配置】按钮,在弹出的对话框中修改下述字段:

  • datafeed.name:xt
  • datafeed.username:token
  • datafeed.password:之前在用户中心复制的token内容

修改完成后点击对话框底部的【确定】按钮,重启VeighNa Trader即可生效。老用户也同样可以通过修改.vntrader目录下的vt_setting.json文件来进行配置。

 

数据更新

 

下载合约信息

迅投研数据服务的客户端xtquant在工作机制上和其他数据服务有一个较大的区别,即需要先从服务器下载所需的数据到本地缓存目录的数据文件,然后才能从数据文件中查询获取所需的数据。

合约更新任务的代码本身较为简单:

def update_history_data() -> None:
    """更新历史合约信息"""
    # 在子进程中加载xtquant
    from xtquant.xtdata import download_history_data

    # 初始化数据服务
    datafeed = get_datafeed()
    datafeed.init()

    # 下载历史合约信息
    download_history_data("", "historycontract")

    print("xtquant历史合约信息下载完成")

但对于合约信息数据,xtquant模块会在当前进程中首次初始化时读取,后续即使执行了下载更新操作,在当前进程的运行过程中也不会重载,必须等到下一次进程重启后才会更新。

因此对于历史合约信息更新函数update_history_data,需要使用multiprocessing模块在子进程中运行:

# 使用子进程更新历史合约信息
    process: Process = Process(target=update_history_data)
    process.start()
    process.join()      # 等待子进程执行完成

这样后续在主进程中获取合约信息时(等同于重启了新的进程),就是已经更新后的数据。

 

获取合约信息

为了实现自运维交易所数据更新,需要能够获取每日交易所新上市的合约代码以及之前已有存续的合约代码信息,这里需要用到xtquant.xtdata模块中所提供的get_stock_list_in_sector函数:

# 查询交易所历史合约代码
    active_symbols: list[str] = get_stock_list_in_sector(sector_name)
    expire_symbols: list[str] = get_stock_list_in_sector("过期" + sector_name)

    xt_symbols: list[str] = active_symbols + expire_symbols

函数中所传入的sector_name为迅投研定义的交易市场名称信息,一些比较常用的名称包括:

  • 中金所、上期所、大商所、郑商所、能源中心
  • 上证A股、深圳A股
  • 上证期权、深圳期权

同时对于每个市场,都包含了当前处于挂牌状态的合约,以及已经到期退市的合约(市场名称前需要加上【过期】前缀),需要分开查询后拼接成为该市场目前的整体合约代码列表。

 

数据增量过滤

对于本地已有K线数据的合约,每日仅需要下载更新增量的部分,而没有必要从头开始下载(浪费运行时间和数据流量)。这里借助于VeighNa数据库组件所提供的BarOverview汇总来查询当前本地已有的数据范围:

# 获取本地已有数据汇总
    data: list[BarOverview] = database.get_bar_overview()

    overviews: dict[str, BarOverview] = {}
    for o in data:
        vt_symbol: str = f"{o.symbol}.{o.exchange.value}"
        overviews[vt_symbol] = o

在遍历整个市场合约代码列表的过程中,可以先查询该合约的到期时间,然后只有在满足下列条件时才执行更新:

  • 本地从未下载过该合约的数据(合约刚上市或者首次运行下载)
  • 合约到期时间大于当前时间(尚未到期所以还有数据更新)

条件过滤代码如下:

# 查询合约信息
        data: dict = get_instrument_detail(xt_symbol, True)

        # 获取合约到期时间
        expiry: datetime = None
        if data["ExpireDate"]:
            expiry = datetime.strptime(data["ExpireDate"], "%Y%m%d")

        # 拆分迅投研代码
        symbol, xt_exchange = xt_symbol.split(".")

        # 生成本地代码
        exchange: Exchange = EXCHANGE_XT2VT[xt_exchange]
        vt_symbol: str = f"{symbol}.{exchange.value}"

        # 查询数据汇总
        overview: BarOverview = overviews.get(vt_symbol, None)

        # 如果已经到期,则跳过
        if overview and expiry and expiry < now:
            continue

 

下载数据入库

对于通过了前述过滤筛选的合约,则执行K线下载并将数据写入到数据库:

# 实现增量查询
        start: datetime = START_TIME
        if overview:
            start = overview.end

        # 执行数据查询和更新入库
        req: HistoryRequest = HistoryRequest(
            symbol=symbol,
            exchange=exchange,
            start=start,
            end=now,
            interval=interval
        )

        bars: list[BarData] = datafeed.query_bar_history(req)

        if bars:
            database.save_bar_data(bars)

            start_dt: datetime = bars[0].datetime
            end_dt: datetime = bars[-1].datetime
            msg: str = f"{vt_symbol}数据更新成功,{start_dt} - {end_dt}"
            print(msg)

以上针对每个市场的K线数据更新功能,整体封装在了update_bar_data函数中,用户可以根据需求传入所需的市场名称参数执行更新任务:

    # 这里只更新两个期货交易所的数据    update_bar_data("中金所")    update_bar_data("上期所")

 

完整代码

 

老规矩还是附上完整的程序代码:

from multiprocessing import Process
from datetime import datetime

from vnpy.trader.database import BarOverview
from vnpy.trader.datafeed import get_datafeed
from vnpy.trader.database import get_database
from vnpy.trader.object import BarData, HistoryRequest
from vnpy.trader.constant import Exchange, Interval


# 交易所映射关系
EXCHANGE_XT2VT = {
    "SH": Exchange.SSE,
    "SZ": Exchange.SZSE,
    "BJ": Exchange.BSE,
    "SF": Exchange.SHFE,
    "IF": Exchange.CFFEX,
    "INE": Exchange.INE,
    "DF": Exchange.DCE,
    "ZF": Exchange.CZCE,
    "GF": Exchange.GFEX
}

# 开始查询时间
START_TIME = datetime(2018, 1, 1)


def update_history_data() -> None:
    """更新历史合约信息"""
    # 在子进程中加载xtquant
    from xtquant.xtdata import download_history_data

    # 初始化数据服务
    datafeed = get_datafeed()
    datafeed.init()

    # 下载历史合约信息
    download_history_data("", "historycontract")

    print("xtquant历史合约信息下载完成")


def update_bar_data(
    sector_name: str,
    interval: Interval = Interval.MINUTE
) -> None:
    """更新K线数据"""
    # 在子进程中加载xtquant
    from xtquant.xtdata import (
        get_stock_list_in_sector,
        get_instrument_detail
    )

    # 初始化数据服务
    datafeed = get_datafeed()
    datafeed.init()

    # 连接数据库
    database = get_database()

    # 获取当前时间戳
    now: datetime = datetime.now()

    # 获取本地已有数据汇总
    data: list[BarOverview] = database.get_bar_overview()

    overviews: dict[str, BarOverview] = {}
    for o in data:
        vt_symbol: str = f"{o.symbol}.{o.exchange.value}"
        overviews[vt_symbol] = o

    # 查询交易所历史合约代码
    xt_symbols: list[str] = get_stock_list_in_sector(sector_name)

    # 遍历列表查询合约信息
    for xt_symbol in xt_symbols:
        # 查询合约信息
        data: dict = get_instrument_detail(xt_symbol, True)

        # 获取合约到期时间
        expiry: datetime = None
        if data["ExpireDate"]:
            expiry = datetime.strptime(data["ExpireDate"], "%Y%m%d")

        # 拆分迅投研代码
        symbol, xt_exchange = xt_symbol.split(".")

        # 生成本地代码
        exchange: Exchange = EXCHANGE_XT2VT[xt_exchange]
        vt_symbol: str = f"{symbol}.{exchange.value}"

        # 查询数据汇总
        overview: BarOverview = overviews.get(vt_symbol, None)

        # 如果已经到期,则跳过
        if overview and expiry and expiry < now:
            continue

        # 实现增量查询
        start: datetime = START_TIME
        if overview:
            start = overview.end

        # 执行数据查询和更新入库
        req: HistoryRequest = HistoryRequest(
            symbol=symbol,
            exchange=exchange,
            start=start,
            end=now,
            interval=interval
        )

        bars: list[BarData] = datafeed.query_bar_history(req)

        if bars:
            database.save_bar_data(bars)

            start_dt: datetime = bars[0].datetime
            end_dt: datetime = bars[-1].datetime
            msg: str = f"{vt_symbol}数据更新成功,{start_dt} - {end_dt}"
            print(msg)


if __name__ == "__main__":
    # 使用子进程更新历史合约信息
    process: Process = Process(target=update_history_data)
    process.start()
    process.join()      # 等待子进程执行完成

    # 更新历史数据
    update_bar_data("上期所")
    update_bar_data("过期上期所")

 



在VNPY中按照CTA策略实例记录交易条目和损益

这个是一个常见需求,按照策略实例记录交易条目和损益。整体的损益可以通过第三方行情软件来分析,如果有多个CTA策略实例(策略-品种-参数组合实例),如何对每个策略实例记录交易条目和损益情况。

下面代码是在vntrader 2.6 级别的实例代码,并不完全,因为改动比较多,这个仅仅是一个思路的示例。

先简单介绍下思路,记录分为3个部分,一个是新建触发停止单对象加入触发价,首单价和成交均价信息,二个把完成的触发停止单和对应的交易单保存到数据库,第三个根据每次offset的触发停止单,平仓价格数量和开仓触发停止单的开方价格数量,统计出当前的历史平仓总收益,如果没平仓就是当前价格的开仓收益,保存到json或者数据库都可以。

首先, cta_strategy/base.py 中增加一个新的event类型 EVENT_CTA_TRIGGERED_STOPORDER,用做触发的stop order事件

EVENT_CTA_TRIGGERED_STOPORDER = 'eCtaTriggeredStopOrder'

在 cta_strategy/ engine.py 中,在方法 check_stop_order加入下面代码, 这个代码是创建一个新的对象triggered_stop_order,是一个触发的stop order, 加入已成交笔数,触发价,首单价和成交均价信息。

# Update stop order status if placed successfully
if vt_orderids:
    # Remove from relation map.
    if vt_orderids == 1:
        self.write_log(f"取消挂单,{strategy.strategy_name} {stop_order.stop_orderid}")
        self.cancel_order(strategy,stop_order.stop_orderid)
        return
    self.stop_orders.pop(stop_order.stop_orderid)
    strategy_vt_orderids = self.strategy_orderid_map[strategy.strategy_name]
    if stop_order.stop_orderid in strategy_vt_orderids:
        strategy_vt_orderids.remove(stop_order.stop_orderid)
    # Change stop order status to cancelled and update to strategy.
    stop_order.status = StopOrderStatus.TRIGGERED
    stop_order.vt_orderids = vt_orderids
    # Billy add to triggerred_stop_order dict
    triggered_stop_order = copy(stop_order)
    # triggered_stop_order.vt_orderids = list(map(lambda name: name.split(".")[1], triggered_stop_order.vt_orderids))
    triggered_stop_order.stop_orderid = triggered_stop_order.stop_orderid + "." + stop_order.strategy_name + "." + '.'.join(
        vt_orderids)
    triggered_stop_order.datetime = copy(tick.datetime)
    triggered_stop_order.completed_volume = 0
    triggered_stop_order.average_price = 0
    triggered_stop_order.first_price = 0
    triggered_stop_order.triggered_price = tick.last_price
    for vt_orderid in vt_orderids:
        self.vt_orderid_triggered_stop_order[vt_orderid] = triggered_stop_order
    # end Billy add
    self.call_strategy_func(
        strategy, strategy.on_stop_order, stop_order
    )
    self.put_stop_order_event(stop_order)

在方法process_trade_event, 加入下面代码,这段代码是根据在triggered_stop_order中记录的真实交易单号,如果返回的交易信息属于这个triggered_stop_order,那么记录成交数量,成交价格,算出平均价格,并且把这个trade信息保存到数据。这个保存时候把 triggered_stop_order的id放到 trade里面,方便日后关联

如果搜到的真实交易单的累计笔数和挂单笔数一致,说明挂单已经完成状态,把 triggered_stop_order放入数据

# Update strategy pos before calling on_trade method
if trade.direction == Direction.LONG:
    strategy.pos += trade.volume
else:
    strategy.pos -= trade.volume
self.call_strategy_func(strategy, strategy.on_trade, trade)
# Billy added, update triggered_stop_order
cta_trade = copy(trade)
allComplete = False
if cta_trade.vt_orderid in self.vt_orderid_triggered_stop_order.keys():
    triggered_stop_order = self.vt_orderid_triggered_stop_order[cta_trade.vt_orderid]
    if triggered_stop_order.first_price == 0:
        triggered_stop_order.first_price = cta_trade.price
    triggered_stop_order.completed_volume += cta_trade.volume
    triggered_stop_order.average_price += cta_trade.volume * cta_trade.price
    # change orderid to stop_orderid
    cta_trade.orderid = triggered_stop_order.stop_orderid
    # current_triggered_stop_order = triggered_stop_order
    if triggered_stop_order.completed_volume == triggered_stop_order.volume:
        allComplete = True
self.call_strategy_func(strategy, strategy.on_cta_trade, cta_trade)
if allComplete:
    # Sync strategy variables to data file
    self.sync_strategy_data(strategy)
    triggered_stop_order.average_price = round(
        triggered_stop_order.average_price / triggered_stop_order.completed_volume, 2)
    self.put_cta_triggered_stoporder_event(triggered_stop_order)
    database_manager.save_triggered_stop_order_data(copy(triggered_stop_order))
    for k in triggered_stop_order.vt_orderids:
        self.vt_orderid_triggered_stop_order.pop(k)
    self.call_strategy_func(strategy, strategy.save_setting_file)
# Update GUI
self.put_strategy_event(strategy)

写入数据库方法这里不做累述,主要是通过event事件来触发数据库保存操作,这个显示下trade和 triggered_stop_order的数据结构示例

Trade的orderidd

{
    "_id" : ObjectId("628b9523134fcca9c87671ea"),
    "datetime" : ISODate("2022-05-23T22:07:31.000Z"),
    "strategy" : "oneBar",
    "symbol" : "rb2210",
    "tradeid" : "       78259",
    "exchange" : "SHFE",
    "orderid" : "STOP.4.oneBar.CTP.3_-1818434211_2",
    "direction" : "空",
    "offset" : "开",
    "price" : 4594.0,
    "volume" : 1.0,
    "date" : ISODate("2022-05-23T00:00:00.000Z")
}

对应的triggered_stop_order,

{
    "_id" : ObjectId("628b9523134fcca9c87671ec"),
    "datetime" : ISODate("2022-05-23T22:07:31.500Z"),
    "stop_orderid" : "STOP.4.oneBar.CTP.3_-1818434211_2",
    "strategy_name" : "oneBar",
    "vt_symbol" : "rb2210.SHFE",
    "direction" : "空",
    "offset" : "开",
    "price" : 4594.0,
    "volume" : 1.0,
    "lock" : false,
    "net" : false,
    "vt_orderids" : [ 
        "CTP.3_-1818434211_2"
    ],
    "status" : "已触发",
    "completed_volume" : 1.0,
    "average_price" : 4594.0,
    "first_price" : 4594.0,
    "triggered_price" : 4594.0
}

至于之后查询很多方法,可以用第三方分析软件,也可以简单做个页面。

至于损益的分析,可以 根据每次offset的触发停止单,平仓价格数量和开仓触发停止单的开方价格数量,统计出当前的历史平仓总收益,如果没平仓就是当前价格的开仓收益,保存到json或者数据库都可以。

这里要注意的是移仓操作,可能对导致开平仓交易对对上。所以在移仓也要做对应的增强。具体我后面写一下增强的移仓代码



《vn.py 3.0.0源代码深入分析》

我学Python的目的很明确,就是量化交易。从一开始就有关注vn.py,但我学的是Python3,那时vn.py还处于版本1.x时期,所以只能望vn.py兴叹。
vn.py 2.0出来之后我并没有及时注意,等反应过来已经是2.0.7版。很兴奋,认真研究,并将心得写成《vn.py 2.0.7源代码深入分析》,分享在vn.py社区的经验分享板块。
出于对量化交易的爱好,出于对Python在量化交易中作用的认同,一定程度受vn.py强大功能的鼓舞,我与同事合写了《Python量化交易从入门到实战》一书,对vn.py的讨论是其中很重要的一部分内容。
后续又写了《vn.py 2.1.4源代码深入分析》和《vn.py 2.2.0源代码深入分析》两个文档,感谢各位老师的认可。
vn.py 3.0.0版发布于2022-03-23,这是我一直期待的一个版本,所以它刚一推出,我就立刻开始试用,并着手整理《vn.py 3.0.0源代码深入分析》。夜以继日,终于在前天完成。先发到了书籍的资源群中,接受了两天批评,现分享到此处。
写作本文档的一个主要目的是对vn.py的开源精神做出一点支持,希望本文档能够对大家学习使用vn.py有所帮助。

百度网盘链接:https://pan.baidu.com/s/1cl2MA9hNFhHlxfHM0gGe2A
提取码:s7u6



为K线图表添砖加瓦——一个完整的K线图表

1. 典型的绘图部件

保存文件:vnpy\usertools\chart_items.py
其中包含:

  • LineItem
  • RsiItem
  • SmaItem
  • MacdItem
  • TradeItem
  • OrderItem
from datetime import datetime
from typing import List, Tuple, Dict

from vnpy.trader.ui import create_qapp, QtCore, QtGui, QtWidgets
from pyqtgraph import ScatterPlotItem
import pyqtgraph as pg
import numpy as np
import talib
import copy

from vnpy.chart import ChartWidget, VolumeItem, CandleItem
from vnpy.chart.item import ChartItem
from vnpy.chart.manager import BarManager

from vnpy.trader.object import (
    BarData,
    OrderData,
    TradeData
)

from vnpy.trader.object import Direction, Exchange, Interval, Offset, Status, Product, OptionType, OrderType

from collections import OrderedDict
import pytz
CHINA_TZ = pytz.timezone("Asia/Shanghai")


class LineItem(CandleItem):
    """"""

    def __init__(self, manager: BarManager):
        """"""
        super().__init__(manager)

        self.white_pen: QtGui.QPen = pg.mkPen(color=(255, 255, 255), width=1)

    def _draw_bar_picture(self, ix: int, bar: BarData) -> QtGui.QPicture:
        """"""
        last_bar = self._manager.get_bar(ix - 1)

        # Create objects
        picture = QtGui.QPicture()
        painter = QtGui.QPainter(picture)

        # Set painter color
        painter.setPen(self.white_pen)

        # Draw Line
        end_point = QtCore.QPointF(ix, bar.close_price)

        if last_bar:
            start_point = QtCore.QPointF(ix - 1, last_bar.close_price)
        else:
            start_point = end_point

        painter.drawLine(start_point, end_point)

        # Finish
        painter.end()
        return picture

    def get_info_text(self, ix: int) -> str:
        """"""
        text = ""
        bar = self._manager.get_bar(ix)
        if bar:
            text = f"Close:{bar.close_price}"
        return text

class SmaItem(CandleItem):
    """"""

    def __init__(self, manager: BarManager):
        """"""
        super().__init__(manager)

        self.blue_pen: QtGui.QPen = pg.mkPen(color=(100, 100, 255), width=2)

        self.sma_window = 10
        self.sma_data: Dict[int, float] = {}

    def get_sma_value(self, ix: int) -> float:
        """"""
        if ix < 0:
            return 0

        # When initialize, calculate all rsi value
        if not self.sma_data:
            bars = self._manager.get_all_bars()
            close_data = [bar.close_price for bar in bars]
            sma_array = talib.SMA(np.array(close_data), self.sma_window)

            for n, value in enumerate(sma_array):
                self.sma_data[n] = value

        # Return if already calcualted
        if ix in self.sma_data:
            return self.sma_data[ix]

        # Else calculate new value
        close_data = []
        for n in range(ix - self.sma_window, ix + 1):
            bar = self._manager.get_bar(n)
            close_data.append(bar.close_price)

        sma_array = talib.SMA(np.array(close_data), self.sma_window)
        sma_value = sma_array[-1]
        self.sma_data[ix] = sma_value

        return sma_value

    def _draw_bar_picture(self, ix: int, bar: BarData) -> QtGui.QPicture:
        """"""
        sma_value = self.get_sma_value(ix)
        last_sma_value = self.get_sma_value(ix - 1)

        # Create objects
        picture = QtGui.QPicture()
        painter = QtGui.QPainter(picture)

        # Set painter color
        painter.setPen(self.blue_pen)

        # Draw Line
        start_point = QtCore.QPointF(ix-1, last_sma_value)
        end_point = QtCore.QPointF(ix, sma_value)
        painter.drawLine(start_point, end_point)

        # Finish
        painter.end()
        return picture

    def get_info_text(self, ix: int) -> str:
        """"""
        if ix in self.sma_data:
            sma_value = self.sma_data[ix]
            text = f"SMA {sma_value:.1f}"
        else:
            text = "SMA -"

        return text

class RsiItem(ChartItem):
    """"""

    def __init__(self, manager: BarManager):
        """"""
        super().__init__(manager)

        self.white_pen: QtGui.QPen = pg.mkPen(color=(255, 255, 255), width=1)
        self.yellow_pen: QtGui.QPen = pg.mkPen(color=(255, 255, 0), width=2)

        self.rsi_window = 14
        self.rsi_data: Dict[int, float] = {}

    def get_rsi_value(self, ix: int) -> float:
        """"""
        if ix < 0:
            return 50

        # When initialize, calculate all rsi value
        if not self.rsi_data:
            bars = self._manager.get_all_bars()
            close_data = [bar.close_price for bar in bars]
            rsi_array = talib.RSI(np.array(close_data), self.rsi_window)

            for n, value in enumerate(rsi_array):
                self.rsi_data[n] = value

        # Return if already calcualted
        if ix in self.rsi_data:
            return self.rsi_data[ix]

        # Else calculate new value
        close_data = []
        for n in range(ix - self.rsi_window, ix + 1):
            bar = self._manager.get_bar(n)
            close_data.append(bar.close_price)

        rsi_array = talib.RSI(np.array(close_data), self.rsi_window)
        rsi_value = rsi_array[-1]
        self.rsi_data[ix] = rsi_value

        return rsi_value

    def _draw_bar_picture(self, ix: int, bar: BarData) -> QtGui.QPicture:
        """"""
        rsi_value = self.get_rsi_value(ix)
        last_rsi_value = self.get_rsi_value(ix - 1)

        # Create objects
        picture = QtGui.QPicture()
        painter = QtGui.QPainter(picture)

        # Draw RSI line
        painter.setPen(self.yellow_pen)

        if np.isnan(last_rsi_value) or np.isnan(rsi_value):
            # print(ix - 1, last_rsi_value,ix, rsi_value,)
            pass
        else:
            end_point = QtCore.QPointF(ix, rsi_value)
            start_point = QtCore.QPointF(ix - 1, last_rsi_value)
            painter.drawLine(start_point, end_point)

        # Draw oversold/overbought line
        painter.setPen(self.white_pen)

        painter.drawLine(
            QtCore.QPointF(ix, 70),
            QtCore.QPointF(ix - 1, 70),
        )

        painter.drawLine(
            QtCore.QPointF(ix, 30),
            QtCore.QPointF(ix - 1, 30),
        )

        # Finish
        painter.end()
        return picture

    def boundingRect(self) -> QtCore.QRectF:
        """"""
        # min_price, max_price = self._manager.get_price_range()
        rect = QtCore.QRectF(
            0,
            0,
            len(self._bar_picutures),
            100
        )
        return rect

    def get_y_range( self, min_ix: int = None, max_ix: int = None) -> Tuple[float, float]:
        """  """
        return 0, 100

    def get_info_text(self, ix: int) -> str:
        """"""
        if ix in self.rsi_data:
            rsi_value = self.rsi_data[ix]
            text = f"RSI {rsi_value:.1f}"
            # print(text)
        else:
            text = "RSI -"

        return text


def to_int(value: float) -> int:
    """"""
    return int(round(value, 0))

""" 将y方向的显示范围扩大到1.1 """
def adjust_range(in_range:Tuple[float, float])->Tuple[float, float]:
    ret_range:Tuple[float, float]
    diff = abs(in_range[0] - in_range[1])
    ret_range = (in_range[0]-diff*0.05,in_range[1]+diff*0.05)
    return ret_range

class MacdItem(ChartItem):
    """"""
    _values_ranges: Dict[Tuple[int, int], Tuple[float, float]] = {}

    last_range:Tuple[int, int] = (-1,-1)    # 最新显示K线索引范围

    def __init__(self, manager: BarManager):
        """"""
        super().__init__(manager)

        self.white_pen: QtGui.QPen = pg.mkPen(color=(255, 255, 255), width=1)
        self.yellow_pen: QtGui.QPen = pg.mkPen(color=(255, 255, 0), width=1)
        self.red_pen: QtGui.QPen = pg.mkPen(color=(255, 0, 0), width=1)
        self.green_pen: QtGui.QPen = pg.mkPen(color=(0, 255, 0), width=1)

        self.short_window = 12
        self.long_window = 26
        self.M = 9

        self.macd_data: Dict[int, Tuple[float,float,float]] = {}

    def get_macd_value(self, ix: int) -> Tuple[float,float,float]:
        """"""
        if ix < 0:
            return (0.0,0.0,0.0)

        # When initialize, calculate all macd value
        if not self.macd_data:
            bars = self._manager.get_all_bars()
            close_data = [bar.close_price for bar in bars]

            diffs,deas,macds = talib.MACD(np.array(close_data), 
                                    fastperiod=self.short_window, 
                                    slowperiod=self.long_window, 
                                    signalperiod=self.M)

            for n in range(0,len(diffs)):
                self.macd_data[n] = (diffs[n],deas[n],macds[n])

        # Return if already calcualted
        if ix in self.macd_data:
            return self.macd_data[ix]

        # Else calculate new value
        close_data = []
        for n in range(ix-self.long_window-self.M+1, ix + 1):
            bar = self._manager.get_bar(n)
            close_data.append(bar.close_price)

        diffs,deas,macds = talib.MACD(np.array(close_data), 
                                            fastperiod=self.short_window, 
                                            slowperiod=self.long_window, 
                                            signalperiod=self.M) 
        diff,dea,macd = diffs[-1],deas[-1],macds[-1]
        self.macd_data[ix] = (diff,dea,macd)

        return (diff,dea,macd)

    def _draw_bar_picture(self, ix: int, bar: BarData) -> QtGui.QPicture:
        """"""
        macd_value = self.get_macd_value(ix)
        last_macd_value = self.get_macd_value(ix - 1)

        # # Create objects
        picture = QtGui.QPicture()
        painter = QtGui.QPainter(picture)

        # # Draw macd lines
        if np.isnan(macd_value[0]) or np.isnan(last_macd_value[0]):
            # print("略过macd lines0")
            pass
        else:
            end_point0 = QtCore.QPointF(ix, macd_value[0])
            start_point0 = QtCore.QPointF(ix - 1, last_macd_value[0])
            painter.setPen(self.white_pen)
            painter.drawLine(start_point0, end_point0)

        if np.isnan(macd_value[1]) or np.isnan(last_macd_value[1]):
            # print("略过macd lines1")
            pass
        else:
            end_point1 = QtCore.QPointF(ix, macd_value[1])
            start_point1 = QtCore.QPointF(ix - 1, last_macd_value[1])
            painter.setPen(self.yellow_pen)
            painter.drawLine(start_point1, end_point1)

        if not np.isnan(macd_value[2]):
            if (macd_value[2]>0):
                painter.setPen(self.red_pen)
                painter.setBrush(pg.mkBrush(255,0,0))
            else:
                painter.setPen(self.green_pen)
                painter.setBrush(pg.mkBrush(0,255,0))
            painter.drawRect(QtCore.QRectF(ix-0.3,0,0.6,macd_value[2]))
        else:
            # print("略过macd lines2")
            pass

        painter.end()
        return picture

    def boundingRect(self) -> QtCore.QRectF:
        """"""
        min_y, max_y = self.get_y_range()
        rect = QtCore.QRectF(
            0,
            min_y,
            len(self._bar_picutures),
            max_y
        )
        return rect

    def get_y_range(self, min_ix: int = None, max_ix: int = None) -> Tuple[float, float]:
        #   获得3个指标在y轴方向的范围   
        #   hxxjava 修改,2020-6-29
        #   当显示范围改变时,min_ix,max_ix的值不为None,当显示范围不变时,min_ix,max_ix的值不为None,

        offset = max(self.short_window,self.long_window) + self.M - 1

        if not self.macd_data or len(self.macd_data) < offset:
            return 0.0, 1.0

        # print("len of range dict:",len(self._values_ranges),",macd_data:",len(self.macd_data),(min_ix,max_ix))

        if min_ix != None:          # 调整最小K线索引
            min_ix = max(min_ix,offset)

        if max_ix != None:          # 调整最大K线索引
            max_ix = min(max_ix, len(self.macd_data)-1)

        last_range = (min_ix,max_ix)    # 请求的最新范围   

        if last_range == (None,None):   # 当显示范围不变时
            if self.last_range in self._values_ranges:  
                # 如果y方向范围已经保存
                # 读取y方向范围
                result = self._values_ranges[self.last_range]
                # print("1:",self.last_range,result)
                return adjust_range(result)
            else:
                # 如果y方向范围没有保存
                # 从macd_data重新计算y方向范围
                min_ix,max_ix = 0,len(self.macd_data)-1

                macd_list = list(self.macd_data.values())[min_ix:max_ix + 1]
                ndarray = np.array(macd_list)           
                max_price = np.nanmax(ndarray)
                min_price = np.nanmin(ndarray)

                # 保存y方向范围,同时返回结果
                result = (min_price, max_price)
                self.last_range = (min_ix,max_ix)
                self._values_ranges[self.last_range] = result
                # print("2:",self.last_range,result)
                return adjust_range(result)

        """ 以下为显示范围变化时 """

        if last_range in self._values_ranges:
            # 该范围已经保存过y方向范围
            # 取得y方向范围,返回结果
            result = self._values_ranges[last_range]
            # print("3:",last_range,result)
            return adjust_range(result)

        # 该范围没有保存过y方向范围
        # 从macd_data重新计算y方向范围
        macd_list = list(self.macd_data.values())[min_ix:max_ix + 1]
        ndarray = np.array(macd_list) 
        max_price = np.nanmax(ndarray)
        min_price = np.nanmin(ndarray)

        # 取得y方向范围,返回结果
        result = (min_price, max_price)
        self.last_range = last_range
        self._values_ranges[self.last_range] = result
        # print("4:",self.last_range,result)
        return adjust_range(result)


    def get_info_text(self, ix: int) -> str:
        # """"""
        if ix in self.macd_data:
            diff,dea,macd = self.macd_data[ix]
            words = [
                f"diff {diff:.3f}"," ",
                f"dea {dea:.3f}"," ",
                f"macd {macd:.3f}"
                ]
            text = "\n".join(words)
        else:
            text = "diff - \ndea - \nmacd -"

        return text


class TradeItem(ScatterPlotItem,CandleItem): 
    """
    成交单绘图部件
    """
    def __init__(self, manager: BarManager):
        """"""
        ScatterPlotItem.__init__(self)
        # CandleItem.__init__(self,manager)
        # super(TradeItem,self).__init__(manager)
        super(CandleItem,self).__init__(manager)


        self.blue_pen: QtGui.QPen = pg.mkPen(color=(100, 100, 255), width=2)

        self.trades : Dict[int,Dict[str,TradeData]] = {} # {ix:{tradeid:trade}}

    def add_trades(self,trades:List[TradeData]):
        """ 增加成交单列表到TradeItem """
        for trade in trades:
            self.add_trade(trade)

        self.set_scatter_data()
        self.update()

    def add_trade(self,trade:TradeData,draw:bool=False):
        """ 增加一个成交单到TradeItem """
        # 这里使用reverse=True,是考虑到实盘成交往往发生在最新的bar里,可以加快搜索速度
        od = OrderedDict(sorted(self._manager._datetime_index_map.items(),key = lambda t:t[0],reverse=True))
        idx = self._manager.get_count() - 1
        for dt,ix in od.items():
            # print(f"dt={dt}\ntrade.datetime {trade.datetime}")
            dt1 = CHINA_TZ.localize(datetime.combine(dt.date(),dt.time()))
            if dt1 <= trade.datetime:
                # print(f"【dt={dt},dt1={dt1},dt2={trade.datetime} ix={ix}】")
                idx = ix
                break

        # 注意:一个bar期间可能发生多个成交单
        if idx in self.trades:
            self.trades[idx][trade.tradeid] = trade
        else:
            self.trades[idx] = {trade.tradeid:trade}

        if draw:        
            self.set_scatter_data()
            self.update()

        # print(f"add_trade idx={idx} trade={trade}")

    def set_scatter_data(self):
        """ 把成交单列表绘制到ScatterPlotItem上 """
        scatter_datas = []
        for ix in self.trades:
            for trade in self.trades[ix].values():
                scatter = {
                    "pos" : (ix, trade.price),
                    "data": 1,
                    "size": 14,
                    "pen": pg.mkPen((255, 255, 255)),
                }

                if trade.direction == Direction.LONG:
                    scatter_symbol = "t1"   # Up arrow
                else:
                    scatter_symbol = "t"    # Down arrow

                if trade.offset == Offset.OPEN:
                    scatter_brush = pg.mkBrush((255, 255, 0))   # Yellow
                else:
                    scatter_brush = pg.mkBrush((0, 0, 255))     # Blue

                scatter["symbol"] = scatter_symbol
                scatter["brush"] = scatter_brush
                scatter_datas.append(scatter)

        self.setData(scatter_datas)

    def get_info_text(self, ix: int) -> str:
        """"""
        if ix in self.trades:
            text = "成交:"
            for tradeid,trade in self.trades[ix].items():
                # TradeData
                text += f"\n{trade.price}{trade.direction.value}{trade.offset.value}{trade.volume}手"
        else:
            text = "成交:-"

        return text


class OrderItem(ScatterPlotItem,CandleItem): 
    """
    委托单绘图部件
    """
    def __init__(self, manager: BarManager):
        """"""
        ScatterPlotItem.__init__(self)
        super(CandleItem,self).__init__(manager)

        self.orders : Dict[int,Dict[str,Order]] = {} # {ix:{orderid:order}}

    def add_orders(self,orders:List[OrderData]):
        """ 增加委托单列表到OrderItem """
        for order in orders:
            if order.datetime:
                self.add_order(order)

        self.set_scatter_data()
        self.update()

    def add_order(self,order:OrderData,draw:bool=False):
        """ 增加一个委托单到OrderItem """
        # 这里使用reverse=True,是考虑到实盘成交往往发生在最新的bar里,可以加快搜索速度

        od = OrderedDict(sorted(self._manager._datetime_index_map.items(),key = lambda t:t[0],reverse=True))
        idx = self._manager.get_count() - 1
        for dt,ix in od.items():
            # print(f"dt={dt}\ntrade.datetime {trade.datetime}")
            dt1 = CHINA_TZ.localize(datetime.combine(dt.date(),dt.time()))
            if dt1 <= order.datetime:
                # print(f"【dt={dt},dt1={dt1},dt2={order.datetime} ix={ix}】")
                idx = ix
                break

        # 注意:一个bar期间可能发生多个委托单
        if idx in self.orders:
            self.orders[idx][order.orderid] = order
        else:
            self.orders[idx] = {order.orderid:order}

        if draw:
            self.set_scatter_data()
            self.update()

    def set_scatter_data(self):
        """ 把委托单列表绘制到ScatterPlotItem上 """
        scatter_datas = []
        for ix in self.orders:
            lowest,highest=self.get_y_range()
            # print(f"range={lowest,highest}")
            for order in self.orders[ix].values():
                # 处理委托报价超出显示范围的问题
                if order.price>highest:
                    show_price = highest - 7
                elif order.price<lowest:
                    show_price = lowest + 7
                else:
                    show_price = order.price 

                scatter = {
                    "pos" : (ix, show_price),
                    "data": 1,
                    "size": 14,
                    "pen": pg.mkPen((255, 255, 255)),
                }

                if order.direction == Direction.LONG:
                    scatter_symbol = "t1"   # Up arrow
                else:
                    scatter_symbol = "t"    # Down arrow

                if order.offset == Offset.OPEN:
                    scatter_brush = pg.mkBrush((0, 128, 128))   # Yellow
                else:
                    scatter_brush = pg.mkBrush((128, 128, 0))     # Blue

                scatter["symbol"] = scatter_symbol
                scatter["brush"] = scatter_brush
                scatter_datas.append(scatter)

        self.setData(scatter_datas)

    def get_info_text(self, ix: int) -> str:
        """"""
        if ix in self.orders:
            text = "委托:"
            for orderid,order in self.orders[ix].items():
                # OrderData
                text += f"\n{order.price}{order.direction.value}{order.offset.value}{order.volume}手"
        else:
            text = "委托:-"


        return text

2. 修改vnpy\chart\widget.py

为ChartWidget类增加下面的函数:

    def get_item(self,item_name:str):   # hxxjava add
        """
        Get chart item by item's name.
        """
        return self._items.get(item_name,None)

3. K线图表——各绘图部件使用方法演示

保存文件:vnpy\usertools\kx_chart.py

from datetime import datetime
from typing import List, Tuple, Dict

import numpy as np
import pyqtgraph as pg
import talib
import copy

from vnpy.trader.ui import create_qapp, QtCore, QtGui, QtWidgets
from vnpy.trader.database import database_manager
from vnpy.trader.constant import Exchange, Interval
from vnpy.trader.object import BarData

from vnpy.chart import ChartWidget, VolumeItem, CandleItem
from vnpy.chart.item import ChartItem
from vnpy.chart.manager import BarManager
from vnpy.chart.base import NORMAL_FONT

from vnpy.trader.engine import MainEngine
from vnpy.event import Event, EventEngine

from vnpy.trader.event import (
    EVENT_TICK,
    EVENT_TRADE,
    EVENT_ORDER,
    EVENT_POSITION,
    EVENT_ACCOUNT,
    EVENT_LOG
)

from vnpy.app.cta_strategy.base import (   
    EVENT_CTA_TICK,     
    EVENT_CTA_BAR,      
    EVENT_CTA_ORDER,    
    EVENT_CTA_TRADE,     
    EVENT_CTA_HISTORY_BAR
)

from vnpy.trader.object import (
    Direction, 
    Exchange, 
    Interval, 
    Offset, 
    Status, 
    Product, 
    OptionType, 
    OrderType,
    OrderData,
    TradeData,
)

from vnpy.usertools.chart_items import (
    LineItem,
    RsiItem,
    SmaItem,
    MacdItem,
    TradeItem,
    OrderItem,
)


class NewChartWidget(ChartWidget):
    """ 
    基于ChartWidget的K线图表 
    """
    MIN_BAR_COUNT = 100

    signal_cta_history_bar:QtCore.pyqtSignal = QtCore.pyqtSignal(Event)
    signal_cta_tick: QtCore.pyqtSignal = QtCore.pyqtSignal(Event)
    signal_cta_bar:QtCore.pyqtSignal = QtCore.pyqtSignal(Event)

    def __init__(self, parent: QtWidgets.QWidget = None,event_engine: EventEngine = None,strategy_name:str=""):
        """ 初始化 """
        super().__init__(parent)
        self.strategy_name = strategy_name
        self.event_engine = event_engine

        # 创建K线主图及多个绘图部件
        self.add_plot("candle", hide_x_axis=True)
        self.add_item(CandleItem, "candle", "candle")
        self.add_item(LineItem, "line", "candle")
        self.add_item(SmaItem, "sma", "candle")
        self.add_item(OrderItem, "order", "candle")
        self.add_item(TradeItem, "trade", "candle")

        # 创建成交量附图及绘图部件
        self.add_plot("volume", maximum_height=150)
        self.add_item(VolumeItem, "volume", "volume")

        # 创建RSI附图及绘图部件
        self.add_plot("rsi", maximum_height=150)
        self.add_item(RsiItem, "rsi", "rsi")

        # 创建MACD附图及绘图部件
        self.add_plot("macd", maximum_height=150)
        self.add_item(MacdItem, "macd", "macd")

        # 创建最新价格线、光标
        self.add_last_price_line()
        self.add_cursor()
        self.setWindowTitle(f"K线图表——{symbol}.{exchange.value},{interval},{start}-{end}")

        # 委托单列表
        self.orders:List[str,OrderData] = {}
        # 成交单列表
        self.trades:List[str,TradeData] = {}

        # self.register_event()
        # self.event_engine.start()

    def register_event(self) -> None:
        """"""
        self.signal_cta_history_bar.connect(self.process_cta_history_bar)
        self.event_engine.register(EVENT_CTA_HISTORY_BAR, self.signal_cta_history_bar.emit)

        self.signal_cta_tick.connect(self.process_tick_event)
        self.event_engine.register(EVENT_CTA_TICK, self.signal_cta_tick.emit)

        self.signal_cta_bar.connect(self.process_cta_bar)
        self.event_engine.register(EVENT_CTA_BAR, self.signal_cta_bar.emit)

    def process_cta_history_bar(self, event:Event) -> None:
        """ 处理历史K线推送 """
        strategy_name,history_bars = event.data
        if strategy_name == self.strategy_name:
            self.update_history(history_bars)

            # print(f" {strategy_name} got an EVENT_CTA_HISTORY_BAR")

    def process_tick_event(self, event: Event) -> None:
        """ 处理tick数据推送 """
        strategy_name,tick = event.data
        if strategy_name == self.strategy_name:
            if self.last_price_line:
                self.last_price_line.setValue(tick.last_price)
            #print(f" {strategy_name} got an EVENT_CTA_TICK")

    def process_cta_bar(self, event:Event)-> None:
        """ 处理K线数据推送 """

        strategy_name,bar = event.data
        if strategy_name == self.strategy_name:
            self.update_bar(bar)
            # print(f"{strategy_name} got an EVENT_CTA_BAR")

    def add_last_price_line(self):
        """"""
        plot = list(self._plots.values())[0]
        color = (255, 255, 255)

        self.last_price_line = pg.InfiniteLine(
            angle=0,
            movable=False,
            label="{value:.1f}",
            pen=pg.mkPen(color, width=1),
            labelOpts={
                "color": color,
                "position": 1,
                "anchors": [(1, 1), (1, 1)]
            }
        )
        self.last_price_line.label.setFont(NORMAL_FONT)
        plot.addItem(self.last_price_line)

    def update_history(self, history: List[BarData]) -> None:
        """
        Update a list of bar data.
        """
        self._manager.update_history(history)

        for item in self._items.values():
            item.update_history(history)

        self._update_plot_limits()

        self.move_to_right()

        self.update_last_price_line(history[-1])

    def update_bar(self, bar: BarData) -> None:
        """
        Update single bar data.
        """
        self._manager.update_bar(bar)

        for item in self._items.values():
            item.update_bar(bar)

        self._update_plot_limits()

        if self._right_ix >= (self._manager.get_count() - self._bar_count / 2):
            self.move_to_right()

        self.update_last_price_line(bar)

    def update_last_price_line(self, bar: BarData) -> None:
        """"""
        if self.last_price_line:
            self.last_price_line.setValue(bar.close_price)

    def add_orders(self,orders:List[OrderData]) -> None:
        """ 
        增加委托单列表到委托单绘图部件 
        """
        for order in orders:
            self.orders[order.orderid] = order

        order_item : OrderItem = self.get_item('order')
        if order_item:
            order_item.add_orders(self.orders.values())

    def add_trades(self,trades:List[TradeData]) -> None:
        """ 
        增加成交单列表到委托单绘图部件 
        """
        for trade in trades:
            self.trades[trade.tradeid] = trade

        trade_item : TradeItem = self.get_item('trade')
        if trade_item:
            trade_item.add_trades(self.trades.values())



################################################################
# 以下为测试代码
if __name__ == "__main__":
    def make_trades():
        import pytz
        CHINA_TZ = pytz.timezone("Asia/Shanghai")
        from vnpy.trader.object import Direction, Exchange, Interval, Offset, Status, Product, OptionType, OrderType,TradeData
        trades = [
            TradeData(gateway_name='CTP', symbol='ag2012', exchange=Exchange.SHFE, orderid='3_753490688_1', tradeid='         455', direction=Direction.LONG, offset=Offset.OPEN, price=6131.0, volume=3, datetime=CHINA_TZ.localize(datetime(2020, 8, 13, 21, 0, 1))),
            TradeData(gateway_name='CTP', symbol='ag2012', exchange=Exchange.SHFE, orderid='3_753490688_2', tradeid='       12738', direction=Direction.LONG, offset=Offset.OPEN, price=6142.0, volume=3, datetime=CHINA_TZ.localize(datetime(2020, 8, 13, 21, 14, 46))),
            TradeData(gateway_name='CTP', symbol='ag2012', exchange=Exchange.SHFE, orderid='3_753490688_3', tradeid='       16233', direction=Direction.LONG, offset=Offset.OPEN, price=6158.0, volume=3, datetime=CHINA_TZ.localize(datetime(2020, 8, 13, 21, 21, 59))),
            TradeData(gateway_name='CTP', symbol='ag2012', exchange=Exchange.SHFE, orderid='3_753490688_4', tradeid='       22815', direction=Direction.LONG, offset=Offset.OPEN, price=6180.0, volume=3, datetime=CHINA_TZ.localize(datetime(2020, 8, 13, 21, 39, 53))),
            TradeData(gateway_name='CTP', symbol='ag2012', exchange=Exchange.SHFE, orderid='3_1962356227_1', tradeid='       67570', direction=Direction.SHORT, offset=Offset.CLOSEYESTERDAY, price=6400.0, volume=12, datetime=CHINA_TZ.localize(datetime(2020, 8, 14, 1, 44,35))),
        ]
        return trades

    def make_orders():
        import pytz
        CHINA_TZ = pytz.timezone("Asia/Shanghai")

        orders = [
            OrderData(gateway_name='CTP', symbol='ag2012', exchange=Exchange.SHFE, orderid='3_753490688_1', type=OrderType.LIMIT, direction=Direction.LONG, offset=Offset.OPEN, price=6494.0, volume=3.0, traded=0, status=Status.SUBMITTING, datetime=None, reference='TTS-ag2012'),
            OrderData(gateway_name='CTP', symbol='ag2012', exchange=Exchange.SHFE, orderid='3_753490688_1', type=OrderType.LIMIT, direction=Direction.LONG, offset=Offset.OPEN, price=6494.0, volume=3, traded=0, status=Status.SUBMITTING, datetime=CHINA_TZ.localize(datetime(2020, 8, 13, 21, 0, 1)), reference=''),
            OrderData(gateway_name='CTP', symbol='ag2012', exchange=Exchange.SHFE, orderid='3_753490688_1', type=OrderType.LIMIT, direction=Direction.LONG, offset=Offset.OPEN, price=6494.0, volume=3, traded=0, status=Status.SUBMITTING, datetime=CHINA_TZ.localize(datetime(2020, 8, 13, 21, 0, 1)), reference=''),
            OrderData(gateway_name='CTP', symbol='ag2012', exchange=Exchange.SHFE, orderid='3_753490688_1', type=OrderType.LIMIT, direction=Direction.LONG, offset=Offset.OPEN, price=6494.0, volume=3, traded=3, status=Status.ALLTRADED, datetime=CHINA_TZ.localize(datetime(2020, 8, 13, 21, 0, 1)), reference=''),

            OrderData(gateway_name='CTP', symbol='ag2012', exchange=Exchange.SHFE, orderid='3_753490688_2', type=OrderType.LIMIT, direction=Direction.LONG, offset=Offset.OPEN, price=6494.0, volume=3.0, traded=0, status=Status.SUBMITTING, datetime=None, reference='TTS-ag2012'),
            OrderData(gateway_name='CTP', symbol='ag2012', exchange=Exchange.SHFE, orderid='3_753490688_2', type=OrderType.LIMIT, direction=Direction.LONG, offset=Offset.OPEN, price=6494.0, volume=3, traded=0, status=Status.SUBMITTING, datetime=CHINA_TZ.localize(datetime(2020, 8, 13, 21, 14, 46)), reference=''),
            OrderData(gateway_name='CTP', symbol='ag2012', exchange=Exchange.SHFE, orderid='3_753490688_2', type=OrderType.LIMIT, direction=Direction.LONG, offset=Offset.OPEN, price=6494.0, volume=3, traded=0, status=Status.SUBMITTING, datetime=CHINA_TZ.localize(datetime(2020, 8, 13, 21, 14, 46)), reference=''),
            OrderData(gateway_name='CTP', symbol='ag2012', exchange=Exchange.SHFE, orderid='3_753490688_2', type=OrderType.LIMIT, direction=Direction.LONG, offset=Offset.OPEN, price=6494.0, volume=3, traded=3, status=Status.ALLTRADED, datetime=CHINA_TZ.localize(datetime(2020, 8, 13, 21, 14, 46)), reference=''),

            OrderData(gateway_name='CTP', symbol='ag2012', exchange=Exchange.SHFE, orderid='3_753490688_3', type=OrderType.LIMIT, direction=Direction.LONG, offset=Offset.OPEN, price=6494.0, volume=3.0, traded=0, status=Status.SUBMITTING, datetime=None, reference='TTS-ag2012'),
            OrderData(gateway_name='CTP', symbol='ag2012', exchange=Exchange.SHFE, orderid='3_753490688_3', type=OrderType.LIMIT, direction=Direction.LONG, offset=Offset.OPEN, price=6494.0, volume=3, traded=0, status=Status.SUBMITTING, datetime=CHINA_TZ.localize(datetime(2020, 8, 13, 21, 21, 59)), reference=''),
            OrderData(gateway_name='CTP', symbol='ag2012', exchange=Exchange.SHFE, orderid='3_753490688_3', type=OrderType.LIMIT, direction=Direction.LONG, offset=Offset.OPEN, price=6494.0, volume=3, traded=0, status=Status.SUBMITTING, datetime=CHINA_TZ.localize(datetime(2020, 8, 13, 21, 21, 59)), reference=''),
            OrderData(gateway_name='CTP', symbol='ag2012', exchange=Exchange.SHFE, orderid='3_753490688_3', type=OrderType.LIMIT, direction=Direction.LONG, offset=Offset.OPEN, price=6494.0, volume=3, traded=3, status=Status.ALLTRADED, datetime=CHINA_TZ.localize(datetime(2020, 8, 13, 21, 21, 59)), reference=''),

            OrderData(gateway_name='CTP', symbol='ag2012', exchange=Exchange.SHFE, orderid='3_753490688_4', type=OrderType.LIMIT, direction=Direction.LONG, offset=Offset.OPEN, price=6494.0, volume=3.0, traded=0, status=Status.SUBMITTING, datetime=None, reference='TTS-ag2012'),
            OrderData(gateway_name='CTP', symbol='ag2012', exchange=Exchange.SHFE, orderid='3_753490688_4', type=OrderType.LIMIT, direction=Direction.LONG, offset=Offset.OPEN, price=6494.0, volume=3, traded=0, status=Status.SUBMITTING, datetime=CHINA_TZ.localize(datetime(2020, 8, 13, 21, 39, 53)), reference=''),
            OrderData(gateway_name='CTP', symbol='ag2012', exchange=Exchange.SHFE, orderid='3_753490688_4', type=OrderType.LIMIT, direction=Direction.LONG, offset=Offset.OPEN, price=6494.0, volume=3, traded=0, status=Status.SUBMITTING, datetime=CHINA_TZ.localize(datetime(2020, 8, 13, 21, 39, 53)), reference=''),
            OrderData(gateway_name='CTP', symbol='ag2012', exchange=Exchange.SHFE, orderid='3_753490688_4', type=OrderType.LIMIT, direction=Direction.LONG, offset=Offset.OPEN, price=6494.0, volume=3, traded=3, status=Status.ALLTRADED, datetime=CHINA_TZ.localize(datetime(2020, 8, 13, 21, 39, 53)), reference=''),

            OrderData(gateway_name='CTP', symbol='ag2012', exchange=Exchange.SHFE, orderid='3_1962356227_1', type=OrderType.LIMIT, direction=Direction.SHORT, offset=Offset.CLOSEYESTERDAY, price=5870.0, volume=12.0, traded=0, status=Status.SUBMITTING, datetime=None, reference='TTS-ag2012'),
            OrderData(gateway_name='CTP', symbol='ag2012', exchange=Exchange.SHFE, orderid='3_1962356227_1', type=OrderType.LIMIT, direction=Direction.SHORT, offset=Offset.CLOSEYESTERDAY, price=5870.0, volume=12, traded=0, status=Status.SUBMITTING, datetime=CHINA_TZ.localize(datetime(2020, 8, 14, 1, 44,20)), reference=''),
            OrderData(gateway_name='CTP', symbol='ag2012', exchange=Exchange.SHFE, orderid='3_1962356227_1', type=OrderType.LIMIT, direction=Direction.SHORT, offset=Offset.CLOSEYESTERDAY, price=5870.0, volume=12, traded=0, status=Status.SUBMITTING, datetime=CHINA_TZ.localize(datetime(2020, 8, 14, 1, 44,25)), reference=''),
            OrderData(gateway_name='CTP', symbol='ag2012', exchange=Exchange.SHFE, orderid='3_1962356227_1', type=OrderType.LIMIT, direction=Direction.SHORT, offset=Offset.CLOSEYESTERDAY, price=5870.0, volume=12, traded=12, status=Status.ALLTRADED, datetime=CHINA_TZ.localize(datetime(2020, 8, 14, 1, 44,35)), reference=''),
        ]

        return orders


    # 开始测试代码
    app = create_qapp()

    symbol = "ag2012"
    exchange = Exchange.SHFE
    interval=Interval.MINUTE
    start=datetime(2020, 8, 13)
    end=datetime(2020, 8, 15)    

    dynamic = False  # 是否动态演示
    n = 1000          # 缓冲K线根数

    bars = database_manager.load_bar_data(
        symbol=symbol,
        exchange=exchange,
        interval=interval,
        start=start,
        end=end
    )

    print(f"一共读取{len(bars)}根K线")

    event_engine = EventEngine()

    widget = NewChartWidget(event_engine = event_engine)

    if dynamic:
        history = bars[:n]      # 先取得最早的n根bar作为历史
        new_data = bars[n:]     # 其它留着演示
    else:
        history = bars          # 先取得最新的n根bar作为历史
        new_data = []           # 演示的为空

    # 绘制历史K线主图及各个副图
    widget.update_history(history)

    # 绘制委托单到主图
    orders = make_orders()
    widget.add_orders(orders)

    # 绘制成交单到主图
    trades = make_trades()
    widget.add_trades(trades)

    def update_bar():
        if new_data:
            bar = new_data.pop(0)
            widget.update_bar(bar)

    timer = QtCore.QTimer()
    timer.timeout.connect(update_bar)
    if dynamic:
        timer.start(100)

    widget.show()

    event_engine.start()
    app.exec_()

4. 测试效果

kx_chart.py中自动测试代码,直接用VSCode打开就可以运行。

4.1 测试准备

在vnpy中使用数据管理模块,从米筐下载ag2012.SHFE的1分钟历史数据,必须包含8月13日~8月15日。

4.2 测试结果如下

description



整理的一些我自己学习vnpy疑问和解答,欢迎指正。

问题1:
同样的bar为什么在一个函数里是5分钟K线,又在另一个函数里是10分钟K线呢?

def on_5min_bar(self, bar: BarData):
am = self.am
am.update_bar(bar) 为什么这里的bar是5分钟呢?

def on_10min_bar(self, bar: BarData):
am = self.am
am.update_bar(bar) 为什么这里的bar是10分钟呢?

以下AI的回答:
这个问题涉及到金融市场的数据处理,尤其是在使用不同时间框架的K线数据进行交易策略开发时常见。在你提供的代码片段中,on_5min_bar和on_10min_bar函数分别处理不同时间周期的K线数据。原因在于
虽然他们的内部逻辑(如更新数据、进行计算等)可能相似或相同,但是这两个函数被设计用来响应特定的时间间隔的数据更新(一个是5分钟,一个是10分钟),所以会出你所说的情况。

如何区分5分钟和10分钟K线
触发机制: 这些函数通常由不同的触发事件调用。例如,在一个交易系统中,可能有一个计时器或监听器,

当5分钟K线数据更新时,触发on_5min_bar函数,此时bar就是5分钟K线;
当10分钟K线数据更新时,触发on_10min_bar函数,此时bar就是10分钟K线。

数据源: 每个函数接收的bar参数代表一个特定时间段的K线数据。这个数据通常是由市场数据提供商按照不同的时间间隔生成并发送的,例如每5分钟或每10分钟更新一次。

代码结构
函数定义: 你的函数on_5min_bar和on_10min_bar通过名称就已经隐含了它们处理的是不同时间框架的数据。
数据处理: 函数内部,调用am.update_bar(bar)实际上是将传入的bar数据(无论是5分钟还是10分钟的数据)更新到某个数据管理系统(如am)中。这里am可能是一个累积/聚合数据的类实例,用于进一步的数据分析或交易信号生成。

实际应用
在实际应用中,确保正确的数据被送到正确的处理函数是很重要的,通常需要在数据传入系统之前就对其进行正确的分类和调度。这样的设计允许策略开发者针对不同的数据粒度制定不同的交易逻辑和管理策略。

总结来说,每个函数处理的K线时间长度由触发这些函数的事件决定,这些事件依据K线数据的生成频率来区分。

另一回答:
在交易策略或回测系统中,不同的K线周期(如1分钟、5分钟、10分钟等)通常是通过不同的回调函数或处理函数来处理的。在您提供的代码片段中,on_5min_bar 和 on_10min_bar 函数分别处理5分钟和10分钟的K线数据。

这里的 bar 之所以在 on_5min_bar 函数中是5分钟K线,在 on_10min_bar 函数中是10分钟K线,是因为这些函数是分别设计用来处理相应周期的K线数据的。通常,这样的设计是通过某种机制(可能是内部的事件驱动或回调函数注册)来实现的,该机制在特定周期的K线数据准备好时调用相应的处理函数。

例如,在vn.py这样的量化交易框架中,您可能会设置一个BarGenerator来监听实时数据流或历史数据,并根据设置的周期(如5分钟或10分钟)来生成不同周期的K线数据。当5分钟K线完成时,on_5min_bar 函数会被调用,并传入这个5分钟的K线数据。同理,当10分钟K线完成时,on_10min_bar 函数会被调用,并传入这个10分钟的K线数据。

简单来说,不同的回调函数处理不同周期的K线数据,是因为它们在设计时就被指定用来处理特定周期的K线。在您的策略代码中,您应该已经定义了这些回调函数,并且将它们与相应的K线周期进行了关联。

请注意,这里的 bar 变量只是一个参数,它代表当前处理的K线数据。在不同的函数调用中,传入的 bar 参数值会根据上下文(即当前的K线周期)而有所不同。因此,尽管变量名都是 bar,但它们代表的数据内容和周期是不同的。



利用保存的成交订单,生成实盘交易分析报表

利用之前文章写的保存成交order,借鉴vnpy已有的回测分析报表,生成一个实盘交易分析报表。

截图如下。

这里有个统计按钮,点击输出统计界面

description
因为我的代码改动不少,直接拿来用估计不合适,只做参考。

首先要自行修改database的代码,新增按照strategy名称,和时间段读取保存到数据的order,按照时许正向返回。

读取出来是一个order的队列,下面代码可以放在cta_engine.py, 是把开仓价和开仓时间作为属性附加到平仓单,输出平仓单队列。对于没有成对的,或者跨期的品种配对要抛弃;这里在做移仓操作时候,要针对策略,生成虚拟的平仓和开新仓的订单,具体实现代码后面再移仓代码中说。

def convert_strategy_triggered_order(self,strategy_name,open_date = datetime(2001,10,10), close_date = datetime(2100,10,10)):
    result = database_manager.load_triggered_stop_order_data(strategy_name,open_date,close_date)
    if result:
        matchedOrderList = []
        last_order = None
        for order in result:
            if order.offset == Offset.OPEN:
                last_order = copy(order)
            elif last_order != None and order.offset == Offset.CLOSE:
                if last_order.vt_symbol != order.vt_symbol:
                    last_order = None
                    continue
                order.open_datetime= last_order.datetime
                order.open_price = last_order.average_price
                matchedOrderList.append(copy(order))
                last_order = None
        return matchedOrderList

然后下面代码是处理的平仓单队列进行分析,计算出各种指标,输出为DataFrame格式,这里包括收益金额,收益率,单位收益率,回撤,账面价值等。

def get_strategy_triggered_order(self,strategy_name):
    # result = database_manager.load_close_triggered_stop_order_data(strategy_name)
    result = self.convert_strategy_triggered_order(strategy_name)
    objectDF = DataFrame(data=None,columns=["开仓日期","平仓日期", "开仓方向", "开仓价", "手数", "平仓价", "收益"],dtype=object)
    if result:
        parameters = self.get_strategy_parameters(strategy_name)
        # 策略中定义的合约价值
        HeYueJiaZhi = parameters["HeYueJiaZhi"] 
        # 策略中定义的品种每笔数量
        HeYueChengShu = parameters["HeYueChengShu"]
        for close_data in result:
            if close_data.direction == Direction.LONG:
                close_data.direction = Direction.SHORT
            else:
                close_data.direction = Direction.LONG
            objectDF.loc[len(objectDF) + 1] = [close_data.vt_orderids[0].replace(tzinfo=None),close_data.datetime.replace(tzinfo=None), close_data.direction, close_data.open_price,close_data.volume, close_data.average_price,0.0]
        objectDF["收益"] = objectDF.apply(lambda x: x['开仓价'] - x['平仓价'] if x['开仓方向'] == Direction.SHORT else x['平仓价'] - x['开仓价'],
                                    axis=1)
        objectDF["UnitReturn"] = objectDF["收益"] * 100 / objectDF['开仓价']
        objectDF["收益"] = objectDF["收益"]*HeYueChengShu*objectDF["手数"]
        objectDF["balance"] = objectDF["收益"].cumsum() + HeYueJiaZhi
        objectDF["return"] = objectDF["收益"]*100/HeYueJiaZhi
        objectDF.loc[0] = copy(objectDF.iloc[0])
        objectDF = objectDF.sort_index()
        objectDF.loc[0,"balance"] = HeYueJiaZhi
        objectDF["highlevel"] = (
            objectDF["balance"].rolling(
                min_periods=1, window=len(objectDF), center=False).max()
        )
        objectDF.drop(index=0, inplace=True)
        objectDF["drawdown"] = objectDF["balance"] - objectDF["highlevel"]
        objectDF["ddpercent"] = objectDF["drawdown"] / objectDF["highlevel"] * 100
    return objectDF

下面代码是分析dataframe,计算指标数据

    def calculate_statistics(self, objectDF):
        """
        """
        data = {}
        # end_balance = df["balance"].iloc[-1]
        # max_drawdown = df["drawdown"].min()
        # max_ddpercent = df["ddpercent"].min()
        HeYueJiaZhi = self._data["parameters"]["HeYueJiaZhi"]
        data["capital"]  = HeYueJiaZhi
        data["total_net_pnl"] = objectDF["收益"].sum()
        data["end_balance"] = objectDF["balance"].iloc[-1]
        data["total_return"] = data["total_net_pnl"]*100/max(HeYueJiaZhi,1)
        data["max_drawdown"] = objectDF["drawdown"].min()
        data["max_ddpercent"] = objectDF["ddpercent"].min()
        data["total_trade_count"] = len(objectDF)
        data["winningResult"] = len(objectDF[objectDF["收益"] >0])
        data["losingResult"] = len(objectDF[objectDF["收益"] <0])
        data["winningRate"] = data["winningResult"] *100/ data["total_trade_count"]
        data["totalWinning"] = objectDF[objectDF["收益"] >0]["收益"].sum()
        data["totalLosing"] = objectDF[objectDF["收益"] <0]["收益"].sum()
        data["averageWinning"] = data["totalWinning"]/max(1,data["winningResult"])
        data["averageLosing"] = data["totalLosing"]/max(1,data["losingResult"])
        data["perprofitLoss"] = data["total_net_pnl"] / data["total_trade_count"]
        data["profitLossRatio"] = data["averageWinning"] / max(1,abs(data["averageLosing"]))
        return data

然后在cta_widget.py 中,加入显示代码,这里直接使用回撤模块的BacktesterChart


    def analyze_strategy(self):
        objectDF = self.cta_engine.get_strategy_triggered_order(self.strategy_name)
        if not objectDF.empty:
            triggerd_statistics_monitor = Triggered_OrderStatisticsMonitor()
            triggerd_statistics_monitor.set_data(self.calculate_statistics(objectDF))
            triggerd_statistics_monitor.setMinimumHeight(400)
            triggerd_view = TriggeredMonitor(self.cta_manager.main_engine, self.cta_manager.main_engine.event_engine)
            triggerd_view.set_df(objectDF)
            triggerd_view.setMinimumHeight(400)
            triggerd_view.setMinimumWidth(600)
            objectDF["net_pnl"] = objectDF["收益"]
            objectDF= objectDF.set_index("平仓日期")
            chart = BacktesterChart()
            chart.set_data(objectDF)
            analyz_dialog = QDialog()
            analyz_dialog.setWindowTitle(self.strategy_name)
            analyz_dialog.setWindowModality(Qt.NonModal)
            analyz_dialog.setWindowFlags(Qt.Dialog | Qt.WindowMinMaxButtonsHint | Qt.WindowCloseButtonHint)
            gbox = QtWidgets.QGridLayout()
            analyz_dialog.resize(1200, 800)
            gbox.addWidget(triggerd_statistics_monitor,0,0)
            gbox.addWidget(triggerd_view,1,0)
            gbox.addWidget(chart,0,1,2,1)
            analyz_dialog.setLayout(gbox)
            analyz_dialog.exec_()
class PercentCell(BaseCell):
    def __init__(self, content: Any, data: Any):
        super(PercentCell, self).__init__(content, data)
    def set_content(self, content: Any, data: Any) -> None:
        self.setText(f"{content:,.2f}%")
        self._data = data
class fullDatetimeCell(BaseCell):
    def __init__(self, content: Any, data: Any):
        super(fullDatetimeCell, self).__init__(content, data)
    def set_content(self, content: Any, data: Any) -> None:
        if content is None:
            return
        timestamp = content.strftime("%Y%m%d %H:%M:%S")
        self.setText(timestamp)
        self._data = data
class TriggeredMonitor(BaseMonitor):
    event_type = ""
    data_key = ""
    sorting = False
    # ["平仓日期", "方向", "开仓价", "手数", "平仓价", "收益"]
    headers = {
        "开仓日期": {"display": "开仓日期", "cell": fullDatetimeCell, "update": False},
        "平仓日期": {"display": "平仓日期", "cell": fullDatetimeCell, "update": False},
        "开仓方向": {"display": "开仓方向", "cell": DirectionCell, "update": False},
        "开仓价": {"display": "开仓价", "cell": BaseCell, "update": False},
        "手数": {"display": "手数", "cell": BaseCell, "update": False},
        "平仓价": {"display": "平仓价", "cell": BaseCell, "update": False},
        "收益": {"display": "收益", "cell": PnlCell, "update": False},
        "return": {"display": "收益率", "cell": PercentCell, "update": False},
        "UnitReturn": {"display": "单位收益率", "cell": PercentCell, "update": False},
        "balance": {"display": "当前资金", "cell": BaseCell, "update": False},
        "drawdown": {"display": "回撤", "cell": BaseCell, "update": False}
    }
    def set_df(self,objectDF):
        objectDF_list = objectDF.to_dict(orient='records')
        for record_item in objectDF_list:
            self.insert_data(record_item)
    def insert_data(self, data):
        self.insertRow(0)
        for column, header in enumerate(self.headers.keys()):
            setting = self.headers[header]
            content = data[header]
            cell = setting["cell"](content, data)
            self.setItem(0, column, cell)
    def __del__(self) -> None:
        pass
class Triggered_OrderStatisticsMonitor(QtWidgets.QTableWidget):

    KEY_NAME_MAP = {
        "capital": "策略定义资金",
        "end_balance": "历史结算资金",
        "total_net_pnl": "总盈亏",
        "total_return": "总收益率",
        "total_trade_count": "总成交笔数",
        "winningResult": "盈利次数",
        "losingResult" : "亏损次数",
        "winningRate": "笔数胜率",
        "max_drawdown": "最大回撤",
        "max_ddpercent": "最大回撤比率",
        "totalWinning": "总盈利金额",
        "totalLosing": "总亏损金额",
        "perprofitLoss": "平均单笔损益",
        "averageWinning": "盈利平均每笔",
        "averageLosing" : "亏损平均每笔",
        "profitLossRatio" : "盈亏比",
    }
    def __init__(self):
        super().__init__()
        self.cells = {}
        self.init_ui()
    def init_ui(self):
        self.setRowCount(len(self.KEY_NAME_MAP))
        self.setVerticalHeaderLabels(list(self.KEY_NAME_MAP.values()))
        self.setColumnCount(1)
        self.horizontalHeader().setVisible(False)
        self.horizontalHeader().setSectionResizeMode(
            QtWidgets.QHeaderView.Stretch
        )
        self.setEditTriggers(self.NoEditTriggers)
        for row, key in enumerate(self.KEY_NAME_MAP.keys()):
            cell = QtWidgets.QTableWidgetItem()
            self.setItem(row, 0, cell)
            self.cells[key] = cell
    def clear_data(self):
        for cell in self.cells.values():
            cell.setText("")
    def set_data(self, data: dict):
        data["capital"] = f"{data['capital']:,.2f}"
        data["end_balance"] = f"{data['end_balance']:,.2f}"
        data["total_net_pnl"] = f"{data['total_net_pnl']:,.2f}"
        data["total_return"] = f"{data['total_return']:,.2f}%"
        data["total_trade_count"] = f"{data['total_trade_count']}"
        data["winningResult"] = f"{data['winningResult']}"
        data["losingResult"] = f"{data['losingResult']}"
        data["winningRate"] = f"{data['winningRate']:,.2f}%"
        data["max_drawdown"] = f"{data['max_drawdown']:,.2f}"
        data["max_ddpercent"] = f"{data['max_ddpercent']:,.2f}%"
        data["totalWinning"] = f"{data['totalWinning']:,.2f}"
        data["totalLosing"] = f"{data['totalLosing']:,.2f}"
        data["averageWinning"] = f"{data['averageWinning']:,.2f}"
        data["averageLosing"] = f"{data['averageLosing']:,.2f}"
        data["perprofitLoss"] = f"{data['perprofitLoss']:,.2f}"
        data["profitLossRatio"] = f"{data['profitLossRatio']:,.2f}"
        for key, cell in self.cells.items():
            value = data.get(key, "")
            cell.setText(str(value))


tick与update_tick()方法、bar与update_bar() 简单比较——学习小结

先说结论:
回调函数on_bar()是在tick数据是新的一分钟tick时调用;
回调函数on_window_bar()、on_window_bar()和on_window_bar()都是对应K线最后合成时调用

description



on_bar()在update_tick()所处的位置,到现在才明白on_bar()的应用

description

在此代码片段中,当检测到一个新的时间段(例如,从一分钟到下一分钟)时,会调用on_bar(self.bar)方法。这通常意味着上一个时间段的数据处理已经完成,可以进行进一步的处理,比如
更新数据库、
发送到前端显示、
买、卖或平仓等动作,具体视on_bar()的函数内容。

bar与on_bar()相对应
同理window_bar也是与on_window_bar相对应

感觉自己真笨,到现在才明白on_bar()的应用。


新消息

统计

主题
9235
帖子
34959
已注册用户
46826
最新用户
在线用户
151
在线来宾用户
9247
© 2015-2022 上海韦纳软件科技有限公司
备案服务号:沪ICP备18006526号

沪公网安备 31011502017034号

【用户协议】
【隐私政策】
【免责条款】