VeighNa Quant Community
Your open-source community quantitative trading platform
Member (posts: 187, reputation: 58)

General-Purpose Trade-by-Trade Statistics

 

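The hard part of making trade-by-trade statistics general-purpose is pinning down where one complete open-to-close round trip starts and where it ends. Put abstractly, we need special breakpoints that partition the full list of fills.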

 

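Choosing the Breakpoints

In state-machine style algorithms, the number 0 is a very useful test value: if we build a column whose value returns to 0 once the position is fully closed, the rows where it hits 0 tell us exactly when each round trip was closed out.

In practice, the cumulative net position meets this requirement exactly. We sign long fills "+" and short fills "-" to get the [directional position] column shown in the table below, then take its running sum to get the [net position].

Each row where [net position] equals 0 is therefore the end point of one round trip, and that end point serves as a breakpoint in the fill records.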

 

(Table image: fill records with the directional position and net position columns)
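As a minimal sketch of the idea above, the snippet below signs each fill, takes the running sum, and reads the breakpoints off the rows where the sum is 0. The six fills are made-up sample data, and the column names simply follow the ones used later in this post; only the "多"/"空" direction values are taken from vn.py.

```python
import pandas as pd

# Hypothetical fills; "多" (long) and "空" (short) are vn.py direction values
fills = pd.DataFrame({
    "direction": ["多", "空", "空", "空", "多", "多"],
    "volume": [2, 1, 1, 3, 3, 1],
})

# Directional position: long fills are positive, short fills are negative
sign = fills["direction"].map({"多": 1, "空": -1})
fills["pos"] = sign * fills["volume"]

# Net position: running sum of the directional position
fills["net_pos"] = fills["pos"].cumsum()

# Breakpoints: rows where the net position is back to zero
breakpoints = fills.index[fills["net_pos"] == 0].tolist()

print(fills)
print("breakpoints:", breakpoints)
```

The last fill opens a new position that is never closed, so it does not produce a breakpoint and would be left out of the round-trip statistics.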

 
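Partitioning the Fills by Breakpoint

For simplicity, we now show only the fills (rows) whose [net position] (column) is 0. As the table below shows, 5 complete round trips took place, ending at fill numbers 3, 5, 8, 12 and 20. These 5 end points are the breakpoints for the whole fill list.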

 

(Table image: the fills whose net position is 0, at fill numbers 3, 5, 8, 12 and 20)

 
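Next, we introduce two new concepts:

  • Stock: the cumulative value of a statistic at a point in time
  • Increment: the increase in that cumulative value over a period of time

Stock is static: it is a time slice of the cumulative statistic. Increment is dynamic: it is the change between two time slices. The two are related as follows:

Stock at T0 + Increment over T0->T1 = Stock at T1

In other words,

Increment over T0->T1 = Stock at T1 - Stock at T0

Back to trade-by-trade backtest statistics: the increment captures the effect of the latest complete round trip, for example how much its profit or loss changes the cumulative P&L.

As shown in the figure below, the cumulative P&L is 1000 after the first round trip and 2000 after the second. The difference, 2000 - 1000 = 1000, is the profit that belongs to the second round trip.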

 

(Figure: cumulative P&L of 1000 after the first round trip and 2000 after the second)

 
So, by comparing the stock values at consecutive breakpoints, we get the statistics of each round trip:

 

(Table image: per-round-trip statistics derived from the stock values at each breakpoint)
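The stock-to-increment step can be sketched in a few lines. The fills below are invented sample data, and `pos` is assumed to be already signed (long positive, short negative). The cumulative cash flow is the stock; its difference between consecutive breakpoints is the P&L increment of each round trip, here still per unit of contract size.

```python
import pandas as pd

# Hypothetical fills: signed volume and fill price
fills = pd.DataFrame({
    "pos": [1, -1, -2, 1, 1],
    "price": [100.0, 110.0, 120.0, 118.0, 112.0],
})

fills["net_pos"] = fills["pos"].cumsum()

# Cash flow of each fill: buying pays cash, selling receives cash
fills["acum_result"] = (-fills["pos"] * fills["price"]).cumsum()

# Stock: cumulative cash flow sampled at the breakpoints (net_pos == 0)
stock = fills.loc[fills["net_pos"] == 0, "acum_result"]

# Increment: stock at this breakpoint minus stock at the previous one;
# the first round trip has no predecessor, so its increment is its stock
increment = stock.diff().fillna(stock)

print(increment)
```

The first round trip buys at 100 and sells at 110; the second sells 2 at 120 and covers at 118 and 112. Both come out at a P&L of 10 per unit.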

 
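These round-trip statistics can be the round-trip volume and P&L shown in the table below, or the holding time, commission, slippage and net P&L of each round trip: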

 

(Table image: round-trip volume, P&L, holding time, commission, slippage and net P&L)

 
 

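From Algorithm to Code

 

Computing Round-Trip Results
 

  1. Build the base DataFrame with the direction, offset, price and time of each fill;
  2. Compute the directional position and, as its running sum, the net position; also compute the cumulative volume stock (a plain sum of fill volumes);
  3. Compute the P&L stock; where the net position is 0, show each round trip's increment to the P&L stock;
  4. Where the net position is 0, show each round trip's increments in holding time, volume and turnover;
  5. Drop the rows whose net position is not 0, so that every remaining row is the final closing fill of a round trip. The direction of that closing fill identifies the round trip: if the direction is short and the offset is close, the round trip is long open -> short close;
  6. Compute commission, slippage and net P&L;
  7. Return the new DataFrame.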

 

import pandas as pd
from datetime import datetime
import matplotlib.pyplot as plt
import numpy as np
pd.set_option('mode.chained_assignment', None)


def calculate_trades_result(trades):
    """
    Turn vn.py trade records into a DataFrame and keep only the rows
    where the net position returns to zero (the breakpoints)
    """
    dt, direction, offset, price, volume = [], [], [], [], []
    for i in trades.values():
        dt.append(i.datetime)
        direction.append(i.direction.value)
        offset.append(i.offset.value)
        price.append(i.price)
        volume.append(i.volume)

    # Generate DataFrame with datetime, direction, offset, price, volume
    df = pd.DataFrame()
    df["direction"] = direction
    df["offset"] = offset
    df["price"] = price
    df["volume"] = volume
    df["current_time"] = dt

    # Calculate trade amount and its accumulation
    df["amount"] = df["price"] * df["volume"]
    df["acum_amount"] = df["amount"].cumsum()

    # Calculate pos (signed by direction), net pos (running sum of pos)
    # and accumulated volume; "多" is the value of vn.py's long direction
    df["pos"] = np.where(df["direction"] == "多", df["volume"], -df["volume"])
    df["net_pos"] = df["pos"].cumsum()
    df["acum_pos"] = df["volume"].cumsum()

    # Calculate cash flow of each fill and its accumulation
    # e.g. buy->sell sums to (sell price - buy price) * volume once flat
    df["result"] = -1 * df["pos"] * df["price"]
    df["acum_result"] = df["result"].cumsum()

    # Mark the breakpoints; isclose guards against float rounding in cumsum
    is_flat = pd.Series(np.isclose(df["net_pos"], 0), index=df.index)

    # A round trip opens at the first fill after the previous breakpoint,
    # so the holding time runs from that fill to the closing fill
    # (the time since the previous fill would be too short for
    # round trips made of more than two fills)
    trip_id = is_flat.shift(1, fill_value=False).cumsum()
    df["open_time"] = df.groupby(trip_id)["current_time"].transform("first")

    # Select row data with net pos equal to zero; on these rows the
    # accumulated columns are the stock values of each round trip
    df = df[is_flat].copy()
    df["acum_trade_result"] = df["acum_result"]
    df["acum_trade_volume"] = df["acum_pos"]
    df["acum_trade_amount"] = df["acum_amount"]

    return df


def generate_trade_df(trades, size, rate, slippage, capital):
    """
    Calculate the result of each round trip as the increment between
    two consecutive breakpoints
    """
    df = calculate_trades_result(trades)

    trade_df = pd.DataFrame()
    trade_df["close_direction"] = df["direction"]
    trade_df["close_time"] = df["current_time"]
    trade_df["close_price"] = df["price"]

    # Increment = stock at this breakpoint - stock at the previous one.
    # pnl and turnover are scaled by the contract size so that they are
    # in the same money unit as slipping below
    trade_df["pnl"] = size * (
        df["acum_trade_result"] - df["acum_trade_result"].shift(1).fillna(0)
    )
    trade_df["volume"] = (
        df["acum_trade_volume"] - df["acum_trade_volume"].shift(1).fillna(0)
    )
    trade_df["duration"] = df["current_time"] - df["open_time"]
    trade_df["turnover"] = size * (
        df["acum_trade_amount"] - df["acum_trade_amount"].shift(1).fillna(0)
    )

    trade_df["commission"] = trade_df["turnover"] * rate
    trade_df["slipping"] = trade_df["volume"] * size * slippage

    trade_df["net_pnl"] = (
        trade_df["pnl"] - trade_df["commission"] - trade_df["slipping"]
    )

    result = calculate_base_net_pnl(trade_df, capital)
    return result
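To make the cost arithmetic of step 6 concrete, here is a small self-contained sketch for a single round trip. The helper `round_trip_net_pnl` and all numbers are hypothetical; it assumes that P&L and turnover are both scaled by the contract size, and that slippage is charged per unit of volume on every fill.

```python
def round_trip_net_pnl(fills, size, rate, slippage):
    """
    fills: list of (signed_volume, price) tuples for one round trip,
    long fills positive and short fills negative (hypothetical format)
    """
    # A complete round trip must leave the net position at zero
    assert sum(v for v, _ in fills) == 0

    pnl = sum(-v * p for v, p in fills) * size          # gross pnl
    volume = sum(abs(v) for v, _ in fills)              # open + close volume
    turnover = sum(abs(v) * p for v, p in fills) * size
    commission = turnover * rate
    slipping = volume * size * slippage
    return pnl - commission - slipping


# Buy 2 lots at 3500, sell 2 lots at 3520, contract size 10
net = round_trip_net_pnl(
    [(2, 3500.0), (-2, 3520.0)], size=10, rate=0.0001, slippage=0.2
)
print(round(net, 2))
```

Here the gross P&L is 400, the commission is 14.04 on a turnover of 140400, and the slippage cost is 8, leaving a net P&L of 377.96.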

 
 

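Building the Equity Curve
 

  1. Compute the cumulative P&L from the net P&L of each round trip;
  2. Add the starting capital supplied by the user to the cumulative P&L to get the equity curve;
  3. From the equity curve, compute each round trip's return, drawdown and percentage drawdown.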

 

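def calculate_base_net_pnl(df, capital):
    """
    Calculate balance, return and drawdown statistics based on net pnl
    """
    # Work on a copy so a filtered slice passed in is not modified in place
    df = df.copy()

    df["acum_pnl"] = df["net_pnl"].cumsum()
    df["balance"] = df["acum_pnl"] + capital

    # Log return of each round trip; the first one is measured against
    # the starting capital instead of being forced to 0
    pre_balance = df["balance"].shift(1).fillna(capital)
    df["return"] = np.log(df["balance"] / pre_balance).fillna(0)

    # Running maximum of the balance (same as an expanding rolling max)
    df["highlevel"] = df["balance"].cummax()
    df["drawdown"] = df["balance"] - df["highlevel"]
    df["ddpercent"] = df["drawdown"] / df["highlevel"] * 100

    df = df.reset_index(drop=True)

    return df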
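The drawdown columns are easy to check by hand on a tiny series. The following sketch uses made-up net P&L values and a made-up starting capital; the running maximum is taken with `cummax`, which gives the same high-water mark as an expanding rolling maximum.

```python
import pandas as pd

capital = 1000  # hypothetical starting capital
net_pnl = pd.Series([100.0, -50.0, 200.0, -300.0, 50.0])  # hypothetical

balance = capital + net_pnl.cumsum()      # equity curve
highlevel = balance.cummax()              # high-water mark so far
drawdown = balance - highlevel            # always <= 0
ddpercent = drawdown / highlevel * 100

curve = pd.DataFrame({
    "balance": balance,
    "highlevel": highlevel,
    "drawdown": drawdown,
    "ddpercent": ddpercent,
})
print(curve)
```

The balance peaks at 1250 after the third round trip and falls to 950 after the fourth, so the maximum drawdown is -300, or -24% of the peak.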

 
 

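Overall Strategy Statistics
 

  1. Mostly the calculation of summary metrics, such as average slippage, average commission, total number of round trips, win rate, profit/loss ratio and return/drawdown ratio.
  2. Then the charts: the equity curve, the net P&L of each round trip and the distribution of net P&L.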

 

def statistics_trade_result(df, capital, show_chart=True):
    """
    Print summary statistics of the round trips and plot the charts
    """
    # Nothing to summarise if no round trip was completed
    if df.empty:
        output("No completed round trips")
        return

    end_balance = df["balance"].iloc[-1]
    max_drawdown = df["drawdown"].min()
    max_ddpercent = df["ddpercent"].min()

    pnl_medio = df["net_pnl"].mean()
    trade_count = len(df)
    duration_medio = df["duration"].mean().total_seconds()/3600
    commission_medio = df["commission"].mean()
    slipping_medio = df["slipping"].mean()

    # Winning round trips
    win = df[df["net_pnl"] > 0]
    win_amount = win["net_pnl"].sum()
    win_pnl_medio = win["net_pnl"].mean()
    win_duration_medio = win["duration"].mean().total_seconds()/3600
    win_count = len(win)

    # Losing round trips
    loss = df[df["net_pnl"] < 0]
    loss_amount = loss["net_pnl"].sum()
    loss_pnl_medio = loss["net_pnl"].mean()
    loss_duration_medio = loss["duration"].mean().total_seconds()/3600
    loss_count = len(loss)

    # The ratio is nan when there are no winning or no losing round trips
    winning_rate = win_count / trade_count
    win_loss_pnl_ratio = - win_pnl_medio / loss_pnl_medio

    total_return = (end_balance / capital - 1) * 100
    # Avoid dividing by zero when the balance never draws down
    if max_ddpercent:
        return_drawdown_ratio = -total_return / max_ddpercent
    else:
        return_drawdown_ratio = np.nan

    output(f"Start balance:\t{capital:,.2f}")
    output(f"End balance:\t{end_balance:,.2f}")
    output(f"Total return:\t{total_return:,.2f}%")
    output(f"Max drawdown:\t{max_drawdown:,.2f}")
    output(f"Max drawdown percent:\t{max_ddpercent:,.2f}%")
    output(f"Return/drawdown ratio:\t{return_drawdown_ratio:,.2f}")

    output(f"Total round trips:\t{trade_count}")
    output(f"Winning round trips:\t{win_count}")
    output(f"Losing round trips:\t{loss_count}")
    output(f"Win rate:\t{winning_rate:,.2f}")
    output(f"Profit/loss ratio:\t{win_loss_pnl_ratio:,.2f}")

    output(f"Average pnl per trade:\t{pnl_medio:,.2f}")
    output(f"Average holding hours:\t{duration_medio:,.2f}")
    output(f"Average commission:\t{commission_medio:,.2f}")
    output(f"Average slippage:\t{slipping_medio:,.2f}")

    output(f"Total profit:\t{win_amount:,.2f}")
    output(f"Average profit:\t{win_pnl_medio:,.2f}")
    output(f"Winning holding hours:\t{win_duration_medio:,.2f}")

    output(f"Total loss:\t{loss_amount:,.2f}")
    output(f"Average loss:\t{loss_pnl_medio:,.2f}")
    output(f"Losing holding hours:\t{loss_duration_medio:,.2f}")

    if not show_chart:
        return

    plt.figure(figsize=(10, 12))

    # Equity curve
    acum_pnl_plot = plt.subplot(3, 1, 1)
    acum_pnl_plot.set_title("Balance Plot")
    df["balance"].plot(ax=acum_pnl_plot, legend=True)

    # Net pnl of each round trip
    pnl_plot = plt.subplot(3, 1, 2)
    pnl_plot.set_title("Pnl Per Trade")
    df["net_pnl"].plot(ax=pnl_plot, legend=True)

    # Distribution of net pnl
    distribution_plot = plt.subplot(3, 1, 3)
    distribution_plot.set_title("Trade Pnl Distribution")
    df["net_pnl"].hist(bins=100, ax=distribution_plot)

    plt.show()


def output(msg):
    """
    Output message with datetime.
    """
    print(f"{datetime.now()}\t{msg}")
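As a quick sanity check of the two headline metrics, the sketch below computes the win rate and the profit/loss ratio on five made-up net P&L values, using the same definitions as the function above: the win rate is the share of round trips with positive net P&L, and the profit/loss ratio is the average win divided by the absolute average loss.

```python
import pandas as pd

# Hypothetical net pnl of five round trips
net_pnl = pd.Series([100.0, -50.0, 200.0, -150.0, 50.0])

win = net_pnl[net_pnl > 0]
loss = net_pnl[net_pnl < 0]

winning_rate = len(win) / len(net_pnl)
# loss.mean() is negative, so the leading minus makes the ratio positive
win_loss_pnl_ratio = -win.mean() / loss.mean()

print(f"win rate: {winning_rate:.2f}")
print(f"profit/loss ratio: {win_loss_pnl_ratio:.2f}")
```

Three of the five round trips win, with an average profit of about 116.67 against an average loss of 100, so the win rate is 0.60 and the ratio is about 1.17.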

 
 
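Long-Only and Short-Only Statistics

 

Long-only trades are the round trips that go long open -> short close; short-only trades are the reverse.

 

To select the long-only trades, filter the DataFrame for rows whose closing direction is short; for short-only trades it is the other way round, and the closing direction is long.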

 

def buy2sell(df, capital):
    """
    Generate DataFrame with only trade from buy to sell
    """
    # A long round trip is closed by a short ("空") fill
    long_df = df[df["close_direction"] == "空"].copy()
    result = calculate_base_net_pnl(long_df, capital)
    return result


def short2cover(df, capital):
    """
    Generate DataFrame with only trade from short to cover
    """
    # A short round trip is closed by a long ("多") fill
    short_df = df[df["close_direction"] == "多"].copy()
    result = calculate_base_net_pnl(short_df, capital)
    return result
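The filter is worth seeing on a toy table, because the closing direction is the opposite of the side of the trade. The three round trips below are invented; each subset gets its own equity curve, restarted from the same hypothetical starting capital.

```python
import pandas as pd

capital = 1000  # hypothetical starting capital

# Hypothetical round trips; close_direction uses vn.py's "多"/"空" values
trades = pd.DataFrame({
    "close_direction": ["空", "多", "空"],
    "net_pnl": [10.0, -5.0, 20.0],
})

# Closed by a short fill -> the round trip was long, and vice versa
long_trades = trades[trades["close_direction"] == "空"].copy()
short_trades = trades[trades["close_direction"] == "多"].copy()

# Rebuild the equity curve of each subset from the same starting capital
long_trades["balance"] = capital + long_trades["net_pnl"].cumsum()
short_trades["balance"] = capital + short_trades["net_pnl"].cumsum()

print(long_trades)
print(short_trades)
```

Taking explicit copies before adding columns keeps the subsets independent of the original table.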

 
 

Putting It All Together

 

Finally, we combine all the functions above into a single function that produces the full strategy backtest report in one call:
 

def exhaust_trade_result(
    trades,
    size: int = 10,
    rate: float = 0.0,
    slippage: float = 0.0,
    capital: int = 1000000,
    show_long_short_condition=True
):
    """
    Run the whole pipeline and print statistics for all round trips,
    then for long-only and short-only round trips
    """

    total_trades = generate_trade_df(trades, size, rate, slippage, capital)
    statistics_trade_result(total_trades, capital)

    if not show_long_short_condition:
        return
    long_trades = buy2sell(total_trades, capital)
    short_trades = short2cover(total_trades, capital)

    # Skip a side that has no round trips instead of failing on empty data
    output("-----------------------")
    output("Long-only trades")
    if long_trades.empty:
        output("No long-only round trips")
    else:
        statistics_trade_result(long_trades, capital)

    output("-----------------------")
    output("Short-only trades")
    if short_trades.empty:
        output("No short-only round trips")
    else:
        statistics_trade_result(short_trades, capital)


 
 


 
 
To learn more, follow the vn.py community's official WeChat account.

Member (posts: 8, reputation: 0)

Very useful. Are there plans to include this in a future release?

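Member (posts: 260, reputation: 4)

Win rate and profit/loss ratio are important analysis metrics too, and many users on the vnpy forum have been asking for them. Will this be added to the official release?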

Member (posts: 59, reputation: 0)

What does the warning "Expected type 'function', got '(df: {getitem}) -> Any' instead" mean?

def get_acum_trade_result(df):
    if df["net_pos"] == 0:
        return df["acum_result"]
df["acum_trade_result"] = df.apply(get_acum_trade_result, axis=1)

Code like this keeps triggering the warning. Any help? Is it a problem with my Python version?

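Member (posts: 4, reputation: 0)

Hmm, this looks like a fairly big project, so why hasn't the community management kept up? The follow-up questions from users above never got a reply. The community doesn't seem very active.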

© 2015-2022 上海韦纳软件科技有限公司