import os

# Hide all GPUs from TensorFlow and PyTorch. This must happen before either
# framework is imported, so it sits at the very top; the service runs on CPU.
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

import numpy as np
import pandas as pd
import requests
import tensorflow as tf
import torch
from bs4 import BeautifulSoup
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from pytorch_forecasting import TemporalFusionTransformer
from sklearn.preprocessing import MinMaxScaler
from transformers import AutoModelForSequenceClassification, AutoTokenizer
from yahoo_fin.stock_info import get_data

# With the GPUs hidden above, this always resolves to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

MODEL_PATH = "lib/20_lstm_model.h5"
model = tf.keras.models.load_model(MODEL_PATH)
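
# First-stage price model above; below, the financial-news sentiment
# classifier (negative / neutral / positive).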
model_name_news = "mrm8488/distilroberta-finetuned-financial-news-sentiment-analysis"
tokenizer = AutoTokenizer.from_pretrained(model_name_news)
sentiment_model = AutoModelForSequenceClassification.from_pretrained(
    model_name_news).to(device)
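
# Second-stage forecaster: a Temporal Fusion Transformer restored from a
# local checkpoint.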
best_model_path = 'lib/tft_pred.ckpt'
best_tft = TemporalFusionTransformer.load_from_checkpoint(best_model_path).to(device)
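
# FastAPI app with permissive CORS so any front-end origin can call the API.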
app = FastAPI()

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "DELETE"],
    allow_headers=["*"],
)
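
# Request schema for ticker queries. The GET endpoints below take these
# fields as path parameters instead, so this model is currently unreferenced.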
class TickerRequest(BaseModel):
    ticker: str
    start_date: str
    end_date: str
    interval: str = "1d"
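
# Download OHLCV data from Yahoo Finance and add the engineered
# "revenue" and "daily_profit" columns used by both models.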
def fetch_and_process_ticker_data(ticker, start_date, end_date, interval="1d"):
    try:
        df = get_data(ticker, start_date=start_date,
                      end_date=end_date, index_as_date=True, interval=interval)
        df = df.drop(columns="close")
        df["revenue"] = df["adjclose"] * df["volume"]
        df["daily_profit"] = df["adjclose"] - df["open"]
    except Exception as error:
        raise HTTPException(
            status_code=500, detail=f"Error processing ticker {ticker}: {error}")
    return df
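
# Map each supported ticker symbol to the integer class id the models were
# trained with.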
def ticker_encoded(df):
    label_map = {'ATOM': 0, 'HBIO': 1, 'IBEX': 2, 'MYFW': 3, 'NATH': 4}
    # A list comprehension replaces the original `for i in df.iloc():` loop,
    # which is not valid iteration; it also keeps the KeyError behaviour for
    # tickers outside the supported set.
    df['ticker_encoded'] = [label_map[t] for t in df['ticker']]
    return df
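
# Min-max scale the features in place. The fitted price scaler is returned
# so predictions can later be mapped back to dollar values.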
def normalize(df):
    price_scaler = MinMaxScaler()
    volume_revenue_scaler = MinMaxScaler()
    profit_scaler = MinMaxScaler()

    df[["open", "high", "low", "adjclose"]] = price_scaler.fit_transform(
        df[["open", "high", "low", "adjclose"]])
    df[["volume", "revenue"]] = volume_revenue_scaler.fit_transform(
        df[["volume", "revenue"]])
    df[["daily_profit"]] = profit_scaler.fit_transform(df[["daily_profit"]])

    return df, price_scaler
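
# Build 20-day sliding windows for the LSTM. A window is kept only when all
# 20 days and the target day belong to the same ticker, so sequences never
# straddle two stocks.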
def create_sequence(dataset):
    sequences = []
    labels = []
    dates = []
    stock = []

    df_copy = dataset.drop(columns=["date"])

    start_idx = 0
    for stop_idx in range(20, len(dataset)):
        window_tickers = set(dataset.iloc[start_idx:stop_idx]["ticker_encoded"].values)
        target_day_ticker_class = dataset.iloc[stop_idx]["ticker_encoded"]

        if len(window_tickers) == 1 and target_day_ticker_class in window_tickers:
            sequences.append(df_copy.iloc[start_idx:stop_idx].values)
            labels.append(df_copy.iloc[stop_idx][["open", "adjclose"]])
            date_string = dataset.iloc[stop_idx]["date"].strftime('%Y-%m-%d')
            dates.append(date_string)
            stock.append(str(dataset.iloc[stop_idx]["ticker_encoded"]))

        start_idx += 1

    return np.array(sequences), np.array(labels), dates, stock
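
# The price scaler was fitted on four columns (open, high, low, adjclose),
# but the LSTM predicts only open and adjclose, so the scaler's parameters
# are narrowed to indices 0 and 3 before inverting. Note that this mutates
# the scaler in place; it must not be reused on the full four columns.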
def scaling_predictions(price_scaler, combined_dataset_prediction):
    price_scaler.min_ = np.array([price_scaler.min_[0], price_scaler.min_[3]])
    price_scaler.scale_ = np.array(
        [price_scaler.scale_[0], price_scaler.scale_[3]])

    combined_dataset_prediction_inverse = price_scaler.inverse_transform(
        combined_dataset_prediction)

    return combined_dataset_prediction_inverse
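
# Attach the inverse-scaled LSTM predictions to the matching (date, ticker)
# rows, then drop any rows that never received a prediction.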
def storing_predictions(df, dates, stock, combined_dataset_prediction_inverse):
    df['pred_open'] = np.nan
    df['pred_closing'] = np.nan

    for idx, row in df.iterrows():
        current_row_date = row['date'].strftime('%Y-%m-%d')
        current_row_ticker = str(row['ticker_encoded'])

        for i in range(len(dates)):
            if current_row_date == dates[i] and stock[i] == current_row_ticker:
                df.loc[idx, 'pred_open'] = combined_dataset_prediction_inverse[i][0]
                df.loc[idx, 'pred_closing'] = combined_dataset_prediction_inverse[i][1]
                break

    df = df.dropna(subset=['pred_open', 'pred_closing']).reset_index(drop=True)
    return df
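
# Scrape the two most recent pages of Business Insider headlines for the
# ticker. Sentiment columns are created as placeholders here and filled in
# later by news_sentiment().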
def scrape_news(ticker_name):
    columns = ['datetime', 'title', 'source',
               'link', 'top_sentiment', 'sentiment_score']
    df = pd.DataFrame(columns=columns)

    for i in range(1, 3):
        url = f'https://markets.businessinsider.com/news/{ticker_name}-stock?p={i}'
        response = requests.get(url, timeout=10)
        soup = BeautifulSoup(response.text, 'lxml')

        articles = soup.find_all('div', class_='latest-news__story')

        for article in articles:
            published_at = article.find(
                'time', class_='latest-news__date').get('datetime')
            title = article.find('a', class_='news-link').text
            source = article.find('span', class_='latest-news__source').text
            link = article.find('a', class_='news-link').get('href')
            top_sentiment = ''
            sentiment_score = 0

            temp = pd.DataFrame(
                [[published_at, title, source, link, top_sentiment, sentiment_score]],
                columns=df.columns)
            df = pd.concat([temp, df], axis=0)

    return df
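
# For each trading day, concatenate headlines published within the lookback
# window before that day; if none exist, fall back to the most recent older
# headline, and otherwise carry forward the last news seen, so every row
# ends up with some text.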
def add_recent_news(main_df, news_df, lookback_days=10):
    news_df.drop(columns=['top_sentiment', 'sentiment_score'], inplace=True)

    main_df['date'] = pd.to_datetime(main_df['date'])
    news_df['datetime'] = pd.to_datetime(news_df['datetime'])

    news_list = []
    last_available_news = ''

    for _, row in main_df.iterrows():
        current_date = row['date']
        news_articles = ''

        for _, news_row in news_df.iterrows():
            extracted_date = news_row['datetime']
            if (current_date - extracted_date).days <= lookback_days and extracted_date < current_date:
                news_articles += news_row['title'] + " "

        if not news_articles.strip():
            for _, news_row in news_df[::-1].iterrows():
                if news_row['datetime'] < current_date:
                    news_articles = news_row['title']
                    break

        last_available_news = news_articles.strip() or last_available_news
        news_list.append(last_available_news)

    main_df['news'] = news_list
    return main_df
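
# Score each row's news text with the DistilRoBERTa classifier and map the
# predicted label to a numeric sentiment_score (-1, 0, 1). The label order
# below follows the original code and is assumed to match the checkpoint's
# id2label mapping.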
def news_sentiment(df):
    texts = df['news'].tolist()

    inputs = tokenizer(texts, padding=True,
                       truncation=True, return_tensors="pt")
    inputs = {key: val.to(device) for key, val in inputs.items()}

    with torch.no_grad():
        outputs = sentiment_model(**inputs)

    probs = torch.softmax(outputs.logits, dim=-1)
    predictions = torch.argmax(probs, dim=-1)

    labels = ["negative", "neutral", "positive"]
    # add_recent_news() guarantees every row has news text, so the
    # predictions align one-to-one with the rows of df.
    df['predicted_sentiment'] = [labels[pred] for pred in predictions.tolist()]

    sentiment_map = {
        'positive': 1,
        'neutral': 0,
        'negative': -1
    }
    df['sentiment_score'] = df['predicted_sentiment'].map(sentiment_map)

    df = df.drop(columns=['news'])
    return df
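
# Build the 20 lagged open/adjclose features the TFT expects, drop the
# warm-up rows that lack full lags, and predict quantiles.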
def get_tft_predictions(df):
    for i in range(1, 21):
        df[f'open_lag_{i}'] = df.groupby('ticker')['open'].shift(i)
        df[f'adjclose_lag_{i}'] = df.groupby('ticker')['adjclose'].shift(i)

    lag_columns = [f'open_lag_{i}' for i in range(
        1, 21)] + [f'adjclose_lag_{i}' for i in range(1, 21)]

    df.dropna(subset=lag_columns, inplace=True)

    # pytorch-forecasting's predict() accepts a DataFrame directly and moves
    # tensors to the model's device itself; the original `df.to(device)` was
    # a bug, since DataFrames have no .to() method.
    predictions = best_tft.predict(df, mode="quantiles")

    return predictions
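
# Raw data endpoint. Note that to_dict(orient="records") drops the date
# index from the response payload.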
@app.get("/fetch-ticker-data/{ticker_name}/{start_date}/{end_date}/{interval}")
async def fetch_ticker_data(ticker_name: str, start_date: str, end_date: str, interval: str):
    try:
        result_df = fetch_and_process_ticker_data(
            ticker=ticker_name,
            start_date=start_date,
            end_date=end_date,
            interval=interval
        )
        return result_df.to_dict(orient="records")
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
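
# End-to-end prediction pipeline: fetch prices, encode and normalize, run
# the LSTM over 20-day windows, merge scraped news sentiment, and refine the
# forecast with the Temporal Fusion Transformer.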
@app.get("/predict-prices/{ticker_name}/{start_date}/{end_date}/{interval}")
async def predict_prices(ticker_name: str, start_date: str, end_date: str, interval: str):
    try:
        result_df = fetch_and_process_ticker_data(
            ticker=ticker_name,
            start_date=start_date,
            end_date=end_date,
            interval=interval
        )

        # Keep the most recent 60 rows and move the date index into a column.
        raw_data = result_df.tail(60)
        raw_data = raw_data.reset_index()
        raw_data.rename(columns={"index": "date"}, inplace=True)
        raw_data = ticker_encoded(raw_data)

        # Copy before normalize(), which scales raw_data in place.
        temp_df = raw_data.copy()

        normalized_data, scaler = normalize(raw_data)
        normalized_data = normalized_data.drop(columns=['ticker'])

        sequences, _, dates, stock = create_sequence(normalized_data)
        combined_dataset_prediction = model.predict(sequences)
        combined_dataset_prediction_inverse = scaling_predictions(
            scaler, combined_dataset_prediction)

        lstm_pred_df = storing_predictions(
            temp_df, dates, stock, combined_dataset_prediction_inverse)
        news_df = scrape_news(ticker_name=ticker_name)

        combined_with_news_df = add_recent_news(lstm_pred_df, news_df)
        sentiment_df = news_sentiment(combined_with_news_df)

        sentiment_df['time_idx'] = range(1, len(sentiment_df) + 1)

        predicted_values = get_tft_predictions(sentiment_df)

        final_pred_open_price = predicted_values[0].item()
        final_pred_closing_price = predicted_values[1].item()

        return {"open": final_pred_open_price, "close": final_pred_closing_price}

    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))