portfolio_management / appComponents.py
huggingface112's picture
move files to normal tracking except .db
976166f
raw
history blame
No virus
35.4 kB
from script import processing
from datetime import datetime, timedelta
import panel as pn
import pandas as pd
import hvplot.pandas # noqa
import plotly.express as px
import numpy as np
import hvplot.pandas # noqa
from panel.viewable import Viewer
import param
from script import styling
from script import description
import plotly.graph_objs as go
# import warnings
# Register the MathJax and Plotly front-end extensions in a single call.
# The original loaded 'plotly' twice, which was redundant.
pn.extension('mathjax', 'plotly')
# warnings.filterwarnings("ignore", category=pd.core.common.SettingWithCopyWarning)
# Overall performance view; the date range defaults to the most recent 30 days.
def create_portfolio_overview(df_list):
    """Build the portfolio overview: a date-range slider above two plot columns.

    Args:
        df_list: Sequence unpacked as ``(calculated_b_stock,
            calculated_p_stock, p_eval_df, sector_eval_df)``.

    Returns:
        A ``pn.Column`` holding the slider and a row with the portfolio-level
        plots (left) and the per-sector plots (right). Both plot columns are
        scrollable and 1000 px high.
    """
    calculated_b_stock, calculated_p_stock, p_eval_df, sector_eval_df = df_list

    latest_date = sector_eval_df.date.max()
    range_slider = pn.widgets.DateRangeSlider(
        name='date range',
        start=sector_eval_df.date.min(),
        end=latest_date,
        value=(latest_date - timedelta(days=30), latest_date),
        align='center',
        sizing_mode='stretch_width',
    )

    size = {'width': 780}
    option = {'legend_position': "left"}
    active_tools = {'tools': ['hover'], 'active_tools': [], 'axiswise': True}

    # TODO convert below to a class
    def _within_slider(interactive_df):
        # Restrict an interactive frame to the slider's current date range.
        in_range = interactive_df.date.between(
            range_slider.param.value_start, range_slider.param.value_end)
        return interactive_df[in_range]

    ranged_ip_eval_df = _within_slider(p_eval_df.interactive())
    ranged_isector_eval_df = _within_slider(sector_eval_df.interactive())

    def _sector_lines(column, title):
        # One line per aggregate sector for the given column.
        lines = ranged_isector_eval_df.hvplot.line(
            x='date', y=[column], by='aggregate_sector')
        return lines.opts(title=title, **size, **option)

    def _render(plot):
        # Apply the shared tool options and materialise the interactive plot.
        return plot.opts(**active_tools).output()

    # Portfolio-level plots.
    return_plot = ranged_ip_eval_df.hvplot.line(
        x='date', y=['portfolio_return_p', 'portfolio_return_b'],
    ).opts(title='投资组合总回报 v.s benchmark总回报', **size, **option)
    active_return_plot = ranged_ip_eval_df.hvplot.line(
        x='date', y=['active_return'],
    ).opts(title='每日主动回报', **size, axiswise=True)
    risk_tracking_plot = ranged_ip_eval_df.hvplot.line(
        x='date', y=['risk', 'tracking_error'],
    ).opts(title='风险和追踪误差', **size, **option)

    # Per-sector plots.
    sector_return_plot = _sector_lines('portfolio_return_p', '投资组合各行业总回报')
    sector_active_return_plot = _sector_lines(
        'active_return', '投资组合各行业每日主动回报')
    sector_tracking_error_plot = _sector_lines(
        'tracking_error', '投资组合各行业追踪误差')
    sector_risk_plot = _sector_lines('risk', '投资组合各行业风险')

    def create_attribute_plot(start, end, calculated_b_stock, calculated_p_stock):
        # NOTE(review): the benchmark frame is passed before the portfolio
        # frame here, while the other call sites in this file pass the
        # portfolio frame first -- confirm against the signature of
        # processing.calculate_attributes_between_dates.
        result = processing.calculate_attributes_between_dates(
            start, end, calculated_b_stock, calculated_p_stock)
        totals = result.aggregate(
            dict.fromkeys(['interaction', 'allocation', 'selection'], 'sum'))
        totals_pane = pn.pane.DataFrame(totals.transpose())
        bars = result.hvplot.bar(
            x='display_name_p',
            y=['interaction', 'allocation', 'selection'],
            shared_axes=False,
            stacked=True,
            rot=90,
        ).opts(**size, **option, title='投资组合总主动回报归因')
        return pn.Column(totals_pane, bars)

    # Re-evaluated whenever either end of the slider moves.
    attribute_plot = pn.bind(
        create_attribute_plot,
        start=range_slider.param.value_start,
        end=range_slider.param.value_end,
        calculated_b_stock=calculated_b_stock,
        calculated_p_stock=calculated_p_stock,
    )

    total_view_plots = pn.Column(
        _render(return_plot),
        _render(risk_tracking_plot),
        _render(active_return_plot),
        attribute_plot,
        height=1000,
        scroll=True,
    )
    sector_view_plots = pn.Column(
        _render(sector_return_plot),
        _render(sector_tracking_error_plot),
        _render(sector_risk_plot),
        _render(sector_active_return_plot),
        height=1000,
        scroll=True,
    )

    plots_row = pn.Row(total_view_plots, sector_view_plots, align='center')
    return pn.Column(range_slider, plots_row)
def attribution_view(daily_bnb_result, daily_sector_bnb_result, p_eval_df):
    """Build the attribution view: a date-range slider above total and sector plots.

    Args:
        daily_bnb_result: Frame with ``date``, ``allocation``, ``selection``,
            ``interaction`` and ``active_return`` columns (portfolio level).
        daily_sector_bnb_result: Frame with the same columns plus
            ``aggregate_sector``.
        p_eval_df: Frame with ``date``, ``portfolio_return_p`` and
            ``portfolio_return_b`` columns.

    Returns:
        A ``pn.Column`` with the slider row on top and, below it, a row with
        the total view (left) and the sector view (right).

    Note:
        The ``date`` column of each input frame is converted to datetime in
        place, so the caller's frames are modified.
    """
    for frame in (p_eval_df, daily_bnb_result, daily_sector_bnb_result):
        frame['date'] = pd.to_datetime(frame['date'])

    # Interactive widget: defaults to the full span of p_eval_df.
    first_date = p_eval_df.date.min()
    last_date = p_eval_df.date.max()
    dt_range = pn.widgets.DateRangeSlider(
        start=first_date, end=last_date, value=(first_date, last_date))
    range_start = dt_range.param.value_start
    range_end = dt_range.param.value_end

    # Total return and attribution.
    p_eval_df_i = p_eval_df.interactive()
    daily_bnb_result_i = daily_bnb_result.interactive()
    daily_return_plot = p_eval_df_i[
        (p_eval_df_i.date >= range_start) & (p_eval_df_i.date <= range_end)
    ].hvplot(
        x='date',
        y=['portfolio_return_p', 'portfolio_return_b'],
        title='投资组合总回报 v.s benchmark总回报',
    ).output()
    daily_bnb_plot = daily_bnb_result_i[
        daily_bnb_result_i.date.between(range_start, range_end)
    ].hvplot.bar(
        x='date',
        y=['allocation', 'selection', 'interaction', "active_return"],
        stacked=True,
        title='每日主动收益归因',
        yformatter='%.2f',
        xlabel='日期',
        shared_axes=False,
    ).output()

    # Sector active return.
    daily_sector_bnb_df_i = daily_sector_bnb_result.interactive()
    selected_range_df = daily_sector_bnb_df_i[
        daily_sector_bnb_df_i.date.between(range_start, range_end)]
    sector_active_return_plot = selected_range_df.hvplot.line(
        x='date', y='active_return', by='aggregate_sector',
        width=1000, height=400, title='投资组合行业每日主动回报',
    ).output()

    # Sector attribution, one tab per sector.
    def plot_attribute_by_sector(sector):
        selected_sector_df = selected_range_df[
            selected_range_df.aggregate_sector == sector]
        return selected_sector_df.hvplot.bar(
            x='date',
            y=['active_return', 'allocation', 'selection', 'interaction'],
            title='投资组合行业每日主动收入归因',
            stacked=True,
            shared_axes=False,
        ).output()

    sector_tabs = [
        (sector, plot_attribute_by_sector(sector))
        for sector in daily_sector_bnb_result.aggregate_sector.unique()
    ]
    # Fix: the keyword was misspelled as ``dymacic``; ``dynamic`` is the
    # actual pn.Tabs parameter name.
    sector_attr_plot_tabs = pn.Tabs(*sector_tabs, dynamic=True)

    # Layout.
    sector_view = pn.Column(sector_attr_plot_tabs, sector_active_return_plot)
    total_view = pn.Column(daily_return_plot, daily_bnb_plot)
    return pn.Column(
        pn.Row(dt_range),
        pn.Row(total_view, sector_view),
    )
# plot explore
def create_hvplot_explore(calculated_b_stock, calculated_p_stock, p_eval_df, sector_eval_df, attribution_result_df, s_attribution_result_df):
    """Build a data-exploration component for the six result frames.

    A select widget picks one frame by name; below it sit an hvplot explorer
    and a Perspective pane for the chosen frame.

    Returns:
        A ``pn.Column`` of ``(selector, explorer, perspective)``.
    """
    name_to_df = {
        'calculated_b_stock': calculated_b_stock,
        'calculated_p_stock': calculated_p_stock,
        'p_eval_df': p_eval_df,
        'sector_eval_df': sector_eval_df,
        'attribution_result_df': attribution_result_df,
        's_attribution_result_df': s_attribution_result_df,
    }
    # Dict preserves insertion order, so the options are listed as declared.
    options = list(name_to_df)
    selector = pn.widgets.Select(
        name='Select', options=options, value=options[0])

    def create_exploer(name):
        explorer = hvplot.explorer(name_to_df[name])

        def plot_code(**kwargs):
            snippet = f'```python\n{explorer.plot_code()}\n```'
            return pn.pane.Markdown(snippet, sizing_mode='stretch_width')

        # NOTE(review): this layout (explorer plus generated code) is built
        # but neither returned nor displayed; only the bare explorer is
        # returned. Kept as-is to preserve behaviour -- confirm intent.
        pn.Column(
            explorer,
            '**Code**:',
            pn.bind(plot_code, **explorer.param.objects()),
        )
        return explorer

    def create_perspective(name):
        frame = name_to_df[name]
        return pn.pane.Perspective(
            frame, columns=list(frame.columns), width=1500, height=800)

    # Both panes are rebuilt whenever the selection changes.
    perspective = pn.bind(create_perspective, name=selector)
    exploer = pn.bind(create_exploer, name=selector)
    return pn.Column(selector, exploer, perspective)
class TotalReturnCard(Viewer):
    """Summary card: a date-range slider, an HTML report and a return chart.

    The report and chart are rebuilt by ``update`` whenever ``start_date``,
    ``end_date`` or ``eval_df`` changes; the slider feeds the two dates via
    ``_sync_params``.
    """

    value = param.Range(doc="A numeric range.")
    width = param.Integer(default=300)
    start_date = param.Parameter()
    end_date = param.Parameter()
    eval_df = param.Parameter()
    b_stock_df = param.Parameter()
    p_stock_df = param.Parameter()
    selected_df = param.Parameter()
    plot_pane = param.Parameter()
    report = param.Parameter()

    def format_number(self, num):
        """Format a fraction as a percentage string rounded to two decimals."""
        percent = round(num * 100, 2)
        return f'{percent}%'

    def get_color(self, num):
        """Return 'green' for values >= 0 and 'red' otherwise."""
        if num >= 0:
            return 'green'
        return 'red'

    def create_report(self):
        """Return the HTML summary for the currently selected date range."""
        # Headline figures come from the last row of the computed returns.
        result = processing.calculate_return(
            self.eval_df, self.start_date, self.end_date)
        most_recent_row = result.tail(1)
        active_return = most_recent_row.active_return.values[0]
        total_return = most_recent_row.return_p.values[0]
        mkt_cap = most_recent_row.mkt_cap.values[0]

        # Annualised with sqrt(252).
        annualise = np.sqrt(252)
        tracking_error = result.active_return.std() * annualise
        # NOTE(review): "risk" is derived from return_b here -- confirm
        # whether return_p was intended.
        risk = result['return_b'].std() * annualise

        # Attribution totals over the same date range.
        attributes = processing.calculate_attributes_between_dates(
            self.start_date, self.end_date, self.p_stock_df, self.b_stock_df)
        total_attributes = attributes.aggregate(dict.fromkeys(
            ['interaction', 'allocation', 'selection',
             'active_return', 'notional_return'],
            'sum'))
        notional_return = total_attributes['notional_return']
        interaction = total_attributes['interaction']
        allocation = total_attributes['allocation']
        selection = total_attributes['selection']

        report = f"""
<style>
.compact-container {{
display: flex;
flex-direction: column;
gap: 5px;
}}
.compact-container > div {{
display: flex;
justify-content: space-between;
margin-bottom: 2px;
}}
.compact-container > div > h2,
.compact-container > div > h3,
.compact-container > div > p,
.compact-container > div > ul > li {{
margin: 0;
}}
.compact-container > ul {{
padding: 0;
margin: 0;
list-style-type: none;
}}
.compact-container > ul > li {{
display: flex;
margin-bottom: 2px;
}}
</style>
<div class="compact-container">
<u><b>总市值</b></u>
<div>
<h2 style="margin: 0;">¥{round(mkt_cap,2)}</h2>
<h2 style='color: {self.get_color(total_return)}; margin: 0;'>{self.format_number(total_return)}</h2>
</div>
<div>
<p style="margin: 0;">追踪误差</p>
<p style='color: {self.get_color(tracking_error)}; margin: 0;'>{self.format_number(tracking_error)}</p>
</div>
<div>
<p style="margin: 0;">风险</p>
<p style='color: {self.get_color(risk)}; margin: 0;'>{self.format_number(risk)}</p>
</div>
<div>
<p style="margin: 0;">归因</p>
<ul style="padding: 0; margin: 0; list-style-type: none;">
<li style="margin-bottom: 2px;">
<div style="display: flex;">
<p style="margin: 0;">主动回报:</p>
<p style="color: {self.get_color(active_return)}; margin: 0;">{self.format_number(active_return)}</p>
</div>
</li>
<li style="margin-bottom: 2px;">
<div style="display: flex;">
<p style="margin: 0;">交互:</p>
<p style="color: {self.get_color(interaction)}; margin: 0;">{self.format_number(interaction)}</p>
</div>
</li>
<li style="margin-bottom: 2px;">
<div style="display: flex;">
<p style="margin: 0;">名义主动回报:</p>
<p style="color: {self.get_color(notional_return)}; margin: 0;">{self.format_number(notional_return)}</p>
</div>
</li>
<li style="margin-bottom: 2px;">
<div style="display: flex;">
<p style="margin: 0;">选择:</p>
<p style="color: {self.get_color(selection)}; margin: 0;">{self.format_number(selection)}</p>
</div>
</li>
<li style="margin-bottom: 2px;">
<div style="display: flex;">
<p style="margin: 0;">分配:</p>
<p style="color: {self.get_color(allocation)}; margin: 0;">{self.format_number(allocation)}</p>
</div>
</li>
</ul>
</div>
</div>
"""
        return report

    def create_plot(self):
        """Return a Plotly figure dict of portfolio vs. benchmark return."""
        returns = processing.calculate_return(
            self.eval_df, self.start_date, self.end_date)
        fig = px.line(returns, x="date", y=['return_p', 'return_b'])
        fig.update_traces(mode="lines+markers",
                          marker={'size': 5}, line={'width': 2})
        fig.update_layout(styling.plot_layout)

        colname_to_name = {
            'return_p': 'Portfolio回报',
            'return_b': 'benchmark回报',
        }

        def _relabel(trace):
            # Swap the raw column name for its display label everywhere.
            label = colname_to_name.get(trace.name, trace.name)
            trace.update(
                name=label,
                legendgroup=label,
                hovertemplate=trace.hovertemplate.replace(trace.name, label),
            )

        fig.for_each_trace(_relabel)
        return fig.to_dict()

    @param.depends('start_date', 'end_date', 'eval_df', watch=True)
    def update(self):
        """Rebuild the chart and report and push them into their panes."""
        new_fig = self.create_plot()
        new_report = self.create_report()
        self.report.object = new_report
        self.plot_pane.object = new_fig

    def __init__(self, eval_df, b_stock_df, p_stock_df, **params):
        """Store the frames, build the slider and render the initial panes.

        The slider initially selects the last 7 days of ``eval_df``.
        """
        self.eval_df = eval_df
        self.b_stock_df = b_stock_df
        self.p_stock_df = p_stock_df

        last_date = eval_df.date.max()
        self._date_range = pn.widgets.DateRangeSlider(
            start=eval_df.date.min(),
            end=last_date,
            value=(last_date - timedelta(days=7), last_date),
        )
        self.start_date = self._date_range.value_start
        self.end_date = self._date_range.value_end

        self.plot_pane = pn.pane.Plotly(
            self.create_plot(), sizing_mode='stretch_width')
        self.report = pn.pane.HTML(
            self.create_report(), sizing_mode='stretch_width')
        super().__init__(**params)
        self._sync_widgets()

    def __panel__(self):
        """Return the card layout (slider, report, chart)."""
        header = pn.Row(
            pn.pane.Str('投资组合总结'),
            pn.widgets.TooltipIcon(value=description.summary_card),
        )
        self._layout = pn.Card(
            self._date_range, self.report, self.plot_pane,
            width=500, header=header)
        return self._layout

    @param.depends('start_date', 'end_date', 'eval_df', watch=True)
    def update_selected_df(self):
        """Keep ``selected_df`` equal to the rows of ``eval_df`` in range."""
        in_range = self.eval_df.date.between(self.start_date, self.end_date)
        self.selected_df = self.eval_df[in_range]

    @param.depends('value', 'width', watch=True)
    def _sync_widgets(self):
        """Placeholder hook; currently does nothing."""
        pass

    @param.depends('_date_range.value', watch=True)
    def _sync_params(self):
        """Copy the slider's (start, end) tuple into the date parameters."""
        slider_value = self._date_range.value
        self.start_date = slider_value[0]
        self.end_date = slider_value[1]
class DrawDownCard(Viewer):
    """Card plotting the portfolio's drawdown relative to its running peak return."""

    def __init__(self, eval_df, calculated_p_stock, calculated_b_stock, **params):
        """Store the input frames and render the drawdown figure once."""
        self.eval_df = eval_df
        self.calculated_p_stock = calculated_p_stock
        self.calculated_b_stock = calculated_b_stock
        self.drawdown_plot = pn.pane.Plotly(self.plot_drawdown())
        super().__init__(**params)

    def calculate_drawdown(self):
        """Return a copy of ``eval_df`` with running-peak and drawdown columns.

        Adds ``rolling_max_return_p`` (maximum of ``portfolio_return_p`` over
        all rows so far) and ``drawn_down`` (absolute relative distance from
        that peak).
        """
        frame = self.eval_df.copy()
        # A window as long as the frame makes the rolling max a running max.
        # TODO: consider adding this to the processing code
        whole_history = frame['portfolio_return_p'].rolling(
            window=len(frame), min_periods=1)
        frame['rolling_max_return_p'] = whole_history.max()

        growth_vs_peak = (
            (1 + frame.portfolio_return_p) / (1 + frame.rolling_max_return_p))
        frame['drawn_down'] = abs(growth_vs_peak - 1)
        return frame

    def plot_drawdown(self):
        """Return a Plotly figure of drawdown with markers at each new high."""
        frame = self.calculate_drawdown()
        fig = px.line(frame, x="date", y=['drawn_down'])

        # Rows where the return equals its running peak are new highs.
        at_new_high = frame.portfolio_return_p == frame.rolling_max_return_p
        new_highs = frame[at_new_high]
        fig.add_trace(go.Scatter(
            x=new_highs['date'],
            y=new_highs['drawn_down'],
            mode='markers',
            name='新的最高总回报',
        ))

        colname_to_name = {'drawn_down': '回撤'}
        fig.update_layout(styling.plot_layout)

        def _relabel(trace):
            label = colname_to_name.get(trace.name, trace.name)
            trace.update(name=label, legendgroup=label)

        fig.for_each_trace(_relabel)
        return fig

    def update(self):
        """Placeholder hook; currently does nothing."""
        pass

    def __panel__(self):
        """Return the card layout containing the drawdown pane."""
        header = pn.Row(pn.pane.Str('回撤分析'))
        self._layout = pn.Card(self.drawdown_plot, header=header, width=500)
        return self._layout
class HistReturnCard(Viewer):
    """Card with periodic-return and attribution bar charts at a selectable resolution."""

    eval_df = param.Parameter()
    return_barplot = param.Parameterized()
    select_resolution = param.ObjectSelector(
        default='每月回报', objects=['每日回报', '每周回报', '每月回报', '每年回报'])

    def _frequency(self, daily=None):
        """Map the selected resolution to a pandas frequency alias.

        ``daily`` is the value returned for the daily option; unrecognised
        selections yield ``None``.
        """
        aliases = {
            "每日回报": daily,
            "每月回报": 'M',
            "每年回报": 'Y',
            "每周回报": 'W',
        }
        return aliases.get(self.select_resolution)

    @staticmethod
    def _rename_traces(fig, colname_to_name):
        """Replace raw column names with display labels on every trace."""
        def _apply(trace):
            label = colname_to_name.get(trace.name, trace.name)
            trace.update(
                name=label,
                legendgroup=label,
                hovertemplate=trace.hovertemplate.replace(trace.name, label),
            )
        fig.for_each_trace(_apply)

    def update_aggregate_df(self):
        """Return ``eval_df`` aggregated to the selected resolution.

        The daily option returns ``eval_df`` unchanged. Otherwise the
        ``portfolio_pct_*`` columns are summed per period and converted with
        ``exp(sum) - 1``.
        """
        if self.select_resolution == "每日回报":
            return self.eval_df
        freq = self._frequency()

        # NOTE(review): the original author doubted this formula; it is only
        # a correct compounding if portfolio_pct_* are log returns -- confirm.
        grouped = self.eval_df.groupby([pd.Grouper(key='date', freq=freq)])
        agg_df = grouped.aggregate(
            {'portfolio_pct_p': 'sum', 'portfolio_pct_b': 'sum'})
        for side in ('p', 'b'):
            agg_df[f'portfolio_return_{side}'] = (
                np.exp(agg_df[f'portfolio_pct_{side}']) - 1)
        return agg_df.reset_index()

    def create_attributes_barplot(self):
        """Return a Plotly figure dict of grouped attribution bars per period."""
        self.attribute_df = self.update_attributes_df()
        fig = px.bar(
            self.attribute_df,
            x='date',
            y=['allocation', 'selection', 'interaction',
               'notional_return', 'active_return'],
        )
        self._rename_traces(fig, {
            'allocation': '分配',
            'selection': '选择',
            'interaction': '交互',
            'notional_return': '名义主动回报',
            'active_return': '实际主动回报',
        })
        fig.update_layout(barmode='group', title='主动回报归因',
                          bargap=0.0, bargroupgap=0.0)
        fig.update_layout(**styling.plot_layout)
        fig.update_traces(**styling.barplot_trace)
        return fig.to_dict()

    def create_return_barplot(self):
        """Return a Plotly figure dict of portfolio vs. benchmark return per period."""
        self.agg_df = self.update_aggregate_df()
        fig = px.bar(
            self.agg_df,
            x='date',
            y=['portfolio_return_p', 'portfolio_return_b'],
            barmode='overlay',
            title='周期回报',
        )
        # Update legend and hover labels.
        self._rename_traces(fig, {
            'portfolio_return_p': 'portfolio回报率',
            'portfolio_return_b': 'benchmark回报率',
        })
        fig.update_layout(**styling.plot_layout)
        fig.update_traces(**styling.barplot_trace)
        return fig.to_dict()

    @param.depends('eval_df', 'select_resolution', watch=True)
    def update(self):
        """Rebuild both bar charts and push them into their panes."""
        self.return_barplot.object = self.create_return_barplot()
        self.attribute_barplot.object = self.create_attributes_barplot()

    def update_attributes_df(self):
        """Return total attribution computed at the selected resolution."""
        freq = self._frequency(daily='D')
        p_stock = processing.change_resolution(self.calculated_p_stock, freq)
        b_stock = processing.change_resolution(self.calculated_b_stock, freq)
        return processing.calculate_total_attribution(p_stock, b_stock)

    def __init__(self, eval_df, calculated_p_stock, calculated_b_stock, **params):
        """Store the frames and render both charts for the default resolution."""
        self.eval_df = eval_df
        self.calculated_p_stock = calculated_p_stock
        self.calculated_b_stock = calculated_b_stock

        first_date = self.eval_df.date.min()
        last_date = self.eval_df.date.max()
        # Created here but not placed in the layout built by __panel__.
        self._range_slider = pn.widgets.DateRangeSlider(
            name='Date Range Slider',
            start=first_date, end=last_date,
            value=(first_date, last_date),
        )
        self.return_barplot = pn.pane.Plotly(self.create_return_barplot())
        self.attribute_barplot = pn.pane.Plotly(
            self.create_attributes_barplot())
        super().__init__(**params)

    def __panel__(self):
        """Return the card layout (resolution selector and both charts)."""
        header = pn.Row(
            pn.pane.Str('周期回报'),
            pn.widgets.TooltipIcon(value=description.periodic_return_report),
        )
        self._layout = pn.Card(
            pn.Param(self.param.select_resolution, name='选择周期'),
            self.return_barplot,
            self.attribute_barplot,
            width=500,
            header=header,
        )
        return self._layout
class PortfolioComposationCard(Viewer):
    """Card showing how funds are distributed.

    Contains a treemap for one selected date and a bar chart of the
    distribution over a selected date range.
    """

    # NOTE(review): declared as a Parameterized instance rather than a
    # param.Parameter; left unchanged because the @param.depends
    # declarations below reference this name -- confirm intent.
    p_stock_df = param.Parameterized()

    def create_cash_position_df(self):
        """Return per-date weights for 'portfolio' and 'not_in_portfolio'.

        The result is sorted by date and limited to the range slider's
        current selection.
        """
        aggregate_df = self.p_stock_df.groupby('date', as_index=False).agg({
            'current_weight': 'sum'
        })
        aggregate_df['type'] = 'portfolio'

        not_in_portfolio_df = aggregate_df.copy()
        not_in_portfolio_df['type'] = 'not_in_portfolio'
        # NOTE(review): 1000 is a hard-coded amount for the part outside the
        # portfolio; its meaning is not evident from this file -- confirm.
        not_in_portfolio_df['current_weight'] = 1000

        aggregate_df = pd.concat([aggregate_df, not_in_portfolio_df])
        aggregate_df = aggregate_df.sort_values(by=['date'])
        range_start, range_end = self.date_range.value[0], self.date_range.value[1]
        return aggregate_df[aggregate_df.date.between(range_start, range_end)]

    @param.depends('p_stock_df', 'date_range.value', watch=True)
    def update_trend_plot(self):
        """Rebuild the trend chart and push it into its pane."""
        self.trend_plot.object = self.create_trend_plot()

    def create_trend_plot(self):
        """Return a Plotly figure dict of the weight trend over the date range."""
        aggregate_df = self.create_cash_position_df()
        fig = px.bar(aggregate_df, x='date', y='current_weight', color='type')
        fig.update_layout(legend=dict(
            orientation="h",
            yanchor="bottom",
            y=1.02,
            xanchor="right",
            x=1
        ))
        fig.update_traces(
            marker_line_width=0,
            selector=dict(type="bar"))
        fig.update_layout(bargap=0,
                          bargroupgap=0,
                          )
        fig.update_layout(uniformtext_minsize=8, uniformtext_mode='hide',
                          yaxis_title=None, xaxis_title=None,
                          margin=dict(l=0, r=0, t=0, b=0))
        return fig.to_dict()

    def create_treemap(self):
        """Return a Plotly treemap dict for the date in ``selected_df``.

        Tags every row of ``selected_df`` with ``position='portfolio'`` and
        appends one synthetic 'not in portfolio' row before plotting.
        """
        # Safe to assign in place: ``selected_df`` is always an independent
        # copy (see ``__init__`` and ``update``).
        self.selected_df['position'] = 'portfolio'
        not_in_portfolio_row = pd.DataFrame({
            'display_name': ['不在portfolio中'],
            'position': ['not_in_portfolio'],
            'aggregate_sector': ['不在portfolio中'],
            'current_weight': [1000],
            'portfolio_return': [0],
            'portfolio_pct': [0]
        })
        df = pd.concat([self.selected_df, not_in_portfolio_row],
                       ignore_index=True)
        fig = px.treemap(df, path=[px.Constant('cash_position'), 'position', 'aggregate_sector', 'display_name'], values='current_weight',
                         color='portfolio_return', hover_data=['portfolio_return', 'portfolio_pct'],
                         color_continuous_scale='RdBu',
                         color_continuous_midpoint=np.average(
                             df['portfolio_return'])
                         )
        fig.update_layout(styling.plot_layout)
        fig.update_layout(coloraxis_colorbar=dict(
            title="weighted return"))
        colname_to_name = {
            'cash_position': '现金分布',
            'portfolio_return': '加权回报',
            'not_in_portfolio': '不在portfolio中',
            'current_weight': '现金',
        }
        fig.for_each_trace(lambda t: t.update(name=colname_to_name.get(t.name, t.name),
                                              hovertemplate=t.hovertemplate.replace(
                                                  t.name, colname_to_name.get(t.name, t.name))
                                              ))
        return fig.to_dict()

    def _select_rows_for_picked_date(self):
        """Return an independent copy of the rows matching the date picker."""
        # Copying avoids pandas' SettingWithCopyWarning when create_treemap
        # later adds the 'position' column to the selection.
        on_picked_date = self.p_stock_df.date == self.date_picker.value
        return self.p_stock_df[on_picked_date].copy()

    def __init__(self, p_stock_df, **params):
        """Store the frame, build both date widgets and render both charts."""
        self.p_stock_df = p_stock_df
        first_date = self.p_stock_df.date.min()
        last_date = self.p_stock_df.date.max()

        # pd.to_datetime makes every element a Timestamp, so ``.date()``
        # works even when ``unique()`` returns raw numpy datetime64 values.
        enabled_dates = [
            stamp.date()
            for stamp in pd.to_datetime(self.p_stock_df.date.unique())
        ]
        self.date_picker = pn.widgets.DatetimePicker(name='选择某日资金分布',
                                                     start=first_date,
                                                     end=last_date,
                                                     value=last_date,
                                                     enabled_dates=enabled_dates,
                                                     enable_time=False,
                                                     )
        self.date_range = pn.widgets.DateRangeSlider(name='选择资金分布走势区间',
                                                     start=first_date,
                                                     end=last_date,
                                                     value=(first_date, last_date),
                                                     )
        self.selected_df = self._select_rows_for_picked_date()
        self.tree_plot = pn.pane.Plotly(self.create_treemap())
        self.trend_plot = pn.pane.Plotly(self.create_trend_plot())
        super().__init__(**params)

    def __panel__(self):
        """Return the card layout (date picker, treemap, range slider, trend)."""
        self._layout = pn.Card(self.date_picker, self.tree_plot, self.date_range, self.trend_plot,
                               width=500, header=pn.pane.Str('资金分布'))
        return self._layout

    @param.depends('date_picker.value', 'p_stock_df', watch=True)
    def update(self):
        """Re-select rows for the picked date and refresh the treemap."""
        self.selected_df = self._select_rows_for_picked_date()
        self.tree_plot.object = self.create_treemap()
class BestAndWorstStocks(Viewer):
    """Card listing the five best and five worst portfolio stocks for a date range."""

    p_stock_df = param.Parameter()
    b_stock_df = param.Parameter()
    start_date = param.Parameter()
    end_date = param.Parameter()

    def calculate_attributes(self):
        """Return per-stock attribution between ``start_date`` and ``end_date``."""
        return processing.calculate_attributes_between_dates(
            self.start_date, self.end_date, self.p_stock_df, self.b_stock_df)

    def create_tabulator(self, df):
        """Wrap ``df`` in a Tabulator with localized titles and hidden helper columns."""
        col_title_map = {
            'display_name_p': '股票名称',
            'ticker': '股票代码',
            'pct_p': '加权回报率',
            'prev_w_in_p_b': '在benchmark中的权重',
            'prev_w_in_p_p': '在portfolio中的权重',
            'allocation': '分配分数',
            'selection': '选择分数',
            'interaction': '交互分数',
            'return': '未加权回报率',
            'active_return': '加权主动回报率',
        }
        return pn.widgets.Tabulator(
            df,
            sizing_mode='stretch_width',
            hidden_columns=['index', 'display_name_b', 'pct_b', 'in_portfolio'],
            frozen_columns=['display_name_p'],
            titles=col_title_map,
        )

    @param.depends('start_date', 'end_date', watch=True)
    def update(self):
        """Recompute the ranking and refresh both tables."""
        result_df = self.get_processed_df()
        self.best_5_tabulator.value = result_df.tail(5)
        self.worst_5_tabulator.value = result_df.head(5)

    def get_processed_df(self):
        """Return in-portfolio stocks sorted ascending by the ``return`` column.

        NOTE(review): the card's labels describe a ranking by weighted
        return, but the sort key ``return`` is titled as the unweighted
        return in ``create_tabulator`` -- confirm which column is intended.
        """
        result_df = self.calculate_attributes()
        # Sort into a new frame rather than in place: an in-place sort on the
        # filtered slice triggers pandas' SettingWithCopyWarning.
        return result_df[result_df.in_portfolio].sort_values(by='return')

    def __init__(self, p_stock_df, b_stock_df, **params):
        """Store the frames, build the slider and the two initial tables.

        The slider initially selects the last 7 days of ``p_stock_df``.
        """
        self.p_stock_df = p_stock_df
        self.b_stock_df = b_stock_df
        last_date = p_stock_df.date.max()
        self._date_range = pn.widgets.DateRangeSlider(
            name='选择计算回报的时间区间',
            start=p_stock_df.date.min(),
            end=last_date,
            value=(last_date - timedelta(days=7), last_date),
        )
        self.start_date = self._date_range.value_start
        self.end_date = self._date_range.value_end
        result_df = self.get_processed_df()
        self.best_5_tabulator = self.create_tabulator(result_df.tail(5))
        self.worst_5_tabulator = self.create_tabulator(result_df.head(5))
        super().__init__(**params)

    @param.depends('_date_range.value', watch=True)
    def _sync_params(self):
        """Copy the slider's (start, end) tuple into the date parameters."""
        self.start_date = self._date_range.value[0]
        self.end_date = self._date_range.value[1]

    def __panel__(self):
        """Return the card layout (slider and the two labelled tables)."""
        self._layout = pn.Card(
            self._date_range,
            pn.pane.Str('加权回报率最高回报5只股票'),
            self.best_5_tabulator,
            pn.pane.Str('加权回报率最低回报5只股票'),
            self.worst_5_tabulator,
            max_width=500,
            header=pn.pane.Str('Portfolio中最高回报和最低加权回报率股票'),
        )
        return self._layout
class TopHeader(Viewer):
    """Header card showing PnL, total return and maximum drawdown as of the latest date."""

    eval_df = param.Parameter()

    @param.depends('eval_df', watch=True)
    def update(self):
        """Hook for refreshing the figures when ``eval_df`` changes; currently a no-op."""
        return

    def calculation(self):
        """Return ``(pnl, total_return, max_drawdown)`` computed from ``eval_df``.

        ``pnl`` and ``total_return`` are read from the row with the latest
        date. ``max_drawdown`` is the largest absolute relative distance of
        ``portfolio_return_p`` from its running maximum.

        The intermediate series are kept local. The original wrote a
        ``rolling_max_return`` column into ``eval_df`` and created
        ``draw_down`` by attribute assignment, which modified the caller's
        frame and made pandas warn about creating a column via an attribute.
        """
        latest_rows = self.eval_df[
            self.eval_df.date == self.eval_df.date.max()]
        pnl = latest_rows.cum_pnl.values[0]
        total_return = latest_rows.portfolio_return_p.values[0]

        returns = self.eval_df.portfolio_return_p
        # A window as long as the frame makes the rolling max a running max.
        running_peak = returns.rolling(
            window=len(self.eval_df), min_periods=1).max()
        draw_down = ((1 + returns) / (1 + running_peak) - 1).abs()
        max_drawdown = draw_down.max()
        return pnl, total_return, max_drawdown

    def create_report(self, pnl, total_return, max_drawdown):
        """Return a FlexBox with the three figures formatted as text."""
        return pn.FlexBox(
            f"PnL:{round(pnl,2)}¥",
            f"回报:{round(total_return * 100,2)}%",
            f'最大回撤:{round(max_drawdown * 100,2)}%',
            justify_content='space-evenly',
        )

    def __init__(self, eval_df, **params):
        """Store ``eval_df`` and build the report once."""
        self.eval_df = eval_df
        pnl, total_return, max_drawdown = self.calculation()
        self.report = self.create_report(pnl, total_return, max_drawdown)
        super().__init__(**params)

    def __panel__(self):
        """Return the card layout containing the report."""
        self._layout = pn.Card(self.report, sizing_mode='stretch_width')
        return self._layout