Spaces:
Running
Running
import processing | |
from datetime import datetime, timedelta | |
import panel as pn | |
import pandas as pd | |
import hvplot.pandas # noqa | |
import plotly.express as px | |
import numpy as np | |
import hvplot.pandas # noqa | |
from panel.viewable import Viewer | |
import param | |
from script import styling | |
from script import description | |
import plotly.graph_objs as go | |
# import warnings | |
pn.extension('mathjax') | |
pn.extension('plotly') | |
class TotalReturnCard(Viewer):
    """Summary card comparing a portfolio with its benchmark.

    The card is made of a date-range slider, an HTML report (summed ``cash``,
    total return, tracking error, risk and attribution figures) and a line
    plot of the weighted return of the portfolio and of the benchmark.

    Parameters
    ----------
    start_date, end_date:
        Bounds of the evaluation window, read from the slider.
    b_stock_df, p_stock_df:
        Benchmark and portfolio analytic frames. Both must expose a ``time``
        column; the remaining schema is whatever ``processing`` expects.
    """

    start_date = param.Parameter()
    end_date = param.Parameter()
    b_stock_df = param.Parameter()
    p_stock_df = param.Parameter()

    def __init__(self, b_stock_df, p_stock_df, **params):
        """Build the widgets and compute the initial result frame.

        The default window is the last seven days of portfolio data.
        """
        self.b_stock_df = b_stock_df
        self.p_stock_df = p_stock_df
        latest = p_stock_df.time.max()
        # NOTE(review): the lower bound comes from the portfolio and the
        # upper bound from the benchmark, while the default value uses the
        # portfolio only -- confirm that mixing the two frames is intended.
        self._date_range = pn.widgets.DateRangeSlider(
            start=p_stock_df.time.min(),
            end=b_stock_df.time.max(),
            value=(latest - timedelta(days=7), latest),
        )
        self.start_date = self._date_range.value_start
        self.end_date = self._date_range.value_end
        self.result = self._create_result_df(b_stock_df, p_stock_df)
        self.plot_pane = pn.pane.Plotly(
            self.create_plot(), sizing_mode='stretch_width')
        self.report = pn.pane.HTML(
            self.create_report(), sizing_mode='stretch_width')
        # Kept last, as in the original, so explicitly passed params still
        # override the values assigned above.
        super().__init__(**params)

    def format_number(self, num):
        """Return ``num`` as a percentage string rounded to two decimals."""
        return f'{round(num * 100, 2)}%'

    def get_color(self, num):
        """Return ``'green'`` for non-negative values and ``'red'`` otherwise.

        NaN compares false against 0 and is therefore rendered red.
        """
        return 'green' if num >= 0 else 'red'

    def create_report(self):
        """Render the HTML summary from the most recent row of ``self.result``."""
        # The row with the latest timestamp carries the figures to display.
        most_recent_row = self.result.loc[self.result.time.idxmax()]
        active_return = most_recent_row.active_return
        tracking_error = most_recent_row.tracking_error
        total_return = most_recent_row.weighted_return_p
        mkt_cap = most_recent_row.cash
        risk = most_recent_row.risk

        # TODO: dummy data. The attribution figures used to be the column sums
        # ('interaction', 'allocation', 'selection', 'active_return',
        # 'notional_return') of
        # processing.calculate_attributes_between_dates(
        #     self.start_date, self.end_date, self.p_stock_df, self.b_stock_df)
        # and are hard-wired to zero until that computation is restored.
        notional_return = 0
        interaction = 0
        allocation = 0
        selection = 0

        report = f"""
        <style>
        .compact-container {{
        display: flex;
        flex-direction: column;
        gap: 5px;
        }}
        .compact-container > div {{
        display: flex;
        justify-content: space-between;
        margin-bottom: 2px;
        }}
        .compact-container > div > h2,
        .compact-container > div > h3,
        .compact-container > div > p,
        .compact-container > div > ul > li {{
        margin: 0;
        }}
        .compact-container > ul {{
        padding: 0;
        margin: 0;
        list-style-type: none;
        }}
        .compact-container > ul > li {{
        display: flex;
        margin-bottom: 2px;
        }}
        </style>
        <div class="compact-container">
        <u><b>总市值</b></u>
        <div>
        <h2 style="margin: 0;">¥{round(mkt_cap,2)}</h2>
        <h2 style='color: {self.get_color(total_return)}; margin: 0;'>{self.format_number(total_return)}</h2>
        </div>
        <div>
        <p style="margin: 0;">追踪误差</p>
        <p style='color: {self.get_color(tracking_error)}; margin: 0;'>{self.format_number(tracking_error)}</p>
        </div>
        <div>
        <p style="margin: 0;">风险</p>
        <p style='color: {self.get_color(risk)}; margin: 0;'>{self.format_number(risk)}</p>
        </div>
        <div>
        <p style="margin: 0;">归因</p>
        <ul style="padding: 0; margin: 0; list-style-type: none;">
        <li style="margin-bottom: 2px;">
        <div style="display: flex;">
        <p style="margin: 0;">主动回报:</p>
        <p style="color: {self.get_color(active_return)}; margin: 0;">{self.format_number(active_return)}</p>
        </div>
        </li>
        <li style="margin-bottom: 2px;">
        <div style="display: flex;">
        <p style="margin: 0;">交互:</p>
        <p style="color: {self.get_color(interaction)}; margin: 0;">{self.format_number(interaction)}</p>
        </div>
        </li>
        <li style="margin-bottom: 2px;">
        <div style="display: flex;">
        <p style="margin: 0;">名义主动回报:</p>
        <p style="color: {self.get_color(notional_return)}; margin: 0;">{self.format_number(notional_return)}</p>
        </div>
        </li>
        <li style="margin-bottom: 2px;">
        <div style="display: flex;">
        <p style="margin: 0;">选择:</p>
        <p style="color: {self.get_color(selection)}; margin: 0;">{self.format_number(selection)}</p>
        </div>
        </li>
        <li style="margin-bottom: 2px;">
        <div style="display: flex;">
        <p style="margin: 0;">分配:</p>
        <p style="color: {self.get_color(allocation)}; margin: 0;">{self.format_number(allocation)}</p>
        </div>
        </li>
        </ul>
        </div>
        </div>
        """
        return report

    def _create_result_df(self, analytic_b, analytic_p):
        """Compute per-date weighted return, tracking error and risk.

        Parameters
        ----------
        analytic_b:
            Benchmark analytic frame.
        analytic_p:
            Portfolio analytic frame.

        Returns
        -------
        pandas.DataFrame
            One row per ``time`` with the summed ``weighted_return_p``,
            ``weighted_return_b``, ``cash``, ``weighted_pct_p`` and
            ``weighted_pct_b`` columns plus the derived ``active_return``,
            ``tracking_error`` and ``risk`` columns.
        """
        return_b_df = processing.calculate_weighted_return(
            analytic_b, self.start_date, self.end_date)
        return_p_df = processing.calculate_weighted_return(
            analytic_p, self.start_date, self.end_date)

        # Called for their side effect on the frames (return value unused).
        processing.calculate_weighted_pct(return_b_df)
        processing.calculate_weighted_pct(return_p_df)

        # Flags consumed by the post-processing step. Rows that originate
        # from the portfolio frame are in the portfolio, so the flag is True
        # (the original set it to False, contradicting the benchmark flag).
        return_b_df['in_benchmark'] = True
        return_p_df['in_portfolio'] = True

        merged_df = pd.merge(return_b_df, return_p_df, on=['ticker', 'time'],
                             how='outer', suffixes=('_b', '_p'))
        processing.post_process_merged_analytic_df(merged_df)

        # Collapse all tickers into one row per date.
        result = merged_df.groupby('time').aggregate({
            'weighted_return_p': 'sum',
            'weighted_return_b': 'sum',
            'cash': 'sum',
            'weighted_pct_p': 'sum',
            'weighted_pct_b': 'sum',
        })
        result['active_return'] = (result.weighted_return_p
                                   - result.weighted_return_b)
        result.sort_values('time', inplace=True)

        # A window as long as the frame with min_periods=1 behaves like an
        # expanding window; the first row therefore has a NaN deviation.
        # The sqrt(252) factor presumably annualises daily figures.
        window = len(result)
        scale = np.sqrt(252)
        result['tracking_error'] = result['active_return'].rolling(
            window, min_periods=1).std() * scale
        # NOTE(review): risk is derived from the *benchmark* pct column;
        # confirm whether 'weighted_pct_p' was intended.
        result['risk'] = result['weighted_pct_b'].rolling(
            window, min_periods=1).std() * scale

        result.reset_index(inplace=True)
        return result

    def create_plot(self):
        """Return the portfolio-vs-benchmark return line plot as a dict."""
        # x='time' puts dates on the axis; without it plotly falls back to
        # the positional index left behind by reset_index().
        fig = px.line(self.result, x='time',
                      y=['weighted_return_p', 'weighted_return_b'])
        fig.update_traces(mode="lines+markers",
                          marker=dict(size=5), line=dict(width=2))
        fig.update_layout(styling.plot_layout)

        colname_to_name = {
            'weighted_return_p': 'Portfolio回报',
            'weighted_return_b': 'benchmark回报'
        }

        def relabel(trace):
            # Swap the raw column name for its display label everywhere the
            # trace shows it (legend entry, legend group and hover text).
            label = colname_to_name.get(trace.name, trace.name)
            trace.update(
                name=label,
                legendgroup=label,
                hovertemplate=trace.hovertemplate.replace(trace.name, label),
            )

        fig.for_each_trace(relabel)
        return fig.to_dict()

    def update(self):
        """Recompute the result frame and refresh the report and the plot."""
        # _create_result_df expects (benchmark, portfolio); the original call
        # passed them the other way round and swapped both series on refresh.
        self.result = self._create_result_df(self.b_stock_df, self.p_stock_df)
        fig = self.create_plot()
        report = self.create_report()
        self.report.object = report
        self.plot_pane.object = fig

    def __panel__(self):
        """Assemble and return the card layout."""
        self._layout = pn.Card(
            self._date_range, self.report, self.plot_pane,
            width=500,
            header=pn.Row(
                pn.pane.Str('投资组合总结'),
                pn.widgets.TooltipIcon(value=description.summary_card)))
        return self._layout

    def _sync_params(self):
        """Copy the slider's current range into ``start_date``/``end_date``."""
        self.start_date = self._date_range.value[0]
        self.end_date = self._date_range.value[1]
class DrawDownCard(Viewer):
    """Card plotting the drawdown of cumulative PnL or of weighted return.

    A select widget chooses the series ('盈利' -> ``cum_pnl``, anything
    else -> ``weighted_return``) on which the drawdown is computed.
    """

    selected_key_column = param.Parameter()
    calculated_p_stock = param.Parameter()
    # Misspelled declaration from the original class; never assigned, kept
    # only so existing references to the old name do not break.
    calcualted_p_stock = param.Parameter()

    def __init__(self, calculated_p_stock, **params):
        """Create the selector, store the data and draw the first plot."""
        self.select = pn.widgets.Select(
            name='Select', value='盈利', options=['盈利', '回报'])
        self.calculated_p_stock = calculated_p_stock
        self._sync_params()
        self.drawdown_plot = pn.pane.Plotly(self.plot_drawdown())
        super().__init__(**params)

    def _sync_params(self):
        """Derive ``selected_key_column`` from the select widget's value."""
        if self.select.value == '盈利':
            self.selected_key_column = 'cum_pnl'
        else:
            self.selected_key_column = 'weighted_return'

    # Original (misspelled) method name, kept as an alias for callers.
    _sycn_params = _sync_params

    def _aggregate_by_sum(self):
        """Return per-date sums of return, cash and pnl plus cumulative pnl."""
        processed_df = processing.calculate_weighted_return(
            self.calculated_p_stock)
        agg_df = processed_df.groupby('time').aggregate({
            'weighted_return': 'sum',
            'cash': 'sum',
            'pnl': 'sum',
        })
        agg_df['cum_pnl'] = agg_df['pnl'].cumsum()
        return agg_df

    def calculate_drawdown(self):
        """Return the drawdown frame with ``time`` restored as a column."""
        agg_df = self._aggregate_by_sum()
        df = processing.calculate_draw_down_on(
            agg_df, self.selected_key_column)
        df.reset_index(inplace=True)
        return df

    def plot_drawdown(self):
        """Build the drawdown figure, marking the points that set a new high.

        Returns the plotly ``Figure`` itself (not a dict); ``update`` does
        the dict conversion.
        """
        df = self.calculate_drawdown()
        key = self.selected_key_column
        fig = px.line(df, x='time', y=['drawn_down'])

        # Rows where the series equals its rolling maximum are new highs.
        at_new_high = df[df[key] == df[f'rolling_max_{key}']]
        fig.add_trace(go.Scatter(x=at_new_high['time'],
                                 y=at_new_high['drawn_down'],
                                 mode='markers', name='新的最高总回报'))

        colname_to_name = {
            'drawn_down': '回撤'
        }
        fig.update_layout(styling.plot_layout)

        def relabel(trace):
            # Replace the raw column name with its display label.
            label = colname_to_name.get(trace.name, trace.name)
            trace.update(name=label, legendgroup=label)

        fig.for_each_trace(relabel)
        return fig

    def update(self):
        """Redraw the plot for the current ``selected_key_column``."""
        self.drawdown_plot.object = self.plot_drawdown().to_dict()

    def __panel__(self):
        """Assemble and return the card layout."""
        self._layout = pn.Card(
            self.select,
            self.drawdown_plot,
            header=pn.Row(pn.pane.Str('回撤分析')),
            width=500
        )
        return self._layout
class HistReturnCard(Viewer):
    """Card with periodic return bars and an active-return attribution chart.

    ``select_resolution`` picks the aggregation period (daily, weekly,
    monthly or yearly) used by both charts.
    """

    # Declared as plain Parameters; the original used ``param.Parameterized()``
    # which creates a shared Parameterized *instance* on the class instead of
    # declaring a parameter.
    return_barplot = param.Parameter()
    calculated_b_stock = param.Parameter()
    calculated_p_stock = param.Parameter()
    select_resolution = param.ObjectSelector(
        default='每周回报', objects=['每日回报', '每周回报', '每月回报', '每年回报'])

    # Resolution label -> pandas offset alias, shared by both aggregations.
    _FREQ_BY_RESOLUTION = {
        '每日回报': 'D',
        '每周回报': 'W-MON',
        '每月回报': 'M',
        '每年回报': 'Y',
    }

    def __init__(self, calculated_p_stock, calculated_b_stock, **params):
        """Store both analytic frames and build the two initial charts."""
        self.calculated_p_stock = calculated_p_stock
        self.calculated_b_stock = calculated_b_stock
        first = self.calculated_p_stock.time.min()
        last = self.calculated_p_stock.time.max()
        # Created but not placed in the layout returned by __panel__.
        self._range_slider = pn.widgets.DateRangeSlider(
            name='Date Range Slider',
            start=first, end=last,
            value=(first, last),
        )
        self.return_barplot = pn.pane.Plotly(self.create_return_barplot())
        self.attribute_barplot = pn.pane.Plotly(
            self.create_attributes_barplot())
        super().__init__(**params)

    def _current_freq(self):
        """Return the offset alias for ``select_resolution`` (None if unknown)."""
        return self._FREQ_BY_RESOLUTION.get(self.select_resolution)

    @staticmethod
    def _relabel_traces(fig, colname_to_name):
        """Swap raw column names for display labels in legend and hover text."""
        def relabel(trace):
            label = colname_to_name.get(trace.name, trace.name)
            trace.update(
                name=label,
                legendgroup=label,
                hovertemplate=trace.hovertemplate.replace(trace.name, label),
            )
        fig.for_each_trace(relabel)

    def _calculate_return(self, df, freq):
        """Aggregate ``weighted_log_return`` per period and convert to return.

        Parameters
        ----------
        df:
            Frame with ``time`` and ``weighted_log_return`` columns.
        freq:
            pandas offset alias; with ``'W-MON'`` each bucket ends on a Monday.

        Returns
        -------
        pandas.DataFrame
            Columns ``weighted_log_return``, ``time`` (the period label) and
            ``return`` (``exp(sum of log returns) - 1``), default index.
        """
        grouped = df.groupby(pd.Grouper(key='time', freq=freq))
        agg_df = grouped.agg({'weighted_log_return': 'sum'})
        # Keep the period label as a regular column before dropping the index.
        agg_df['time'] = agg_df.index
        agg_df['return'] = np.exp(agg_df['weighted_log_return']) - 1
        return agg_df.reset_index(drop=True)

    def update_aggregate_df(self):
        """Return portfolio and benchmark periodic returns merged on ``time``.

        Portfolio columns get the ``_p`` suffix and benchmark columns ``_b``.
        """
        freq = self._current_freq()
        p_return = self._calculate_return(self.calculated_p_stock, freq)
        b_return = self._calculate_return(self.calculated_b_stock, freq)
        merge_df = pd.merge(p_return, b_return, on='time',
                            how='outer', suffixes=('_p', '_b'))
        return merge_df

    def _update_attributes_df(self):
        """Return the aggregated attribution frame for the current period.

        Delegates to the ``processing`` helpers and adds ``period_str``, the
        string form of the index, for use as the bar chart's x axis.
        """
        freq = self._current_freq()
        agg_p = processing.aggregate_analytic_df_by_period(
            self.calculated_p_stock, freq)
        agg_b = processing.aggregate_analytic_df_by_period(
            self.calculated_b_stock, freq)
        bhb_df = processing.calculate_periodic_BHB(agg_p, agg_b)
        agg_bhb = processing.aggregate_bhb_df(bhb_df)
        agg_bhb['period_str'] = agg_bhb.index.map(str)
        return agg_bhb

    def create_attributes_barplot(self):
        """Return the grouped attribution bar chart as a dict.

        Also refreshes ``self.attribute_df``.
        """
        self.attribute_df = self._update_attributes_df()
        fig = px.bar(self.attribute_df, x='period_str', y=[
            'allocation', 'selection', 'interaction',
            'notional_active_return', 'active_return'])
        self._relabel_traces(fig, {
            'allocation': '分配',
            'selection': '选择',
            'interaction': '交互',
            'notional_active_return': '名义主动回报',
            'active_return': '实际主动回报'
        })
        fig.update_layout(barmode='group', title='主动回报归因',
                          bargap=0.0, bargroupgap=0.0)
        fig.update_layout(**styling.plot_layout)
        fig.update_traces(**styling.barplot_trace)
        return fig.to_dict()

    def create_return_barplot(self):
        """Return the overlaid periodic-return bar chart as a dict.

        Also refreshes ``self.agg_df``.
        """
        self.agg_df = self.update_aggregate_df()
        fig = px.bar(self.agg_df, x='time', y=['return_p', 'return_b'],
                     barmode='overlay',
                     title='周期回报',
                     )
        self._relabel_traces(fig, {
            'return_p': 'portfolio回报率',
            'return_b': 'benchmark回报率'
        })
        fig.update_layout(**styling.plot_layout)
        fig.update_traces(**styling.barplot_trace)
        return fig.to_dict()

    def update(self):
        """Rebuild both charts for the current ``select_resolution``."""
        return_barplot = self.create_return_barplot()
        self.return_barplot.object = return_barplot
        attributes_barplot = self.create_attributes_barplot()
        self.attribute_barplot.object = attributes_barplot

    def __panel__(self):
        """Assemble and return the card layout."""
        self._layout = pn.Card(
            pn.Param(self.param.select_resolution, name='选择周期'),
            self.return_barplot, self.attribute_barplot,
            width=500,
            header=pn.Row(
                pn.pane.Str('周期回报'),
                pn.widgets.TooltipIcon(
                    value=description.periodic_return_report)))
        return self._layout
class PortfolioComposationCard(Viewer):
    """Card showing how cash is distributed across the portfolio.

    Contains a treemap of the holdings on one selected date and a stacked
    bar chart of the cash position over a selected date range.
    """

    # Declared as plain Parameters; the original used ``param.Parameterized()``
    # which creates a shared Parameterized *instance* on the class instead of
    # declaring a parameter.
    p_stock_df = param.Parameter()
    selected_date = param.Parameter()

    def __init__(self, analytic_df, **params):
        """Compute weighted returns over the full range and build the widgets."""
        self.p_stock_df = analytic_df
        self.p_stock_df = processing.calculate_weighted_return(
            self.p_stock_df,
            start=self.p_stock_df.time.min(),
            end=self.p_stock_df.time.max())
        first = self.p_stock_df.time.min()
        last = self.p_stock_df.time.max()

        # Series.unique() may hand back numpy.datetime64 values, which have
        # no .date(); going through pd.to_datetime guarantees Timestamps.
        enabled_dates = [
            stamp.date()
            for stamp in pd.to_datetime(self.p_stock_df.time.unique())
        ]
        self.datetime_picker = pn.widgets.DatetimePicker(
            name='选择某日资金分布',
            start=first,
            end=last,
            value=last,
            enabled_dates=enabled_dates,
        )
        self.date_range = pn.widgets.DateRangeSlider(
            name='选择资金分布走势区间',
            start=first,
            end=last,
            value=(first, last),
        )
        self.tree_plot = pn.pane.Plotly(self.create_treemap())
        self.trend_plot = pn.pane.Plotly(self.create_trend_plot())
        super().__init__(**params)

    def create_cash_position_df(self):
        """Return per-date cash totals restricted to the selected range.

        Each date yields two rows: ``type == 'portfolio'`` with the summed
        ``cash`` and ``type == 'not_in_portfolio'`` with a fixed placeholder
        amount of 1000.
        """
        aggregate_df = self.p_stock_df.groupby('time', as_index=False).agg({
            'cash': 'sum'
        })
        aggregate_df['type'] = 'portfolio'
        not_in_portfolio_df = aggregate_df.copy()
        not_in_portfolio_df['type'] = 'not_in_portfolio'
        # Hard-coded placeholder for cash held outside the portfolio.
        not_in_portfolio_df['cash'] = 1000
        aggregate_df = pd.concat([aggregate_df, not_in_portfolio_df])
        aggregate_df.sort_values(by=['time'], inplace=True)
        range_start, range_end = self.date_range.value[0], self.date_range.value[1]
        return aggregate_df[aggregate_df.time.between(range_start, range_end)]

    def update_trend_plot(self):
        """Redraw the cash-position trend for the current date range."""
        self.trend_plot.object = self.create_trend_plot()

    def create_trend_plot(self):
        """Return the cash-position bar chart (coloured by type) as a dict."""
        aggregate_df = self.create_cash_position_df()
        fig = px.bar(aggregate_df, x='time', y='cash', color='type')
        # Horizontal legend anchored above the top-right corner of the plot.
        fig.update_layout(legend=dict(
            orientation="h",
            yanchor="bottom",
            y=1.02,
            xanchor="right",
            x=1
        ))
        fig.update_traces(
            marker_line_width=0,
            selector=dict(type="bar"))
        fig.update_layout(bargap=0,
                          bargroupgap=0,
                          )
        fig.update_layout(uniformtext_minsize=8, uniformtext_mode='hide',
                          yaxis_title=None, xaxis_title=None,
                          margin=dict(l=0, r=0, t=0, b=0))
        return fig.to_dict()

    def create_treemap(self):
        """Return the treemap of holdings on the picked date as a dict.

        Tiles are sized by ``cash`` and coloured by ``weighted_return``; a
        fixed '闲置' row with a placeholder cash amount of 100 is appended.
        Also refreshes ``self.selected_df``.
        """
        on_picked_date = self.p_stock_df.time == self.datetime_picker.value
        # Copy before adding a column: assigning into a boolean-mask slice
        # raises SettingWithCopyWarning and may or may not touch p_stock_df.
        self.selected_df = self.p_stock_df[on_picked_date].copy()
        self.selected_df['position'] = '股票'
        not_in_portfolio_row = pd.DataFrame({
            'display_name': ['闲置'],
            'position': ['闲置'],
            'aggregate_sector': ['闲置'],
            'cash': [100],
            'weighted_return': [0]
        })
        df = pd.concat([self.selected_df, not_in_portfolio_row],
                       ignore_index=True)
        fig = px.treemap(df,
                         path=['position', 'aggregate_sector', 'display_name'],
                         values='cash',
                         color='weighted_return',
                         hover_data=['weighted_return', 'cash'],
                         color_continuous_scale='RdBu',
                         # Centre the colour scale on the mean return.
                         color_continuous_midpoint=np.average(
                             df['weighted_return'])
                         )
        fig.update_layout(styling.plot_layout)
        fig.update_layout(coloraxis_colorbar=dict(
            title="累计加权回报率"))
        return fig.to_dict()

    def __panel__(self):
        """Assemble and return the card layout."""
        self._layout = pn.Card(self.datetime_picker, self.tree_plot,
                               self.date_range, self.trend_plot,
                               width=500, header=pn.pane.Str('资金分布'))
        return self._layout

    def update(self):
        """Redraw the treemap for the currently picked date."""
        tree_plot = self.create_treemap()
        self.tree_plot.object = tree_plot
class BestAndWorstStocks(Viewer):
    """Card listing the five best and five worst stocks by cumulative PnL.

    The ranking is computed over the window chosen with the date-range
    slider (by default the last seven days of data).
    """

    start_date = param.Parameter()
    end_date = param.Parameter()

    # Columns present in the data but not shown in the tables.
    hidden_col = [
        'index',
        'open',
        'high',
        'low',
        'close',
        'volume',
        'money',
        'pct',
        'sector',
        'aggregate_sector',
        'ave_price',
        'weight',
        'ini_w',
        'name',
        'pnl'
    ]
    # Columns pinned while scrolling horizontally.
    forzen_columns = ['display_name', 'return', 'cum_pnl', 'shares']
    description = "股票表现排名"
    tooltip = "在一个时间窗口中累计盈利最高和最低的股票,包括已经卖出的股票,如果表格的日期小于窗口的结束时间代表已经卖出"

    def __init__(self, analytic_df, **params):
        """Store the data, create the slider and fill both tables."""
        self.analytic_df = analytic_df
        latest = self.analytic_df.time.max()
        self._date_range = pn.widgets.DateRangeSlider(
            name='选择计算回报的时间区间',
            start=self.analytic_df.time.min(),
            end=self.analytic_df.time.max(),
            value=(latest - timedelta(days=7), self.analytic_df.time.max())
        )
        self.start_date = self._date_range.value_start
        self.end_date = self._date_range.value_end
        ranked = self.get_processed_df()
        self.best_5_tabulator = self.create_tabulator(ranked.head(5))
        self.worst_5_tabulator = self.create_tabulator(ranked.tail(5))
        super().__init__(**params)

    def create_tabulator(self, df):
        """Wrap ``df`` in a Tabulator with translated column titles."""
        col_title_map = dict(
            display_name='股票名称',
            ticker='股票代码',
            time='日期',
            sector='行业',
            shares='持仓',
            cash='现金',
            cum_pnl='累计盈利',
        )
        # 'return' is a keyword and cannot be passed to dict() by name.
        col_title_map['return'] = '回报率'
        return pn.widgets.Tabulator(
            df,
            sizing_mode='stretch_width',
            hidden_columns=self.hidden_col,
            frozen_columns=self.forzen_columns,
            titles=col_title_map,
        )

    def _get_cum_return(self, df):
        """Return, for every ticker, its row with the latest ``time``."""
        result_df = processing.calcualte_return(df=df,
                                                start=self.start_date,
                                                end=self.end_date)
        latest_idx = result_df.groupby('ticker').time.idxmax()
        return result_df.loc[latest_idx]

    def get_processed_df(self):
        """Return one row per ticker, sorted by ``cum_pnl`` descending."""
        with_pnl = processing.calculate_cum_pnl(self.analytic_df,
                                                start=self.start_date,
                                                end=self.end_date)
        latest_rows = self._get_cum_return(with_pnl)
        return latest_rows.sort_values(by='cum_pnl', ascending=False)

    def update(self):
        """Recompute the ranking and refresh both tables."""
        ranked = self.get_processed_df()
        self.best_5_tabulator.value = ranked.head(5)
        self.worst_5_tabulator.value = ranked.tail(5)

    def _sync_params(self):
        """Copy the slider's current range into ``start_date``/``end_date``."""
        window = self._date_range.value
        self.start_date = window[0]
        self.end_date = window[1]

    def __panel__(self):
        """Assemble and return the card layout."""
        header = pn.Row(
            pn.pane.Str(self.description),
            pn.widgets.TooltipIcon(value=self.tooltip)
        )
        self._layout = pn.Card(
            self._date_range,
            pn.pane.Str('加权回报率最高回报5只股票'),
            self.best_5_tabulator,
            pn.pane.Str('加权回报率最低回报5只股票'),
            self.worst_5_tabulator,
            max_width=500,
            header=header,
        )
        return self._layout
class TopHeader(Viewer):
    """Header strip showing PnL to date, total return and maximum drawdown."""

    eval_df = param.Parameter()

    def __init__(self, eval_df, **params):
        """Compute the three headline figures once and build the report."""
        self.eval_df = eval_df
        figures = self._process()
        self.report = self.create_report(*figures)
        super().__init__(**params)

    def update(self):
        """Placeholder for refreshing the figures; currently does nothing."""
        return

    def _process(self):
        """Return ``(cumulative pnl, weighted return, max drawdown)``.

        The first two values are read from the row with the latest ``time``;
        the drawdown is the minimum of the ``drawn_down`` column produced by
        ``processing.calculate_draw_down_on``.
        """
        weighted = processing.calculate_weighted_return(self.eval_df)
        sums = {
            'weighted_return': 'sum',
            'cash': 'sum',
            'pnl': 'sum',
        }
        # One row per date, with ``time`` restored as a regular column.
        agg_df = weighted.groupby('time').aggregate(sums)
        agg_df.reset_index(inplace=True)
        agg_df['cum_pnl'] = agg_df['pnl'].cumsum()

        drawdown_df = processing.calculate_draw_down_on(agg_df)
        max_draw_down = drawdown_df.drawn_down.min()

        latest = agg_df.loc[agg_df.time.idxmax()]
        return latest.cum_pnl, latest.weighted_return, max_draw_down

    def create_report(self, pnl, total_return, max_drawdown):
        """Return a FlexBox with the three figures spaced evenly."""
        pnl_text = f"PnL:{round(pnl,2)}¥"
        return_text = f"回报:{round(total_return * 100,2)}%"
        drawdown_text = f'最大回撤:{round(max_drawdown * 100,2)}%'
        return pn.FlexBox(pnl_text, return_text, drawdown_text,
                          justify_content='space-evenly')

    def __panel__(self):
        """Assemble and return the card layout."""
        self._layout = pn.Card(self.report, sizing_mode='stretch_width')
        return self._layout