import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as data
import gradio as gr
import plotly.graph_objects as go

# Function to create dataset for time series prediction
def create_dataset(dataset, lookback):
    X, y = [], []
    for i in range(len(dataset) - lookback):
        feature = dataset[i:i + lookback]
        target = dataset[i + 1:i + lookback + 1]
        X.append(feature)
        y.append(target)
    X = np.array(X).reshape(-1, lookback, 1)  # Reshape to 3D (samples, lookback, features)
    y = np.array(y).reshape(-1, lookback, 1)  # Reshape to 3D (samples, lookback, features)
    return torch.tensor(X).float(), torch.tensor(y).float()
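
# Example: for a series of length 10 and lookback=3, create_dataset returns
# X and y of shape (7, 3, 1), where y is X shifted forward by one time step.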

# Define LSTM model
class AirModel(nn.Module):
    def __init__(self):
        super(AirModel, self).__init__()
        self.lstm = nn.LSTM(input_size=1, hidden_size=50, num_layers=1, batch_first=True)
        self.linear = nn.Linear(50, 1)

    def forward(self, x):
        x, _ = self.lstm(x)
        x = self.linear(x)
        return x
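
# Shape walkthrough: input (batch, lookback, 1) -> LSTM outputs (batch, lookback, 50)
# -> Linear applied per time step -> (batch, lookback, 1), matching the targets
# produced by create_dataset.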

# Training and prediction function
def train_and_predict(csv_file, lookback, epochs, batch_size):
    # Load CSV (gr.File may pass a file-like object with a .name attribute or a
    # plain filepath string, depending on the Gradio version)
    csv_path = csv_file.name if hasattr(csv_file, "name") else csv_file
    df = pd.read_csv(csv_path)
    # Extract time series data
    timeseries = df[["AmtNet Sales USD"]].values.astype('float32')
    # Train-test split
    train_size = int(len(timeseries) * 0.67)
    test_size = len(timeseries) - train_size
    train, test = timeseries[:train_size], timeseries[train_size:]
    # Create datasets
    X_train, y_train = create_dataset(train, lookback=lookback)
    X_test, y_test = create_dataset(test, lookback=lookback)
    if len(X_train) == 0 or len(X_test) == 0:
        # The interface expects a plot and a text output, so fill both slots
        return None, "The lookback value is too large for the dataset. Please reduce the lookback value."
    # DataLoader for batching
    train_loader = data.DataLoader(data.TensorDataset(X_train, y_train), shuffle=True, batch_size=batch_size)
    # Initialize model, optimizer, and loss function
    model = AirModel()
    optimizer = optim.Adam(model.parameters())
    loss_fn = nn.MSELoss()
    # Training loop
    for epoch in range(epochs):
        model.train()
        for X_batch, y_batch in train_loader:
            y_pred = model(X_batch)
            loss = loss_fn(y_pred, y_batch)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
    # Prediction
    model.eval()
    with torch.no_grad():
        train_plot = np.ones_like(timeseries) * np.nan
        train_plot[lookback:train_size] = model(X_train)[:, -1, :].numpy()
        test_plot = np.ones_like(timeseries) * np.nan
        test_plot[train_size + lookback:len(timeseries)] = model(X_test)[:, -1, :].numpy()
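    # The first `lookback` points of each split stay NaN: each forecast needs
    # `lookback` preceding observations, so no prediction exists for those slots.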
    # Plot results with Plotly
    fig = go.Figure()
    fig.add_trace(go.Scatter(y=timeseries.squeeze(), mode='lines', name='Original Data'))
    fig.add_trace(go.Scatter(y=train_plot.squeeze(), mode='lines', name='Train Prediction', line=dict(color='red')))
    fig.add_trace(go.Scatter(y=test_plot.squeeze(), mode='lines', name='Test Prediction', line=dict(color='green')))
    fig.update_layout(title="Time Series Prediction", xaxis_title="Time", yaxis_title="Sales")
    # Calculate Mean Absolute Error (MAE) on the test region
    mae = np.mean(np.abs(test_plot[train_size + lookback:len(timeseries)] - timeseries[train_size + lookback:len(timeseries)]))
    return fig, f"Mean Absolute Error (MAE) on Test Data: {mae:.4f}"
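
# Example of calling the function directly (hypothetical local check; assumes a
# file "sales.csv" exists and contains an "AmtNet Sales USD" column -- open()
# returns an object with the .name attribute the function also accepts):
#   fig, msg = train_and_predict(open("sales.csv"), lookback=30, epochs=200, batch_size=8)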

# Gradio app interface using new API
interface = gr.Interface(
    fn=train_and_predict,
    inputs=[
        gr.File(label="Upload your CSV file"),
        gr.Slider(10, 365, step=1, value=100, label="Lookback"),
        gr.Slider(100, 5000, step=100, value=1000, label="Epochs"),
        gr.Slider(4, 32, step=1, value=8, label="Batch size")
    ],
    outputs=[
        gr.Plot(label="Prediction Plot"),
        gr.Textbox(label="Error Metrics")
    ],
    title="Time Series Prediction with LSTM",
    description="Upload a CSV file with an 'AmtNet Sales USD' column and get time series predictions from an LSTM model.",
)

# Launch the app (pass share=True to launch() if a temporary public link is needed)
interface.launch()