huggingface112 committed
Commit 2b059b0
1 Parent(s): 6eb9bc1

imp left_fill, right_fill and update log
.vscode/settings.json ADDED
@@ -0,0 +1,11 @@
+{
+    "[python]": {
+        "editor.defaultFormatter": "ms-python.autopep8"
+
+    },
+    "python.linting.flake8Args": [
+        "--ignore=E501"
+    ]
+
+
+}
Dockerfile CHANGED
@@ -13,8 +13,8 @@ COPY . /code
 VOLUME /code/instance
 
 
-# CMD ["panel", "serve", "/code/portfolioEditingPage.py", "--address", "0.0.0.0", "--port", "7860", "--allow-websocket-origin", "lamonkey-portfolio-management.hf.space"]
-CMD ["panel", "serve", "/code/portfolioEditingPage.py", "--address", "0.0.0.0", "--port", "7860", "--allow-websocket-origin", "*"]
+CMD ["panel", "serve", "/code/portfolioEditingPage.py", "--address", "0.0.0.0", "--port", "7860", "--allow-websocket-origin", "lamonkey-portfolio-management.hf.space"]
+# CMD ["panel", "serve", "/code/portfolioEditingPage.py", "--address", "0.0.0.0", "--port", "7860", "--allow-websocket-origin", "*"]
 RUN mkdir /.cache
 RUN chmod 777 /.cache
 RUN mkdir .chroma
api.py CHANGED
@@ -299,9 +299,9 @@ def fetch_benchmark_profile(start_date: datetime, end_date: datetime, delta_time
     update_df = pd.concat(results)
     update_df['ticker'] = update_df.index
     update_df['date'] = pd.to_datetime(update_df['date'])
-    update_df.rename({'date': 'time'}, inplace=True, axis=1)
+    # update_df.rename({'date': 'time'}, inplace=True, axis=1)
     # remove duplicate row
     update_df = update_df.drop_duplicates(
-        subset=['ticker', 'time'], keep='last')
+        subset=['ticker', 'date'], keep='last')
     update_df.reset_index(drop=True, inplace=True)
     return update_df
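Note on the api.py change above: the 'date' column is no longer renamed to 'time', so de-duplication now keys on ['ticker', 'date']. A minimal, self-contained sketch (illustrative data only, not from the repository) of how drop_duplicates with keep='last' resolves overlapping rows:

import pandas as pd

# hypothetical overlapping rows, e.g. produced by a right fill that re-fetches the boundary date
update_df = pd.DataFrame({
    'ticker': ['000001.XSHE', '000001.XSHE', '600000.XSHG'],
    'date': pd.to_datetime(['2023-08-28', '2023-08-28', '2023-08-28']),
    'weight': [0.10, 0.12, 0.05],
})

# keep='last' retains the newest row for each (ticker, date) pair
deduped = update_df.drop_duplicates(subset=['ticker', 'date'], keep='last')
print(deduped)  # one row per ticker; weight 0.12 is kept for 000001.XSHE

In other words, a re-fetched boundary date overwrites the earlier row instead of duplicating it.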
db_operation.py CHANGED
@@ -34,12 +34,27 @@ def _validate_schema(df, schema):
     # return False
     return True
 
-# def append_benchmark_profile(df):
-#     '''append new entry to benchmark profile table'''
-#     with create_engine(db_url).connect() as conn:
-#         df.to_sql(ts.BENCHMARK_PROFILE_TABLE, con=conn, if_exists='append', index=False)
-
 
+def get_all_benchmark_profile():
+    '''return all entries in the benchmark profile table'''
+    return _get_all_row(ts.BENCHMARK_TABLE)
+
+def append_to_benchmark_profile(df):
+    '''append new entry to benchmark profile table'''
+    # handle possible duplication caused by right fill
+    most_recent_dates = get_most_recent_benchmark_profile().date
+    if len(most_recent_dates) > 0:
+        date = most_recent_dates[0]
+        # drop df.date == date
+        df = df[df.date != date]
+        if len(df) != 0:
+            _append_df_to_db(df, ts.BENCHMARK_TABLE, ts.BENCHMARK_TABLE_SCHEMA)
+    else:
+        _append_df_to_db(df, ts.BENCHMARK_TABLE, ts.BENCHMARK_TABLE_SCHEMA)
+
+def get_most_recent_benchmark_profile():
+    '''return the most recent entry in the benchmark profile table'''
+    return _get_most_recent(ts.BENCHMARK_TABLE)
 def get_most_recent_profile(type):
     table_name = 'benchmark_profile' if type == 'benchmark' else 'portfolio_profile'
     query = f"SELECT * FROM {table_name} WHERE date = (SELECT MAX(date) FROM {table_name})"
@@ -110,9 +125,10 @@ def get_all_stocks():
         all_stocks = pd.read_sql(ts.STOCKS_DETAILS_TABLE, con=conn)
     return all_stocks
 
-def _get_all_row(table_name):
+def _get_all_row(table_name, ts_column='date'):
     with create_engine(db_url).connect() as conn:
         df = pd.read_sql(table_name, con=conn)
+        df[ts_column] = pd.to_datetime(df[ts_column])
     return df
 
 def get_all_stocks_price():
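The new db_operation.py helpers above call _append_df_to_db and _get_most_recent, which already exist elsewhere in the file and are not part of this diff. A hedged sketch of what they plausibly look like, only so the new functions can be read in isolation (the real signatures, schema validation, and db_url may differ):

import pandas as pd
from sqlalchemy import create_engine

db_url = 'sqlite:///instance/local.db'  # assumption: same SQLite file shipped in instance/

def _append_df_to_db(df: pd.DataFrame, table_name: str, schema: dict) -> None:
    # hypothetical helper: validate against the table schema, then append rows
    if not _validate_schema(df, schema):
        raise ValueError(f'dataframe does not match the schema of {table_name}')
    with create_engine(db_url).connect() as conn:
        df.to_sql(table_name, con=conn, if_exists='append', index=False)

def _get_most_recent(table_name: str, ts_column: str = 'date') -> pd.DataFrame:
    # hypothetical helper: rows sharing the latest timestamp in the table
    query = f"SELECT * FROM {table_name} WHERE {ts_column} = (SELECT MAX({ts_column}) FROM {table_name})"
    with create_engine(db_url).connect() as conn:
        return pd.read_sql(query, con=conn)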
instance/local.db CHANGED
@@ -1,3 +1,3 @@
 version https://git-lfs.github.com/spec/v1
-oid sha256:554c882ab3dd2eb175ca2076ebf2458988340f165620dd03f487d3d23e6f13ea
-size 6856704
+oid sha256:b0651e0e8a48a69d61d008bf5ae3262ca38a8968b020e700604841d55f8784a3
+size 8728576
instance/log.json ADDED
@@ -0,0 +1,3 @@
+{
+    "daily_update": "2023-08-29 03:02:51"
+}
newBackgroundTask.py DELETED
@@ -1,12 +0,0 @@
1
- import sys
2
- sys.path.append('/Users/lamonkey/Desktop/risk-monitor-dashboard')
3
- from pipeline import update
4
- import panel as pn
5
- from datetime import timedelta
6
-
7
- # pn.state.schedule_task(
8
- # 'task', run, period=timedelta(seconds=3)
9
- # )
10
-
11
- # update stock price and benchmark profile
12
- update()
 
 
 
 
 
 
 
 
 
 
 
 
 
pipeline.py CHANGED
@@ -1,24 +1,20 @@
-import sys
-# sys.path.append('/Users/lamonkey/Desktop/risk-monitor-dashboard')
-import panel as pn
 import datetime as dt
-import asyncio
-import random
 from sqlalchemy import create_engine, text
 import pandas as pd
 from streamz import Stream
-from datetime import timedelta
-import settings
-import os
 import utils
 import api
-import numpy as np
 import pytz
 import table_schema as ts
 import db_operation as db
+from log import Log
+# import settings
 # fetch new stock price
 stock_price_stream = Stream()
 
+
+# log
+log = Log('instance/log.json')
 # save stock price to db
 # stock_price_stream.sink(save_stock_price)
 # from dask.distributed import Client
@@ -61,7 +57,7 @@ def need_to_update(table_name: str, freq: dt.datetime):
     return None
 
 
-def need_to_fetch_new_stock_price():
+def need_to_forward_fill_stock_price():
     '''
     check if need to pull new stock price from jq
 
@@ -72,6 +68,7 @@ def need_to_fetch_new_stock_price():
     None if no need to fetch new stock price
 
     '''
+    # TODO refactor to db
     # get min date from portfolio_profile
    with create_engine(db_url).connect() as conn:
        table_name = 'portfolio_profile'
@@ -79,7 +76,7 @@ def need_to_fetch_new_stock_price():
         df = pd.read_sql(query, con=conn)
         df.date = pd.to_datetime(df.date)
         min_date = df.date[0]
-
+    # TODO refactor to db
     # compare to min date from stocks_price
     with create_engine(db_url).connect() as conn:
         table_name = 'stocks_price'
@@ -105,8 +102,8 @@ def get_most_recent_profile(type):
 
 
 def update_stocks_details_to_db():
-    '''create table contain all stocks detail in db
-    will override existing table if exists
+    '''override stocks_details table with new stock details
+
     Table Schema
     ------------
     'display_name', 'name', 'start_date', 'end_date', 'type', 'ticker',
@@ -122,31 +119,6 @@ def update_stocks_details_to_db():
                   if_exists='replace', index=False)
 
 
-def fetch_new_stocks_price():
-    '''
-    get a df contain updated stock prices for both benchmark and portfolio,
-    also indicate if the stock is in portfolio and benchmark
-    '''
-    # most recent profiles
-    p_portfolio = get_most_recent_profile('portfolio')
-    p_benchmark = get_most_recent_profile('benchmark')
-    # combine ticker
-    unique_tickers = pd.concat([p_portfolio, p_benchmark])[
-        'ticker'].unique().tolist()
-    # fetch list of stock
-    # TODO: hard code delta time to 1 day
-    start_date = p_portfolio.date[0] + dt.timedelta(days=1)
-    end_date = utils.time_in_beijing()
-    freq = 'daily'
-    stock_df = api.fetch_stocks_price(
-        unique_tickers, start_date, end_date, freq)
-    stock_df['in_portfolio'] = stock_df['ticker'].isin(
-        p_portfolio['ticker'].unique().tolist())
-    stock_df['in_benchmark'] = stock_df['ticker'].isin(
-        p_benchmark['ticker'].unique().tolist())
-    return stock_df
-
-
 def need_to_update_stocks_price(delta_time):
     # convert p_portfolio.date[0] to timezone-aware datetime object
     tz = pytz.timezone('Asia/Shanghai')
@@ -237,9 +209,10 @@ def update_portfolio_profile_to_db(portfolio_df):
         return False
     # TODO trigger recomputation of analysis
 
-def update_daily_stocks_price():
+
+def right_fill_stock_price():
     '''
-    update all stocks price until today. used for fetching new stock price
+    update all stock prices until today.
 
     if no portfolio, terminate without warning
     default start date is the most recent date in portfolio
@@ -260,36 +233,18 @@ def update_daily_stocks_price():
     new_stocks_price = fetch_all_stocks_price_between(start, end)
     db.append_to_stocks_price_table(new_stocks_price)
 
-def update_stock_price():
-    '''get daily stocks price until today'''
-    # most recent profiles
-    p_portfolio = get_most_recent_profile('portfolio')
-    p_benchmark = get_most_recent_profile('benchmark')
-    # combine ticker
-    unique_tickers = pd.concat([p_portfolio, p_benchmark])[
-        'ticker'].unique().tolist()
-    # fetch list of stock
-    # TODO: hard code delta time to 1 day
-    start_date = p_portfolio.date[0] + dt.timedelta(days=1)
-    end_date = utils.time_in_beijing()
-    freq = 'daily'
-    stock_df = api.fetch_stocks_price(
-        unique_tickers, start_date, end_date, freq)
-    stock_df['in_portfolio'] = stock_df['ticker'].isin(
-        p_portfolio['ticker'].unique().tolist())
-    stock_df['in_benchmark'] = stock_df['ticker'].isin(
-        p_benchmark['ticker'].unique().tolist())
-    return stock_df
-
 
 def fetch_all_stocks_price_between(start, end):
     '''
     patch stock price db with all daily stock price within window
     inclusive on both start and end date
+
     Parameters
     ----------
-    window: tuple
-        (start, end) date of the window
+    start : datetime
+        start date, inclusive
+    end : datetime
+        end date, inclusive
 
     Returns
     -------
@@ -302,12 +257,144 @@ def fetch_all_stocks_price_between(start, end):
     tickers = selected_stocks.ticker.to_list()
     # fetch stock price and append to db
     stock_price = api.fetch_stocks_price(
-        security=tickers, start_date=start, end_date=end, frequency='daily')
+        security=tickers,
+        start_date=start,
+        end_date=end,
+        frequency='daily',
+        skip_paused=True,)
     # drop where closing price is null
     stock_price.dropna(subset=['close'], inplace=True)
     return stock_price
 
 
+def right_fill_benchmark_profile():
+    '''
+    right fill the benchmark profile table
+
+    fill any missing entries between the most recent date in benchmark profile and today
+    if no benchmark profile, fill from the most recent date in portfolio profile to today
+    if no portfolio profile, terminate without warning
+    '''
+    # get most recent date in benchmark profile
+    b_ends = db.get_most_recent_benchmark_profile().date
+    # get today's date
+    today = utils.time_in_beijing()
+    # most recent portfolio dates
+    p_ends = db.get_most_recent_portfolio_profile().date
+    # if portfolio is empty, terminate
+    if len(p_ends) == 0:
+        return
+    # if no benchmark profile, start is the most recent date in portfolio profile, end is today
+    elif len(b_ends) == 0:
+        start = p_ends[0]
+        end = today
+    # otherwise start is the most recent benchmark date, end is today
+    else:
+        start = b_ends[0]
+        end = today
+
+    # fetch and update
+    new_entry = api.fetch_benchmark_profile(start, end)
+    detailed_new_entry = utils.add_details_to_stock_df(new_entry)
+    db.append_to_benchmark_profile(detailed_new_entry)
+
+
+def left_fill_benchmark_profile():
+    '''
+    left fill the benchmark profile table
+
+    fill any missing entries between the earliest date in portfolio profile and the earliest date in benchmark profile
+    if no portfolio profile, terminate without warning
+    if no benchmark profile, the span runs from the earliest date in portfolio profile to the most recent date in portfolio profile
+    '''
+    # get start time of benchmark profile
+    b_starts = db.get_oldest_benchmark_profile().date
+    # get start time of portfolio profile
+    p_starts = db.get_oldest_portfolio_profile().date
+    # if no portfolio profile, terminate
+    if len(p_starts) == 0:
+        return
+    # use start and end date of portfolio profile if no benchmark profile entry
+    elif len(b_starts) == 0:
+        p_start = p_starts[0]
+        b_start = db.get_most_recent_portfolio_profile().date[0]
+    else:
+        b_start = b_starts[0]
+        p_start = p_starts[0]
+    if p_start < b_start:
+        # back fill benchmark profile
+        new_entry = api.fetch_benchmark_profile(p_start, b_start)
+        detailed_new_entry = utils.add_details_to_stock_df(new_entry)
+        # handle duplicate display_name
+        detailed_new_entry.drop(columns=['display_name_x'], inplace=True)
+        detailed_new_entry.rename(
+            columns={'display_name_y': 'display_name'}, inplace=True)
+
+        # append to db
+        db.append_to_benchmark_profile(detailed_new_entry)
+        # return detailed_new_entry
+    # else do nothing
+
+
+def left_fill_stocks_price():
+    '''
+    left fill stock price
+    fill missing entries between the oldest date in portfolio profile and the oldest date in stock price table
+
+    if no portfolio profile, terminate without warning
+    if no stock price table, the span runs from the oldest date in portfolio profile to the most recent date in portfolio profile
+    '''
+    # get oldest time in portfolio profile
+    p_start = db.get_oldest_portfolio_profile().date
+    # get oldest time in stock price table
+    stock_start = db.get_oldest_stocks_price().time
+    # if no portfolio profile, terminate
+    if len(p_start) == 0:
+        return
+    # no stock price, span the entire portfolio profile
+    elif len(stock_start) == 0:
+        start = p_start[0]
+        end = db.get_most_recent_portfolio_profile().date[0]
+    else:
+        start = p_start[0]
+        end = stock_start[0]
+
+    if start < end:
+        # fetch and update
+        new_entry = fetch_all_stocks_price_between(start, end)
+        db.append_to_stocks_price_table(new_entry)
+
+
+def update_benchmark_to_db():
+    '''
+    update daily benchmark weight
+    '''
+    pass
+
+
+async def daily_update():
+    last_update = log.get_time('daily_update')
+    if last_update is None or utils.time_in_beijing() - last_update >= dt.timedelta(days=1):
+        print("running daily update")
+        # check if need to update
+        # update stock price
+        left_fill_stocks_price()
+        right_fill_stock_price()
+        print("updated stocks price")
+        # update benchmark index
+        left_fill_benchmark_profile()
+        right_fill_benchmark_profile()
+        print("updated benchmark profile")
+        # update all stock detail
+        update_stocks_details_to_db()
+        print("updated stocks details")
+        log.update_log('daily_update')
+    else:
+        print("no update needed")
+
+
 def update():
     '''
     run only once, update stock price and benchmark profile
@@ -316,7 +403,7 @@ def update():
     # collect daily stock price until today in beijing time
     if need_to_update_stocks_price(dt.timedelta(days=1)):
         print("Updating stock_price table")
-        stock_df = update_stock_price()
+        # stock_df = update_stock_price()
         stock_df = add_details_to_stock_df(stock_df)
         save_stock_price_to_db(stock_df)
         stock_price_stream.emit(stock_df)
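pipeline.py now depends on a Log helper (from log import Log) that persists timestamps to instance/log.json; log.py itself is not shown in this commit. A minimal sketch, assuming the only behavior daily_update needs is get_time returning a datetime (or None) and update_log stamping the current time; the real class may instead store timezone-aware Beijing time via utils.time_in_beijing and differ in detail:

import json
import datetime as dt

class Log:
    '''hypothetical sketch of the Log helper assumed by pipeline.py'''

    def __init__(self, path):
        self.path = path

    def _read(self):
        # return the stored dict, or an empty one if the file is missing or invalid
        try:
            with open(self.path) as f:
                return json.load(f)
        except (FileNotFoundError, json.JSONDecodeError):
            return {}

    def get_time(self, key):
        # return the stored timestamp as a datetime, or None if the key was never logged
        value = self._read().get(key)
        if value is None:
            return None
        return dt.datetime.strptime(value, '%Y-%m-%d %H:%M:%S')

    def update_log(self, key):
        # stamp the key with the current time and persist it
        entries = self._read()
        entries[key] = dt.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        with open(self.path, 'w') as f:
            json.dump(entries, f, indent=4)

With this shape, the committed instance/log.json ({"daily_update": "2023-08-29 03:02:51"}) round-trips through get_time and update_log.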
settings.py CHANGED
@@ -1,6 +1,9 @@
 from datetime import timedelta
 stream_frequency = timedelta(seconds=60)
 
+
+FREQUENCY = timedelta(hours=24)
+
 TABLE_NAME_AND_FREQ = [
     ('benchmark_profile', timedelta(days=1)),
     ('portfolio_profile', timedelta(days=1))
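settings.py gains FREQUENCY = timedelta(hours=24), and newBackgroundTask.py, which called pipeline.update() directly, was deleted. A hedged sketch of how the new async daily_update could be scheduled from the Panel app instead (for example in portfolioEditingPage.py); the actual wiring is not part of this commit and the task name below is illustrative:

import panel as pn

import settings
from pipeline import daily_update

# run daily_update on the interval defined in settings.py (assumed usage of FREQUENCY)
pn.state.schedule_task('daily_update', daily_update, period=settings.FREQUENCY)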
testing_pipeline.ipynb CHANGED
The diff for this file is too large to render. See raw diff