# AIIASpace / app.py
# ArxAlfa
# Refactor DNN model to support variable number of ... (message truncated in source; presumably "hidden layers")
# 66f8548
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from fastapi import FastAPI, UploadFile, File
from sklearn.metrics import mean_squared_error
import pandas as pd
from sklearn.model_selection import train_test_split
import csv
import io
# Define the DNN model
class DNN(nn.Module):
    """Feed-forward network with a configurable number of hidden layers.

    Layout: ``fc1 -> ReLU``, then ``num_hidden_layers`` repetitions of
    ``Linear(hidden_size, hidden_size) -> ReLU``, then ``fc3``.

    The attribute names (``fc1``, ``relu1``, ``hidden_layers``, ``fc3``) and
    the alternating Linear/ReLU order inside ``hidden_layers`` define the
    state-dict keys, so they must not change if saved weights are to load.

    Args:
        input_size: Number of input features.
        hidden_size: Width of every hidden layer.
        output_size: Number of output values.
        num_hidden_layers: How many Linear+ReLU pairs follow ``fc1``.
    """

    def __init__(self, input_size, hidden_size, output_size, num_hidden_layers):
        super().__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.relu1 = nn.ReLU()
        # One flat list in which Linear and ReLU modules alternate.
        self.hidden_layers = nn.ModuleList()
        for _ in range(num_hidden_layers):
            self.hidden_layers.extend(
                [nn.Linear(hidden_size, hidden_size), nn.ReLU()]
            )
        self.fc3 = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Run ``x`` through the input layer, every hidden module, and ``fc3``."""
        out = self.relu1(self.fc1(x))
        for module in self.hidden_layers:
            out = module(out)
        return self.fc3(out)
# Use the first CUDA GPU when one is available, otherwise run on the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Build the network on the chosen device and restore its weights from disk.
# The constructor arguments must match the checkpoint's architecture for
# load_state_dict to accept it.
model = DNN(
    input_size=6,
    hidden_size=64,
    output_size=1,
    num_hidden_layers=32,
).to(device)
model.load_state_dict(torch.load("model_weights.pth", map_location=device))

# FastAPI application: interactive docs are served at "/", ReDoc at "/new_redoc".
app = FastAPI(docs_url="/", redoc_url="/new_redoc")
# Create a GET endpoint that returns the model's prediction for one sample
@app.get(
    "/generate/{Soil_Quality}/{Seed_Variety}/{Fertilizer_Amount_kg_per_hectare}/{Sunny_Days}/{Rainfall_mm}/{Irrigation_Schedule}"
)
def generate(
    Soil_Quality: float,
    Seed_Variety: float,
    Fertilizer_Amount_kg_per_hectare: float,
    Sunny_Days: float,
    Rainfall_mm: float,
    Irrigation_Schedule: float,
):
    """Run the module-level model on a single six-feature sample.

    The six path parameters are packed, in the order they appear in the
    route, into a ``(1, 6)`` float32 tensor on ``device`` and passed through
    ``model``.

    Returns:
        ``{"prediction": <float>}`` holding the model's single output value.
    """
    features = torch.tensor(
        [
            [
                Soil_Quality,
                Seed_Variety,
                Fertilizer_Amount_kg_per_hectare,
                Sunny_Days,
                Rainfall_mm,
                Irrigation_Schedule,
            ]
        ],
        dtype=torch.float32,
        device=device,
    )
    # Inference only: skip autograd bookkeeping so no graph is built per request.
    with torch.no_grad():
        prediction = model(features)
    return {"prediction": prediction.item()}
@app.post("/train")
async def train(
    trainDatafile: UploadFile = File(...),
    testDatafile: UploadFile = File(...),
    epochs: int = 100,
):
    """Continue training the module-level model on uploaded CSV data.

    Both uploads are parsed as UTF-8 CSV. The ``Yield_kg_per_hectare`` column
    is the regression target and every other column is used as a feature.
    Each epoch performs one full-batch Adam step on the training set and then
    evaluates RMSE on the test set. When training finishes the weights are
    written to ``model_weights.pth``.

    Args:
        trainDatafile: CSV upload with the training rows.
        testDatafile: CSV upload with the evaluation rows.
        epochs: Number of full-batch optimisation steps to run.

    Returns:
        ``{"rmse": [...]}`` with the test RMSE recorded after every epoch.

    NOTE(review): the training loop is synchronous inside an ``async``
    handler, so it occupies the event loop until it finishes.
    """
    target_column = "Yield_kg_per_hectare"

    train_bytes = await trainDatafile.read()
    train_data = pd.read_csv(io.StringIO(train_bytes.decode("utf-8")))
    test_bytes = await testDatafile.read()
    test_data = pd.read_csv(io.StringIO(test_bytes.decode("utf-8")))

    # Features and targets as float32 tensors on the model's device.
    X_train = torch.tensor(
        train_data.drop(target_column, axis=1).values,
        dtype=torch.float32,
        device=device,
    )
    # Shape the targets as (N, 1) once so they line up with the model output.
    y_train = torch.tensor(
        train_data[target_column].values,
        dtype=torch.float32,
        device=device,
    ).unsqueeze(1)
    X_test = torch.tensor(
        test_data.drop(target_column, axis=1).values,
        dtype=torch.float32,
        device=device,
    )
    # The test targets are only ever consumed by scikit-learn, so keep them
    # as a float32 NumPy array instead of converting on every epoch.
    y_test = np.asarray(test_data[target_column].values, dtype=np.float32)

    # Define loss function and optimizer
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    rmseList = []
    for epoch in range(epochs):
        optimizer.zero_grad()
        # Forward pass
        outputs = model(X_train)
        loss = criterion(outputs, y_train)
        # Backward pass and optimization
        loss.backward()
        optimizer.step()

        # Evaluation does not need gradients; avoid building a second graph.
        with torch.no_grad():
            predictions = model(X_test)
        rmse = float(
            np.sqrt(mean_squared_error(y_test, predictions.cpu().numpy()))
        )
        print(
            f"Epoch: {epoch+1}, RMSE: {rmse}, Loss: {float(np.sqrt(loss.cpu().detach().numpy()))}"
        )
        rmseList.append(rmse)

    # Persist the updated weights so the next process start loads them.
    torch.save(model.state_dict(), "model_weights.pth")
    return {"rmse": rmseList}