import csv
import io

import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from fastapi import FastAPI, UploadFile, File
from fastapi import HTTPException
from joblib import load, dump
from sklearn.exceptions import NotFittedError
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import KFold
from sklearn.preprocessing import OneHotEncoder
# Fully connected regression network
class DNN(nn.Module):
    """Three-layer fully connected network with ReLU activations.

    Layout: Linear(input_size -> hidden_size) -> ReLU ->
    Linear(hidden_size -> hidden_size) -> ReLU ->
    Linear(hidden_size -> output_size). No activation is applied to the
    final layer, so the output is an unbounded real value.
    """

    def __init__(self, input_size, hidden_size, output_size):
        """Create the layers.

        Args:
            input_size: Number of features in each input row.
            hidden_size: Width of both hidden layers.
            output_size: Number of values produced per input row.
        """
        super().__init__()
        # Attribute names are part of the state_dict / pickle layout; keep them.
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.relu1 = nn.ReLU()
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.relu2 = nn.ReLU()
        self.fc3 = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Run *x* through the network and return the final layer's output."""
        hidden = self.relu1(self.fc1(x))
        hidden = self.relu2(self.fc2(hidden))
        return self.fc3(hidden)
# Build the network with freshly initialised weights (nothing is loaded from
# disk here). The 7 inputs are presumably the 4 numeric features plus a
# 3-category one-hot neighborhood encoding; confirm against the training data.
model = DNN(
    input_size=7,
    hidden_size=128,
    output_size=1,
)

# One-hot encoder for the neighborhood value. It is unfitted at this point;
# handle_unknown="ignore" makes categories unseen during fitting encode as
# all zeros instead of raising.
encoder = OneHotEncoder(handle_unknown="ignore")

# FastAPI application: the interactive docs are served at "/" and ReDoc at
# "/new_redoc".
app = FastAPI(
    docs_url="/",
    redoc_url="/new_redoc",
)
# Create a POST endpoint
# NOTE(review): the route decorator was missing from this block; "/predict" is
# an assumed path. Confirm it against the API's clients before relying on it.
@app.post("/predict")
def generate(
    squareFeet: float,
    bedrooms: float,
    bathrooms: float,
    neighborhood: str,
    yearBuilt: float,
):
    """Predict a single value from one set of house features.

    The feature vector is assembled in the order
    ``[squareFeet, bedrooms, bathrooms, *one_hot(neighborhood), yearBuilt]``
    and must match the layout the model was trained on.

    Args:
        squareFeet: Numeric feature passed through unchanged.
        bedrooms: Numeric feature passed through unchanged.
        bathrooms: Numeric feature passed through unchanged.
        neighborhood: Categorical feature, one-hot encoded with the global
            ``encoder``. A category not seen during fitting encodes as all
            zeros (the encoder is built with ``handle_unknown="ignore"``).
        yearBuilt: Numeric feature passed through unchanged.

    Returns:
        ``{"output": <float>}`` with the model's prediction.

    Raises:
        HTTPException: 400 if the encoder has not been fitted yet, i.e. no
            training data has been uploaded.
    """
    # Apply the encoder to the neighborhood input
    try:
        neighborhood_encoded = encoder.transform([[neighborhood]]).toarray()[0]
    except NotFittedError as err:
        raise HTTPException(
            status_code=400,
            detail="The model has not been trained yet; upload training data first.",
        ) from err

    # Combine all inputs into a batch of one row
    input_data = [squareFeet, bedrooms, bathrooms, *neighborhood_encoded, yearBuilt]
    input_data = torch.tensor([input_data], dtype=torch.float32)

    # Inference only: no autograd graph is needed.
    with torch.no_grad():
        prediction = model(input_data)
    return {"output": prediction.item()}
@app.post("/train")
async def train(file: UploadFile = File(...)):
    """Train the global model on an uploaded CSV file and report its RMSE.

    Assumed CSV layout (TODO confirm against the real dataset): a header row,
    a categorical column at index 3, numeric values everywhere else, and the
    regression target in the last column.

    The feature matrix is laid out as
    ``[first three numeric columns, one-hot columns, remaining numeric columns]``
    so that it matches the vector ``generate`` builds at prediction time.

    Note that training runs synchronously inside this coroutine, so the
    event loop is blocked until it finishes.

    Args:
        file: Uploaded UTF-8 encoded CSV file.

    Returns:
        ``{"filename": ..., "average_rmse": ...}`` where ``average_rmse`` is
        the mean held-out RMSE over every fold of every epoch.

    Raises:
        HTTPException: 400 if the file is not valid UTF-8, the rows are
            ragged or too narrow, a numeric cell cannot be parsed, there are
            fewer rows than folds, or the resulting feature count does not
            match the model's input size.
    """
    n_splits = 4
    epochs = 25  # Define the number of epochs
    categorical_col = 3

    contents = await file.read()
    try:
        text = contents.decode("utf-8")
    except UnicodeDecodeError as err:
        raise HTTPException(
            status_code=400, detail="The file must be UTF-8 encoded CSV."
        ) from err

    # Skip blank lines so they cannot make the table ragged; row 0 is the header.
    rows = [row for row in csv.reader(io.StringIO(text)) if row]
    data_np = np.array(rows[1:], dtype=object)
    min_columns = categorical_col + 2  # categorical column plus a target after it
    if data_np.ndim != 2 or data_np.shape[1] < min_columns:
        raise HTTPException(
            status_code=400,
            detail=(
                "Every CSV row must have the same number of columns "
                f"(at least {min_columns})."
            ),
        )
    if data_np.shape[0] < n_splits:
        raise HTTPException(
            status_code=400,
            detail=f"At least {n_splits} data rows are required for training.",
        )

    categories = data_np[:, categorical_col]
    try:
        numeric = np.delete(data_np, categorical_col, axis=1).astype(float)
    except ValueError as err:
        raise HTTPException(
            status_code=400,
            detail="All columns except the categorical one must be numeric.",
        ) from err

    # Split off the target BEFORE the one-hot columns are added; otherwise the
    # "last column" would be a one-hot column and the target would leak into X.
    features = numeric[:, :-1]
    target = numeric[:, -1]

    # Validate the width before refitting the shared encoder.
    n_features = features.shape[1] + len(set(categories))
    if n_features != model.fc1.in_features:
        raise HTTPException(
            status_code=400,
            detail=(
                f"The data yields {n_features} features but the model expects "
                f"{model.fc1.in_features}."
            ),
        )

    encoded = encoder.fit_transform(categories.reshape(-1, 1)).toarray()
    # Re-insert the encoded columns where the categorical column was.
    feature_matrix = np.concatenate(
        (features[:, :categorical_col], encoded, features[:, categorical_col:]),
        axis=1,
    )

    # Convert data to torch tensors
    X = torch.tensor(feature_matrix, dtype=torch.float32)
    y = torch.tensor(target, dtype=torch.float32)

    # Define loss function and optimizer
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=0.0001)

    # Fit the model
    kf = KFold(n_splits=n_splits)
    fold_rmses = []
    for epoch in range(epochs):
        for train_index, test_index in kf.split(X):
            X_train, X_test = X[train_index], X[test_index]
            y_train, y_test = y[train_index], y[test_index]

            # Forward pass
            optimizer.zero_grad()
            outputs = model(X_train)
            loss = criterion(outputs, y_train.unsqueeze(1))

            # Backward pass and optimization
            loss.backward()
            optimizer.step()

            # Score the held-out fold without tracking gradients.
            with torch.no_grad():
                predictions = model(X_test)
            rmse = float(
                np.sqrt(mean_squared_error(y_test.numpy(), predictions.numpy()))
            )
            fold_rmses.append(rmse)

        # Running average over all folds of all epochs so far.
        average_rmse = sum(fold_rmses) / len(fold_rmses)
        print(f"Epoch: {epoch+1}, Average RMSE: {average_rmse}")

    dump(model, "model.joblib")
    return {"filename": file.filename, "average_rmse": average_rmse}