import processing from datetime import datetime, timedelta import panel as pn import pandas as pd import hvplot.pandas # noqa import plotly.express as px import numpy as np import hvplot.pandas # noqa from panel.viewable import Viewer import param from script import styling from script import description import plotly.graph_objs as go # import warnings pn.extension('mathjax') pn.extension('plotly') # warnings.filterwarnings("ignore", category=pd.core.common.SettingWithCopyWarning) # overal performance default to 30 days def create_portfolio_overview(df_list): calculated_b_stock, calculated_p_stock, p_eval_df, sector_eval_df = df_list range_slider = pn.widgets.DateRangeSlider(name='date range', start=sector_eval_df.date.min(), end=sector_eval_df.date.max(), value=(sector_eval_df.date.max() - timedelta(days=30), sector_eval_df.date.max()), align='center', sizing_mode='stretch_width', ) size = dict(width=780) option = dict(legend_position="left") active_tools = dict(tools=['hover'], active_tools=[], axiswise=True) # def create_overview_panel() ip_eval_df = p_eval_df.interactive() isector_eval_df = sector_eval_df.interactive() ranged_ip_eval_df = ip_eval_df[ip_eval_df.date.between( range_slider.param.value_start, range_slider.param.value_end)] ranged_isector_eval_df = isector_eval_df[isector_eval_df.date.between( range_slider.param.value_start, range_slider.param.value_end)] # return return_plot = ranged_ip_eval_df.hvplot.line(x='date', y=['portfolio_return_p', 'portfolio_return_b'])\ .opts(title='投资组合总回报 v.s benchmark总回报', **size, **option) # active return active_return_plot = ranged_ip_eval_df.hvplot.line(x='date', y=['active_return'])\ .opts(title='每日主动回报', **size, axiswise=True) # total risk and tracking error risk_tracking_plot = ranged_ip_eval_df.hvplot.line(x='date', y=['risk', 'tracking_error'])\ .opts(title='风险和追踪误差', **size, **option) # sector return sector_return_plot = ranged_isector_eval_df.hvplot.line(x='date', y=['portfolio_return_p'], by='aggregate_sector')\ .opts(title='投资组合各行业总回报', **size, **option) # bsector_return_plot = ranged_isector_eval_df.hvplot.line(x='date', y=['portfolio_return_b'], by='aggregate_sector')\ # .opts(title='benchmark sector return', **size, **option) # sector active return s_active_return_plot = ranged_isector_eval_df.hvplot.line(x='date', y=['active_return'], by='aggregate_sector')\ .opts(title='投资组合各行业每日主动回报', **size, **option) # sector risk and tracking error s_risk_plot = ranged_isector_eval_df.hvplot.line(x='date', y=['tracking_error'], by='aggregate_sector')\ .opts(title='投资组合各行业追踪误差', **size, **option) s_tracking_plot = ranged_isector_eval_df.hvplot.line(x='date', y=['risk'], by='aggregate_sector')\ .opts(title='投资组合各行业风险', **size, **option) # attribute def create_attribute_plot(start, end, calculated_b_stock, calculated_p_stock): result = processing.calculate_attributes_between_dates( start, end, calculated_b_stock, calculated_p_stock) portfolio_attribute = result.aggregate({ 'interaction': 'sum', 'allocation': 'sum', 'selection': 'sum', }) layout = pn.Column( pn.pane.DataFrame(portfolio_attribute.transpose()), result.hvplot.bar(x='display_name_p', y=['interaction', 'allocation', 'selection'], shared_axes=False, stacked=True, rot=90).opts(**size, **option, title='投资组合总主动回报归因') ) return layout attribute_plot = pn.bind(create_attribute_plot, start=range_slider.param.value_start, end=range_slider.param.value_end, calculated_b_stock=calculated_b_stock, calculated_p_stock=calculated_p_stock) # stock performance # selected_p_stock = calculated_p_stock[calculated_p_stock.date == # calculated_p_stock.date.max()] # stock_radar_plot = go.Figure() # category = ['return', 'risk', 'portfolio_return', 'prev_w_in_p'] # for display_name, group in selected_p_stock.groupby('display_name'): # stock_radar_plot.add_trace(go.Scatterpolar( # r=group[category].values[0], # theta=category, # fill='toself', # name=display_name # )) total_view_plots = pn.Column(return_plot.opts(**active_tools).output(), risk_tracking_plot.opts( **active_tools).output(), active_return_plot.opts( **active_tools).output(), attribute_plot, height=1000, scroll=True) sector_view_plots = pn.Column(sector_return_plot.opts(**active_tools).output(), s_risk_plot.opts(**active_tools).output(), s_tracking_plot.opts( **active_tools).output(), s_active_return_plot.opts( **active_tools).output(), height=1000, scroll=True) return pn.Column( # pn.Row(align='center'), range_slider, pn.Row(total_view_plots, sector_view_plots, align='center')) def attribution_view(daily_bnb_result, daily_sector_bnb_result, p_eval_df): p_eval_df.date = pd.to_datetime(p_eval_df.date) daily_bnb_result.date = pd.to_datetime(daily_bnb_result.date) daily_sector_bnb_result.date = pd.to_datetime(daily_sector_bnb_result.date) # interactive widget dt_range = pn.widgets.DateRangeSlider(start=p_eval_df.date.min( ), end=p_eval_df.date.max(), value=(p_eval_df.date.min(), p_eval_df.date.max())) # total attribution and return p_eval_df_i = p_eval_df.interactive() daily_bnb_result_i = daily_bnb_result.interactive() daily_return_plot = p_eval_df_i[(p_eval_df_i.date >= dt_range.param.value_start) & ( p_eval_df_i.date <= dt_range.param.value_end)].hvplot(x='date', y=['portfolio_return_p', 'portfolio_return_b'], title='投资组合总回报 v.s benchmark总回报').output() daily_bnb_plot = daily_bnb_result_i[daily_bnb_result_i.date.between(dt_range.param.value_start, dt_range.param.value_end)]\ .hvplot.bar(x='date', y=['allocation', 'selection', 'interaction', "active_return"], stacked=True, title='每日主动收益归因', yformatter='%.2f', xlabel='日期', shared_axes=False).output() # return daily_sector_bnb_df_i = daily_sector_bnb_result.interactive() selected_range_df = daily_sector_bnb_df_i[daily_sector_bnb_df_i.date.between( dt_range.param.value_start, dt_range.param.value_end)] sector_active_return_plot = selected_range_df.hvplot.line( x='date', y='active_return', by='aggregate_sector', width=1000, height=400, title='投资组合行业每日主动回报').output() # attribution def plot_attribute_by_sector(sector): selected_sector_df = selected_range_df[selected_range_df.aggregate_sector == sector] return selected_sector_df.hvplot.bar(x='date', y=['active_return', 'allocation', 'selection', 'interaction'], title='投资组合行业每日主动收入归因', stacked=True, shared_axes=False).output() sector_attr_plot_tabs = pn.Tabs(*[(sector, plot_attribute_by_sector(sector)) for sector in daily_sector_bnb_result.aggregate_sector.unique()], dymacic=True) # layout sector_view = pn.Column(sector_attr_plot_tabs, sector_active_return_plot) total_view = pn.Column(daily_return_plot, daily_bnb_plot) return pn.Column( pn.Row(dt_range), pn.Row(total_view, sector_view) ) # plot explore def create_hvplot_explore(calculated_b_stock, calculated_p_stock, p_eval_df, sector_eval_df, attribution_result_df, s_attribution_result_df): options = ['calculated_b_stock', 'calculated_p_stock', 'p_eval_df', 'sector_eval_df', 'attribution_result_df', 's_attribution_result_df'] name_to_df = { 'calculated_b_stock': calculated_b_stock, 'calculated_p_stock': calculated_p_stock, 'p_eval_df': p_eval_df, 'sector_eval_df': sector_eval_df, 'attribution_result_df': attribution_result_df, 's_attribution_result_df': s_attribution_result_df } selector = pn.widgets.Select( name='Select', options=options, value=options[0]) def create_exploer(name): df = name_to_df[name] explorer = hvplot.explorer(df) def plot_code(**kwargs): code = f'```python\n{explorer.plot_code()}\n```' return pn.pane.Markdown(code, sizing_mode='stretch_width') pn.Column( explorer, '**Code**:', pn.bind(plot_code, **explorer.param.objects()) ) return explorer def create_perspective(name): df = name_to_df[name] return pn.pane.Perspective(df, columns=list(df.columns), width=1500, height=800) perspective = pn.bind(create_perspective, name=selector) exploer = pn.bind(create_exploer, name=selector) exploer_component = pn.Column(selector, exploer, perspective) return exploer_component class TotalReturnCard(Viewer): value = param.Range(doc="A numeric range.") width = param.Integer(default=300) start_date = param.Parameter() end_date = param.Parameter() eval_df = param.Parameter() b_stock_df = param.Parameter() p_stock_df = param.Parameter() selected_df = param.Parameter() plot_pane = param.Parameter() report = param.Parameter() def format_number(self, num): return f'{round(num * 100, 2)}%' def get_color(self, num): return 'green' if num >= 0 else 'red' def create_report(self): # Calculate the total return and risk result = processing.calculate_norm_return( self.eval_df, self.start_date, self.end_date) most_recent_row = result.tail(1) active_return = most_recent_row.active_return.values[0] tracking_error = result.active_return.std() * np.sqrt(252) total_return = most_recent_row.return_p.values[0] mkt_cap = most_recent_row.mkt_cap.values[0] risk = result['return_b'].std() * np.sqrt(252) # Calculate the total attribution attributes = processing.calculate_attributes_between_dates( self.start_date, self.end_date, self.p_stock_df, self.b_stock_df) total_attributes = attributes.aggregate({ 'interaction': 'sum', 'allocation': 'sum', 'selection': 'sum', 'active_return': 'sum', 'notional_return': 'sum' }) active_return_from_stock = total_attributes.active_return notional_return = total_attributes.notional_return interaction = total_attributes.interaction allocation = total_attributes.allocation selection = total_attributes.selection # Create a function for text report report = f"""
总市值

¥{round(mkt_cap,2)}

{self.format_number(total_return)}

追踪误差

{self.format_number(tracking_error)}

风险

{self.format_number(risk)}

归因

""" return report def create_plot(self): result = processing.calculate_norm_return( self.eval_df, self.start_date, self.end_date) fig = px.line(result, x="date", y=['return_p', 'return_b']) fig.update_traces(mode="lines+markers", marker=dict(size=5), line=dict(width=2)) fig.update_layout(styling.plot_layout) colname_to_name = { 'return_p': 'Portfolio回报', 'return_b': 'benchmark回报' } fig.for_each_trace(lambda t: t.update(name=colname_to_name.get(t.name, t.name), legendgroup=colname_to_name.get( t.name, t.name), hovertemplate=t.hovertemplate.replace( t.name, colname_to_name.get(t.name, t.name)) )) # fig.layout.autosize = True return fig.to_dict() @param.depends('start_date', 'end_date', 'eval_df', watch=True) def update(self): fig = self.create_plot() report = self.create_report() self.report.object = report self.plot_pane.object = fig def __init__(self, eval_df, b_stock_df, p_stock_df, **params): self.eval_df = eval_df self.b_stock_df = b_stock_df self.p_stock_df = p_stock_df self._date_range = pn.widgets.DateRangeSlider( start=eval_df.date.min(), end=eval_df.date.max(), value=(eval_df.date.max() - timedelta(days=7), eval_df.date.max()) ) self.start_date = self._date_range.value_start self.end_date = self._date_range.value_end self.plot_pane = pn.pane.Plotly( self.create_plot(), sizing_mode='stretch_width') self.report = pn.pane.HTML( self.create_report(), sizing_mode='stretch_width') super().__init__(**params) self._sync_widgets() def __panel__(self): self._layout = pn.Card(self._date_range, self.report, self.plot_pane, width=500, header=pn.Row(pn.pane.Str('投资组合总结'), pn.widgets.TooltipIcon(value=description.summary_card))) return self._layout @param.depends('start_date', 'end_date', 'eval_df', watch=True) def update_selected_df(self): self.selected_df = self.eval_df[self.eval_df.date.between( self.start_date, self.end_date )] @param.depends('value', 'width', watch=True) def _sync_widgets(self): pass @param.depends('_date_range.value', watch=True) def _sync_params(self): self.start_date = self._date_range.value[0] self.end_date = self._date_range.value[1] class DrawDownCard(Viewer): def __init__(self, eval_df, calculated_p_stock, calculated_b_stock, **params): self.eval_df = eval_df self.calculated_p_stock = calculated_p_stock self.calculated_b_stock = calculated_b_stock self.drawdown_plot = pn.pane.Plotly(self.plot_drawdown()) super().__init__(**params) def calculate_drawdown(self): df = self.eval_df.copy() # rolling max return df['rolling_max_return_p'] = df['portfolio_return_p'].rolling( window=len(df), min_periods=1).max() # calculate drawdown df['drawn_down'] = abs( (1 + df.portfolio_return_p) / (1 + df.rolling_max_return_p) - 1) return df def plot_drawdown(self): df = self.calculate_drawdown() fig = px.line(df, x="date", y=['drawn_down']) # add scatter to represetn new high new_height_pnl = df[df.portfolio_return_p == df.rolling_max_return_p] fig.add_trace(go.Scatter( x=new_height_pnl['date'], y=new_height_pnl['drawn_down'], mode='markers', name='新的最高总回报')) colname_to_name = { 'drawn_down': '回撤' } fig.update_layout(styling.plot_layout) fig.for_each_trace(lambda t: t.update(name=colname_to_name.get(t.name, t.name), legendgroup=colname_to_name.get( t.name, t.name), # hovertemplate=t.hovertemplate.replace( # t.name, colname_to_name.get(t.name, t.name)) )) return fig def update(self): pass def __panel__(self): self._layout = pn.Card(self.drawdown_plot, header=pn.Row(pn.pane.Str('回撤分析')), width=500 ) return self._layout class HistReturnCard(Viewer): return_barplot = param.Parameterized() calculated_b_stock = param.Parameterized() calculated_p_stock = param.Parameterized() select_resolution = param.ObjectSelector( default='每月回报', objects=['每日回报', '每周回报', '每月回报', '每年回报']) def _calculate_return(self, df, freq): # start on tuesday, end on monday grouped = df.groupby(pd.Grouper(key='time', freq=freq)) agg_df = grouped.agg({'weighted_log_return': 'sum'}) # time indicating the last end of the week agg_df['time'] = agg_df.index # convert cumulative log return to percentage return agg_df['return'] = np.exp(agg_df['weighted_log_return']) - 1 # return agg_df return agg_df.reset_index(drop=True) def update_aggregate_df(self): freq = None if self.select_resolution == "每日回报": freq = "D" elif self.select_resolution == "每月回报": freq = 'M' elif self.select_resolution == "每年回报": freq = 'Y' elif self.select_resolution == "每周回报": freq = 'W-MON' p_return = self._calculate_return(self.calculated_p_stock, freq) b_return = self._calculate_return(self.calculated_b_stock, freq) merge_df = pd.merge(p_return, b_return, on='time', how='outer', suffixes=('_p', '_b')) return merge_df def create_attributes_barplot(self): self.attribute_df = self._update_attributes_df() fig = px.bar(self.attribute_df, x='period_str', y=[ 'allocation', 'selection', 'interaction', 'notional_active_return', 'active_return']) colname_to_name = { 'allocation': '分配', 'selection': '选择', 'interaction': '交互', 'notional_active_return': '名义主动回报', 'active_return': '实际主动回报' } fig.for_each_trace(lambda t: t.update(name=colname_to_name.get(t.name, t.name), legendgroup=colname_to_name.get( t.name, t.name), hovertemplate=t.hovertemplate.replace( t.name, colname_to_name.get(t.name, t.name)) )) fig.update_layout(barmode='group', title='主动回报归因', bargap=0.0, bargroupgap=0.0) fig.update_layout(**styling.plot_layout) fig.update_traces(**styling.barplot_trace) return fig.to_dict() def create_return_barplot(self): self.agg_df = self.update_aggregate_df() fig = px.bar(self.agg_df, x='time', y=[ 'return_p', 'return_b'], barmode='overlay', title='周期回报', ) # update legend colname_to_name = { 'portfolio_return_p': 'portfolio回报率', 'portfolio_return_b': 'benchmark回报率' } fig.for_each_trace(lambda t: t.update(name=colname_to_name.get(t.name, t.name), legendgroup=colname_to_name.get( t.name, t.name), hovertemplate=t.hovertemplate.replace( t.name, colname_to_name.get(t.name, t.name)) )) fig.update_layout(**styling.plot_layout) fig.update_traces(**styling.barplot_trace) return fig.to_dict() @param.depends('calculated_p_stock', 'calculated_b_stock', 'select_resolution', watch=True) def update(self): return_barplot = self.create_return_barplot() self.return_barplot.object = return_barplot attributes_barplot = self.create_attributes_barplot() self.attribute_barplot.object = attributes_barplot def _update_attributes_df(self): freq = None if self.select_resolution == "每日回报": freq = 'D' elif self.select_resolution == "每月回报": freq = 'M' elif self.select_resolution == "每年回报": freq = 'Y' elif self.select_resolution == "每周回报": freq = 'W-MON' agg_p = processing.aggregate_analytic_df_by_period(self.calculated_p_stock, freq) agg_b = processing.aggregate_analytic_df_by_period(self.calculated_b_stock, freq) bhb_df = processing.calculate_periodic_BHB(agg_p, agg_b) agg_bhb = processing.aggregate_bhb_df(bhb_df) agg_bhb['period_str'] = agg_bhb.index.map(lambda x: str(x)) return agg_bhb def __init__(self, calculated_p_stock, calculated_b_stock, **params): self.calculated_p_stock = calculated_p_stock self.calculated_b_stock = calculated_b_stock self._range_slider = pn.widgets.DateRangeSlider( name='Date Range Slider', start=self.calculated_p_stock.time.min(), end=self.calculated_p_stock.time.max(), value=(self.calculated_p_stock.time.min(), self.calculated_p_stock.time.max()), ) self.return_barplot = pn.pane.Plotly(self.create_return_barplot()) self.attribute_barplot = pn.pane.Plotly( self.create_attributes_barplot()) super().__init__(**params) def __panel__(self): self._layout = pn.Card(pn.Param(self.param.select_resolution, name='选择周期'), self.return_barplot, self.attribute_barplot, width=500, header=pn.Row(pn.pane.Str('周期回报'), pn.widgets.TooltipIcon(value=description.periodic_return_report))) return self._layout class PortfolioComposationCard(Viewer): p_stock_df = param.Parameterized() selected_date = param.Parameterized() def create_cash_position_df(self): aggregate_df = self.p_stock_df.groupby('time', as_index=False).agg({ 'cash': 'sum' }) aggregate_df['type'] = 'portfolio' not_in_portfolio_df = aggregate_df.copy() not_in_portfolio_df['type'] = 'not_in_portfolio' not_in_portfolio_df['cash'] = 1000 # append df aggregate_df = pd.concat([aggregate_df, not_in_portfolio_df]) # sort aggregate_df.sort_values(by=['time'], inplace=True) return aggregate_df[aggregate_df.time.between(self.date_range.value[0], self.date_range.value[1])] @param.depends('p_stock_df', 'date_range.value', watch=True) def update_trend_plot(self): self.trend_plot.object = self.create_trend_plot() def create_trend_plot(self): aggregate_df = self.create_cash_position_df() fig = px.bar(aggregate_df, x='time', y='cash', color='type') fig.update_layout(legend=dict( orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1 )) fig.update_traces( marker_line_width=0, selector=dict(type="bar")) fig.update_layout(bargap=0, bargroupgap=0, ) fig.update_layout(uniformtext_minsize=8, uniformtext_mode='hide', yaxis_title=None, xaxis_title=None, margin=dict(l=0, r=0, t=0, b=0)) return fig.to_dict() def create_treemap(self): self.selected_df = self.p_stock_df[self.p_stock_df.time == self.datetime_picker.value] self.selected_df['position'] = '股票' not_in_portfolio_row = pd.DataFrame({ 'display_name': ['闲置'], 'position': ['闲置'], 'aggregate_sector': ['闲置'], 'cash': [100], 'weighted_return': [0] }) df = pd.concat([self.selected_df, not_in_portfolio_row], ignore_index=True) fig = px.treemap(df, # path=[px.Constant('cash_position'), 'position', # 'aggregate_sector', 'display_name'], path=['position', 'aggregate_sector', 'display_name'], values='cash', color='weighted_return', hover_data=['weighted_return', 'cash'], color_continuous_scale='RdBu', color_continuous_midpoint=np.average( df['weighted_return']) ) fig.update_layout(styling.plot_layout) fig.update_layout(coloraxis_colorbar=dict( title="累计加权回报率")) # colname_to_name = { # 'cash_position': '现金分布', # 'portfolio_return': '加权回报', # 'not_in_portfolio': '不在portfolio中', # 'current_weight': '现金', # } # fig.for_each_trace(lambda t: t.update(name=colname_to_name.get(t.name, t.name), # hovertemplate=t.hovertemplate.replace( # t.name, colname_to_name.get(t.name, t.name)) # )) return fig.to_dict() def __init__(self, analytic_df, **params): self.p_stock_df = analytic_df self.p_stock_df = processing.calculate_weighted_return(self.p_stock_df, start=self.p_stock_df.time.min(), end=self.p_stock_df.time.max()) # convert to datetime to date enabled_dates = [time.date() for time in self.p_stock_df.time.unique()] self.datetime_picker = pn.widgets.DatetimePicker(name='选择某日资金分布', start=self.p_stock_df.time.min(), end=self.p_stock_df.time.max(), value=self.p_stock_df.time.max(), enabled_dates=enabled_dates, ) self.date_range = pn.widgets.DateRangeSlider(name='选择资金分布走势区间', start=self.p_stock_df.time.min(), end=self.p_stock_df.time.max(), value=(self.p_stock_df.time.min( ), self.p_stock_df.time.max()), ) self.tree_plot = pn.pane.Plotly(self.create_treemap()) self.trend_plot = pn.pane.Plotly(self.create_trend_plot()) # calculate money position super().__init__(**params) def __panel__(self): self._layout = pn.Card(self.datetime_picker, self.tree_plot, self.date_range, self.trend_plot, width=500, header=pn.pane.Str('资金分布')) return self._layout @param.depends('datetime_picker.value', 'p_stock_df', watch=True) def update(self): tree_plot = self.create_treemap() self.tree_plot.object = tree_plot class BestAndWorstStocks(Viewer): start_date = param.Parameter() end_date = param.Parameter() hidden_col = [ 'index', 'open', 'high', 'low', 'close', 'volume', 'money', 'pct', 'sector', 'aggregate_sector', 'ave_price', 'weight', 'ini_w', 'name', 'pnl' ] forzen_columns = ['display_name', 'return', 'cum_pnl', 'shares'] description = "股票表现排名" tooltip = "在一个时间窗口中累计盈利最高和最低的股票,包括已经卖出的股票,如果表格的日期小于窗口的结束时间代表已经卖出" def create_tabulator(self, df): col_title_map = { 'display_name': '股票名称', 'ticker': '股票代码', 'time': '日期', 'return': '回报率', 'sector': '行业', 'shares': '持仓', 'cash': '现金', 'cum_pnl': '累计盈利', } return pn.widgets.Tabulator(df, sizing_mode='stretch_width', hidden_columns=self.hidden_col, frozen_columns=self.forzen_columns, titles=col_title_map ) @param.depends('start_date', 'end_date', watch=True) def update(self): result_df = self.get_processed_df() self.best_5_tabulator.value = result_df.head(5) self.worst_5_tabulator.value = result_df.tail(5) def _get_cum_return(self, df): '''return a df contain cumulative return at the end date''' result_df = processing.calcualte_return(df=df, start=self.start_date, end=self.end_date) grouped = result_df.groupby('ticker') last_row = result_df.loc[grouped.time.idxmax()] return last_row def get_processed_df(self): ''' calculate attributes and return a sorted dataframe on weighted return ''' df = processing.calculate_cum_pnl(self.analytic_df, start=self.start_date, end=self.end_date) df = self._get_cum_return(df) return df.sort_values(by='cum_pnl', ascending=False) def __init__(self, analytic_df, **params): self.analytic_df = analytic_df self._date_range = pn.widgets.DateRangeSlider( name='选择计算回报的时间区间', start=self.analytic_df.time.min(), end=self.analytic_df.time.max(), value=(self.analytic_df.time.max() - timedelta(days=7), self.analytic_df.time.max()) ) self.start_date = self._date_range.value_start self.end_date = self._date_range.value_end result_df = self.get_processed_df() self.best_5_tabulator = self.create_tabulator(result_df.head(5)) self.worst_5_tabulator = self.create_tabulator(result_df.tail(5)) super().__init__(**params) @param.depends('_date_range.value', watch=True) def _sync_params(self): self.start_date = self._date_range.value[0] self.end_date = self._date_range.value[1] # print('update range...') def __panel__(self): self._layout = pn.Card(self._date_range, pn.pane.Str('加权回报率最高回报5只股票'), self.best_5_tabulator, pn.pane.Str('加权回报率最低回报5只股票'), self.worst_5_tabulator, max_width=500, header=pn.Row( pn.pane.Str(self.description), pn.widgets.TooltipIcon(value=self.tooltip) ) ) return self._layout class TopHeader(Viewer): ''' display up to todays' PnL, total return and max drawdown ''' eval_df = param.Parameter() @param.depends('eval_df', watch=True) def update(self): ''' update Pnl, total return and max drawdown when df is updated ''' return def calculation(self): '''calculate PnL, total return and max drawdown''' pnl = self.eval_df[self.eval_df.date == self.eval_df.date.max()].cum_pnl.values[0] total_return = self.eval_df[self.eval_df.date == self.eval_df.date.max()].portfolio_return_p.values[0] # max draw down self.eval_df['rolling_max_return'] = self.eval_df.portfolio_return_p.rolling( window=len(self.eval_df), min_periods=1).max() self.eval_df.draw_down = abs( (1 + self.eval_df.portfolio_return_p) / (1 + self.eval_df.rolling_max_return) - 1 ) max_drawdown = self.eval_df.draw_down.max() return pnl, total_return, max_drawdown def create_report(self, pnl, total_return, max_drawdown): return pn.FlexBox( f"PnL:{round(pnl,2)}¥", f"回报:{round(total_return * 100,2)}%", f'最大回撤:{round(max_drawdown * 100,2)}%', justify_content='space-evenly') def __init__(self, eval_df, **params): self.eval_df = eval_df pnl, total_return, max_drawdown = self.calculation() self.report = self.create_report(pnl, total_return, max_drawdown) super().__init__(**params) def __panel__(self): self._layout = pn.Card(self.report, sizing_mode='stretch_width') return self._layout