from torchvision import models
import torch.nn as nn
from tqdm import tqdm
import torch
import mlflow
from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score
from sklearn.ensemble import RandomForestClassifier


def train_one_epoch(model, trainloader, criterion, optimizer, device):
    """Run one training epoch and return (average loss, accuracy in %)."""
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0

    for images, labels in tqdm(trainloader, desc="Training", leave=False):
        images, labels = images.to(device), labels.to(device)

        # Forward pass
        outputs = model(images)
        loss = criterion(outputs, labels)

        # Backward pass and parameter update
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Accumulate loss and accuracy statistics
        running_loss += loss.item()
        _, predicted = outputs.max(1)
        correct += predicted.eq(labels).sum().item()
        total += labels.size(0)

    epoch_loss = running_loss / len(trainloader)
    epoch_accuracy = 100.0 * correct / total
    return epoch_loss, epoch_accuracy


@torch.no_grad()
def evaluate(model, testloader, criterion, device):
    """Evaluate the model and return (loss, accuracy in %, precision, recall, F1)."""
    model.eval()
    running_loss = 0.0
    all_labels = []
    all_predictions = []

    for images, labels in tqdm(testloader, desc="Evaluating", leave=False):
        images, labels = images.to(device), labels.to(device)

        outputs = model(images)
        loss = criterion(outputs, labels)

        running_loss += loss.item()
        _, predicted = outputs.max(1)

        all_labels.extend(labels.cpu().numpy())
        all_predictions.extend(predicted.cpu().numpy())

    epoch_loss = running_loss / len(testloader)

    epoch_accuracy = accuracy_score(all_labels, all_predictions, normalize=True) * 100
    precision = precision_score(all_labels, all_predictions, average="weighted")
    recall = recall_score(all_labels, all_predictions, average="weighted")
    f1 = f1_score(all_labels, all_predictions, average="weighted")

    return epoch_loss, epoch_accuracy, precision, recall, f1


def train_and_evaluate(
    model,
    trainloader,
    testloader,
    criterion,
    optimizer,
    device,
    epochs,
    use_mlflow=False,
):
    """
    Train and evaluate the model.

    Args:
        model (nn.Module): The neural network model.
        trainloader (DataLoader): DataLoader for training data.
        testloader (DataLoader): DataLoader for test data.
        criterion (nn.Module): Loss function.
        optimizer (optim.Optimizer): Optimizer.
        device (torch.device): Device to train on ('cuda' or 'cpu').
        epochs (int): Number of epochs to train.
        use_mlflow (bool): If True, log per-epoch metrics to MLflow.

    Returns:
        dict: Training and evaluation statistics.
    """
    history = {
        "train_loss": [],
        "train_acc": [],
        "test_loss": [],
        "test_acc": [],
        "precision": [],
        "recall": [],
        "f1": [],
    }

    model.to(device)

    for epoch in range(epochs):
        print(f"Epoch {epoch + 1}/{epochs}")

        train_loss, train_acc = train_one_epoch(
            model, trainloader, criterion, optimizer, device
        )
        print(f"Train Loss: {train_loss:.4f}, Train Accuracy: {train_acc:.2f}%")

        test_loss, test_acc, precision, recall, f1 = evaluate(
            model, testloader, criterion, device
        )
        print(f"Test Loss: {test_loss:.4f}, Test Accuracy: {test_acc:.2f}%")

        history["train_loss"].append(train_loss)
        history["train_acc"].append(train_acc)
        history["test_loss"].append(test_loss)
        history["test_acc"].append(test_acc)
        history["precision"].append(precision)
        history["recall"].append(recall)
        history["f1"].append(f1)

        if use_mlflow:
            # Log per-epoch metrics; step=epoch keeps the MLflow history indexed by epoch.
            mlflow.log_metric("train_loss", train_loss, step=epoch)
            mlflow.log_metric("train_acc", train_acc, step=epoch)
            mlflow.log_metric("test_loss", test_loss, step=epoch)
            mlflow.log_metric("test_acc", test_acc, step=epoch)
            mlflow.log_metric("precision", precision, step=epoch)
            mlflow.log_metric("recall", recall, step=epoch)
            mlflow.log_metric("f1", f1, step=epoch)

    return history
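
# Usage sketch (illustrative only, not part of the training loop): with `use_mlflow=True`,
# call this inside an active MLflow run; the model, loaders, and optimizer below are
# assumed to have been created elsewhere.
#
#     with mlflow.start_run():
#         mlflow.log_param("epochs", 10)
#         history = train_and_evaluate(
#             model, trainloader, testloader, criterion, optimizer, device,
#             epochs=10, use_mlflow=True,
#         )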


def set_parameter_requires_grad(model, feature_extracting):
    # When feature extracting, freeze all existing parameters so that only the
    # newly created classification head is updated during training.
    if feature_extracting:
        for param in model.parameters():
            param.requires_grad = False


def initialize_model(
    model_name,
    num_classes,
    feature_extract=True,
    use_pretrained=True,
    hidden_size=512,
    image_shape=(224, 224, 3),
):
    """Build and return the requested model with its classifier sized to num_classes."""
    # Note: torchvision's legacy `pretrained=` flag is used here; newer releases
    # prefer the `weights=` argument.
    model_ft = None

    if model_name == "resnet":
        # ResNet-18
        model_ft = models.resnet18(pretrained=use_pretrained)
        set_parameter_requires_grad(model_ft, feature_extract)
        num_ftrs = model_ft.fc.in_features
        model_ft.fc = nn.Linear(num_ftrs, num_classes)

    elif model_name == "alexnet":
        # AlexNet
        model_ft = models.alexnet(pretrained=use_pretrained)
        set_parameter_requires_grad(model_ft, feature_extract)
        num_ftrs = model_ft.classifier[6].in_features
        model_ft.classifier[6] = nn.Linear(num_ftrs, num_classes)

    elif model_name == "vgg":
        # VGG11_bn
        model_ft = models.vgg11_bn(pretrained=use_pretrained)
        set_parameter_requires_grad(model_ft, feature_extract)
        num_ftrs = model_ft.classifier[6].in_features
        model_ft.classifier[6] = nn.Linear(num_ftrs, num_classes)

    elif model_name == "squeezenet":
        # SqueezeNet 1.0
        model_ft = models.squeezenet1_0(pretrained=use_pretrained)
        set_parameter_requires_grad(model_ft, feature_extract)
        model_ft.classifier[1] = nn.Conv2d(
            512, num_classes, kernel_size=(1, 1), stride=(1, 1)
        )
        model_ft.num_classes = num_classes

    elif model_name == "densenet":
        # DenseNet-121
        model_ft = models.densenet121(pretrained=use_pretrained)
        set_parameter_requires_grad(model_ft, feature_extract)
        num_ftrs = model_ft.classifier.in_features
        model_ft.classifier = nn.Linear(num_ftrs, num_classes)
    elif model_name == "custom_mlp":
        # Custom MLP: flatten the image tensor before the first linear layer.
        model_ft = nn.Sequential(
            nn.Flatten(),
            nn.Linear(image_shape[0] * image_shape[1] * image_shape[2], hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, hidden_size // 2),
            nn.ReLU(),
            nn.Linear(hidden_size // 2, num_classes),
        )

    elif model_name == "custom_cnn":
        # Custom CNN: the 64 * 28 * 28 flattened size assumes 224x224 inputs
        # (three 2x max-pooling stages reduce 224 -> 28).
        model_ft = nn.Sequential(
            nn.Conv2d(3, 16, 3, 1, 1),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(16, 32, 3, 1, 1),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(32, 64, 3, 1, 1),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Flatten(),
            nn.Linear(64 * 28 * 28, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, num_classes),
        )

    elif model_name == "random_forest":
        # Random forest (scikit-learn); not compatible with the PyTorch training loop above.
        model_ft = RandomForestClassifier(n_estimators=100, random_state=42)

    else:
        raise ValueError(f"Invalid model name: {model_name}")

    return model_ft
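

if __name__ == "__main__":
    # Minimal end-to-end sketch (assumptions: a 10-class task and hypothetical
    # DataLoaders `trainloader`/`testloader` defined elsewhere, so the training
    # call itself is left commented out).
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = initialize_model("resnet", num_classes=10, feature_extract=True)

    # With feature_extract=True only the new fc head is trainable, so the
    # optimizer should receive just the parameters that require gradients.
    params_to_update = [p for p in model.parameters() if p.requires_grad]
    optimizer = torch.optim.SGD(params_to_update, lr=0.001, momentum=0.9)
    criterion = nn.CrossEntropyLoss()

    # history = train_and_evaluate(
    #     model, trainloader, testloader, criterion, optimizer, device,
    #     epochs=5, use_mlflow=False,
    # )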