Spaces:
Sleeping
Sleeping
from fastapi import FastAPI | |
import pandas as pd | |
import numpy as np | |
import matplotlib.pyplot as plt | |
import json | |
# import seaborn as sns | |
import yfinance as yf | |
import datetime as dt | |
import xgboost as xgb | |
from sklearn.metrics import mean_squared_error | |
app = FastAPI() | |
def read_root(): | |
return { | |
"message": "Hello, Please type a ticker at the end of the URL to get LAST TRADING HOUR FORCAST.", | |
"format": "https://yaakovy-lasthourforcast.hf.space/ticker/[TICKER]", | |
"example": "https://yaakovy-lasthourforcast.hf.space/ticker/msft", | |
} | |
def get_data(ticker): | |
# Define the ticker symbol | |
tickerSymbol = ticker | |
days_period = 300 | |
# Get data on this ticker | |
tickerData = yf.Ticker(tickerSymbol) | |
start_date = dt.datetime.today() - dt.timedelta(days=days_period) | |
end_date = dt.datetime.today() | |
df_all = tickerData.history(start=start_date, end=end_date, interval="1h") | |
df_all = df_all.drop(columns=["Dividends", "Stock Splits", "Volume"]) | |
return df_all | |
def get_last_date_missing_hours(df): | |
# Assuming df is your DataFrame with the correct datetime index | |
df.index = pd.to_datetime(df.index) # Ensure datetime format | |
# Define the trading hours | |
trading_start = "09:30:00" | |
trading_end = "16:00:00" | |
# Normalize the timezone if necessary, here assuming the data might be timezone aware | |
df.index = df.index.tz_localize(None) | |
# Find the latest date in your data | |
latest_date = df.index.max().date() | |
# Generate a full range of expected trading hours for the latest date, ensuring it's timezone-naive | |
expected_hours = pd.date_range( | |
start=f"{latest_date} {trading_start}", | |
end=f"{latest_date} {trading_end}", | |
freq="H", | |
tz=None, | |
) | |
# Extract actual timestamps for the latest date, also as timezone-naive | |
actual_hours = df[df.index.date == latest_date].index.tz_localize(None) | |
# Determine missing hours | |
missing_hours = expected_hours.difference(actual_hours) | |
# Add missing hours to the DataFrame as empty rows | |
for hour in missing_hours: | |
if hour not in df.index: | |
df.loc[hour] = [pd.NA] * len(df.columns) # Initialize missing hours with NA | |
# Sort the DataFrame after inserting new rows to maintain the chronological order | |
df.sort_index(inplace=True) | |
# forward filling | |
# Ensure the index is in datetime format and normalized | |
df.index = pd.to_datetime(df.index) | |
df.index = df.index.tz_localize(None) | |
# Find the latest date in your data | |
latest_date = df.index.max().date() | |
# Select only the data for the latest day | |
latest_day_data = df[df.index.date == latest_date] | |
# Perform forward filling on this latest day data | |
latest_day_data_filled = latest_day_data.ffill() | |
# Replace the original latest day data in the DataFrame with the filled data | |
df.loc[df.index.date == latest_date] = latest_day_data_filled | |
# Optionally, ensure the entire DataFrame is sorted by index | |
df.sort_index(inplace=True) | |
return df | |
def prepare_df_for_model(df): | |
df.index = pd.to_datetime(df.index) # Ensure the index is datetime | |
# Extract date and time from the datetime index | |
df["Date"] = df.index.date | |
df["Time"] = df.index.time | |
# Filter out data for hours from 09:30 to 14:30 and the target at 15:30 | |
df_hours = df[ | |
df["Time"].isin( | |
[ | |
pd.to_datetime("09:30:00").time(), | |
pd.to_datetime("10:30:00").time(), | |
pd.to_datetime("11:30:00").time(), | |
pd.to_datetime("12:30:00").time(), | |
pd.to_datetime("13:30:00").time(), | |
pd.to_datetime("14:30:00").time(), | |
] | |
) | |
] | |
df_target = df[df["Time"] == pd.to_datetime("15:30:00").time()][["Date", "Close"]] | |
# Rename the target close column for clarity | |
df_target.rename(columns={"Close": "Close_target"}, inplace=True) | |
# Pivot the hours data to have one row per day with all the columns | |
df_pivot = df_hours.pivot( | |
index="Date", columns="Time", values=["Open", "High", "Low", "Close"] | |
) | |
# Flatten the columns after pivoting and create a multi-level index | |
df_pivot.columns = [ | |
"{}_{}".format(feature, time.strftime("%H:%M")) | |
for feature, time in df_pivot.columns | |
] | |
# Join the pivot table with the target data | |
df_final = df_pivot.join(df_target.set_index("Date")) | |
# Convert the index back to datetime if it got changed to object type | |
df_final.index = pd.to_datetime(df_final.index) | |
df = df_final.dropna() | |
return df | |
def high_low_columns(df_final): | |
# Extract columns for 'High' and 'Low' values | |
high_columns = [col for col in df_final.columns if "High_" in col] | |
low_columns = [col for col in df_final.columns if "Low_" in col] | |
# Calculate 'max high' and 'min low' for each day | |
df_final["MAX_high"] = df_final[high_columns].max(axis=1) | |
df_final["MIN_low"] = df_final[low_columns].min(axis=1) | |
return df_final | |
def calc_percentage_change(df): | |
# Convert index to datetime if necessary (if not already done) | |
df.index = pd.to_datetime(df.index) | |
# Calculate the percentage change relative to 'Open_09:30' for each column | |
for column in df.columns: | |
if column != "Open_09:30": | |
df[column] = (df[column] - df["Open_09:30"]) / df["Open_09:30"] * 100 | |
return df | |
def create_features(df): | |
""" | |
Create time series features based on time series index. | |
""" | |
df = df.copy() | |
df["dayofweek"] = df.index.dayofweek | |
df["quarter"] = df.index.quarter | |
df["month"] = df.index.month | |
df["year"] = df.index.year | |
df["dayofyear"] = df.index.dayofyear | |
df["dayofmonth"] = df.index.day | |
df["weekofyear"] = df.index.isocalendar().week | |
df["weekofyear"] = df["weekofyear"].astype("Int32") | |
return df | |
def train_test_split(df): | |
df.index = pd.to_datetime(df.index) | |
# Define the number of test instances (e.g., last 30 days) | |
num_test = 30 | |
# Split data into features and target | |
X = df.drop(columns=["Close_target"]) | |
y = df["Close_target"] | |
# Split the data into training and testing sets | |
X_train, y_train = X[:-num_test], y[:-num_test] | |
X_test, y_test = X[-num_test:], y[-num_test:] | |
# Train indices are earlier, and test indices include the last date | |
train_indices = df.index < df.index[-num_test] | |
test_indices = df.index >= df.index[-num_test] | |
return X_train, y_train, X_test, y_test | |
def run_xgboost(df): | |
X_train, y_train, X_test, y_test = train_test_split(df) | |
# Define the model | |
model = xgb.XGBRegressor( | |
n_estimators=100, | |
learning_rate=0.1, | |
max_depth=3, | |
subsample=0.8, | |
colsample_bytree=0.8, | |
objective="reg:squarederror", | |
) | |
# Train the model with evaluation | |
model.fit( | |
X_train, | |
y_train, | |
eval_metric="rmse", | |
eval_set=[(X_train, y_train), (X_test, y_test)], | |
verbose=True, | |
early_stopping_rounds=10, | |
) | |
# Making predictions | |
predictions = model.predict(X_test) | |
# Prediction for the latest date | |
latest_prediction = predictions[-1] | |
# Calculate and print RMSE for the test set | |
rmse = np.sqrt(mean_squared_error(y_test, predictions)) | |
np_float = np.float32(rmse) | |
return {"latest_prediction": latest_prediction, "RMSE": float(np_float)} | |
def prcess_ticker(ticker: str): | |
df = get_data(ticker) | |
df = get_last_date_missing_hours(df) | |
df = prepare_df_for_model(df) | |
df = high_low_columns(df) | |
df = calc_percentage_change(df) | |
df = create_features(df) | |
result = run_xgboost(df) | |
return json.dumps(result, cls=NumpyEncoder) | |