import io

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from fastapi import FastAPI, UploadFile, File
from sklearn.metrics import mean_squared_error


class DNN(nn.Module):
    """Fully connected feed-forward regressor with a configurable number of hidden layers."""

    def __init__(self, input_size, hidden_size, output_size, num_hidden_layers):
        super().__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.relu1 = nn.ReLU()
        # Alternating Linear/ReLU pairs applied after the input layer.
        self.hidden_layers = nn.ModuleList()
        for _ in range(num_hidden_layers):
            self.hidden_layers.append(nn.Linear(hidden_size, hidden_size))
            self.hidden_layers.append(nn.ReLU())
        self.fc3 = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        x = self.fc1(x)
        x = self.relu1(x)
        for layer in self.hidden_layers:
            x = layer(x)
        x = self.fc3(x)
        return x


# Architecture must match the checkpoint that is loaded below.
model = DNN(input_size=6, hidden_size=64, output_size=1, num_hidden_layers=32)

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model = model.to(device)
# Pre-trained weights are expected at this path; map_location keeps CPU-only hosts working.
model.load_state_dict(torch.load("model_weights.pth", map_location=device))

app = FastAPI(docs_url="/", redoc_url="/new_redoc")


@app.get(
    "/generate/{Soil_Quality}/{Seed_Variety}/{Fertilizer_Amount_kg_per_hectare}/{Sunny_Days}/{Rainfall_mm}/{Irrigation_Schedule}"
)
def generate(
    Soil_Quality: float,
    Seed_Variety: float,
    Fertilizer_Amount_kg_per_hectare: float,
    Sunny_Days: float,
    Rainfall_mm: float,
    Irrigation_Schedule: float,
):
    global model

    # Assemble the six path parameters into a single-row feature tensor,
    # in the same column order the model was trained on.
    input_data = [
        Soil_Quality,
        Seed_Variety,
        Fertilizer_Amount_kg_per_hectare,
        Sunny_Days,
        Rainfall_mm,
        Irrigation_Schedule,
    ]
    input_data = torch.tensor([input_data], dtype=torch.float32).to(device)

    # Inference only: no gradient tracking needed.
    with torch.no_grad():
        prediction = model(input_data)
    return {"prediction": prediction.item()}


@app.post("/train")
async def train(
    trainDatafile: UploadFile = File(...),
    testDatafile: UploadFile = File(...),
    epochs: int = 100,
):
    global model

    # Read both uploaded CSVs into DataFrames. Each file must contain the six
    # feature columns plus the "Yield_kg_per_hectare" target column.
    contents1 = await trainDatafile.read()
    train_data = pd.read_csv(io.StringIO(contents1.decode("utf-8")))

    contents2 = await testDatafile.read()
    test_data = pd.read_csv(io.StringIO(contents2.decode("utf-8")))

    X_train = train_data.drop("Yield_kg_per_hectare", axis=1).values
    y_train = train_data["Yield_kg_per_hectare"].values
    X_test = test_data.drop("Yield_kg_per_hectare", axis=1).values
    y_test = test_data["Yield_kg_per_hectare"].values

    X_train = torch.tensor(X_train, dtype=torch.float32).to(device)
    y_train = torch.tensor(y_train, dtype=torch.float32).to(device)
    X_test = torch.tensor(X_test, dtype=torch.float32).to(device)
    y_test = torch.tensor(y_test, dtype=torch.float32)

    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    rmseList = []

    for epoch in range(epochs):
        # Full-batch gradient step on the training set.
        optimizer.zero_grad()
        outputs = model(X_train)
        loss = criterion(outputs, y_train.unsqueeze(1))
        loss.backward()
        optimizer.step()

        # Evaluate on the held-out set without tracking gradients.
        with torch.no_grad():
            predictions = model(X_test)
        rmse = np.sqrt(
            mean_squared_error(y_test.numpy(), predictions.cpu().numpy())
        )
        print(
            f"Epoch: {epoch + 1}, test RMSE: {float(rmse)}, train RMSE: {float(np.sqrt(loss.item()))}"
        )
        rmseList.append(float(rmse))

    # Persist the updated weights so they are picked up on the next startup.
    torch.save(model.state_dict(), "model_weights.pth")

    return {"rmse": rmseList}
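

# Minimal way to serve the app locally (a sketch; assumes the `uvicorn` package
# is installed and this module is saved as main.py):
#
#   uvicorn main:app --port 8000
#
# or, equivalently, from Python:
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="127.0.0.1", port=8000)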