Spaces:
Runtime error
Runtime error
import sys | |
# sys.path.append('/Users/lamonkey/Desktop/risk-monitor-dashboard') | |
import panel as pn | |
import datetime as dt | |
import asyncio | |
import random | |
from sqlalchemy import create_engine, text | |
import pandas as pd | |
from streamz import Stream | |
from datetime import timedelta | |
import settings | |
import os | |
import utils | |
import api | |
import numpy as np | |
import pytz | |
import table_schema as ts | |
# fetch new stock price | |
stock_price_stream = Stream() | |
# save stock price to db | |
# stock_price_stream.sink(save_stock_price) | |
# from dask.distributed import Client | |
# client = Client() | |
# import nest_asyncio | |
# nest_asyncio.apply() | |
# import settings | |
# run using --setup | |
db_url = 'sqlite:///instance/local.db' | |
def create_portfolio_profile_df(stocks: list[dict]): | |
profile_df = pd.DataFrame(stocks) | |
profile_df = add_details_to_stock_df(profile_df) | |
# check if there is duplicate ticker | |
if profile_df.ticker.duplicated().any(): | |
raise Exception( | |
'VALIDATION ERROR: cannot have duplicate ticker with the same date') | |
return profile_df | |
def need_to_update(table_name: str, freq: dt.datetime): | |
'''check table with table_name need to update | |
Return | |
------ | |
None if no need to update | |
(start_date, end_date, freq) if need to update | |
''' | |
with create_engine(db_url).connect() as conn: | |
max_date = conn.execute( | |
text(f"SELECT MAX(date) FROM {table_name}")).fetchone()[0] | |
max_date = utils.convert_string_to_datetime(max_date) | |
current_time = utils.time_in_beijing() | |
if current_time - max_date > freq: | |
return (max_date + freq, current_time, freq) | |
else: | |
return None | |
def need_to_fetch_new_stock_price(): | |
''' | |
check if need to pull new stock price from jq | |
RETURN | |
------ | |
(min_date, max_date) : if update is needed | |
the start and end date need to fetch new stock price | |
None if no need to fetch new stock price | |
''' | |
# get min date from portfolio_profile | |
with create_engine(db_url).connect() as conn: | |
table_name = 'portfolio_profile' | |
query = f"SELECT DISTINCT date FROM {table_name} ORDER BY date ASC LIMIT 1" | |
df = pd.read_sql(query, con=conn) | |
df.date = pd.to_datetime(df.date) | |
min_date = df.date[0] | |
# compare to min date from stocks_price | |
with create_engine(db_url).connect() as conn: | |
table_name = 'stocks_price' | |
query = f"SELECT DISTINCT time FROM {table_name} ORDER BY time ASC LIMIT 1" | |
df = pd.read_sql(query, con=conn) | |
df.time = pd.to_datetime(df.time) | |
# return | |
if min_date <= df.time[0]: | |
return (min_date, df.time[0] - dt.timedelta(days=1)) | |
else: | |
return None | |
def get_most_recent_profile(type): | |
table_name = 'benchmark_profile' if type == 'benchmark' else 'portfolio_profile' | |
query = f"SELECT * FROM {table_name} WHERE date = (SELECT MAX(date) FROM {table_name})" | |
with create_engine(db_url).connect() as conn: | |
df = pd.read_sql(query, con=conn) | |
# convert date to datetime object | |
df['date'] = pd.to_datetime(df['date']) | |
return df | |
def update_stocks_details_to_db(): | |
'''create table contain all stocks detail in db | |
will override existing table if exists | |
Table Schema | |
------------ | |
'display_name', 'name', 'start_date', 'end_date', 'type', 'ticker', | |
'sector', 'aggregate_sector' | |
''' | |
df = api.get_all_stocks_detail() | |
# validation | |
if not _validate_schema(df, ts.STOCKS_DETAILS_TABLE_SCHEMA): | |
raise ValueError( | |
'df has different schema than STOCKS_DETAILS_TABLE_SCHEMA') | |
with create_engine(db_url).connect() as conn: | |
df.to_sql(ts.STOCKS_DETAILS_TABLE, con=conn, | |
if_exists='replace', index=False) | |
def fetch_new_stocks_price(): | |
''' | |
get a df contain updated stock prices for both benchmark and portfolio, | |
also indicate if the stock is in portfolio and benchmark | |
''' | |
# most recent profiles | |
p_portfolio = get_most_recent_profile('portfolio') | |
p_benchmark = get_most_recent_profile('benchmark') | |
# combine ticker | |
unique_tickers = pd.concat([p_portfolio, p_benchmark])[ | |
'ticker'].unique().tolist() | |
# fetch list of stock | |
# TODO: hard code delta time to 1 day | |
start_date = p_portfolio.date[0] + dt.timedelta(days=1) | |
end_date = utils.time_in_beijing() | |
freq = 'daily' | |
stock_df = api.fetch_stocks_price( | |
unique_tickers, start_date, end_date, freq) | |
stock_df['in_portfolio'] = stock_df['ticker'].isin( | |
p_portfolio['ticker'].unique().tolist()) | |
stock_df['in_benchmark'] = stock_df['ticker'].isin( | |
p_benchmark['ticker'].unique().tolist()) | |
return stock_df | |
def need_to_update_stocks_price(delta_time): | |
# convert p_portfolio.date[0] to timezone-aware datetime object | |
tz = pytz.timezone('Asia/Shanghai') | |
# get stock price df | |
with create_engine(db_url).connect() as conn: | |
# check if a table exist | |
if not conn.dialect.has_table(conn, 'stocks_price'): | |
return True | |
else: | |
query = "SELECT * FROM stocks_price WHERE time = (SELECT MAX(time) FROM stocks_price)" | |
most_recent_price = pd.read_sql(query, con=conn) | |
most_recent_price.time = pd.to_datetime(most_recent_price.time) | |
date_time = tz.localize(most_recent_price.time[0].to_pydatetime()) | |
if utils.time_in_beijing() - date_time > delta_time: | |
return True | |
else: | |
return False | |
def processing(): | |
''' | |
run the whole processing pipeline here | |
''' | |
pass | |
def add_details_to_stock_df(stock_df): | |
with create_engine(db_url).connect() as conn: | |
detail_df = pd.read_sql(ts.STOCKS_DETAILS_TABLE, con=conn) | |
merged_df = pd.merge(stock_df, detail_df[ | |
['sector', 'name', | |
'aggregate_sector', | |
'display_name', | |
'ticker'] | |
], on='ticker', how='left') | |
merged_df['aggregate_sector'].fillna('其他', inplace=True) | |
return merged_df | |
def _validate_schema(df, schema): | |
''' | |
validate df has the same columns and data types as schema | |
Parameters | |
---------- | |
df: pd.DataFrame | |
schema: dict | |
{column_name: data_type} | |
Returns | |
------- | |
bool | |
True if df has the same columns and data types as schema | |
False otherwise | |
''' | |
# check if the DataFrame has the same columns as the schema | |
if set(df.columns) != set(schema.keys()): | |
return False | |
# check if the data types of the columns match the schema | |
# TODO: ignoring type check for now | |
# for col, dtype in schema.items(): | |
# if df[col].dtype != dtype: | |
# return False | |
return True | |
def save_stock_price_to_db(df: pd.DataFrame): | |
print('saving to stock to db') | |
with create_engine(db_url).connect() as conn: | |
df.to_sql('stocks_price', con=conn, if_exists='append', index=False) | |
def update_portfolio_profile_to_db(portfolio_df): | |
'''overwrite the portfolio profile table in db''' | |
if (_validate_schema(portfolio_df, ts.PORTFOLIO_TABLE_SCHEMA)): | |
raise ValueError( | |
'portfoliijuo_df has different schema than PORTFOLIO_DB_SCHEMA') | |
with create_engine(db_url).connect() as conn: | |
print("updating profile to db") | |
try: | |
portfolio_df[ts.PORTFOLIO_TABLE_SCHEMA.keys()].to_sql( | |
ts.PORTFOLIO_TABLE, con=conn, if_exists='append', index=False) | |
return True | |
except: | |
return False | |
# TODO trigger recomputation of analysis | |
def update_stock_price(): | |
'''get daily stocks price until today''' | |
# most recent profiles | |
p_portfolio = get_most_recent_profile('portfolio') | |
p_benchmark = get_most_recent_profile('benchmark') | |
# combine ticker | |
unique_tickers = pd.concat([p_portfolio, p_benchmark])[ | |
'ticker'].unique().tolist() | |
# fetch list of stock | |
# TODO: hard code delta time to 1 day | |
start_date = p_portfolio.date[0] + dt.timedelta(days=1) | |
end_date = utils.time_in_beijing() | |
freq = 'daily' | |
stock_df = api.fetch_stocks_price( | |
unique_tickers, start_date, end_date, freq) | |
stock_df['in_portfolio'] = stock_df['ticker'].isin( | |
p_portfolio['ticker'].unique().tolist()) | |
stock_df['in_benchmark'] = stock_df['ticker'].isin( | |
p_benchmark['ticker'].unique().tolist()) | |
return stock_df | |
def patch_stock_prices_db(window): | |
''' | |
patch stock price db with all daily stock price within window | |
Parameters | |
---------- | |
window: tuple | |
(start, end) date of the window | |
Returns | |
------- | |
None | |
''' | |
start, end = window | |
# all trading stock between start day and end date | |
with create_engine(db_url).connect() as conn: | |
all_stocks = pd.read_sql('stocks_details', con=conn) | |
selected_stocks = all_stocks[(all_stocks.start_date <= end) & ( | |
all_stocks.end_date >= start)] | |
tickers = selected_stocks.ticker.to_list() | |
# fetch stock price and append to db | |
stock_price = api.fetch_stocks_price(tickers, start, end, 'daily') | |
detailed_stock_df = add_details_to_stock_df(stock_price) | |
# drop where closing price is null | |
detailed_stock_df.dropna(subset=['close'], inplace=True) | |
with create_engine(db_url).connect() as conn: | |
detailed_stock_df.to_sql( | |
'stocks_price', con=conn, if_exists='append', index=False) | |
return detailed_stock_df | |
def update(): | |
''' | |
run only once, update stock price and benchmark profile | |
''' | |
print("Checking stock_price table") | |
# collect daily stock price until today in beijing time | |
if need_to_update_stocks_price(dt.timedelta(days=1)): | |
print("Updating stock_price table") | |
stock_df = update_stock_price() | |
stock_df = add_details_to_stock_df(stock_df) | |
save_stock_price_to_db(stock_df) | |
stock_price_stream.emit(stock_df) | |
async def run(): | |
''' | |
start the pipeline here to check update and fetch new data | |
''' | |
print("background_task running!") | |
# TODO: update benchmark_profile | |
# if (need_to_update_stocks_price()): | |
if True: | |
print("running update") | |
# TODO testing code get stock price df | |
with create_engine(db_url).connect() as conn: | |
stock_df = pd.read_sql('stocks_price', con=conn) | |
print('sending data!') | |
# print(stock_df) | |
stock_price_stream.emit(stock_df) | |
# # latest stock price | |
# stock_df = update_stocks_price() | |
# # add display name and sector to stock_df | |
# stock_df = add_details_to_stock_df(stock_df) | |
# save_stock_price_to_db(stock_df) | |
# stock_price_stream.emit(stock_df) | |
# update sotck_price | |
# send fetched data | |
# run processing | |
# send fetched data | |