Spaces:
Sleeping
Sleeping
File size: 7,737 Bytes
36844c0 61c35dc cbb5bad 61c35dc 36844c0 81c8033 0f9f1bf 36844c0 61c35dc 63bc08c 61c35dc e0d5ca2 2262d7f e0d5ca2 61c35dc e0d5ca2 61c35dc e0d5ca2 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 |
from fastapi import FastAPI
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
# import seaborn as sns
import yfinance as yf
import datetime as dt
import xgboost as xgb
from sklearn.metrics import mean_squared_error
# Application object; the @app.get(...) decorators in this file register their routes on it.
app = FastAPI()
@app.get("/")
def read_root():
    """Return a short usage message describing the ticker endpoint URL format."""
    usage = dict(
        message="Hello, Please type a ticker at the end of the URL to get LAST TRADING HOUR FORCAST.",
        format="https://yaakovy-lasthourforcast.hf.space/ticker/[TICKER]",
        example="https://yaakovy-lasthourforcast.hf.space/ticker/msft",
    )
    return usage
def get_data(ticker):
    """Download hourly price bars for ``ticker`` covering the last 300 days.

    Args:
        ticker: Symbol handed unchanged to ``yf.Ticker``.

    Returns:
        The DataFrame produced by ``Ticker.history(interval="1h")`` with the
        ``Dividends``, ``Stock Splits`` and ``Volume`` columns removed
        whenever they are present.
    """
    days_period = 300

    # Use one timestamp for both ends so the window is exactly `days_period` long.
    end_date = dt.datetime.today()
    start_date = end_date - dt.timedelta(days=days_period)

    df_all = yf.Ticker(ticker).history(start=start_date, end=end_date, interval="1h")

    # errors="ignore": a missing column must not surface as a KeyError here.
    # NOTE(review): presumably these columns can be absent when no rows come
    # back for the symbol -- confirm against the yfinance version in use.
    return df_all.drop(
        columns=["Dividends", "Stock Splits", "Volume"], errors="ignore"
    )
def get_last_date_missing_hours(df):
    """Pad the most recent day with its missing hourly bars, then forward-fill it.

    The index is converted to a timezone-naive ``DatetimeIndex``. For the
    latest calendar date in the index, every hourly timestamp from 09:30 up to
    16:00 that is absent gets a new all-NaN row, and the rows of that date
    (only that date) are forward-filled. ``df`` is modified in place and also
    returned.

    Args:
        df: DataFrame indexed by timestamps (timezone-aware or naive).

    Returns:
        The same DataFrame, sorted by index, with the latest day padded and
        forward-filled. An empty frame is returned unchanged apart from the
        index normalisation.
    """
    trading_start = "09:30:00"
    trading_end = "16:00:00"

    # Drop timezone information so the index compares cleanly with the
    # naive timestamps generated below.
    df.index = pd.to_datetime(df.index).tz_localize(None)

    # Nothing to pad, and max() of an empty index is NaT, which has no date.
    if df.empty:
        return df

    latest_date = df.index.max().date()

    # Hourly grid 09:30, 10:30, ... for the latest date. An offset object is
    # used instead of the "H" string alias, which newer pandas deprecates.
    expected_hours = pd.date_range(
        start=f"{latest_date} {trading_start}",
        end=f"{latest_date} {trading_end}",
        freq=pd.offsets.Hour(),
    )
    on_latest_date = df.index.date == latest_date
    missing_hours = expected_hours.difference(df.index[on_latest_date])

    # Insert placeholder rows. np.nan (rather than a list of pd.NA) keeps the
    # numeric columns as floats instead of degrading them to object dtype.
    for hour in missing_hours:
        df.loc[hour] = np.nan
    df.sort_index(inplace=True)

    # Forward-fill within the latest day only; earlier days are left as-is.
    on_latest_date = df.index.date == latest_date
    df.loc[on_latest_date] = df.loc[on_latest_date].ffill()
    return df
def prepare_df_for_model(df):
    """Reshape hourly bars into one row per day plus a 15:30 close target.

    Adds ``Date`` and ``Time`` columns to ``df`` (in place), pivots the
    Open/High/Low/Close values of the 09:30 through 14:30 bars into columns
    named ``<field>_<HH:MM>``, joins the 15:30 ``Close`` as ``Close_target``,
    and drops every day that has a missing value.

    Returns:
        A new DataFrame indexed by date (as datetimes), one row per day.
    """
    df.index = pd.to_datetime(df.index)
    df["Date"] = df.index.date
    df["Time"] = df.index.time

    # Bars used as features: 09:30, 10:30, ..., 14:30.
    feature_times = [
        pd.to_datetime(f"{hour:02d}:30:00").time() for hour in range(9, 15)
    ]
    # Bar whose close is the prediction target.
    target_time = pd.to_datetime("15:30:00").time()

    feature_rows = df[df["Time"].isin(feature_times)]
    target = df.loc[df["Time"] == target_time, ["Date", "Close"]].rename(
        columns={"Close": "Close_target"}
    )

    # One row per date, one column per (field, time) pair.
    wide = feature_rows.pivot(
        index="Date", columns="Time", values=["Open", "High", "Low", "Close"]
    )
    # Collapse the two-level column labels into flat "<field>_<HH:MM>" names.
    wide.columns = [
        f"{field}_{bar_time.strftime('%H:%M')}" for field, bar_time in wide.columns
    ]

    joined = wide.join(target.set_index("Date"))
    # The date index comes back as plain objects; restore datetimes.
    joined.index = pd.to_datetime(joined.index)
    return joined.dropna()
def high_low_columns(df_final):
    """Add per-row ``MAX_high`` and ``MIN_low`` columns to ``df_final``.

    ``MAX_high`` is the row-wise maximum over every column whose name contains
    ``"High_"``; ``MIN_low`` is the row-wise minimum over every column whose
    name contains ``"Low_"``. The frame is modified in place and returned.
    """

    def columns_containing(marker):
        # Substring match on the column label.
        return [label for label in df_final.columns if marker in label]

    df_final["MAX_high"] = df_final[columns_containing("High_")].max(axis=1)
    df_final["MIN_low"] = df_final[columns_containing("Low_")].min(axis=1)
    return df_final
def calc_percentage_change(df):
    """Express every column as a percentage change from ``Open_09:30``.

    Each column other than ``Open_09:30`` is replaced, in place, by
    ``(value - Open_09:30) / Open_09:30 * 100``. ``Open_09:30`` itself keeps
    its original values. The index is converted to datetimes.

    Returns:
        The same DataFrame that was passed in.
    """
    reference = "Open_09:30"
    df.index = pd.to_datetime(df.index)

    others = [label for label in df.columns if label != reference]
    for label in others:
        baseline = df[reference]
        df[label] = df[label].sub(baseline).div(baseline).mul(100)
    return df
def create_features(df):
    """
    Return a copy of ``df`` extended with calendar features from its index.

    Added columns: dayofweek, quarter, month, year, dayofyear, dayofmonth and
    weekofyear (ISO week, stored as nullable ``Int32``). The input frame is
    left untouched.
    """
    stamps = df.index
    return df.assign(
        dayofweek=stamps.dayofweek,
        quarter=stamps.quarter,
        month=stamps.month,
        year=stamps.year,
        dayofyear=stamps.dayofyear,
        dayofmonth=stamps.day,
        weekofyear=stamps.isocalendar().week.astype("Int32"),
    )
def train_test_split(df, num_test=30):
    """Split ``df`` chronologically into training and test sets.

    The last ``num_test`` rows form the test set and everything before them
    the training set. ``Close_target`` is the target; all other columns are
    features. The index of ``df`` is converted to datetimes in place.

    Args:
        df: DataFrame containing a ``Close_target`` column, ordered oldest
            to newest.
        num_test: Number of trailing rows to hold out (default 30). When the
            frame has fewer rows than this, all rows go to the test set and
            the training set is empty.

    Returns:
        Tuple ``(X_train, y_train, X_test, y_test)``.
    """
    df.index = pd.to_datetime(df.index)

    X = df.drop(columns=["Close_target"])
    y = df["Close_target"]

    # Position of the first test row; clamped so short frames do not raise.
    split_at = max(len(df) - num_test, 0)

    X_train, y_train = X.iloc[:split_at], y.iloc[:split_at]
    X_test, y_test = X.iloc[split_at:], y.iloc[split_at:]
    return X_train, y_train, X_test, y_test
def run_xgboost(df):
    """Fit an XGBoost regressor on ``df`` and report its latest prediction.

    The frame is split with ``train_test_split``; the model is trained on the
    training part with early stopping monitored on the test part, then used
    to predict the test part.

    Args:
        df: Feature frame containing a ``Close_target`` column.

    Returns:
        dict with ``"latest_prediction"`` (prediction for the last test row)
        and ``"RMSE"`` (root mean squared error over the test set), both as
        plain Python floats.
    """
    X_train, y_train, X_test, y_test = train_test_split(df)

    # eval_metric and early_stopping_rounds are given to the constructor:
    # passing them to fit() is deprecated and rejected by newer XGBoost.
    model = xgb.XGBRegressor(
        n_estimators=100,
        learning_rate=0.1,
        max_depth=3,
        subsample=0.8,
        colsample_bytree=0.8,
        objective="reg:squarederror",
        eval_metric="rmse",
        early_stopping_rounds=10,
    )

    # NOTE(review): the test set doubles as the early-stopping validation set,
    # so the RMSE reported below is measured on data that steered training.
    model.fit(
        X_train,
        y_train,
        eval_set=[(X_train, y_train), (X_test, y_test)],
        verbose=True,
    )

    predictions = model.predict(X_test)
    rmse = np.sqrt(mean_squared_error(y_test, predictions))

    # Convert NumPy scalars to built-in floats so the result is JSON-serialisable.
    return {
        "latest_prediction": float(predictions[-1]),
        "RMSE": float(rmse),
    }
@app.get("/ticker/{ticker}")
def prcess_ticker(ticker: str):
    """Run the full pipeline for ``ticker`` and return the model output.

    Downloads the data, then applies each preparation step in order before
    handing the resulting frame to ``run_xgboost``.
    """
    stages = (
        get_last_date_missing_hours,
        prepare_df_for_model,
        high_low_columns,
        calc_percentage_change,
        create_features,
    )
    frame = get_data(ticker)
    for stage in stages:
        frame = stage(frame)
    return run_xgboost(frame)
|