# Michael Peres 30/03/2024.
# Model for binary classification.

# Import statements for the model.
import os
from time import time
from typing import Tuple

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
from torch import nn
from torch.optim.lr_scheduler import ReduceLROnPlateau
from torchvision import transforms
from torchvision.transforms import ToTensor
from torchvision.transforms import v2
from tqdm import tqdm

# Original note: "Going to be using keras".
# Channels-last image shape; not referenced by the code visible in this file.
input_shape = (224, 224, 3)

# Automatic device selection, currently disabled:
# device = (
#     "cuda"
#     if torch.cuda.is_available()
#     else "mps"
#     if torch.backends.mps.is_available()
#     else "cpu"
# )
# Having trouble with MPS on Mac, so use the CPU until the main PC is available.
device = "cpu"
print(f"Using {device} device for training/inference.")
if device == "cuda":
    print(f"GPU being used: {torch.cuda.get_device_name(0)}")


# We have a custom dataset that we will be using in this example.
class MakiAlexNet(nn.Module):
    """AlexNet-style CNN producing ``num_classes`` logits (default 2).

    The classifier head starts with ``nn.Linear(6400, 4096)``.  With the
    convolution/pooling settings below, a 3x224x224 input leaves the last
    max-pool as 256x5x5 = 6400 features, so other spatial input sizes will not
    match the head unless it is adapted.

    The raw (pre-activation) output of ``conv5`` is captured on every forward
    pass through a forward hook and stored in ``self.layer_outputs``.
    """

    def __init__(self, num_classes=2):
        """Build the layers and register the ``conv5`` forward hook.

        Args:
            num_classes: Number of output logits of the final linear layer.
        """
        super().__init__()
        self.num_classes = num_classes

        # Feature extractor.  (nn.LazyConv2d could infer the input channels
        # automatically; explicit channel counts are used here instead.)
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=96, kernel_size=11, stride=4, padding=1)
        self.conv2 = nn.Conv2d(in_channels=96, out_channels=256, kernel_size=5, padding=2)
        self.conv3 = nn.Conv2d(in_channels=256, out_channels=384, kernel_size=3, padding=1)  # 256 -> 384
        self.conv4 = nn.Conv2d(in_channels=384, out_channels=384, kernel_size=3, padding=1)  # 384 -> 384
        self.conv5 = nn.Conv2d(in_channels=384, out_channels=256, kernel_size=3, padding=1)  # 384 -> 256

        # Stateless layers, shared by every stage of the forward pass.
        self.activation = nn.ReLU()
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2)

        # Average-pooling alternative to Flatten.  It is NOT used by forward()
        # at the moment (see the commented-out line there); adjust the kernel
        # size if the feature-map size changes.
        self.gap = nn.AvgPool2d(5)

        # Classifier head.  nn.LazyLinear could be used instead to infer the
        # flattened feature count; here it is fixed at 6400 (256 * 5 * 5).
        self.fcc = nn.Sequential(
            nn.Flatten(),
            nn.Linear(6400, 4096),
            # nn.Linear(in_features=256, out_features=4096),  # variant for the GAP path
            nn.ReLU(),
            nn.Dropout(p=0.5),
            nn.Linear(4096, 4096),
            nn.ReLU(),
            nn.Dropout(p=0.5),
            nn.Linear(4096, self.num_classes),
        )

        # Dictionary of captured layer outputs, filled by the forward hook.
        self.layer_outputs = {}
        # Register hooks for the desired layers (currently only conv5).
        self.conv5.register_forward_hook(self._save_layer_output)

    def _save_layer_output(self, module, input, output):
        """Forward hook: store ``output`` under the module's class name.

        The key is ``module.__class__.__name__`` (``"Conv2d"`` for ``conv5``),
        so the entry is overwritten on every forward pass, and hooks registered
        on several layers of the same class would share a single key.
        """
        self.layer_outputs[module.__class__.__name__] = output

    def forward(self, x):
        """Defined forward pass of AlexNet for learning left or right prediction.

        Args:
            x: Image batch with 3 channels; 224x224 spatial size matches the
                fixed 6400-feature classifier input.

        Returns:
            Tensor of ``num_classes`` logits per sample.
        """
        x = self.maxpool(self.activation(self.conv1(x)))  # wider, then down-sample
        x = self.maxpool(self.activation(self.conv2(x)))  # wider, then down-sample
        x = self.activation(self.conv3(x))  # wider
        x = self.activation(self.conv4(x))
        x = self.maxpool(self.activation(self.conv5(x)))  # down-sample
        # x = self.gap(x).squeeze(-1).squeeze(-1)
        return self.fcc(x)  # flatten, then linear layers down to the class logits


def init_weights(m):
    """Xavier-uniform initialise Conv2d/Linear weights and set biases to 0.01.

    Intended for ``model.apply(init_weights)``.  Modules of any other type are
    left untouched, and layers created with ``bias=False`` are handled.
    """
    if isinstance(m, (nn.Conv2d, nn.Linear)):
        nn.init.xavier_uniform_(m.weight)
        # Linear layers can also be bias-free, so guard both layer types.
        if m.bias is not None:
            m.bias.data.fill_(0.01)


def _read_best_accuracy(path: str) -> float:
    """Return the best accuracy stored in ``path``, or 0.0 if unavailable."""
    try:
        with open(path, "r") as file:
            return float(file.read())
    except FileNotFoundError:
        return 0.0
    except ValueError:
        # Empty or corrupt file: start from scratch instead of crashing.
        print(f"Warning: could not parse {path!r}; starting from best accuracy 0.0.")
        return 0.0


def _train_one_epoch(model, loader, criterion, optimizer) -> float:
    """Run one optimisation pass over ``loader`` and return the mean batch loss."""
    num_batches = len(loader)
    if num_batches == 0:
        raise ValueError("train_loader yielded no batches; cannot train.")

    model.train()  # Set model to training mode.
    running_loss = 0.0
    for i, (inputs, labels) in enumerate(loader):
        if i % 10 == 0:
            print(f"Internal Loop of batches: {i}")
        inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad()
        loss = criterion(model(inputs), labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    return running_loss / num_batches


def _evaluate(model, loader, criterion) -> Tuple[float, float]:
    """Return ``(mean batch loss, accuracy in percent)`` of ``model`` on ``loader``."""
    num_batches = len(loader)
    if num_batches == 0:
        raise ValueError("test_loader yielded no batches; cannot validate.")

    model.eval()  # Set model to evaluation mode.
    running_loss = 0.0
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            running_loss += criterion(outputs, labels).item()
            predicted = outputs.argmax(dim=1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    if total == 0:
        raise ValueError("test_loader yielded no samples; cannot compute accuracy.")
    return running_loss / num_batches, 100 * correct / total


def main():
    """Train MakiAlexNet, validating each epoch and keeping the best weights.

    Side effects: writes the best weights to ``alexnet_cognitive_gap.pth`` and
    the best validation accuracy to ``best_model.txt``.  An accuracy already
    stored in ``best_model.txt`` is used as the starting threshold.
    """
    # Initiate the custom dataloaders and datasets here (project-local module).
    from dataset_creation import test_loader, train_loader

    num_epochs = 35
    model = MakiAlexNet()
    # Optional steps, currently disabled:
    # model.apply(init_weights)
    # torch.load("alexnet_cognitive.pth", map_location=device)
    model.to(device)
    print(model)
    print("Model has been tested and is working correctly.")

    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=0.00001 * 5, weight_decay=0.0001, momentum=0.9)
    # The learning rate is reduced 10x when the validation loss plateaus.
    scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=3)

    best_accuracy = _read_best_accuracy("best_model.txt")

    for epoch in tqdm(range(num_epochs), desc="Training Epoch Cycle"):
        train_loss = _train_one_epoch(model, train_loader, criterion, optimizer)
        print(f'Epoch [{epoch + 1}] training loss: {train_loss:.3f}')

        # Validation phase (test_loader is used as the validation loader).
        val_loss, val_accuracy = _evaluate(model, test_loader, criterion)
        print(f'Epoch [{epoch + 1}] validation loss: {val_loss:.3f}, accuracy: {val_accuracy:.2f}%')

        if val_accuracy > best_accuracy:
            best_accuracy = val_accuracy
            torch.save(model.state_dict(), "alexnet_cognitive_gap.pth")
            with open("best_model.txt", "w") as file:
                file.write(f"{best_accuracy}")

        # Update the LR scheduler with the validation loss.
        scheduler.step(val_loss)
        # print(f'LR: {scheduler.get_last_lr()}')


if __name__ == "__main__":
    main()