利用之前文章写的保存成交order,借鉴vnpy已有的回测分析报表,生成一个实盘交易分析报表。
截图如下。
这里有个统计按钮,点击输出统计界面
因为我的代码改动不少,直接拿来用估计不合适,只做参考。
首先要自行修改database的代码,新增按照strategy名称,和时间段读取保存到数据的order,按照时许正向返回。
读取出来是一个order的队列,下面代码可以放在cta_engine.py, 是把开仓价和开仓时间作为属性附加到平仓单,输出平仓单队列。对于没有成对的,或者跨期的品种配对要抛弃;这里在做移仓操作时候,要针对策略,生成虚拟的平仓和开新仓的订单,具体实现代码后面再移仓代码中说。
def convert_strategy_triggered_order(self,strategy_name,open_date = datetime(2001,10,10), close_date = datetime(2100,10,10)):
result = database_manager.load_triggered_stop_order_data(strategy_name,open_date,close_date)
if result:
matchedOrderList = []
last_order = None
for order in result:
if order.offset == Offset.OPEN:
last_order = copy(order)
elif last_order != None and order.offset == Offset.CLOSE:
if last_order.vt_symbol != order.vt_symbol:
last_order = None
continue
order.open_datetime= last_order.datetime
order.open_price = last_order.average_price
matchedOrderList.append(copy(order))
last_order = None
return matchedOrderList
然后下面代码是处理的平仓单队列进行分析,计算出各种指标,输出为DataFrame格式,这里包括收益金额,收益率,单位收益率,回撤,账面价值等。
def get_strategy_triggered_order(self,strategy_name):
# result = database_manager.load_close_triggered_stop_order_data(strategy_name)
result = self.convert_strategy_triggered_order(strategy_name)
objectDF = DataFrame(data=None,columns=["开仓日期","平仓日期", "开仓方向", "开仓价", "手数", "平仓价", "收益"],dtype=object)
if result:
parameters = self.get_strategy_parameters(strategy_name)
# 策略中定义的合约价值
HeYueJiaZhi = parameters["HeYueJiaZhi"]
# 策略中定义的品种每笔数量
HeYueChengShu = parameters["HeYueChengShu"]
for close_data in result:
if close_data.direction == Direction.LONG:
close_data.direction = Direction.SHORT
else:
close_data.direction = Direction.LONG
objectDF.loc[len(objectDF) + 1] = [close_data.vt_orderids[0].replace(tzinfo=None),close_data.datetime.replace(tzinfo=None), close_data.direction, close_data.open_price,close_data.volume, close_data.average_price,0.0]
objectDF["收益"] = objectDF.apply(lambda x: x['开仓价'] - x['平仓价'] if x['开仓方向'] == Direction.SHORT else x['平仓价'] - x['开仓价'],
axis=1)
objectDF["UnitReturn"] = objectDF["收益"] * 100 / objectDF['开仓价']
objectDF["收益"] = objectDF["收益"]*HeYueChengShu*objectDF["手数"]
objectDF["balance"] = objectDF["收益"].cumsum() + HeYueJiaZhi
objectDF["return"] = objectDF["收益"]*100/HeYueJiaZhi
objectDF.loc[0] = copy(objectDF.iloc[0])
objectDF = objectDF.sort_index()
objectDF.loc[0,"balance"] = HeYueJiaZhi
objectDF["highlevel"] = (
objectDF["balance"].rolling(
min_periods=1, window=len(objectDF), center=False).max()
)
objectDF.drop(index=0, inplace=True)
objectDF["drawdown"] = objectDF["balance"] - objectDF["highlevel"]
objectDF["ddpercent"] = objectDF["drawdown"] / objectDF["highlevel"] * 100
return objectDF
下面代码是分析dataframe,计算指标数据
def calculate_statistics(self, objectDF):
"""
"""
data = {}
# end_balance = df["balance"].iloc[-1]
# max_drawdown = df["drawdown"].min()
# max_ddpercent = df["ddpercent"].min()
HeYueJiaZhi = self._data["parameters"]["HeYueJiaZhi"]
data["capital"] = HeYueJiaZhi
data["total_net_pnl"] = objectDF["收益"].sum()
data["end_balance"] = objectDF["balance"].iloc[-1]
data["total_return"] = data["total_net_pnl"]*100/max(HeYueJiaZhi,1)
data["max_drawdown"] = objectDF["drawdown"].min()
data["max_ddpercent"] = objectDF["ddpercent"].min()
data["total_trade_count"] = len(objectDF)
data["winningResult"] = len(objectDF[objectDF["收益"] >0])
data["losingResult"] = len(objectDF[objectDF["收益"] <0])
data["winningRate"] = data["winningResult"] *100/ data["total_trade_count"]
data["totalWinning"] = objectDF[objectDF["收益"] >0]["收益"].sum()
data["totalLosing"] = objectDF[objectDF["收益"] <0]["收益"].sum()
data["averageWinning"] = data["totalWinning"]/max(1,data["winningResult"])
data["averageLosing"] = data["totalLosing"]/max(1,data["losingResult"])
data["perprofitLoss"] = data["total_net_pnl"] / data["total_trade_count"]
data["profitLossRatio"] = data["averageWinning"] / max(1,abs(data["averageLosing"]))
return data
然后在cta_widget.py 中,加入显示代码,这里直接使用回撤模块的BacktesterChart
def analyze_strategy(self):
objectDF = self.cta_engine.get_strategy_triggered_order(self.strategy_name)
if not objectDF.empty:
triggerd_statistics_monitor = Triggered_OrderStatisticsMonitor()
triggerd_statistics_monitor.set_data(self.calculate_statistics(objectDF))
triggerd_statistics_monitor.setMinimumHeight(400)
triggerd_view = TriggeredMonitor(self.cta_manager.main_engine, self.cta_manager.main_engine.event_engine)
triggerd_view.set_df(objectDF)
triggerd_view.setMinimumHeight(400)
triggerd_view.setMinimumWidth(600)
objectDF["net_pnl"] = objectDF["收益"]
objectDF= objectDF.set_index("平仓日期")
chart = BacktesterChart()
chart.set_data(objectDF)
analyz_dialog = QDialog()
analyz_dialog.setWindowTitle(self.strategy_name)
analyz_dialog.setWindowModality(Qt.NonModal)
analyz_dialog.setWindowFlags(Qt.Dialog | Qt.WindowMinMaxButtonsHint | Qt.WindowCloseButtonHint)
gbox = QtWidgets.QGridLayout()
analyz_dialog.resize(1200, 800)
gbox.addWidget(triggerd_statistics_monitor,0,0)
gbox.addWidget(triggerd_view,1,0)
gbox.addWidget(chart,0,1,2,1)
analyz_dialog.setLayout(gbox)
analyz_dialog.exec_()
class PercentCell(BaseCell):
def __init__(self, content: Any, data: Any):
super(PercentCell, self).__init__(content, data)
def set_content(self, content: Any, data: Any) -> None:
self.setText(f"{content:,.2f}%")
self._data = data
class fullDatetimeCell(BaseCell):
def __init__(self, content: Any, data: Any):
super(fullDatetimeCell, self).__init__(content, data)
def set_content(self, content: Any, data: Any) -> None:
if content is None:
return
timestamp = content.strftime("%Y%m%d %H:%M:%S")
self.setText(timestamp)
self._data = data
class TriggeredMonitor(BaseMonitor):
event_type = ""
data_key = ""
sorting = False
# ["平仓日期", "方向", "开仓价", "手数", "平仓价", "收益"]
headers = {
"开仓日期": {"display": "开仓日期", "cell": fullDatetimeCell, "update": False},
"平仓日期": {"display": "平仓日期", "cell": fullDatetimeCell, "update": False},
"开仓方向": {"display": "开仓方向", "cell": DirectionCell, "update": False},
"开仓价": {"display": "开仓价", "cell": BaseCell, "update": False},
"手数": {"display": "手数", "cell": BaseCell, "update": False},
"平仓价": {"display": "平仓价", "cell": BaseCell, "update": False},
"收益": {"display": "收益", "cell": PnlCell, "update": False},
"return": {"display": "收益率", "cell": PercentCell, "update": False},
"UnitReturn": {"display": "单位收益率", "cell": PercentCell, "update": False},
"balance": {"display": "当前资金", "cell": BaseCell, "update": False},
"drawdown": {"display": "回撤", "cell": BaseCell, "update": False}
}
def set_df(self,objectDF):
objectDF_list = objectDF.to_dict(orient='records')
for record_item in objectDF_list:
self.insert_data(record_item)
def insert_data(self, data):
self.insertRow(0)
for column, header in enumerate(self.headers.keys()):
setting = self.headers[header]
content = data[header]
cell = setting["cell"](content, data)
self.setItem(0, column, cell)
def __del__(self) -> None:
pass
class Triggered_OrderStatisticsMonitor(QtWidgets.QTableWidget):
KEY_NAME_MAP = {
"capital": "策略定义资金",
"end_balance": "历史结算资金",
"total_net_pnl": "总盈亏",
"total_return": "总收益率",
"total_trade_count": "总成交笔数",
"winningResult": "盈利次数",
"losingResult" : "亏损次数",
"winningRate": "笔数胜率",
"max_drawdown": "最大回撤",
"max_ddpercent": "最大回撤比率",
"totalWinning": "总盈利金额",
"totalLosing": "总亏损金额",
"perprofitLoss": "平均单笔损益",
"averageWinning": "盈利平均每笔",
"averageLosing" : "亏损平均每笔",
"profitLossRatio" : "盈亏比",
}
def __init__(self):
super().__init__()
self.cells = {}
self.init_ui()
def init_ui(self):
self.setRowCount(len(self.KEY_NAME_MAP))
self.setVerticalHeaderLabels(list(self.KEY_NAME_MAP.values()))
self.setColumnCount(1)
self.horizontalHeader().setVisible(False)
self.horizontalHeader().setSectionResizeMode(
QtWidgets.QHeaderView.Stretch
)
self.setEditTriggers(self.NoEditTriggers)
for row, key in enumerate(self.KEY_NAME_MAP.keys()):
cell = QtWidgets.QTableWidgetItem()
self.setItem(row, 0, cell)
self.cells[key] = cell
def clear_data(self):
for cell in self.cells.values():
cell.setText("")
def set_data(self, data: dict):
data["capital"] = f"{data['capital']:,.2f}"
data["end_balance"] = f"{data['end_balance']:,.2f}"
data["total_net_pnl"] = f"{data['total_net_pnl']:,.2f}"
data["total_return"] = f"{data['total_return']:,.2f}%"
data["total_trade_count"] = f"{data['total_trade_count']}"
data["winningResult"] = f"{data['winningResult']}"
data["losingResult"] = f"{data['losingResult']}"
data["winningRate"] = f"{data['winningRate']:,.2f}%"
data["max_drawdown"] = f"{data['max_drawdown']:,.2f}"
data["max_ddpercent"] = f"{data['max_ddpercent']:,.2f}%"
data["totalWinning"] = f"{data['totalWinning']:,.2f}"
data["totalLosing"] = f"{data['totalLosing']:,.2f}"
data["averageWinning"] = f"{data['averageWinning']:,.2f}"
data["averageLosing"] = f"{data['averageLosing']:,.2f}"
data["perprofitLoss"] = f"{data['perprofitLoss']:,.2f}"
data["profitLossRatio"] = f"{data['profitLossRatio']:,.2f}"
for key, cell in self.cells.items():
value = data.get(key, "")
cell.setText(str(value))