huggingface112 commited on
Commit
29f1ee3
1 Parent(s): 61dcaa7

enhanced async, and notification for debugging

Browse files
Files changed (5) hide show
  1. backgroundTask.py +5 -1
  2. index_page.py +30 -25
  3. pipeline.py +14 -82
  4. portfolioEditingPage.py +179 -92
  5. processing.py +6 -0
backgroundTask.py CHANGED
@@ -1,5 +1,9 @@
 
 
1
  import sys
2
- sys.path.append('/Users/lamonkey/Desktop/portfolio_management')
 
 
3
  from pipeline import daily_update
4
  import panel as pn
5
  from datetime import timedelta
 
1
+ # import sys
2
+ # sys.path.append('/Users/lamonkey/Desktop/portfolio_management')
3
  import sys
4
+ # add pipeline to path
5
+ import os
6
+ sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__))))
7
  from pipeline import daily_update
8
  import panel as pn
9
  from datetime import timedelta
index_page.py CHANGED
@@ -14,37 +14,42 @@ engine = create_engine(db_url)
14
 
15
  analytic_p = db.get_portfolio_analytic_df()
16
  analytic_b = db.get_benchmark_analytic_df()
 
 
 
17
 
18
- stock_overview = appComponents.BestAndWorstStocks(
19
- analytic_df=analytic_p)
20
- composation_card = appComponents.PortfolioComposationCard(
21
- analytic_p)
22
- monthly_return_card = appComponents.HistReturnCard(
23
- calculated_p_stock=analytic_p, calculated_b_stock=analytic_b)
24
- total_return_card = appComponents.TotalReturnCard(name='Range',
25
- b_stock_df=analytic_b,
26
- p_stock_df=analytic_p,
27
- value=(0, 20))
28
- drawdown_card = appComponents.DrawDownCard(
29
- calculated_p_stock=analytic_p)
30
-
31
- top_header = appComponents.TopHeader(
32
- eval_df=analytic_p,
33
- )
34
 
35
- template = pn.template.FastListTemplate(
36
- title="Portfolio一览",
37
- # sidebar=[freq, phase],
38
- )
39
- template.sidebar.append(SideNavBar())
40
- # template.main.extend([drawdown_card, stock_overview, composation_card, monthly_return_card, total_return_card])
41
- template.main.extend(
42
- [pn.Row(top_header),
43
- pn.Row(
 
 
 
 
 
 
 
 
 
 
 
44
  pn.Column(monthly_return_card, stock_overview,
45
  width=500, margin=(10, 10, 10, 10)),
46
  pn.Column(total_return_card, drawdown_card, margin=(10, 10, 10, 10)),
47
  pn.Column(composation_card, margin=(10, 10, 10, 10)),
48
  )]
 
 
 
 
49
  )
 
 
 
50
  template.servable()
 
14
 
15
  analytic_p = db.get_portfolio_analytic_df()
16
  analytic_b = db.get_benchmark_analytic_df()
17
+ if len(analytic_p) == 0:
18
+ app = [
19
+ pn.pane.Str('No portfolio data found, please update portfolio first')]
20
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
21
 
22
+ else:
23
+
24
+ stock_overview = appComponents.BestAndWorstStocks(
25
+ analytic_df=analytic_p)
26
+ composation_card = appComponents.PortfolioComposationCard(
27
+ analytic_p)
28
+ monthly_return_card = appComponents.HistReturnCard(
29
+ calculated_p_stock=analytic_p, calculated_b_stock=analytic_b)
30
+ total_return_card = appComponents.TotalReturnCard(name='Range',
31
+ b_stock_df=analytic_b,
32
+ p_stock_df=analytic_p,
33
+ value=(0, 20))
34
+ drawdown_card = appComponents.DrawDownCard(
35
+ calculated_p_stock=analytic_p)
36
+
37
+ top_header = appComponents.TopHeader(
38
+ eval_df=analytic_p,
39
+ )
40
+ app = [pn.Row(top_header),
41
+ pn.Row(
42
  pn.Column(monthly_return_card, stock_overview,
43
  width=500, margin=(10, 10, 10, 10)),
44
  pn.Column(total_return_card, drawdown_card, margin=(10, 10, 10, 10)),
45
  pn.Column(composation_card, margin=(10, 10, 10, 10)),
46
  )]
47
+
48
+ template = pn.template.FastListTemplate(
49
+ title="Portfolio一览",
50
+ # sidebar=[freq, phase],
51
  )
52
+ template.sidebar.append(SideNavBar())
53
+ # template.main.extend([drawdown_card, stock_overview, composation_card, monthly_return_card, total_return_card])
54
+ template.main.extend(app)
55
  template.servable()
pipeline.py CHANGED
@@ -9,11 +9,13 @@ import table_schema as ts
9
  import db_operation as db
10
  from log import Log
11
  import processing
 
12
  # import settings
13
  # fetch new stock price
14
  stock_price_stream = Stream()
15
- event = Stream()
16
-
 
17
  # log
18
  log = Log('instance/log.json')
19
  # save stock price to db
@@ -27,71 +29,6 @@ log = Log('instance/log.json')
27
  # run using --setup
28
  db_url = 'sqlite:///instance/local.db'
29
 
30
-
31
- def create_portfolio_profile_df(stocks: list[dict]):
32
- profile_df = pd.DataFrame(stocks)
33
- profile_df = add_details_to_stock_df(profile_df)
34
-
35
- # check if there is duplicate ticker
36
- if profile_df.ticker.duplicated().any():
37
- raise Exception(
38
- 'VALIDATION ERROR: cannot have duplicate ticker with the same date')
39
-
40
- return profile_df
41
-
42
-
43
- def need_to_update(table_name: str, freq: dt.datetime):
44
- '''check table with table_name need to update
45
- Return
46
- ------
47
- None if no need to update
48
- (start_date, end_date, freq) if need to update
49
- '''
50
- with create_engine(db_url).connect() as conn:
51
- max_date = conn.execute(
52
- text(f"SELECT MAX(date) FROM {table_name}")).fetchone()[0]
53
- max_date = utils.convert_string_to_datetime(max_date)
54
- current_time = utils.time_in_beijing()
55
- if current_time - max_date > freq:
56
- return (max_date + freq, current_time, freq)
57
- else:
58
- return None
59
-
60
-
61
- def need_to_forward_fill_stock_price():
62
- '''
63
- check if need to pull new stock price from jq
64
-
65
- RETURN
66
- ------
67
- (min_date, max_date) : if update is needed
68
- the start and end date need to fetch new stock price
69
- None if no need to fetch new stock price
70
-
71
- '''
72
- # TODO refactor to db
73
- # get min date from portfolio_profile
74
- with create_engine(db_url).connect() as conn:
75
- table_name = 'portfolio_profile'
76
- query = f"SELECT DISTINCT date FROM {table_name} ORDER BY date ASC LIMIT 1"
77
- df = pd.read_sql(query, con=conn)
78
- df.date = pd.to_datetime(df.date)
79
- min_date = df.date[0]
80
- # TODO refactor to db
81
- # compare to min date from stocks_price
82
- with create_engine(db_url).connect() as conn:
83
- table_name = 'stocks_price'
84
- query = f"SELECT DISTINCT time FROM {table_name} ORDER BY time ASC LIMIT 1"
85
- df = pd.read_sql(query, con=conn)
86
- df.time = pd.to_datetime(df.time)
87
-
88
- # return
89
- if min_date <= df.time[0]:
90
- return (min_date, df.time[0] - dt.timedelta(days=1))
91
- else:
92
- return None
93
-
94
-
95
  def get_most_recent_profile(type):
96
  table_name = 'benchmark_profile' if type == 'benchmark' else 'portfolio_profile'
97
  query = f"SELECT * FROM {table_name} WHERE date = (SELECT MAX(date) FROM {table_name})"
@@ -193,21 +130,15 @@ def update_portfolio_profile_to_db(portfolio_df):
193
 
194
  if (_validate_schema(portfolio_df, ts.PORTFOLIO_TABLE_SCHEMA)):
195
  raise ValueError(
196
- 'portfoliijuo_df has different schema than PORTFOLIO_DB_SCHEMA')
197
-
198
- with create_engine(db_url).connect() as conn:
199
- print("updating profile to db")
200
- try:
201
  portfolio_df[ts.PORTFOLIO_TABLE_SCHEMA.keys()].to_sql(
202
- ts.PORTFOLIO_TABLE, con=conn, if_exists='append', index=False)
203
 
204
- event.emit('update_portfolio')
205
- # handle_portfolio_update()
206
-
207
- return True
208
- except:
209
- return False
210
-
211
 
212
 
213
  def right_fill_stock_price():
@@ -501,11 +432,12 @@ async def run():
501
  # print(stock_df)
502
  stock_price_stream.emit(stock_df)
503
 
504
-
505
  def handle_event(e):
506
  if e == "update_portfolio":
507
  print("handling portfolio update")
508
  handle_portfolio_update()
509
  print("done handling portfolio update")
510
 
511
- event.sink(handle_event)
 
 
9
  import db_operation as db
10
  from log import Log
11
  import processing
12
+ from tornado import gen
13
  # import settings
14
  # fetch new stock price
15
  stock_price_stream = Stream()
16
+ event = Stream(asynchronous=True)
17
+ # display notification to
18
+ notification_queue = Stream()
19
  # log
20
  log = Log('instance/log.json')
21
  # save stock price to db
 
29
  # run using --setup
30
  db_url = 'sqlite:///instance/local.db'
31
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
32
  def get_most_recent_profile(type):
33
  table_name = 'benchmark_profile' if type == 'benchmark' else 'portfolio_profile'
34
  query = f"SELECT * FROM {table_name} WHERE date = (SELECT MAX(date) FROM {table_name})"
 
130
 
131
  if (_validate_schema(portfolio_df, ts.PORTFOLIO_TABLE_SCHEMA)):
132
  raise ValueError(
133
+ 'portfolio_ef has different schema than PORTFOLIO_DB_SCHEMA')
134
+ try:
135
+ with create_engine(db_url).connect() as conn:
 
 
136
  portfolio_df[ts.PORTFOLIO_TABLE_SCHEMA.keys()].to_sql(
137
+ ts.PORTFOLIO_TABLE, con=conn, if_exists='replace', index=False)
138
 
139
+ except Exception as e:
140
+ print(e)
141
+ raise e
 
 
 
 
142
 
143
 
144
  def right_fill_stock_price():
 
432
  # print(stock_df)
433
  stock_price_stream.emit(stock_df)
434
 
435
+ @gen.coroutine
436
  def handle_event(e):
437
  if e == "update_portfolio":
438
  print("handling portfolio update")
439
  handle_portfolio_update()
440
  print("done handling portfolio update")
441
 
442
+
443
+ event.sink(handle_event)
portfolioEditingPage.py CHANGED
@@ -1,5 +1,6 @@
1
  # %%
2
  # load portfolio
 
3
  import panel as pn
4
  from utils import create_stocks_entry_from_excel, style_number, create_share_changes_report
5
  import datetime as dt
@@ -8,10 +9,11 @@ from utils import time_in_beijing
8
  import api
9
  import pandas as pd
10
  from sqlalchemy import create_engine
11
- from pipeline import update_portfolio_profile_to_db
12
  import table_schema
13
- import pipeline
14
  import db_operation as db
 
15
  from sidebar import SideNavBar
16
  db_url = 'sqlite:///instance/local.db'
17
  pn.extension()
@@ -26,27 +28,88 @@ pn.extension(notifications=True)
26
  MIN_COMPONENT_WIDTH = 375
27
  MAX_COMPONENT_WIDTH = 600
28
 
29
- # %%
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
30
 
31
  def notify(func):
32
  def wrapper(*args, **kwargs):
33
  try:
34
- notification = func(*args, **kwargs)
35
- if notification is not None:
36
- if notification['type'] == 'success':
37
- pn.state.notifications.success(notification['description'], duration=4000)
38
- elif notification['type'] == 'error':
39
- pn.state.notifications.error(notification['description'], duration=4000)
40
- elif notification['type'] == 'warning':
41
- pn.state.notifications.warning(notification['description'], duration=4000)
42
- elif notification['type'] == 'info':
43
- pn.state.notifications.info(notification['description'], duration=4000)
44
- else:
45
- raise Exception('unknow notification type')
 
 
 
 
 
 
 
46
  except Exception as e:
47
- pn.state.notifications.error(str(e), duration=4000)
48
  return wrapper
49
 
 
50
  def app():
51
 
52
  # load portfolio df
@@ -97,12 +160,12 @@ def app():
97
  portfolio_tabulator = pn.widgets.Tabulator(p_profile,
98
  layout='fit_columns',
99
  height_policy='max',
100
- width=1000,
101
  groupby=['date'],
102
  hidden_columns=hidden_column, titles=col_to_titles,
103
  formatters=tabulator_formatters,
104
  editors=bokeh_editors,
105
- pagination='local',
106
  # page_size=25,
107
  # frozen_columns=frozen_columns
108
  )
@@ -123,7 +186,9 @@ def app():
123
  hidden_columns=hidden_column,
124
  height_policy='max',
125
  titles=col_to_titles)
126
-
 
 
127
  # create component
128
  new_stock_btn = pn.widgets.Button(
129
  name='增加新股票', button_type='primary', sizing_mode='stretch_width')
@@ -142,6 +207,7 @@ def app():
142
  width_policy='max', height_policy='max', scroll=True)
143
  # floating window row
144
  floating_windows = pn.Row()
 
145
  @notify
146
  def _update_history_tabulator(action, df=None):
147
  '''handle update history tabulator'''
@@ -165,18 +231,21 @@ def app():
165
  history_tabulator.patch({
166
  'change_saved': [(index[0], False)]
167
  }, as_index=True)
168
-
169
- return {'type':'warning','description':'添加成功请保存'}
170
  # hanlde editing portoflio tabulator
171
  elif action == 'edit':
172
  # mark synced_to_db to false when entry is edited
173
  date = df
174
  index = history_tabulator.value[history_tabulator.value.date == date].index.to_list(
175
  )
 
 
 
176
  history_tabulator.patch({
177
- 'change_saved': [(index[0], False)]
178
  }, as_index=True)
179
- return {'type':'warning','description':'修改成功请保存'}
180
  # handle sync to db
181
  elif action == 'sync':
182
  # patch all synced_to_db to true
@@ -188,12 +257,13 @@ def app():
188
  history_tabulator.patch({
189
  'change_saved': [(index, True) for index in indices]
190
  }, as_index=True)
191
- return {'type':'success','description':'同步成功以更新'}
 
192
  @notify
193
  def delete_stock(row):
194
  '''delete a stock entry'''
195
  stock_column.remove(row)
196
- return {'type':'success','description':'删除成功'}
197
 
198
  def create_new_stock_entry(ticker=None, shares=0, ave_price=0.0, disable_ticker=True):
199
  '''create a new new stock entry'''
@@ -214,15 +284,10 @@ def app():
214
  start=0,
215
  sizing_mode='stretch_width')
216
 
217
- mean_price_input = pn.widgets.FloatInput(
218
- name='平均成本',
219
- value=ave_price, step=0.01, start=0, sizing_mode='stretch_width')
220
-
221
  row = pn.Row(
222
  delete_btn,
223
  ticker_selector,
224
  share_input,
225
- mean_price_input,
226
  width_policy='max',
227
  )
228
  delete_btn.on_click(lambda _, row=row: delete_stock(row))
@@ -250,48 +315,36 @@ def app():
250
  stock_column.clear()
251
  stock_column.extend(stock_entries)
252
 
 
253
  def update_profile_tabulator(e):
254
  '''add all stocks entry to ui'''
255
- new_entry = [dict(ticker=row[1].value,
256
- shares=row[2].value,
257
- ave_price=row[3].value,
258
- date=datetime_picker.value) for row in stock_column]
259
-
260
- if len(new_entry) == 0:
261
- print("no entry added")
262
- return
263
-
264
- new_profile = pipeline.create_portfolio_profile_df(new_entry)
265
- # calculate share changes
266
- tmp_profile = pd.concat([p_profile, new_profile], ignore_index=True)
267
- tmp_profile.sort_values(by='date', inplace=True)
268
- tmp_profile['share_changes'] = tmp_profile.groupby('ticker')[
269
- 'shares'].diff()
270
- tmp_profile['share_changes'] = tmp_profile['share_changes'].fillna(
271
- tmp_profile['shares'])
272
- new_profile = new_profile.merge(tmp_profile[[
273
- 'ticker', 'date', 'share_changes', 'change_saved']], on=['ticker', 'date'], how='left')
274
- # fill emtpy change_saved to False
275
- new_profile['change_saved'] = new_profile['change_saved'].fillna(False)
276
- new_profile['sync_to_db'] = True
277
- # calculate cash and weight
278
- new_profile['cash'] = new_profile.shares * new_profile.ave_price
279
- new_profile['weight'] = new_profile.cash / new_profile.cash.sum()
280
-
281
- # update history tabulator
282
- _update_history_tabulator('append', new_profile)
283
- _stream_to_portfolio_tabulator(new_profile)
284
 
 
 
 
 
 
 
 
 
285
 
286
  def add_new_stock(e):
287
  row = create_new_stock_entry()
288
  stock_column.append(row)
289
 
 
290
  def _stream_to_portfolio_tabulator(entry):
 
291
  if len(portfolio_tabulator.value) == 0:
292
  portfolio_tabulator.value = entry
 
293
  else:
294
  portfolio_tabulator.stream(entry, follow=True)
 
295
 
296
  def handle_click_on_history_tabulator(e):
297
  '''handle click click on history tabulator'''
@@ -311,25 +364,57 @@ def app():
311
  new_portfolio = portfolio_tabulator.value
312
  # only update selected row to db
313
  selected_portfolio = new_portfolio[new_portfolio['sync_to_db']]
314
- successed = update_portfolio_profile_to_db(selected_portfolio)
 
 
 
315
  # update history tabulator and portfolio tabulator
316
- if successed:
317
- # mark changes as saved
318
- indices = selected_portfolio[~selected_portfolio['change_saved']].index.to_list(
319
- )
320
- portfolio_tabulator.patch({
321
- 'change_saved': [(index, True) for index in indices]
322
- }, as_index=True)
323
- _update_history_tabulator('sync')
324
- return {'type': 'success', 'description': '保存成功'}
325
- else:
326
- return {'type':'error','description':'保存失败'}
327
  def handle_edit_portfolio_tabulator(e):
328
  date = portfolio_tabulator.value.iloc[e.row]['date']
329
  _update_history_tabulator(df=date, action='edit')
330
- print(date)
331
 
332
- # %%
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
333
  # register event handler
334
  upload_to_db_btn.on_click(handle_sync_to_db)
335
  preview_btn.on_click(update_profile_tabulator)
@@ -337,41 +422,43 @@ def app():
337
  history_tabulator.on_click(
338
  handle_click_on_history_tabulator
339
  )
 
340
  portfolio_tabulator.on_edit(handle_edit_portfolio_tabulator)
341
-
342
- # %%
343
  # create handler component to add to panel so can be listened to
344
  upload_xlsx_handler = pn.bind(update_stock_column, file_input)
345
 
346
- # %%
347
  # layout
348
 
349
  editor_widget = pn.Column(floating_windows, datetime_picker, upload_to_db_btn, new_stock_btn,
350
- preview_btn, file_input, pn.widgets.TooltipIcon(
351
  value="用于更新修改持仓信息,默认股票为最近持仓,默认时间为目前北京时间,点击增加新股票按钮,输入股票代码和持仓选择日期(北京时间),点击预览,确认无误后点击保存到数据库。或者直接拖拽excel文件到下方上传按钮"),
352
  stock_column, width=MIN_COMPONENT_WIDTH, height_policy='max')
353
  # tooltip
354
  toolTip2 = pn.widgets.TooltipIcon(
355
  value="持仓总结,每一行的已同步到数据库代表所做更改是否已同步到数据库,点击保存到数据库将上传所有更改。点击右侧📋按钮查看详细持仓变化报告")
356
 
357
- return pn.Row(
358
- pn.layout.HSpacer(),
359
- editor_widget,
360
- pn.Spacer(width=10),
361
- history_tabulator,
362
- pn.Spacer(width=10),
363
- portfolio_tabulator,
364
- pn.Spacer(width=10),
365
- upload_xlsx_handler,
366
- pn.layout.HSpacer(),
367
- height=1500,
368
- # width_policy='max', height_policy='max')
369
- # sizing_mode='stretch_both',
370
- )
 
 
371
 
372
 
373
  # app
374
- template = pn.template.FastListTemplate(title='portfolio编辑')
 
375
  template.sidebar.append(SideNavBar())
376
  template.main.append(app())
377
  template.servable()
 
1
  # %%
2
  # load portfolio
3
+ import time
4
  import panel as pn
5
  from utils import create_stocks_entry_from_excel, style_number, create_share_changes_report
6
  import datetime as dt
 
9
  import api
10
  import pandas as pd
11
  from sqlalchemy import create_engine
12
+
13
  import table_schema
14
+
15
  import db_operation as db
16
+ import pipeline
17
  from sidebar import SideNavBar
18
  db_url = 'sqlite:///instance/local.db'
19
  pn.extension()
 
28
  MIN_COMPONENT_WIDTH = 375
29
  MAX_COMPONENT_WIDTH = 600
30
 
31
+ def create_portfolio_stream_entry(stocks, portfolio_df):
32
+
33
+ # create entry with ticker, date and details
34
+ stream_entry = pd.DataFrame(stocks)
35
+
36
+ # if have duplicate ticker raise error
37
+ if stream_entry.ticker.duplicated().any():
38
+ raise Exception('VALIDATION_ERROR: plase remove duplicate ticker')
39
+
40
+ # raise error if portfolio already have same ticker with same date
41
+ date = stream_entry.date[0]
42
+ selected_df = portfolio_df[portfolio_df.date == date]
43
+ tickers = stream_entry.ticker.tolist()
44
+ filter_out_ticker = selected_df[selected_df.ticker.isin(tickers)].ticker
45
+
46
+ if len(filter_out_ticker) > 0:
47
+ raise Exception(f'VALIDATION_ERROR: {" ".join(filter_out_ticker)}在{date}已存在,请先删除再添加')
48
+
49
+
50
+ stream_entry = pipeline.add_details_to_stock_df(stream_entry)
51
+
52
+ # calculate share changes, use tmp_df to save intermediate result
53
+ tmp_df = pd.concat([stream_entry, portfolio_df], ignore_index=True, join='outer')
54
+ tmp_df.sort_values(by='date', inplace=True)
55
+ tmp_df['share_changes'] = tmp_df.groupby('ticker').shares.diff()
56
+
57
+ # for ticker previous not existing use shares as share_changes
58
+ tmp_df.share_changes = tmp_df.share_changes.fillna(tmp_df.shares)
59
+
60
+ # add share_chagnes back to stream_entry
61
+ stream_entry = stream_entry.merge(
62
+ tmp_df[['ticker','date','share_changes','change_saved']],
63
+ on=['ticker','date'],
64
+ how='left'
65
+ )
66
+
67
+ # indicate not saved
68
+ stream_entry['change_saved'] = False
69
+
70
+ # indicate sync to db
71
+ stream_entry['sync_to_db'] = True
72
+
73
+ # fill empty ave_price with latest closing price
74
+ # TODO: for now all ave_price is fetching from api
75
+ ticker = stream_entry.ticker.tolist()
76
+ close_price = api.fetch_stocks_price(security=ticker, end_date=date, frequency='minute', count=1)[['ticker','close']]
77
+ close_price.rename(columns={'close':'ave_price'}, inplace=True)
78
+ stream_entry = stream_entry.merge(close_price, on='ticker', how='left')
79
+
80
+ # calculate cash(mkt_value) and weight
81
+ stream_entry['cash'] = stream_entry.shares * stream_entry.ave_price
82
+ stream_entry['weight'] = stream_entry.cash / stream_entry.cash.sum()
83
+ return stream_entry
84
+
85
 
86
  def notify(func):
87
  def wrapper(*args, **kwargs):
88
  try:
89
+ notifications = func(*args, **kwargs)
90
+
91
+ if notifications is not None:
92
+ for notification in notifications:
93
+ duration = notification.get('duration', 4000)
94
+ if notification['type'] == 'success':
95
+ pn.state.notifications.success(
96
+ notification['description'], duration=duration)
97
+ elif notification['type'] == 'error':
98
+ pn.state.notifications.error(
99
+ notification['description'], duration=duration)
100
+ elif notification['type'] == 'warning':
101
+ pn.state.notifications.warning(
102
+ notification['description'], duration=duration)
103
+ elif notification['type'] == 'info':
104
+ pn.state.notifications.info(
105
+ notification['description'], duration=duration)
106
+ else:
107
+ raise Exception('unknow notification type')
108
  except Exception as e:
109
+ pn.state.notifications.error(str(e), duration=0)
110
  return wrapper
111
 
112
+
113
  def app():
114
 
115
  # load portfolio df
 
160
  portfolio_tabulator = pn.widgets.Tabulator(p_profile,
161
  layout='fit_columns',
162
  height_policy='max',
163
+ # width=1000,
164
  groupby=['date'],
165
  hidden_columns=hidden_column, titles=col_to_titles,
166
  formatters=tabulator_formatters,
167
  editors=bokeh_editors,
168
+ pagination='remote',
169
  # page_size=25,
170
  # frozen_columns=frozen_columns
171
  )
 
186
  hidden_columns=hidden_column,
187
  height_policy='max',
188
  titles=col_to_titles)
189
+ # perform calculation btn
190
+ force_recalculate_btn = pn.widgets.Button(
191
+ name='重新计算', button_type='primary', sizing_mode='stretch_width')
192
  # create component
193
  new_stock_btn = pn.widgets.Button(
194
  name='增加新股票', button_type='primary', sizing_mode='stretch_width')
 
207
  width_policy='max', height_policy='max', scroll=True)
208
  # floating window row
209
  floating_windows = pn.Row()
210
+
211
  @notify
212
  def _update_history_tabulator(action, df=None):
213
  '''handle update history tabulator'''
 
231
  history_tabulator.patch({
232
  'change_saved': [(index[0], False)]
233
  }, as_index=True)
234
+
235
+ yield {'type': 'warning', 'description': '添加成功请保存'}
236
  # hanlde editing portoflio tabulator
237
  elif action == 'edit':
238
  # mark synced_to_db to false when entry is edited
239
  date = df
240
  index = history_tabulator.value[history_tabulator.value.date == date].index.to_list(
241
  )
242
+ # check if all change saved
243
+ all_saved = all(
244
+ portfolio_tabulator.value[portfolio_tabulator.value.date == date]['change_saved'])
245
  history_tabulator.patch({
246
+ 'change_saved': [(index[0], all_saved)]
247
  }, as_index=True)
248
+ yield {'type': 'warning', 'description': '修改成功请保存'}
249
  # handle sync to db
250
  elif action == 'sync':
251
  # patch all synced_to_db to true
 
257
  history_tabulator.patch({
258
  'change_saved': [(index, True) for index in indices]
259
  }, as_index=True)
260
+ yield {'type': 'success', 'description': '同步成功以更新'}
261
+
262
  @notify
263
  def delete_stock(row):
264
  '''delete a stock entry'''
265
  stock_column.remove(row)
266
+ yield {'type': 'success', 'description': '删除成功'}
267
 
268
  def create_new_stock_entry(ticker=None, shares=0, ave_price=0.0, disable_ticker=True):
269
  '''create a new new stock entry'''
 
284
  start=0,
285
  sizing_mode='stretch_width')
286
 
 
 
 
 
287
  row = pn.Row(
288
  delete_btn,
289
  ticker_selector,
290
  share_input,
 
291
  width_policy='max',
292
  )
293
  delete_btn.on_click(lambda _, row=row: delete_stock(row))
 
315
  stock_column.clear()
316
  stock_column.extend(stock_entries)
317
 
318
+ @notify
319
  def update_profile_tabulator(e):
320
  '''add all stocks entry to ui'''
321
+ # TODO: make this idempotent
322
+ new_entries = [dict(ticker=row[1].value,
323
+ shares=row[2].value,
324
+ date=datetime_picker.value) for row in stock_column]
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
325
 
326
+ try:
327
+ new_profile = create_portfolio_stream_entry(new_entries, portfolio_tabulator.value)
328
+ # update history tabulator
329
+ _update_history_tabulator('append', new_profile)
330
+ _stream_to_portfolio_tabulator(new_profile)
331
+ yield {'type': 'success', 'description': f'已添加{len(new_entries)}条新股票,请保存'}
332
+ except Exception as e:
333
+ raise Exception(e)
334
 
335
  def add_new_stock(e):
336
  row = create_new_stock_entry()
337
  stock_column.append(row)
338
 
339
+ @notify
340
  def _stream_to_portfolio_tabulator(entry):
341
+ # not using stream because it will cause index mismatch
342
  if len(portfolio_tabulator.value) == 0:
343
  portfolio_tabulator.value = entry
344
+
345
  else:
346
  portfolio_tabulator.stream(entry, follow=True)
347
+ yield {'type': 'success', 'description': f'添加{len(entry)}条股票'}
348
 
349
  def handle_click_on_history_tabulator(e):
350
  '''handle click click on history tabulator'''
 
364
  new_portfolio = portfolio_tabulator.value
365
  # only update selected row to db
366
  selected_portfolio = new_portfolio[new_portfolio['sync_to_db']]
367
+ try:
368
+ pipeline.update_portfolio_profile_to_db(selected_portfolio)
369
+ except Exception as e:
370
+ raise Exception(f'同步到数据库失败,错误信息:{e}')
371
  # update history tabulator and portfolio tabulator
372
+
373
+ # mark changes as saved
374
+ indices = selected_portfolio[~selected_portfolio['change_saved']].index.to_list(
375
+ )
376
+ portfolio_tabulator.patch({
377
+ 'change_saved': [(index, True) for index in indices]
378
+ }, as_index=True)
379
+ _update_history_tabulator('sync')
380
+ yield {'type': 'success', 'description': '保存成功'}
381
+
 
382
  def handle_edit_portfolio_tabulator(e):
383
  date = portfolio_tabulator.value.iloc[e.row]['date']
384
  _update_history_tabulator(df=date, action='edit')
 
385
 
386
+ def hanlde_edit_history_tabulator(e):
387
+ # toggle sync on all entry on a date
388
+ if e.column == 'sync_to_db':
389
+ date = history_tabulator.value.iloc[e.row]['date']
390
+ # index of all entry on portfolio tabulator
391
+ indices = portfolio_tabulator.value[portfolio_tabulator.value.date.between(
392
+ date.replace(microsecond=0), date.replace(microsecond=999999))].index.to_list()
393
+ # patch all indices on sync_to_db to e.value
394
+ portfolio_tabulator.patch({
395
+ 'sync_to_db': [(index, e.value) for index in indices]
396
+ }, as_index=True)
397
+
398
+ @notify
399
+ def handle_force_recalculation(e):
400
+ try:
401
+ yield {'type': 'info', 'description': "开始重新计算可能会花费1分钟以上", 'duration': 0}
402
+ # fill missing benchmark profile
403
+ yield {'type': 'info', 'description': "正在获取benchmark数据", 'duration': 0}
404
+ pipeline.left_fill_benchmark_profile()
405
+ # fill missing stock price
406
+ yield {'type': 'info', 'description': "正在更新股票数据", 'duration': 0}
407
+ pipeline.left_fill_stocks_price()
408
+
409
+ # recalculate
410
+ yield {'type': 'info', 'description': "正在重新计算权重", 'duration': 0}
411
+ pipeline.batch_processing()
412
+
413
+ yield {'type': 'info', 'description': '完成✅', 'duration': 0}
414
+
415
+ except Exception as e:
416
+ raise Exception(f'重新计算失败,错误信息:{e}')
417
+
418
  # register event handler
419
  upload_to_db_btn.on_click(handle_sync_to_db)
420
  preview_btn.on_click(update_profile_tabulator)
 
422
  history_tabulator.on_click(
423
  handle_click_on_history_tabulator
424
  )
425
+ force_recalculate_btn.on_click(handle_force_recalculation)
426
  portfolio_tabulator.on_edit(handle_edit_portfolio_tabulator)
427
+ history_tabulator.on_edit(hanlde_edit_history_tabulator)
 
428
  # create handler component to add to panel so can be listened to
429
  upload_xlsx_handler = pn.bind(update_stock_column, file_input)
430
 
 
431
  # layout
432
 
433
  editor_widget = pn.Column(floating_windows, datetime_picker, upload_to_db_btn, new_stock_btn,
434
+ preview_btn, force_recalculate_btn, file_input, pn.widgets.TooltipIcon(
435
  value="用于更新修改持仓信息,默认股票为最近持仓,默认时间为目前北京时间,点击增加新股票按钮,输入股票代码和持仓选择日期(北京时间),点击预览,确认无误后点击保存到数据库。或者直接拖拽excel文件到下方上传按钮"),
436
  stock_column, width=MIN_COMPONENT_WIDTH, height_policy='max')
437
  # tooltip
438
  toolTip2 = pn.widgets.TooltipIcon(
439
  value="持仓总结,每一行的已同步到数据库代表所做更改是否已同步到数据库,点击保存到数据库将上传所有更改。点击右侧📋按钮查看详细持仓变化报告")
440
 
441
+ return pn.Column(
442
+
443
+ pn.Row(
444
+ pn.layout.HSpacer(),
445
+ editor_widget,
446
+ pn.Spacer(width=10),
447
+ history_tabulator,
448
+ pn.Spacer(width=10),
449
+ portfolio_tabulator,
450
+ pn.Spacer(width=10),
451
+ upload_xlsx_handler,
452
+ pn.layout.HSpacer(),
453
+ height=1500,
454
+ # width_policy='max', height_policy='max')
455
+ # sizing_mode='stretch_both',
456
+ ))
457
 
458
 
459
  # app
460
+ template = pn.template.FastListTemplate(
461
+ title='portfolio编辑', sidebar_width=200, collapsed_sidebar=True)
462
  template.sidebar.append(SideNavBar())
463
  template.main.append(app())
464
  template.servable()
processing.py CHANGED
@@ -423,7 +423,13 @@ def create_analytic_df(price_df, profile_df):
423
  filling information from profile df to stock price df
424
 
425
  '''
 
 
 
 
 
426
  uni_profile_df = _uniformize_time_series(profile_df)
 
427
  # TODO handle rename column here
428
  df = price_df.merge(uni_profile_df, on=['ticker', 'time'], how='outer')
429
  df.sort_values(by=['ticker', 'time'], inplace=True)
 
423
  filling information from profile df to stock price df
424
 
425
  '''
426
+ # daily stock price use begin of the date, need to convert profile_df day to begin of the date
427
+ profile_df['time'] = profile_df['time'].map(
428
+ lambda x: datetime(x.year, x.month, x.day))
429
+
430
+ # make every time entry the same dimension
431
  uni_profile_df = _uniformize_time_series(profile_df)
432
+
433
  # TODO handle rename column here
434
  df = price_df.merge(uni_profile_df, on=['ticker', 'time'], how='outer')
435
  df.sort_values(by=['ticker', 'time'], inplace=True)