import gradio as gr
import numpy as np
import pandas as pd
import plotly.graph_objects as go
import torch
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as data
from tqdm import tqdm  # Progress bar

# Name of the CSV column that holds the series to model.
TARGET_COLUMN = "AmtNet Sales USD"


# Function to create dataset for time series prediction
def create_dataset(dataset, lookback):
    """Build sliding-window inputs and one-step-shifted targets.

    For every start index ``i`` the input window is ``dataset[i:i + lookback]``
    and the target window is the same span moved one step forward,
    ``dataset[i + 1:i + lookback + 1]``.

    Args:
        dataset: Sequence of observations (one value per time step).
        lookback: Number of time steps in each window; must be at least 1.

    Returns:
        A tuple ``(X, y)`` of float tensors, each reshaped to
        ``(samples, lookback, 1)``. Both are empty (zero samples) when
        ``len(dataset) <= lookback``.

    Raises:
        ValueError: If ``lookback`` is smaller than 1.
    """
    if lookback < 1:
        raise ValueError("lookback must be at least 1")

    n_windows = len(dataset) - lookback
    features = [dataset[i:i + lookback] for i in range(n_windows)]
    targets = [dataset[i + 1:i + lookback + 1] for i in range(n_windows)]

    # Reshape to 3D (samples, lookback, features); this also gives an empty
    # result a well-defined shape.
    X = np.array(features).reshape(-1, lookback, 1)
    y = np.array(targets).reshape(-1, lookback, 1)
    return torch.tensor(X).float(), torch.tensor(y).float()


# Define LSTM model
class AirModel(nn.Module):
    """Single-layer LSTM followed by a linear layer applied at every time step."""

    def __init__(self, hidden_size=50):
        """Create the layers.

        Args:
            hidden_size: Width of the LSTM hidden state (defaults to 50).
        """
        super().__init__()
        self.lstm = nn.LSTM(
            input_size=1, hidden_size=hidden_size, num_layers=1, batch_first=True
        )
        self.linear = nn.Linear(hidden_size, 1)

    def forward(self, x):
        """Map a batch-first sequence tensor to one output value per time step."""
        x, _ = self.lstm(x)
        return self.linear(x)


# Training and prediction function
def train_and_predict(csv_file, lookback, epochs, batch_size):
    """Train the LSTM on an uploaded CSV and plot its predictions.

    The first 67% of the ``TARGET_COLUMN`` series is used for training and the
    remainder for testing. The prediction for each window is the model output
    at the window's last time step.

    Args:
        csv_file: Uploaded file, either a path string or an object exposing
            the path as ``.name``; ``None`` when nothing was uploaded.
        lookback: Window length in time steps.
        epochs: Number of passes over the training windows.
        batch_size: Mini-batch size for training.

    Returns:
        A tuple ``(figure, message)``. On success ``figure`` is a Plotly figure
        and ``message`` reports MAE and RMSE on the test span. When the input
        cannot be used, ``figure`` is ``None`` and ``message`` explains why.
        Two values are always returned because the interface declares two
        outputs.
    """
    if csv_file is None:
        return None, "Please upload a CSV file."

    # Slider values may arrive as floats; range(), slicing and DataLoader
    # all require integers.
    lookback, epochs, batch_size = int(lookback), int(epochs), int(batch_size)

    # Load CSV (accept both a plain path and a file-like object with .name)
    csv_path = csv_file if isinstance(csv_file, str) else csv_file.name
    df = pd.read_csv(csv_path)
    if TARGET_COLUMN not in df.columns:
        return None, f"The CSV file must contain a '{TARGET_COLUMN}' column."

    # Extract time series data as an (N, 1) float32 array
    timeseries = df[[TARGET_COLUMN]].values.astype("float32")

    # Train-test split
    train_size = int(len(timeseries) * 0.67)
    train, test = timeseries[:train_size], timeseries[train_size:]

    # Create datasets (test targets are not needed: metrics are computed
    # against the raw series below)
    X_train, y_train = create_dataset(train, lookback=lookback)
    X_test, _ = create_dataset(test, lookback=lookback)
    if len(X_train) == 0 or len(X_test) == 0:
        return None, "The lookback value is too large for the dataset. Please reduce the lookback value."

    # DataLoader for batching
    train_loader = data.DataLoader(
        data.TensorDataset(X_train, y_train), shuffle=True, batch_size=batch_size
    )

    # Initialize model, optimizer, and loss function
    model = AirModel()
    optimizer = optim.Adam(model.parameters())
    loss_fn = nn.MSELoss()

    # Training loop with progress bar
    model.train()
    for _ in tqdm(range(epochs), desc="Training Progress"):
        for X_batch, y_batch in train_loader:
            loss = loss_fn(model(X_batch), y_batch)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

    # Prediction: keep only the last time step of every window
    model.eval()
    with torch.no_grad():
        train_pred = model(X_train)[:, -1, :].numpy()
        test_pred = model(X_test)[:, -1, :].numpy()

    # Window i's last-step output corresponds to position i + lookback of its
    # split, so predictions are written with that offset; the rest stays NaN
    # and is left blank in the plot.
    train_plot = np.full_like(timeseries, np.nan)
    train_plot[lookback:train_size] = train_pred
    test_plot = np.full_like(timeseries, np.nan)
    test_plot[train_size + lookback:] = test_pred

    # Plot results with Plotly
    fig = go.Figure()
    fig.add_trace(go.Scatter(y=timeseries.squeeze(), mode='lines', name='Original Data'))
    fig.add_trace(go.Scatter(y=train_plot.squeeze(), mode='lines', name='Train Prediction',
                             line=dict(color='red')))
    fig.add_trace(go.Scatter(y=test_plot.squeeze(), mode='lines', name='Test Prediction',
                             line=dict(color='green')))
    fig.update_layout(title="Time Series Prediction", xaxis_title="Time", yaxis_title="Sales")

    # Error metrics over the predicted part of the test span
    residuals = test_pred - timeseries[train_size + lookback:]
    # Calculate Mean Absolute Error (MAE)
    mae = np.mean(np.abs(residuals))
    # Calculate Root Mean Squared Error (RMSE)
    rmse = np.sqrt(np.mean(residuals ** 2))

    return fig, f"Mean Absolute Error (MAE) on Test Data: {mae:.4f}, Root Mean Squared Error (RMSE): {rmse:.4f}"


# Gradio app interface using new API
interface = gr.Interface(
    fn=train_and_predict,
    inputs=[
        gr.File(label="Upload your CSV file"),
        gr.Slider(10, 365, step=1, value=100, label="Lookback"),
        gr.Slider(100, 5000, step=100, value=1000, label="Epochs"),
        gr.Slider(4, 32, step=1, value=8, label="Batch size"),
    ],
    outputs=[
        gr.Plot(label="Prediction Plot"),
        gr.Textbox(label="Error Metrics"),
    ],
    title="Time Series Prediction with LSTM",
    description=(
        f"Upload a CSV file with an '{TARGET_COLUMN}' column and get time series "
        "predictions using an LSTM model."
    ),
)

# Launch the app (pass share=True to launch() to get a public shareable link)
interface.launch()