from fastapi import FastAPI import pandas as pd import numpy as np import matplotlib.pyplot as plt # import seaborn as sns import yfinance as yf import datetime as dt import xgboost as xgb from sklearn.metrics import mean_squared_error app = FastAPI() @app.get("/") def read_root(): return { "message": "Hello, Please type a ticker at the end of the URL to get LAST TRADING HOUR FORCAST.", "format": "https://yaakovy-lasthourforcast.hf.space/ticker/[TICKER]", "example": "https://yaakovy-lasthourforcast.hf.space/ticker/msft", } def get_data(ticker): # Define the ticker symbol tickerSymbol = ticker days_period = 300 # Get data on this ticker tickerData = yf.Ticker(tickerSymbol) start_date = dt.datetime.today() - dt.timedelta(days=days_period) end_date = dt.datetime.today() df_all = tickerData.history(start=start_date, end=end_date, interval="1h") df_all = df_all.drop(columns=["Dividends", "Stock Splits", "Volume"]) return df_all def get_last_date_missing_hours(df): # Assuming df is your DataFrame with the correct datetime index df.index = pd.to_datetime(df.index) # Ensure datetime format # Define the trading hours trading_start = "09:30:00" trading_end = "16:00:00" # Normalize the timezone if necessary, here assuming the data might be timezone aware df.index = df.index.tz_localize(None) # Find the latest date in your data latest_date = df.index.max().date() # Generate a full range of expected trading hours for the latest date, ensuring it's timezone-naive expected_hours = pd.date_range( start=f"{latest_date} {trading_start}", end=f"{latest_date} {trading_end}", freq="H", tz=None, ) # Extract actual timestamps for the latest date, also as timezone-naive actual_hours = df[df.index.date == latest_date].index.tz_localize(None) # Determine missing hours missing_hours = expected_hours.difference(actual_hours) # Add missing hours to the DataFrame as empty rows for hour in missing_hours: if hour not in df.index: df.loc[hour] = [pd.NA] * len(df.columns) # Initialize missing hours with NA # Sort the DataFrame after inserting new rows to maintain the chronological order df.sort_index(inplace=True) # forward filling # Ensure the index is in datetime format and normalized df.index = pd.to_datetime(df.index) df.index = df.index.tz_localize(None) # Find the latest date in your data latest_date = df.index.max().date() # Select only the data for the latest day latest_day_data = df[df.index.date == latest_date] # Perform forward filling on this latest day data latest_day_data_filled = latest_day_data.ffill() # Replace the original latest day data in the DataFrame with the filled data df.loc[df.index.date == latest_date] = latest_day_data_filled # Optionally, ensure the entire DataFrame is sorted by index df.sort_index(inplace=True) return df def prepare_df_for_model(df): df.index = pd.to_datetime(df.index) # Ensure the index is datetime # Extract date and time from the datetime index df["Date"] = df.index.date df["Time"] = df.index.time # Filter out data for hours from 09:30 to 14:30 and the target at 15:30 df_hours = df[ df["Time"].isin( [ pd.to_datetime("09:30:00").time(), pd.to_datetime("10:30:00").time(), pd.to_datetime("11:30:00").time(), pd.to_datetime("12:30:00").time(), pd.to_datetime("13:30:00").time(), pd.to_datetime("14:30:00").time(), ] ) ] df_target = df[df["Time"] == pd.to_datetime("15:30:00").time()][["Date", "Close"]] # Rename the target close column for clarity df_target.rename(columns={"Close": "Close_target"}, inplace=True) # Pivot the hours data to have one row per day with all the columns df_pivot = df_hours.pivot( index="Date", columns="Time", values=["Open", "High", "Low", "Close"] ) # Flatten the columns after pivoting and create a multi-level index df_pivot.columns = [ "{}_{}".format(feature, time.strftime("%H:%M")) for feature, time in df_pivot.columns ] # Join the pivot table with the target data df_final = df_pivot.join(df_target.set_index("Date")) # Convert the index back to datetime if it got changed to object type df_final.index = pd.to_datetime(df_final.index) df = df_final.dropna() return df def high_low_columns(df_final): # Extract columns for 'High' and 'Low' values high_columns = [col for col in df_final.columns if "High_" in col] low_columns = [col for col in df_final.columns if "Low_" in col] # Calculate 'max high' and 'min low' for each day df_final["MAX_high"] = df_final[high_columns].max(axis=1) df_final["MIN_low"] = df_final[low_columns].min(axis=1) return df_final def calc_percentage_change(df): # Convert index to datetime if necessary (if not already done) df.index = pd.to_datetime(df.index) # Calculate the percentage change relative to 'Open_09:30' for each column for column in df.columns: if column != "Open_09:30": df[column] = (df[column] - df["Open_09:30"]) / df["Open_09:30"] * 100 return df def create_features(df): """ Create time series features based on time series index. """ df = df.copy() df["dayofweek"] = df.index.dayofweek df["quarter"] = df.index.quarter df["month"] = df.index.month df["year"] = df.index.year df["dayofyear"] = df.index.dayofyear df["dayofmonth"] = df.index.day df["weekofyear"] = df.index.isocalendar().week df["weekofyear"] = df["weekofyear"].astype("Int32") return df def train_test_split(df): df.index = pd.to_datetime(df.index) # Define the number of test instances (e.g., last 30 days) num_test = 30 # Split data into features and target X = df.drop(columns=["Close_target"]) y = df["Close_target"] # Split the data into training and testing sets X_train, y_train = X[:-num_test], y[:-num_test] X_test, y_test = X[-num_test:], y[-num_test:] # Train indices are earlier, and test indices include the last date train_indices = df.index < df.index[-num_test] test_indices = df.index >= df.index[-num_test] return X_train, y_train, X_test, y_test def run_xgboost(df): X_train, y_train, X_test, y_test = train_test_split(df) # Define the model model = xgb.XGBRegressor( n_estimators=100, learning_rate=0.1, max_depth=3, subsample=0.8, colsample_bytree=0.8, objective="reg:squarederror", ) # Train the model with evaluation model.fit( X_train, y_train, eval_metric="rmse", eval_set=[(X_train, y_train), (X_test, y_test)], verbose=True, early_stopping_rounds=10, ) # Making predictions predictions = model.predict(X_test) # Prediction for the latest date latest_prediction = predictions[-1] # Calculate and print RMSE for the test set rmse = np.sqrt(mean_squared_error(y_test, predictions)) np_float = np.float32(rmse) return {"latest_prediction": latest_prediction, "RMSE": float(np_float)} @app.get("/ticker/{ticker}") def prcess_ticker(ticker: str): df = get_data(ticker) df = get_last_date_missing_hours(df) df = prepare_df_for_model(df) df = high_low_columns(df) df = calc_percentage_change(df) df = create_features(df) result = run_xgboost(df) return result