File size: 6,662 Bytes
10b50ee
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
import gradio as gr
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from sklearn.preprocessing import MinMaxScaler
import os

# --- Constants ---
TIME_STEP = 100
MODEL_PATH = 'stock_prediction_model.h5'

# --- Model Architecture Definition ---
def create_lstm_model():
    """Defines the Bidirectional LSTM model architecture used for training."""
    model = tf.keras.models.Sequential()
    model.add(tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(128, return_sequences=True), input_shape=(TIME_STEP, 1)))
    model.add(tf.keras.layers.Dropout(0.3))
    model.add(tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(64, return_sequences=True)))
    model.add(tf.keras.layers.Dropout(0.3))
    model.add(tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(32)))
    model.add(tf.keras.layers.Dense(64, activation='relu'))
    model.add(tf.keras.layers.Dense(1))
    model.compile(loss='mse', optimizer='adam')
    return model

# --- Model Loading ---
try:
    # Attempt to load the pre-trained model
    model = tf.keras.models.load_model(MODEL_PATH)
    print(f"Successfully loaded model from {MODEL_PATH}")
except Exception as e:
    # If loading fails (e.g., in a local test where the file is missing),
    # create a dummy model for structure checking, but warn the user.
    print(f"Warning: Could not load {MODEL_PATH}. Error: {e}")
    print("Initializing a dummy model. Please ensure your 'stock_prediction_model.h5' is uploaded.")
    model = create_lstm_model() # Create architecture
    # NOTE: The dummy model has random weights and will give poor predictions until the H5 file is provided.


# --- Prediction Logic Function ---
def forecast_stock(csv_file, days_to_predict=30):
    """
    Takes an uploaded CSV file containing stock data, extracts 'Close' prices,
    and forecasts the next 'days_to_predict' using the loaded LSTM model.
    """
    if csv_file is None:
        return None, "Error: Please upload a CSV file.", None

    try:
        # Load the uploaded data
        df = pd.read_csv(csv_file.name)
        
        # Ensure 'Close' column exists
        if 'Close' not in df.columns:
            return None, "Error: CSV must contain a 'Close' price column.", None

        # Extract and scale the 'Close' prices (fit the scaler on the entire provided dataset)
        ds_close = df.reset_index()['Close'].values.reshape(-1, 1)
        scaler = MinMaxScaler(feature_range=(0, 1))
        ds_close_scaled = scaler.fit_transform(ds_close)
        
        # Get the last TIME_STEP (100) values to use as the initial input for forecasting
        if len(ds_close_scaled) < TIME_STEP:
            return None, f"Error: Dataset must contain at least {TIME_STEP} entries for initial prediction.", None

        # The input data is the last 100 scaled data points
        x_input = ds_close_scaled[-TIME_STEP:].reshape(1, -1)
        temp_input = list(x_input[0])

        lst_output = []
        i = 0
        
        # Iterative prediction loop (predict 1 day, append, use result for next prediction)
        while i < days_to_predict:
            if len(temp_input) > TIME_STEP:
                # Get the last 100 steps
                x_input = np.array(temp_input[1:])
                x_input = x_input.reshape(1, TIME_STEP, 1)
                temp_input = temp_input[1:]
            else:
                x_input = np.array(temp_input).reshape(1, TIME_STEP, 1)

            # Predict the next step
            yhat = model.predict(x_input, verbose=0)
            
            # Append prediction to the input sequence and to the output list
            temp_input.extend(yhat[0].tolist())
            lst_output.extend(yhat.tolist())
            i = i + 1

        # Inverse transform the forecasted data to get actual prices
        predicted_prices = scaler.inverse_transform(lst_output)
        
        # Create a plot for visualization
        plt.figure(figsize=(10, 6))
        
        # Actual Data Plot
        actual_prices = scaler.inverse_transform(ds_close_scaled)
        day_actual = np.arange(len(actual_prices) - TIME_STEP, len(actual_prices))
        
        plt.plot(day_actual, actual_prices[-TIME_STEP:], label='Last 100 Actual Days', color='blue')
        
        # Predicted Data Plot
        day_pred = np.arange(TIME_STEP, TIME_STEP + days_to_predict)
        plt.plot(day_pred, predicted_prices, label=f'Forecasted {days_to_predict} Days', color='red', linestyle='--')
        
        # Connect the last actual point to the first predicted point
        plt.plot([day_actual[-1], day_pred[0]], [actual_prices[-1], predicted_prices[0]], color='red', linestyle='--')

        plt.title('Stock Price Forecast (LSTM)')
        plt.xlabel('Days')
        plt.ylabel('Close Price')
        plt.legend()
        plt.grid(True)
        plot_output = plt
        
        # Create a DataFrame for output
        # Get the date of the last actual entry
        last_date = pd.to_datetime(df.iloc[-1]['Date']) if 'Date' in df.columns else pd.to_datetime(df.index[-1])
        
        # Generate future dates
        future_dates = pd.date_range(start=last_date + pd.Timedelta(days=1), periods=days_to_predict)
        
        df_forecast = pd.DataFrame({
            'Date': future_dates.strftime('%Y-%m-%d'),
            'Forecasted Price': np.round(predicted_prices.flatten(), 2)
        })

        return plot_output, "Forecast successful!", df_forecast
        
    except Exception as e:
        return None, f"An unexpected error occurred: {e}", None


# --- Gradio Interface ---

# Define the inputs
input_csv = gr.File(label="1. Upload Historical Stock CSV (must include 'Date' and 'Close' columns)")
input_days = gr.Slider(minimum=10, maximum=180, value=30, step=1, label="2. Number of days to forecast")

# Define the outputs
output_plot = gr.Plot(label="Price Forecast Visualization")
output_message = gr.Textbox(label="Status / Notes", value="Waiting for file upload...")
output_df = gr.Dataframe(label="Forecasted Prices Table")

# Create the Gradio Interface
iface = gr.Interface(
    fn=forecast_stock,
    inputs=[input_csv, input_days],
    outputs=[output_plot, output_message, output_df],
    title="LSTM Stock Price Prediction",
    description="Upload a CSV file of historical stock data and use the pre-trained Bidirectional LSTM model to forecast future closing prices. The model requires the latest 100 data points to make the initial forecast.",
    allow_flagging='never'
)

# Launch the app for local testing (Hugging Face Spaces will ignore this and use 'iface.launch()' internally)
if __name__ == "__main__":
    iface.launch()