import asyncio
import datetime as dt
import os
import random
import sys
from datetime import timedelta
# sys.path.append('/Users/lamonkey/Desktop/risk-monitor-dashboard')

import numpy as np
import pandas as pd
import panel as pn
import pytz
from sqlalchemy import create_engine, text
from streamz import Stream

import settings
import utils
import api
import table_schema as ts
import db_operation as db

# stream that newly fetched stock prices are emitted on
stock_price_stream = Stream()
# save stock price to db
# stock_price_stream.sink(save_stock_price)
# from dask.distributed import Client
# client = Client()
# import nest_asyncio
# nest_asyncio.apply()
# import settings
# run using --setup
db_url = 'sqlite:///instance/local.db'


def create_portfolio_profile_df(stocks: list[dict]):
    '''
    build a portfolio profile df from a list of stock records

    Parameters
    ----------
    stocks: list[dict]
        one dict per stock; each record must provide a 'ticker' key

    Returns
    -------
    pd.DataFrame
        the records joined with the stock details table

    Raises
    ------
    ValueError
        if the same ticker appears more than once
    '''
    profile_df = pd.DataFrame(stocks)
    profile_df = add_details_to_stock_df(profile_df)

    # check if there is duplicate ticker
    if profile_df.ticker.duplicated().any():
        raise ValueError(
            'VALIDATION ERROR: cannot have duplicate ticker with the same date')
    return profile_df


def need_to_update(table_name: str, freq: dt.timedelta):
    '''
    check whether the table with table_name needs to be updated

    Parameters
    ----------
    table_name: str
        table to inspect; it is interpolated into the SQL text, so it must
        be a trusted identifier and never user input
    freq: dt.timedelta
        maximum allowed age of the newest row

    Return
    ------
    None if no need to update
    (start_date, end_date, freq) if need to update
    '''
    with create_engine(db_url).connect() as conn:
        max_date = conn.execute(
            text(f"SELECT MAX(date) FROM {table_name}")).fetchone()[0]

    max_date = utils.convert_string_to_datetime(max_date)
    current_time = utils.time_in_beijing()
    if current_time - max_date > freq:
        return (max_date + freq, current_time, freq)
    return None


def need_to_fetch_new_stock_price():
    '''
    check if need to pull new stock price from jq

    RETURN
    ------
    (min_date, max_date) : if update is needed
        the start and end date need to fetch new stock price
    None if no need to fetch new stock price
    '''
    # earliest date in portfolio_profile
    with create_engine(db_url).connect() as conn:
        table_name = 'portfolio_profile'
        query = f"SELECT DISTINCT date FROM {table_name} ORDER BY date ASC LIMIT 1"
        df = pd.read_sql(query, con=conn)
        df.date = pd.to_datetime(df.date)
        min_date = df.date[0]

    # earliest time in stocks_price
    with create_engine(db_url).connect() as conn:
        table_name = 'stocks_price'
        query = f"SELECT DISTINCT time FROM {table_name} ORDER BY time ASC LIMIT 1"
        df = pd.read_sql(query, con=conn)
        df.time = pd.to_datetime(df.time)

    # prices are missing from the first portfolio date up to the day
    # before the earliest stored price
    earliest_price_time = df.time[0]
    if min_date <= earliest_price_time:
        return (min_date, earliest_price_time - dt.timedelta(days=1))
    return None


def get_most_recent_profile(type):
    '''
    load the rows with the latest date from a profile table

    Parameters
    ----------
    type: str
        'benchmark' reads benchmark_profile, any other value reads
        portfolio_profile

    Returns
    -------
    pd.DataFrame
        rows of the most recent date, with 'date' converted to datetime
    '''
    table_name = 'benchmark_profile' if type == 'benchmark' else 'portfolio_profile'
    query = f"SELECT * FROM {table_name} WHERE date = (SELECT MAX(date) FROM {table_name})"
    with create_engine(db_url).connect() as conn:
        df = pd.read_sql(query, con=conn)
    # convert date to datetime object
    df['date'] = pd.to_datetime(df['date'])
    return df


def update_stocks_details_to_db():
    '''
    create table contain all stocks detail in db
    will override existing table if exists

    Table Schema
    ------------
    'display_name', 'name', 'start_date', 'end_date', 'type', 'ticker',
    'sector', 'aggregate_sector'

    Raises
    ------
    ValueError
        if the fetched df does not match STOCKS_DETAILS_TABLE_SCHEMA
    '''
    df = api.get_all_stocks_detail()
    # validation
    if not _validate_schema(df, ts.STOCKS_DETAILS_TABLE_SCHEMA):
        raise ValueError(
            'df has different schema than STOCKS_DETAILS_TABLE_SCHEMA')
    with create_engine(db_url).connect() as conn:
        df.to_sql(ts.STOCKS_DETAILS_TABLE, con=conn,
                  if_exists='replace', index=False)


def fetch_new_stocks_price():
    '''
    get a df contain updated stock prices for both benchmark and portfolio,
    also indicate if the stock is in portfolio and benchmark

    Returns
    -------
    pd.DataFrame
        result of api.fetch_stocks_price with two extra boolean columns,
        'in_portfolio' and 'in_benchmark'
    '''
    # most recent profiles
    p_portfolio = get_most_recent_profile('portfolio')
    p_benchmark = get_most_recent_profile('benchmark')
    # combine ticker
    unique_tickers = pd.concat([p_portfolio, p_benchmark])[
        'ticker'].unique().tolist()
    # fetch list of stock
    # TODO: hard code delta time to 1 day
    start_date = p_portfolio.date[0] + dt.timedelta(days=1)
    end_date = utils.time_in_beijing()
    freq = 'daily'
    stock_df = api.fetch_stocks_price(
        unique_tickers, start_date, end_date, freq)
    stock_df['in_portfolio'] = stock_df['ticker'].isin(
        p_portfolio['ticker'].unique().tolist())
    stock_df['in_benchmark'] = stock_df['ticker'].isin(
        p_benchmark['ticker'].unique().tolist())
    return stock_df


def need_to_update_stocks_price(delta_time):
    '''
    check whether the stocks_price table is older than delta_time

    Parameters
    ----------
    delta_time: dt.timedelta
        maximum allowed age of the newest stored price

    Returns
    -------
    bool
        True if the table is missing, empty, or its newest row is older
        than delta_time; False otherwise
    '''
    # stored times are localized to this timezone before comparing
    tz = pytz.timezone('Asia/Shanghai')
    with create_engine(db_url).connect() as conn:
        # check if a table exist
        if not conn.dialect.has_table(conn, 'stocks_price'):
            return True
        query = "SELECT * FROM stocks_price WHERE time = (SELECT MAX(time) FROM stocks_price)"
        most_recent_price = pd.read_sql(query, con=conn)

    # an existing but empty table has no newest row to compare against
    if most_recent_price.empty:
        return True

    most_recent_price.time = pd.to_datetime(most_recent_price.time)
    date_time = tz.localize(most_recent_price.time[0].to_pydatetime())
    return utils.time_in_beijing() - date_time > delta_time


def processing():
    '''
    run the whole processing pipeline here
    '''
    pass


def add_details_to_stock_df(stock_df):
    '''
    left-join stock details onto stock_df by ticker

    Parameters
    ----------
    stock_df: pd.DataFrame
        must contain a 'ticker' column

    Returns
    -------
    pd.DataFrame
        stock_df plus 'sector', 'name', 'aggregate_sector' and
        'display_name'; a missing aggregate_sector is filled with '其他'
    '''
    with create_engine(db_url).connect() as conn:
        detail_df = pd.read_sql(ts.STOCKS_DETAILS_TABLE, con=conn)
    detail_columns = ['sector', 'name',
                      'aggregate_sector', 'display_name', 'ticker']
    merged_df = pd.merge(
        stock_df, detail_df[detail_columns], on='ticker', how='left')
    # assign the result back: an inplace fillna on a selected column is a
    # chained operation that may not update merged_df
    merged_df['aggregate_sector'] = merged_df['aggregate_sector'].fillna('其他')
    return merged_df


def _validate_schema(df, schema):
    '''
    validate df has the same columns as schema

    Parameters
    ----------
    df: pd.DataFrame
    schema: dict
        {column_name: data_type}

    Returns
    -------
    bool
        True if df has exactly the columns named in schema
        False otherwise
    '''
    # TODO: data types are not compared yet, only column names
    # for col, dtype in schema.items():
    #     if df[col].dtype != dtype:
    #         return False
    return set(df.columns) == set(schema.keys())


def save_stock_price_to_db(df: pd.DataFrame):
    '''append df to the stocks_price table'''
    print('saving to stock to db')
    with create_engine(db_url).connect() as conn:
        df.to_sql('stocks_price', con=conn, if_exists='append', index=False)


def update_portfolio_profile_to_db(portfolio_df):
    '''
    append the portfolio profile to the portfolio table in db

    Only the columns named in PORTFOLIO_TABLE_SCHEMA are written, so extra
    columns in portfolio_df are ignored.

    Parameters
    ----------
    portfolio_df: pd.DataFrame

    Returns
    -------
    bool
        True if the rows were written, False if the write failed

    Raises
    ------
    ValueError
        if portfolio_df lacks a column required by PORTFOLIO_TABLE_SCHEMA
    '''
    schema_columns = list(ts.PORTFOLIO_TABLE_SCHEMA.keys())
    # the original check was inverted and rejected a df whose columns matched
    # the schema exactly; only missing columns make the write impossible
    missing = [col for col in schema_columns
               if col not in portfolio_df.columns]
    if missing:
        raise ValueError(
            f'portfolio_df is missing columns required by PORTFOLIO_TABLE_SCHEMA: {missing}')

    with create_engine(db_url).connect() as conn:
        print("updating profile to db")
        try:
            portfolio_df[schema_columns].to_sql(
                ts.PORTFOLIO_TABLE, con=conn, if_exists='append', index=False)
        except Exception as err:
            # report the failure instead of swallowing it silently
            print(f"failed to update profile to db: {err}")
            return False
        return True
    # TODO trigger recomputation of analysis


def update_daily_stocks_price():
    '''
    update all stocks price until today.
    used for fetching new stock price

    if no portfolio, terminate without warning
    start date is the most recent stored stock price time, falling back
    to the most recent portfolio date when no price is stored
    '''
    most_recent_portfolio = db.get_most_recent_portfolio_profile()
    most_recent_stocks_price = db.get_most_recent_stocks_price()

    # fetch all stocks price until today
    stocks_dates = most_recent_stocks_price.time
    portfolio_dates = most_recent_portfolio.date
    if len(portfolio_dates) == 0:
        return
    start = stocks_dates[0] if len(stocks_dates) > 0 else portfolio_dates[0]
    end = utils.time_in_beijing()

    # frequency is set to daily
    if end - start > dt.timedelta(days=1):
        new_stocks_price = fetch_all_stocks_price_between(start, end)
        db.append_to_stocks_price_table(new_stocks_price)


def update_stock_price():
    '''get daily stocks price until today'''
    # same steps as fetch_new_stocks_price, kept as a separate entry point
    return fetch_new_stocks_price()


def fetch_all_stocks_price_between(start, end):
    '''
    fetch daily stock price of every stock listed within the window

    Parameters
    ----------
    start:
        start of the window
    end:
        end of the window

    Returns
    -------
    pd.DataFrame
        daily prices from api.fetch_stocks_price, with rows whose closing
        price is null removed
    '''
    # all trading stocks available between start day and end date
    all_stocks = db.get_all_stocks()
    listed_in_window = (all_stocks.start_date <= end) & (
        all_stocks.end_date >= start)
    tickers = all_stocks[listed_in_window].ticker.to_list()
    # fetch stock price
    stock_price = api.fetch_stocks_price(
        security=tickers,
        start_date=start,
        end_date=end,
        frequency='daily')
    # drop where closing price is null
    stock_price.dropna(subset=['close'], inplace=True)
    return stock_price


def update():
    '''
    run only once, update stock price and benchmark profile
    '''
    print("Checking stock_price table")
    # collect daily stock price until today in beijing time
    if need_to_update_stocks_price(dt.timedelta(days=1)):
        print("Updating stock_price table")
        stock_df = update_stock_price()
        stock_df = add_details_to_stock_df(stock_df)
        save_stock_price_to_db(stock_df)
        stock_price_stream.emit(stock_df)


async def run():
    '''
    start the pipeline here to check update and fetch new data
    '''
    print("background_task running!")
    # TODO: update benchmark_profile
    # TODO: gate on need_to_update_stocks_price(); currently always runs
    print("running update")
    # TODO testing code
    # get stock price df
    with create_engine(db_url).connect() as conn:
        stock_df = pd.read_sql('stocks_price', con=conn)
    print('sending data!')
    stock_price_stream.emit(stock_df)

    # TODO: intended flow once the testing code above is removed
    # # latest stock price
    # stock_df = update_stocks_price()
    # # add display name and sector to stock_df
    # stock_df = add_details_to_stock_df(stock_df)
    # save_stock_price_to_db(stock_df)
    # stock_price_stream.emit(stock_df)
    # update stock_price
    # send fetched data
    # run processing
    # send fetched data