vn.py量化社区
By Traders, For Traders.

置顶主题

解密并强化日内经典策略R-Breaker

R-Breaker是一种中高频的日内交易策略,这个策略也长期被Future Truth杂志评为最赚钱的策略之一。R-Breaker策略结合了趋势和反转两种交易方式,所以交易机会相对较多,比较适合日内1分钟K线或者5分钟K线级别的数据。

 
 

R-Breaker策略逻辑

 

R-Breaker的策略逻辑由以下4部分构成:

1)计算6个目标价位

根据昨日的开高低收价位计算出今日的6个目标价位,按照价格高低依次是:

  • 突破买入价(Bbreak)
  • 观察卖出价(Ssetup)
  • 反转卖出价(Senter)
  • 反转买入价(Benter)
  • 观察买入价(Bsetup)
  • 突破卖出价(Sbreak)

 

他们的计算方法如下:(其中a、b、c、d为策略参数)

  • 观察卖出价(Ssetup)= High + a * (Close – Low)
  • 观察买入(Bsetup)= Low – a * (High – Close)
  • 反转卖出价(Senter)= b / 2 * (High + Low) – c * Low
  • 反转买入价(Benter)= b / 2 * (High + Low) – c * High
  • 突破卖出价(Sbreak)= Ssetup - d * (Ssetup – Bsetup)
  • 突破买入价(Bbreak)= Bsetup + d * (Ssetup – Bsetup)

 

description

2)设计委托逻辑

趋势策略情况:

  • 若价格>突破买入价,开仓做多;
  • 若价格<突破卖出价,开仓做空;

 

反转策略情况:

  • 若日最高价>观察卖出价,然后下跌导致价格<反转卖出价,开仓做空或者反手(先平仓再反向开仓)做空;
  • 若日最低价<观察买入价,然后上涨导致价格>反转买入价,开仓做多或者反手(先平仓再反向开仓)做多;

 

3)设定相应的止盈止损。

 

4)日内策略要求收盘前平仓。

 

上面是原版R-Breaker策略逻辑,但是使用RQData从2010年至今(即2019年10月)的1分钟沪深300股指期货主力连续合约(IF88)测试,效果并不理想。

 
 

策略逻辑优化

 

实际上R-Breaker策略可以拆分成趋势策略和反转策略。下面分别对这对2种策略逻辑进行优化:

1)趋势策略:

  • 若当前x分钟的最高价>观察卖出价,认为它具有上升趋势,在突破买入价挂上买入开仓的停止单;
  • 若当前x分钟的最低价<观察买入价,认为它具有下跌趋势,在突破卖出价挂上买入开仓的停止单;
  • 开仓后,使用固定百分比移动止损离场;
  • 增加过滤条件:为防止横盘行情导致不断的开平仓,日内每次开仓买入开仓(卖出开仓)委托的价位都比上一次更高(更低);
  • 收盘前,必须平调所持有的仓位。

 

2)反转策略:

  • 若当前x分钟的最高价>观察卖出价,认为它已经到了当日阻力位,可能发生行情反转,在反转卖出价挂上卖出开仓的停止单;
  • 若当前x分钟的最低价>观察买入价,认为它已经到了当日支撑位,可能发生行情反转,在反转买入价挂上买入开仓的停止单;
  • 开仓后,使用固定百分比移动止损离场;
  • 收盘前,必须平调所持有的仓位。

 

其代码实现逻辑如下:

self.tend_high, self.tend_low = am.donchian(self.donchian_window)

if bar.datetime.time() < self.exit_time:

    if self.pos == 0:
        self.intra_trade_low = bar.low_price
        self.intra_trade_high = bar.high_price

        # Trend Condition
        if self.tend_high > self.sell_setup:
            long_entry = max(self.buy_break, self.day_high)
            self.buy(long_entry, self.fixed_size, stop=True)

            self.short(self.sell_enter, self.multiplier * self.fixed_size, stop=True)

        elif self.tend_low < self.buy_setup:
            short_entry = min(self.sell_break, self.day_low)
            self.short(short_entry, self.fixed_size, stop=True)

            self.buy(self.buy_enter, self.multiplier * self.fixed_size, stop=True)

    elif self.pos > 0:
        self.intra_trade_high = max(self.intra_trade_high, bar.high_price)
        long_stop = self.intra_trade_high * (1 - self.trailing_long / 100)
        self.sell(long_stop, abs(self.pos), stop=True)

    elif self.pos < 0:
        self.intra_trade_low = min(self.intra_trade_low, bar.low_price)
        short_stop = self.intra_trade_low * (1 + self.trailing_short / 100)
        self.cover(short_stop, abs(self.pos), stop=True)

# Close existing position
else:
    if self.pos > 0:
        self.sell(bar.close_price * 0.99, abs(self.pos))
    elif self.pos < 0:
        self.cover(bar.close_price * 1.01, abs(self.pos))

 
 

策略效果

 

同样使用10年的1分钟IF88数据进行回测。不过,在展示强化版R-Breaker策略效果前,先分别展示一下拆分后的趋势策略和反转策略。

1)趋势策略:

  • 趋势策略夏普比率1.96,日均成交2.6笔,资金曲线是整体上扬的;
  • 但是在2017~2018年的盘整阶段,具有较大并且持续时间较长的回撤;
  • 这凸显出趋势类策略自身无法规避的缺点:在趋势行情中盈利,意味着震荡行情必然亏损。

description

 

2)反转策略

  • 反转策略夏普比率0.75,日均成交0.4笔,资金曲线缓慢上扬;
  • 但是在2017~2018年的盘整阶段,资金曲线上扬最快,而且这个阶段是最平滑的;
  • 这凸显出反转类策略优点:尽管在趋势行情亏损,在震荡行情必然能盈利。

description

 

综合对比2种策略的日均成交笔数和资金曲线,我们可以知道:

  • 由于趋势策略日均交易笔数较多(2.6笔),它主要负责贡献R-Breaker策略的alpha;
  • 趋势策略的亏损也是主要导致R-Breaker策略亏损的原因,但这时候的亏损由反转策略的盈利来填补。

由于趋势策略和反转策略是互斥的,在某些方面呈现出此消彼长的特点。那么,根据投资组合理论,可以把反转策略看作是看跌期权,买入一定规模的看跌期权来对消非系统性风险,那么组合的收益会更加稳健,即夏普比率更高。

由于趋势策略和反转策略日均成交手数比是2.6:0.4,若它们都只委托1手的话,反转策略的对冲效果微乎其微。

为了方便演示,我们设置趋势策略每次交易1手;反转策略则是3手。然后合成R-Breaker策略。发现夏普比率提高到2,资金曲线整体上扬,而且没有较大且持续时间较长的回撤。

description

 
 

结论

 

R-Breaker策略成功之处在于它并不是纯粹的趋势类策略,它属于复合型策略,它的alpha由2部分构成:趋势策略alpha;反转策略alpha。

这类复合型策略可以看作是轻量级的投资组合,因为它的交易标的只有一个:沪深300股指期货的主力合约。

更复杂的话,可以交易多个标的,如在商品期货做虚拟钢厂套利(同时交易螺纹钢、铁矿石、焦炭),在IF股指期货上做日内CTA策略。考虑到市场容量不同,价差套利能分配更多的资金。这样在价差套利提供稳定收益率基础上,CTA策略能在行情好的时候贡献更多alpha(高盈亏比特征导致的)。

从上面例子可以看出,一个合理的投资组合,往往比单个策略具有更高的夏普比率。因为夏普比率=超额收益/风险。夏普比率高意味着资金曲线非常平滑;这也意味着我们可以有效控制使用杠杆的风险。

当某个投资组合策略夏普足够高,而且策略资金容量允许,交易成本能有效控制等情况下,就可以通过杠杆来提升组合收益了。例如向银行贷款或者发放债券,这时候交易团队是债务人角色,即在承担更大风险同时,追求更到收益。债权人享受利息收益(类无风险收益)。

向公众发产品是另一种增加杠杆的方式,但此时投资组合风险已经转移到了客户这方,交易团队可以享受着管理费收益(类无风险收益)。根据目标客户的不同:

  • 私募基金面向高净值客户,这些客户群体风险承受能力较高;并且私募监管比较放松,能使用的衍生品较多,有提升业绩的自由度。故私募基金除了管理费,更追求业绩提成。
  • 公募基金面向普通群众,他们风险承受能力较低;并且公募监管非常严格,投资约束非常多,提升业绩难度较大。但是公募牌照的稀缺性必然导致该行业是盈利的。如万亿级别的管理规模,其管理费的收益,也不是一般的自营交易公司或者私募基金比得上的。

 
 

附录

 

最后附上策略源代码:

from datetime import time
from vnpy.app.cta_strategy import (
    CtaTemplate,
    StopOrder,
    TickData,
    BarData,
    TradeData,
    OrderData,
    BarGenerator,
    ArrayManager
)
​
​
class RBreakStrategy(CtaTemplate):
    """"""
    author = "KeKe"
​
    setup_coef = 0.25
    break_coef = 0.2
    enter_coef_1 = 1.07
    enter_coef_2 = 0.07
    fixed_size = 1
    donchian_window = 30
​
    trailing_long = 0.4
    trailing_short = 0.4
    multiplier = 3
​
    buy_break = 0   # 突破买入价
    sell_setup = 0  # 观察卖出价
    sell_enter = 0  # 反转卖出价
    buy_enter = 0   # 反转买入价
    buy_setup = 0   # 观察买入价
    sell_break = 0  # 突破卖出价
​
    intra_trade_high = 0
    intra_trade_low = 0
​
    day_high = 0
    day_open = 0
    day_close = 0
    day_low = 0
    tend_high = 0
    tend_low = 0
​
    exit_time = time(hour=14, minute=55)
​
    parameters = ["setup_coef", "break_coef", "enter_coef_1", "enter_coef_2", "fixed_size", "donchian_window"]
    variables = ["buy_break", "sell_setup", "sell_enter", "buy_enter", "buy_setup", "sell_break"]
​
    def __init__(self, cta_engine, strategy_name, vt_symbol, setting):
        """"""
        super(RBreakStrategy, self).__init__(
            cta_engine, strategy_name, vt_symbol, setting
        )
​
        self.bg = BarGenerator(self.on_bar)
        self.am = ArrayManager()
        self.bars = []
​
    def on_init(self):
        """
        Callback when strategy is inited.
        """
        self.write_log("策略初始化")
        self.load_bar(10)
​
    def on_start(self):
        """
        Callback when strategy is started.
        """
        self.write_log("策略启动")
​
    def on_stop(self):
        """
        Callback when strategy is stopped.
        """
        self.write_log("策略停止")
​
    def on_tick(self, tick: TickData):
        """
        Callback of new tick data update.
        """
        self.bg.update_tick(tick)
​
    def on_bar(self, bar: BarData):
        """
        Callback of new bar data update.
        """
        self.cancel_all()
​
        am = self.am
        am.update_bar(bar)
        if not am.inited:
            return
​
        self.bars.append(bar)
        if len(self.bars) <= 2:
            return
        else:
            self.bars.pop(0)
        last_bar = self.bars[-2]
​
        # New Day
        if last_bar.datetime.date() != bar.datetime.date():
            if self.day_open:
​
                self.buy_setup = self.day_low - self.setup_coef * (self.day_high - self.day_close)  # 观察买入价
                self.sell_setup = self.day_high + self.setup_coef * (self.day_close - self.day_low)  # 观察卖出价
​
                self.buy_enter = (self.enter_coef_1 / 2) * (self.day_high + self.day_low) - self.enter_coef_2 * self.day_high  # 反转买入价
                self.sell_enter = (self.enter_coef_1 / 2) * (self.day_high + self.day_low) - self.enter_coef_2 * self.day_low  # 反转卖出价
​
                self.buy_break = self.buy_setup + self.break_coef * (self.sell_setup - self.buy_setup)  # 突破买入价
                self.sell_break = self.sell_setup - self.break_coef * (self.sell_setup - self.buy_setup)  # 突破卖出价
​
            self.day_open = bar.open_price
            self.day_high = bar.high_price
            self.day_close = bar.close_price
            self.day_low = bar.low_price
​
        # Today
        else:
            self.day_high = max(self.day_high, bar.high_price)
            self.day_low = min(self.day_low, bar.low_price)
            self.day_close = bar.close_price
​
        if not self.sell_setup:
            return
​
        self.tend_high, self.tend_low = am.donchian(self.donchian_window)
​
        if bar.datetime.time() < self.exit_time:
​
            if self.pos == 0:
                self.intra_trade_low = bar.low_price
                self.intra_trade_high = bar.high_price
​
                if self.tend_high > self.sell_setup:
                    long_entry = max(self.buy_break, self.day_high)
                    self.buy(long_entry, self.fixed_size, stop=True)
​
                    self.short(self.sell_enter, self.multiplier * self.fixed_size, stop=True)
​
                elif self.tend_low < self.buy_setup:
                    short_entry = min(self.sell_break, self.day_low)
                    self.short(short_entry, self.fixed_size, stop=True)
​
                    self.buy(self.buy_enter, self.multiplier * self.fixed_size, stop=True)
​
            elif self.pos > 0:
                self.intra_trade_high = max(self.intra_trade_high, bar.high_price)
                long_stop = self.intra_trade_high * (1 - self.trailing_long / 100)
                self.sell(long_stop, abs(self.pos), stop=True)
​
            elif self.pos < 0:
                self.intra_trade_low = min(self.intra_trade_low, bar.low_price)
                short_stop = self.intra_trade_low * (1 + self.trailing_short / 100)
                self.cover(short_stop, abs(self.pos), stop=True)
​
        # Close existing position
        else:
            if self.pos > 0:
                self.sell(bar.close_price * 0.99, abs(self.pos))
            elif self.pos < 0:
                self.cover(bar.close_price * 1.01, abs(self.pos))
​
       self.put_event()
​
    def on_order(self, order: OrderData):
        """
        Callback of new order data update.
        """
        pass
​
    def on_trade(self, trade: TradeData):
        """
        Callback of new trade data update.
        """
        self.put_event()
​
    def on_stop_order(self, stop_order: StopOrder):
        """
        Callback of stop order update.
        """
        pass


Mac下vnpy安装运行交易的图文教程

尽管使用vn.py的用户使用windows或者linux服务器居多,但是社区内也有不少用户希望能在mac下运行vn.py进行数字货币的交易,本篇教程就针对如何在Mac系列的机器上成功运行vn.py连接coinbase数字货币交易所进行讲解。本文主要分以下几个部分:

  • python环境的安装
  • vn.py及依赖项的安装
  • 运行vn.py
  • 使用vn.py

 
python环境的安装
在mac下使用miniconda可以省去很多不必要的麻烦。
打开Minconda官网:https://docs.conda.io/en/latest/miniconda.html, 选择MacOSX Installer下的Python3.7,选择64位的pkg安装包进行下载并解压:
description
安装完毕后,在terminal输入python,会发现terminal自动将miniconda附带的python作为系统默认python了。
description
 
安装vn.py
因为mac自带git神器,因此直接在terminal里下载vnpy即可:打开terminal,使用cd切换到想要下载的路径,然后输入以下命令:
description
上图我将vnpy整个文件夹clone至Desktop。当clone完毕后,在cd进入vnpy文件夹,输入如下命令:
description
(注,cd的是一个命令,例如我要切换到/Users/limingfang/Desktop/,则只需要在terminal输入

cd /Users/limingfang/Desktop

安装的时间比较久,需要耐心等待。
在安装完成后,vn.py以及依赖项就被安装至Miniconda的python库中了。
注意:如果启动VN Trader时报错说缺少了pyqtgraph和zmq等库,即出现如图一的报错,直接用pip工具安装即可,在terminal中运行图二中的命令即可(笔者在解压完vnpy包后,发现还需要安装pyqtgraph和zmq)。
description
description

 

启动vn.py
在上述安装全部完成后,即可开始运行vn.py。
在terminal中将路径切换至clone的vnpy路径,比如vn.py被我cloned在/Users/limingfang/Documents/Github/cloned,那么输入以下命令即可切换到vn.py中的vn_trader目录并启动vn.py图形化界面:
description
其中红框内的路径每个用户都不一样。紧接着会跳出一个比较精致的VN Trader图形化界面。
description
 

使用vn.py
该部分主要说两点:简要说明图形化界面的使用,vn.py在mac上可使用的接口。
在图形化界面上方,“系统”栏提供了连接接口的选项,“功能”栏提供了vn.py开放给用户的一些App例如CTA回测等,“帮助”栏提供了查询合约的功能,用于提供交易所合约代码与vn.py内部合约代码映射的信息。注意,在mac下没有“配置”栏,如果想要修改全局配置,可以直接修改源文件。VN Trader界面启动后默认会在当前用户主目录下创建一些隐藏文件夹以存放一些配置信息,在mac下用户目录是在 “/Users/limingfang/”(limingfang是我的mac用户名)。
description
上图是最初.vntrader文件夹内的内容,其中connect_bitmex.json等connect_xxx.json类文件存放接口配置信息,database是默认的sqlite数据库的文件,vt_setting.json用于配置图形化界面字体,rqdata,数据库等,rpc_service_setting.json一般不需要修改。
如果需要修改某个文件,按下图操作即可打开相应文件。
description
 
目前2.x版本的vn.py,在windows下所有交易接口都可以使用,而在OSX系统下只能使用一部分:
纯Python类接口:IB、TIGER、FUTU
IB(盈透证券)、TIGER(老虎证券)、FUTU(证券)这三个接口,使用的是其官方提供的纯Python SDK,直接进行接口函数的对接开发。得益于Python本身的跨平台解析性语言特点,这类接口在OSX系统下也能直接使用。
REST/WebSocket协议类接口:所有数字货币、Alpaca
当今几乎所有的数字货币交易所,都提供了基于REST协议(下单、查询)和WebSocket协议(行情、推送)的API,部分外盘的股票期货经纪商也开始提供这块的支持(如美股0佣金券商Alpaca)。

此外,对于在mac电脑上使用vn.py的用户,交易一些数字货币的时候,可能会碰到需要交易某些标的(例如某些数字货币合约)而网络不通的问题,因此接下来介绍如何利用shadowsocks在mac上进行翻墙从而交易国外的一些合约。
最近比较敏感,本来笔者有详细的介绍如何安装及使用,现在只简单说说如何在vnpy连接接口的时候使用shadowsocks。

使用Shadowsocks
下载配置Shadowsocks后,在vn.py中连接一些大陆外的接口(例如Coinbase)时,需要提供:
- proxy_host
- proxy_port  
一般如下图所示进行配置即可:
description



关于rest_client和ws_client连接中断的处理

像是rest_client的代理连接中断还有ws_client的一些连接中断问题都不是gateway里面重连行情能解决的,必须重启CLI子进程才能解决,下面是我在处理这些连接问题的处理方法。



Database源码阅读笔记+配置教程

最近在看文档的数据库部分,碰见了很多的数据库类型,干脆把各个数据库的配置记录一下,了解一下vnpy的数据库部分结构。
本文涉及的数据库包括MySQL,SQLite,MongoDB,(PostgreSQL就不写了,和MySQL差不多)。
第一次发长文,排版不好或是认知有偏差敬请指出。本人也算小白,仅当记录自己的心得了。

Vnpy的database

 
database模块一般用于CsvLoader模块中的数据持久化(导入)和CtaBacktester模块中的数据导出,在这两个模块中,database这个module是以BaseDatabaseManager类对象出现的,并且会直接调用这个类对象的导入导出方法。下面概括下这个类对象是如何生成的。
 
description
上图是database简略框架图。
 
在导入database这个module的时候,可以分成三层结构

  • 自动运行init.py文件,读取.vntrader/vt_setting.json中的数据库配置信息(也就是GUI界面点击菜单栏中”配置“出现的那个),然后调用.initialize中的init方法。
  • 在settings中保存着数据库driver类型,根据不同的类型又调用同一文件中的init_sql或者init_nosql。以init_sql为例,函数内部从database_sql.py中调用真正的init函数。目前为止并没有实质的内容,只是不断的在分层选择调用不同的函数。
  • 接下来以sqlite为例,并且会涉及peewee包。
     
    peewee包的作用我理解是,用统一的形式对sql数据库进行管理,开放上层接口给用户使用。(有误请指出,谢谢)
    在peewee中,一个model类对象视为一张表,相应类实例可以视为一条记录(row),一个field实例可以视为一列。先使用peewee提供的数据库引擎类实例化,建立与数据文件的连接,然后定义一个model类用于表示表,最后将model类添加到数据库引擎类(即生成数据表)。此时,db即可表示一个与数据文件连接着的数据引擎实例,上面添加到db的model类即可以表示db中的一张张表,可以取出来继续单独使用(原理是在定义model类时,使用了class meta吧我猜)
     
    那么在database_sql.py中,整个逻辑过程就是:在init中调用peewee的Database Engine(SQLiteDatabase)生成实例表示与db文件建立连接,将该实例对象称为db。调用init_models函数生成model类同时将model类添加到db中,然后将两张表返回(DbTickData和DBBarData)。最后,将这两张表(类)添加到SqlManager中,生成统一的BaseDatabaseManager,并提供给外界调用。
    大概逻辑图长这样:
    description
     
     

数据库配置

上面大致介绍了BaseDataManager的生成过程,现在介绍不同数据库具体使用时候setting如何填写。

  • #### Sqlite
    Sqlite是vnpy默认的数据库,配置不用特意去改,同时,Sqlite的数据库文件在.vntrader\\vt_setting.json中(以"database."为前缀的那几个)。
    
    额外注:在内部运行过程中,其实只用到了database.database这条配置,它的值也就是数据库文件的名字。其余都没有用到(不需要)
  • #### MySQL
    MySQL使用到的配置信息包括:database,user,password,host,port。
    
    peewee在连接本地mysql数据库时,实际调用的是MySQLdb或者是pymysql包。实际必须的只有数据库名称,用户和密码,后面两个都是默认有的(pymysql)
    使用MySQL除此之外还需特别注意,peewee不能为本地创建不存在的database,这意味着需要使用vnpy+MySQL的用户需要自己额外创建一个database,假如是”vnpyDb”,那么在配置栏就写vnpyDb"。
  • #### MongoDB
     后续补上,写太长了


另一个视角,使用对数化数据,计算非价位指标

之前在做时序数据整理时候学习时候,发现很多代码都行情数据做了对数化处理。学习了下,发现是另一个视角。

知乎查了,这个答案比较全。

在很多计算中(例如做极大似然的时候),取对数可以将本来需要做的乘法变成加法;
取对数可以避免数值巨大,计算机难于处理的困难;
与对数有关的数据可以反映出物理量尺度的变化,例如物理中,对于各个大小不同的体系,我们仍然会希望仍然可以比较这两个体系,这时候就需要做一些尺度变换,利用到 齐次函数 的有关性质,这种时候用对数来处理是很方便的,因此在处理像重整化和有限尺度标度的时候会需要用到对数曲线,只有这样,一个 2020 的格子才能跟 200200 的格子来进行比较。又例如金融数据中股票价格的涨落是与股票现在的价格有关的,高股价的股票涨跌 1 元跟低股价的股票涨跌 1 元起效果完全不同,这种时候大家考虑的是对数正态分布;
对数处理之后还可以使诸多 Power-Law 的效果凸显出来,这时候的拟合也就变成了一个线性拟合的问题。

作者:傅渥成
链接:https://www.zhihu.com/question/20831196/answer/16324269
来源:知乎
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

上面答案比较理论,下面这个更好理解。

我们平时看到的K线图几乎都是采用 普通坐标 ,而有一种叫作 对数坐标 的K线图大部分人可能没了解过。在介绍对数坐标下的K线图的前,我们先思考一个问题:从100点涨到1300点的涨幅大还是从1000点涨到6000点的涨幅大?

普通坐标下的上证月K线图

enter image description here
对数坐标下的月K线图

看了上证的两幅图是不是有点惊讶,涨的最厉害的不是2005-2007,而是股市刚建立初那两三年。看惯了普通坐标k线图的股民们,眼里只有2005-2007那一拨牛市,2005年之前的股市看起来好像平淡无奇。普通坐标下K线图100点涨到1100点k线的长度和1000点涨到2000点k线图的长度是一样的,而对数坐标下是按照上涨/下跌的幅度计算k线图的长度的,对数坐标下,100点涨到110点和1000点涨到1100点K线图的长度是一样的。

做了一些策略实践,发现对于均线,布林线,唐安期通道这样直接基于价位求出指标,使用对数化数据计算指标相关并不好。而像MACD,KDJ,CCI这样非价位指标,使用对数化指标后,分析起来更为合理。

这里就简单说说VNPY中MACD指标更新支持对数化。其实很简单,直接调用np.log()方法,然后增加islog参数。如果True时候,就把传入指标对数化再计算。

# ----------------------------------------------------------------------
def macd(self, fastPeriod, slowPeriod, signalPeriod, islog = False, array=False):
    """MACD指标"""
    if islog:
        macd, signal, hist = talib.MACD(np.log(self.close), fastPeriod,
                                    slowPeriod, signalPeriod)
    else:
        macd, signal, hist = talib.MACD(self.close, fastPeriod,
                                        slowPeriod, signalPeriod)
    if array:
        return macd, signal, hist
    return macd[-1], signal[-1], hist[-1]


挖个小坑,利用Scikit-learn机器学习库的特征分类进行VnTrade期货量化交易

这个算是一个小坑,因为我也还在学习过程中,代码慢慢完善。一开始在python 2.7,vnpy 1.9.2环境中实现。后面在python 3.7也基本实现,增加支持了xgboost。代码写的繁杂,多多抱歉。

首先说说Scikit-learn是Python语言中专门针对机器学习应用而发展起来的一款开源框架,相对于现在深度学习库tensorflow,由于Scikit-learn本身不支持深度学习,也不支持GPU加速,但是相对于tensorflow那种近乎黑箱的多层神经网络,还是比较好从数学来解释分析。

分类是指识别给定对象的所属类别,属于监督学习的范畴,最常见的应用场景包括垃圾邮件检测和图像识别等。目前Scikit-learn已经实现的算法包括:支持向量机(SVM),最近邻,逻辑回归,随机森林,决策树以及多层感知器(MLP)神经网络等等。这里会使用逻辑回归,决策数, MLP神经网络和SVM向量机,其实都是两句代码事情。

不考虑内部的复杂数学逻辑,这里功能性的使用Scikit-learn特征分类的功能。
1) 选取的特征值,为了避免数据值非逻辑化,这里不直接使用点位最高点这些,而使用那些和具体点位无关的指标,比如atr,cci,rsi,std和bar的涨跌百分比。
2) 利用这些特征值进行分类,这里对于期货走势就是三类,1 是之后上涨,0是之后无规律,-1是之后下跌,这里使用线性回归分析当前时点之后6根K线的走势,如果斜度下,那么当前时点归为-1下跌类,斜度上,为1上涨,如果取信p值不够,或者上下斜率不大,为0。
那么特征值就是atr,cci,rsi,std和bar的涨跌百分比,当然你可以加入KDJ,MACD更多;类别就是三类1,0,-1,利用机器学习,找出特征值和类别的隐含逻辑,进而指导交易,非常粗糙。

整体代码逻辑如下,这里大量借鉴这个 https://mp.weixin.qq.com/s?__biz=MjM5MDEzNDAyNQ==&mid=2650314212&idx=1&sn=0f04627d34f4305e0386fc7562563bff&chksm=be454f828932c694f8ce107249457e0ffba705e6e9531807e86a1f468a3a549001c00bb5389e&scene=21

一,期货K线数据, 导入k线数据,并整理加入特性和类属性
• 利用之前做的DataAnalyzer,读取Mongodb或者csv的1分钟k数据, 放入Dataframe来处理,按照定义合并出n分钟k线,这里n为10
• 还是利用DataAnalyzer,利用ta-lib方法。给Dataframe加入atr,cci,rsi,std,macd和涨跌百分比。
• 使用新方法addTrend,利用scipy.stats的线性回归求出当前时点之后的斜率,给与分类值-1,0,1.

二,数据处理,为了后面机器学习,把特征数组和类数组划分出来,并且划分训练机和测试集。
(1)划分出特征数组 X,和类别数组y
(2)划分训练集和测试集
• model_selection.train_test_split()

三,特征工程,之前需求很多特征,其实有些并没有体现规律,或者完全随机,那么没有意义,可以删除
• 根据P值选:feature_selection.SelectFpr()
• 按照百分比选出最高分特征:feature_selection.SelectPercentile()
这里使用SelectPercentile,

四,模型定义调参/选择
这里使用下面模式进行分析,然后利用网格调参
1)LogisticRegression 逻辑回归
2)DecisionTreeClassifier 决策树
3)SVC 支持向量分类
4)MLP 神经网络
• 交叉验证+网格搜索:model_selection.GridSearchCV()

五,模型测试和评价,使用选取最好的模型,进行测试看看拼接
• 模型预测:model.predict()
• Accuracy:metrics.accuracy_score()
• Presicion:metrics.precision_score()
• Recall:metrics.recall_score()

六,模型保存和调用
• 模型保存:joblib.dump()
• 模型调用:joblib.load(),这里就可以在vnpy中使用了。

后面代码整理放在后面,最后,只做参考,我发现虽然准确率有差不多70%,但是都是要你空仓,大智慧。。。



python多进程简介,和VNPY多进程参数优化代码分析

之前为了实现利用遗传算法,进行多进程策略的优化,学习研究了python的多进程库Multiprocessing。以前感觉真是黑科技,学习后发现,还是python优点,简单好用,对于一般应用还是很好理解。

首先,由于GIL(全局解释锁)的问题,全局对象只能一个进程调用,python多线程并不能充分利用多核处理器,比如有时候用pandas跑大型数据分析,发现只有一核在累死累活。如果想要充分地使用多核CPU的资源,在python中大部分情况需要使用多进程。multiprocessing可以给每个进程赋予单独的Python解释器,这样就规避了全局解释锁所带来的问题。可以理解为多核CPU分配好一个工作任务,这个工作任务包括工作方法和工作内容。

其实python多线程很简单,相对于其他语言来说。其实简单就是针对需要多线程的方法func(a),a是参数。相当于工作内容;使用 Multiprocessing. Process(target = func, args =(a,)),创建一个Prcoess对象,也就是工作任务,再启动这个对象,这样一个多进程任务就完成了。等CPU分配一个独立核去干活,func(a)就开动了。这里唯一要注意args是默认输入元祖参数。

P = Multiprocessing.Process(target = func, args =(a,))
P.start()

Multiprocessing提供了更简洁的pool做为进程池,其实叫任务池更为恰当。把需要干的工作任务打包好,放在这个池子里面,这样空闲下来的核心就捡pool的任务干活。

常见的pool的使用如下,其中prcesses = 4 是定义任务池大小,不一定要小于或者等于cpu核心数量,可以大于cpu核心数量,不过这样就有几个任务空挂着还占用内存。

然后使用pool方法apply_async(task, args=(x,)),把打包好的任务插入池中。 apply_asyncs是异步的带返回值。如果用apply也可以正常,但是会没有返回值,此处不仔细研究了。

之后close()是把这个任务池关闭,不再接受新的任务;但是还有一些已有任务在跑,所以用pool.join(),吊着主程序,直到所有任务完成才进入下一步。

if __name__ == '__main__':
    Multiprocessing.pool = Pool(processes=4)
    for x in range(10):
        pool.apply_async(task, args=(x,))
    pool.close()
    pool.join()

下面看看VNPY多进程优化方法。其实很好理解了,runParallelOptimization是类BacktestingEngine的一个方法。

传入参数strategyClass就是这个策略类,setting是要优化参数范围,后面通过optimizationSetting.generateSetting()生成策略参数队列,做为任务内容;optimizationSetting.optimizeTarget是后面返回值。至于回测品种,回测时间段,交易费用什么,在 BacktestingEngine创建时候维护了。

然后创建任务池pool,大小刚好是cpu核数,这个也是比较稳妥设置。

之后做一个l队列来放返回值。

然后打包策略类,回测参数,策略参数做为任务内容,和任务方法optimize一起组合为一个工作任务。然后插入任务池给cpu核心去跑。这个时候在系统监视器可以看到于核心数相同的python虚拟环境运作。

然后就是对返回值排序。后面详细说说。

df =  engine.runParallelOptimization(AtrRsiStrategy, setting)
def runParallelOptimization(self, strategyClass, optimizationSetting):
    """并行优化参数"""
    # 获取优化设置        
    settingList = optimizationSetting.generateSetting()
    targetName = optimizationSetting.optimizeTarget

    # 检查参数设置问题
    if not settingList or not targetName:
        self.output(u'优化设置有问题,请检查')

    # 多进程优化,启动一个对应CPU核心数量的进程池
    pool = multiprocessing.Pool(multiprocessing.cpu_count())
    l = []
    for setting in settingList:
        l.append(pool.apply_async(optimize, (strategyClass, setting,
                                             targetName, self.mode, 
                                             self.startDate, self.initDays, self.endDate,
                                             self.slippage, self.rate, self.size, self.priceTick,
                                             self.dbName, self.symbol)))
    pool.close()
    pool.join()

    # 显示结果
    resultList = [res.get() for res in l]
    resultList.sort(reverse=True, key=lambda result:result[1])
    return resultList

像现在双核四线程就有四个python环境在跑任务。

enter image description here
这里会发现是用静态方法optimize,如果直接调用 BacktestingEngine的回测方法更简洁,为什么没有呢,这个是python2.7的Multiprocessing的一个局限,只能打包静态方法做为工作方法,如果打包类中的方法,会提示错误。

cPickle.PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup builtin .instanceme

如果VNPY2.0基于python3.6版本,应该就会更简化一些。

下面看看 静态方法 optimize,其实没什么好说,就是新建一个回测引擎BacktestingEngine对象,按照参数跑一遍回测,返回一个元祖,包含了这次回测的参数,针对回测目标的值,和一个包含回测结果的字典,这个字典包括什么年化收入,sharpe等一堆回测结果。

然后所有的回测结果元祖组成一个回测结果队列,这个结果队列按照targetValue反向排序,最大放在第一位。

因为太多了,一般我都是输出到excel里面,之前说过怎么实现。

#----------------------------------------------------------------------
def optimize(strategyClass, setting, targetName,
             mode, startDate, initDays, endDate,
             slippage, rate, size, priceTick,
             dbName, symbol):
    """多进程优化时跑在每个进程中运行的函数"""
    engine = BacktestingEngine()
    engine.setBacktestingMode(mode)
    engine.setStartDate(startDate, initDays)
    engine.setEndDate(endDate)
    engine.setSlippage(slippage)
    engine.setRate(rate)
    engine.setSize(size)
    engine.setPriceTick(priceTick)
    engine.setDatabase(dbName, symbol)
    engine.initStrategy(strategyClass, setting)
    engine.runBacktesting()
    engine.calculateDailyResult()
    d, result = engine.calculateDailyStatistics()
    try:
        targetValue = result[targetName]
    except KeyError:
        targetValue = 0
    return (str(setting), targetValue, result)

其实python的多进程库Multiprocessing不算复杂,但是用在回测上效果很好;现在有了遗传算法,进行策略优化更加方便了。



centos7.6 搭建vnpy量化交易环境

一直用的是centos的服务器,折腾了几天,终于在上面把vnpy跑起来了,没有记录折腾的细节,只是记录了下正常的操作步骤。欢迎大家一起交流。

centos7.6 安装python环境

# 在anaconda.com官网下载安装包,这里需要下载linux版本的x86_64位的包
bash Anaconda3-2019.07-Linux-x86_64.sh
# 如果不修改安装路径的话,按照默认设置即可。
# 安装过anaconda后,需要将/root/.bashrc中的关于anaconda的部分屏蔽掉,不然后面vncserver无法正常启动。

centos7.6 安装桌面

yum groups install "X Window System" -y
yum install epel-release -y
yum groups install "MATE Desktop" -y
systemctl set-default graphical.target

centos7.6 安装vncserver

yum install tigervnc-server -y

# 替换User为root,增加显示分辨率参数设置
sed -r -i "s/^(ExecStart.*)<USER>(.*%i)/\1root\2 -geometry 1920x1200 -depth 16/" /lib/systemd/system/vncserver@.service
sed -r -i "s/^(PIDFile.*)home\/<USER>(.*pid)/\1root\2/" /lib/systemd/system/vncserver@.service

mv /lib/systemd/system/vncserver@.service /lib/systemd/system/vncserver@:1.service
systemctl daemon-reload
vncpasswd
systemctl start vncserver@:1.service
systemctl enable vncserver@:1.service

# 屏蔽默认桌面,启动mate桌面
sed -r -i "s@^/etc/X11/xinit/xinitrc$@# &@" /root/.vnc/xstartup
echo "/usr/bin/mate-session &" >> /root/.vnc/xstartup

# 其它操作
# 禁用selinux
sed -r -i "s/^(SELINUX=).*/\1disabled/" /etc/selinux/config
# 关闭防火墙
systemctl stop firewalld.service
systemctl disable firewalld.service

reboot

# 如果是云服务器,需要确保开放了TCP 5901端口

centos7.6 安装vscode

rpm --import https://packages.microsoft.com/keys/microsoft.asc
sh -c 'echo -e "[code]\nname=Visual Studio Code\nbaseurl=https://packages.microsoft.com/yumrepos/vscode\nenabled=1\ngpgcheck=1\ngpgkey=https://packages.microsoft.com/keys/microsoft.asc" > /etc/yum.repos.d/vscode.repo'
yum check-update
yum install code -y

centos7.6 升级gcc9.1.0版本

yum install gcc gcc-c++ bzip2 m4 -y

tar -jxvf gcc-9.1.0.tar.bz2
./contrib/download_prerequisites

cd gmp;mkdir temp;cd temp
../configure --prefix=/usr/local/gmp-6.1.0
make
make install
cd ../..

cd mpfr;mkdir temp;cd temp
../configure --prefix=/usr/local/mpfr-3.1.4 --with-gmp=/usr/local/gmp-6.1.0
make
make install
cd ../..

cd mpc;mkdir temp;cd temp
../configure --prefix=/usr/local/mpc-1.0.3 --with-gmp=/usr/local/gmp-6.1.0 --with-mpfr=/usr/local/mpfr-3.1.4
make
make install
cd ../..

vim /etc/profile
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/local/mpc-1.0.3/lib:/usr/local/gmp-6.1.0/lib:/usr/local/mpfr-3.1.4/lib

mkdir temp;cd temp
../configure --disable-multilib --enable-languages=c,c++ --with-gmp=/usr/local/gmp-6.1.0 --with-mpfr=/usr/local/mpfr-3.1.4 --witpc=/usr/local/mpc-1.0.3
make
make install

gcc需要9.1.0以上的版本,否则在编译vnpy的时候,会报-std=c++17相关的错误。
gcc编译时间很长,估计得几个小时。

centos7.6 编译vnpy

# 切换到python环境
. anaconda3/bin/activate
yum install postgresql-devel* libxkbcommon-x11 -y
cd vnpy
bash install.sh


从数据库读取数据并且画图

策略开发的第一步,永远是对行情数据进行画图。

from datetime import datetime
from vnpy.trader.constant import Exchange,Interval
from vnpy.trader.database import database_manager
import matplotlib.pyplot as plt

# Load history data
bars =database_manager.load_bar_data(    
    symbol="XBTUSD", 
    exchange=Exchange.BITMEX, 
    interval=Interval.MINUTE, 
    start=datetime(2017, 4, 1), 
    end=datetime(2019, 10, 30)
    )

# Generate x, y
y = []
for bar in bars:
    close_price = bar.close_price
    y.append(close_price)  
x = list(range(1,len(y)+1))

# Show foto
plt.figure(figsize=(40, 20))
plt.plot(x, y)  

plt.show()

description

 

附:若是24小时交易,需要查看是否有缺失数据,x轴和y轴数据可以改成下面这样

# generate x, y
x=[]
y = []
for bar in bars:
    time = bar.datetime
    close_price = bar.close_price

    x.append(time)
    y.append(close_price)


分享一个螺纹钢30分钟均线策略

作者:爱茶语 ;来源:维恩的派论坛

 

  • 原文测试时间区间是20120111--20171117,样本内夏普比率达1.35。
  • 今进行样本外测试,时间区间20130111--20190102,夏普比率为0.78。

结果显示尽管参数不多,但是模型还是过拟合了。但是在策略内实现Tick数据聚合成X分钟K线还是值得学习一下
 

回测设置

# 设置回测使用的数据
engine.setBacktestingMode(engine.BAR_MODE)    # 设置引擎的回测模式为K线
engine.setDatabase(MINUTE_DB_NAME, 'RB99')  # RQDATA一分钟期货指数数据
engine.setStartDate('20130101')               # 设置回测用的数据起始日期

# 配置回测引擎参数
engine.setSlippage(1)      # 设置滑点为1跳
engine.setRate(1/1000)    # 设置手续费
engine.setSize(10)         # 设置合约大小 
engine.setPriceTick(1)     # 设置最小价格变动   
engine.setCapital(200000)   # 设置回测本金

 
 

回测效果

 
enter image description here

 
 
策略代码如下
 

# encoding: UTF-8


import talib
import numpy as np

from vnpy.trader.vtObject import VtBarData
from vnpy.trader.vtConstant import EMPTY_INT,EMPTY_STRING
from vnpy.trader.app.ctaStrategy.ctaTemplate import CtaTemplate

########################################################################
class RBMAStrategy(CtaTemplate):
    """结合MA的一个30分钟线交易策略"""
    className = 'RBMAStrategy'
    author = 'xldistance'

    #策略参数

    initDays = 33    # 初始化数据所用的天数默认35
    open_pos = 10   #每次交易的手数
    OCM = 30      #操作分钟周期(1,60)默认30
    # 策略变量
    bar = None                  # K线对象
    barMinute = EMPTY_STRING    # K线当前的分钟
    minutebar = None        # minuteK线对象
    ma_windows1 = 20    #默认20
    ma_windows2 = 200    #默认200
    # 参数列表,保存了参数的名称
    paramList = ['name',
                 'className',
                 'author',
                 'vtSymbol',
                 'open_pos']

    # 变量列表,保存了变量的名称
    varList = ['inited',
               'trading',
               'pos',
               'OCM',
               'ma20_value',
               'ma200_value']
    #----------------------------------------------------------------------
    def __init__(self, ctaEngine, setting):
        """Constructor"""
        super(RBMAStrategy, self).__init__(ctaEngine, setting)
        """
        如果是多合约实例的话,变量需要放在__init__里面
        """
        #self.orderList = []
        self.barList = []
        self.bufferSize = 201                    # 需要缓存的数据的大小
        self.bufferCount = 0                     # 目前已经缓存了的数据的计数
        self.highArray = np.zeros(self.bufferSize)    # K线最高价的数组
        self.lowArray = np.zeros(self.bufferSize)     # K线最低价的数组
        self.closeArray = np.zeros(self.bufferSize)   # K线收盘价的数组
        self.openArray = np.zeros(self.bufferSize)   # K线开盘价的数组
        self.LongEnterable = False
        self.ShortEnterable = False
        self.ma20_value = 0
        self.ma200_value = 0
    def onInit(self):
        self.writeCtaLog('%s策略初始化' %self.name)

        # 载入历史数据,并采用回放计算的方式初始化策略数值
        initData = self.loadBar(self.initDays)
        for bar in initData:
            self.onBar(bar)

        self.putEvent()

    #----------------------------------------------------------------------
    def onStart(self):
        """启动策略(必须由用户继承实现)"""
        self.writeCtaLog('%s策略启动' %self.name)
        self.putEvent()

    #----------------------------------------------------------------------
    def onStop(self):
        """停止策略(必须由用户继承实现)"""
        self.writeCtaLog('%s策略停止' %self.name)
        self.putEvent()

    #----------------------------------------------------------------------
    def onTick(self, tick):
        """收到行情TICK推送(必须由用户继承实现)"""


        tickMinute = tick.datetime.minute
        if tickMinute != self.barMinute:
            if self.bar:
                self.onBar(self.bar)

            bar = VtBarData()
            bar.vtSymbol = tick.vtSymbol
            bar.symbol = tick.symbol
            bar.exchange = tick.exchange

            bar.open = tick.lastPrice
            bar.high = tick.lastPrice
            bar.low = tick.lastPrice
            bar.close = tick.lastPrice

            bar.date = tick.date
            bar.time = tick.time
            bar.datetime = tick.datetime    # K线的时间设为第一个Tick的时间

            self.bar = bar                  # 这种写法为了减少一层访问,加快速度
            self.barMinute = tickMinute     # 更新当前的分钟
        else:                               # 否则继续累加新的K线
            bar = self.bar                  # 写法同样为了加快速度

            bar.high = max(bar.high, tick.lastPrice)
            bar.low = min(bar.low, tick.lastPrice)
            bar.close = tick.lastPrice

   #----------------------------------------------------------------------
    def onBar(self, bar):
        """收到Bar推送(必须由用户继承实现)"""

        if self.LongEnterable:
            if self.pos == 0:# and bar.close > self.dayOpen
                self.buy(bar.close,self.open_pos,True)
            elif self.pos < 0 :
                self.cover(bar.close,abs(self.pos),True)
        if self.ShortEnterable:
            if self.pos ==0:#and bar.close < self.dayOpen
                self.short(bar.close,self.open_pos,True)
            elif self.pos > 0:
                self.sell(bar.close,abs(self.pos),True)

        if bar.datetime.minute  % self.OCM == 0:
            # 如果已经有聚合minuteK线
            if self.minutebar:
                # 将最新分钟的数据更新到目前minute线中
                minutebar = self.minutebar
                minutebar.high = max(minutebar.high, bar.high)
                minutebar.low = min(minutebar.low, bar.low)
                minutebar.close = bar.close

                # 推送minute线数据
                self.onminutebar(minutebar)

                # 清空minute线数据缓存
                self.minutebar = None
        else:
            # 如果没有缓存则新建
            if not self.minutebar:
                minutebar = VtBarData()

                minutebar.vtSymbol = bar.vtSymbol
                minutebar.symbol = bar.symbol
                minutebar.exchange = bar.exchange

                minutebar.open = bar.open
                minutebar.high = bar.high
                minutebar.low = bar.low
                minutebar.close = bar.close

                minutebar.date = bar.date
                minutebar.time = bar.time
                minutebar.datetime = bar.datetime

                self.minutebar = minutebar
            else:
                minutebar = self.minutebar
                minutebar.high = max(minutebar.high, bar.high)
                minutebar.low = min(minutebar.low, bar.low)
                minutebar.close = bar.close
        # 发出状态更新事件
        self.putEvent()

    #----------------------------------------------------------------------
    def onminutebar(self,bar):
        """收到Bar推送(必须由用户继承实现)"""
        # 撤销之前发出的尚未成交的委托(包括限价单和停止单)
        #for orderID in self.orderList:
            #self.cancelOrder(orderID)
        #self.orderList = []

        # 保存K线数据
        self.closeArray[0:self.bufferSize-1] = self.closeArray[1:self.bufferSize]
        self.highArray[0:self.bufferSize-1] = self.highArray[1:self.bufferSize]
        self.lowArray[0:self.bufferSize-1] = self.lowArray[1:self.bufferSize]
        self.openArray[0:self.bufferSize-1] = self.openArray[1:self.bufferSize]
        self.closeArray[-1] = bar.close
        self.highArray[-1] = bar.high
        self.lowArray[-1] = bar.low
        self.openArray[-1] = bar.open
        self.bufferCount += 1
        if self.bufferCount < self.bufferSize:
            return
        # 计算指标数值
        ma_20 = talib.EMA(self.closeArray,timeperiod = self.ma_windows1)
        ma_200 = talib.EMA(self.closeArray,timeperiod = self.ma_windows2)
        self.ma20_value = ma_20[-1]
        self.ma200_value = ma_200[-1]
        self.LongEnterable = ma_20[-1] > ma_200[-1] and ma_20[-2] < ma_200[-2]
        self.ShortEnterable = ma_20[-1] < ma_200[-1] and ma_20[-2] > ma_200[-2]

        # 发出状态更新事件
        self.putEvent()

    #----------------------------------------------------------------------
    def onOrder(self, order):
        """收到委托变化推送(必须由用户继承实现)"""
        pass

    #----------------------------------------------------------------------
    def onTrade(self, trade):
        # 发出状态更新事件
        self.putEvent()
    def onStopOrder(self, so):
        """停止单推送"""
        pass

统计

主题
1947
帖子
7091
已注册用户
7763
最新用户
在线用户
159
在线来宾用户
291
© 2015-2019 上海韦纳软件科技有限公司
备案服务号:沪ICP备18006526号-3