# portfolio_management/pipeline.py
# Uploaded by huggingface112
# Commit 5fd74e2: update db with daily stock prices
# File size: 11.6 kB
import sys
# sys.path.append('/Users/lamonkey/Desktop/risk-monitor-dashboard')
import panel as pn
import datetime as dt
import asyncio
import random
from sqlalchemy import create_engine, text
import pandas as pd
from streamz import Stream
from datetime import timedelta
import settings
import os
import utils
import api
import numpy as np
import pytz
import table_schema as ts
import db_operation as db
# fetch new stock price
# Module-level stream; update() and run() emit stock price DataFrames on it.
stock_price_stream = Stream()
# save stock price to db
# stock_price_stream.sink(save_stock_price)
# from dask.distributed import Client
# client = Client()
# import nest_asyncio
# nest_asyncio.apply()
# import settings
# run using --setup
# SQLAlchemy URL handed to create_engine() by the functions in this module.
db_url = 'sqlite:///instance/local.db'
def create_portfolio_profile_df(stocks: list[dict]):
    '''build a portfolio profile df from a list of stock records

    Parameters
    ----------
    stocks: list[dict]
        one dict per holding; a 'ticker' entry is required because both
        the details lookup and the duplicate check rely on it

    Returns
    -------
    pd.DataFrame
        the records after being enriched by add_details_to_stock_df

    Raises
    ------
    Exception
        if any ticker appears more than once
    '''
    enriched = add_details_to_stock_df(pd.DataFrame(stocks))
    # a ticker may only show up once in a profile
    has_duplicate = enriched['ticker'].duplicated().any()
    if has_duplicate:
        raise Exception(
            'VALIDATION ERROR: cannot have duplicate ticker with the same date')
    return enriched
def need_to_update(table_name: str, freq: dt.timedelta):
    '''check whether the table named table_name needs to be updated

    Parameters
    ----------
    table_name: str
        table whose `date` column is inspected; it is interpolated into
        the SQL text as-is, so it must be a trusted identifier
    freq: dt.timedelta
        update interval; it is compared against the age of the newest row
        and added to the newest date, so it has to be a duration

    Return
    ------
    None if no need to update
    (start_date, end_date, freq) if need to update
        start_date is the newest stored date plus freq, end_date is the
        value returned by utils.time_in_beijing()
    '''
    # only the query needs the connection, release it before comparing
    with create_engine(db_url).connect() as conn:
        max_date = conn.execute(
            text(f"SELECT MAX(date) FROM {table_name}")).fetchone()[0]
    max_date = utils.convert_string_to_datetime(max_date)
    current_time = utils.time_in_beijing()
    if current_time - max_date > freq:
        return (max_date + freq, current_time, freq)
    return None
def need_to_fetch_new_stock_price():
    '''
    decide whether stock prices need to be pulled from jq

    compares the earliest date in portfolio_profile with the earliest
    time in stocks_price

    RETURN
    ------
    (min_date, max_date) : if update is needed
        min_date is the earliest portfolio date, max_date is one day
        before the earliest stored stock price
    None if no need to fetch new stock price
    '''
    # earliest date recorded in portfolio_profile
    table_name = 'portfolio_profile'
    query = f"SELECT DISTINCT date FROM {table_name} ORDER BY date ASC LIMIT 1"
    with create_engine(db_url).connect() as conn:
        profile_dates = pd.read_sql(query, con=conn)
    earliest_profile = pd.to_datetime(profile_dates['date'])[0]

    # earliest time recorded in stocks_price
    table_name = 'stocks_price'
    query = f"SELECT DISTINCT time FROM {table_name} ORDER BY time ASC LIMIT 1"
    with create_engine(db_url).connect() as conn:
        price_times = pd.read_sql(query, con=conn)
    earliest_price = pd.to_datetime(price_times['time'])[0]

    if earliest_profile <= earliest_price:
        return (earliest_profile, earliest_price - dt.timedelta(days=1))
    return None
def get_most_recent_profile(type):
    '''
    load the rows of a profile table that carry its latest date

    Parameters
    ----------
    type: str
        'benchmark' reads benchmark_profile, any other value reads
        portfolio_profile

    Returns
    -------
    pd.DataFrame
        rows whose date equals MAX(date) of the table, with the date
        column converted to datetime
    '''
    if type == 'benchmark':
        table_name = 'benchmark_profile'
    else:
        table_name = 'portfolio_profile'
    query = f"SELECT * FROM {table_name} WHERE date = (SELECT MAX(date) FROM {table_name})"
    engine = create_engine(db_url)
    with engine.connect() as conn:
        latest = pd.read_sql(query, con=conn)
    # stored dates come back as plain values, turn them into datetimes
    latest['date'] = pd.to_datetime(latest['date'])
    return latest
def update_stocks_details_to_db():
    '''rebuild the stocks detail table from api.get_all_stocks_detail
    an existing table is replaced

    Table Schema
    ------------
    'display_name', 'name', 'start_date', 'end_date', 'type', 'ticker',
    'sector', 'aggregate_sector'

    Raises
    ------
    ValueError
        if the fetched columns differ from STOCKS_DETAILS_TABLE_SCHEMA
    '''
    details = api.get_all_stocks_detail()
    # refuse to write a frame whose columns do not match the schema
    schema_ok = _validate_schema(details, ts.STOCKS_DETAILS_TABLE_SCHEMA)
    if not schema_ok:
        raise ValueError(
            'df has different schema than STOCKS_DETAILS_TABLE_SCHEMA')
    engine = create_engine(db_url)
    with engine.connect() as conn:
        details.to_sql(name=ts.STOCKS_DETAILS_TABLE, con=conn,
                       index=False, if_exists='replace')
def fetch_new_stocks_price():
    '''
    fetch daily prices for every ticker of the latest portfolio and
    benchmark profiles

    Returns
    -------
    the frame returned by api.fetch_stocks_price with two extra boolean
    columns, in_portfolio and in_benchmark, telling where each ticker
    belongs
    '''
    portfolio = get_most_recent_profile('portfolio')
    benchmark = get_most_recent_profile('benchmark')

    # union of the tickers of both profiles
    combined = pd.concat([portfolio, benchmark])
    all_tickers = combined['ticker'].unique().tolist()

    # TODO: hard code delta time to 1 day
    window_start = portfolio.date[0] + dt.timedelta(days=1)
    window_end = utils.time_in_beijing()
    prices = api.fetch_stocks_price(
        all_tickers, window_start, window_end, 'daily')

    # flag membership of each row
    portfolio_tickers = portfolio['ticker'].unique().tolist()
    prices['in_portfolio'] = prices['ticker'].isin(portfolio_tickers)
    benchmark_tickers = benchmark['ticker'].unique().tolist()
    prices['in_benchmark'] = prices['ticker'].isin(benchmark_tickers)
    return prices
def need_to_update_stocks_price(delta_time):
    '''
    tell whether the stocks_price table is older than delta_time

    Parameters
    ----------
    delta_time
        maximum allowed age of the newest stored price; compared against
        the difference of two datetimes

    Returns
    -------
    bool
        True if the stocks_price table does not exist, or if the newest
        stored time (localized to Asia/Shanghai) lies more than
        delta_time before utils.time_in_beijing(); False otherwise
    '''
    shanghai = pytz.timezone('Asia/Shanghai')
    engine = create_engine(db_url)
    with engine.connect() as conn:
        # a missing table always needs an update
        if not conn.dialect.has_table(conn, 'stocks_price'):
            return True
        query = "SELECT * FROM stocks_price WHERE time = (SELECT MAX(time) FROM stocks_price)"
        latest_rows = pd.read_sql(query, con=conn)
    latest_time = pd.to_datetime(latest_rows['time'])[0]
    # stored time is naive, attach the timezone before comparing
    localized = shanghai.localize(latest_time.to_pydatetime())
    age = utils.time_in_beijing() - localized
    return bool(age > delta_time)
def processing():
    '''
    placeholder for running the whole processing pipeline

    does nothing yet and returns None
    '''
    return None
def add_details_to_stock_df(stock_df):
    '''attach sector and naming details to every row of stock_df

    Parameters
    ----------
    stock_df: pd.DataFrame
        must contain a 'ticker' column, which is the merge key

    Returns
    -------
    pd.DataFrame
        stock_df left-merged with the 'sector', 'name', 'aggregate_sector'
        and 'display_name' columns of the stocks details table; rows
        without an aggregate_sector get '其他'
    '''
    detail_columns = ['sector', 'name', 'aggregate_sector',
                      'display_name', 'ticker']
    with create_engine(db_url).connect() as conn:
        detail_df = pd.read_sql(ts.STOCKS_DETAILS_TABLE, con=conn)
    merged_df = pd.merge(stock_df, detail_df[detail_columns],
                         on='ticker', how='left')
    # assign the result back: an inplace fillna on the selected column is
    # a chained assignment that pandas warns about and that does not
    # modify merged_df once Copy-on-Write is active
    merged_df['aggregate_sector'] = merged_df['aggregate_sector'].fillna(
        '其他')
    return merged_df
def _validate_schema(df, schema):
'''
validate df has the same columns and data types as schema
Parameters
----------
df: pd.DataFrame
schema: dict
{column_name: data_type}
Returns
-------
bool
True if df has the same columns and data types as schema
False otherwise
'''
# check if the DataFrame has the same columns as the schema
if set(df.columns) != set(schema.keys()):
return False
# check if the data types of the columns match the schema
# TODO: ignoring type check for now
# for col, dtype in schema.items():
# if df[col].dtype != dtype:
# return False
return True
def save_stock_price_to_db(df: pd.DataFrame):
    '''append the rows of df to the stocks_price table

    Parameters
    ----------
    df: pd.DataFrame
        stock prices to store; the index is not written
    '''
    print('saving to stock to db')
    engine = create_engine(db_url)
    with engine.connect() as conn:
        df.to_sql(name='stocks_price', con=conn,
                  index=False, if_exists='append')
def update_portfolio_profile_to_db(portfolio_df):
    '''append the portfolio profile to the portfolio table in db

    only the columns listed in PORTFOLIO_TABLE_SCHEMA are written, so
    portfolio_df may carry additional columns

    Parameters
    ----------
    portfolio_df: pd.DataFrame
        profile rows to store

    Returns
    -------
    bool
        True if the rows were written, False if writing failed

    Raises
    ------
    ValueError
        if portfolio_df lacks a column required by PORTFOLIO_TABLE_SCHEMA
    '''
    schema_columns = list(ts.PORTFOLIO_TABLE_SCHEMA.keys())
    # reject only frames that cannot provide every schema column; the
    # previous check was inverted and raised for a frame that matched
    missing = [col for col in schema_columns
               if col not in portfolio_df.columns]
    if missing:
        raise ValueError(
            'portfolio_df has different schema than PORTFOLIO_TABLE_SCHEMA, '
            f'missing columns: {missing}')
    with create_engine(db_url).connect() as conn:
        print("updating profile to db")
        try:
            portfolio_df[schema_columns].to_sql(
                ts.PORTFOLIO_TABLE, con=conn, if_exists='append', index=False)
        except Exception as err:
            # writing stays best effort: report the failure to the caller
            # instead of raising, but do not swallow KeyboardInterrupt
            print(f'failed to update profile to db: {err}')
            return False
    # TODO trigger recomputation of analysis
    return True
def update_daily_stocks_price():
    '''
    bring the stocks price table up to today
    returns silently when there is no portfolio
    fetching starts at the most recent stored stock price time, or at the
    most recent portfolio date when no price is stored yet
    '''
    latest_portfolio = db.get_most_recent_portfolio_profile()
    latest_prices = db.get_most_recent_stocks_price()
    price_times = latest_prices.time
    profile_dates = latest_portfolio.date

    # nothing to do without a portfolio
    if len(profile_dates) == 0:
        return

    if len(price_times) > 0:
        start = price_times[0]
    else:
        start = profile_dates[0]
    end = utils.time_in_beijing()

    # daily frequency: only fetch once more than a day has passed
    one_day = dt.timedelta(days=1)
    if end - start > one_day:
        fetched = fetch_all_stocks_price_between(start, end)
        db.append_to_stocks_price_table(fetched)
def update_stock_price():
    '''get daily stocks price until today

    covers every ticker of the latest portfolio and benchmark profiles,
    starting one day after the latest portfolio date; the returned frame
    carries in_portfolio and in_benchmark flag columns
    '''
    profiles = {
        'portfolio': get_most_recent_profile('portfolio'),
        'benchmark': get_most_recent_profile('benchmark'),
    }
    # tickers of both profiles, without repeats
    merged_profiles = pd.concat(
        [profiles['portfolio'], profiles['benchmark']])
    tickers = merged_profiles['ticker'].unique().tolist()

    # TODO: hard code delta time to 1 day
    one_day = dt.timedelta(days=1)
    first_day = profiles['portfolio'].date[0] + one_day
    last_day = utils.time_in_beijing()
    stock_df = api.fetch_stocks_price(tickers, first_day, last_day, 'daily')

    # mark which profile each ticker belongs to
    for label in ('portfolio', 'benchmark'):
        members = profiles[label]['ticker'].unique().tolist()
        stock_df[f'in_{label}'] = stock_df['ticker'].isin(members)
    return stock_df
def fetch_all_stocks_price_between(start, end):
    '''
    fetch daily prices of every stock that overlaps the window

    Parameters
    ----------
    start
        beginning of the window, passed to the api as start_date
    end
        end of the window, passed to the api as end_date

    Returns
    -------
    the frame returned by api.fetch_stocks_price for the selected
    tickers, without the rows whose close is null
    '''
    all_stocks = db.get_all_stocks()
    # keep stocks whose [start_date, end_date] range overlaps the window
    began_in_time = all_stocks.start_date <= end
    still_listed = all_stocks.end_date >= start
    tickers = all_stocks[began_in_time & still_listed].ticker.to_list()

    stock_price = api.fetch_stocks_price(
        security=tickers, start_date=start, end_date=end, frequency='daily')
    # rows without a closing price are of no use
    stock_price.dropna(subset=['close'], inplace=True)
    return stock_price
def update():
    '''
    one-off refresh of the stock_price table

    when the stored prices are more than a day old, fetch new prices,
    add the stock details, save them to db and emit them on
    stock_price_stream
    '''
    print("Checking stock_price table")
    # prices are collected daily, in beijing time
    if not need_to_update_stocks_price(dt.timedelta(days=1)):
        return
    print("Updating stock_price table")
    fresh_prices = add_details_to_stock_df(update_stock_price())
    save_stock_price_to_db(fresh_prices)
    stock_price_stream.emit(fresh_prices)
async def run():
    '''
    entry point of the background pipeline

    for now it only reads the whole stocks_price table and emits it on
    stock_price_stream
    '''
    print("background_task running!")
    # TODO: update benchmark_profile
    # TODO: gate the steps below on need_to_update_stocks_price()
    print("running update")
    # TODO testing code get stock price df
    engine = create_engine(db_url)
    with engine.connect() as conn:
        stock_df = pd.read_sql('stocks_price', con=conn)
    print('sending data!')
    stock_price_stream.emit(stock_df)
    # intended flow once the testing code is removed:
    #   fetch the latest stock price
    #   add display name and sector to it
    #   save it to db
    #   emit it on stock_price_stream
    #   run processing and send the result