# LastHourforcast / main.py
# YaakovY's picture
# 4
# 455e772
from fastapi import FastAPI
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import json
# import seaborn as sns
import yfinance as yf
import datetime as dt
import xgboost as xgb
from sklearn.metrics import mean_squared_error
# FastAPI application instance; the @app.get route decorators in this file
# register their handlers on it.
app = FastAPI()
@app.get("/")
def read_root():
    """Return usage instructions for the service.

    Returns:
        A dict with a human-readable ``message``, the URL ``format`` for
        requesting a ticker, and a concrete ``example`` URL.
    """
    usage = dict(
        message="Hello, Please type a ticker at the end of the URL to get LAST TRADING HOUR FORCAST.",
        format="https://yaakovy-lasthourforcast.hf.space/ticker/[TICKER]",
        example="https://yaakovy-lasthourforcast.hf.space/ticker/msft",
    )
    return usage
def get_data(ticker):
    """Download hourly price history for *ticker* via yfinance.

    The request window runs from 300 days before "now" up to "now", at a
    one-hour interval.

    Args:
        ticker: Ticker symbol passed straight to ``yf.Ticker``.

    Returns:
        The history DataFrame with the ``Dividends``, ``Stock Splits`` and
        ``Volume`` columns removed.
    """
    lookback_days = 300
    window_start = dt.datetime.today() - dt.timedelta(days=lookback_days)
    window_end = dt.datetime.today()

    hourly = yf.Ticker(ticker).history(
        start=window_start, end=window_end, interval="1h"
    )
    # Only the price columns are used downstream.
    return hourly.drop(columns=["Dividends", "Stock Splits", "Volume"])
def get_last_date_missing_hours(df):
    """Pad the most recent day with its missing hourly bars and forward-fill.

    The index is converted to timezone-naive datetimes. For the latest
    calendar date present, every hourly slot from 09:30 up to 16:00
    (09:30, 10:30, ..., 15:30) that has no row gets one, and the gaps on that
    day are forward-filled from the last bar that is present on the same day.
    Earlier days are left untouched.

    The frame is modified in place and also returned.

    Args:
        df: Price DataFrame indexed by (possibly timezone-aware) timestamps.

    Returns:
        The same DataFrame, sorted by index, with the latest day completed.
        An empty frame is returned unchanged.
    """
    trading_start = "09:30:00"
    trading_end = "16:00:00"

    # Work on timezone-naive timestamps so they compare cleanly with the
    # naive range generated below.
    df.index = pd.to_datetime(df.index).tz_localize(None)

    if df.empty:
        # No latest date to complete (index.max() would be NaT).
        return df

    latest_date = df.index.max().date()

    # A Timedelta step avoids the "H"/"h" frequency-alias differences
    # between pandas versions.
    expected_hours = pd.date_range(
        start=f"{latest_date} {trading_start}",
        end=f"{latest_date} {trading_end}",
        freq=pd.Timedelta(hours=1),
    )
    present_hours = df.index[df.index.date == latest_date]
    missing_hours = expected_hours.difference(present_hours)

    # Insert the missing slots as NaN rows. np.nan (rather than pd.NA) keeps
    # the numeric columns as floats instead of risking object dtype.
    for hour in missing_hours:
        df.loc[hour] = [np.nan] * len(df.columns)

    df.index = pd.to_datetime(df.index)
    df.sort_index(inplace=True)

    # Forward-fill only within the latest day so values never leak in from
    # the previous session.
    on_latest = df.index.date == latest_date
    df.loc[on_latest] = df.loc[on_latest].ffill()
    return df
def prepare_df_for_model(df):
    """Reshape hourly bars into one row per day for modelling.

    Adds ``Date`` and ``Time`` columns to *df* (in place), then builds a wide
    table: for each day, the Open/High/Low/Close of the 09:30-14:30 bars
    become columns named ``<Field>_<HH:MM>``, and the Close of the 15:30 bar
    becomes ``Close_target``.

    Args:
        df: Hourly OHLC DataFrame with a datetime-like index.

    Returns:
        A DataFrame indexed by day (as datetimes) with rows containing any
        missing value dropped.
    """
    df.index = pd.to_datetime(df.index)
    df["Date"] = df.index.date
    df["Time"] = df.index.time

    # Feature bars are the six hourly slots 09:30 .. 14:30; 15:30 is the target.
    feature_times = [
        pd.to_datetime(f"{hour:02d}:30:00").time() for hour in range(9, 15)
    ]
    target_time = pd.to_datetime("15:30:00").time()

    intraday = df[df["Time"].isin(feature_times)]
    target = df.loc[df["Time"] == target_time, ["Date", "Close"]].rename(
        columns={"Close": "Close_target"}
    )

    # One row per day, one column per (field, time) pair.
    wide = intraday.pivot(
        index="Date", columns="Time", values=["Open", "High", "Low", "Close"]
    )
    wide.columns = [
        f"{field}_{stamp.strftime('%H:%M')}" for field, stamp in wide.columns
    ]

    merged = wide.join(target.set_index("Date"))
    # The pivot leaves plain date objects in the index; restore datetimes.
    merged.index = pd.to_datetime(merged.index)
    return merged.dropna()
def high_low_columns(df_final):
    """Add per-row ``MAX_high`` and ``MIN_low`` summary columns.

    ``MAX_high`` is the row-wise maximum over every column whose name contains
    ``"High_"``; ``MIN_low`` is the row-wise minimum over every column whose
    name contains ``"Low_"``. The frame is modified in place and returned.
    """
    names = list(df_final.columns)
    highs = df_final[[name for name in names if "High_" in name]]
    lows = df_final[[name for name in names if "Low_" in name]]

    df_final["MAX_high"] = highs.max(axis=1)
    df_final["MIN_low"] = lows.min(axis=1)
    return df_final
def calc_percentage_change(df):
    """Express every column as a percentage change from ``Open_09:30``.

    Each column other than ``Open_09:30`` is replaced, in place, by
    ``(value - Open_09:30) / Open_09:30 * 100``. ``Open_09:30`` itself is left
    as is. The index is converted to datetimes first.

    Returns:
        The same DataFrame that was passed in.
    """
    df.index = pd.to_datetime(df.index)

    baseline = "Open_09:30"
    for name in df.columns:
        if name == baseline:
            continue
        df[name] = (df[name] - df[baseline]) / df[baseline] * 100
    return df
def create_features(df):
    """Return a copy of *df* with calendar features derived from its index.

    Adds ``dayofweek``, ``quarter``, ``month``, ``year``, ``dayofyear``,
    ``dayofmonth`` and ``weekofyear`` (ISO week, stored as ``Int32``). The
    input frame is not modified.
    """
    enriched = df.copy()
    stamps = enriched.index

    # (output column, DatetimeIndex attribute) pairs, in output order.
    calendar_fields = (
        ("dayofweek", "dayofweek"),
        ("quarter", "quarter"),
        ("month", "month"),
        ("year", "year"),
        ("dayofyear", "dayofyear"),
        ("dayofmonth", "day"),
    )
    for column, attribute in calendar_fields:
        enriched[column] = getattr(stamps, attribute)

    enriched["weekofyear"] = stamps.isocalendar().week.astype("Int32")
    return enriched
def train_test_split(df, num_test=30):
    """Split *df* chronologically into train and test features/targets.

    The last ``num_test`` rows form the test set and everything before them
    the training set. Rows are taken by position, so *df* is expected to be
    in chronological order already. The index of *df* is converted to
    datetimes in place.

    Args:
        df: DataFrame containing a ``Close_target`` column.
        num_test: Number of trailing rows held out for testing (default 30).
            If *df* has fewer rows than this, the training set is empty and
            all rows go to the test set.

    Returns:
        ``(X_train, y_train, X_test, y_test)`` where the ``X`` frames hold
        every column except ``Close_target`` and the ``y`` series hold
        ``Close_target``.
    """
    df.index = pd.to_datetime(df.index)

    X = df.drop(columns=["Close_target"])
    y = df["Close_target"]

    # Position of the first test row; clamped so short frames do not wrap
    # around or raise.
    split = max(len(df) - num_test, 0)
    X_train, y_train = X.iloc[:split], y.iloc[:split]
    X_test, y_test = X.iloc[split:], y.iloc[split:]
    return X_train, y_train, X_test, y_test
def run_xgboost(df):
    """Fit an XGBoost regressor on *df* and report the most recent forecast.

    The frame is split with ``train_test_split``; the model is trained on the
    training part and evaluated on the held-out part.

    Args:
        df: Feature DataFrame that includes a ``Close_target`` column.

    Returns:
        A dict with ``latest_prediction`` (the model output for the last test
        row) and ``RMSE`` (root mean squared error over the test set), both
        as plain Python floats.
    """
    X_train, y_train, X_test, y_test = train_test_split(df)

    # eval_metric / early_stopping_rounds are estimator parameters; passing
    # them to fit() is no longer accepted by current XGBoost releases.
    model = xgb.XGBRegressor(
        n_estimators=100,
        learning_rate=0.1,
        max_depth=3,
        subsample=0.8,
        colsample_bytree=0.8,
        objective="reg:squarederror",
        eval_metric="rmse",
        early_stopping_rounds=10,
    )

    # NOTE(review): the test set is also the early-stopping validation set
    # (last entry of eval_set), so the reported RMSE is likely optimistic.
    model.fit(
        X_train,
        y_train,
        eval_set=[(X_train, y_train), (X_test, y_test)],
        verbose=True,
    )

    predictions = model.predict(X_test)
    latest_prediction = predictions[-1]

    rmse = np.sqrt(mean_squared_error(y_test, predictions))
    # RMSE is rounded through float32, as before, to keep the output stable.
    return {
        "latest_prediction": float(latest_prediction),
        "RMSE": float(np.float32(rmse)),
    }
@app.get("/ticker/{ticker}")
def prcess_ticker(ticker: str):
    """Run the full forecasting pipeline for *ticker*.

    Downloads the price history, passes it through each preparation stage in
    order, trains the model, and returns the result dict encoded as a JSON
    string.
    """
    # NOTE(review): this returns an already-encoded JSON string from a route
    # handler; the framework presumably encodes it again, so clients may
    # receive a quoted string rather than an object - confirm before changing.
    stages = (
        get_last_date_missing_hours,
        prepare_df_for_model,
        high_low_columns,
        calc_percentage_change,
        create_features,
    )
    frame = get_data(ticker)
    for stage in stages:
        frame = stage(frame)

    return json.dumps(run_xgboost(frame), cls=NumpyEncoder)
class NumpyEncoder(json.JSONEncoder):
    """JSON encoder that understands NumPy arrays and scalar types.

    Arrays become lists, integer/floating/boolean scalars become the matching
    Python builtins, and complex scalars become ``{"real": ..., "imag": ...}``.
    Anything else falls through to the base encoder, which raises
    ``TypeError``.
    """

    def default(self, obj):
        """Return a JSON-serialisable equivalent of *obj*."""
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        if isinstance(obj, np.bool_):
            return bool(obj)
        # The abstract scalar bases cover every concrete width and avoid
        # aliases such as np.float_ / np.complex_ that newer NumPy releases
        # no longer provide.
        if isinstance(obj, np.integer):
            return int(obj)
        if isinstance(obj, np.floating):
            return float(obj)
        if isinstance(obj, np.complexfloating):
            return {"real": float(obj.real), "imag": float(obj.imag)}
        return super().default(obj)