portfolio_management / appComponents.py
huggingface112's picture
add schema for stocks_price table
687ad2b
raw
history blame
35.3 kB
from script import processing
from datetime import datetime, timedelta
import panel as pn
import pandas as pd
import hvplot.pandas # noqa
import plotly.express as px
import numpy as np
import hvplot.pandas # noqa
from panel.viewable import Viewer
import param
from script import styling
from script import description
import plotly.graph_objs as go
# import warnings
# Load the Panel front-end extensions used by the components in this file
# (MathJax rendering and Plotly panes). Each extension is requested once.
pn.extension('mathjax')
pn.extension('plotly')
# warnings.filterwarnings("ignore", category=pd.core.common.SettingWithCopyWarning)
# overall performance view; the date range defaults to the last 30 days
def create_portfolio_overview(df_list):
    """Assemble the portfolio overview: one date-range slider driving two plot columns.

    Args:
        df_list: four-item sequence unpacked, in order, as
            ``calculated_b_stock``, ``calculated_p_stock``, ``p_eval_df``
            and ``sector_eval_df``.

    Returns:
        pn.Column: the slider stacked on top of a row that holds the
        whole-portfolio plots and the per-sector plots.
    """
    calculated_b_stock, calculated_p_stock, p_eval_df, sector_eval_df = df_list

    # The slider spans the sector evaluation dates and opens on the last 30 days.
    first_day = sector_eval_df.date.min()
    last_day = sector_eval_df.date.max()
    range_slider = pn.widgets.DateRangeSlider(
        name='date range',
        start=first_day,
        end=last_day,
        value=(last_day - timedelta(days=30), last_day),
        align='center',
        sizing_mode='stretch_width',
    )

    size = {'width': 780}
    option = {'legend_position': 'left'}
    active_tools = {'tools': ['hover'], 'active_tools': [], 'axiswise': True}

    slider_start = range_slider.param.value_start
    slider_end = range_slider.param.value_end

    def _clip_to_slider(interactive_df):
        # Keep only the rows whose date lies inside the slider's current range.
        in_range = interactive_df.date.between(slider_start, slider_end)
        return interactive_df[in_range]

    ranged_ip_eval_df = _clip_to_slider(p_eval_df.interactive())
    ranged_isector_eval_df = _clip_to_slider(sector_eval_df.interactive())

    def _sector_lines(column, title):
        # One line per aggregate sector for a single metric column.
        lines = ranged_isector_eval_df.hvplot.line(
            x='date', y=[column], by='aggregate_sector')
        return lines.opts(title=title, **size, **option)

    def _rendered(plot):
        # Apply the shared tool settings and turn the pipeline into a viewable.
        return plot.opts(**active_tools).output()

    # --- whole-portfolio plots ---
    return_plot = ranged_ip_eval_df.hvplot.line(
        x='date', y=['portfolio_return_p', 'portfolio_return_b'])
    return_plot = return_plot.opts(
        title='投资组合总回报 v.s benchmark总回报', **size, **option)

    active_return_plot = ranged_ip_eval_df.hvplot.line(
        x='date', y=['active_return'])
    active_return_plot = active_return_plot.opts(
        title='每日主动回报', **size, axiswise=True)

    risk_tracking_plot = ranged_ip_eval_df.hvplot.line(
        x='date', y=['risk', 'tracking_error'])
    risk_tracking_plot = risk_tracking_plot.opts(
        title='风险和追踪误差', **size, **option)

    # --- per-sector plots ---
    sector_return_plot = _sector_lines('portfolio_return_p', '投资组合各行业总回报')
    sector_active_return_plot = _sector_lines(
        'active_return', '投资组合各行业每日主动回报')
    sector_tracking_error_plot = _sector_lines(
        'tracking_error', '投资组合各行业追踪误差')
    sector_risk_plot = _sector_lines('risk', '投资组合各行业风险')

    def create_attribute_plot(start, end, calculated_b_stock, calculated_p_stock):
        # Rebuilt whenever the slider moves: a totals table above a stacked
        # per-stock attribution bar chart.
        # NOTE(review): the benchmark frame is passed before the portfolio
        # frame here, while the other call sites in this file pass the
        # portfolio frame first -- confirm against
        # processing.calculate_attributes_between_dates.
        result = processing.calculate_attributes_between_dates(
            start, end, calculated_b_stock, calculated_p_stock)
        totals = result.aggregate({
            'interaction': 'sum',
            'allocation': 'sum',
            'selection': 'sum',
        })
        totals_table = pn.pane.DataFrame(totals.transpose())
        attribution_bars = result.hvplot.bar(
            x='display_name_p',
            y=['interaction', 'allocation', 'selection'],
            shared_axes=False,
            stacked=True,
            rot=90,
        ).opts(**size, **option, title='投资组合总主动回报归因')
        return pn.Column(totals_table, attribution_bars)

    attribute_plot = pn.bind(
        create_attribute_plot,
        start=slider_start,
        end=slider_end,
        calculated_b_stock=calculated_b_stock,
        calculated_p_stock=calculated_p_stock,
    )

    total_view_plots = pn.Column(
        _rendered(return_plot),
        _rendered(risk_tracking_plot),
        _rendered(active_return_plot),
        attribute_plot,
        height=1000,
        scroll=True,
    )
    sector_view_plots = pn.Column(
        _rendered(sector_return_plot),
        _rendered(sector_tracking_error_plot),
        _rendered(sector_risk_plot),
        _rendered(sector_active_return_plot),
        height=1000,
        scroll=True,
    )

    plots_row = pn.Row(total_view_plots, sector_view_plots, align='center')
    return pn.Column(range_slider, plots_row)
def attribution_view(daily_bnb_result, daily_sector_bnb_result, p_eval_df):
    """Build the attribution page: a date slider, total plots and per-sector plots.

    Args:
        daily_bnb_result: frame with ``date``, ``allocation``, ``selection``,
            ``interaction`` and ``active_return`` columns (read below).
        daily_sector_bnb_result: same columns plus ``aggregate_sector``.
        p_eval_df: frame with ``date``, ``portfolio_return_p`` and
            ``portfolio_return_b`` columns.

    Returns:
        pn.Column: the slider row above a row with the total view and the
        sector view.

    Note:
        The ``date`` column of all three input frames is converted with
        ``pd.to_datetime`` in place, so the caller's frames are modified.
    """
    p_eval_df.date = pd.to_datetime(p_eval_df.date)
    daily_bnb_result.date = pd.to_datetime(daily_bnb_result.date)
    daily_sector_bnb_result.date = pd.to_datetime(daily_sector_bnb_result.date)

    # interactive widget: opens on the full date span of p_eval_df
    dt_range = pn.widgets.DateRangeSlider(
        start=p_eval_df.date.min(),
        end=p_eval_df.date.max(),
        value=(p_eval_df.date.min(), p_eval_df.date.max()),
    )
    range_start = dt_range.param.value_start
    range_end = dt_range.param.value_end

    # total attribution and return
    p_eval_df_i = p_eval_df.interactive()
    daily_bnb_result_i = daily_bnb_result.interactive()
    daily_return_plot = p_eval_df_i[
        (p_eval_df_i.date >= range_start) & (p_eval_df_i.date <= range_end)
    ].hvplot(
        x='date',
        y=['portfolio_return_p', 'portfolio_return_b'],
        title='投资组合总回报 v.s benchmark总回报',
    ).output()
    daily_bnb_plot = daily_bnb_result_i[
        daily_bnb_result_i.date.between(range_start, range_end)
    ].hvplot.bar(
        x='date',
        y=['allocation', 'selection', 'interaction', "active_return"],
        stacked=True,
        title='每日主动收益归因',
        yformatter='%.2f',
        xlabel='日期',
        shared_axes=False,
    ).output()

    # sector active return
    daily_sector_bnb_df_i = daily_sector_bnb_result.interactive()
    selected_range_df = daily_sector_bnb_df_i[
        daily_sector_bnb_df_i.date.between(range_start, range_end)]
    sector_active_return_plot = selected_range_df.hvplot.line(
        x='date', y='active_return', by='aggregate_sector',
        width=1000, height=400, title='投资组合行业每日主动回报').output()

    # sector attribution: one stacked bar chart per sector, shown as tabs
    def plot_attribute_by_sector(sector):
        selected_sector_df = selected_range_df[
            selected_range_df.aggregate_sector == sector]
        return selected_sector_df.hvplot.bar(
            x='date',
            y=['active_return', 'allocation', 'selection', 'interaction'],
            title='投资组合行业每日主动收入归因',
            stacked=True,
            shared_axes=False,
        ).output()

    sector_tabs = [
        (sector, plot_attribute_by_sector(sector))
        for sector in daily_sector_bnb_result.aggregate_sector.unique()
    ]
    # `dynamic=True` (previously misspelled `dymacic`, which is not a Tabs
    # parameter) renders only the active tab.
    sector_attr_plot_tabs = pn.Tabs(*sector_tabs, dynamic=True)

    # layout
    sector_view = pn.Column(sector_attr_plot_tabs, sector_active_return_plot)
    total_view = pn.Column(daily_return_plot, daily_bnb_plot)
    return pn.Column(
        pn.Row(dt_range),
        pn.Row(total_view, sector_view)
    )
# plot explore
def create_hvplot_explore(calculated_b_stock, calculated_p_stock, p_eval_df, sector_eval_df, attribution_result_df, s_attribution_result_df):
    """Build a data-exploration component for the given frames.

    A select widget chooses one of the six frames by name; below it an
    hvplot explorer and a Perspective pane are rebuilt for the chosen frame.

    Args:
        calculated_b_stock, calculated_p_stock, p_eval_df, sector_eval_df,
        attribution_result_df, s_attribution_result_df: the frames offered in
            the selector, listed under their parameter names.

    Returns:
        pn.Column: selector, explorer and Perspective pane, in that order.
    """
    name_to_df = {
        'calculated_b_stock': calculated_b_stock,
        'calculated_p_stock': calculated_p_stock,
        'p_eval_df': p_eval_df,
        'sector_eval_df': sector_eval_df,
        'attribution_result_df': attribution_result_df,
        's_attribution_result_df': s_attribution_result_df
    }
    # Selector options come straight from the mapping so the two cannot drift.
    options = list(name_to_df)
    selector = pn.widgets.Select(
        name='Select', options=options, value=options[0])

    def create_exploer(name):
        # Only the explorer itself is shown. The previous version also built
        # a "Code" preview layout here but never returned it, so that dead
        # construction has been removed.
        return hvplot.explorer(name_to_df[name])

    def create_perspective(name):
        df = name_to_df[name]
        return pn.pane.Perspective(df, columns=list(df.columns), width=1500, height=800)

    perspective = pn.bind(create_perspective, name=selector)
    exploer = pn.bind(create_exploer, name=selector)
    return pn.Column(selector, exploer, perspective)
class TotalReturnCard(Viewer):
    """Summary card: date-range slider, HTML report and a return line chart.

    The report and the chart are recomputed whenever ``start_date``,
    ``end_date`` or ``eval_df`` changes; the slider feeds the two dates.
    """

    value = param.Range(doc="A numeric range.")
    width = param.Integer(default=300)
    start_date = param.Parameter()
    end_date = param.Parameter()
    eval_df = param.Parameter()
    b_stock_df = param.Parameter()
    p_stock_df = param.Parameter()
    selected_df = param.Parameter()
    plot_pane = param.Parameter()
    report = param.Parameter()

    def format_number(self, num):
        """Render a fraction as a percentage string rounded to two decimals."""
        percentage = round(num * 100, 2)
        return f'{percentage}%'

    def get_color(self, num):
        """Return 'green' for non-negative numbers and 'red' otherwise."""
        if num >= 0:
            return 'green'
        return 'red'

    def create_report(self):
        """Return the HTML summary for the currently selected date range."""
        # Return and risk figures over the selected window.
        returns_df = processing.calculate_return(
            self.eval_df, self.start_date, self.end_date)
        latest = returns_df.tail(1)
        active_return = latest.active_return.values[0]
        total_return = latest.return_p.values[0]
        mkt_cap = latest.mkt_cap.values[0]
        annualize = np.sqrt(252)
        tracking_error = returns_df.active_return.std() * annualize
        # NOTE(review): "risk" is taken from the `return_b` column -- confirm
        # that the benchmark column, not `return_p`, is intended here.
        risk = returns_df['return_b'].std() * annualize

        # Attribution totals over the same window.
        attributes = processing.calculate_attributes_between_dates(
            self.start_date, self.end_date, self.p_stock_df, self.b_stock_df)
        totals = attributes.aggregate({
            'interaction': 'sum',
            'allocation': 'sum',
            'selection': 'sum',
            'active_return': 'sum',
            'notional_return': 'sum'
        })
        notional_return = totals['notional_return']
        interaction = totals['interaction']
        allocation = totals['allocation']
        selection = totals['selection']

        # HTML template; doubled braces are literal CSS braces.
        report = f"""
<style>
.compact-container {{
display: flex;
flex-direction: column;
gap: 5px;
}}
.compact-container > div {{
display: flex;
justify-content: space-between;
margin-bottom: 2px;
}}
.compact-container > div > h2,
.compact-container > div > h3,
.compact-container > div > p,
.compact-container > div > ul > li {{
margin: 0;
}}
.compact-container > ul {{
padding: 0;
margin: 0;
list-style-type: none;
}}
.compact-container > ul > li {{
display: flex;
margin-bottom: 2px;
}}
</style>
<div class="compact-container">
<u><b>总市值</b></u>
<div>
<h2 style="margin: 0;">¥{round(mkt_cap,2)}</h2>
<h2 style='color: {self.get_color(total_return)}; margin: 0;'>{self.format_number(total_return)}</h2>
</div>
<div>
<p style="margin: 0;">追踪误差</p>
<p style='color: {self.get_color(tracking_error)}; margin: 0;'>{self.format_number(tracking_error)}</p>
</div>
<div>
<p style="margin: 0;">风险</p>
<p style='color: {self.get_color(risk)}; margin: 0;'>{self.format_number(risk)}</p>
</div>
<div>
<p style="margin: 0;">归因</p>
<ul style="padding: 0; margin: 0; list-style-type: none;">
<li style="margin-bottom: 2px;">
<div style="display: flex;">
<p style="margin: 0;">主动回报:</p>
<p style="color: {self.get_color(active_return)}; margin: 0;">{self.format_number(active_return)}</p>
</div>
</li>
<li style="margin-bottom: 2px;">
<div style="display: flex;">
<p style="margin: 0;">交互:</p>
<p style="color: {self.get_color(interaction)}; margin: 0;">{self.format_number(interaction)}</p>
</div>
</li>
<li style="margin-bottom: 2px;">
<div style="display: flex;">
<p style="margin: 0;">名义主动回报:</p>
<p style="color: {self.get_color(notional_return)}; margin: 0;">{self.format_number(notional_return)}</p>
</div>
</li>
<li style="margin-bottom: 2px;">
<div style="display: flex;">
<p style="margin: 0;">选择:</p>
<p style="color: {self.get_color(selection)}; margin: 0;">{self.format_number(selection)}</p>
</div>
</li>
<li style="margin-bottom: 2px;">
<div style="display: flex;">
<p style="margin: 0;">分配:</p>
<p style="color: {self.get_color(allocation)}; margin: 0;">{self.format_number(allocation)}</p>
</div>
</li>
</ul>
</div>
</div>
"""
        return report

    def create_plot(self):
        """Return the portfolio-vs-benchmark return chart as a Plotly dict."""
        returns_df = processing.calculate_return(
            self.eval_df, self.start_date, self.end_date)
        fig = px.line(returns_df, x="date", y=['return_p', 'return_b'])
        fig.update_traces(mode="lines+markers",
                          marker=dict(size=5), line=dict(width=2))
        fig.update_layout(styling.plot_layout)

        colname_to_name = {
            'return_p': 'Portfolio回报',
            'return_b': 'benchmark回报'
        }

        def _relabel(trace):
            # Swap the raw column name for its display label in the legend
            # and in the hover text.
            label = colname_to_name.get(trace.name, trace.name)
            trace.update(
                name=label,
                legendgroup=label,
                hovertemplate=trace.hovertemplate.replace(trace.name, label),
            )

        fig.for_each_trace(_relabel)
        return fig.to_dict()

    @param.depends('start_date', 'end_date', 'eval_df', watch=True)
    def update(self):
        """Recompute the chart and the report and push them into their panes."""
        new_fig = self.create_plot()
        new_report = self.create_report()
        self.report.object = new_report
        self.plot_pane.object = new_fig

    def __init__(self, eval_df, b_stock_df, p_stock_df, **params):
        self.eval_df = eval_df
        self.b_stock_df = b_stock_df
        self.p_stock_df = p_stock_df

        # The slider spans the evaluation dates and opens on the last 7 days.
        latest_date = eval_df.date.max()
        self._date_range = pn.widgets.DateRangeSlider(
            start=eval_df.date.min(),
            end=latest_date,
            value=(latest_date - timedelta(days=7), latest_date)
        )
        self.start_date = self._date_range.value_start
        self.end_date = self._date_range.value_end

        self.plot_pane = pn.pane.Plotly(
            self.create_plot(), sizing_mode='stretch_width')
        self.report = pn.pane.HTML(
            self.create_report(), sizing_mode='stretch_width')
        super().__init__(**params)
        self._sync_widgets()

    def __panel__(self):
        header = pn.Row(
            pn.pane.Str('投资组合总结'),
            pn.widgets.TooltipIcon(value=description.summary_card),
        )
        self._layout = pn.Card(self._date_range, self.report, self.plot_pane,
                               width=500, header=header)
        return self._layout

    @param.depends('start_date', 'end_date', 'eval_df', watch=True)
    def update_selected_df(self):
        """Store the rows of ``eval_df`` dated within the selected range."""
        in_range = self.eval_df.date.between(self.start_date, self.end_date)
        self.selected_df = self.eval_df[in_range]

    @param.depends('value', 'width', watch=True)
    def _sync_widgets(self):
        # Intentionally a no-op placeholder.
        pass

    @param.depends('_date_range.value', watch=True)
    def _sync_params(self):
        """Copy the slider's range into ``start_date`` and ``end_date``."""
        self.start_date = self._date_range.value[0]
        self.end_date = self._date_range.value[1]
class DrawDownCard(Viewer):
    """Card showing the portfolio drawdown curve with markers at new highs."""

    def __init__(self, eval_df, calculated_p_stock, calculated_b_stock, **params):
        self.eval_df = eval_df
        self.calculated_p_stock = calculated_p_stock
        self.calculated_b_stock = calculated_b_stock
        self.drawdown_plot = pn.pane.Plotly(self.plot_drawdown())
        super().__init__(**params)

    def calculate_drawdown(self):
        """Return a copy of ``eval_df`` with running-max and drawdown columns.

        Adds ``rolling_max_return_p`` (running maximum of
        ``portfolio_return_p``) and ``drawn_down`` (absolute relative drop of
        ``1 + return`` from ``1 + running max``).
        """
        df = self.eval_df.copy()
        # Running maximum over all rows so far. `expanding` gives the same
        # values as a rolling window of len(df), but also works on an empty
        # frame, where a window of 0 with min_periods=1 raises.
        df['rolling_max_return_p'] = df['portfolio_return_p'].expanding(
            min_periods=1).max()
        # calculate drawdown
        df['drawn_down'] = abs(
            (1 + df.portfolio_return_p) / (1 + df.rolling_max_return_p) - 1)
        return df

    def plot_drawdown(self):
        """Return a Plotly figure of the drawdown line plus new-high markers."""
        df = self.calculate_drawdown()
        fig = px.line(df, x="date", y=['drawn_down'])
        # add scatter markers where the return equals its running maximum
        new_height_pnl = df[df.portfolio_return_p == df.rolling_max_return_p]
        fig.add_trace(go.Scatter(
            x=new_height_pnl['date'], y=new_height_pnl['drawn_down'], mode='markers', name='新的最高总回报'))
        colname_to_name = {
            'drawn_down': '回撤'
        }
        fig.update_layout(styling.plot_layout)

        def _relabel(trace):
            # Replace raw column names with display labels in the legend.
            label = colname_to_name.get(trace.name, trace.name)
            trace.update(name=label, legendgroup=label)

        fig.for_each_trace(_relabel)
        return fig

    def update(self):
        # Placeholder: the plot is built once in __init__.
        pass

    def __panel__(self):
        self._layout = pn.Card(self.drawdown_plot,
                               header=pn.Row(pn.pane.Str('回撤分析')),
                               width=500
                               )
        return self._layout
class HistReturnCard(Viewer):
    """Card with periodic return bars and attribution bars at a chosen resolution."""

    eval_df = param.Parameter()
    # NOTE(review): this is a Parameterized *instance* rather than a
    # param.Parameter declaration; __init__ replaces it with a Plotly pane on
    # the instance. Kept unchanged.
    return_barplot = param.Parameterized()
    select_resolution = param.ObjectSelector(
        default='每月回报', objects=['每日回报', '每周回报', '每月回报', '每年回报'])

    def update_aggregate_df(self):
        """Return ``eval_df`` aggregated to the selected resolution.

        Daily resolution returns ``eval_df`` itself. Otherwise the
        ``portfolio_pct_*`` columns are summed per period and converted with
        ``exp(sum) - 1``.
        """
        resolution = self.select_resolution
        if resolution == "每日回报":
            return self.eval_df

        period_to_freq = {"每月回报": 'M', "每年回报": 'Y', "每周回报": 'W'}
        freq = period_to_freq.get(resolution)

        # The original author flagged this formula as possibly incorrect.
        grouped = self.eval_df.groupby([pd.Grouper(key='date', freq=freq)])
        agg_df = grouped.aggregate(
            {'portfolio_pct_p': 'sum', 'portfolio_pct_b': 'sum'})
        for side in ('p', 'b'):
            agg_df[f'portfolio_return_{side}'] = np.exp(
                agg_df[f'portfolio_pct_{side}']) - 1
        return agg_df.reset_index()

    def create_attributes_barplot(self):
        """Return the grouped attribution bar chart as a Plotly dict."""
        self.attribute_df = self.update_attributes_df()
        value_columns = ['allocation', 'selection', 'interaction',
                         'notional_return', 'active_return']
        fig = px.bar(self.attribute_df, x='date', y=value_columns)

        colname_to_name = {
            'allocation': '分配',
            'selection': '选择',
            'interaction': '交互',
            'notional_return': '名义主动回报',
            'active_return': '实际主动回报'
        }

        def _relabel(trace):
            # Show display labels in the legend and hover text.
            label = colname_to_name.get(trace.name, trace.name)
            trace.update(
                name=label,
                legendgroup=label,
                hovertemplate=trace.hovertemplate.replace(trace.name, label),
            )

        fig.for_each_trace(_relabel)
        fig.update_layout(barmode='group', title='主动回报归因',
                          bargap=0.0, bargroupgap=0.0)
        fig.update_layout(**styling.plot_layout)
        fig.update_traces(**styling.barplot_trace)
        return fig.to_dict()

    def create_return_barplot(self):
        """Return the periodic return bar chart as a Plotly dict."""
        self.agg_df = self.update_aggregate_df()
        fig = px.bar(
            self.agg_df,
            x='date',
            y=['portfolio_return_p', 'portfolio_return_b'],
            barmode='overlay',
            title='周期回报',
        )

        # update legend
        colname_to_name = {
            'portfolio_return_p': 'portfolio回报率',
            'portfolio_return_b': 'benchmark回报率'
        }

        def _relabel(trace):
            label = colname_to_name.get(trace.name, trace.name)
            trace.update(
                name=label,
                legendgroup=label,
                hovertemplate=trace.hovertemplate.replace(trace.name, label),
            )

        fig.for_each_trace(_relabel)
        fig.update_layout(**styling.plot_layout)
        fig.update_traces(**styling.barplot_trace)
        return fig.to_dict()

    @param.depends('eval_df', 'select_resolution', watch=True)
    def update(self):
        """Rebuild both charts and push them into their panes."""
        self.return_barplot.object = self.create_return_barplot()
        self.attribute_barplot.object = self.create_attributes_barplot()

    def update_attributes_df(self):
        """Return total attribution computed at the selected resolution."""
        period_to_freq = {
            "每日回报": 'D',
            "每月回报": 'M',
            "每年回报": 'Y',
            "每周回报": 'W',
        }
        freq = period_to_freq.get(self.select_resolution)
        p_stock = processing.change_resolution(self.calculated_p_stock, freq)
        b_stock = processing.change_resolution(self.calculated_b_stock, freq)
        return processing.calculate_total_attribution(p_stock, b_stock)

    def __init__(self, eval_df, calculated_p_stock, calculated_b_stock, **params):
        self.eval_df = eval_df
        self.calculated_p_stock = calculated_p_stock
        self.calculated_b_stock = calculated_b_stock

        # This slider is created but is not part of the rendered card.
        earliest = self.eval_df.date.min()
        latest = self.eval_df.date.max()
        self._range_slider = pn.widgets.DateRangeSlider(
            name='Date Range Slider',
            start=earliest, end=latest,
            value=(earliest, latest),
        )

        self.return_barplot = pn.pane.Plotly(self.create_return_barplot())
        self.attribute_barplot = pn.pane.Plotly(
            self.create_attributes_barplot())
        super().__init__(**params)

    def __panel__(self):
        resolution_picker = pn.Param(self.param.select_resolution, name='选择周期')
        header = pn.Row(
            pn.pane.Str('周期回报'),
            pn.widgets.TooltipIcon(value=description.periodic_return_report),
        )
        self._layout = pn.Card(resolution_picker,
                               self.return_barplot, self.attribute_barplot,
                               width=500, header=header)
        return self._layout
class PortfolioComposationCard(Viewer):
    """Card showing a treemap for one chosen day and a weight trend over a range."""

    # NOTE(review): this is a Parameterized *instance* rather than a
    # param.Parameter declaration; __init__ replaces it with the frame on the
    # instance. Kept unchanged.
    p_stock_df = param.Parameterized()

    def _rows_on_selected_date(self):
        """Return an independent copy of the rows dated at the picker's value.

        The copy lets ``create_treemap`` add a column without writing into a
        slice of ``p_stock_df`` (which triggers SettingWithCopyWarning).
        """
        on_selected_date = self.p_stock_df.date == self.date_picker.value
        return self.p_stock_df[on_selected_date].copy()

    def create_cash_position_df(self):
        """Return per-date weight rows for the trend chart, limited to the slider range.

        Each date gets a 'portfolio' row (sum of ``current_weight``) and a
        'not_in_portfolio' row with ``current_weight`` fixed at 1000.
        """
        aggregate_df = self.p_stock_df.groupby('date', as_index=False).agg({
            'current_weight': 'sum'
        })
        aggregate_df['type'] = 'portfolio'
        not_in_portfolio_df = aggregate_df.copy()
        not_in_portfolio_df['type'] = 'not_in_portfolio'
        # NOTE(review): hard-coded 1000 -- its meaning is not visible in this
        # file; confirm against the data source.
        not_in_portfolio_df['current_weight'] = 1000
        # append, then order by date
        aggregate_df = pd.concat([aggregate_df, not_in_portfolio_df])
        aggregate_df = aggregate_df.sort_values(by=['date'])
        range_start, range_end = self.date_range.value[0], self.date_range.value[1]
        return aggregate_df[aggregate_df.date.between(range_start, range_end)]

    @param.depends('p_stock_df', 'date_range.value', watch=True)
    def update_trend_plot(self):
        """Rebuild the trend chart when the frame or the date range changes."""
        self.trend_plot.object = self.create_trend_plot()

    def create_trend_plot(self):
        """Return the stacked weight trend bar chart as a Plotly dict."""
        aggregate_df = self.create_cash_position_df()
        fig = px.bar(aggregate_df, x='date', y='current_weight', color='type')
        fig.update_layout(legend=dict(
            orientation="h",
            yanchor="bottom",
            y=1.02,
            xanchor="right",
            x=1
        ))
        fig.update_traces(
            marker_line_width=0,
            selector=dict(type="bar"))
        fig.update_layout(bargap=0,
                          bargroupgap=0,
                          )
        fig.update_layout(uniformtext_minsize=8, uniformtext_mode='hide',
                          yaxis_title=None, xaxis_title=None,
                          margin=dict(l=0, r=0, t=0, b=0))
        return fig.to_dict()

    def create_treemap(self):
        """Return the treemap for ``selected_df`` as a Plotly dict.

        Tags every selected row as 'portfolio' and appends one synthetic
        'not_in_portfolio' row before plotting.
        """
        # selected_df is an independent copy, so this assignment is safe.
        self.selected_df['position'] = 'portfolio'
        not_in_portfolio_row = pd.DataFrame({
            'display_name': ['不在portfolio中'],
            'position': ['not_in_portfolio'],
            'aggregate_sector': ['不在portfolio中'],
            'current_weight': [1000],
            'portfolio_return': [0],
            'portfolio_pct': [0]
        })
        df = pd.concat([self.selected_df, not_in_portfolio_row],
                       ignore_index=True)
        fig = px.treemap(df, path=[px.Constant('cash_position'), 'position', 'aggregate_sector', 'display_name'], values='current_weight',
                         color='portfolio_return', hover_data=['portfolio_return', 'portfolio_pct'],
                         color_continuous_scale='RdBu',
                         color_continuous_midpoint=np.average(
                             df['portfolio_return'])
                         )
        fig.update_layout(styling.plot_layout)
        fig.update_layout(coloraxis_colorbar=dict(
            title="weighted return"))
        colname_to_name = {
            'cash_position': '现金分布',
            'portfolio_return': '加权回报',
            'not_in_portfolio': '不在portfolio中',
            'current_weight': '现金',
        }
        fig.for_each_trace(lambda t: t.update(name=colname_to_name.get(t.name, t.name),
                                              hovertemplate=t.hovertemplate.replace(
                                                  t.name, colname_to_name.get(t.name, t.name))
                                              ))
        return fig.to_dict()

    def __init__(self, p_stock_df, **params):
        self.p_stock_df = p_stock_df
        # Normalise the distinct dates through pd.to_datetime so every element
        # is a Timestamp with a .date() method, whatever array type
        # Series.unique() returns.
        enabled_dates = [
            timestamp.date()
            for timestamp in pd.to_datetime(self.p_stock_df.date.unique())
        ]
        self.date_picker = pn.widgets.DatetimePicker(name='选择某日资金分布',
                                                     start=self.p_stock_df.date.min(),
                                                     end=self.p_stock_df.date.max(),
                                                     value=self.p_stock_df.date.max(),
                                                     enabled_dates=enabled_dates,
                                                     enable_time=False,
                                                     )
        self.date_range = pn.widgets.DateRangeSlider(name='选择资金分布走势区间',
                                                     start=self.p_stock_df.date.min(),
                                                     end=self.p_stock_df.date.max(),
                                                     value=(self.p_stock_df.date.min(
                                                     ), self.p_stock_df.date.max()),
                                                     )
        self.selected_df = self._rows_on_selected_date()
        self.tree_plot = pn.pane.Plotly(self.create_treemap())
        self.trend_plot = pn.pane.Plotly(self.create_trend_plot())
        super().__init__(**params)

    def __panel__(self):
        self._layout = pn.Card(self.date_picker, self.tree_plot, self.date_range, self.trend_plot,
                               width=500, header=pn.pane.Str('资金分布'))
        return self._layout

    @param.depends('date_picker.value', 'p_stock_df', watch=True)
    def update(self):
        """Re-select the rows for the picked date and rebuild the treemap."""
        self.selected_df = self._rows_on_selected_date()
        tree_plot = self.create_treemap()
        self.tree_plot.object = tree_plot
class BestAndWorstStocks(Viewer):
    """Card listing the five highest- and five lowest-return portfolio stocks."""

    p_stock_df = param.Parameter()
    b_stock_df = param.Parameter()
    start_date = param.Parameter()
    end_date = param.Parameter()

    def calculate_attributes(self):
        """Return the attribution frame for the selected date range."""
        result_df = processing.calculate_attributes_between_dates(self.start_date,
                                                                  self.end_date,
                                                                  self.p_stock_df,
                                                                  self.b_stock_df)
        return result_df

    def create_tabulator(self, df):
        """Wrap ``df`` in a Tabulator with translated titles and hidden helper columns."""
        col_title_map = {
            'display_name_p': '股票名称',
            'ticker': '股票代码',
            'pct_p': '加权回报率',
            'prev_w_in_p_b': '在benchmark中的权重',
            'prev_w_in_p_p': '在portfolio中的权重',
            'allocation': '分配分数',
            'selection': '选择分数',
            'interaction': '交互分数',
            'return': '未加权回报率',
            'active_return': '加权主动回报率',
        }
        return pn.widgets.Tabulator(df, sizing_mode='stretch_width',
                                    hidden_columns=['index', 'display_name_b',
                                                    'pct_b', 'in_portfolio',
                                                    ],
                                    frozen_columns=['display_name_p'],
                                    titles=col_title_map)

    @param.depends('start_date', 'end_date', watch=True)
    def update(self):
        """Refresh both tables after the date range changes."""
        result_df = self.get_processed_df()
        self.best_5_tabulator.value = result_df.tail(5)
        self.worst_5_tabulator.value = result_df.head(5)

    def get_processed_df(self):
        """Return the in-portfolio attribution rows sorted ascending by ``return``.

        NOTE(review): the card labels speak of weighted return, but the sort
        key is the ``return`` column, which ``create_tabulator`` titles as the
        unweighted return -- confirm which one is intended.
        """
        result_df = self.calculate_attributes()
        in_portfolio = result_df[result_df.in_portfolio]
        # Non-inplace sort: sorting the filtered slice in place triggers
        # SettingWithCopyWarning.
        return in_portfolio.sort_values(by='return')

    def __init__(self, p_stock_df, b_stock_df, **params):
        self.p_stock_df = p_stock_df
        self.b_stock_df = b_stock_df
        # The slider spans the portfolio dates and opens on the last 7 days.
        self._date_range = pn.widgets.DateRangeSlider(
            name='选择计算回报的时间区间',
            start=p_stock_df.date.min(),
            end=p_stock_df.date.max(),
            value=(p_stock_df.date.max() -
                   timedelta(days=7), p_stock_df.date.max())
        )
        self.start_date = self._date_range.value_start
        self.end_date = self._date_range.value_end
        result_df = self.get_processed_df()
        self.best_5_tabulator = self.create_tabulator(result_df.tail(5))
        self.worst_5_tabulator = self.create_tabulator(result_df.head(5))
        super().__init__(**params)

    @param.depends('_date_range.value', watch=True)
    def _sync_params(self):
        """Copy the slider's range into ``start_date`` and ``end_date``."""
        self.start_date = self._date_range.value[0]
        self.end_date = self._date_range.value[1]

    def __panel__(self):
        self._layout = pn.Card(self._date_range,
                               pn.pane.Str('加权回报率最高回报5只股票'),
                               self.best_5_tabulator,
                               pn.pane.Str('加权回报率最低回报5只股票'),
                               self.worst_5_tabulator,
                               max_width=500, header=pn.pane.Str('Portfolio中最高回报和最低加权回报率股票'))
        return self._layout
class TopHeader(Viewer):
    """Header card showing the latest PnL, total return and maximum drawdown."""

    eval_df = param.Parameter()

    @param.depends('eval_df', watch=True)
    def update(self):
        """Placeholder hook for ``eval_df`` changes; currently does nothing."""
        return

    def calculation(self):
        """Calculate PnL, total return and max drawdown from ``eval_df``.

        Returns:
            tuple: ``(pnl, total_return, max_drawdown)`` where the first two
            come from the row(s) at the latest date and the last is the
            largest absolute relative drop of ``1 + portfolio_return_p`` from
            its running maximum.

        The computation uses local Series only; ``eval_df`` is not modified.
        """
        eval_df = self.eval_df
        latest_rows = eval_df[eval_df.date == eval_df.date.max()]
        pnl = latest_rows.cum_pnl.values[0]
        total_return = latest_rows.portfolio_return_p.values[0]

        # max drawdown: running maximum of the return, then the relative drop
        running_max_return = eval_df.portfolio_return_p.expanding(
            min_periods=1).max()
        draw_down = abs(
            (1 + eval_df.portfolio_return_p) / (1 + running_max_return) - 1
        )
        max_drawdown = draw_down.max()
        return pnl, total_return, max_drawdown

    def create_report(self, pnl, total_return, max_drawdown):
        """Return a FlexBox with the three formatted headline figures."""
        return pn.FlexBox(
            f"PnL:{round(pnl,2)}¥", f"回报:{round(total_return * 100,2)}%", f'最大回撤:{round(max_drawdown * 100,2)}%', justify_content='space-evenly')

    def __init__(self, eval_df, **params):
        self.eval_df = eval_df
        pnl, total_return, max_drawdown = self.calculation()
        self.report = self.create_report(pnl, total_return, max_drawdown)
        super().__init__(**params)

    def __panel__(self):
        self._layout = pn.Card(self.report, sizing_mode='stretch_width')
        return self._layout