Spaces:
Runtime error
Runtime error
| from asyncio.constants import LOG_THRESHOLD_FOR_CONNLOST_WRITES | |
| import yfinance as yf | |
| import pandas as pd | |
| import numpy as np | |
| import plotly.graph_objs as go | |
| from stocks import * | |
| from transformers import AutoModelForSequenceClassification, pipeline, AutoTokenizer | |
| import os | |
| from random import random | |
| os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2' | |
| import tensorflow as tf | |
| import math | |
| import datetime | |
| import random | |
| import time | |
| #import kaleido | |
| from sklearn.preprocessing import MinMaxScaler | |
| import matplotlib.pyplot as plt | |
| #import warnings | |
| import tensorflow as tf | |
| from tensorflow import keras | |
| from keras.layers import Dropout, Activation | |
| from keras import layers | |
| from keras.callbacks import EarlyStopping | |
| from sklearn.metrics import r2_score | |
| import plotly.graph_objs as go | |
| import plotly.io as pio | |
# Bare attribute access whose value is discarded; it has no visible effect in this file.
pio.templates
# Load the sequence-classification model once at import time from the local
# "fine_tuned_FinBERT" directory (PyTorch weights, since from_tf=False), using "config.json"
# as its configuration. It is consumed by Models.finbert_headlines_sentiment().
model = AutoModelForSequenceClassification.from_pretrained("fine_tuned_FinBERT", from_tf=False, config="config.json")
# Load the tokenizer that accompanies the model from its "tokenizer/" sub-directory.
tokenizer = AutoTokenizer.from_pretrained("fine_tuned_FinBERT/tokenizer/")
class Models(object):
    '''
    Collection of per-ticker analytics built on top of a Stock_Data instance:
    Bollinger-Band outlooks, a MACD trend signal, a FinBERT headline-sentiment plot
    and a 7-day LSTM price forecast.

    All market data is obtained through self.stock_data; this class only reads the
    "Adj Close" column (and the "Date" index for the LSTM forecast).
    '''

    # Number of past days fed to the LSTM for every prediction.
    _LOOK_BACK = 60
    # Number of future days forecast by LSTM_7_days_price_predictor().
    _FORECAST_DAYS = 7
    # Hand-tuned LSTM training settings for every supported ticker.
    _LSTM_HYPERPARAMS = {
        'AAPL': dict(seed=1, dropout=0.2, lstm_units=100, patience=2, batch_size=20, epochs=30),
        'NVDA': dict(seed=2, dropout=0.2, lstm_units=100, patience=2, batch_size=30, epochs=50),
        'PYPL': dict(seed=6, dropout=0.2, lstm_units=100, patience=10, batch_size=25, epochs=30),
        'MSFT': dict(seed=4, dropout=0.1, lstm_units=80, patience=2, batch_size=20, epochs=40),
        'TSLA': dict(seed=5, dropout=0.1, lstm_units=120, patience=4, batch_size=20, epochs=25),
        'AMZN': dict(seed=6, dropout=0.1, lstm_units=120, patience=2, batch_size=20, epochs=25),
        'SPOT': dict(seed=9, dropout=0.2, lstm_units=200, patience=5, batch_size=20, epochs=40),
        'UBER': dict(seed=15, dropout=0.2, lstm_units=100, patience=7, batch_size=20, epochs=40),
        'IN:533096': dict(seed=15, dropout=0.2, lstm_units=120, patience=8, batch_size=20, epochs=40),
        'GOOG': dict(seed=15, dropout=0.2, lstm_units=100, patience=3, batch_size=20, epochs=25),
    }

    def __init__(self):
        self.stock_data = Stock_Data()

    # ------------------------------------------------------------------ #
    # Bollinger Bands
    # ------------------------------------------------------------------ #
    def _bollinger_outlook(self, ticker, window, num_std):
        '''
        Shared implementation of the Bollinger-Band outlooks.

        The bands are the rolling mean of the last `window` adjusted closes plus/minus
        `num_std` rolling standard deviations. The current price is fetched exactly once,
        so every comparison of one decision is made against the same quote.

        Returns "Market Closed" when the market status is not "Open" or no current price
        is available, otherwise "Buy" (lower bound <= price < mean), "Sell"
        (mean < price <= upper bound), "Hold" (price == mean) or "Unusual Event" (price
        outside the bands). An empty string is returned when fewer than `window` closes
        are available, because the bands are undefined in that case.
        '''
        if self.stock_data.status_getter(ticker) != "Open":
            return "Market Closed"

        data = self.stock_data.stock_data_getter(ticker)
        # Select by label rather than by position, and work on a Series so that no
        # column is ever assigned onto a slice of the caller's frame.
        closes = data["Adj Close"].tail(2 * window)
        if closes.empty:
            mean = std = float("nan")
        else:
            rolling = closes.rolling(window, min_periods=window)
            mean = rolling.mean().iloc[-1]
            std = rolling.std().iloc[-1]
        upper_bound = mean + num_std * std
        lower_bound = mean - num_std * std

        price = self.stock_data.current_price_getter(ticker)
        if price is None:
            return "Market Closed"

        if lower_bound <= price < mean:
            return "Buy"
        if mean < price <= upper_bound:
            return "Sell"
        if price == mean:
            return "Hold"
        if price <= lower_bound or price >= upper_bound:
            return "Unusual Event"
        # Only reachable when the bands are NaN (not enough history).
        return ""

    def bollinger_bands_20d_2std(self, ticker):
        '''
        Bollinger-Band outlook using a 20-day rolling average and 2 standard deviations.
        See _bollinger_outlook() for the meaning of the returned strings.
        '''
        return self._bollinger_outlook(ticker, window=20, num_std=2)

    def bollinger_bands_10d_1point5std(self, ticker):
        '''
        Bollinger-Band outlook using a 10-day rolling average and 1.5 standard deviations.
        See _bollinger_outlook() for the meaning of the returned strings.
        '''
        return self._bollinger_outlook(ticker, window=10, num_std=1.5)

    def bollinger_bands_50d_3std(self, ticker):
        '''
        Bollinger-Band outlook using a 50-day rolling average and 3 standard deviations.
        See _bollinger_outlook() for the meaning of the returned strings.
        '''
        return self._bollinger_outlook(ticker, window=50, num_std=3)

    # ------------------------------------------------------------------ #
    # MACD
    # ------------------------------------------------------------------ #
    def MACD(self, ticker):
        '''
        Moving Average Convergence Divergence signal for a stock.

        The MACD line is the 12-day exponential moving average (EMA) of the adjusted
        close minus its 26-day EMA; the signal line is the 9-day EMA of the MACD line.
        Both are computed over the last 52 adjusted closes. MACD above the signal line is
        reported as an uptrend, below it as a downtrend; a crossing between the previous
        and the latest day adds a "buy" (crossed from below) or "sell" (crossed from
        above) signal.

        Returns "Market Closed" when the market status is not "Open" or no current price
        is available, an empty string when there is too little history for the last two
        values to be defined, otherwise one of "Uptrend and buy signal",
        "Uptrend and no signal", "Downtrend and sell signal", "Downtrend and no signal".
        '''
        if self.stock_data.status_getter(ticker) != "Open":
            return "Market Closed"

        data = self.stock_data.stock_data_getter(ticker)
        closes = data["Adj Close"].tail(52)
        ema_12d = closes.ewm(span=12, adjust=False, min_periods=12).mean()
        ema_26d = closes.ewm(span=26, adjust=False, min_periods=26).mean()
        macd_line = ema_12d - ema_26d
        signal_line = macd_line.ewm(span=9, adjust=False, min_periods=9).mean()
        diff = (macd_line - signal_line).astype(float)

        if self.stock_data.current_price_getter(ticker) is None:
            return "Market Closed"

        # NaN compares False with everything, which would otherwise fall through to the
        # "Uptrend" branch and report a trend that was never computed.
        if len(diff) < 2 or diff.iloc[-2:].isna().any():
            return ""

        latest, previous = diff.iloc[-1], diff.iloc[-2]
        if latest < 0:
            return "Downtrend and sell signal" if previous >= 0 else "Downtrend and no signal"
        return "Uptrend and buy signal" if previous <= 0 else "Uptrend and no signal"

    # ------------------------------------------------------------------ #
    # FinBERT sentiment
    # ------------------------------------------------------------------ #
    def finbert_headlines_sentiment(self, ticker):
        '''
        Classify the headlines returned by self.stock_data.article_parser(ticker) with the
        module-level fine-tuned FinBERT `model` / `tokenizer` and plot the result.

        Every headline is scored +100 ("positive"), -100 ("negative") or 0 (any other
        label). The scores are smoothed with a rolling average of window size 5 and the
        last 12 smoothed values are drawn as a plotly line.

        Returns a dict with the plotly figure under 'fig' and the most recent smoothed
        score under 'current_sentiment'.
        '''
        articles_df = self.stock_data.article_parser(ticker)
        headlines = articles_df["headline"].tolist()

        clf = pipeline("text-classification", model=model, tokenizer=tokenizer)
        label_scores = {"positive": 100, "negative": -100}
        scores = [label_scores.get(item["label"], 0) for item in clf(headlines)]

        sentiments_df = pd.DataFrame({"sentiment": scores})
        sentiments_df["roll_avg"] = sentiments_df["sentiment"].rolling(5, min_periods=1).mean().round(2)
        sentiments_df = sentiments_df.tail(12).reset_index()

        # Switches the pandas plotting backend for the whole process (as the original did).
        pd.options.plotting.backend = "plotly"
        fig = sentiments_df["roll_avg"].plot(
            title="Sentiment Analysis of the last 12 www.marketwatch.com articles about " + ticker,
            template="plotly_dark",
            labels=dict(index="12 most recent article headlines",
                        value="sentiment score (rolling avg. of window size 5)"))
        fig.update_traces(line=dict(color="#3D9140", width=3))
        fig.update_layout(yaxis_range=[-100, 100])
        fig.update_layout(xaxis_range=[0, 12])
        fig.update_layout(showlegend=False)
        fig.add_hline(y=0, line_width=1.5, line_color="black")

        current_sentiment = sentiments_df["roll_avg"].tail(1).values[0]
        return {'fig': fig, 'current_sentiment': current_sentiment}

    # ------------------------------------------------------------------ #
    # LSTM forecast
    # ------------------------------------------------------------------ #
    @staticmethod
    def _sliding_windows(series, look_back):
        '''Return every `look_back`-long window of the 1-D `series` as an array of shape (n, look_back, 1).'''
        windows = np.array([series[i - look_back:i] for i in range(look_back, len(series))])
        return windows.reshape(windows.shape[0], look_back, 1)

    def _train_lstm(self, ticker, x_train, y_train, params):
        '''Build, compile and fit the two-layer LSTM network with `params`; return the fitted model.'''
        tf.random.set_seed(params["seed"])
        dropout = params["dropout"]
        units = params["lstm_units"]

        network = keras.Sequential()
        network.add(layers.LSTM(units, return_sequences=True, input_shape=(x_train.shape[1], 1)))
        network.add(Dropout(rate=dropout))
        network.add(layers.LSTM(units, return_sequences=False))
        network.add(Dropout(rate=dropout))
        network.add(layers.Dense(25))
        network.add(Dropout(rate=dropout))
        network.add(layers.Dense(1))
        network.add(Activation('linear'))

        print('\n')
        print("Compiling the LSTM Model for the " + str(ticker) + " stock....\n")
        started = time.time()
        network.compile(optimizer='adam', loss='mean_squared_error', metrics=['mae'])
        early_stop = EarlyStopping(monitor='val_loss',
                                   min_delta=0,
                                   patience=params["patience"],
                                   verbose=1, mode='auto')
        network.fit(x_train,
                    y_train,
                    batch_size=params["batch_size"],
                    epochs=params["epochs"],
                    validation_split=0.1,  # hold out 10% of the training windows for validation
                    shuffle=True, verbose=0, callbacks=[early_stop])
        # The reported time covers compile + fit, as in the original message.
        elapsed = round(time.time() - started, 2)
        print("Compiling took :", elapsed, "seconds")
        return network

    @staticmethod
    def _roll_forecast(network, scaled_data, look_back, steps):
        '''
        Predict `steps` future scaled values one at a time, feeding every prediction back
        as input. Returns the last known value followed by the `steps` predictions.
        '''
        history = scaled_data[-look_back:, 0]
        for _ in range(steps):
            window = history[-look_back:].reshape((1, look_back, 1))
            next_value = network.predict(window)[0][0]
            history = np.append(history, next_value)
        return history[look_back - 1:]

    @staticmethod
    def _forecast_figure(ticker, actuals, predictions, r_squared_score, rmse):
        '''Assemble the plotly figure showing the price history, the forecast and the fit metrics.'''
        history_trace = go.Scatter(
            x=actuals['Date'],
            y=actuals['Adj Close'],
            mode='lines',
            name='Historical Data (2 years)',
            line=dict(width=1, color='#3D9140'))
        forecast_trace = go.Scatter(
            x=predictions['Date'],
            y=predictions['Adj Close'],
            mode='lines',
            name='7-day Prediction',
            line=dict(width=1, color="#EE3B3B"))
        latest_close_trace = go.Scatter(
            x=predictions['Date'].iloc[:1],
            y=predictions['Adj Close'].iloc[:1],
            mode='markers',
            name='Latest Actual Closing Price',
            line=dict(width=1))
        layout = go.Layout(
            title='Next 7 days stock price prediction of ' + str(ticker),
            xaxis={'title': "Date"},
            yaxis={'title': "Price ($)"}
        )
        fig = go.Figure(data=[history_trace, forecast_trace, latest_close_trace], layout=layout)
        fig.update_layout(template='plotly_dark', autosize=True)
        fig.update_layout(
            legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1),
            annotations=[dict(x=0.5,
                              y=0,
                              xref='paper',
                              yref='paper',
                              # Rounded so that float artefacts such as 56.99999999999999 are not shown.
                              text="Current In Sample R- Squared : " + str(round(r_squared_score * 100, 2)) + " % \n",
                              showarrow=False)],
            xaxis=dict(showgrid=False),
            yaxis=dict(showgrid=False)
        )
        # NOTE(review): the RMSE is in price units and is measured on the held-out 20% split;
        # the label text ("In Sample", "%") is kept unchanged from the original.
        fig.add_annotation(x=0.5,
                           y=0.05,
                           xref='paper',
                           yref='paper',
                           text="Current In Sample Root Mean Square Error : " + str(round(rmse, 2)) + " % ",
                           showarrow=False)
        return fig

    def LSTM_7_days_price_predictor(self, ticker):
        '''
        Forecast the adjusted closing price of `ticker` for the next 7 days.

        The adjusted closes returned by self.stock_data.LSTM_stock_data_getter(ticker) are
        min-max scaled and split 80/20 in time order. Every training sample is a 60-day
        window of prices and its label is the price of the following day. The network is
        trained with the hand-tuned hyperparameters stored for the ticker, scored on the
        held-out 20% (R2 and RMSE) and then rolled forward 7 steps, each prediction being
        fed back as input for the next one. Forecast dates are the 7 consecutive calendar
        days after the last available date.

        Returns the plotly figure with the history, the forecast and the two metrics.
        Raises ValueError for a ticker without stored hyperparameters or when there is
        too little history to build a single training window.
        '''
        params = self._LSTM_HYPERPARAMS.get(ticker)
        if params is None:
            # Without this check a model trained for a previously requested ticker could
            # be reused silently.
            raise ValueError("No LSTM hyperparameters are defined for ticker " + repr(ticker))
        look_back = self._LOOK_BACK
        steps = self._FORECAST_DAYS

        stock_data = self.stock_data.LSTM_stock_data_getter(ticker)
        stock_data = (pd.DataFrame(data=stock_data)
                      .drop(['Open', 'High', 'Low', 'Close', 'Volume'], axis=1)
                      .reset_index())
        stock_data['Date'] = pd.to_datetime(stock_data['Date'])
        stock_data = stock_data.dropna()

        # Data preprocessing
        random.seed(1997)
        values = stock_data['Adj Close'].values
        training_data_len = math.ceil(len(values) * 0.8)
        if training_data_len <= look_back:
            raise ValueError("Not enough price history for ticker " + repr(ticker)
                             + ": more than " + str(look_back) + " training days are required")
        scaler = MinMaxScaler(feature_range=(0, 1))
        scaled_data = scaler.fit_transform(values.reshape(-1, 1))

        train_series = scaled_data[:training_data_len, 0]
        x_train = self._sliding_windows(train_series, look_back)
        y_train = np.array(train_series[look_back:])

        # The test windows start `look_back` days before the split so that the first
        # test label has a full window of history.
        test_series = scaled_data[training_data_len - look_back:, 0]
        x_test = self._sliding_windows(test_series, look_back)
        y_test = values[training_data_len:]

        network = self._train_lstm(ticker, x_train, y_train, params)

        predictions = scaler.inverse_transform(network.predict(x_test))
        r_squared_score = round(r2_score(y_test, predictions), 2)
        # Flatten first: (n, 1) - (n,) would broadcast to an (n, n) matrix of all pairs.
        rmse = np.sqrt(np.mean((predictions.ravel() - y_test) ** 2))
        print('R2 Score: ', r_squared_score)

        # Unseen predictions for the next 7 days (first element is the last actual price)
        forecast = self._roll_forecast(network, scaled_data, look_back, steps)
        forecast_prices = scaler.inverse_transform(forecast.reshape(-1, 1))[1:, 0]

        last_date = stock_data['Date'].iloc[-1]
        forecast_df = pd.DataFrame({
            "Date": [last_date + datetime.timedelta(days=offset) for offset in range(1, steps + 1)],
            "Adj Close": forecast_prices,
            "Label": "Prediction",
        })
        stock_data["Label"] = "Actual price"
        finaldf = pd.concat([stock_data, forecast_df], axis=0, ignore_index=True)
        finaldf['Date'] = pd.to_datetime(finaldf['Date'])

        # Retained from the original; it only changes matplotlib's default figure size.
        plt.rcParams["figure.figsize"] = (25, 10)

        # The prediction frame starts at the last actual price so the two lines connect.
        finaldf_predictions = finaldf.iloc[-(steps + 1):]
        finaldf_actuals = finaldf.iloc[:-steps]
        return self._forecast_figure(ticker, finaldf_actuals, finaldf_predictions, r_squared_score, rmse)