|
import torch |
|
import torch.nn as nn |
|
import torch.optim as optim |
|
import numpy as np |
|
from fastapi import FastAPI, UploadFile, File |
|
from sklearn.model_selection import KFold |
|
from sklearn.metrics import mean_squared_error |
|
from sklearn.preprocessing import OneHotEncoder |
|
import csv |
|
import io |
|
|
|
from joblib import load, dump |
|
|
|
|
|
|
|
class DNN(nn.Module):
    """Three-layer fully connected regressor with ReLU activations."""

    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        # Keep the individual layer attributes (fc1/relu1/...) so any
        # previously saved state_dict with this layout still loads.
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.relu1 = nn.ReLU()
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.relu2 = nn.ReLU()
        self.fc3 = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Map input through fc1 -> ReLU -> fc2 -> ReLU -> fc3."""
        hidden = self.relu1(self.fc1(x))
        hidden = self.relu2(self.fc2(hidden))
        return self.fc3(hidden)
|
|
|
|
|
|
|
# Global regression model shared by both endpoints.
# input_size=7 = 3 numeric fields (squareFeet, bedrooms, bathrooms)
# + one-hot neighborhood columns + yearBuilt, which assumes the encoder
# yields exactly 3 neighborhood categories — TODO confirm against the
# training CSV (/train re-fits the encoder and may change this count).
model = DNN(input_size=7, hidden_size=128, output_size=1)

# One-hot encoder for the neighborhood column. handle_unknown="ignore"
# maps unseen categories to an all-zero row instead of raising.
# NOTE(review): calling /generate before /train raises NotFittedError,
# since the encoder has no categories until it is fit.
encoder = OneHotEncoder(handle_unknown="ignore")

# Serve the interactive Swagger UI at the site root.
app = FastAPI(docs_url="/", redoc_url="/new_redoc")
|
|
|
|
|
|
|
@app.get("/generate/{squareFeet}/{bedrooms}/{bathrooms}/{neighborhood}/{yearBuilt}")
def generate(
    squareFeet: float,
    bedrooms: float,
    bathrooms: float,
    neighborhood: str,
    yearBuilt: float,
):
    """Predict a price for a single property.

    The feature vector order (numeric fields, then one-hot neighborhood
    columns, then yearBuilt) must match what the model was trained on.

    Raises sklearn's NotFittedError if /train has not been called yet,
    because the encoder has no learned categories until it is fit.
    """
    global model, encoder

    # handle_unknown="ignore": an unseen neighborhood becomes all zeros.
    neighborhood_encoded = encoder.transform([[neighborhood]]).toarray()[0]

    input_data = [squareFeet, bedrooms, bathrooms, *neighborhood_encoded, yearBuilt]
    input_tensor = torch.tensor([input_data], dtype=torch.float32)

    # Inference only — skip autograd bookkeeping.
    with torch.no_grad():
        prediction = model(input_tensor)
    return {"output": prediction.item()}
|
|
|
|
|
@app.post("/train")
async def train(file: UploadFile = File(...)):
    """Train the global model on an uploaded CSV.

    Expected CSV layout (header row skipped), matching /generate's
    parameter order: squareFeet, bedrooms, bathrooms, neighborhood,
    yearBuilt, price.

    Re-fits the one-hot encoder on the neighborhood column, rebuilds the
    model so its input size matches the encoded feature count, trains
    with 4-fold cross-validation for 25 epochs, saves the model, and
    returns the average RMSE accumulated over all folds and epochs.
    """
    global model, encoder

    contents = await file.read()
    rows = list(csv.reader(io.StringIO(contents.decode("utf-8"))))

    # Skip the header row; keep cells as objects until columns are split.
    raw = np.array(rows[1:], dtype=object)

    # Fit the encoder on the neighborhood column (index 3).
    onehot = encoder.fit_transform(raw[:, 3].reshape(-1, 1)).toarray()

    # Remaining columns are numeric:
    # [squareFeet, bedrooms, bathrooms, yearBuilt, price].
    numeric = np.delete(raw, 3, axis=1).astype(float)

    # Bug fix: extract the price target BEFORE appending one-hot columns.
    # Previously the one-hot block was concatenated after price, so the
    # "target" was the last one-hot column and price leaked into X.
    y = np.ravel(numeric[:, -1])
    features = numeric[:, :-1]

    # Assemble features in the same order generate() builds them:
    # numeric fields, then one-hot neighborhood, then yearBuilt.
    X = np.concatenate((features[:, :3], onehot, features[:, 3:]), axis=1)

    # Rebuild the model so input_size tracks the actual number of encoded
    # categories instead of the hard-coded 7 (which crashed whenever the
    # CSV contained a different number of neighborhoods).
    model = DNN(input_size=X.shape[1], hidden_size=128, output_size=1)

    X = torch.tensor(X, dtype=torch.float32)
    y = torch.tensor(y, dtype=torch.float32)

    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=0.0001)

    kf = KFold(n_splits=4)
    accuracies = []  # fold RMSEs, accumulated across every epoch

    epochs = 25
    for epoch in range(epochs):
        for train_index, test_index in kf.split(X):
            X_train, X_test = X[train_index], X[test_index]
            y_train, y_test = y[train_index], y[test_index]

            optimizer.zero_grad()
            outputs = model(X_train)
            # unsqueeze so the target shape (n, 1) matches the output.
            loss = criterion(outputs, y_train.unsqueeze(1))
            loss.backward()
            optimizer.step()

            # Evaluate the held-out fold without autograd bookkeeping.
            with torch.no_grad():
                predictions = model(X_test)
            rmse = np.sqrt(mean_squared_error(y_test, predictions.numpy()))
            accuracies.append(rmse)

        average_rmse = sum(accuracies) / len(accuracies)
        print(f"Epoch: {epoch+1}, Average RMSE: {average_rmse}")

    dump(model, "model.joblib")

    return {"filename": file.filename, "average_rmse": average_rmse}
|
|