File size: 10,888 Bytes
976166f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
import sys
# sys.path.append('/Users/lamonkey/Desktop/risk-monitor-dashboard')
import panel as pn
import datetime as dt
import asyncio
import random
from sqlalchemy import create_engine, text
import pandas as pd
from streamz import Stream
from datetime import timedelta
import settings
import os
import utils
import api
import numpy as np
import pytz
import table_schema as table_s
# Stream on which stock price DataFrames are published; update() and run()
# in this module push data into it with stock_price_stream.emit(...)
stock_price_stream = Stream()

# Currently disabled: sink that would save every emitted frame to the db,
# plus optional dask / nest_asyncio setup
# save stock price to db
# stock_price_stream.sink(save_stock_price)
# from dask.distributed import Client
# client = Client()
# import nest_asyncio
# nest_asyncio.apply()
# import settings

# run using  --setup
# SQLAlchemy URL handed to create_engine by the functions below; it points at
# the SQLite file local.db (relative path)
db_url = "sqlite:///local.db"


def create_portfolio_profile_df(stocks: list[dict]):
    '''build a portfolio profile DataFrame enriched with stock details

    Parameters
    ----------
    stocks: list[dict]
        one dict per holding; each needs a 'ticker' entry because the
        details are joined on that column

    Returns
    -------
    pd.DataFrame
        the holdings with the columns added by add_details_to_stock_df

    Raises
    ------
    Exception
        if a ticker appears more than once in the enriched frame
    '''
    enriched = add_details_to_stock_df(pd.DataFrame(stocks))

    # a profile may list every ticker only once
    has_duplicates = enriched['ticker'].duplicated().any()
    if has_duplicates:
        raise Exception(
            'VALIDATION ERROR: cannot have duplicate ticker with the same date')

    return enriched


def need_to_update(table_name: str, freq: dt.timedelta):
    '''check whether the table named table_name is due for an update

    Parameters
    ----------
    table_name: str
        table to inspect; it must have a date column. The name is
        interpolated into the SQL text as is, so only pass trusted names
    freq: dt.timedelta
        longest allowed gap between the newest stored date and the
        current time

    Return
    ------
    None if no need to update
    (start_date, end_date, freq) if need to update, where start_date is the
    newest stored date plus freq and end_date is the time returned by
    utils.time_in_beijing()
    '''
    # the connection is only needed for this single query
    with create_engine(db_url).connect() as conn:
        max_date = conn.execute(
            text(f"SELECT MAX(date) FROM {table_name}")).fetchone()[0]

    # NOTE(review): MAX(date) is NULL for an empty table; how
    # utils.convert_string_to_datetime treats None is not visible here
    max_date = utils.convert_string_to_datetime(max_date)
    current_time = utils.time_in_beijing()
    if current_time - max_date > freq:
        return (max_date + freq, current_time, freq)
    return None


def need_to_fetch_new_stock_price():
    '''
    decide whether older stock prices have to be pulled from jq

    RETURN
    ------
    (min_date, max_date) : if update is needed
       min_date is the earliest date in portfolio_profile and max_date is
       one day before the earliest time in stocks_price
    None if the earliest portfolio date is later than the earliest
       stored price time

    '''
    def _earliest(column, table):
        # smallest value of `column` in `table`, parsed by pd.to_datetime
        query = f"SELECT DISTINCT {column} FROM {table} ORDER BY {column} ASC LIMIT 1"
        with create_engine(db_url).connect() as conn:
            frame = pd.read_sql(query, con=conn)
        return pd.to_datetime(frame[column])[0]

    first_profile_date = _earliest('date', 'portfolio_profile')
    first_price_time = _earliest('time', 'stocks_price')

    # prices are missing for the span between the two dates
    if first_profile_date <= first_price_time:
        return (first_profile_date, first_price_time - dt.timedelta(days=1))
    return None


def get_most_recent_profile(type):
    '''load the rows of a profile table that carry its latest date

    Parameters
    ----------
    type: str
        'benchmark' reads benchmark_profile, any other value reads
        portfolio_profile

    Returns
    -------
    pd.DataFrame
        rows whose date equals MAX(date) of the table, with the date
        column converted by pd.to_datetime
    '''
    if type == 'benchmark':
        table_name = 'benchmark_profile'
    else:
        table_name = 'portfolio_profile'

    query = (f"SELECT * FROM {table_name} "
             f"WHERE date = (SELECT MAX(date) FROM {table_name})")
    with create_engine(db_url).connect() as conn:
        latest = pd.read_sql(query, con=conn)

    # convert date to datetime object
    latest['date'] = pd.to_datetime(latest['date'])
    return latest


def update_stocks_details_to_db():
    '''rebuild the stocks details table from api.get_all_stocks_detail()

    An existing table is replaced.

    Table Schema
    ------------
    'display_name', 'name', 'start_date', 'end_date', 'type', 'ticker',
       'sector', 'aggregate_sector'

    Raises
    ------
    ValueError
        if the fetched frame does not pass _validate_schema against
        table_s.STOCKS_DETAILS_TABLE_SCHEMA
    '''
    details = api.get_all_stocks_detail()

    # refuse to write a frame whose columns differ from the schema
    schema_ok = _validate_schema(details, table_s.STOCKS_DETAILS_TABLE_SCHEMA)
    if not schema_ok:
        raise ValueError(
            'df has different schema than STOCKS_DETAILS_TABLE_SCHEMA')

    engine = create_engine(db_url)
    with engine.connect() as conn:
        details.to_sql(
            table_s.STOCKS_DETAILS_TABLE,
            con=conn,
            index=False,
            if_exists='replace',
        )


def fetch_new_stocks_price():
    '''
    fetch daily prices for every ticker of the latest portfolio and
    benchmark profiles

    The window starts one day after the date of the first row of the latest
    portfolio profile and ends at utils.time_in_beijing().

    Returns
    -------
    pd.DataFrame
        the frame returned by api.fetch_stocks_price with two added boolean
        columns, 'in_portfolio' and 'in_benchmark'
    '''
    portfolio = get_most_recent_profile('portfolio')
    benchmark = get_most_recent_profile('benchmark')

    # every ticker that appears in either profile
    both_profiles = pd.concat([portfolio, benchmark])
    all_tickers = both_profiles['ticker'].unique().tolist()

    # TODO: hard code delta time to 1 day
    window_start = portfolio.date[0] + dt.timedelta(days=1)
    window_end = utils.time_in_beijing()
    prices = api.fetch_stocks_price(
        all_tickers, window_start, window_end, 'daily')

    # flag which profile(s) each price row belongs to
    portfolio_tickers = portfolio['ticker'].unique().tolist()
    prices['in_portfolio'] = prices['ticker'].isin(portfolio_tickers)
    benchmark_tickers = benchmark['ticker'].unique().tolist()
    prices['in_benchmark'] = prices['ticker'].isin(benchmark_tickers)
    return prices


def need_to_update_stocks_price(delta_time):
    '''check whether the stocks_price table is missing, empty or stale

    Parameters
    ----------
    delta_time: dt.timedelta
        longest allowed age of the newest stored price

    Returns
    -------
    bool
        True if the table does not exist, holds no rows, or its newest time
        is more than delta_time behind utils.time_in_beijing();
        False otherwise
    '''
    with create_engine(db_url).connect() as conn:
        # check if a table exist
        if not conn.dialect.has_table(conn, 'stocks_price'):
            return True
        query = "SELECT * FROM stocks_price WHERE time = (SELECT MAX(time) FROM stocks_price)"
        most_recent_price = pd.read_sql(query, con=conn)

    # a table without rows has no newest price to compare against, and
    # indexing its first row would raise, so report that an update is needed
    if most_recent_price.empty:
        return True

    newest_time = pd.to_datetime(most_recent_price['time']).iloc[0]
    # tz.localize attaches the Asia/Shanghai zone so the value can be
    # subtracted from the time returned by utils.time_in_beijing()
    tz = pytz.timezone('Asia/Shanghai')
    newest_time = tz.localize(newest_time.to_pydatetime())
    return utils.time_in_beijing() - newest_time > delta_time


def processing():
    '''
    placeholder for the whole processing pipeline; does nothing yet
    '''
    return None


def add_details_to_stock_df(stock_df):
    '''attach stock detail columns to stock_df

    Parameters
    ----------
    stock_df: pd.DataFrame
        frame with a 'ticker' column, used as the join key

    Returns
    -------
    pd.DataFrame
        left join of stock_df with the 'sector', 'name', 'aggregate_sector'
        and 'display_name' columns of the stocks_details table; a missing
        'aggregate_sector' is replaced by '其他' ("other")
    '''
    with create_engine(db_url).connect() as conn:
        detail_df = pd.read_sql('stocks_details', con=conn)

    detail_columns = ['sector', 'name', 'aggregate_sector',
                      'display_name', 'ticker']
    merged_df = pd.merge(
        stock_df, detail_df[detail_columns], on='ticker', how='left')

    # assign the filled column back instead of calling fillna(inplace=True)
    # on the column selection: the chained in-place form does not update
    # merged_df when pandas Copy-on-Write is active
    merged_df['aggregate_sector'] = merged_df['aggregate_sector'].fillna(
        '其他')
    return merged_df


def _validate_schema(df, schema):
    '''
    tell whether df has exactly the columns named in schema

    Only the set of column names is compared; column order is irrelevant
    and the data types listed in schema are not checked yet.

    Parameters
    ----------
    df: pd.DataFrame
    schema: dict
        {column_name: data_type}

    Returns
    -------
    bool
        True if the column names of df equal the keys of schema
        False otherwise
    '''
    expected_columns = set(schema.keys())
    actual_columns = set(df.columns)
    # TODO: ignoring type check for now; a per-column comparison of
    # df[col].dtype against schema[col] would go here
    return actual_columns == expected_columns


def save_stock_price_to_db(df: pd.DataFrame):
    '''append the rows of df to the stocks_price table (index not stored)'''
    print('saving to stock to db')
    engine = create_engine(db_url)
    with engine.connect() as conn:
        df.to_sql(
            'stocks_price',
            con=conn,
            index=False,
            if_exists='append',
        )


def update_portfolio_profile_to_db(portfolio_df):
    '''append the portfolio profile rows to the portfolio table in db

    Only the columns named in table_s.PORTFOLIO_TABLE_SCHEMA are written;
    any other column of portfolio_df is left out.

    Parameters
    ----------
    portfolio_df: pd.DataFrame
        must contain every column named in PORTFOLIO_TABLE_SCHEMA

    Returns
    -------
    bool
        True if the rows were written, False if writing failed

    Raises
    ------
    ValueError
        if portfolio_df lacks a column named in PORTFOLIO_TABLE_SCHEMA
    '''
    schema_columns = list(table_s.PORTFOLIO_TABLE_SCHEMA.keys())

    # the frame may carry extra columns (only the schema columns are written
    # below), so validate that the required ones are present rather than
    # demanding an exact match
    missing = [col for col in schema_columns
               if col not in portfolio_df.columns]
    if missing:
        raise ValueError(
            'portfolio_df has different schema than PORTFOLIO_TABLE_SCHEMA, '
            f'missing columns: {missing}')

    with create_engine(db_url).connect() as conn:
        print("updating profile to db")
        try:
            portfolio_df[schema_columns].to_sql(
                table_s.PORTFOLIO_TABLE, con=conn, if_exists='append', index=False)
        except Exception as err:
            # failure is reported through the return value; print the
            # cause so it is not lost silently
            print(f"failed to update profile to db: {err}")
            return False
        return True
    # TODO trigger recomputation of analysis


def update_stock_price():
    '''get daily stocks price until today

    Delegates to fetch_new_stocks_price, whose logic this function used to
    duplicate line for line: daily prices for every ticker of the latest
    portfolio and benchmark profiles, from one day after the latest
    portfolio profile date until utils.time_in_beijing().

    Returns
    -------
    pd.DataFrame
        price rows with the boolean columns 'in_portfolio' and
        'in_benchmark' added
    '''
    return fetch_new_stocks_price()


def patch_stock_prices_db(window):
    '''
    append to the stocks_price table the daily prices, inside window, of
    every stock in stocks_details whose start_date/end_date range overlaps
    the window

    Parameters
    ----------
    window: tuple
        (start, end) date of the window

    Returns
    -------
    pd.DataFrame
        the detailed price rows that were appended; rows with a null
        'close' are dropped
    '''
    window_start, window_end = window

    with create_engine(db_url).connect() as conn:
        stocks = pd.read_sql('stocks_details', con=conn)

    # keep stocks whose [start_date, end_date] range overlaps the window
    started_before_end = stocks.start_date <= window_end
    ended_after_start = stocks.end_date >= window_start
    tickers = stocks[started_before_end & ended_after_start].ticker.to_list()

    prices = api.fetch_stocks_price(
        tickers, window_start, window_end, 'daily')
    detailed = add_details_to_stock_df(prices)
    # rows without a closing price are not stored
    detailed.dropna(subset=['close'], inplace=True)

    engine = create_engine(db_url)
    with engine.connect() as conn:
        detailed.to_sql(
            'stocks_price', con=conn, index=False, if_exists='append')
    return detailed


def update():
    '''
    refresh the stock price table when need_to_update_stocks_price reports
    that it is missing or more than one day behind

    Newly fetched prices get the stock details attached, are saved to the
    db and are emitted on stock_price_stream.
    '''
    print("Checking stock_price table")
    one_day = dt.timedelta(days=1)
    if not need_to_update_stocks_price(one_day):
        return

    # collect daily stock price until today in beijing time
    print("Updating stock_price table")
    detailed_prices = add_details_to_stock_df(update_stock_price())
    save_stock_price_to_db(detailed_prices)
    stock_price_stream.emit(detailed_prices)


async def run():
    '''
    entry point of the background task

    For now it always reads the whole stocks_price table and emits it on
    stock_price_stream; checking for and fetching new data is still TODO.
    '''
    print("background_task running!")
    # TODO: update benchmark_profile
    # TODO: run the steps below only when need_to_update_stocks_price()
    # reports stale data; at the moment they run unconditionally
    print("running update")

    # TODO testing code get stock price df
    engine = create_engine(db_url)
    with engine.connect() as conn:
        all_prices = pd.read_sql('stocks_price', con=conn)
        print('sending data!')
        stock_price_stream.emit(all_prices)

    # TODO: planned pipeline, not implemented here yet:
    #   fetch the latest stock price, add display name and sector with
    #   add_details_to_stock_df, save it with save_stock_price_to_db,
    #   emit it on stock_price_stream, run processing, emit the result