test / api.py
Aviral Jain
Update api.py
d2c1dba verified
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import pandas as pd
import numpy as np
import tensorflow as tf
from yahoo_fin.stock_info import get_data
from sklearn.preprocessing import MinMaxScaler
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from pytorch_forecasting import TemporalFusionTransformer
from bs4 import BeautifulSoup
import requests
from dotenv import load_dotenv
import os
from fastapi.middleware.cors import CORSMiddleware
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
import torch
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
MODEL_PATH = "lib/20_lstm_model.h5"
model = tf.keras.models.load_model(MODEL_PATH)
model_name_news = "mrm8488/distilroberta-finetuned-financial-news-sentiment-analysis"
tokenizer = AutoTokenizer.from_pretrained(model_name_news)
sentiment_model = AutoModelForSequenceClassification.from_pretrained(
model_name_news).to(device)
best_model_path = 'lib/tft_pred.ckpt'
best_tft = TemporalFusionTransformer.load_from_checkpoint(best_model_path).to(device)
app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["GET", "POST", "PUT", "DELETE"],
allow_headers=["*"],
)
class TickerRequest(BaseModel):
ticker: str
start_date: str
end_date: str
interval: str = "1d"
def fetch_and_process_ticker_data(ticker, start_date, end_date, interval="1d"):
df = pd.DataFrame()
try:
temp = get_data(ticker, start_date=start_date,
end_date=end_date, index_as_date=True, interval=interval)
temp = temp.drop(columns="close")
temp["revenue"] = temp["adjclose"] * temp["volume"]
temp["daily_profit"] = temp["adjclose"] - temp["open"]
df = pd.concat([df, temp], axis=0)
except Exception as error:
raise HTTPException(
status_code=500, detail=f"Error processing ticker {ticker}: {error}")
return df
def ticker_encoded(df):
label_map = {'ATOM': 0, 'HBIO': 1, 'IBEX': 2, 'MYFW': 3, 'NATH': 4}
ticker_encoded = []
for i in df.iloc():
ticker_name = i['ticker']
encoded_ticker = label_map[ticker_name]
ticker_encoded.append(encoded_ticker)
df['ticker_encoded'] = ticker_encoded
return df
def normalize(df):
price_scaler = MinMaxScaler()
volume_revenue_scaler = MinMaxScaler()
profit_scaler = MinMaxScaler()
df[["open", "high", "low", "adjclose"]] = price_scaler.fit_transform(
df[["open", "high", "low", "adjclose"]])
df[["volume", "revenue"]] = volume_revenue_scaler.fit_transform(
df[["volume", "revenue"]])
df[["daily_profit"]] = profit_scaler.fit_transform(df[["daily_profit"]])
return df, price_scaler
def create_sequence(dataset):
sequences = []
labels = []
dates = []
stock = []
df_copy = dataset.drop(columns=["date"])
start_idx = 0
for stop_idx in range(20, len(dataset)):
set_ = set(dataset.iloc[start_idx:stop_idx]["ticker_encoded"].values)
target_day_ticker_class = dataset.iloc[stop_idx]["ticker_encoded"]
if len(set_) == 1 and target_day_ticker_class == list(set_)[0]:
sequences.append(df_copy.iloc[start_idx:stop_idx].values)
labels.append(df_copy.iloc[stop_idx][["open", "adjclose"]])
date_string = dataset.iloc[stop_idx]["date"].strftime('%Y-%m-%d')
dates.append(date_string)
stock.append(str(dataset.iloc[stop_idx]["ticker_encoded"]))
start_idx += 1
return np.array(sequences), np.array(labels), dates, stock
def scaling_predictions(price_scaler, combined_dataset_prediction):
price_scaler.min_ = np.array([price_scaler.min_[0], price_scaler.min_[3]])
price_scaler.scale_ = np.array(
[price_scaler.scale_[0], price_scaler.scale_[3]])
combined_dataset_prediction_inverse = price_scaler.inverse_transform(
combined_dataset_prediction)
return combined_dataset_prediction_inverse
def storing_predictions(df, dates, stock, combined_dataset_prediction_inverse):
df['pred_open'] = np.nan
df['pred_closing'] = np.nan
for idx, row in df.iterrows():
current_row_date = row.date.strftime('%Y-%m-%d')
current_row_ticker = str(row.ticker_encoded)
for i in range(len(dates)):
if current_row_date == dates[i] and stock[i] == current_row_ticker:
opening_price = combined_dataset_prediction_inverse[i][0]
closing_price = combined_dataset_prediction_inverse[i][1]
df.loc[idx, 'pred_open'] = opening_price
df.loc[idx, 'pred_closing'] = closing_price
break
df = df.dropna(subset=['pred_open', 'pred_closing']).reset_index(drop=True)
return df
def scrape_news(ticker_name):
columns = ['datatime', 'title', 'source',
'link', 'top_sentiment', 'sentiment_score']
df = pd.DataFrame(columns=columns)
for i in range(1, 3):
url = f'https://markets.businessinsider.com/news/{ticker_name}-stock?p={i}'
response = requests.get(url)
html = response.text
soup = BeautifulSoup(html, 'lxml')
articles = soup.find_all('div', class_='latest-news__story')
for article in articles:
datatime = article.find(
'time', class_='latest-news__date').get('datetime')
title = article.find('a', class_='news-link').text
source = article.find('span', class_='latest-news__source').text
link = article.find('a', class_='news-link').get('href')
top_sentiment = ''
sentiment_score = 0
temp = pd.DataFrame(
[[datatime, title, source, link, top_sentiment, sentiment_score]], columns=df.columns)
df = pd.concat([temp, df], axis=0)
return df
def add_recent_news(main_df, news_df, lookback_days=10):
news_df.drop(columns=['top_sentiment', 'sentiment_score'], inplace=True)
main_df['date'] = pd.to_datetime(main_df['date'])
news_df['datatime'] = pd.to_datetime(news_df['datatime'])
news_list = []
last_available_news = ''
for _, row in main_df.iterrows():
current_date = row['date']
current_ticker = row['ticker']
news_articles = ''
for _, news_row in news_df.iterrows():
extracted_date = news_row['datatime']
if (current_date - extracted_date).days <= lookback_days and extracted_date < current_date:
news_articles += news_row['title'] + " "
if not news_articles.strip():
for _, news_row in news_df[::-1].iterrows():
if news_row['datatime'] < current_date:
news_articles = news_row['title']
break
last_available_news = news_articles.strip() or last_available_news
news_list.append(last_available_news)
main_df['news'] = news_list
return main_df
def news_sentiment(df):
news_column_name = 'news'
texts = df[news_column_name].tolist()
inputs = tokenizer(texts, padding=True,
truncation=True, return_tensors="pt")
inputs = {key: val.to(device) for key, val in inputs.items()}
with torch.no_grad():
outputs = sentiment_model(**inputs)
logits = outputs.logits
probs = torch.softmax(logits, dim=-1)
labels = ["negative", "neutral", "positive"]
predictions = torch.argmax(probs, dim=-1)
df['predicted_sentiment'] = pd.Series(
[labels[pred] for pred in predictions], index=df[df[news_column_name].notna()].index)
sentiment_map = {
'positive': 1,
'neutral': 0,
'negative': -1
}
df['sentiment_score'] = df['predicted_sentiment'].map(sentiment_map)
df = df.drop(columns=['news'])
return df
def get_tft_predictions(df):
for i in range(1, 21):
df[f'open_lag_{i}'] = df.groupby('ticker')['open'].shift(i)
df[f'adjclose_lag_{i}'] = df.groupby('ticker')['adjclose'].shift(i)
lag_columns = [f'open_lag_{i}' for i in range(
1, 21)] + [f'adjclose_lag_{i}' for i in range(1, 21)]
df.dropna(subset=lag_columns, inplace=True)
predictions = best_tft.predict(df.to(device), mode="quantiles")
return predictions
@app.get("/fetch-ticker-data/{ticker_name}/{start_date}/{end_date}/{interval}")
async def fetch_ticker_data(ticker_name: str, start_date: str, end_date: str, interval: str):
try:
result_df = fetch_and_process_ticker_data(
ticker=ticker_name,
start_date=start_date,
end_date=end_date,
interval=interval
)
return result_df.to_dict(orient="records")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/predict-prices/{ticker_name}/{start_date}/{end_date}/{interval}")
async def predict_prices(ticker_name: str, start_date: str, end_date: str, interval: str):
try:
result_df = fetch_and_process_ticker_data(
ticker=ticker_name,
start_date=start_date,
end_date=end_date,
interval=interval
)
raw_data = result_df.tail(60)
raw_data = raw_data.reset_index()
raw_data.rename(columns={"index": "date"}, inplace=True)
raw_data = ticker_encoded(raw_data)
temp_df = raw_data.copy()
normalized_data, scaler = normalize(raw_data)
normalized_data = normalized_data.drop(columns=['ticker'])
sequences, _, dates, stock = create_sequence(normalized_data)
combined_dataset_prediction = model.predict(sequences)
combined_dataset_prediction_inverse = scaling_predictions(
scaler, combined_dataset_prediction)
lstm_pred_df = storing_predictions(
temp_df, dates, stock, combined_dataset_prediction_inverse)
news_df = scrape_news(ticker_name=ticker_name)
combined_with_news_df = add_recent_news(lstm_pred_df, news_df)
sentiment_df = news_sentiment(combined_with_news_df)
sentiment_df['time_idx'] = range(1, len(sentiment_df) + 1)
predicted_values = get_tft_predictions(sentiment_df)
final_pred_open_price = predicted_values[0].item()
final_pred_closing_price = predicted_values[1].item()
return {"open": final_pred_open_price, 'close': final_pred_closing_price}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))