vn.py量化社区
By Traders, For Traders.

置顶主题

使用cProfile针对回测进行性能分析,和结合说下提速思路

昨天再研究lru_cache缓存读取回测优化时候,使用了性能分析工具cProfile,发现还是非常强力的分析工具,这里做个简单介绍。并结合说下一些策略提速思路。
关于cProfile使用这个文章写的很详细,https://www.cnblogs.com/btchenguang/archive/2012/02/03/2337112.html 。 这里我简单接受下。

安装

不用安装,python一般自带都有的,

使用

使用方法有几个,我是直接输出。就是把回测代码放在一个方法里面,比如runBackTesting()里面。然后再main方法按照下面代码跑,这里是按照累计时间排序的。提示,最好注释掉Matplot图像输出,因为交互的时间也是统计的。

if __name__ == '__main__':
    cProfile.run("runBackTesting()", sort="cumulative")

性能分析结果

如下图所示
description

  • 第一行 是总共调用function次数,和总运行时间次数
  • 下面列的说明
    • ncalls:表示函数调用的次数;
    • tottime:表示指定函数的总的运行时间,除掉函数中调用子函数的运行时间;
    • percall:(第一个percall)等于 tottime/ncalls;
    • cumtime:表示该函数及其所有子函数的调用运行的时间,即函数开始调用到返回的时间;
    • percall:(第二个percall)即函数运行一次的平均时间,等于 cumtime/ncalls;
    • filename:lineno(function):每个函数调用的具体信息;前面是文件名,第几行,后面是方法名,有些方法比如max,min这些就没有文件名了。

一些分析

一般关注两个ncalls,和cumtime。这里结合说下一些提速思路

  • 通常ncalls如果只有一两次,但是cumtime很长;这些通常是硬盘IO读取一类,只能尽量减少这些次数。
  • ncalls次数很多,而且cumtime不低的,是优化重点,可以看到主要是bar回溯。如果vnpy自带改进思路不是很多。如果是自己的onBar方法,尽量减少里面运算,可以放在条件判断下的,先放进去。
    比如self.pos == 0, self.pos <> 0: 这样。
  • 在不会缺少数据的前提下,尽量减少ArrayManager中的长度,比如ArrayManager(50),比默认100少一半。
  • 使用缓存或者固化数据,我自己定义了一个方法atrlog,发现用时很多,检查发现实在每次调用时候要用三次np.log去计算high, low, close的对数值;这里我索性一次过把所有历史数据在读取时候就把历史数据对数化放到cache里面。减少了耗时。

当然还有很多,只是一些我的思路,其他优化可以见专业文章。



Jupyter Notebook实现从IB接口历史数据获取,写入数据库,策略回测和实盘交易

刚好有个同学问怎么实现IB历史数据获取,和策略回测和实盘交易。想着熟悉vnpy2.0操作,就用Jupyter Notebook都是跑了一边。VNPY2.0的整体架构设计很有扩展性,而且调用也比起v1.0先进清晰很多,引擎加载调用非常方便。

讲讲注意点:

  1. IB接口历史数据大多是要收费的订阅的,如果收费会有报错信息提示,这里找个免费的作为使用。另外vnpy是按照最大6个月历史数据设计的。
  2. 数据库定义有个小坑,我是用mongodb的,在第一次填写 trader/setting.py中密码写错了,后面在trader/setting.py改发现怎么也改不好;原来当第一次维护后,配置会写入.vntrader/vt_setting,之后系统只会去.vntrader/vt_setting读取。去改vt_setting,而不是trader/setting.py。
  3. 使用CtaStrategyApp支持加入新策略,系统会自动输出json保持策略信息;所以第二次运行代码时候,会提示已经有了,不是问题。
  4. 我在代码里面把回测和实盘放在一次,如果直接跑下来可能会报错,建议跑实盘时候先注释的回测。
  5. 使用script_engine订阅历史数据是是默认从rqdata获取,vnpy v2.07 IB接口已经提供历史数据获取,这里创建HistoryRequest用main_engine来获取,

为了方便贴出来,改成.py代码格式,直接跑也没有问题。

from vnpy.app.script_trader import init_cli_trading
from vnpy.app.script_trader.cli import process_log_event
from vnpy.gateway.ib import IbGateway
from time import sleep
from datetime import datetime
import pandas as pd
# 连接到服务器
setting = {
    "TWS地址": "127.0.0.1",
    "TWS端口": 7497,
    "客户号":8 #每个链接用一个独立的链接号,一个IBAPI支持32个来同时链接
}
engine = init_cli_trading([IbGateway]) #返回Script_engine 示例,并且给main_engine注册了gateway
engine.connect_gateway(setting, "IB") #链接

# 查询资金 - 自动
sleep(10)
print("***查询资金和持仓***")
print(engine.get_all_accounts(use_df = True))
# 查询持仓
print(engine.get_all_positions(use_df = True))

# 订阅行情
from vnpy.trader.constant import Exchange
from vnpy.trader.object import SubscribeRequest
# 从我测试直接用Script_engine有问题,IB的品种太多,get_all_contracts命令不行,需要指定具体后才可以,这里使用main_engine订阅
req1 = SubscribeRequest("12087792",Exchange.IDEALPRO) #创建行情订阅
engine.main_engine.subscribe(req1,"IB")


# 使用script_engine订阅历史数据是从rqdata获取,vnpy v2.07已经提供历史数据获取,这里创建HistoryRequest来获取,
# 查询如果没有endtime,默认当前。返回历史数据输出到数据库和csv文件
# 关于api更多信息可以参见 https://interactivebrokers.github.io/tws-api/historical_bars.html
print("***从IB读取历史数据, 返回历史数据输出到数据库和csv文件***")
from vnpy.trader.object import HistoryRequest
from vnpy.trader.object import Interval
start = datetime.strptime('20190901', "%Y%m%d")

historyreq = HistoryRequest(
    symbol="12087792",
    exchange=Exchange.IDEALPRO,
    start=start,
    interval=Interval.MINUTE
)
# # 读取历史数据,并把历史数据BarData放入数据库
bardatalist = engine.main_engine.query_history(historyreq,"IB")
from vnpy.trader.database import database_manager
database_manager.save_bar_data(bardatalist)

# 把历史数据BarData输出到csv
pd.DataFrame(bardatalist).to_csv("C:\Project\\"+ str(historyreq.symbol) + ".csv" , index=True, header=True)
print("History data export to CSV")

# # 参考backtesting.ipynb, 使用自带的双均线策略回测,10日上穿60日做多,否则反之
print("***从数据库读取历史数据, 进行回测***")
from vnpy.app.cta_strategy.backtesting import BacktestingEngine
from vnpy.app.cta_strategy.strategies.double_ma_strategy import (
    DoubleMaStrategy,
)
btengine = BacktestingEngine() #新建回测引擎
btengine.set_parameters(
    vt_symbol="12087792.IDEALPRO",
    interval="1m",
    start=datetime(2019, 9, 1),
    end=datetime(2019, 10, 5),
    rate = 0,
    slippage=0.00005,
    size=1000,
    pricetick=0.00005,
    capital=1_000_000,
)
btengine.add_strategy(DoubleMaStrategy, {"fast_window":10, "slow_window": 60})

btengine.load_data()
btengine.run_backtesting()
df = btengine.calculate_result()
btengine.calculate_statistics()
btengine.show_chart()

# 给script_engine载入双均线策略,实盘运行
print("***从数据库读取准备数据, 实盘运行***")
# 使用cta交易引擎
from vnpy.app.cta_strategy import CtaStrategyApp
from vnpy.app.cta_strategy.base import EVENT_CTA_LOG
engine.event_engine.register(EVENT_CTA_LOG, process_log_event)
cta_engine = engine.main_engine.add_app(CtaStrategyApp) #加入app
cta_engine.init_engine()
cta_engine.add_strategy("DoubleMaStrategy","DoubleMaStrategy_IB_12087792_v1", "12087792.IDEALPRO",{"fast_window":10, "slow_window": 50})
sleep(10)
cta_engine.init_strategy("DoubleMaStrategy_IB_12087792_v1")
sleep(10)
cta_engine.start_strategy("DoubleMaStrategy_IB_12087792_v1")


解密并强化日内经典策略R-Breaker

R-Breaker是一种中高频的日内交易策略,这个策略也长期被Future Truth杂志评为最赚钱的策略之一。R-Breaker策略结合了趋势和反转两种交易方式,所以交易机会相对较多,比较适合日内1分钟K线或者5分钟K线级别的数据。

 
 

R-Breaker策略逻辑

 

R-Breaker的策略逻辑由以下4部分构成:

1)计算6个目标价位

根据昨日的开高低收价位计算出今日的6个目标价位,按照价格高低依次是:

  • 突破买入价(Bbreak)
  • 观察卖出价(Ssetup)
  • 反转卖出价(Senter)
  • 反转买入价(Benter)
  • 观察买入价(Bsetup)
  • 突破卖出价(Sbreak)

 

他们的计算方法如下:(其中a、b、c、d为策略参数)

  • 观察卖出价(Ssetup)= High + a * (Close – Low)
  • 观察买入(Bsetup)= Low – a * (High – Close)
  • 反转卖出价(Senter)= b / 2 * (High + Low) – c * Low
  • 反转买入价(Benter)= b / 2 * (High + Low) – c * High
  • 突破卖出价(Sbreak)= Ssetup - d * (Ssetup – Bsetup)
  • 突破买入价(Bbreak)= Bsetup + d * (Ssetup – Bsetup)

 

description

2)设计委托逻辑

趋势策略情况:

  • 若价格>突破买入价,开仓做多;
  • 若价格<突破卖出价,开仓做空;

 

反转策略情况:

  • 若日最高价>观察卖出价,然后下跌导致价格<反转卖出价,开仓做空或者反手(先平仓再反向开仓)做空;
  • 若日最低价<观察买入价,然后上涨导致价格>反转买入价,开仓做多或者反手(先平仓再反向开仓)做多;

 

3)设定相应的止盈止损。

 

4)日内策略要求收盘前平仓。

 

上面是原版R-Breaker策略逻辑,但是使用RQData从2010年至今(即2019年10月)的1分钟沪深300股指期货主力连续合约(IF88)测试,效果并不理想。

 
 

策略逻辑优化

 

实际上R-Breaker策略可以拆分成趋势策略和反转策略。下面分别对这对2种策略逻辑进行优化:

1)趋势策略:

  • 若当前x分钟的最高价>观察卖出价,认为它具有上升趋势,在突破买入价挂上买入开仓的停止单;
  • 若当前x分钟的最低价<观察买入价,认为它具有下跌趋势,在突破卖出价挂上买入开仓的停止单;
  • 开仓后,使用固定百分比移动止损离场;
  • 增加过滤条件:为防止横盘行情导致不断的开平仓,日内每次开仓买入开仓(卖出开仓)委托的价位都比上一次更高(更低);
  • 收盘前,必须平调所持有的仓位。

 

2)反转策略:

  • 若当前x分钟的最高价>观察卖出价,认为它已经到了当日阻力位,可能发生行情反转,在反转卖出价挂上卖出开仓的停止单;
  • 若当前x分钟的最低价>观察买入价,认为它已经到了当日支撑位,可能发生行情反转,在反转买入价挂上买入开仓的停止单;
  • 开仓后,使用固定百分比移动止损离场;
  • 收盘前,必须平调所持有的仓位。

 

其代码实现逻辑如下:

self.tend_high, self.tend_low = am.donchian(self.donchian_window)

if bar.datetime.time() < self.exit_time:

    if self.pos == 0:
        self.intra_trade_low = bar.low_price
        self.intra_trade_high = bar.high_price

        # Trend Condition
        if self.tend_high > self.sell_setup:
            long_entry = max(self.buy_break, self.day_high)
            self.buy(long_entry, self.fixed_size, stop=True)

            self.short(self.sell_enter, self.multiplier * self.fixed_size, stop=True)

        elif self.tend_low < self.buy_setup:
            short_entry = min(self.sell_break, self.day_low)
            self.short(short_entry, self.fixed_size, stop=True)

            self.buy(self.buy_enter, self.multiplier * self.fixed_size, stop=True)

    elif self.pos > 0:
        self.intra_trade_high = max(self.intra_trade_high, bar.high_price)
        long_stop = self.intra_trade_high * (1 - self.trailing_long / 100)
        self.sell(long_stop, abs(self.pos), stop=True)

    elif self.pos < 0:
        self.intra_trade_low = min(self.intra_trade_low, bar.low_price)
        short_stop = self.intra_trade_low * (1 + self.trailing_short / 100)
        self.cover(short_stop, abs(self.pos), stop=True)

# Close existing position
else:
    if self.pos > 0:
        self.sell(bar.close_price * 0.99, abs(self.pos))
    elif self.pos < 0:
        self.cover(bar.close_price * 1.01, abs(self.pos))

 
 

策略效果

 

同样使用10年的1分钟IF88数据进行回测。不过,在展示强化版R-Breaker策略效果前,先分别展示一下拆分后的趋势策略和反转策略。

1)趋势策略:

  • 趋势策略夏普比率1.96,日均成交2.6笔,资金曲线是整体上扬的;
  • 但是在2017~2018年的盘整阶段,具有较大并且持续时间较长的回撤;
  • 这凸显出趋势类策略自身无法规避的缺点:在趋势行情中盈利,意味着震荡行情必然亏损。

description

 

2)反转策略

  • 反转策略夏普比率0.75,日均成交0.4笔,资金曲线缓慢上扬;
  • 但是在2017~2018年的盘整阶段,资金曲线上扬最快,而且这个阶段是最平滑的;
  • 这凸显出反转类策略优点:尽管在趋势行情亏损,在震荡行情必然能盈利。

description

 

综合对比2种策略的日均成交笔数和资金曲线,我们可以知道:

  • 由于趋势策略日均交易笔数较多(2.6笔),它主要负责贡献R-Breaker策略的alpha;
  • 趋势策略的亏损也是主要导致R-Breaker策略亏损的原因,但这时候的亏损由反转策略的盈利来填补。

由于趋势策略和反转策略是互斥的,在某些方面呈现出此消彼长的特点。那么,根据投资组合理论,可以把反转策略看作是看跌期权,买入一定规模的看跌期权来对消非系统性风险,那么组合的收益会更加稳健,即夏普比率更高。

由于趋势策略和反转策略日均成交手数比是2.6:0.4,若它们都只委托1手的话,反转策略的对冲效果微乎其微。

为了方便演示,我们设置趋势策略每次交易1手;反转策略则是3手。然后合成R-Breaker策略。发现夏普比率提高到2,资金曲线整体上扬,而且没有较大且持续时间较长的回撤。

description

 
 

结论

 

R-Breaker策略成功之处在于它并不是纯粹的趋势类策略,它属于复合型策略,它的alpha由2部分构成:趋势策略alpha;反转策略alpha。

这类复合型策略可以看作是轻量级的投资组合,因为它的交易标的只有一个:沪深300股指期货的主力合约。

更复杂的话,可以交易多个标的,如在商品期货做虚拟钢厂套利(同时交易螺纹钢、铁矿石、焦炭),在IF股指期货上做日内CTA策略。考虑到市场容量不同,价差套利能分配更多的资金。这样在价差套利提供稳定收益率基础上,CTA策略能在行情好的时候贡献更多alpha(高盈亏比特征导致的)。

从上面例子可以看出,一个合理的投资组合,往往比单个策略具有更高的夏普比率。因为夏普比率=超额收益/风险。夏普比率高意味着资金曲线非常平滑;这也意味着我们可以有效控制使用杠杆的风险。

当某个投资组合策略夏普足够高,而且策略资金容量允许,交易成本能有效控制等情况下,就可以通过杠杆来提升组合收益了。例如向银行贷款或者发放债券,这时候交易团队是债务人角色,即在承担更大风险同时,追求更到收益。债权人享受利息收益(类无风险收益)。

向公众发产品是另一种增加杠杆的方式,但此时投资组合风险已经转移到了客户这方,交易团队可以享受着管理费收益(类无风险收益)。根据目标客户的不同:

  • 私募基金面向高净值客户,这些客户群体风险承受能力较高;并且私募监管比较放松,能使用的衍生品较多,有提升业绩的自由度。故私募基金除了管理费,更追求业绩提成。
  • 公募基金面向普通群众,他们风险承受能力较低;并且公募监管非常严格,投资约束非常多,提升业绩难度较大。但是公募牌照的稀缺性必然导致该行业是盈利的。如万亿级别的管理规模,其管理费的收益,也不是一般的自营交易公司或者私募基金比得上的。

 
 

附录

 

最后附上策略源代码:

from datetime import time
from vnpy.app.cta_strategy import (
    CtaTemplate,
    StopOrder,
    TickData,
    BarData,
    TradeData,
    OrderData,
    BarGenerator,
    ArrayManager
)
​
​
class RBreakStrategy(CtaTemplate):
    """"""
    author = "KeKe"
​
    setup_coef = 0.25
    break_coef = 0.2
    enter_coef_1 = 1.07
    enter_coef_2 = 0.07
    fixed_size = 1
    donchian_window = 30
​
    trailing_long = 0.4
    trailing_short = 0.4
    multiplier = 3
​
    buy_break = 0   # 突破买入价
    sell_setup = 0  # 观察卖出价
    sell_enter = 0  # 反转卖出价
    buy_enter = 0   # 反转买入价
    buy_setup = 0   # 观察买入价
    sell_break = 0  # 突破卖出价
​
    intra_trade_high = 0
    intra_trade_low = 0
​
    day_high = 0
    day_open = 0
    day_close = 0
    day_low = 0
    tend_high = 0
    tend_low = 0
​
    exit_time = time(hour=14, minute=55)
​
    parameters = ["setup_coef", "break_coef", "enter_coef_1", "enter_coef_2", "fixed_size", "donchian_window"]
    variables = ["buy_break", "sell_setup", "sell_enter", "buy_enter", "buy_setup", "sell_break"]
​
    def __init__(self, cta_engine, strategy_name, vt_symbol, setting):
        """"""
        super(RBreakStrategy, self).__init__(
            cta_engine, strategy_name, vt_symbol, setting
        )
​
        self.bg = BarGenerator(self.on_bar)
        self.am = ArrayManager()
        self.bars = []
​
    def on_init(self):
        """
        Callback when strategy is inited.
        """
        self.write_log("策略初始化")
        self.load_bar(10)
​
    def on_start(self):
        """
        Callback when strategy is started.
        """
        self.write_log("策略启动")
​
    def on_stop(self):
        """
        Callback when strategy is stopped.
        """
        self.write_log("策略停止")
​
    def on_tick(self, tick: TickData):
        """
        Callback of new tick data update.
        """
        self.bg.update_tick(tick)
​
    def on_bar(self, bar: BarData):
        """
        Callback of new bar data update.
        """
        self.cancel_all()
​
        am = self.am
        am.update_bar(bar)
        if not am.inited:
            return
​
        self.bars.append(bar)
        if len(self.bars) <= 2:
            return
        else:
            self.bars.pop(0)
        last_bar = self.bars[-2]
​
        # New Day
        if last_bar.datetime.date() != bar.datetime.date():
            if self.day_open:
​
                self.buy_setup = self.day_low - self.setup_coef * (self.day_high - self.day_close)  # 观察买入价
                self.sell_setup = self.day_high + self.setup_coef * (self.day_close - self.day_low)  # 观察卖出价
​
                self.buy_enter = (self.enter_coef_1 / 2) * (self.day_high + self.day_low) - self.enter_coef_2 * self.day_high  # 反转买入价
                self.sell_enter = (self.enter_coef_1 / 2) * (self.day_high + self.day_low) - self.enter_coef_2 * self.day_low  # 反转卖出价
​
                self.buy_break = self.buy_setup + self.break_coef * (self.sell_setup - self.buy_setup)  # 突破买入价
                self.sell_break = self.sell_setup - self.break_coef * (self.sell_setup - self.buy_setup)  # 突破卖出价
​
            self.day_open = bar.open_price
            self.day_high = bar.high_price
            self.day_close = bar.close_price
            self.day_low = bar.low_price
​
        # Today
        else:
            self.day_high = max(self.day_high, bar.high_price)
            self.day_low = min(self.day_low, bar.low_price)
            self.day_close = bar.close_price
​
        if not self.sell_setup:
            return
​
        self.tend_high, self.tend_low = am.donchian(self.donchian_window)
​
        if bar.datetime.time() < self.exit_time:
​
            if self.pos == 0:
                self.intra_trade_low = bar.low_price
                self.intra_trade_high = bar.high_price
​
                if self.tend_high > self.sell_setup:
                    long_entry = max(self.buy_break, self.day_high)
                    self.buy(long_entry, self.fixed_size, stop=True)
​
                    self.short(self.sell_enter, self.multiplier * self.fixed_size, stop=True)
​
                elif self.tend_low < self.buy_setup:
                    short_entry = min(self.sell_break, self.day_low)
                    self.short(short_entry, self.fixed_size, stop=True)
​
                    self.buy(self.buy_enter, self.multiplier * self.fixed_size, stop=True)
​
            elif self.pos > 0:
                self.intra_trade_high = max(self.intra_trade_high, bar.high_price)
                long_stop = self.intra_trade_high * (1 - self.trailing_long / 100)
                self.sell(long_stop, abs(self.pos), stop=True)
​
            elif self.pos < 0:
                self.intra_trade_low = min(self.intra_trade_low, bar.low_price)
                short_stop = self.intra_trade_low * (1 + self.trailing_short / 100)
                self.cover(short_stop, abs(self.pos), stop=True)
​
        # Close existing position
        else:
            if self.pos > 0:
                self.sell(bar.close_price * 0.99, abs(self.pos))
            elif self.pos < 0:
                self.cover(bar.close_price * 1.01, abs(self.pos))
​
       self.put_event()
​
    def on_order(self, order: OrderData):
        """
        Callback of new order data update.
        """
        pass
​
    def on_trade(self, trade: TradeData):
        """
        Callback of new trade data update.
        """
        self.put_event()
​
    def on_stop_order(self, stop_order: StopOrder):
        """
        Callback of stop order update.
        """
        pass


从数据库读取数据并且画图

策略开发的第一步,永远是对行情数据进行画图。

from datetime import datetime
from vnpy.trader.constant import Exchange,Interval
from vnpy.trader.database import database_manager
import matplotlib.pyplot as plt

# Load history data
bars =database_manager.load_bar_data(    
    symbol="XBTUSD", 
    exchange=Exchange.BITMEX, 
    interval=Interval.MINUTE, 
    start=datetime(2017, 4, 1), 
    end=datetime(2019, 10, 30)
    )

# Generate x, y
y = []
for bar in bars:
    close_price = bar.close_price
    y.append(close_price)  
x = list(range(1,len(y)+1))

# Show foto
plt.figure(figsize=(40, 20))
plt.plot(x, y)  

plt.show()

description

 

附:若是24小时交易,需要查看是否有缺失数据,x轴和y轴数据可以改成下面这样

# generate x, y
x=[]
y = []
for bar in bars:
    time = bar.datetime
    close_price = bar.close_price

    x.append(time)
    y.append(close_price)


用pickle本地持久化数据提高回测速度,实测提升2.3倍左右

@lru_cache(maxsize=10)
def load_bar_data(
    symbol: str,
    exchange: Exchange,
    interval: Interval,
    start: datetime,
    end: datetime
):
    """bar数据缓存为pkl格式到本地硬盘"""
    dir_path = f"H:\\pickle_data\\"
    file_name = f"{symbol}_{exchange.value}_{start.date()}_{end.date()}_bar"
    pickle_path = dir_path + file_name + ".pkl"
    data_size  =0 
    if not os.path.exists(pickle_path):
        bar_data = database_manager.load_bar_data( symbol, exchange, interval, start, end )
        pickle_file = open(pickle_path,'wb')    
        pickle.dump(bar_data,pickle_file)
        pickle_file.close()
    else:        
        pickle_file = open(pickle_path,'rb')
        bar_data =pickle.load(pickle_file)
        pickle_file.close()
    #pickle_data文件夹大于50G清空缓存数据
    for dirpath, dirnames, filenames in os.walk(dir_path):
        for file_name in filenames:         #当前目录所有文件名
            data_size += os.path.getsize(dirpath + file_name)
    if data_size / (1024 ** 3) > 50:
        for dirpath, dirnames, filenames in os.walk(dir_path):
            for file_name in filenames:           
                os.remove(dirpath + file_name)    
    return bar_data


@lru_cache(maxsize=10)
def load_tick_data(
    symbol: str,
    exchange: Exchange,
    start: datetime,
    end: datetime
):
    """tick数据缓存为pkl格式到本地硬盘"""
    dir_path = f"H:\\pickle_data\\"
    file_name = f"{symbol}_{exchange.value}_{start.date()}_{end.date()}_tick"
    pickle_path = dir_path + file_name + ".pkl"
    data_size  =0 
    if not os.path.exists(pickle_path):
        tick_data = database_manager.load_tick_data( symbol, exchange, start, end )
        pickle_file = open(pickle_path,'wb')    
        pickle.dump(tick_data,pickle_file)
        pickle_file.close()
    else:        
        pickle_file = open(pickle_path,'rb')
        tick_data =pickle.load(pickle_file)
        pickle_file.close()
    #pickle_data文件夹大于50G清空缓存数据
    for dirpath, dirnames, filenames in os.walk(dir_path):
        for file_name in filenames:         #当前目录所有文件名
            data_size += os.path.getsize(dirpath + file_name)
    if data_size / (1024 ** 3) > 50:
        for dirpath, dirnames, filenames in os.walk(dir_path):
            for file_name in filenames:           
                os.remove(dirpath + file_name)    
    return tick_data


2019年vn.py项目计划:Hello, 2.0!

回顾2018年

还有三个小时不到2018年就要结束了,又到了总结的时间。除了年初定下的计划全部完成(绿色打勾标志),还额外完成了许多计划外的任务(红色爱心标志),比去年要好得多。

vn.py项目2018年完成的任务:

计划内的任务

Docker部分,主要完成了WebTrader前端和VNC虚拟桌面(社区贡献)两个方案。年初的时候寄望颇高,希望能成为降低入门用户门槛的主要方案,后来发现Docker的用处还是主要局限在开发完成后的实盘部署上,对于开发过程中各种环境配置的帮助意义不大。

跳票一年多的海龟策略,总算是交出了一份满意的答卷,针对海龟策略中诸多难点的定制化回测框架,结合RQData提供的高质量期货历史数据,即使是35年前就定下的参数组合,结果一样让人惊喜:![

上期所活跃品种,完全样本外测试(海龟默认参数):

稍作调整后,用在比特币期货上,就仿佛变成了印钞机:

现货仅包括了Alpha超额部分,现货的底仓风险需要另行计算

知乎LIVE上,主要是年初完成的《个人投资新方向:开始你的量化交易》的专辑课程,以及海龟策略发布后的《带你重新认识海龟交易策略》。线下活动前前后后举办了6场,有公开面向社区的,也有专门面向某些机构群体的。

计划外的工作

计划外最大的一块是VN Crypto(vn.crypto)项目,主要针对数字货币(CryptoCurrency)市场,开发了一系列针对性的接口和应用,包括:

  • 超过10家主流交易所的对接
  • 针对数字货币市场的CryptoTrader
  • 针对REST API的通用客户端RestClient
  • 针对Websocket API的通用客户端WebsocketClient
  • 数字货币市场的数据服务方案:CoinAPI.io和CryptoCompare数据服务

上层应用模块则是增加了社区需求比较广泛的两个模块:

  • AlgoTrading,算法交易模块,针对各类交易执行方面的需求(大委托拆单、控制成本、自成交刷量、托市护盘等)提供了一系列的标准算法工具
  • TradeCopy,交易复制模块,解决一些规模较小尚未能够发行私募产品的用户,同时管理多个交易账户的问题

历史数据方面走了一些弯路,在做了诸多努力后(开发DataRecording模块、对接各家数据服务、解答社区问题),终于意识到“高性价比”+“标准化”的对接才是唯一的解决方案,在若干个月的寻找后终于和米筐达成了战略合作,从此RQData成为了vn.py项目的标准期货数据提供方。

最后在社区活动上,很荣幸受到厦门大学的邀请开设了一门基于vn.py的量化交易暑期课程。也第二次以嘉宾的身份参加PyCon 2018的北京和上海场次,分享了数字货币量化交易方面的主题内容。最后也非常感谢万得数据WindQuant的邀请,在WQFA课程体系中负责讲解期权相关的量化交易课程

量化开源排名

一图(表)胜千言吧(2018年12月31日夜里11点统计,哈~),简单总结几条:

  • Python为王(13/20,还有谁不服?)
  • 数字货币寒冬(ccxt、Gekko等项目增长明显放缓)
  • Quant行业还在蓬勃发展(你追我赶,排名变化迅速、明显)

2019的规划

2019年的重头戏将是Python 3上的v2.0版本开发,在完成v1.9.2的LTS版本后,vn.py的交易业务功能开发其实已经告一段落,但同时也遇到了自身结构上的瓶颈,唯一的突破机会就是再一次的项目重构升级,对代码、功能的提炼和改进

快速迭代

v2.0后续的开发,将转为采用快速迭代的方法,从之前的大版本号更新(平均几个月发布一次,包含若干个功能更新),转向快速连续的小版本号更新(每个新功能一个发布、每若干个重要的BUG修复一个发布),一个简单的类比方法就是Chrome v.s IE。

最新的Python 3.7

v2.0将基于Python 3.7重新开发(参考之前的代码),利用最新版本Python语言特性的同时,尽可能保证对3.6的兼容(更早的版本就不考虑了):

  • 使用DataClass来更加精简的描述数据结构
  • 通过TypeHint引入开发时类型分析的功能,减少代码Bug
  • 基于全新的有序Dict(代替OrderedDict)、默认迭代器遍历等特性,降低内存开销
  • 默认Unicode的字符串类型
  • 基于enum来定义常量类型
  • 内置的异步模块来开发Web相关的通讯接口(RestClient、WebTrader等)
  • 等等

英文内容

17年就推出了基本的VN Trader英文版本,但后续并没有进一步的推进,在接下来的2019年将会在英文方面投入更多的精力:

  • 代码注释将全部采用英文
  • 官方网站vnpy.com的英文版本
  • 和QQ上的官方交流群类似的Telegram英文社群
  • 英文的项目相关文章

在这里,也向社区内对于英文内容感兴趣的成员们发出邀请,欢迎加入vn.py相关的英文维护工作。

最后:

2019, Enjoy Trading!!!



载入Tick数据(csv格式)到数据库中

CSV格式数据示例如下

description

 

表头及第一行数据示例

 

交易日,合约代码,交易所代码,合约在交易所的代码,最新价,上次结算价,昨收盘,昨持仓量,今开盘,最高价,最低价,数量,成交金额,持仓量,今收盘,本次结算价,涨停板价,跌停板价,昨虚实度,今虚实度,最后修改时间,最后修改毫秒,申买价一,申买量一,申卖价一,申卖量一,申买价二,申买量二,申卖价二,申卖量二,申买价三,申买量三,申卖价三,申卖量三,申买价四,申买量四,申卖价四,申卖量四,申买价五,申买量五,申卖价五,申卖量五,当日均价,业务日期
20190102,ru1905,,,11280.0000,11290.0000,11305.0000,322472,11280.0000,11280.0000,11280.0000,246,27748800.0000,322468,0.0000,0.0000,12080.0000,10495.0000,0,0,08:59:00,500,11280.0000,10,11290.0000,10,0.0000,0,0.0000,0,0.0000,0,0.0000,0,0.0000,0,0.0000,0,0.0000,0,0.0000,0,112800.0000,20190102

 

可以发现几个问题:

  • 表头是中文的
  • datetime需要由3列数据合成
  • 在非交易时间可能出现垃圾数据,需要剔除(不包含集合竞价发出的那一个Tick数据)

 

基于csv格式的特点,开发载入tick数据到数据库的脚本,脚本功能如下:

  1. 在同一文件夹下,用for循环读取csv文件并载入到数据库
  2. 合成时间字符串,并且最终转换为datetime格式
  3. 通过datetime来判断非交易时间段,剔除垃圾数据的载入

脚本实现代码如下:

import os 
import csv
from datetime import datetime, time

from vnpy.trader.constant import Exchange
from vnpy.trader.database import database_manager
from vnpy.trader.object import TickData


def run_load_csv():
    """
    遍历同一文件夹内所有csv文件,并且载入到数据库中
    """
    for file in os.listdir("."): 
        if not file.endswith(".csv"): 
            continue

        print("载入文件:", file)
        csv_load(file)


def csv_load(file):
    """
    读取csv文件内容,并写入到数据库中    
    """
    with open(file, "r") as f:
        reader = csv.DictReader(f)

        ticks = []
        start = None
        count = 0

        for item in reader:

            # generate datetime
            date = item["交易日"]
            second = item["最后修改时间"]
            millisecond = item["最后修改毫秒"]

            standard_time = date + " " + second + "." + millisecond
            dt = datetime.strptime(standard_time, "%Y%m%d %H:%M:%S.%f")

            # filter
            if dt.time() > time(15, 1) and dt.time() < time(20, 59):
                continue

            tick = TickData(
                symbol="RU88",
                datetime=dt,
                exchange=Exchange.SHFE,
                last_price=float(item["最新价"]),
                volume=float(item["持仓量"]),
                bid_price_1=float(item["申买价一"]),
                bid_volume_1=float(item["申买量一"]),
                ask_price_1=float(item["申卖价一"]),
                ask_volume_1=float(item["申卖量一"]), 
                gateway_name="DB",       
            )
            ticks.append(tick)

            # do some statistics
            count += 1
            if not start:
                start = tick.datetime

        end = tick.datetime
        database_manager.save_tick_data(ticks)

        print("插入数据", start, "-", end, "总数量:", count)      


if __name__ == "__main__":
    run_load_csv()

 

效果展示

 

description

description



python多进程简介,和VNPY多进程参数优化代码分析

之前为了实现利用遗传算法,进行多进程策略的优化,学习研究了python的多进程库Multiprocessing。以前感觉真是黑科技,学习后发现,还是python优点,简单好用,对于一般应用还是很好理解。

首先,由于GIL(全局解释锁)的问题,全局对象只能一个进程调用,python多线程并不能充分利用多核处理器,比如有时候用pandas跑大型数据分析,发现只有一核在累死累活。如果想要充分地使用多核CPU的资源,在python中大部分情况需要使用多进程。multiprocessing可以给每个进程赋予单独的Python解释器,这样就规避了全局解释锁所带来的问题。可以理解为多核CPU分配好一个工作任务,这个工作任务包括工作方法和工作内容。

其实python多线程很简单,相对于其他语言来说。其实简单就是针对需要多线程的方法func(a),a是参数。相当于工作内容;使用 Multiprocessing. Process(target = func, args =(a,)),创建一个Prcoess对象,也就是工作任务,再启动这个对象,这样一个多进程任务就完成了。等CPU分配一个独立核去干活,func(a)就开动了。这里唯一要注意args是默认输入元祖参数。

P = Multiprocessing.Process(target = func, args =(a,))
P.start()

Multiprocessing提供了更简洁的pool做为进程池,其实叫任务池更为恰当。把需要干的工作任务打包好,放在这个池子里面,这样空闲下来的核心就捡pool的任务干活。

常见的pool的使用如下,其中prcesses = 4 是定义任务池大小,不一定要小于或者等于cpu核心数量,可以大于cpu核心数量,不过这样就有几个任务空挂着还占用内存。

然后使用pool方法apply_async(task, args=(x,)),把打包好的任务插入池中。 apply_asyncs是异步的带返回值。如果用apply也可以正常,但是会没有返回值,此处不仔细研究了。

之后close()是把这个任务池关闭,不再接受新的任务;但是还有一些已有任务在跑,所以用pool.join(),吊着主程序,直到所有任务完成才进入下一步。

if __name__ == '__main__':
    Multiprocessing.pool = Pool(processes=4)
    for x in range(10):
        pool.apply_async(task, args=(x,))
    pool.close()
    pool.join()

下面看看VNPY多进程优化方法。其实很好理解了,runParallelOptimization是类BacktestingEngine的一个方法。

传入参数strategyClass就是这个策略类,setting是要优化参数范围,后面通过optimizationSetting.generateSetting()生成策略参数队列,做为任务内容;optimizationSetting.optimizeTarget是后面返回值。至于回测品种,回测时间段,交易费用什么,在 BacktestingEngine创建时候维护了。

然后创建任务池pool,大小刚好是cpu核数,这个也是比较稳妥设置。

之后做一个l队列来放返回值。

然后打包策略类,回测参数,策略参数做为任务内容,和任务方法optimize一起组合为一个工作任务。然后插入任务池给cpu核心去跑。这个时候在系统监视器可以看到于核心数相同的python虚拟环境运作。

然后就是对返回值排序。后面详细说说。

df =  engine.runParallelOptimization(AtrRsiStrategy, setting)
def runParallelOptimization(self, strategyClass, optimizationSetting):
    """并行优化参数"""
    # 获取优化设置        
    settingList = optimizationSetting.generateSetting()
    targetName = optimizationSetting.optimizeTarget

    # 检查参数设置问题
    if not settingList or not targetName:
        self.output(u'优化设置有问题,请检查')

    # 多进程优化,启动一个对应CPU核心数量的进程池
    pool = multiprocessing.Pool(multiprocessing.cpu_count())
    l = []
    for setting in settingList:
        l.append(pool.apply_async(optimize, (strategyClass, setting,
                                             targetName, self.mode, 
                                             self.startDate, self.initDays, self.endDate,
                                             self.slippage, self.rate, self.size, self.priceTick,
                                             self.dbName, self.symbol)))
    pool.close()
    pool.join()

    # 显示结果
    resultList = [res.get() for res in l]
    resultList.sort(reverse=True, key=lambda result:result[1])
    return resultList

像现在双核四线程就有四个python环境在跑任务。

enter image description here
这里会发现是用静态方法optimize,如果直接调用 BacktestingEngine的回测方法更简洁,为什么没有呢,这个是python2.7的Multiprocessing的一个局限,只能打包静态方法做为工作方法,如果打包类中的方法,会提示错误。

cPickle.PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup builtin .instanceme

如果VNPY2.0基于python3.6版本,应该就会更简化一些。

下面看看 静态方法 optimize,其实没什么好说,就是新建一个回测引擎BacktestingEngine对象,按照参数跑一遍回测,返回一个元祖,包含了这次回测的参数,针对回测目标的值,和一个包含回测结果的字典,这个字典包括什么年化收入,sharpe等一堆回测结果。

然后所有的回测结果元祖组成一个回测结果队列,这个结果队列按照targetValue反向排序,最大放在第一位。

因为太多了,一般我都是输出到excel里面,之前说过怎么实现。

#----------------------------------------------------------------------
def optimize(strategyClass, setting, targetName,
             mode, startDate, initDays, endDate,
             slippage, rate, size, priceTick,
             dbName, symbol):
    """多进程优化时跑在每个进程中运行的函数"""
    engine = BacktestingEngine()
    engine.setBacktestingMode(mode)
    engine.setStartDate(startDate, initDays)
    engine.setEndDate(endDate)
    engine.setSlippage(slippage)
    engine.setRate(rate)
    engine.setSize(size)
    engine.setPriceTick(priceTick)
    engine.setDatabase(dbName, symbol)
    engine.initStrategy(strategyClass, setting)
    engine.runBacktesting()
    engine.calculateDailyResult()
    d, result = engine.calculateDailyStatistics()
    try:
        targetValue = result[targetName]
    except KeyError:
        targetValue = 0
    return (str(setting), targetValue, result)

其实python的多进程库Multiprocessing不算复杂,但是用在回测上效果很好;现在有了遗传算法,进行策略优化更加方便了。



VNPY连接IB接口交易的一些坑,还有如何订阅延迟行情

这个文章主要是记录IB模拟交易使用的一些坑,IB TWS和VNPY链接和一般的接口不一样,不是VNPY通过API直接联网IB交易服务器,而且现在本地安装一个类似gateway的tws客户端,vnpy链接本地tws客户端,在由客户端中转到交易服务器。IB也提供非客户端的纯本地gatway软件,没有试过,这里只是说说通过TWS客户端。
在写之前没有看到陈总在知乎的文章,走了不少坑。https://zhuanlan.zhihu.com/p/75787960

1 - 安装IB TWS桌面版,然后登录模拟账户。假定你已经申请好了用户,并且有了模拟账户。如果没有可以直接官网申请。
2 - 等罗后一定要记得勾选“Enable ActiveX and Socket Clients”这个选项,此外在下方的“Trusted IPs”,要注意是否允许本地连接已经打开了,这个勾选后,默认127.0.0.1可以直接访问TWS,而其他地址,需要通过“Create”命令,加入到TWS的白名单里。
另外Socket port,和VNPY里面要填入一致,一般7497是模拟交易,7496是正式交易。
description

3 - 链接后,会返回message code,可以去下面网址查询message code含义,-1是warning
ERROR -1 2104 Market data farm connection is OK:usfarm.nj
ERROR -1 2104 Market data farm connection is OK:cashfarm
ERROR -1 2104 Market data farm connection is OK:usfarm
ERROR -1 2106 HMDS data farm connection is OK:hkhmds
ERROR -1 2106 HMDS data farm connection is OK:ushmds
https://interactivebrokers.github.io/tws-api/message_codes.html
description

4 - 研究下IB Gateway的三个类,简单说明下, 后面有空在研究
IbGateway: BaseGateway的继承,提供BaseGateway的一些abstractmethod的实现,被其他引擎按照标准调用,其实那些实现都是打包IbAPI的方法。
IbAPI:IBAPI 中Ewrapper 类的继承,基本所有订阅查询下单都是在这里二次打包了。
IbClient: IBAPI中EClient的继承,主要保持和TWS连通
5 - 订阅行情,这里是查询是盈透证券对于每个合约在某一交易所的唯一标识符ConId码,而非Symbol或者LocalName。要去下面官网查询,比如BABA就是166090175
https://contract.ibkr.info/v3.10/index.php
关于交易所,可以在TWS中点击右键 - Finacial Instrument Info 看到,SMART并不是交易所,而且IB会自动选择Primary Exhcange,避免选取太多。
description
如果提示错误信息 354, 那就是你没有订阅相关行情数据,那么去安装下面也去订阅,还有模拟账户共享真实账户订阅数据,所以即使模拟账户,还是要现在真实账户订阅。
https://interactivebrokers.github.io/tws-api/market_data.html

7 - 交易,直接在vnpy交易界面下单,没有什么问题,对于股票来说就是多是买入,空是卖出,和方向开平没有什么关系。
返回成交信息的时候,vnpy系统报错,研究发现是IB的模拟交易用了一个模拟交易所exhange,所以在vnpy/gateway/ib/ib_gateway.py和 trader/constant.py文件中加入这个exhange
Exchange.IBKRATS:'IBKRATS'

8 - 延时行情订阅,当你去订阅时候,会发现,嗯,收费的,不如我大A股,测试环境只能用延时数据,暂时还不准备打钱。这时候虽然在tws上是有数据,但是在vnpy尝试订阅延时数据时候,会提示说
ERROR 6 10168 Requested market data is not subscribed. Delayed market data is not enabled
研究了下,在vnpy/gateway/ib/ib_gateway.py中reqMktData前的延时快照数据读取激活,如果真的买了,最好改回去,虽然系统会自动推送最新数据如果发现你购买了。
这里默认是1真实行情, 3 是延时行情。live(1)frozen(2)delayed(3)delayedfrozen(4).
在reqMktData之前加一行
self.client.reqMarketDataType(3)
但是发现还是没有vnpy数据,再一看,是因为delyed的ticktype不一样,是88,而vnpy只让45这个实际的过来。而且还有tick数据也不一样编码,要从新加入到ENUM TICKFIELD_IB2VT, 好像折腾,直接再TWS就不可以看到了。改到这里就改下去吧,最好可以显示。
def tickString 修改成
if tickType!="45"ortickType!="88":
TICKFIELD_IB2VT 加入下面
66:"bid_volume_1",
69:"bid_price_1",
67:"ask_price_1",
70:"ask_volume_1",
68:"last_price",
71:"last_volume",
73:"high_price",
73:"low_price",
74:"volume",
75:"pre_close",
76:"open_price",
最后显示效果如下
description
后面我会尝试用Jupyter Notebook试试交易



使用Jupyter NoteBook进行IB查询和交易,以及使用算法交易示例

在搞好IB盈透接口后,试了下客户端交易,但是最终目的还是使用程序化交易。发现vnpy已经提供的Script_engine来支持Jupyter NoteBook 交易的,而且非常方便调用。
这里就用写了用代码实现IB盈透下的查询和交易,和一个TWVP算法交易。

Script_engine的大多操作都是针对main_engine的封装,类似的逻辑,其他交易相关App,也可以用类似方法调用,真的很方便,比起之前调试来说。其实算法交易调用也很直接,直接传入algo setting 的dict就可以。

应为Jupyter NoteBook代码不好贴,我这里又改写会直接python code。在启动tws登录后,可以直接运行。
另外IB接口的返回信息采用一个中wrapper机制,有点类似Spring的反转调用,可以理解为本地返回方法是被IBapi调用的写入。

from vnpy.app.script_trader import init_cli_trading
from vnpy.gateway.ib import IbGateway
from time import sleep
# 连接到服务器
setting = {
    "TWS地址": "127.0.0.1",
    "TWS端口": 7497,
    "客户号":5 #每个链接用一个独立的链接号,一个IBAPI支持32个来同时链接
}
engine = init_cli_trading([IbGateway]) #返回Script_engine 示例,并且给main_engine注册了gateway
engine.connect_gateway(setting, "IB") #链接

# 查询资金 - 自动
sleep(10)
print(engine.get_all_accounts(use_df = True))
# 查询持仓
print(engine.get_all_positions(use_df = True))

# 订阅行情
from vnpy.trader.constant import Exchange
from vnpy.trader.object import SubscribeRequest
# 从我测试直接用Script_engine有问题,IB的品种太多,get_all_contracts命令不行,需要指定具体后才可以,这里使用main_engine订阅
req1 = SubscribeRequest("152791428",Exchange.SEHK) #创建行情订阅,腾讯
req2 = SubscribeRequest("332623976",Exchange.SEHK) #创建行情订阅,美团
req3 = SubscribeRequest("12087792",Exchange.IDEALPRO) #创建行情订阅,美团
engine.main_engine.subscribe(req1,"IB")
engine.main_engine.subscribe(req2,"IB")
engine.main_engine.subscribe(req3,"IB")

# 返回行情
sleep(10)
print(engine.get_all_contracts(use_df = True)) #返回所有已经订阅的contact
print(engine.get_contract("152791428.SEHK",use_df = True)) #返回单个订阅的contact
print(engine.get_ticks(["152791428.SEHK","332623976.SEHK"],use_df = True)) #返回订阅的tick

# 委托下单,返回订单号
from vnpy.trader.constant import OrderType
vt_orderid = engine.buy(vt_symbol = "12087792.IDEALPRO",price = 1.20, volume = 50000, order_type = OrderType.LIMIT)
print(vt_orderid)


# 按照订单号查询委托状态,这里也可以用get_orders, 查询订单号队列
sleep(10)
print(engine.get_order(vt_orderid)) #
print(engine.get_trades(vt_orderid, use_df= True))
# 再次查询持仓
print(engine.get_all_positions(use_df = True))

# 使用算法交易引擎
from vnpy.app.algo_trading import AlgoTradingApp
engine.main_engine.add_app(AlgoTradingApp) #加入app
AlgoInstance = engine.main_engine.get_engine("AlgoTrading") #为了方便,这里直接用返回的AlgoInstance
# 创建算法交易的要执行交易内容, 这个可以复制 algo_trading_setting.json的内容,这里这里策略是,100秒内每隔10秒下单一次,每次购买10000
AlgotradingDict1 = {
        "template_name": "TwapAlgo",
        "vt_symbol": "12087792.IDEALPRO",
        "direction": "多",
        "price": 1.0985,
        "volume": 10000.0,
        "time": 100,
        "interval": 10,
        "offset": ""
    }
AlgoInstance.start_algo(setting = AlgotradingDict1)

# 再次查询持仓
print(engine.get_all_positions(use_df = True))

统计

主题
1568
帖子
5742
已注册用户
6573
最新用户
在线用户
104
在线来宾用户
189
© 2015-2019 上海韦纳软件科技有限公司
备案服务号:沪ICP备18006526号-3