# %% # load portfolio import time import panel as pn from utils import create_stocks_entry_from_excel, style_number, create_share_changes_report import datetime as dt from bokeh.models.widgets.tables import CheckboxEditor, NumberEditor, SelectEditor from utils import time_in_beijing import api import pandas as pd from sqlalchemy import create_engine import table_schema import db_operation as db import pipeline from sidebar import SideNavBar db_url = 'sqlite:///instance/local.db' pn.extension() pn.extension('tabulator') pn.extension('plotly') pn.extension('floatpanel') pn.extension(notifications=True) # %% # the width of iphone se MIN_COMPONENT_WIDTH = 375 MAX_COMPONENT_WIDTH = 600 def create_portfolio_stream_entry(stocks, portfolio_df): # create entry with ticker, date and details stream_entry = pd.DataFrame(stocks) # if have duplicate ticker raise error if stream_entry.ticker.duplicated().any(): raise Exception('VALIDATION_ERROR: plase remove duplicate ticker') # raise error if portfolio already have same ticker with same date date = stream_entry.date[0] selected_df = portfolio_df[portfolio_df.date == date] tickers = stream_entry.ticker.tolist() filter_out_ticker = selected_df[selected_df.ticker.isin(tickers)].ticker if len(filter_out_ticker) > 0: raise Exception(f'VALIDATION_ERROR: {" ".join(filter_out_ticker)}在{date}已存在,请先删除再添加') stream_entry = pipeline.add_details_to_stock_df(stream_entry) # calculate share changes, use tmp_df to save intermediate result tmp_df = pd.concat([stream_entry, portfolio_df], ignore_index=True, join='outer') tmp_df.sort_values(by='date', inplace=True) tmp_df['share_changes'] = tmp_df.groupby('ticker').shares.diff() # for ticker previous not existing use shares as share_changes tmp_df.share_changes = tmp_df.share_changes.fillna(tmp_df.shares) # add share_chagnes back to stream_entry stream_entry = stream_entry.merge( tmp_df[['ticker','date','share_changes','change_saved']], on=['ticker','date'], how='left' ) # indicate not saved stream_entry['change_saved'] = False # indicate sync to db stream_entry['sync_to_db'] = True # fill empty ave_price with latest closing price # TODO: for now all ave_price is fetching from api ticker = stream_entry.ticker.tolist() close_price = api.fetch_stocks_price(security=ticker, end_date=date, frequency='minute', count=1)[['ticker','close']] close_price.rename(columns={'close':'ave_price'}, inplace=True) stream_entry = stream_entry.merge(close_price, on='ticker', how='left') # calculate cash(mkt_value) and weight stream_entry['cash'] = stream_entry.shares * stream_entry.ave_price stream_entry['weight'] = stream_entry.cash / stream_entry.cash.sum() return stream_entry def notify(func): def wrapper(*args, **kwargs): try: notifications = func(*args, **kwargs) if notifications is not None: for notification in notifications: duration = notification.get('duration', 4000) if notification['type'] == 'success': pn.state.notifications.success( notification['description'], duration=duration) elif notification['type'] == 'error': pn.state.notifications.error( notification['description'], duration=duration) elif notification['type'] == 'warning': pn.state.notifications.warning( notification['description'], duration=duration) elif notification['type'] == 'info': pn.state.notifications.info( notification['description'], duration=duration) else: raise Exception('unknow notification type') except Exception as e: pn.state.notifications.error(str(e), duration=0) return wrapper def app(): # load portfolio df p_profile = db.get_all_portfolio_profile() p_profile.sort_values(by=['date'], inplace=True) # change in shares for same ticker p_profile['share_changes'] = p_profile.groupby(['ticker'])[ 'shares'].diff() p_profile['share_changes'] = p_profile['share_changes'].fillna( p_profile['shares']) # indicate if change is saved p_profile['change_saved'] = True p_profile['sync_to_db'] = True # get all stocks ticker for auto fill stock_details = db.get_all_stocks() all_tickers = stock_details.ticker.to_list() # get most recent portfolio for auto generate entry most_recent_portfolio = None if len(p_profile) == 0: most_recent_portfolio = p_profile else: most_recent_portfolio = p_profile[p_profile.date == max( p_profile.date)] # create portfolio table tabulator hidden_column = ['index', 'sector', 'name'] col_to_titles = {'ticker': '证劵代码', 'weight': '权重', 'date': '时间', 'aggregate_sector': '分类', 'display_name': '名称', 'shares': '持仓', 'change_saved': '已同步', 'sync_to_db': '存入', 'share_changes': '持仓变化', 'cash': '现金', 'ave_price': '平均成本', } # styling tabulator_formatters = { # 'float': {'type': 'progress', 'max': 10}, 'sync_to_db': {'type': 'tickCross'}, 'change_saved': {'type': 'tickCross'}, } bokeh_editors = { 'ticker': SelectEditor(options=all_tickers), 'shares': NumberEditor(), } # frozen_columns = ['date','ticker','display_name','shares','sync_to_db','change_saved'] portfolio_tabulator = pn.widgets.Tabulator(p_profile, layout='fit_columns', height_policy='max', # width=1000, groupby=['date'], hidden_columns=hidden_column, titles=col_to_titles, formatters=tabulator_formatters, editors=bokeh_editors, pagination='remote', # page_size=25, # frozen_columns=frozen_columns ) portfolio_tabulator.style.apply(style_number, subset=['share_changes']) # history tabulator history_dt = p_profile[['date', 'sync_to_db', 'change_saved']].copy() history_dt = history_dt.groupby('date').agg({ "sync_to_db": lambda x: all(x), 'change_saved': lambda x: all(x), }) history_dt['date'] = history_dt.index history_dt.reset_index(drop=True, inplace=True) history_tabulator = pn.widgets.Tabulator(history_dt, formatters=tabulator_formatters, buttons={'detail': "📋"}, hidden_columns=hidden_column, height_policy='max', titles=col_to_titles) # perform calculation btn force_recalculate_btn = pn.widgets.Button( name='重新计算', button_type='primary', sizing_mode='stretch_width') # create component new_stock_btn = pn.widgets.Button( name='增加新股票', button_type='primary', sizing_mode='stretch_width') preview_btn = pn.widgets.Button( name='预览', button_type='primary', sizing_mode='stretch_width') file_input = pn.widgets.FileInput( accept='.xlsx', sizing_mode='stretch_width') # strip timezone info datetime_picker = pn.widgets.DatetimePicker(name='Datetime Picker', value=time_in_beijing().replace(tzinfo=None), sizing_mode='stretch_width') upload_to_db_btn = pn.widgets.Button( name='保存到数据库', button_type='warning', sizing_mode='stretch_width') # emtpy stock_column to display new entires stock_column = pn.Column( width_policy='max', height_policy='max', scroll=True) # floating window row floating_windows = pn.Row() @notify def _update_history_tabulator(action, df=None): '''handle update history tabulator''' # handle add new entires to view if action == 'append' and df is not None: index = history_tabulator.value[history_tabulator.value.date == df.date[0]].index.to_list() if len(index) == 0: # drop duplicate date in df df = df.drop_duplicates(subset='date', keep='first') # if not in history tabulator add new entry selected_df = df[['date', 'sync_to_db', 'change_saved']] # if stream to empty tabulator, index will be mismatched if (len(history_tabulator.value) == 0): history_tabulator.value = selected_df else: history_tabulator.stream( df[['date', 'sync_to_db', 'change_saved']], follow=True) else: # if in history tabulator patch change_saved to false history_tabulator.patch({ 'change_saved': [(index[0], False)] }, as_index=True) yield {'type': 'warning', 'description': '添加成功请保存'} # hanlde editing portoflio tabulator elif action == 'edit': # mark synced_to_db to false when entry is edited date = df index = history_tabulator.value[history_tabulator.value.date == date].index.to_list( ) # check if all change saved all_saved = all( portfolio_tabulator.value[portfolio_tabulator.value.date == date]['change_saved']) history_tabulator.patch({ 'change_saved': [(index[0], all_saved)] }, as_index=True) yield {'type': 'warning', 'description': '修改成功请保存'} # handle sync to db elif action == 'sync': # patch all synced_to_db to true indices = history_tabulator.value[ ~history_tabulator.value['change_saved']].index.to_list() # add an offset to address the issue when df is empty index start from 1 history_tabulator.patch({ 'change_saved': [(index, True) for index in indices] }, as_index=True) yield {'type': 'success', 'description': '同步成功以更新'} @notify def delete_stock(row): '''delete a stock entry''' stock_column.remove(row) yield {'type': 'success', 'description': '删除成功'} def create_new_stock_entry(ticker=None, shares=0, ave_price=0.0, disable_ticker=True): '''create a new new stock entry''' delete_btn = pn.widgets.Button( name='❌', width=50, height=60, sizing_mode='fixed') ticker_selector = pn.widgets.AutocompleteInput( value=ticker, name='证劵代码', sizing_mode='stretch_width', options=all_tickers, placeholder='input ticker', ) share_input = pn.widgets.IntInput( name='持仓', value=shares, step=1, start=0, sizing_mode='stretch_width') row = pn.Row( delete_btn, ticker_selector, share_input, width_policy='max', ) delete_btn.on_click(lambda _, row=row: delete_stock(row)) return row def update_stock_column(xlsx_file=None): stock_entries = [] if xlsx_file is None: for ticker, shares in most_recent_portfolio[['ticker', 'shares']].values: stock_entries.append(create_new_stock_entry( ticker=ticker, shares=shares)) # create from xlsx_file else: stocks_list = create_stocks_entry_from_excel(xlsx_file) for entry in stocks_list: stock_entries.append(create_new_stock_entry( ave_price=entry['mean_price'], ticker=entry['ticker'], shares=entry['shares'])) # modify time datetime_picker.value = stocks_list[0]['date'] file_input.value = None # update stock_column.clear() stock_column.extend(stock_entries) @notify def update_profile_tabulator(e): '''add all stocks entry to ui''' # TODO: make this idempotent new_entries = [dict(ticker=row[1].value, shares=row[2].value, date=datetime_picker.value) for row in stock_column] try: new_profile = create_portfolio_stream_entry(new_entries, portfolio_tabulator.value) # update history tabulator _update_history_tabulator('append', new_profile) _stream_to_portfolio_tabulator(new_profile) yield {'type': 'success', 'description': f'已添加{len(new_entries)}条新股票,请保存'} except Exception as e: raise Exception(e) def add_new_stock(e): row = create_new_stock_entry() stock_column.append(row) @notify def _stream_to_portfolio_tabulator(entry): # not using stream because it will cause index mismatch if len(portfolio_tabulator.value) == 0: portfolio_tabulator.value = entry else: portfolio_tabulator.stream(entry, follow=True) yield {'type': 'success', 'description': f'添加{len(entry)}条股票'} def handle_click_on_history_tabulator(e): '''handle click click on history tabulator''' if e.column == 'detail': row_index = e.row date = history_tabulator.value.iloc[row_index]['date'] date_str = date.strftime("%Y-%m-%d : %H:%M:%S") record_df = portfolio_tabulator.value[portfolio_tabulator.value.date == date] floatpanel = pn.layout.FloatPanel(create_share_changes_report( record_df), name=date_str, margin=20, position='right-top') floating_windows.append(floatpanel) @notify def handle_sync_to_db(e): # TODO: change to use profile df instead, because tabulator might not contain all entry (currently have no problem) '''sync selected entry to db''' new_portfolio = portfolio_tabulator.value # only update selected row to db selected_portfolio = new_portfolio[new_portfolio['sync_to_db']] try: pipeline.update_portfolio_profile_to_db(selected_portfolio) except Exception as e: raise Exception(f'同步到数据库失败,错误信息:{e}') # update history tabulator and portfolio tabulator # mark changes as saved indices = selected_portfolio[~selected_portfolio['change_saved']].index.to_list( ) portfolio_tabulator.patch({ 'change_saved': [(index, True) for index in indices] }, as_index=True) _update_history_tabulator('sync') yield {'type': 'success', 'description': '保存成功'} def handle_edit_portfolio_tabulator(e): date = portfolio_tabulator.value.iloc[e.row]['date'] _update_history_tabulator(df=date, action='edit') def hanlde_edit_history_tabulator(e): # toggle sync on all entry on a date if e.column == 'sync_to_db': date = history_tabulator.value.iloc[e.row]['date'] # index of all entry on portfolio tabulator indices = portfolio_tabulator.value[portfolio_tabulator.value.date.between( date.replace(microsecond=0), date.replace(microsecond=999999))].index.to_list() # patch all indices on sync_to_db to e.value portfolio_tabulator.patch({ 'sync_to_db': [(index, e.value) for index in indices] }, as_index=True) @notify def handle_force_recalculation(e): try: yield {'type': 'info', 'description': "开始重新计算可能会花费1分钟以上", 'duration': 0} # fill missing benchmark profile yield {'type': 'info', 'description': "正在获取benchmark数据", 'duration': 0} pipeline.left_fill_benchmark_profile() # fill missing stock price yield {'type': 'info', 'description': "正在更新股票数据", 'duration': 0} pipeline.left_fill_stocks_price() # recalculate yield {'type': 'info', 'description': "正在重新计算权重", 'duration': 0} pipeline.batch_processing() yield {'type': 'info', 'description': '完成✅', 'duration': 0} except Exception as e: raise Exception(f'重新计算失败,错误信息:{e}') # register event handler upload_to_db_btn.on_click(handle_sync_to_db) preview_btn.on_click(update_profile_tabulator) new_stock_btn.on_click(add_new_stock) history_tabulator.on_click( handle_click_on_history_tabulator ) force_recalculate_btn.on_click(handle_force_recalculation) portfolio_tabulator.on_edit(handle_edit_portfolio_tabulator) history_tabulator.on_edit(hanlde_edit_history_tabulator) # create handler component to add to panel so can be listened to upload_xlsx_handler = pn.bind(update_stock_column, file_input) # layout editor_widget = pn.Column(floating_windows, datetime_picker, upload_to_db_btn, new_stock_btn, preview_btn, force_recalculate_btn, file_input, pn.widgets.TooltipIcon( value="用于更新修改持仓信息,默认股票为最近持仓,默认时间为目前北京时间,点击增加新股票按钮,输入股票代码和持仓选择日期(北京时间),点击预览,确认无误后点击保存到数据库。或者直接拖拽excel文件到下方上传按钮"), stock_column, width=MIN_COMPONENT_WIDTH, height_policy='max') # tooltip toolTip2 = pn.widgets.TooltipIcon( value="持仓总结,每一行的已同步到数据库代表所做更改是否已同步到数据库,点击保存到数据库将上传所有更改。点击右侧📋按钮查看详细持仓变化报告") return pn.Column( pn.Row( pn.layout.HSpacer(), editor_widget, pn.Spacer(width=10), history_tabulator, pn.Spacer(width=10), portfolio_tabulator, pn.Spacer(width=10), upload_xlsx_handler, pn.layout.HSpacer(), height=1500, # width_policy='max', height_policy='max') # sizing_mode='stretch_both', )) # app template = pn.template.FastListTemplate( title='portfolio编辑', sidebar_width=200, collapsed_sidebar=True) template.sidebar.append(SideNavBar()) template.main.append(app()) template.servable()