portfolio_management / appComponents.py
huggingface112's picture
remove unused files
576c56b
raw
history blame
No virus
29.7 kB
import processing
from datetime import datetime, timedelta
import panel as pn
import pandas as pd
import hvplot.pandas # noqa
import plotly.express as px
import numpy as np
import hvplot.pandas # noqa
from panel.viewable import Viewer
import param
import styling
import description
import plotly.graph_objs as go
# import warnings
pn.extension('mathjax')
pn.extension('plotly')
class TotalReturnCard(Viewer):
start_date = param.Parameter()
end_date = param.Parameter()
b_stock_df = param.Parameter()
p_stock_df = param.Parameter()
def format_number(self, num):
return f'{round(num * 100, 2)}%'
def get_color(self, num):
return 'green' if num >= 0 else 'red'
def create_report(self):
# Calculate the risk, tracking error, active return
# get the result from entry with max time
most_recent_row = self.result.loc[self.result.time.idxmax()]
active_return = most_recent_row.active_return
tracking_error = most_recent_row.tracking_error
total_return = most_recent_row.weighted_return_p
mkt_cap = most_recent_row.cash
risk = most_recent_row.risk
# Calculate the total attribution
# attributes = processing.calculate_attributes_between_dates(
# self.start_date, self.end_date, self.p_stock_df, self.b_stock_df)
# total_attributes = attributes.aggregate({
# 'interaction': 'sum',
# 'allocation': 'sum',
# 'selection': 'sum',
# 'active_return': 'sum',
# 'notional_return': 'sum'
# })
# active_return_from_stock = total_attributes.active_return
# notional_return = total_attributes.notional_return
# interaction = total_attributes.interaction
# allocation = total_attributes.allocation
# selection = total_attributes.selection
# TODO dummy data
active_return_from_stock = 0
notional_return = 0
interaction = 0
allocation = 0
selection = 0
# Create a function for text report
report = f"""
<style>
.compact-container {{
display: flex;
flex-direction: column;
gap: 5px;
}}
.compact-container > div {{
display: flex;
justify-content: space-between;
margin-bottom: 2px;
}}
.compact-container > div > h2,
.compact-container > div > h3,
.compact-container > div > p,
.compact-container > div > ul > li {{
margin: 0;
}}
.compact-container > ul {{
padding: 0;
margin: 0;
list-style-type: none;
}}
.compact-container > ul > li {{
display: flex;
margin-bottom: 2px;
}}
</style>
<div class="compact-container">
<u><b>总市值</b></u>
<div>
<h2 style="margin: 0;">¥{round(mkt_cap,2)}</h2>
<h2 style='color: {self.get_color(total_return)}; margin: 0;'>{self.format_number(total_return)}</h2>
</div>
<div>
<p style="margin: 0;">追踪误差</p>
<p style='color: {self.get_color(tracking_error)}; margin: 0;'>{self.format_number(tracking_error)}</p>
</div>
<div>
<p style="margin: 0;">风险</p>
<p style='color: {self.get_color(risk)}; margin: 0;'>{self.format_number(risk)}</p>
</div>
<div>
<p style="margin: 0;">归因</p>
<ul style="padding: 0; margin: 0; list-style-type: none;">
<li style="margin-bottom: 2px;">
<div style="display: flex;">
<p style="margin: 0;">主动回报:</p>
<p style="color: {self.get_color(active_return)}; margin: 0;">{self.format_number(active_return)}</p>
</div>
</li>
<li style="margin-bottom: 2px;">
<div style="display: flex;">
<p style="margin: 0;">交互:</p>
<p style="color: {self.get_color(interaction)}; margin: 0;">{self.format_number(interaction)}</p>
</div>
</li>
<li style="margin-bottom: 2px;">
<div style="display: flex;">
<p style="margin: 0;">名义主动回报:</p>
<p style="color: {self.get_color(notional_return)}; margin: 0;">{self.format_number(notional_return)}</p>
</div>
</li>
<li style="margin-bottom: 2px;">
<div style="display: flex;">
<p style="margin: 0;">选择:</p>
<p style="color: {self.get_color(selection)}; margin: 0;">{self.format_number(selection)}</p>
</div>
</li>
<li style="margin-bottom: 2px;">
<div style="display: flex;">
<p style="margin: 0;">分配:</p>
<p style="color: {self.get_color(allocation)}; margin: 0;">{self.format_number(allocation)}</p>
</div>
</li>
</ul>
</div>
</div>
"""
return report
def _create_result_df(self, analytic_b, analytic_p):
'''
calculate weighted return, tracking error, risk for the whole portfolio
'''
return_b_df = processing.calculate_weighted_return(
analytic_b, self.start_date, self.end_date)
return_p_df = processing.calculate_weighted_return(
analytic_p, self.start_date, self.end_date)
# weighted pct
processing.calculate_weighted_pct(return_b_df)
processing.calculate_weighted_pct(return_p_df)
# not needed but to accomendate post processing
return_b_df['in_benchmark'] = True
return_p_df['in_portfolio'] = False
merged_df = pd.merge(return_b_df, return_p_df, on=[
'ticker', 'time'], how='outer', suffixes=('_b', '_p'))
processing.post_process_merged_analytic_df(merged_df)
# fill emtpy weighted_return with 0
# merged_df['weighted_return_b'] = merged_df['weighted_return_b'].fillna(0)
# merged_df['weighted_return_p'] = merged_df['weighted_return_p'].fillna(0)
# aggregate on date
result = merged_df.groupby('time').aggregate({'weighted_return_p': 'sum',
'weighted_return_b': 'sum',
"cash": 'sum',
'weighted_pct_p': 'sum',
'weighted_pct_b': 'sum',
})
# active return
result['active_return'] = result.weighted_return_p - \
result.weighted_return_b
result.sort_values('time', inplace=True)
# tracking error
result['tracking_error'] = result['active_return'].rolling(
len(result), min_periods=1).std() * np.sqrt(252)
# risk std of pct
result['risk'] = result['weighted_pct_b'].rolling(
len(result), min_periods=1).std() * np.sqrt(252)
# result.time = result.index
result.reset_index(inplace=True)
return result
def create_plot(self):
fig = px.line(self.result, y=[
'weighted_return_p', 'weighted_return_b'])
fig.update_traces(mode="lines+markers",
marker=dict(size=5), line=dict(width=2))
fig.update_layout(styling.plot_layout)
colname_to_name = {
'weighted_return_p': 'Portfolio回报',
'weighted_return_b': 'benchmark回报'
}
fig.for_each_trace(lambda t: t.update(name=colname_to_name.get(t.name, t.name),
legendgroup=colname_to_name.get(
t.name, t.name),
hovertemplate=t.hovertemplate.replace(
t.name, colname_to_name.get(t.name, t.name))
))
# fig.layout.autosize = True
return fig.to_dict()
@param.depends('start_date', 'end_date', 'b_stock_df', 'p_stock_df', watch=True)
def update(self):
self.result = self._create_result_df(self.p_stock_df, self.b_stock_df)
fig = self.create_plot()
report = self.create_report()
self.report.object = report
self.plot_pane.object = fig
def __init__(self, b_stock_df, p_stock_df, **params):
self.b_stock_df = b_stock_df
self.p_stock_df = p_stock_df
self._date_range = pn.widgets.DateRangeSlider(
start=p_stock_df.time.min(),
end=b_stock_df.time.max(),
value=(p_stock_df.time.max() -
timedelta(days=7), p_stock_df.time.max())
)
self.start_date = self._date_range.value_start
self.end_date = self._date_range.value_end
self.result = self._create_result_df(b_stock_df, p_stock_df)
self.plot_pane = pn.pane.Plotly(
self.create_plot(), sizing_mode='stretch_width')
self.report = pn.pane.HTML(
self.create_report(), sizing_mode='stretch_width')
super().__init__(**params)
# self._sync_widgets()
def __panel__(self):
self._layout = pn.Card(self._date_range, self.report, self.plot_pane,
width=500, header=pn.Row(pn.pane.Str('投资组合总结'),
pn.widgets.TooltipIcon(value=description.summary_card)))
return self._layout
# @param.depends('value', 'width', watch=True)
# def _sync_widgets(self):
# pass
@param.depends('_date_range.value', watch=True)
def _sync_params(self):
self.start_date = self._date_range.value[0]
self.end_date = self._date_range.value[1]
class DrawDownCard(Viewer):
selected_key_column = param.Parameter()
calcualted_p_stock = param.Parameter()
def __init__(self, calculated_p_stock, **params):
self.select = pn.widgets.Select(
name='Select', value='盈利', options=['盈利', '回报'])
self.calculated_p_stock = calculated_p_stock
self._sycn_params()
self.drawdown_plot = pn.pane.Plotly(self.plot_drawdown())
super().__init__(**params)
@param.depends('select.value', watch=True)
def _sycn_params(self):
self.selected_key_column = 'cum_pnl' if self.select.value == '盈利' else 'weighted_return'
def _aggregate_by_sum(self):
# calculate weighted return
processed_df = processing.calculate_weighted_return(
self.calculated_p_stock)
agg_df = processed_df.groupby('time').aggregate({
'weighted_return': 'sum',
'cash': 'sum',
'pnl': 'sum',
})
# calcualte cum pnl
agg_df['cum_pnl'] = agg_df['pnl'].cumsum()
return agg_df
def calculate_drawdown(self):
agg_df = self._aggregate_by_sum()
df = processing.calculate_draw_down_on(
agg_df, self.selected_key_column)
df.reset_index(inplace=True)
return df
def plot_drawdown(self):
df = self.calculate_drawdown()
fig = px.line(df, x='time', y=['drawn_down'])
# add scatter to represetn new high
new_height_pnl = df[df[self.selected_key_column] ==
df[f'rolling_max_{self.selected_key_column}']]
fig.add_trace(go.Scatter(x=new_height_pnl['time'],
y=new_height_pnl['drawn_down'], mode='markers', name='新的最高总回报'))
colname_to_name = {
'drawn_down': '回撤'
}
fig.update_layout(styling.plot_layout)
fig.for_each_trace(lambda t: t.update(name=colname_to_name.get(t.name, t.name),
legendgroup=colname_to_name.get(
t.name, t.name),
# hovertemplate=t.hovertemplate.replace(
# t.name, colname_to_name.get(t.name, t.name))
))
return fig
@param.depends('selected_key_column', watch=True)
def update(self):
self.drawdown_plot.object = self.plot_drawdown().to_dict()
def __panel__(self):
self._layout = pn.Card(
self.select,
self.drawdown_plot,
header=pn.Row(pn.pane.Str('回撤分析')),
width=500
)
return self._layout
class HistReturnCard(Viewer):
return_barplot = param.Parameterized()
calculated_b_stock = param.Parameterized()
calculated_p_stock = param.Parameterized()
select_resolution = param.ObjectSelector(
default='每周回报', objects=['每日回报', '每周回报', '每月回报', '每年回报'])
def _calculate_return(self, df, freq):
# start on tuesday, end on monday
grouped = df.groupby(pd.Grouper(key='time', freq=freq))
agg_df = grouped.agg({'weighted_log_return': 'sum'})
# time indicating the last end of the week
agg_df['time'] = agg_df.index
# convert cumulative log return to percentage return
agg_df['return'] = np.exp(agg_df['weighted_log_return']) - 1
# return agg_df
return agg_df.reset_index(drop=True)
def update_aggregate_df(self):
freq = None
if self.select_resolution == "每日回报":
freq = "D"
elif self.select_resolution == "每月回报":
freq = 'M'
elif self.select_resolution == "每年回报":
freq = 'Y'
elif self.select_resolution == "每周回报":
freq = 'W-MON'
p_return = self._calculate_return(self.calculated_p_stock, freq)
b_return = self._calculate_return(self.calculated_b_stock, freq)
merge_df = pd.merge(p_return, b_return, on='time',
how='outer', suffixes=('_p', '_b'))
return merge_df
def create_attributes_barplot(self):
self.attribute_df = self._update_attributes_df()
fig = px.bar(self.attribute_df, x='period_str', y=[
'allocation', 'selection', 'interaction', 'notional_active_return', 'active_return'])
colname_to_name = {
'allocation': '分配',
'selection': '选择',
'interaction': '交互',
'notional_active_return': '名义主动回报',
'active_return': '实际主动回报'
}
fig.for_each_trace(lambda t: t.update(name=colname_to_name.get(t.name, t.name),
legendgroup=colname_to_name.get(
t.name, t.name),
hovertemplate=t.hovertemplate.replace(
t.name, colname_to_name.get(t.name, t.name))
))
fig.update_layout(barmode='group', title='主动回报归因',
bargap=0.0, bargroupgap=0.0)
fig.update_layout(**styling.plot_layout)
fig.update_traces(**styling.barplot_trace)
return fig.to_dict()
def create_return_barplot(self):
self.agg_df = self.update_aggregate_df()
fig = px.bar(self.agg_df, x='time', y=[
'return_p', 'return_b'],
barmode='overlay',
title='周期回报',
)
# update legend
colname_to_name = {
'return_p': 'portfolio回报率',
'return_b': 'benchmark回报率'
}
fig.for_each_trace(lambda t: t.update(name=colname_to_name.get(t.name, t.name),
legendgroup=colname_to_name.get(
t.name, t.name),
hovertemplate=t.hovertemplate.replace(
t.name, colname_to_name.get(t.name, t.name))
))
fig.update_layout(**styling.plot_layout)
fig.update_traces(**styling.barplot_trace)
return fig.to_dict()
@param.depends('calculated_p_stock', 'calculated_b_stock', 'select_resolution', watch=True)
def update(self):
return_barplot = self.create_return_barplot()
self.return_barplot.object = return_barplot
attributes_barplot = self.create_attributes_barplot()
self.attribute_barplot.object = attributes_barplot
def _update_attributes_df(self):
freq = None
if self.select_resolution == "每日回报":
freq = 'D'
elif self.select_resolution == "每月回报":
freq = 'M'
elif self.select_resolution == "每年回报":
freq = 'Y'
elif self.select_resolution == "每周回报":
freq = 'W-MON'
agg_p = processing.aggregate_analytic_df_by_period(
self.calculated_p_stock, freq)
agg_b = processing.aggregate_analytic_df_by_period(
self.calculated_b_stock, freq)
bhb_df = processing.calculate_periodic_BHB(agg_p, agg_b)
agg_bhb = processing.aggregate_bhb_df(bhb_df)
agg_bhb['period_str'] = agg_bhb.index.map(lambda x: str(x))
return agg_bhb
def __init__(self, calculated_p_stock, calculated_b_stock, **params):
self.calculated_p_stock = calculated_p_stock
self.calculated_b_stock = calculated_b_stock
self._range_slider = pn.widgets.DateRangeSlider(
name='Date Range Slider',
start=self.calculated_p_stock.time.min(), end=self.calculated_p_stock.time.max(),
value=(self.calculated_p_stock.time.min(),
self.calculated_p_stock.time.max()),
)
self.return_barplot = pn.pane.Plotly(self.create_return_barplot())
self.attribute_barplot = pn.pane.Plotly(
self.create_attributes_barplot())
super().__init__(**params)
def __panel__(self):
self._layout = pn.Card(pn.Param(self.param.select_resolution, name='选择周期'),
self.return_barplot, self.attribute_barplot, width=500, header=pn.Row(pn.pane.Str('周期回报'),
pn.widgets.TooltipIcon(value=description.periodic_return_report)))
return self._layout
class PortfolioComposationCard(Viewer):
p_stock_df = param.Parameterized()
selected_date = param.Parameterized()
def create_cash_position_df(self):
aggregate_df = self.p_stock_df.groupby('time', as_index=False).agg({
'cash': 'sum'
})
aggregate_df['type'] = 'portfolio'
not_in_portfolio_df = aggregate_df.copy()
not_in_portfolio_df['type'] = 'not_in_portfolio'
not_in_portfolio_df['cash'] = 1000
# append df
aggregate_df = pd.concat([aggregate_df, not_in_portfolio_df])
# sort
aggregate_df.sort_values(by=['time'], inplace=True)
return aggregate_df[aggregate_df.time.between(self.date_range.value[0], self.date_range.value[1])]
@param.depends('p_stock_df', 'date_range.value', watch=True)
def update_trend_plot(self):
self.trend_plot.object = self.create_trend_plot()
def create_trend_plot(self):
aggregate_df = self.create_cash_position_df()
fig = px.bar(aggregate_df, x='time', y='cash', color='type')
fig.update_layout(legend=dict(
orientation="h",
yanchor="bottom",
y=1.02,
xanchor="right",
x=1
))
fig.update_traces(
marker_line_width=0,
selector=dict(type="bar"))
fig.update_layout(bargap=0,
bargroupgap=0,
)
fig.update_layout(uniformtext_minsize=8, uniformtext_mode='hide',
yaxis_title=None, xaxis_title=None,
margin=dict(l=0, r=0, t=0, b=0))
return fig.to_dict()
def create_treemap(self):
self.selected_df = self.p_stock_df[self.p_stock_df.time ==
self.datetime_picker.value]
self.selected_df['position'] = '股票'
not_in_portfolio_row = pd.DataFrame({
'display_name': ['闲置'],
'position': ['闲置'],
'aggregate_sector': ['闲置'],
'cash': [100],
'weighted_return': [0]
})
df = pd.concat([self.selected_df, not_in_portfolio_row],
ignore_index=True)
fig = px.treemap(df,
# path=[px.Constant('cash_position'), 'position',
# 'aggregate_sector', 'display_name'],
path=['position', 'aggregate_sector', 'display_name'],
values='cash',
color='weighted_return',
hover_data=['weighted_return', 'cash'],
color_continuous_scale='RdBu',
color_continuous_midpoint=np.average(
df['weighted_return'])
)
fig.update_layout(styling.plot_layout)
fig.update_layout(coloraxis_colorbar=dict(
title="累计加权回报率"))
# colname_to_name = {
# 'cash_position': '现金分布',
# 'portfolio_return': '加权回报',
# 'not_in_portfolio': '不在portfolio中',
# 'current_weight': '现金',
# }
# fig.for_each_trace(lambda t: t.update(name=colname_to_name.get(t.name, t.name),
# hovertemplate=t.hovertemplate.replace(
# t.name, colname_to_name.get(t.name, t.name))
# ))
return fig.to_dict()
def __init__(self, analytic_df, **params):
self.p_stock_df = analytic_df
self.p_stock_df = processing.calculate_weighted_return(self.p_stock_df,
start=self.p_stock_df.time.min(),
end=self.p_stock_df.time.max())
# convert to datetime to date
enabled_dates = [time.date() for time in self.p_stock_df.time.unique()]
self.datetime_picker = pn.widgets.DatetimePicker(name='选择某日资金分布',
start=self.p_stock_df.time.min(),
end=self.p_stock_df.time.max(),
value=self.p_stock_df.time.max(),
enabled_dates=enabled_dates,
)
self.date_range = pn.widgets.DateRangeSlider(name='选择资金分布走势区间',
start=self.p_stock_df.time.min(),
end=self.p_stock_df.time.max(),
value=(self.p_stock_df.time.min(
), self.p_stock_df.time.max()),
)
self.tree_plot = pn.pane.Plotly(self.create_treemap())
self.trend_plot = pn.pane.Plotly(self.create_trend_plot())
# calculate money position
super().__init__(**params)
def __panel__(self):
self._layout = pn.Card(self.datetime_picker, self.tree_plot, self.date_range, self.trend_plot,
width=500, header=pn.pane.Str('资金分布'))
return self._layout
@param.depends('datetime_picker.value', 'p_stock_df', watch=True)
def update(self):
tree_plot = self.create_treemap()
self.tree_plot.object = tree_plot
class BestAndWorstStocks(Viewer):
start_date = param.Parameter()
end_date = param.Parameter()
hidden_col = [
'index',
'open',
'high',
'low',
'close',
'volume',
'money',
'pct',
'sector',
'aggregate_sector',
'ave_price',
'weight',
'ini_w',
'name',
'pnl'
]
forzen_columns = ['display_name', 'return', 'cum_pnl', 'shares']
description = "股票表现排名"
tooltip = "在一个时间窗口中累计盈利最高和最低的股票,包括已经卖出的股票,如果表格的日期小于窗口的结束时间代表已经卖出"
def create_tabulator(self, df):
col_title_map = {
'display_name': '股票名称',
'ticker': '股票代码',
'time': '日期',
'return': '回报率',
'sector': '行业',
'shares': '持仓',
'cash': '现金',
'cum_pnl': '累计盈利',
}
return pn.widgets.Tabulator(df, sizing_mode='stretch_width',
hidden_columns=self.hidden_col,
frozen_columns=self.forzen_columns,
titles=col_title_map
)
@param.depends('start_date', 'end_date', watch=True)
def update(self):
result_df = self.get_processed_df()
self.best_5_tabulator.value = result_df.head(5)
self.worst_5_tabulator.value = result_df.tail(5)
def _get_cum_return(self, df):
'''return a df contain cumulative return at the end date'''
result_df = processing.calcualte_return(df=df,
start=self.start_date,
end=self.end_date)
grouped = result_df.groupby('ticker')
last_row = result_df.loc[grouped.time.idxmax()]
return last_row
def get_processed_df(self):
'''
calculate attributes and return a sorted dataframe on weighted return
'''
df = processing.calculate_cum_pnl(self.analytic_df,
start=self.start_date,
end=self.end_date)
df = self._get_cum_return(df)
return df.sort_values(by='cum_pnl', ascending=False)
def __init__(self, analytic_df, **params):
self.analytic_df = analytic_df
self._date_range = pn.widgets.DateRangeSlider(
name='选择计算回报的时间区间',
start=self.analytic_df.time.min(),
end=self.analytic_df.time.max(),
value=(self.analytic_df.time.max() -
timedelta(days=7), self.analytic_df.time.max())
)
self.start_date = self._date_range.value_start
self.end_date = self._date_range.value_end
result_df = self.get_processed_df()
self.best_5_tabulator = self.create_tabulator(result_df.head(5))
self.worst_5_tabulator = self.create_tabulator(result_df.tail(5))
super().__init__(**params)
@param.depends('_date_range.value', watch=True)
def _sync_params(self):
self.start_date = self._date_range.value[0]
self.end_date = self._date_range.value[1]
# print('update range...')
def __panel__(self):
self._layout = pn.Card(self._date_range,
pn.pane.Str('加权回报率最高回报5只股票'),
self.best_5_tabulator,
pn.pane.Str('加权回报率最低回报5只股票'),
self.worst_5_tabulator,
max_width=500,
header=pn.Row(
pn.pane.Str(self.description),
pn.widgets.TooltipIcon(value=self.tooltip)
)
)
return self._layout
class TopHeader(Viewer):
'''
display up to todays' PnL, total return and max drawdown
'''
eval_df = param.Parameter()
@param.depends('eval_df', watch=True)
def update(self):
'''
update Pnl, total return and max drawdown when df is updated
'''
return
def _process(self):
'''calculate accumulative pnl, total return and Max Drawdown on return'''
# return
result_df = processing.calculate_weighted_return(self.eval_df)
# merge by date
agg_df = result_df.groupby('time').aggregate({
'weighted_return': 'sum',
'cash': 'sum',
'pnl': 'sum',
})
agg_df.reset_index(inplace=True)
# accumulative pnl
agg_df['cum_pnl'] = agg_df['pnl'].cumsum()
# calcualte drawdown
result = processing.calculate_draw_down_on(agg_df)
max_draw_down = result.drawn_down.min()
# last row
last_row = agg_df.loc[agg_df.time.idxmax()]
return last_row.cum_pnl, last_row.weighted_return, max_draw_down
def create_report(self, pnl, total_return, max_drawdown):
return pn.FlexBox(
f"PnL:{round(pnl,2)}¥", f"回报:{round(total_return * 100,2)}%", f'最大回撤:{round(max_drawdown * 100,2)}%', justify_content='space-evenly')
def __init__(self, eval_df, **params):
self.eval_df = eval_df
cum_pnl, total_return, max_drawdown = self._process()
self.report = self.create_report(cum_pnl, total_return, max_drawdown)
super().__init__(**params)
def __panel__(self):
self._layout = pn.Card(self.report, sizing_mode='stretch_width')
return self._layout