vn.py量化社区
By Traders, For Traders.

置顶主题

全市场录制行情数据

全市场行情录制,1核512内存应该也足够了,建议2核1G以上服务器

description

直接上代码

import sys
import multiprocessing
import re
from contextlib import closing
from copy import copy
from copy import deepcopy
from vnpy.trader.constant import Exchange, Interval
from vnpy.trader.object import BarData, HistoryRequest, Product, TickData
from vnpy.trader.database import init
from vnpy.trader.setting import get_settings
from enum import Enum
from time import sleep
from datetime import datetime, time, timedelta
from logging import INFO

from vnpy.event import EventEngine
from vnpy.trader.setting import SETTINGS
from vnpy.trader.engine import MainEngine
from vnpy.trader.utility import load_json, extract_vt_symbol

from vnpy.gateway.ctp import CtpGateway
from vnpy.app.cta_strategy import CtaStrategyApp
from vnpy.app.cta_strategy.base import EVENT_CTA_LOG
from vnpy.trader.event import EVENT_CONTRACT, EVENT_TICK

from vnpy.app.data_recorder.engine import RecorderEngine

EXCHANGE_LIST = [
    Exchange.SHFE,
    Exchange.DCE,
    Exchange.CZCE,
    Exchange.CFFEX,
    Exchange.INE,
]

SETTINGS["log.active"] = True
SETTINGS["log.level"] = INFO
SETTINGS["log.console"] = True
CTP_SETTING = load_json("connect_ctp.json")


def is_futures(vt_symbol: str) -> bool:
    """
    是否是期货
    """
    return bool(re.match(r"^[a-zA-Z]{1,3}\d{2,4}.[A-Z]+$", vt_symbol))


class RecordMode(Enum):
    BAR = "bar"
    TICK = "tick"


class WholeMarketRecorder(RecorderEngine):
    def __init__(self, main_engine, event_engine, record_modes=[RecordMode.BAR]):
        super().__init__(main_engine, event_engine)
        self.record_modes = record_modes
        # 非交易时间
        self.drop_start = time(3, 15)
        self.drop_end = time(8, 45)

        # 大连、上海、郑州交易所,小节休息
        self.rest_start = time(10, 15)
        self.rest_end = time(10, 30)

    def is_trading(self, vt_symbol, current_time) -> bool:
        """
        交易时间,过滤校验Tick
        """
        symbol, exchange = extract_vt_symbol(vt_symbol)

        if current_time >= self.drop_start and current_time < self.drop_end:
            return False
        if exchange in [Exchange.DCE, Exchange.SHFE, Exchange.CZCE]:
            if current_time >= self.rest_start and current_time < self.rest_end:
                return False
        return True

    def load_setting(self):
        pass

    def record_tick(self, tick: TickData):
        """
        抛弃非交易时间校验数据
        """
        tick_time = tick.datetime.time()
        if not is_trading(tick.vt_symbol, tick_time):
            return
        task = ("tick", copy(tick))
        self.queue.put(task)

    def record_bar(self, bar: BarData):
        """
        抛弃非交易时间校验数据
        """
        bar_time = bar.datetime.time()
        if not is_trading(bar.vt_symbol, bar_time):
            return
        task = ("bar", copy(bar))
        self.queue.put(task)

    def process_contract_event(self, event):
        """"""
        contract = event.data
        vt_symbol = contract.vt_symbol
        # 不录制期权
        if is_futures(vt_symbol):
            if RecordMode.BAR in self.record_modes:
                self.add_bar_recording(vt_symbol)
            if RecordMode.TICK in self.record_modes:
                self.add_tick_recording(vt_symbol)
            self.subscribe(contract)


def run_child():
    """
    Running in the child process.
    """
    SETTINGS["log.file"] = True

    event_engine = EventEngine()
    main_engine = MainEngine(event_engine)
    main_engine.add_gateway(CtpGateway)
    main_engine.write_log("主引擎创建成功")

    # 记录引擎

    log_engine = main_engine.get_engine("log")
    event_engine.register(EVENT_CTA_LOG, log_engine.process_log_event)
    main_engine.write_log("注册日志事件监听")

    main_engine.connect(CTP_SETTING, "CTP")
    main_engine.write_log("连接CTP接口")

    whole_market_recorder = WholeMarketRecorder(main_engine, event_engine)

    main_engine.write_log("开始录制数据")
    oms_engine = main_engine.get_engine("oms")
    while True:
        sleep(1)


def run_parent():
    """
    Running in the parent process.
    """
    print("启动CTA策略守护父进程")

    # Chinese futures market trading period (day/night)
    MORNING_START = time(8, 45)
    MORNING_END = time(12, 0)

    AFTERNOON_START = time(12, 45)
    AFTERNOON_END = time(15, 35)

    NIGHT_START = time(20, 45)
    NIGHT_END = time(3, 5)

    child_process = None

    while True:
        current_time = datetime.now().time()
        trading = False

        # Check whether in trading period
        if (
            (current_time >= MORNING_START and current_time <= MORNING_END)
            or (current_time >= AFTERNOON_START and current_time <= AFTERNOON_END)
            or (current_time >= NIGHT_START)
            or (current_time <= NIGHT_END)
        ):
            trading = True

        # Start child process in trading period
        if trading and child_process is None:
            print("启动数据录制子进程")
            child_process = multiprocessing.Process(target=run_child)
            child_process.start()
            print("数据录制子进程启动成功")

        # 非记录时间则退出数据录制子进程
        if not trading and child_process is not None:
            print("关闭数据录制子进程")
            child_process.terminate()
            child_process.join()
            child_process = None
            print("数据录制子进程关闭成功")
        sys.stdout.flush()
        sleep(5)


if __name__ == "__main__":
    run_parent()


无界面模式运行CTA策略实现模拟(实盘)交易

对于新手,当然是通过VN Trader的界面操作是最直观和方便的,具体过程我就不做介绍了,入门系列教程讲的很详细。
如果想通过无界面启动模拟交易或者实盘交易,需要找到***\vnpy-2.1.1\examples\no_ui\文件夹下的run.py文件:

  1. 需要配置CTP、XTP或者其他接口的连接信息;
  2. 设置需要运行的CTA策略参数;
  3. run_child()子程序,在main_engine中增加相应接口;
  4. 创建CTA策略引擎;
  5. 创建自己的CTA策略;
  6. 初始化策略;
  7. 启动策略。

具体的示例如下:

import multiprocessing
from time import sleep
from datetime import datetime, time
from logging import INFO

from vnpy.event import EventEngine
from vnpy.trader.setting import SETTINGS
from vnpy.trader.engine import MainEngine

from vnpy.gateway.ctp import CtpGateway
from vnpy.gateway.xtp import XtpGateway
from vnpy.app.cta_strategy import CtaEngine
from vnpy.app.cta_strategy import CtaStrategyApp
from vnpy.app.cta_strategy.base import EVENT_CTA_LOG

SETTINGS["log.active"] = True
SETTINGS["log.level"] = INFO
SETTINGS["log.console"] = True

# CTP接口连接设置
ctp_setting = {
    "用户名": "161xxx",
    "密码": "****************",
    "经纪商代码": "9999",
    "交易服务器": "180.168.146.187:10101",
    "行情服务器": "180.168.146.187:10111",
    "产品名称": "simnow_client_test",
    "授权编码": "0000000000000000",
    "产品信息": ""
}

# XTP接口连接设置
xtp_setting = {
    "账号": "53191000xxx",
    "密码": "********",
    "客户号": "1",
    "行情地址": "120.27.164.138",
    "行情端口": "6002",
    "交易地址": "120.27.164.69",
    "交易端口": "6001",
    "行情协议": "TCP",
    "授权码": "*****************************************"
}

# CTA策略信息
class_name = "DemoMaStrategyCTP"
strategy_name = "DM_ru2009"
vt_symbol = "ru2009.SHFE"
"""
class_name = "DemoMaStrategy"
strategy_name = "DM_601990"
vt_symbol = "601990.SSE"
"""
strategy_setting = {
    "fast_window": 5,
    "slow_window": 10
}


def run_child():
    """
    Running in the child process.
    """
    SETTINGS["log.file"] = True

    event_engine = EventEngine()
    main_engine = MainEngine(event_engine)
    main_engine.add_gateway(XtpGateway)
    main_engine.add_gateway(CtpGateway)
    main_engine.add_app(CtaStrategyApp)
    main_engine.write_log("主引擎创建成功")

    log_engine = main_engine.get_engine("log")
    event_engine.register(EVENT_CTA_LOG, log_engine.process_log_event)
    main_engine.write_log("注册日志事件监听")

    main_engine.connect(xtp_setting, "XTP")
    main_engine.write_log("连接XTP接口")

    main_engine.connect(ctp_setting, "CTP")
    main_engine.write_log("连接CTP接口")

    sleep(10)

    # 创建CTA策略引擎
    cta_engine = CtaEngine(main_engine, event_engine)

    # 初始化CTA策略引擎, 会依次调用init_rqdata(), load_strategy_class()等函数
    cta_engine.init_engine()

    # 创建属于我们自己的策略,首次创建成功后会将参数写入到C:\Users\Administrator\.vntrader文件夹下的cta_strategy_setting.json文件内
    if strategy_name not in cta_engine.strategies:
        main_engine.write_log(f"创建{strategy_name}策略")
        cta_engine.add_strategy(class_name, strategy_name, vt_symbol, strategy_setting)
    else:
        cta_engine.update_strategy_setting(strategy_name, strategy_setting)

    # 初始化刚创建的策略
    cta_engine.init_strategy(strategy_name)

    # 留有足够的时间来进行策略初始化
    sleep(10)

    # 启动刚创建的策略
    cta_engine.start_strategy(strategy_name)

    # cta_engine.init_all_strategies()

    # sleep(60)
    # main_engine.write_log("CTA策略全部初始化")

    # cta_engine.start_all_strategies()
    # main_engine.write_log("CTA策略全部启动")

    print("正在交易中...")

    while True:
        sleep(1)

我已经成功运行SimNow的CTP接口,采用最简单的双均线策略进行了ru2009的模拟交易测试,下单的情况与策略逻辑一致。
目前vnpy的XTP接口只支持期货期权等具有'开'、'平'、'平今'、'平昨'的买卖操作,暂不支持股票的下单操作,服务器会拒单。原因是股票下单的"开平"应该选择'空',而不是'开'或者其他,询问过版主,XTP接口需要升级后才支持,但是在VN Trader界面操作中可以通过下拉菜单选择'空'进行委托下单。



也许你不会使用BarGenerator创建日K线、周K线、月K线 !!!

VNPY中的BarGenerator的构造函数是这样的:

class BarGenerator:
    ......
    def __init__(self,on_bar: Callable,window: int = 0,on_window_bar: Callable = None,interval: Interval = Interval.MINUTE ):
    ... ...

    def update_bar(self, bar: BarData) -> None:
        """
        Update 1 minute bar into generator
        """
        # If not inited, creaate window bar object
        if not self.window_bar:
            # Generate timestamp for bar data
            if self.interval == Interval.MINUTE:
                dt = bar.datetime.replace(second=0, microsecond=0)
            else:
                dt = bar.datetime.replace(minute=0, second=0, microsecond=0)

            self.window_bar = BarData(
                symbol=bar.symbol,
                exchange=bar.exchange,
                datetime=dt,
                gateway_name=bar.gateway_name,
                open_price=bar.open_price,
                high_price=bar.high_price,
                low_price=bar.low_price
            )
        # Otherwise, update high/low price into window bar
        else:
            self.window_bar.high_price = max(
                self.window_bar.high_price, bar.high_price)
            self.window_bar.low_price = min(
                self.window_bar.low_price, bar.low_price)

        # Update close price/volume into window bar
        self.window_bar.close_price = bar.close_price
        self.window_bar.volume += int(bar.volume)
        self.window_bar.open_interest = bar.open_interest

        # Check if window bar completed
        finished = False

        if self.interval == Interval.MINUTE:
            # x-minute bar
            if not (bar.datetime.minute + 1) % self.window:
                finished = True
        elif self.interval == Interval.HOUR:
            if self.last_bar and bar.datetime.hour != self.last_bar.datetime.hour:
                # 1-hour bar
                if self.window == 1:
                    finished = True
                # x-hour bar
                else:
                    self.interval_count += 1

                    if not self.interval_count % self.window:
                        finished = True
                        self.interval_count = 0

        if finished:
            self.on_window_bar(self.window_bar)
            self.window_bar = None

        # Cache last bar object
        self.last_bar = bar

而Interval类型是这样的:

class Interval(Enum):
    """
    Interval of bar data.
    """
    MINUTE = "1m"
    HOUR = "1h"
    DAILY = "d"
    WEEKLY = "w"

结论:

BarGenerator虽然传入的Interval类型,但是它只考虑的Interval.MINUTE和Interval.HOUR连个单位,而它合成N分钟和N小时的K线是没有问题的。
也就是说,你不可以是Interval.DAILY和Interval.WEEKLY做单位,因为它使用的米筐接口的1分钟历史数据,没有使用米筐的1h和1d数据。

因此你不可以这么创建月K线

1) self.bgm = BarGenerator(self.on_bar, 4, self.on_month_bar,interval=Interval.WEEKLY)
2) self.bgm = BarGenerator(self.on_bar,20, self.on_month_bar,interval=Interval.DAILY)
因为BarGenerator没有考虑 Interval.DAILY和Interval.WEEKLY时间间隔

创建日线以上周期K线必须考虑合约交易时间段:

交易时间段决定日K线小时数

使用Interval.MINUTE作为参数时,window不可以超过59,它表示合成不了成功1小时的K线,而Interval.HOUR作为参数时,是对1小时K线进行计数,然后把的self.interval_count % self.window作为条件来判断是否查询window小时K线是否结束的,它可以用来表达日线以上周期K线。
所以创建日线以上周期K线,你最大只可以使用Interval.HOUR为单位,而且它又是参考自然时间的生成机制:

举例:
rb2010的交易时间段 :21:00-23:30(4根小时K线)09:00-10:15 10:30-11:30(3根小时K线) 13:30-15:00(2根小时K线),因此需要9根1小时K线合成
ag2012的交易时间段 :21:00-02:30 (6根小时K线)09:00-10:15 10:30-11:30 (3根小时K线)13:30-15:00(2根小时K线),因此需要11根1小时K线合成
IF88 沪深主力连续 交易时间段:09:30-11:30(3根小时K线),13:00-15:00(2根小时K线),每日时长:5小时,因此需要6根1小时K线合成
T2009 10年期国债2009 交易时间段:09:30-11:30(3根小时K线),13:00-15:15(3根小时K线),因此需要6根1小时K线合成
TS2103 2年期国债2103 交易时间段:09:30-11:30(3根小时K线),13:00-15:15(3根小时K线),因此需要6根1小时K线合成

如果看不明白上面的叙述,就静下心来慢慢想一些吧,想不明白就看看BarGenerator的update_bar()函数代码就明白了。

下面仅以rb2010和ag2012合约为例来说明,其他周期的类似。

日K线产生器

rb2010的日K线产生器:
self.bgm = BarGenerator(self.on_bar, 9, self.on_day_bar,interval=Interval.HOUR)
ag2012的日K线产生器:
self.bgm = BarGenerator(self.on_bar, 11, self.on_day_bar,interval=Interval.HOUR)

5日K线合成周K线

rb2010的周K线产生器:
self.bgm = BarGenerator(self.on_bar, 45, self.on_week_bar,interval=Interval.HOUR)
ag2012的周K线产生器:
self.bgm = BarGenerator(self.on_bar, 55, self.on_week_bar,interval=Interval.HOUR)

4周K线合成月K线

rb2010的月K线产生器:
self.bgm = BarGenerator(self.on_bar, 180, self.on_month_bar,interval=Interval.HOUR)
ag2012的月K线产生器:
self.bgm = BarGenerator(self.on_bar, 220, self.on_month_bar,interval=Interval.HOUR)

注意:

这些合成的日K能够保证是日内对齐的,但周K和月K线并不能够保证是周内和月内对齐的,它取决你什么时候启动你的策略。

更好的创建方法:

更好的创建方法先对BarGenerator进行扩展,实现考虑交易时间段的日K、周K的生成机制,当然创建时需要传入交易时间段参数。这里就不在说了,以后可以专门讨论。



你的K线应该是确定的,可实际上却不是!

VNPY系统CTA策略中的BarGenerater的问题:

当K线周期为1,5,15分钟或者1日,1周时,它是没有问题的
当K线周期为10,20,30分钟或者一些自定义分钟周期如7分钟,还有1、2,4小时,由于合约的交易时段的不规则,导致某些K线的周期于其时间发生的交易时间不想等。
你开启CTA策略的时机是随机的,这导致self.laod_bar(20)的执行也是随机的,它从米筐或者数据库加载的1分钟数据也是随机,最终导致你所产生的上述K线也是随机的。那你已经这样的K线数据计算出来的指标在某种程度上可能也随机的。

问题出在合约的交易时间上

不赘述原因了,举例吧:
RB2010交易时间段:'21:01-23:00,09:01-10:15,10:31-11:30,13:31-15:00'
假如你的K线每天从21:01开始计算K线,
如果K线的周期为10分钟,那么在10:10-10:20的那个K线其实只交易了5分钟
如果K线的周期为20分钟,那么在10:00-10:20的那个K线其实只交易了15分钟,在10:20-10:40的那个K线其实只交易了10分钟
如果K线的周期为30分钟,那么在10:00-10:30的那个K线其实只交易了15分钟
如果K线的周期为60分钟,那么在10:00-11:00的那个K线其实只交易了45分钟
如果K线的周期为120分钟,那么在10:00-14:00的那个K线其实只交易了105分钟

K线应该是从初始化策略之时往前计算,还是从上市日期开始计算?

如果BarGenerator采用等交易时长产生K线,策略初始化时通过load_bar(n),读取1分钟历史K线,目前BarGenerator时是从n日之前的第一个1分钟K线区合约其他周期的K线的。
这导某些周期K线随n值不同,K线的起止时间会变化。而如果采用从上市日期开始计算等交易时长的K线位置,则无论何时初始化策略,K线的起止时间都是一样的。

你希望那种K线:

1 等自然时长K线——无需考虑交易时段
2 等交易时长K线——需要考虑交易时段
3 从上市日起算K线——起止位置固定
4 从策略初始化时起算K线——起止位置不固定

第6帖子已经实现目标——固定交易时长位置固定的K线图

description



VNPY模块间调用关系

画了一个周末,头晕了,简直就是个迷宫,放一放先...
description



全市场期货数据的批量下载和更新

不管是研究套利策略,还是多因子策略,都需要多品种的历史数据,所以下面介绍一下,如何调用vnpy的数据下载模块,来下载全市场的期货数据。

 

批量下载

 

1)设置合约品种

 

首先,我们要先生成一个字典,来指定需要下载的数据,关键字段有3个:

 

  • 交易所代号:上期所-> SHFE
  • 合约代号: 螺纹钢-> RB
  • 合约品种类型: 指数合约 -> 99

 

这样,在RQData中,我们要下载螺纹钢指数合约的历史数据,需要转成的代号为RB99.SHFE。
 

然后,由于是全市场行情的数据,所以字典的数据结构如下:key是交易所,value是列表,里面包含各种期货品种,这样,只要在遍历一下这个字典,就可以得到所有,如RB99.SHFE这样结构的字符串。

 

symbols = {
    "SHFE": ["CU", "AL", "ZN", "PB", "NI", "SN", "AU", "AG", "RB", "WR", "HC", "SS", "BU", "RU", "NR", "SP", "SC", "LU", "FU"],
    "DCE": ["C", "CS", "A", "B", "M", "Y", "P", "FB","BB", "JD", "RR", "L", "V", "PP", "J", "JM", "I", "EG", "EB", "PG"],
    "CZCE": ["SR", "CF", "CY", "PM","WH", "RI", "LR", "AP","JR","OI", "RS", "RM", "TA", "MA", "FG", "SF", "ZC", "SM", "UR", "SA", "CL"],
    "CFFEX": ["IH","IC","IF", "TF","T", "TS"]
}
​
symbol_type = "99"

 
2) 设置下载时间
 

我们只需要设置下载的开始和结束时间即可,需要注意的是,vnpy数据下载模块的入参是datetime.datetime格式,所以,要做到格式的一致,代码如下:
 

from datetime import datetime
start_date = datetime(2005,1,1)
end_date = datetime(2020,9,10)

 
3)批量下载全市场数据

 
批量下载数据,并不难,其运作步骤如下:

 

  1. 遍历symbols字典,
  2. 生成不同的HistoryRequest,
  3. 调用数据下载模块rqdata_client.query_history,得到数据data
  4. 调用数据保存模块database_manager.save_bar_data,把下载好的数据data写入数据库
     
    from vnpy.trader.rqdata import rqdata_client
    from vnpy.trader.database import database_manager
    from vnpy.trader.constant import Exchange, Interval
    from vnpy.trader.object import HistoryRequest
    ​
    def load_data(req):
     data = rqdata_client.query_history(req)
     database_manager.save_bar_data(data)
     print(f"{req.symbol}历史数据下载完成")
    ​
    for exchange, symbols_list in symbols.items():
     for s in symbols_list:
         req = HistoryRequest(
         symbol=s+symbol_type,
         exchange=Exchange(exchange),
         start=start_date,
         interval=Interval.DAILY,
         end=end_date,
         )
         load_data(req=req)
     

写好脚本后,我们运行一下代码,可以看到很快就下完全市场期货的日线数据啦。

description

 

若要下载小时或者分钟级别数据,只要把日线周期(Interval.DAILY)改成对应的小时,或者分钟即可。

 

定时批量更新数据

 

有了历史数据后,自然产生每天定时更新数据的需求

 

1)设置定时器

 
我们希望在收盘后,某个时间点如下午5点启动脚本,来自动下载数据。这本质上是包含了一个父进程和一个子进程。
 

父进程可以是一个永远在运行的python程序,如while循环,然后设置触发条件,如当时间刚好到下午5点就启动子进程下载更新数据,其他时间则睡觉等待。

 
代码如下:

from datetime import datetime, time
from time import sleep
​
current_time = datetime.now().time()
start_time = time(17,0)
​
while True:
  sleep(10)

  if current_time == start_time:
    download_data()

 
2)获取数据库数据

 
更新数据时候,我们要以数据库里面最新的数据的时间点,作为开始时间,而结束时间就是当天。比如,昨天刚好下载完所有市场的数据,那么今天我们只需要下载从昨天到今天的所有数据即可。
 

这样实现起来也不难,步骤如下:
 

1)调用database_manager.get_bar_data_statistics来得到字典格式的数据数据库所有信息

 

data = database_manager.get_bar_data_statistics()

 

2)获取各品种最新数据的时间信息,并且插入到data字典中

 

for d in data:
    newest_bar = database_manager.get_newest_bar_data(
        d["symbol"], Exchange(d["exchange"]), Interval(d["interval"])
    )
    d["end"] = newest_bar.datetime

 

然后我们看看data字典,发现真的包含所有行情的数据,但我们是基于RQData来定期更新信息的,所以要进行筛选,得到国内期货品种(通过交易所来判断)并且是日线级别的数据。
 
description
 

3)基于交易所和K线周期筛选品种,得到新的字典symbols,其中key包含合约代码,交易所,value就是数据库的结束时间,如下图:

 

symbols = {}
for i in data:
    if i["interval"] == "d" and  i["exchange"] in Exchanges:
        vt_symbol = f"{i['symbol']}.{i['exchange']}"
        end = i["end"].date()
        symbols[vt_symbol] = end

description
 
4)设置下载结束时间为当天,基于symbols字典的信息,遍历组合得到HistoryRequest,然后再调用上面定义好的load_data函数下载数据并写入数据库中。

 

end_date = datetime.now().date()
​
for vt_symbol, start_date in symbols.items():
    symbol = vt_symbol.split(".")[0]
    exchange = vt_symbol.split(".")[1]
    req = HistoryRequest(
    symbol=symbol,
    exchange=Exchange(exchange),
    start=start_date,
    interval=Interval.DAILY,
    end=end_date,
    )
    load_data(req=req)

 

下载好之后,我们再获取数据库里面最新的K线时间,发现成功更新到今天了。

 
description



分享一个螺纹钢30分钟均线策略

作者:爱茶语 ;来源:维恩的派论坛

 

  • 原文测试时间区间是20120111--20171117,样本内夏普比率达1.35。
  • 今进行样本外测试,时间区间20130111--20190102,夏普比率为0.78。

结果显示尽管参数不多,但是模型还是过拟合了。但是在策略内实现Tick数据聚合成X分钟K线还是值得学习一下
 

回测设置

# 设置回测使用的数据
engine.setBacktestingMode(engine.BAR_MODE)    # 设置引擎的回测模式为K线
engine.setDatabase(MINUTE_DB_NAME, 'RB99')  # RQDATA一分钟期货指数数据
engine.setStartDate('20130101')               # 设置回测用的数据起始日期

# 配置回测引擎参数
engine.setSlippage(1)      # 设置滑点为1跳
engine.setRate(1/1000)    # 设置手续费
engine.setSize(10)         # 设置合约大小 
engine.setPriceTick(1)     # 设置最小价格变动   
engine.setCapital(200000)   # 设置回测本金

 
 

回测效果

 
enter image description here

 
 
策略代码如下
 

# encoding: UTF-8


import talib
import numpy as np

from vnpy.trader.vtObject import VtBarData
from vnpy.trader.vtConstant import EMPTY_INT,EMPTY_STRING
from vnpy.trader.app.ctaStrategy.ctaTemplate import CtaTemplate

########################################################################
class RBMAStrategy(CtaTemplate):
    """结合MA的一个30分钟线交易策略"""
    className = 'RBMAStrategy'
    author = 'xldistance'

    #策略参数

    initDays = 33    # 初始化数据所用的天数默认35
    open_pos = 10   #每次交易的手数
    OCM = 30      #操作分钟周期(1,60)默认30
    # 策略变量
    bar = None                  # K线对象
    barMinute = EMPTY_STRING    # K线当前的分钟
    minutebar = None        # minuteK线对象
    ma_windows1 = 20    #默认20
    ma_windows2 = 200    #默认200
    # 参数列表,保存了参数的名称
    paramList = ['name',
                 'className',
                 'author',
                 'vtSymbol',
                 'open_pos']

    # 变量列表,保存了变量的名称
    varList = ['inited',
               'trading',
               'pos',
               'OCM',
               'ma20_value',
               'ma200_value']
    #----------------------------------------------------------------------
    def __init__(self, ctaEngine, setting):
        """Constructor"""
        super(RBMAStrategy, self).__init__(ctaEngine, setting)
        """
        如果是多合约实例的话,变量需要放在__init__里面
        """
        #self.orderList = []
        self.barList = []
        self.bufferSize = 201                    # 需要缓存的数据的大小
        self.bufferCount = 0                     # 目前已经缓存了的数据的计数
        self.highArray = np.zeros(self.bufferSize)    # K线最高价的数组
        self.lowArray = np.zeros(self.bufferSize)     # K线最低价的数组
        self.closeArray = np.zeros(self.bufferSize)   # K线收盘价的数组
        self.openArray = np.zeros(self.bufferSize)   # K线开盘价的数组
        self.LongEnterable = False
        self.ShortEnterable = False
        self.ma20_value = 0
        self.ma200_value = 0
    def onInit(self):
        self.writeCtaLog('%s策略初始化' %self.name)

        # 载入历史数据,并采用回放计算的方式初始化策略数值
        initData = self.loadBar(self.initDays)
        for bar in initData:
            self.onBar(bar)

        self.putEvent()

    #----------------------------------------------------------------------
    def onStart(self):
        """启动策略(必须由用户继承实现)"""
        self.writeCtaLog('%s策略启动' %self.name)
        self.putEvent()

    #----------------------------------------------------------------------
    def onStop(self):
        """停止策略(必须由用户继承实现)"""
        self.writeCtaLog('%s策略停止' %self.name)
        self.putEvent()

    #----------------------------------------------------------------------
    def onTick(self, tick):
        """收到行情TICK推送(必须由用户继承实现)"""


        tickMinute = tick.datetime.minute
        if tickMinute != self.barMinute:
            if self.bar:
                self.onBar(self.bar)

            bar = VtBarData()
            bar.vtSymbol = tick.vtSymbol
            bar.symbol = tick.symbol
            bar.exchange = tick.exchange

            bar.open = tick.lastPrice
            bar.high = tick.lastPrice
            bar.low = tick.lastPrice
            bar.close = tick.lastPrice

            bar.date = tick.date
            bar.time = tick.time
            bar.datetime = tick.datetime    # K线的时间设为第一个Tick的时间

            self.bar = bar                  # 这种写法为了减少一层访问,加快速度
            self.barMinute = tickMinute     # 更新当前的分钟
        else:                               # 否则继续累加新的K线
            bar = self.bar                  # 写法同样为了加快速度

            bar.high = max(bar.high, tick.lastPrice)
            bar.low = min(bar.low, tick.lastPrice)
            bar.close = tick.lastPrice

   #----------------------------------------------------------------------
    def onBar(self, bar):
        """收到Bar推送(必须由用户继承实现)"""

        if self.LongEnterable:
            if self.pos == 0:# and bar.close > self.dayOpen
                self.buy(bar.close,self.open_pos,True)
            elif self.pos < 0 :
                self.cover(bar.close,abs(self.pos),True)
        if self.ShortEnterable:
            if self.pos ==0:#and bar.close < self.dayOpen
                self.short(bar.close,self.open_pos,True)
            elif self.pos > 0:
                self.sell(bar.close,abs(self.pos),True)

        if bar.datetime.minute  % self.OCM == 0:
            # 如果已经有聚合minuteK线
            if self.minutebar:
                # 将最新分钟的数据更新到目前minute线中
                minutebar = self.minutebar
                minutebar.high = max(minutebar.high, bar.high)
                minutebar.low = min(minutebar.low, bar.low)
                minutebar.close = bar.close

                # 推送minute线数据
                self.onminutebar(minutebar)

                # 清空minute线数据缓存
                self.minutebar = None
        else:
            # 如果没有缓存则新建
            if not self.minutebar:
                minutebar = VtBarData()

                minutebar.vtSymbol = bar.vtSymbol
                minutebar.symbol = bar.symbol
                minutebar.exchange = bar.exchange

                minutebar.open = bar.open
                minutebar.high = bar.high
                minutebar.low = bar.low
                minutebar.close = bar.close

                minutebar.date = bar.date
                minutebar.time = bar.time
                minutebar.datetime = bar.datetime

                self.minutebar = minutebar
            else:
                minutebar = self.minutebar
                minutebar.high = max(minutebar.high, bar.high)
                minutebar.low = min(minutebar.low, bar.low)
                minutebar.close = bar.close
        # 发出状态更新事件
        self.putEvent()

    #----------------------------------------------------------------------
    def onminutebar(self,bar):
        """收到Bar推送(必须由用户继承实现)"""
        # 撤销之前发出的尚未成交的委托(包括限价单和停止单)
        #for orderID in self.orderList:
            #self.cancelOrder(orderID)
        #self.orderList = []

        # 保存K线数据
        self.closeArray[0:self.bufferSize-1] = self.closeArray[1:self.bufferSize]
        self.highArray[0:self.bufferSize-1] = self.highArray[1:self.bufferSize]
        self.lowArray[0:self.bufferSize-1] = self.lowArray[1:self.bufferSize]
        self.openArray[0:self.bufferSize-1] = self.openArray[1:self.bufferSize]
        self.closeArray[-1] = bar.close
        self.highArray[-1] = bar.high
        self.lowArray[-1] = bar.low
        self.openArray[-1] = bar.open
        self.bufferCount += 1
        if self.bufferCount < self.bufferSize:
            return
        # 计算指标数值
        ma_20 = talib.EMA(self.closeArray,timeperiod = self.ma_windows1)
        ma_200 = talib.EMA(self.closeArray,timeperiod = self.ma_windows2)
        self.ma20_value = ma_20[-1]
        self.ma200_value = ma_200[-1]
        self.LongEnterable = ma_20[-1] > ma_200[-1] and ma_20[-2] < ma_200[-2]
        self.ShortEnterable = ma_20[-1] < ma_200[-1] and ma_20[-2] > ma_200[-2]

        # 发出状态更新事件
        self.putEvent()

    #----------------------------------------------------------------------
    def onOrder(self, order):
        """收到委托变化推送(必须由用户继承实现)"""
        pass

    #----------------------------------------------------------------------
    def onTrade(self, trade):
        # 发出状态更新事件
        self.putEvent()
    def onStopOrder(self, so):
        """停止单推送"""
        pass


为K线图表添砖加瓦——一个完整的K线图表

1. 典型的绘图部件

保存文件:vnpy\usertools\chart_items.py
其中包含:

  • LineItem
  • RsiItem
  • SmaItem
  • MacdItem
  • TradeItem
  • OrderItem
from datetime import datetime
from typing import List, Tuple, Dict

from vnpy.trader.ui import create_qapp, QtCore, QtGui, QtWidgets
from pyqtgraph import ScatterPlotItem
import pyqtgraph as pg
import numpy as np
import talib
import copy

from vnpy.chart import ChartWidget, VolumeItem, CandleItem
from vnpy.chart.item import ChartItem
from vnpy.chart.manager import BarManager

from vnpy.trader.object import (
    BarData,
    OrderData,
    TradeData
)

from vnpy.trader.object import Direction, Exchange, Interval, Offset, Status, Product, OptionType, OrderType

from collections import OrderedDict
import pytz
CHINA_TZ = pytz.timezone("Asia/Shanghai")


class LineItem(CandleItem):
    """"""

    def __init__(self, manager: BarManager):
        """"""
        super().__init__(manager)

        self.white_pen: QtGui.QPen = pg.mkPen(color=(255, 255, 255), width=1)

    def _draw_bar_picture(self, ix: int, bar: BarData) -> QtGui.QPicture:
        """"""
        last_bar = self._manager.get_bar(ix - 1)

        # Create objects
        picture = QtGui.QPicture()
        painter = QtGui.QPainter(picture)

        # Set painter color
        painter.setPen(self.white_pen)

        # Draw Line
        end_point = QtCore.QPointF(ix, bar.close_price)

        if last_bar:
            start_point = QtCore.QPointF(ix - 1, last_bar.close_price)
        else:
            start_point = end_point

        painter.drawLine(start_point, end_point)

        # Finish
        painter.end()
        return picture

    def get_info_text(self, ix: int) -> str:
        """"""
        text = ""
        bar = self._manager.get_bar(ix)
        if bar:
            text = f"Close:{bar.close_price}"
        return text

class SmaItem(CandleItem):
    """"""

    def __init__(self, manager: BarManager):
        """"""
        super().__init__(manager)

        self.blue_pen: QtGui.QPen = pg.mkPen(color=(100, 100, 255), width=2)

        self.sma_window = 10
        self.sma_data: Dict[int, float] = {}

    def get_sma_value(self, ix: int) -> float:
        """"""
        if ix < 0:
            return 0

        # When initialize, calculate all rsi value
        if not self.sma_data:
            bars = self._manager.get_all_bars()
            close_data = [bar.close_price for bar in bars]
            sma_array = talib.SMA(np.array(close_data), self.sma_window)

            for n, value in enumerate(sma_array):
                self.sma_data[n] = value

        # Return if already calcualted
        if ix in self.sma_data:
            return self.sma_data[ix]

        # Else calculate new value
        close_data = []
        for n in range(ix - self.sma_window, ix + 1):
            bar = self._manager.get_bar(n)
            close_data.append(bar.close_price)

        sma_array = talib.SMA(np.array(close_data), self.sma_window)
        sma_value = sma_array[-1]
        self.sma_data[ix] = sma_value

        return sma_value

    def _draw_bar_picture(self, ix: int, bar: BarData) -> QtGui.QPicture:
        """"""
        sma_value = self.get_sma_value(ix)
        last_sma_value = self.get_sma_value(ix - 1)

        # Create objects
        picture = QtGui.QPicture()
        painter = QtGui.QPainter(picture)

        # Set painter color
        painter.setPen(self.blue_pen)

        # Draw Line
        start_point = QtCore.QPointF(ix-1, last_sma_value)
        end_point = QtCore.QPointF(ix, sma_value)
        painter.drawLine(start_point, end_point)

        # Finish
        painter.end()
        return picture

    def get_info_text(self, ix: int) -> str:
        """"""
        if ix in self.sma_data:
            sma_value = self.sma_data[ix]
            text = f"SMA {sma_value:.1f}"
        else:
            text = "SMA -"

        return text

class RsiItem(ChartItem):
    """"""

    def __init__(self, manager: BarManager):
        """"""
        super().__init__(manager)

        self.white_pen: QtGui.QPen = pg.mkPen(color=(255, 255, 255), width=1)
        self.yellow_pen: QtGui.QPen = pg.mkPen(color=(255, 255, 0), width=2)

        self.rsi_window = 14
        self.rsi_data: Dict[int, float] = {}

    def get_rsi_value(self, ix: int) -> float:
        """"""
        if ix < 0:
            return 50

        # When initialize, calculate all rsi value
        if not self.rsi_data:
            bars = self._manager.get_all_bars()
            close_data = [bar.close_price for bar in bars]
            rsi_array = talib.RSI(np.array(close_data), self.rsi_window)

            for n, value in enumerate(rsi_array):
                self.rsi_data[n] = value

        # Return if already calcualted
        if ix in self.rsi_data:
            return self.rsi_data[ix]

        # Else calculate new value
        close_data = []
        for n in range(ix - self.rsi_window, ix + 1):
            bar = self._manager.get_bar(n)
            close_data.append(bar.close_price)

        rsi_array = talib.RSI(np.array(close_data), self.rsi_window)
        rsi_value = rsi_array[-1]
        self.rsi_data[ix] = rsi_value

        return rsi_value

    def _draw_bar_picture(self, ix: int, bar: BarData) -> QtGui.QPicture:
        """"""
        rsi_value = self.get_rsi_value(ix)
        last_rsi_value = self.get_rsi_value(ix - 1)

        # Create objects
        picture = QtGui.QPicture()
        painter = QtGui.QPainter(picture)

        # Draw RSI line
        painter.setPen(self.yellow_pen)

        if np.isnan(last_rsi_value) or np.isnan(rsi_value):
            # print(ix - 1, last_rsi_value,ix, rsi_value,)
            pass
        else:
            end_point = QtCore.QPointF(ix, rsi_value)
            start_point = QtCore.QPointF(ix - 1, last_rsi_value)
            painter.drawLine(start_point, end_point)

        # Draw oversold/overbought line
        painter.setPen(self.white_pen)

        painter.drawLine(
            QtCore.QPointF(ix, 70),
            QtCore.QPointF(ix - 1, 70),
        )

        painter.drawLine(
            QtCore.QPointF(ix, 30),
            QtCore.QPointF(ix - 1, 30),
        )

        # Finish
        painter.end()
        return picture

    def boundingRect(self) -> QtCore.QRectF:
        """"""
        # min_price, max_price = self._manager.get_price_range()
        rect = QtCore.QRectF(
            0,
            0,
            len(self._bar_picutures),
            100
        )
        return rect

    def get_y_range( self, min_ix: int = None, max_ix: int = None) -> Tuple[float, float]:
        """  """
        return 0, 100

    def get_info_text(self, ix: int) -> str:
        """"""
        if ix in self.rsi_data:
            rsi_value = self.rsi_data[ix]
            text = f"RSI {rsi_value:.1f}"
            # print(text)
        else:
            text = "RSI -"

        return text


def to_int(value: float) -> int:
    """"""
    return int(round(value, 0))

""" 将y方向的显示范围扩大到1.1 """
def adjust_range(in_range:Tuple[float, float])->Tuple[float, float]:
    ret_range:Tuple[float, float]
    diff = abs(in_range[0] - in_range[1])
    ret_range = (in_range[0]-diff*0.05,in_range[1]+diff*0.05)
    return ret_range

class MacdItem(ChartItem):
    """"""
    _values_ranges: Dict[Tuple[int, int], Tuple[float, float]] = {}

    last_range:Tuple[int, int] = (-1,-1)    # 最新显示K线索引范围

    def __init__(self, manager: BarManager):
        """"""
        super().__init__(manager)

        self.white_pen: QtGui.QPen = pg.mkPen(color=(255, 255, 255), width=1)
        self.yellow_pen: QtGui.QPen = pg.mkPen(color=(255, 255, 0), width=1)
        self.red_pen: QtGui.QPen = pg.mkPen(color=(255, 0, 0), width=1)
        self.green_pen: QtGui.QPen = pg.mkPen(color=(0, 255, 0), width=1)

        self.short_window = 12
        self.long_window = 26
        self.M = 9

        self.macd_data: Dict[int, Tuple[float,float,float]] = {}

    def get_macd_value(self, ix: int) -> Tuple[float,float,float]:
        """"""
        if ix < 0:
            return (0.0,0.0,0.0)

        # When initialize, calculate all macd value
        if not self.macd_data:
            bars = self._manager.get_all_bars()
            close_data = [bar.close_price for bar in bars]

            diffs,deas,macds = talib.MACD(np.array(close_data), 
                                    fastperiod=self.short_window, 
                                    slowperiod=self.long_window, 
                                    signalperiod=self.M)

            for n in range(0,len(diffs)):
                self.macd_data[n] = (diffs[n],deas[n],macds[n])

        # Return if already calcualted
        if ix in self.macd_data:
            return self.macd_data[ix]

        # Else calculate new value
        close_data = []
        for n in range(ix-self.long_window-self.M+1, ix + 1):
            bar = self._manager.get_bar(n)
            close_data.append(bar.close_price)

        diffs,deas,macds = talib.MACD(np.array(close_data), 
                                            fastperiod=self.short_window, 
                                            slowperiod=self.long_window, 
                                            signalperiod=self.M) 
        diff,dea,macd = diffs[-1],deas[-1],macds[-1]
        self.macd_data[ix] = (diff,dea,macd)

        return (diff,dea,macd)

    def _draw_bar_picture(self, ix: int, bar: BarData) -> QtGui.QPicture:
        """"""
        macd_value = self.get_macd_value(ix)
        last_macd_value = self.get_macd_value(ix - 1)

        # # Create objects
        picture = QtGui.QPicture()
        painter = QtGui.QPainter(picture)

        # # Draw macd lines
        if np.isnan(macd_value[0]) or np.isnan(last_macd_value[0]):
            # print("略过macd lines0")
            pass
        else:
            end_point0 = QtCore.QPointF(ix, macd_value[0])
            start_point0 = QtCore.QPointF(ix - 1, last_macd_value[0])
            painter.setPen(self.white_pen)
            painter.drawLine(start_point0, end_point0)

        if np.isnan(macd_value[1]) or np.isnan(last_macd_value[1]):
            # print("略过macd lines1")
            pass
        else:
            end_point1 = QtCore.QPointF(ix, macd_value[1])
            start_point1 = QtCore.QPointF(ix - 1, last_macd_value[1])
            painter.setPen(self.yellow_pen)
            painter.drawLine(start_point1, end_point1)

        if not np.isnan(macd_value[2]):
            if (macd_value[2]>0):
                painter.setPen(self.red_pen)
                painter.setBrush(pg.mkBrush(255,0,0))
            else:
                painter.setPen(self.green_pen)
                painter.setBrush(pg.mkBrush(0,255,0))
            painter.drawRect(QtCore.QRectF(ix-0.3,0,0.6,macd_value[2]))
        else:
            # print("略过macd lines2")
            pass

        painter.end()
        return picture

    def boundingRect(self) -> QtCore.QRectF:
        """"""
        min_y, max_y = self.get_y_range()
        rect = QtCore.QRectF(
            0,
            min_y,
            len(self._bar_picutures),
            max_y
        )
        return rect

    def get_y_range(self, min_ix: int = None, max_ix: int = None) -> Tuple[float, float]:
        #   获得3个指标在y轴方向的范围   
        #   hxxjava 修改,2020-6-29
        #   当显示范围改变时,min_ix,max_ix的值不为None,当显示范围不变时,min_ix,max_ix的值不为None,

        offset = max(self.short_window,self.long_window) + self.M - 1

        if not self.macd_data or len(self.macd_data) < offset:
            return 0.0, 1.0

        # print("len of range dict:",len(self._values_ranges),",macd_data:",len(self.macd_data),(min_ix,max_ix))

        if min_ix != None:          # 调整最小K线索引
            min_ix = max(min_ix,offset)

        if max_ix != None:          # 调整最大K线索引
            max_ix = min(max_ix, len(self.macd_data)-1)

        last_range = (min_ix,max_ix)    # 请求的最新范围   

        if last_range == (None,None):   # 当显示范围不变时
            if self.last_range in self._values_ranges:  
                # 如果y方向范围已经保存
                # 读取y方向范围
                result = self._values_ranges[self.last_range]
                # print("1:",self.last_range,result)
                return adjust_range(result)
            else:
                # 如果y方向范围没有保存
                # 从macd_data重新计算y方向范围
                min_ix,max_ix = 0,len(self.macd_data)-1

                macd_list = list(self.macd_data.values())[min_ix:max_ix + 1]
                ndarray = np.array(macd_list)           
                max_price = np.nanmax(ndarray)
                min_price = np.nanmin(ndarray)

                # 保存y方向范围,同时返回结果
                result = (min_price, max_price)
                self.last_range = (min_ix,max_ix)
                self._values_ranges[self.last_range] = result
                # print("2:",self.last_range,result)
                return adjust_range(result)

        """ 以下为显示范围变化时 """

        if last_range in self._values_ranges:
            # 该范围已经保存过y方向范围
            # 取得y方向范围,返回结果
            result = self._values_ranges[last_range]
            # print("3:",last_range,result)
            return adjust_range(result)

        # 该范围没有保存过y方向范围
        # 从macd_data重新计算y方向范围
        macd_list = list(self.macd_data.values())[min_ix:max_ix + 1]
        ndarray = np.array(macd_list) 
        max_price = np.nanmax(ndarray)
        min_price = np.nanmin(ndarray)

        # 取得y方向范围,返回结果
        result = (min_price, max_price)
        self.last_range = last_range
        self._values_ranges[self.last_range] = result
        # print("4:",self.last_range,result)
        return adjust_range(result)


    def get_info_text(self, ix: int) -> str:
        # """"""
        if ix in self.macd_data:
            diff,dea,macd = self.macd_data[ix]
            words = [
                f"diff {diff:.3f}"," ",
                f"dea {dea:.3f}"," ",
                f"macd {macd:.3f}"
                ]
            text = "\n".join(words)
        else:
            text = "diff - \ndea - \nmacd -"

        return text


class TradeItem(ScatterPlotItem,CandleItem): 
    """
    成交单绘图部件
    """
    def __init__(self, manager: BarManager):
        """"""
        ScatterPlotItem.__init__(self)
        # CandleItem.__init__(self,manager)
        # super(TradeItem,self).__init__(manager)
        super(CandleItem,self).__init__(manager)


        self.blue_pen: QtGui.QPen = pg.mkPen(color=(100, 100, 255), width=2)

        self.trades : Dict[int,Dict[str,TradeData]] = {} # {ix:{tradeid:trade}}

    def add_trades(self,trades:List[TradeData]):
        """ 增加成交单列表到TradeItem """
        for trade in trades:
            self.add_trade(trade)

        self.set_scatter_data()
        self.update()

    def add_trade(self,trade:TradeData,draw:bool=False):
        """ 增加一个成交单到TradeItem """
        # 这里使用reverse=True,是考虑到实盘成交往往发生在最新的bar里,可以加快搜索速度
        od = OrderedDict(sorted(self._manager._datetime_index_map.items(),key = lambda t:t[0],reverse=True))
        idx = self._manager.get_count() - 1
        for dt,ix in od.items():
            # print(f"dt={dt}\ntrade.datetime {trade.datetime}")
            dt1 = CHINA_TZ.localize(datetime.combine(dt.date(),dt.time()))
            if dt1 <= trade.datetime:
                # print(f"【dt={dt},dt1={dt1},dt2={trade.datetime} ix={ix}】")
                idx = ix
                break

        # 注意:一个bar期间可能发生多个成交单
        if idx in self.trades:
            self.trades[idx][trade.tradeid] = trade
        else:
            self.trades[idx] = {trade.tradeid:trade}

        if draw:        
            self.set_scatter_data()
            self.update()

        # print(f"add_trade idx={idx} trade={trade}")

    def set_scatter_data(self):
        """ 把成交单列表绘制到ScatterPlotItem上 """
        scatter_datas = []
        for ix in self.trades:
            for trade in self.trades[ix].values():
                scatter = {
                    "pos" : (ix, trade.price),
                    "data": 1,
                    "size": 14,
                    "pen": pg.mkPen((255, 255, 255)),
                }

                if trade.direction == Direction.LONG:
                    scatter_symbol = "t1"   # Up arrow
                else:
                    scatter_symbol = "t"    # Down arrow

                if trade.offset == Offset.OPEN:
                    scatter_brush = pg.mkBrush((255, 255, 0))   # Yellow
                else:
                    scatter_brush = pg.mkBrush((0, 0, 255))     # Blue

                scatter["symbol"] = scatter_symbol
                scatter["brush"] = scatter_brush
                scatter_datas.append(scatter)

        self.setData(scatter_datas)

    def get_info_text(self, ix: int) -> str:
        """"""
        if ix in self.trades:
            text = "成交:"
            for tradeid,trade in self.trades[ix].items():
                # TradeData
                text += f"\n{trade.price}{trade.direction.value}{trade.offset.value}{trade.volume}手"
        else:
            text = "成交:-"

        return text


class OrderItem(ScatterPlotItem,CandleItem): 
    """
    委托单绘图部件
    """
    def __init__(self, manager: BarManager):
        """"""
        ScatterPlotItem.__init__(self)
        super(CandleItem,self).__init__(manager)

        self.orders : Dict[int,Dict[str,Order]] = {} # {ix:{orderid:order}}

    def add_orders(self,orders:List[OrderData]):
        """ 增加委托单列表到OrderItem """
        for order in orders:
            if order.datetime:
                self.add_order(order)

        self.set_scatter_data()
        self.update()

    def add_order(self,order:OrderData,draw:bool=False):
        """ 增加一个委托单到OrderItem """
        # 这里使用reverse=True,是考虑到实盘成交往往发生在最新的bar里,可以加快搜索速度

        od = OrderedDict(sorted(self._manager._datetime_index_map.items(),key = lambda t:t[0],reverse=True))
        idx = self._manager.get_count() - 1
        for dt,ix in od.items():
            # print(f"dt={dt}\ntrade.datetime {trade.datetime}")
            dt1 = CHINA_TZ.localize(datetime.combine(dt.date(),dt.time()))
            if dt1 <= order.datetime:
                # print(f"【dt={dt},dt1={dt1},dt2={order.datetime} ix={ix}】")
                idx = ix
                break

        # 注意:一个bar期间可能发生多个委托单
        if idx in self.orders:
            self.orders[idx][order.orderid] = order
        else:
            self.orders[idx] = {order.orderid:order}

        if draw:
            self.set_scatter_data()
            self.update()

    def set_scatter_data(self):
        """ 把委托单列表绘制到ScatterPlotItem上 """
        scatter_datas = []
        for ix in self.orders:
            lowest,highest=self.get_y_range()
            # print(f"range={lowest,highest}")
            for order in self.orders[ix].values():
                # 处理委托报价超出显示范围的问题
                if order.price>highest:
                    show_price = highest - 7
                elif order.price<lowest:
                    show_price = lowest + 7
                else:
                    show_price = order.price 

                scatter = {
                    "pos" : (ix, show_price),
                    "data": 1,
                    "size": 14,
                    "pen": pg.mkPen((255, 255, 255)),
                }

                if order.direction == Direction.LONG:
                    scatter_symbol = "t1"   # Up arrow
                else:
                    scatter_symbol = "t"    # Down arrow

                if order.offset == Offset.OPEN:
                    scatter_brush = pg.mkBrush((0, 128, 128))   # Yellow
                else:
                    scatter_brush = pg.mkBrush((128, 128, 0))     # Blue

                scatter["symbol"] = scatter_symbol
                scatter["brush"] = scatter_brush
                scatter_datas.append(scatter)

        self.setData(scatter_datas)

    def get_info_text(self, ix: int) -> str:
        """"""
        if ix in self.orders:
            text = "委托:"
            for orderid,order in self.orders[ix].items():
                # OrderData
                text += f"\n{order.price}{order.direction.value}{order.offset.value}{order.volume}手"
        else:
            text = "委托:-"


        return text

2. 修改vnpy\chart\widget.py

为ChartWidget类增加下面的函数:

    def get_item(self,item_name:str):   # hxxjava add
        """
        Get chart item by item's name.
        """
        return self._items.get(item_name,None)

3. K线图表——各绘图部件使用方法演示

保存文件:vnpy\usertools\kx_chart.py

from datetime import datetime
from typing import List, Tuple, Dict

import numpy as np
import pyqtgraph as pg
import talib
import copy

from vnpy.trader.ui import create_qapp, QtCore, QtGui, QtWidgets
from vnpy.trader.database import database_manager
from vnpy.trader.constant import Exchange, Interval
from vnpy.trader.object import BarData

from vnpy.chart import ChartWidget, VolumeItem, CandleItem
from vnpy.chart.item import ChartItem
from vnpy.chart.manager import BarManager
from vnpy.chart.base import NORMAL_FONT

from vnpy.trader.engine import MainEngine
from vnpy.event import Event, EventEngine

from vnpy.trader.event import (
    EVENT_TICK,
    EVENT_TRADE,
    EVENT_ORDER,
    EVENT_POSITION,
    EVENT_ACCOUNT,
    EVENT_LOG
)

from vnpy.app.cta_strategy.base import (   
    EVENT_CTA_TICK,     
    EVENT_CTA_BAR,      
    EVENT_CTA_ORDER,    
    EVENT_CTA_TRADE,     
    EVENT_CTA_HISTORY_BAR
)

from vnpy.trader.object import (
    Direction, 
    Exchange, 
    Interval, 
    Offset, 
    Status, 
    Product, 
    OptionType, 
    OrderType,
    OrderData,
    TradeData,
)

from vnpy.usertools.chart_items import (
    LineItem,
    RsiItem,
    SmaItem,
    MacdItem,
    TradeItem,
    OrderItem,
)


class NewChartWidget(ChartWidget):
    """ 
    基于ChartWidget的K线图表 
    """
    MIN_BAR_COUNT = 100

    signal_cta_history_bar:QtCore.pyqtSignal = QtCore.pyqtSignal(Event)
    signal_cta_tick: QtCore.pyqtSignal = QtCore.pyqtSignal(Event)
    signal_cta_bar:QtCore.pyqtSignal = QtCore.pyqtSignal(Event)

    def __init__(self, parent: QtWidgets.QWidget = None,event_engine: EventEngine = None,strategy_name:str=""):
        """ 初始化 """
        super().__init__(parent)
        self.strategy_name = strategy_name
        self.event_engine = event_engine

        # 创建K线主图及多个绘图部件
        self.add_plot("candle", hide_x_axis=True)
        self.add_item(CandleItem, "candle", "candle")
        self.add_item(LineItem, "line", "candle")
        self.add_item(SmaItem, "sma", "candle")
        self.add_item(OrderItem, "order", "candle")
        self.add_item(TradeItem, "trade", "candle")

        # 创建成交量附图及绘图部件
        self.add_plot("volume", maximum_height=150)
        self.add_item(VolumeItem, "volume", "volume")

        # 创建RSI附图及绘图部件
        self.add_plot("rsi", maximum_height=150)
        self.add_item(RsiItem, "rsi", "rsi")

        # 创建MACD附图及绘图部件
        self.add_plot("macd", maximum_height=150)
        self.add_item(MacdItem, "macd", "macd")

        # 创建最新价格线、光标
        self.add_last_price_line()
        self.add_cursor()
        self.setWindowTitle(f"K线图表——{symbol}.{exchange.value},{interval},{start}-{end}")

        # 委托单列表
        self.orders:List[str,OrderData] = {}
        # 成交单列表
        self.trades:List[str,TradeData] = {}

        # self.register_event()
        # self.event_engine.start()

    def register_event(self) -> None:
        """"""
        self.signal_cta_history_bar.connect(self.process_cta_history_bar)
        self.event_engine.register(EVENT_CTA_HISTORY_BAR, self.signal_cta_history_bar.emit)

        self.signal_cta_tick.connect(self.process_tick_event)
        self.event_engine.register(EVENT_CTA_TICK, self.signal_cta_tick.emit)

        self.signal_cta_bar.connect(self.process_cta_bar)
        self.event_engine.register(EVENT_CTA_BAR, self.signal_cta_bar.emit)

    def process_cta_history_bar(self, event:Event) -> None:
        """ 处理历史K线推送 """
        strategy_name,history_bars = event.data
        if strategy_name == self.strategy_name:
            self.update_history(history_bars)

            # print(f" {strategy_name} got an EVENT_CTA_HISTORY_BAR")

    def process_tick_event(self, event: Event) -> None:
        """ 处理tick数据推送 """
        strategy_name,tick = event.data
        if strategy_name == self.strategy_name:
            if self.last_price_line:
                self.last_price_line.setValue(tick.last_price)
            #print(f" {strategy_name} got an EVENT_CTA_TICK")

    def process_cta_bar(self, event:Event)-> None:
        """ 处理K线数据推送 """

        strategy_name,bar = event.data
        if strategy_name == self.strategy_name:
            self.update_bar(bar)
            # print(f"{strategy_name} got an EVENT_CTA_BAR")

    def add_last_price_line(self):
        """"""
        plot = list(self._plots.values())[0]
        color = (255, 255, 255)

        self.last_price_line = pg.InfiniteLine(
            angle=0,
            movable=False,
            label="{value:.1f}",
            pen=pg.mkPen(color, width=1),
            labelOpts={
                "color": color,
                "position": 1,
                "anchors": [(1, 1), (1, 1)]
            }
        )
        self.last_price_line.label.setFont(NORMAL_FONT)
        plot.addItem(self.last_price_line)

    def update_history(self, history: List[BarData]) -> None:
        """
        Update a list of bar data.
        """
        self._manager.update_history(history)

        for item in self._items.values():
            item.update_history(history)

        self._update_plot_limits()

        self.move_to_right()

        self.update_last_price_line(history[-1])

    def update_bar(self, bar: BarData) -> None:
        """
        Update single bar data.
        """
        self._manager.update_bar(bar)

        for item in self._items.values():
            item.update_bar(bar)

        self._update_plot_limits()

        if self._right_ix >= (self._manager.get_count() - self._bar_count / 2):
            self.move_to_right()

        self.update_last_price_line(bar)

    def update_last_price_line(self, bar: BarData) -> None:
        """"""
        if self.last_price_line:
            self.last_price_line.setValue(bar.close_price)

    def add_orders(self,orders:List[OrderData]) -> None:
        """ 
        增加委托单列表到委托单绘图部件 
        """
        for order in orders:
            self.orders[order.orderid] = order

        order_item : OrderItem = self.get_item('order')
        if order_item:
            order_item.add_orders(self.orders.values())

    def add_trades(self,trades:List[TradeData]) -> None:
        """ 
        增加成交单列表到委托单绘图部件 
        """
        for trade in trades:
            self.trades[trade.tradeid] = trade

        trade_item : TradeItem = self.get_item('trade')
        if trade_item:
            trade_item.add_trades(self.trades.values())



################################################################
# 以下为测试代码
if __name__ == "__main__":
    def make_trades():
        import pytz
        CHINA_TZ = pytz.timezone("Asia/Shanghai")
        from vnpy.trader.object import Direction, Exchange, Interval, Offset, Status, Product, OptionType, OrderType,TradeData
        trades = [
            TradeData(gateway_name='CTP', symbol='ag2012', exchange=Exchange.SHFE, orderid='3_753490688_1', tradeid='         455', direction=Direction.LONG, offset=Offset.OPEN, price=6131.0, volume=3, datetime=CHINA_TZ.localize(datetime(2020, 8, 13, 21, 0, 1))),
            TradeData(gateway_name='CTP', symbol='ag2012', exchange=Exchange.SHFE, orderid='3_753490688_2', tradeid='       12738', direction=Direction.LONG, offset=Offset.OPEN, price=6142.0, volume=3, datetime=CHINA_TZ.localize(datetime(2020, 8, 13, 21, 14, 46))),
            TradeData(gateway_name='CTP', symbol='ag2012', exchange=Exchange.SHFE, orderid='3_753490688_3', tradeid='       16233', direction=Direction.LONG, offset=Offset.OPEN, price=6158.0, volume=3, datetime=CHINA_TZ.localize(datetime(2020, 8, 13, 21, 21, 59))),
            TradeData(gateway_name='CTP', symbol='ag2012', exchange=Exchange.SHFE, orderid='3_753490688_4', tradeid='       22815', direction=Direction.LONG, offset=Offset.OPEN, price=6180.0, volume=3, datetime=CHINA_TZ.localize(datetime(2020, 8, 13, 21, 39, 53))),
            TradeData(gateway_name='CTP', symbol='ag2012', exchange=Exchange.SHFE, orderid='3_1962356227_1', tradeid='       67570', direction=Direction.SHORT, offset=Offset.CLOSEYESTERDAY, price=6400.0, volume=12, datetime=CHINA_TZ.localize(datetime(2020, 8, 14, 1, 44,35))),
        ]
        return trades

    def make_orders():
        import pytz
        CHINA_TZ = pytz.timezone("Asia/Shanghai")

        orders = [
            OrderData(gateway_name='CTP', symbol='ag2012', exchange=Exchange.SHFE, orderid='3_753490688_1', type=OrderType.LIMIT, direction=Direction.LONG, offset=Offset.OPEN, price=6494.0, volume=3.0, traded=0, status=Status.SUBMITTING, datetime=None, reference='TTS-ag2012'),
            OrderData(gateway_name='CTP', symbol='ag2012', exchange=Exchange.SHFE, orderid='3_753490688_1', type=OrderType.LIMIT, direction=Direction.LONG, offset=Offset.OPEN, price=6494.0, volume=3, traded=0, status=Status.SUBMITTING, datetime=CHINA_TZ.localize(datetime(2020, 8, 13, 21, 0, 1)), reference=''),
            OrderData(gateway_name='CTP', symbol='ag2012', exchange=Exchange.SHFE, orderid='3_753490688_1', type=OrderType.LIMIT, direction=Direction.LONG, offset=Offset.OPEN, price=6494.0, volume=3, traded=0, status=Status.SUBMITTING, datetime=CHINA_TZ.localize(datetime(2020, 8, 13, 21, 0, 1)), reference=''),
            OrderData(gateway_name='CTP', symbol='ag2012', exchange=Exchange.SHFE, orderid='3_753490688_1', type=OrderType.LIMIT, direction=Direction.LONG, offset=Offset.OPEN, price=6494.0, volume=3, traded=3, status=Status.ALLTRADED, datetime=CHINA_TZ.localize(datetime(2020, 8, 13, 21, 0, 1)), reference=''),

            OrderData(gateway_name='CTP', symbol='ag2012', exchange=Exchange.SHFE, orderid='3_753490688_2', type=OrderType.LIMIT, direction=Direction.LONG, offset=Offset.OPEN, price=6494.0, volume=3.0, traded=0, status=Status.SUBMITTING, datetime=None, reference='TTS-ag2012'),
            OrderData(gateway_name='CTP', symbol='ag2012', exchange=Exchange.SHFE, orderid='3_753490688_2', type=OrderType.LIMIT, direction=Direction.LONG, offset=Offset.OPEN, price=6494.0, volume=3, traded=0, status=Status.SUBMITTING, datetime=CHINA_TZ.localize(datetime(2020, 8, 13, 21, 14, 46)), reference=''),
            OrderData(gateway_name='CTP', symbol='ag2012', exchange=Exchange.SHFE, orderid='3_753490688_2', type=OrderType.LIMIT, direction=Direction.LONG, offset=Offset.OPEN, price=6494.0, volume=3, traded=0, status=Status.SUBMITTING, datetime=CHINA_TZ.localize(datetime(2020, 8, 13, 21, 14, 46)), reference=''),
            OrderData(gateway_name='CTP', symbol='ag2012', exchange=Exchange.SHFE, orderid='3_753490688_2', type=OrderType.LIMIT, direction=Direction.LONG, offset=Offset.OPEN, price=6494.0, volume=3, traded=3, status=Status.ALLTRADED, datetime=CHINA_TZ.localize(datetime(2020, 8, 13, 21, 14, 46)), reference=''),

            OrderData(gateway_name='CTP', symbol='ag2012', exchange=Exchange.SHFE, orderid='3_753490688_3', type=OrderType.LIMIT, direction=Direction.LONG, offset=Offset.OPEN, price=6494.0, volume=3.0, traded=0, status=Status.SUBMITTING, datetime=None, reference='TTS-ag2012'),
            OrderData(gateway_name='CTP', symbol='ag2012', exchange=Exchange.SHFE, orderid='3_753490688_3', type=OrderType.LIMIT, direction=Direction.LONG, offset=Offset.OPEN, price=6494.0, volume=3, traded=0, status=Status.SUBMITTING, datetime=CHINA_TZ.localize(datetime(2020, 8, 13, 21, 21, 59)), reference=''),
            OrderData(gateway_name='CTP', symbol='ag2012', exchange=Exchange.SHFE, orderid='3_753490688_3', type=OrderType.LIMIT, direction=Direction.LONG, offset=Offset.OPEN, price=6494.0, volume=3, traded=0, status=Status.SUBMITTING, datetime=CHINA_TZ.localize(datetime(2020, 8, 13, 21, 21, 59)), reference=''),
            OrderData(gateway_name='CTP', symbol='ag2012', exchange=Exchange.SHFE, orderid='3_753490688_3', type=OrderType.LIMIT, direction=Direction.LONG, offset=Offset.OPEN, price=6494.0, volume=3, traded=3, status=Status.ALLTRADED, datetime=CHINA_TZ.localize(datetime(2020, 8, 13, 21, 21, 59)), reference=''),

            OrderData(gateway_name='CTP', symbol='ag2012', exchange=Exchange.SHFE, orderid='3_753490688_4', type=OrderType.LIMIT, direction=Direction.LONG, offset=Offset.OPEN, price=6494.0, volume=3.0, traded=0, status=Status.SUBMITTING, datetime=None, reference='TTS-ag2012'),
            OrderData(gateway_name='CTP', symbol='ag2012', exchange=Exchange.SHFE, orderid='3_753490688_4', type=OrderType.LIMIT, direction=Direction.LONG, offset=Offset.OPEN, price=6494.0, volume=3, traded=0, status=Status.SUBMITTING, datetime=CHINA_TZ.localize(datetime(2020, 8, 13, 21, 39, 53)), reference=''),
            OrderData(gateway_name='CTP', symbol='ag2012', exchange=Exchange.SHFE, orderid='3_753490688_4', type=OrderType.LIMIT, direction=Direction.LONG, offset=Offset.OPEN, price=6494.0, volume=3, traded=0, status=Status.SUBMITTING, datetime=CHINA_TZ.localize(datetime(2020, 8, 13, 21, 39, 53)), reference=''),
            OrderData(gateway_name='CTP', symbol='ag2012', exchange=Exchange.SHFE, orderid='3_753490688_4', type=OrderType.LIMIT, direction=Direction.LONG, offset=Offset.OPEN, price=6494.0, volume=3, traded=3, status=Status.ALLTRADED, datetime=CHINA_TZ.localize(datetime(2020, 8, 13, 21, 39, 53)), reference=''),

            OrderData(gateway_name='CTP', symbol='ag2012', exchange=Exchange.SHFE, orderid='3_1962356227_1', type=OrderType.LIMIT, direction=Direction.SHORT, offset=Offset.CLOSEYESTERDAY, price=5870.0, volume=12.0, traded=0, status=Status.SUBMITTING, datetime=None, reference='TTS-ag2012'),
            OrderData(gateway_name='CTP', symbol='ag2012', exchange=Exchange.SHFE, orderid='3_1962356227_1', type=OrderType.LIMIT, direction=Direction.SHORT, offset=Offset.CLOSEYESTERDAY, price=5870.0, volume=12, traded=0, status=Status.SUBMITTING, datetime=CHINA_TZ.localize(datetime(2020, 8, 14, 1, 44,20)), reference=''),
            OrderData(gateway_name='CTP', symbol='ag2012', exchange=Exchange.SHFE, orderid='3_1962356227_1', type=OrderType.LIMIT, direction=Direction.SHORT, offset=Offset.CLOSEYESTERDAY, price=5870.0, volume=12, traded=0, status=Status.SUBMITTING, datetime=CHINA_TZ.localize(datetime(2020, 8, 14, 1, 44,25)), reference=''),
            OrderData(gateway_name='CTP', symbol='ag2012', exchange=Exchange.SHFE, orderid='3_1962356227_1', type=OrderType.LIMIT, direction=Direction.SHORT, offset=Offset.CLOSEYESTERDAY, price=5870.0, volume=12, traded=12, status=Status.ALLTRADED, datetime=CHINA_TZ.localize(datetime(2020, 8, 14, 1, 44,35)), reference=''),
        ]

        return orders


    # 开始测试代码
    app = create_qapp()

    symbol = "ag2012"
    exchange = Exchange.SHFE
    interval=Interval.MINUTE
    start=datetime(2020, 8, 13)
    end=datetime(2020, 8, 15)    

    dynamic = False  # 是否动态演示
    n = 1000          # 缓冲K线根数

    bars = database_manager.load_bar_data(
        symbol=symbol,
        exchange=exchange,
        interval=interval,
        start=start,
        end=end
    )

    print(f"一共读取{len(bars)}根K线")

    event_engine = EventEngine()

    widget = NewChartWidget(event_engine = event_engine)

    if dynamic:
        history = bars[:n]      # 先取得最早的n根bar作为历史
        new_data = bars[n:]     # 其它留着演示
    else:
        history = bars          # 先取得最新的n根bar作为历史
        new_data = []           # 演示的为空

    # 绘制历史K线主图及各个副图
    widget.update_history(history)

    # 绘制委托单到主图
    orders = make_orders()
    widget.add_orders(orders)

    # 绘制成交单到主图
    trades = make_trades()
    widget.add_trades(trades)

    def update_bar():
        if new_data:
            bar = new_data.pop(0)
            widget.update_bar(bar)

    timer = QtCore.QTimer()
    timer.timeout.connect(update_bar)
    if dynamic:
        timer.start(100)

    widget.show()

    event_engine.start()
    app.exec_()

4. 测试效果

kx_chart.py中自动测试代码,直接用VSCode打开就可以运行。

4.1 测试准备

在vnpy中使用数据管理模块,从米筐下载ag2012.SHFE的1分钟历史数据,必须包含8月13日~8月15日。

4.2 测试结果如下

description



vn.py官方已对接的CTP穿透式认证授权码信息

中信期货
AppID:vntech_vnpy_2.0
AuthCode:WGEN56HLB6CYCVEG

浙商期货
AppID:client_vnpy_2.0
AuthCode:6AS2Z8PL8Z6DHSMN

广发期货
AppID:client_vnpy_2.0
AuthCode:IAVCU9CZE84QEVBP

光大期货
AppID:vntech_vnpy_2.0
AuthCode:N0FX456O5HMOLG7T

盛达期货
AppID:vntech_vnpy_2.0
AuthCode:JNPQF2IDO6UI05HE

格林大华
AppID:vntech_vnpy_2.0
AuthCode:K5DBG7ARW4GXJW1V



登录失败,报错:qt.network.ssl: QSslSocket::connectToHostEncrypted: TLS initialization failed。

现象:登录失败,服务器无法启动。

解决思路:使用命令行查看解决如下:

C:\Users\Administrator>python -m vnstation
qt.network.ssl: QSslSocket: cannot resolve EVP_PKEY_param_check
qt.network.ssl: QSslSocket: cannot resolve SSL_CTX_set_ciphersuites
qt.network.ssl: QSslSocket: cannot resolve SSL_set_psk_use_session_callback
qt.network.ssl: QSslSocket: cannot resolve SSL_SESSION_is_resumable
qt.network.ssl: QSslSocket: OpenSSL >= 1.1.1 is required; OpenSSL 1.1.0i 14 Aug 2018 was found instead
qt.network.ssl: QSslSocket: OpenSSL >= 1.1.1 is required; OpenSSL 1.1.0i 14 Aug 2018 was found instead
qt.network.ssl: QSslSocket::connectToHostEncrypted: TLS initialization failed
qt.network.ssl: QSslSocket: OpenSSL >= 1.1.1 is required; OpenSSL 1.1.0i 14 Aug 2018 was found instead

原因: OpenSSL版本低,更换版本。具体操作参考:https://blog.csdn.net/xiaolong1126626497/article/details/105220160

安装完新的SSL后,将libcrypto-1_1.dll 和libssl-1_1.dll两个文件,替换vnstudio下的这两个文件(而不是替换lib中的)。

重启。


统计

主题
4942
帖子
18585
已注册用户
19284
最新用户
在线用户
233
在线来宾用户
204
© 2015-2019 上海韦纳软件科技有限公司
备案服务号:沪ICP备18006526号-3