import datetime as dt
from sqlalchemy import create_engine, text
import pandas as pd
from streamz import Stream
import utils
import api
import pytz
import table_schema as ts
import db_operation as db
from log import Log
# import settings

# stream of freshly fetched stock prices; consumers subscribe downstream
stock_price_stream = Stream()
# run log stored on disk
log = Log('instance/log.json')
# save stock price to db
# stock_price_stream.sink(save_stock_price_to_db)
# from dask.distributed import Client
# client = Client()
# import nest_asyncio
# nest_asyncio.apply()
# run using --setup
db_url = 'sqlite:///instance/local.db'
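
# Example (sketch): consumers subscribe with streamz's sink API; wiring
# persistence through save_stock_price_to_db (defined below) would look like
#   stock_price_stream.sink(save_stock_price_to_db)
#   stock_price_stream.emit(new_price_df)  # new_price_df is hypothetical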

def create_portfolio_profile_df(stocks: list[dict]):
    profile_df = pd.DataFrame(stocks)
    profile_df = add_details_to_stock_df(profile_df)
    # reject duplicate tickers within the same profile
    if profile_df.ticker.duplicated().any():
        raise ValueError(
            'VALIDATION ERROR: cannot have duplicate tickers on the same date')
    return profile_df

def need_to_update(table_name: str, freq: dt.timedelta):
    '''check whether the table named table_name needs an update

    Returns
    -------
    None if no update is needed
    (start_date, end_date, freq) if an update is needed
    '''
    with create_engine(db_url).connect() as conn:
        max_date = conn.execute(
            text(f"SELECT MAX(date) FROM {table_name}")).fetchone()[0]
        max_date = utils.convert_string_to_datetime(max_date)
        current_time = utils.time_in_beijing()
        if current_time - max_date > freq:
            return (max_date + freq, current_time, freq)
        else:
            return None
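
# Example (hypothetical table name): a daily refresh check might look like
#   window = need_to_update('benchmark_profile', dt.timedelta(days=1))
#   if window is not None:
#       start_date, end_date, freq = window
#       # fetch rows in [start_date, end_date] and append them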

def need_to_forward_fill_stock_price():
    '''
    check whether new stock prices need to be pulled from jq

    Returns
    -------
    (min_date, max_date) if an update is needed:
        the start and end dates of the window to fetch
    None if no fetch is needed
    '''
    # TODO refactor to db
    # get the earliest date in portfolio_profile
    with create_engine(db_url).connect() as conn:
        table_name = 'portfolio_profile'
        query = f"SELECT DISTINCT date FROM {table_name} ORDER BY date ASC LIMIT 1"
        df = pd.read_sql(query, con=conn)
        df.date = pd.to_datetime(df.date)
        min_date = df.date[0]
    # TODO refactor to db
    # compare to the earliest date in stocks_price
    with create_engine(db_url).connect() as conn:
        table_name = 'stocks_price'
        query = f"SELECT DISTINCT time FROM {table_name} ORDER BY time ASC LIMIT 1"
        df = pd.read_sql(query, con=conn)
        df.time = pd.to_datetime(df.time)
    # strict comparison: if the two dates are equal the window would be empty
    if min_date < df.time[0]:
        return (min_date, df.time[0] - dt.timedelta(days=1))
    else:
        return None

def get_most_recent_profile(profile_type):
    table_name = 'benchmark_profile' if profile_type == 'benchmark' else 'portfolio_profile'
    query = f"SELECT * FROM {table_name} WHERE date = (SELECT MAX(date) FROM {table_name})"
    with create_engine(db_url).connect() as conn:
        df = pd.read_sql(query, con=conn)
        # convert date strings to datetime objects
        df['date'] = pd.to_datetime(df['date'])
        return df

def update_stocks_details_to_db():
    '''overwrite the stocks_details table with fresh stock details

    Table Schema
    ------------
    'display_name', 'name', 'start_date', 'end_date', 'type', 'ticker',
    'sector', 'aggregate_sector'
    '''
    df = api.get_all_stocks_detail()
    # validation
    if not _validate_schema(df, ts.STOCKS_DETAILS_TABLE_SCHEMA):
        raise ValueError(
            'df has different schema than STOCKS_DETAILS_TABLE_SCHEMA')
    with create_engine(db_url).connect() as conn:
        df.to_sql(ts.STOCKS_DETAILS_TABLE, con=conn,
                  if_exists='replace', index=False)

def need_to_update_stocks_price(delta_time):
    # timezone used to make the most recent price timestamp timezone-aware
    tz = pytz.timezone('Asia/Shanghai')
    # get stock price df
    with create_engine(db_url).connect() as conn:
        # if the table does not exist yet, an update is definitely needed
        if not conn.dialect.has_table(conn, 'stocks_price'):
            return True
        query = "SELECT * FROM stocks_price WHERE time = (SELECT MAX(time) FROM stocks_price)"
        most_recent_price = pd.read_sql(query, con=conn)
        most_recent_price.time = pd.to_datetime(most_recent_price.time)
        date_time = tz.localize(most_recent_price.time[0].to_pydatetime())
        return utils.time_in_beijing() - date_time > delta_time

def processing():
    '''
    run the whole processing pipeline here
    '''
    pass

def add_details_to_stock_df(stock_df):
    with create_engine(db_url).connect() as conn:
        detail_df = pd.read_sql(ts.STOCKS_DETAILS_TABLE, con=conn)
        merged_df = pd.merge(stock_df, detail_df[
            ['sector', 'name',
             'aggregate_sector',
             'display_name',
             'ticker']
        ], on='ticker', how='left')
        # default missing aggregate_sector to 其他 ("Other")
        merged_df['aggregate_sector'] = merged_df['aggregate_sector'].fillna('其他')
        return merged_df
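
# NOTE: if stock_df already carries a display_name column, the left merge
# above produces display_name_x / display_name_y suffixes; see
# left_fill_benchmark_profile below, where that collision is reconciled.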

def _validate_schema(df, schema):
    '''
    validate that df has the same columns and data types as schema

    Parameters
    ----------
    df: pd.DataFrame
    schema: dict
        {column_name: data_type}

    Returns
    -------
    bool
        True if df has the same columns and data types as schema,
        False otherwise
    '''
    # check if the DataFrame has the same columns as the schema
    if set(df.columns) != set(schema.keys()):
        return False
    # check if the data types of the columns match the schema
    # TODO: ignoring type check for now
    # for col, dtype in schema.items():
    #     if df[col].dtype != dtype:
    #         return False
    return True
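
# Example (hypothetical schema): only column names are compared for now,
# so mismatched dtypes still pass:
#   df = pd.DataFrame({'ticker': ['000001.XSHE'], 'close': [10.5]})
#   _validate_schema(df, {'ticker': 'object', 'close': 'float64'})  # True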

def save_stock_price_to_db(df: pd.DataFrame):
    print('saving stock price to db')
    with create_engine(db_url).connect() as conn:
        df.to_sql('stocks_price', con=conn, if_exists='append', index=False)

def update_portfolio_profile_to_db(portfolio_df):
    '''append the portfolio profile to the portfolio table in db'''
    # reject the df if it does not match the expected schema
    if not _validate_schema(portfolio_df, ts.PORTFOLIO_TABLE_SCHEMA):
        raise ValueError(
            'portfolio_df has different schema than PORTFOLIO_TABLE_SCHEMA')
    with create_engine(db_url).connect() as conn:
        print("updating profile to db")
        try:
            portfolio_df[list(ts.PORTFOLIO_TABLE_SCHEMA.keys())].to_sql(
                ts.PORTFOLIO_TABLE, con=conn, if_exists='append', index=False)
            return True
        except Exception:
            return False
    # TODO trigger recomputation of analysis

def right_fill_stock_price():
    '''
    update all stock prices up to today.
    if there is no portfolio, terminate without warning.
    the default start date is the most recent date in the portfolio
    '''
    most_recent_portfolio = db.get_most_recent_portfolio_profile()
    most_recent_stocks_price = db.get_most_recent_stocks_price()
    # fetch all stock prices up to today
    stocks_dates = most_recent_stocks_price.time
    portfolio_dates = most_recent_portfolio.date
    if len(portfolio_dates) == 0:
        return
    start = stocks_dates[0] if len(stocks_dates) > 0 else portfolio_dates[0]
    end = utils.time_in_beijing()
    # frequency is set to daily
    if end - start > dt.timedelta(days=1):
        new_stocks_price = fetch_all_stocks_price_between(start, end)
        db.append_to_stocks_price_table(new_stocks_price)

def fetch_all_stocks_price_between(start, end):
    '''
    fetch daily prices for all stocks trading within the window,
    inclusive of both start and end dates

    Parameters
    ----------
    start : datetime
        start date, inclusive
    end : datetime
        end date, inclusive

    Returns
    -------
    pd.DataFrame
        daily stock prices with null closing prices dropped
    '''
    # all stocks trading at some point between the start and end dates
    all_stocks = db.get_all_stocks()
    selected_stocks = all_stocks[(all_stocks.start_date <= end) & (
        all_stocks.end_date >= start)]
    tickers = selected_stocks.ticker.to_list()
    # fetch stock prices
    stock_price = api.fetch_stocks_price(
        security=tickers,
        start_date=start,
        end_date=end,
        frequency='daily',
        skip_paused=True,)
    # drop rows where the closing price is null
    stock_price.dropna(subset=['close'], inplace=True)
    return stock_price
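
# Example (sketch): backfill one week of prices ending now in Beijing time
#   end = utils.time_in_beijing()
#   df = fetch_all_stocks_price_between(end - dt.timedelta(days=7), end)
#   db.append_to_stocks_price_table(df)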

def right_fill_benchmark_profile():
    '''
    right fill the benchmark profile table:
    fill any missing entries between the most recent date in the benchmark profile and today.
    if there is no benchmark profile, fill from the most recent date in the portfolio profile to today.
    if there is no portfolio profile, terminate without warning
    '''
    # get most recent date in benchmark profile
    b_ends = db.get_most_recent_benchmark_profile().date
    # get today's date
    today = utils.time_in_beijing()
    # most recent portfolio dates
    p_ends = db.get_most_recent_portfolio_profile().date
    # if portfolio is empty, terminate
    if len(p_ends) == 0:
        return
    # if no benchmark profile, start from the most recent date in the portfolio profile
    elif len(b_ends) == 0:
        start = p_ends[0]
        end = today
    # otherwise start from the most recent benchmark date
    else:
        start = b_ends[0]
        end = today
    # fetch and update
    new_entry = api.fetch_benchmark_profile(start, end)
    detailed_new_entry = utils.add_details_to_stock_df(new_entry)
    db.append_to_benchmark_profile(detailed_new_entry)

def left_fill_benchmark_profile():
    '''
    left fill the benchmark profile table:
    fill any missing entries between the earliest date in the portfolio profile
    and the earliest date in the benchmark profile.
    if there is no portfolio profile, terminate without warning.
    if there is no benchmark profile, the span runs from the earliest date in
    the portfolio profile to the most recent date in the portfolio profile
    '''
    # get start time of benchmark profile
    b_starts = db.get_oldest_benchmark_profile().date
    # get start time of portfolio profile
    p_starts = db.get_oldest_portfolio_profile().date
    # if no portfolio profile, terminate
    if len(p_starts) == 0:
        return
    # use the portfolio profile's date range if there is no benchmark profile entry
    elif len(b_starts) == 0:
        p_start = p_starts[0]
        b_start = db.get_most_recent_portfolio_profile().date[0]
    else:
        b_start = b_starts[0]
        p_start = p_starts[0]
    if p_start < b_start:
        # back fill benchmark profile
        new_entry = api.fetch_benchmark_profile(p_start, b_start)
        detailed_new_entry = utils.add_details_to_stock_df(new_entry)
        # handle duplicate display_name from the left merge
        detailed_new_entry.drop(columns=['display_name_x'], inplace=True)
        detailed_new_entry.rename(
            columns={'display_name_y': 'display_name'}, inplace=True)
        # append to db
        db.append_to_benchmark_profile(detailed_new_entry)
        # return detailed_new_entry
    # else do nothing

def left_fill_stocks_price():
    '''
    left fill the stock price table:
    fill missing entries between the oldest date in the portfolio profile
    and the oldest date in the stock price table.
    if there is no portfolio profile, terminate without warning.
    if there is no stock price table, the span runs from the oldest date in
    the portfolio profile to the most recent date in the portfolio profile
    '''
    # get oldest time in portfolio profile
    p_start = db.get_oldest_portfolio_profile().date
    # get oldest time in stock price table
    stock_start = db.get_oldest_stocks_price().time
    # if no portfolio profile, terminate
    if len(p_start) == 0:
        return
    # no stock price: span the entire portfolio profile
    elif len(stock_start) == 0:
        start = p_start[0]
        end = db.get_most_recent_portfolio_profile().date[0]
    else:
        start = p_start[0]
        end = stock_start[0]
    if start < end:
        # fetch and update
        new_entry = fetch_all_stocks_price_between(start, end)
        db.append_to_stocks_price_table(new_entry)

def update_benchmark_to_db():
    '''
    update daily benchmark weights
    '''
    pass

async def daily_update():
    last_update = log.get_time('daily_update')
    if last_update is None or utils.time_in_beijing() - last_update >= dt.timedelta(days=1):
        print("running daily update")
        # update stock price
        left_fill_stocks_price()
        right_fill_stock_price()
        print("updated stocks price")
        # update benchmark index
        left_fill_benchmark_profile()
        right_fill_benchmark_profile()
        print("updated benchmark profile")
        # update all stock details
        update_stocks_details_to_db()
        print("updated stocks details")
        log.update_log('daily_update')
    else:
        print("no update needed")

def update():
    '''
    run only once, update stock price and benchmark profile
    '''
    print("Checking stock_price table")
    # collect daily stock prices until today in Beijing time
    if need_to_update_stocks_price(dt.timedelta(days=1)):
        print("Updating stock_price table")
        # FIXME: update_stock_price is not implemented, so stock_df is
        # undefined until the line below is restored
        # stock_df = update_stock_price()
        stock_df = add_details_to_stock_df(stock_df)
        save_stock_price_to_db(stock_df)
        stock_price_stream.emit(stock_df)

async def run():
    '''
    start the pipeline here to check for updates and fetch new data
    '''
    print("background_task running!")
    # TODO: update benchmark_profile
    # if (need_to_update_stocks_price()):
    if True:
        print("running update")
        # TODO testing code: read the stock price df straight from the db
        with create_engine(db_url).connect() as conn:
            stock_df = pd.read_sql('stocks_price', con=conn)
            print('sending data!')
            # print(stock_df)
            stock_price_stream.emit(stock_df)
        # # latest stock price
        # stock_df = update_stocks_price()
        # # add display name and sector to stock_df
        # stock_df = add_details_to_stock_df(stock_df)
        # save_stock_price_to_db(stock_df)
        # stock_price_stream.emit(stock_df)
        # update stock_price
        # send fetched data
        # run processing
        # send fetched data