"""FastAPI service exposing a small PyTorch regression model.

Endpoints:
    GET  /generate/{squareFeet}/{bedrooms}/{bathrooms}/{yearBuilt}
        Predict a value from four numeric features.
    POST /train
        Retrain the shared model from an uploaded CSV and report mean RMSE.
"""

import csv
import io

import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from fastapi import FastAPI, File, UploadFile
from joblib import dump, load
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import KFold


class DNN(nn.Module):
    """Two-layer fully connected network for scalar regression."""

    def __init__(self, input_size: int, hidden_size: int, output_size: int):
        super().__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_size, output_size)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return self.fc2(self.relu(self.fc1(x)))


# Shared model instance: /train mutates its parameters in place, /generate reads
# them. NOTE(review): concurrent /train requests would race on these weights.
model = DNN(input_size=4, hidden_size=16, output_size=1)

app = FastAPI(docs_url="/", redoc_url="/new_redoc")


# Prediction endpoint (GET — features are path parameters).
@app.get("/generate/{squareFeet}/{bedrooms}/{bathrooms}/{yearBuilt}")
def generate(
    squareFeet: float,
    bedrooms: float,
    bathrooms: float,
    yearBuilt: float,
):
    """Return the model's prediction for a single 4-feature sample."""
    input_data = torch.tensor(
        [[squareFeet, bedrooms, bathrooms, yearBuilt]], dtype=torch.float32
    )
    # Fix: inference must not build an autograd graph — without no_grad()
    # every request allocated gradient bookkeeping that was never freed by
    # a backward pass.
    model.eval()
    with torch.no_grad():
        prediction = model(input_data)
    return {"output": prediction.item()}


@app.post("/train")
async def train(file: UploadFile = File(...)):
    """Retrain the shared model on an uploaded CSV.

    Expects a CSV with a header row; the fourth column is dropped, the last
    remaining column is the target, and everything else is a feature.
    Returns the uploaded filename and the average RMSE across 4 folds.
    """
    contents = await file.read()
    rows = list(csv.reader(io.StringIO(contents.decode("utf-8"))))

    data_np = np.array(rows[1:], dtype=object)  # skip the header row
    # Drop the fourth column (assumed non-numeric/unused — TODO confirm the
    # expected CSV schema), then convert the rest to floats in one step.
    data_np = np.delete(data_np, 3, axis=1).astype(float)

    X = torch.tensor(data_np[:, :-1], dtype=torch.float32)  # all but last col
    y = torch.tensor(np.ravel(data_np[:, -1]), dtype=torch.float32)  # last col

    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    model.train()
    kf = KFold(n_splits=4)
    rmses = []  # renamed from `accuracies`: these are RMSEs (lower is better)
    for train_index, test_index in kf.split(X):
        X_train, X_test = X[train_index], X[test_index]
        y_train, y_test = y[train_index], y[test_index]

        # NOTE(review): only ONE gradient step per fold — probably intended to
        # be an epoch loop; confirm before changing, as it alters results.
        optimizer.zero_grad()
        outputs = model(X_train)
        # unsqueeze(1): MSELoss needs the target shaped (N, 1) like outputs
        loss = criterion(outputs, y_train.unsqueeze(1))
        loss.backward()
        optimizer.step()

        with torch.no_grad():  # fold evaluation needs no autograd graph
            predictions = model(X_test)
        rmses.append(float(np.sqrt(mean_squared_error(y_test.numpy(), predictions.numpy()))))

    average_rmse = sum(rmses) / len(rmses)
    print(f"Average RMSE: {average_rmse}")
    dump(model, "model.joblib")
    return {"filename": file.filename, "average_rmse": average_rmse}