Spaces:
Sleeping
Sleeping
import pandas as pd | |
import numpy as np | |
import torch | |
import torch.nn as nn | |
import torch.optim as optim | |
import torch.utils.data as data | |
import gradio as gr | |
import plotly.graph_objects as go | |
# Build sliding-window samples for one-step-ahead time series prediction
def create_dataset(dataset, lookback):
    """Slice a series into overlapping input/target windows.

    Every input window holds ``lookback`` consecutive entries of ``dataset``;
    its target is the same window shifted forward by one position.

    Args:
        dataset: Sliceable sequence of values (one feature per time step).
        lookback: Number of time steps in each window.

    Returns:
        A pair ``(X, y)`` of float tensors, each reshaped to
        ``(samples, lookback, 1)``, with ``samples = len(dataset) - lookback``.
    """
    n_windows = len(dataset) - lookback
    inputs = [dataset[start:start + lookback] for start in range(n_windows)]
    targets = [dataset[start + 1:start + lookback + 1] for start in range(n_windows)]

    # Both arrays share the same 3D layout: (samples, lookback, features).
    window_shape = (-1, lookback, 1)
    inputs_t = torch.tensor(np.array(inputs).reshape(window_shape))
    targets_t = torch.tensor(np.array(targets).reshape(window_shape))
    return inputs_t.float(), targets_t.float()
# LSTM regressor: one recurrent layer followed by a per-step linear readout
class AirModel(nn.Module):
    """Single-layer LSTM whose hidden states are projected to one value per step."""

    def __init__(self):
        super().__init__()
        hidden_units = 50
        self.lstm = nn.LSTM(
            input_size=1,
            hidden_size=hidden_units,
            num_layers=1,
            batch_first=True,
        )
        self.linear = nn.Linear(hidden_units, 1)

    def forward(self, x):
        """Map a (batch, steps, 1) input to a (batch, steps, 1) prediction."""
        # The LSTM also returns its final (hidden, cell) state, which is not used.
        hidden_seq, _final_state = self.lstm(x)
        return self.linear(hidden_seq)
# Training and prediction function
def train_and_predict(csv_file, lookback, epochs, batch_size):
    """Train an LSTM on a sales series from a CSV file and plot its predictions.

    The "AmtNet Sales USD" column is split 67% / 33% into train and test
    segments, a fresh ``AirModel`` is trained on sliding windows of the train
    segment, and the last-step prediction of every window is plotted against
    the original series.

    Args:
        csv_file: The uploaded CSV. Either an object exposing the file path as
            ``.name`` or the path itself; ``None`` when nothing was uploaded.
        lookback: Window length in time steps.
        epochs: Number of passes over the training windows.
        batch_size: Mini-batch size used during training.

    Returns:
        A ``(figure, message)`` pair. On success ``figure`` is a Plotly figure
        and ``message`` reports the mean absolute error on the test segment.
        On invalid input ``figure`` is ``None`` and ``message`` explains the
        problem, so a caller expecting two outputs always receives two values.
    """
    target_column = "AmtNet Sales USD"

    if csv_file is None:
        return None, "No file was uploaded. Please upload a CSV file."

    # UI controls may deliver floats; range() and DataLoader require ints.
    lookback, epochs, batch_size = int(lookback), int(epochs), int(batch_size)

    # Load CSV; accept both an upload object exposing ``.name`` and a plain path.
    csv_path = getattr(csv_file, "name", csv_file)
    df = pd.read_csv(csv_path)
    if target_column not in df.columns:
        return None, f"The CSV file must contain a '{target_column}' column."

    # Extract time series data as a (n, 1) float32 array
    timeseries = df[[target_column]].values.astype('float32')

    # Train-test split
    train_size = int(len(timeseries) * 0.67)
    train, test = timeseries[:train_size], timeseries[train_size:]

    # Create datasets (test targets are not needed: MAE is computed from the raw series)
    X_train, y_train = create_dataset(train, lookback=lookback)
    X_test, _ = create_dataset(test, lookback=lookback)
    if len(X_train) == 0 or len(X_test) == 0:
        # Return one value per output, not a bare string.
        return None, "The lookback value is too large for the dataset. Please reduce the lookback value."

    # DataLoader for batching
    train_loader = data.DataLoader(
        data.TensorDataset(X_train, y_train), shuffle=True, batch_size=batch_size
    )

    # Initialize model, optimizer, and loss function
    model = AirModel()
    optimizer = optim.Adam(model.parameters())
    loss_fn = nn.MSELoss()

    # Training loop
    for _ in range(epochs):
        model.train()
        for X_batch, y_batch in train_loader:
            y_pred = model(X_batch)
            loss = loss_fn(y_pred, y_batch)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

    # Prediction: keep only the last time step of each window, i.e. the
    # forecast for the value that immediately follows the window.
    model.eval()
    with torch.no_grad():
        train_pred = model(X_train)[:, -1, :].numpy()
        test_pred = model(X_test)[:, -1, :].numpy()

    # NaN-filled canvases so each prediction curve is drawn only over its own range.
    test_start = train_size + lookback
    train_plot = np.full_like(timeseries, np.nan)
    train_plot[lookback:train_size] = train_pred
    test_plot = np.full_like(timeseries, np.nan)
    test_plot[test_start:] = test_pred

    # Plot results with Plotly
    fig = go.Figure()
    fig.add_trace(go.Scatter(y=timeseries.squeeze(), mode='lines', name='Original Data'))
    fig.add_trace(go.Scatter(y=train_plot.squeeze(), mode='lines', name='Train Prediction', line=dict(color='red')))
    fig.add_trace(go.Scatter(y=test_plot.squeeze(), mode='lines', name='Test Prediction', line=dict(color='green')))
    fig.update_layout(title="Time Series Prediction", xaxis_title="Time", yaxis_title="Sales")

    # Calculate Mean Absolute Error (MAE) over the predicted part of the test segment
    mae = np.mean(np.abs(test_pred - timeseries[test_start:]))
    return fig, f"Mean Absolute Error (MAE) on Test Data: {mae:.4f}"
# Gradio app interface: one CSV upload plus three training hyperparameters in,
# a prediction plot and an error-metric text out.
interface = gr.Interface(
    fn=train_and_predict,
    inputs=[
        gr.File(label="Upload your CSV file"),
        gr.Slider(10, 365, step=1, value=100, label="Lookback"),
        gr.Slider(100, 5000, step=100, value=1000, label="Epochs"),
        gr.Slider(4, 32, step=1, value=8, label="Batch size"),
    ],
    outputs=[
        gr.Plot(label="Prediction Plot"),
        gr.Textbox(label="Error Metrics"),
    ],
    title="Time Series Prediction with LSTM",
    # The column name must match the one train_and_predict actually reads.
    description="Upload a CSV file with an 'AmtNet Sales USD' column and get time series predictions using an LSTM model.",
)
# Launch the app. No public share link is requested here; pass share=True to
# launch() if one is needed.
interface.launch()