Spaces:
Runtime error
Runtime error
| """Module to define the model.""" | |
| # Resources | |
| # https://lightning.ai/docs/pytorch/stable/starter/introduction.html | |
| # https://lightning.ai/docs/pytorch/stable/starter/converting.html | |
| # https://lightning.ai/docs/pytorch/stable/notebooks/lightning_examples/cifar10-baseline.html | |
| import modules.config as config | |
| import pytorch_lightning as pl | |
| import torch | |
| import torch.nn as nn | |
| import torch.nn.functional as F | |
| import torch.optim as optim | |
| import torchinfo | |
| from torch.optim.lr_scheduler import OneCycleLR | |
| from torch_lr_finder import LRFinder | |
| from torchmetrics import Accuracy | |
# Default starting learning rate and weight decay, sourced from the shared
# config module so they can be tuned in one place.
PREFERRED_START_LR = config.PREFERRED_START_LR
PREFERRED_WEIGHT_DECAY = config.PREFERRED_WEIGHT_DECAY
def detailed_model_summary(model, input_size):
    """Print a layer-by-layer summary of ``model`` for the given input size.

    Uses torchinfo (https://github.com/TylerYep/torchinfo); ``batch_dim=0``
    declares which axis of ``input_size`` is the batch dimension.
    """
    # Columns to display for every layer in the summary table.
    shown_columns = (
        "input_size",
        "kernel_size",
        "output_size",
        "num_params",
        "trainable",
    )
    torchinfo.summary(
        model,
        input_size=input_size,
        batch_dim=0,
        col_names=shown_columns,
        verbose=1,
        col_width=16,
    )
| ############# Assignment 12 Model ############# | |
| # This is for Assignment 12 | |
| # Model used from Assignment 10 and converted to lightning model | |
class CustomResNet(pl.LightningModule):
    """Custom ResNet-style CNN: prep layer + 3 stages with 2 residual blocks.

    Assignment 12 model — the Assignment 10 network converted to a
    PyTorch Lightning module. Built for 32x32x3 input (CIFAR-10 sized);
    ``forward`` returns log-probabilities over 10 classes.
    """

    # Class-level flag: when True, forward() prints the activation shape
    # after every stage via print_view() (debugging aid, off by default).
    print_shape = False
    # Dropout probability applied after every Conv/BN/ReLU group.
    dropout_value = 0.02

    def __init__(self):
        """Build the network, loss/accuracy functions and result buffers."""
        super().__init__()
        # Define loss function
        # https://pytorch.org/docs/stable/generated/torch.nn.CrossEntropyLoss.html
        # NOTE(review): forward() ends with log_softmax, but CrossEntropyLoss
        # applies log_softmax internally, so the loss effectively sees a
        # double log-softmax. This still trains, but confirm it is intentional.
        self.loss_function = torch.nn.CrossEntropyLoss()
        # Define accuracy function
        # https://torchmetrics.readthedocs.io/en/stable/classification/accuracy.html
        self.accuracy_function = Accuracy(task="multiclass", num_classes=10)
        # Per-epoch metric history, appended to by the epoch-end hooks below.
        # NOTE(review): on_validation_epoch_end appends under the "test_*"
        # keys, so "val_loss"/"val_acc" stay empty — verify downstream code
        # reads the keys it expects.
        self.results = {
            "train_loss": [],
            "train_acc": [],
            "test_loss": [],
            "test_acc": [],
            "val_loss": [],
            "val_acc": [],
        }
        # Misclassified test samples, filled by store_misclassified_images().
        self.misclassified_image_data = {"images": [], "ground_truths": [], "predicted_vals": []}
        # Starting LR; may be overwritten externally (e.g. by a Lightning
        # Tuner / LR finder) before configure_optimizers runs.
        self.learning_rate = PREFERRED_START_LR

        # Model Notes
        # PrepLayer - Conv 3x3 s1, p1) >> BN >> RELU [64k]
        # 1. Input size: 32x32x3 -> output 32x32x64
        self.prep = nn.Sequential(
            nn.Conv2d(
                in_channels=3,
                out_channels=64,
                kernel_size=(3, 3),
                stride=1,
                padding=1,
                dilation=1,
                bias=False,  # bias is redundant before BatchNorm
            ),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.Dropout(self.dropout_value),
        )
        # Layer1: X = Conv 3x3 (s1, p1) >> MaxPool2D >> BN >> RELU [128k]
        # 32x32x64 -> 16x16x128
        self.layer1_x = nn.Sequential(
            nn.Conv2d(
                in_channels=64,
                out_channels=128,
                kernel_size=(3, 3),
                stride=1,
                padding=1,
                dilation=1,
                bias=False,
            ),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.Dropout(self.dropout_value),
        )
        # Layer1: R1 = ResBlock( (Conv-BN-ReLU-Conv-BN-ReLU))(X) [128k]
        # Same spatial size/channels as its input; added to X in forward().
        self.layer1_r1 = nn.Sequential(
            nn.Conv2d(
                in_channels=128,
                out_channels=128,
                kernel_size=(3, 3),
                stride=1,
                padding=1,
                dilation=1,
                bias=False,
            ),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.Dropout(self.dropout_value),
            nn.Conv2d(
                in_channels=128,
                out_channels=128,
                kernel_size=(3, 3),
                stride=1,
                padding=1,
                dilation=1,
                bias=False,
            ),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.Dropout(self.dropout_value),
        )
        # Layer 2: Conv 3x3 [256k], MaxPooling2D, BN, ReLU
        # 16x16x128 -> 8x8x256
        self.layer2 = nn.Sequential(
            nn.Conv2d(
                in_channels=128,
                out_channels=256,
                kernel_size=(3, 3),
                stride=1,
                padding=1,
                dilation=1,
                bias=False,
            ),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.Dropout(self.dropout_value),
        )
        # Layer 3: X = Conv 3x3 (s1, p1) >> MaxPool2D >> BN >> RELU [512k]
        # 8x8x256 -> 4x4x512
        self.layer3_x = nn.Sequential(
            nn.Conv2d(
                in_channels=256,
                out_channels=512,
                kernel_size=(3, 3),
                stride=1,
                padding=1,
                dilation=1,
                bias=False,
            ),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.BatchNorm2d(512),
            nn.ReLU(),
            nn.Dropout(self.dropout_value),
        )
        # Layer 3: R2 = ResBlock( (Conv-BN-ReLU-Conv-BN-ReLU))(X) [512k]
        # Same spatial size/channels as its input; added to X in forward().
        self.layer3_r2 = nn.Sequential(
            nn.Conv2d(
                in_channels=512,
                out_channels=512,
                kernel_size=(3, 3),
                stride=1,
                padding=1,
                dilation=1,
                bias=False,
            ),
            nn.BatchNorm2d(512),
            nn.ReLU(),
            nn.Dropout(self.dropout_value),
            nn.Conv2d(
                in_channels=512,
                out_channels=512,
                kernel_size=(3, 3),
                stride=1,
                padding=1,
                dilation=1,
                bias=False,
            ),
            nn.BatchNorm2d(512),
            nn.ReLU(),
            nn.Dropout(self.dropout_value),
        )
        # MaxPooling with Kernel Size 4: 4x4x512 -> 1x1x512
        # If stride is None, it is set to kernel_size
        self.maxpool = nn.MaxPool2d(kernel_size=4, stride=4)
        # FC Layer: 512 features -> 10 class scores
        self.fc = nn.Linear(512, 10)
        # Save hyperparameters so Lightning checkpoints/loggers record them
        self.save_hyperparameters()

    def print_view(self, x, msg=""):
        """Print the shape of ``x`` (gated on the class-level print_shape flag)."""
        if self.print_shape:
            if msg != "":
                print(msg, "\n\t", x.shape, "\n")
            else:
                print(x.shape)

    def forward(self, x):
        """Forward pass: prep -> (X+R1) -> layer2 -> (X+R2) -> pool -> FC.

        Returns log-probabilities (log_softmax over the 10 class scores).
        """
        # PrepLayer
        x = self.prep(x)
        self.print_view(x, "PrepLayer")
        # Layer 1: downsampling conv, then additive residual branch
        x = self.layer1_x(x)
        self.print_view(x, "Layer 1, X")
        r1 = self.layer1_r1(x)
        self.print_view(r1, "Layer 1, R1")
        x = x + r1
        self.print_view(x, "Layer 1, X + R1")
        # Layer 2
        x = self.layer2(x)
        self.print_view(x, "Layer 2")
        # Layer 3: downsampling conv, then additive residual branch
        x = self.layer3_x(x)
        self.print_view(x, "Layer 3, X")
        r2 = self.layer3_r2(x)
        self.print_view(r2, "Layer 3, R2")
        x = x + r2
        self.print_view(x, "Layer 3, X + R2")
        # MaxPooling collapses 4x4 spatial dims to 1x1
        x = self.maxpool(x)
        self.print_view(x, "Max Pooling")
        # FC Layer
        # Reshape before FC such that it becomes 1D: (batch, 512)
        x = x.view(x.shape[0], -1)
        self.print_view(x, "Reshape before FC")
        x = self.fc(x)
        self.print_view(x, "After FC")
        # Softmax (log form; see NOTE in __init__ about CrossEntropyLoss)
        return F.log_softmax(x, dim=-1)

    # Alert: Remove this function later as Tuner is now being used to automatically find the best LR
    def find_optimal_lr(self, train_loader):
        """Run an LR range test and return a suggested max learning rate.

        Falls back to PREFERRED_START_LR when the finder yields no suggestion.
        """
        # https://github.com/davidtvs/pytorch-lr-finder
        # https://github.com/davidtvs/pytorch-lr-finder#notes
        # https://github.com/davidtvs/pytorch-lr-finder/blob/master/torch_lr_finder/lr_finder.py
        # Temporary optimizer with the default LR; discarded after the sweep.
        tmp_optimizer = optim.Adam(self.parameters(), lr=PREFERRED_START_LR, weight_decay=PREFERRED_WEIGHT_DECAY)
        # Create LR finder object
        lr_finder = LRFinder(self, optimizer=tmp_optimizer, criterion=self.loss_function)
        lr_finder.range_test(train_loader=train_loader, end_lr=10, num_iter=100)
        # https://github.com/davidtvs/pytorch-lr-finder/issues/88
        _, suggested_lr = lr_finder.plot(suggest_lr=True)
        # Restore the model/optimizer state the range test perturbed.
        lr_finder.reset()
        # plot.figure.savefig("LRFinder - Suggested Max LR.png")
        # NOTE(review): this prints before the None fallback below, so the
        # log line may show "None" even though a valid LR is returned.
        print(f"Suggested Max LR: {suggested_lr}")
        if suggested_lr is None:
            suggested_lr = PREFERRED_START_LR
        return suggested_lr

    # optimiser function
    def configure_optimizers(self):
        """Configure Adam plus a per-step OneCycleLR schedule."""
        optimizer = optim.Adam(self.parameters(), lr=self.learning_rate, weight_decay=PREFERRED_WEIGHT_DECAY)
        # Percent start for OneCycleLR: warm up for ~5 epochs' worth of steps.
        # Handles the case where max_epochs is less than 5 (ratio >= 1 is
        # invalid for pct_start, so fall back to the OneCycleLR default 0.3).
        percent_start = 5 / int(self.trainer.max_epochs)
        if percent_start >= 1:
            percent_start = 0.3
        # https://lightning.ai/docs/pytorch/stable/common/optimization.html#total-stepping-batches
        scheduler_dict = {
            "scheduler": OneCycleLR(
                optimizer=optimizer,
                max_lr=self.learning_rate,
                total_steps=int(self.trainer.estimated_stepping_batches),
                pct_start=percent_start,
                div_factor=100,
                three_phase=False,
                anneal_strategy="linear",
                final_div_factor=100,
                # NOTE(review): `verbose` is deprecated in recent torch
                # releases — confirm against the pinned torch version.
                verbose=False,
            ),
            # Step the scheduler every batch, not every epoch.
            "interval": "step",
        }
        return {"optimizer": optimizer, "lr_scheduler": scheduler_dict}

    # Define loss function
    def compute_loss(self, prediction, target):
        """Compute the loss for a batch of predictions against targets."""
        # Calculate loss
        loss = self.loss_function(prediction, target)
        return loss

    # Define accuracy function
    def compute_accuracy(self, prediction, target):
        """Compute batch accuracy as a percentage (0-100)."""
        # Calculate accuracy
        acc = self.accuracy_function(prediction, target)
        return acc * 100

    # Function to compute loss and accuracy for both training and validation
    def compute_metrics(self, batch):
        """Run the model on ``batch`` and return (loss, accuracy%)."""
        # Get data and target from batch
        data, target = batch
        # Generate predictions using model
        pred = self(data)
        # Calculate loss for the batch
        loss = self.compute_loss(prediction=pred, target=target)
        # Calculate accuracy for the batch
        acc = self.compute_accuracy(prediction=pred, target=target)
        return loss, acc

    # Get misclassified images based on how many images to return
    def store_misclassified_images(self):
        """Collect every misclassified test sample into misclassified_image_data."""
        # Reset any previously collected samples.
        self.misclassified_image_data = {"images": [], "ground_truths": [], "predicted_vals": []}
        # Initialize the model to evaluation mode
        # NOTE(review): train mode is not restored afterwards; acceptable
        # here because this runs from on_test_end, after training.
        self.eval()
        # Disable gradient calculation while testing
        with torch.no_grad():
            # NOTE(review): assumes trainer.test_dataloaders yields batches
            # directly (Lightning 2.x single-dataloader behavior) — verify
            # for multi-dataloader setups.
            for batch in self.trainer.test_dataloaders:
                # Move data and labels to device
                data, target = batch
                data, target = data.to(self.device), target.to(self.device)
                # Predict using model
                pred = self(data)
                # Get the index of the max log-probability
                output = pred.argmax(dim=1)
                # Save the incorrect predictions (boolean mask of mismatches)
                incorrect_indices = ~output.eq(target)
                # Store images incorrectly predicted, generated predictions and the actual value
                self.misclassified_image_data["images"].extend(data[incorrect_indices])
                self.misclassified_image_data["ground_truths"].extend(target[incorrect_indices])
                self.misclassified_image_data["predicted_vals"].extend(output[incorrect_indices])

    # training function
    def training_step(self, batch, batch_idx):
        """Training step: log batch+epoch loss/accuracy and return the loss."""
        # Compute loss and accuracy
        loss, acc = self.compute_metrics(batch)
        self.log("train_loss", loss, prog_bar=True, on_epoch=True, logger=True)
        self.log("train_acc", acc, prog_bar=True, on_epoch=True, logger=True)
        # Return training loss
        return loss

    # validation function
    def validation_step(self, batch, batch_idx):
        """Validation step: log loss/accuracy under the val_* keys."""
        # Compute loss and accuracy
        loss, acc = self.compute_metrics(batch)
        self.log("val_loss", loss, prog_bar=True, on_epoch=True, logger=True)
        self.log("val_acc", acc, prog_bar=True, on_epoch=True, logger=True)
        # Return validation loss
        return loss

    # test function will just use validation step
    def test_step(self, batch, batch_idx):
        """Test step: same metrics as validation, logged under test_* keys."""
        # Compute loss and accuracy
        loss, acc = self.compute_metrics(batch)
        self.log("test_loss", loss, prog_bar=False, on_epoch=True, logger=True)
        self.log("test_acc", acc, prog_bar=False, on_epoch=True, logger=True)
        # Return validation loss
        return loss

    # At the end of train epoch append the training loss and accuracy to an instance variable called results
    def on_train_epoch_end(self):
        """On train epoch end: record epoch-level train metrics in self.results."""
        # Append training loss and accuracy to results (callback_metrics holds
        # the epoch aggregate because training_step logs with on_epoch=True).
        self.results["train_loss"].append(self.trainer.callback_metrics["train_loss"].detach().item())
        self.results["train_acc"].append(self.trainer.callback_metrics["train_acc"].detach().item())

    # At the end of validation epoch append the validation loss and accuracy to an instance variable called results
    def on_validation_epoch_end(self):
        """On validation epoch end: record epoch-level val metrics in self.results."""
        # Append validation loss and accuracy to results
        # NOTE(review): val metrics are intentionally(?) stored under the
        # "test_*" keys — confirm consumers of self.results expect this.
        self.results["test_loss"].append(self.trainer.callback_metrics["val_loss"].detach().item())
        self.results["test_acc"].append(self.trainer.callback_metrics["val_acc"].detach().item())

    # # At the end of test epoch append the test loss and accuracy to an instance variable called results
    # def on_test_epoch_end(self):
    #     """On test epoch end"""
    #     # Append test loss and accuracy to results
    #     self.results["test_loss"].append(self.trainer.callback_metrics["test_loss"].detach().item())
    #     self.results["test_acc"].append(self.trainer.callback_metrics["test_acc"].detach().item())

    # At the end of test save misclassified images, the predictions and ground truth in an instance variable called misclassified_image_data
    def on_test_end(self):
        """On test end: collect misclassified test images for later inspection."""
        print("Test ended! Saving misclassified images")
        # Get misclassified images
        self.store_misclassified_images()