'''
Collection of boilerplate and utility functions for PyTorch's processing pipeline.
Many are adapted and expanded from https://github.com/mrdbourke/pytorch-deep-learning/
'''
from datetime import datetime
import os
from pathlib import Path
import shutil
from typing import Callable
import zipfile

import matplotlib.pyplot as plt
import numpy as np
import requests
import torch
from torch import nn
from torch.utils.data import DataLoader
from torch.utils.tensorboard import SummaryWriter
from torchinfo import summary
import torchmetrics
from tqdm.auto import tqdm


#### Misc Util Functions ####

def set_seeds(seed: int = 42):
    '''Set both torch and torch.cuda seeds.

    :param seed: The seed passed to torch.manual_seed() and torch.cuda.manual_seed()
    '''
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)


#### Training & Testing Functions ####

def train_combinations(combinations: dict[str, tuple[str, str, str, int, str]],
                       model_factories: dict[str, Callable[[], nn.Module]],
                       train_dataloaders: dict[str, DataLoader],
                       optimiser_factories: dict[str, Callable[[nn.Module], torch.optim.Optimizer]],
                       test_dataloader: DataLoader,
                       loss_fn: nn.Module,
                       metric_name_and_fn: tuple[str, Callable[[torch.Tensor, torch.Tensor], torch.Tensor]],
                       reset_seed: int = 42,
                       device: torch.device = 'cuda' if torch.cuda.is_available() else 'cpu',
                       show_progress_bar = True):
    '''Run a series of modelling tasks by defining combinations of models, dataloaders, optimisers and epochs,
    as well as an optional previously-fit combination to start from (e.g. for a combination which is the same
    as a previous one but with more epochs or different training data).

    Models' state_dicts are saved in ./models, and evaluation metrics in ./runs (and also printed throughout).

    :param combinations: The experiment program, a dictionary of VERY SHORT keys (used as model naming prefixes)
        mapped to tuples containing dictionary keys to the model ingredients:
        (model key, train dataloader key, optimiser key, epochs, OPTIONAL KEY OF PREVIOUS COMBINATION TO START FROM)
    :param model_factories: Named model-producing functions
    :param train_dataloaders: Named dataloaders for training data
    :param optimiser_factories: Named optimiser-generating functions
        (e.g. dict(Adam001 = lambda m: torch.optim.Adam(m.parameters(), lr = 0.001)))
    :param test_dataloader: The dataloader for testing data
    :param loss_fn: A loss function taking in only prediction and target tensors
    :param metric_name_and_fn: A tuple of metric name and function taking in only prediction and target tensors
    :param reset_seed: A seed to re-impose (on both torch and torch.cuda) before every combination is executed
        (or None to not do so)
    :param device: A target device to compute on (e.g. 'cuda' or 'cpu')
    :param show_progress_bar: Show a progress bar for the experiment loop, their nested epoch loops and each of
        the nested training and testing steps batch loops
    :raises AssertionError: If the combinations are malformed, reference unknown ingredients, or reference a
        base combination that is unknown, uses a different model, or does not occur earlier in the program
    '''
    if not combinations:
        return  # nothing to run; the zip(*...) unpacking below cannot handle an empty program

    # Well worth checking labelling issues before the long processing
    assert all(len(comb) == 5 for comb in combinations.values()), 'Some combinations are not 5-tuples; they should be of the form (model key, train dataloader key, optimiser key, epochs, None or previous combination key)'
    model_keys, data_keys, optimiser_keys, _, base_comb_keys = zip(*combinations.values())
    for keys, ingredients, param_name in [(model_keys, model_factories, 'model_factories'),
                                          (data_keys, train_dataloaders, 'train_dataloaders'),
                                          (optimiser_keys, optimiser_factories, 'optimiser_factories')]:
        assert not (set_diff := set(keys).difference(ingredients.keys())), f'Combination ingredient(s) {set_diff} not present in the {param_name} dictionary keys'
    assert not (set_diff := set(base_comb_keys).difference([None]).difference(combinations.keys())), f'Base model key(s) {set_diff} not present in the combination dictionary keys'

    # A base combination must use the same model and must have been fitted (i.e. occur) earlier in the program
    combs_order = {k: i for i, k in enumerate(combinations)}
    for comb_with_bc, (stated_m, *_, bc) in combinations.items():
        if bc is None:
            continue
        assert combinations[bc][0] == stated_m, f'The stated model for combination {comb_with_bc} ({stated_m}) does not match the one of its stated base combination {bc} ({combinations[bc][0]})'
        assert combs_order[bc] < combs_order[comb_with_bc], f'Combination {comb_with_bc} (#{combs_order[comb_with_bc]}) requires combination {bc} (#{combs_order[bc]}) but occurs before it in the combination order'

    saved_models = dict()  # combination key -> path of its saved state_dict
    progress_bar = tqdm(combinations.items(), desc = 'Modelling combinations', disable = not show_progress_bar)
    # Experiment numbers are 0-based, matching the numbering used in the ordering assertion above
    for experiment_number, (combination_key, combination) in enumerate(progress_bar):
        model_key, train_data_key, optimiser_key, epochs, base_comb_key = combination
        print(f'[INFO] Experiment number: {experiment_number}')
        print(f'[INFO] Model: {model_key}')
        print(f'[INFO] DataLoader: {train_data_key}')
        print(f'[INFO] Number of epochs: {epochs}')
        print(f'[INFO] Base model to build on: {base_comb_key}')

        model = model_factories[model_key]()
        if base_comb_key is not None:
            # Start from the weights saved by the (already executed) base combination
            model.load_state_dict(torch.load(saved_models[base_comb_key], map_location = device))

        if reset_seed is not None:  # explicit None check so that a seed of 0 is honoured
            set_seeds(reset_seed)

        fit(model = model,
            train_dataloader = train_dataloaders[train_data_key],
            test_dataloader = test_dataloader,
            optimiser = optimiser_factories[optimiser_key](model),
            loss_fn = loss_fn,
            metric_name_and_fn = metric_name_and_fn,
            epochs = epochs,
            device = device,
            show_progress_bar = show_progress_bar,
            model_name = f'Combination {experiment_number}: {combination_key} - {optimiser_key}',
            writer = tensorboard_writer(experiment_name = train_data_key,
                                        model_name = model_key,
                                        extra = f'{experiment_number}_{combination_key}_{optimiser_key}_{epochs}_epochs'))

        saved_models[combination_key] = save_model(
            model = model,
            target_dir = 'models',
            model_name = f'{experiment_number}_{combination_key}_{model_key}_{train_data_key}_{optimiser_key}_{epochs}_epochs.pth')
        print('-'*50 + '\n')


def fit(model: nn.Module,
        train_dataloader: DataLoader,
        test_dataloader: DataLoader,
        optimiser: torch.optim.Optimizer,
        loss_fn: nn.Module,
        metric_name_and_fn: tuple[str, Callable[[torch.Tensor, torch.Tensor], torch.Tensor]],
        epochs: int,
        writer: SummaryWriter,
        device: torch.device = 'cuda' if torch.cuda.is_available() else 'cpu',
        show_progress_bar = True,
        model_name: str = None,
        graph_input_size: tuple = (32, 3, 224, 224)) -> dict[str, list]:
    '''Trains and tests a PyTorch model.

    Passes a target PyTorch model through the training_step() and testing_step() functions for a number of
    epochs, training and testing the model in the same epoch loop.
    Calculates, prints and stores evaluation metrics throughout.

    :param model: A PyTorch model to be trained and tested
    :param train_dataloader: A DataLoader instance for the model to be trained on
    :param test_dataloader: A DataLoader instance for the model to be tested on
    :param optimiser: A PyTorch optimizer to help minimize the loss function
    :param loss_fn: A loss function taking in only prediction and target tensors and returning a tensor
        (.item() is used where appropriate)
    :param metric_name_and_fn: A tuple of metric name and function taking in only prediction and target tensors
        and returning a tensor (.item() is used where appropriate)
    :param epochs: An integer indicating how many epochs to train for
    :param writer: A SummaryWriter() instance to log model results to (set to None otherwise).
        E.g. tensorboard_writer(experiment_name = ..., model_name = ..., extra = f'{experiment_number}_{combination_key}_{optimiser_key}_{epochs}_epochs')
        The writer is closed before returning, even if training raises.
    :param device: A target device to compute on (e.g. 'cuda' or 'cpu')
    :param show_progress_bar: Show a progress bar for the global epoch loop and each of the nested training and
        testing steps batch loops
    :param model_name: A label to display in the progress bar if shown
    :param graph_input_size: Shape of the random example input used to log the model graph to the writer
        (None to skip logging the graph)
    :return: A dictionary of training and testing loss as well as training and testing metrics.
        Each metric has a value in a list for each epoch:
        {train_loss: [...], train_metric: [...], test_loss: [...], test_metric: [...]}
    '''
    metric_name, metric_fn = metric_name_and_fn
    keys = ['train_loss', 'train_metric', 'test_loss', 'test_metric']
    results = {k: [] for k in keys}

    model.to(device)

    try:
        for epoch in tqdm(range(1, epochs + 1), desc = model_name, disable = not show_progress_bar):
            train_loss, train_metric = training_step(model = model,
                                                     dataloader = train_dataloader,
                                                     loss_fn = loss_fn,
                                                     metric_fn = metric_fn,
                                                     optimiser = optimiser,
                                                     device = device,
                                                     show_progress_bar = show_progress_bar,
                                                     epoch = epoch)
            test_loss, test_metric = testing_step(model = model,
                                                  dataloader = test_dataloader,
                                                  loss_fn = loss_fn,
                                                  metric_fn = metric_fn,
                                                  device = device,
                                                  show_progress_bar = show_progress_bar,
                                                  epoch = epoch)

            print(
                f'Epoch: {epoch} | '
                f'train_loss: {train_loss:.4f} | '
                f'train_metric: {train_metric:.4f} | '
                f'test_loss: {test_loss:.4f} | '
                f'test_metric: {test_metric:.4f}'
            )

            for k, v in zip(keys, [train_loss, train_metric, test_loss, test_metric]):
                results[k].append(v)

            if writer is not None:
                writer.add_scalars(main_tag = 'Loss',
                                   tag_scalar_dict = dict(train_loss = train_loss, test_loss = test_loss),
                                   global_step = epoch)
                writer.add_scalars(main_tag = metric_name,
                                   tag_scalar_dict = dict(train_metric = train_metric, test_metric = test_metric),
                                   global_step = epoch)
                # The architecture does not change between epochs, so trace the graph only once
                if epoch == 1 and graph_input_size is not None:
                    writer.add_graph(model = model,
                                     input_to_model = torch.randn(*graph_input_size).to(device))  # pass an example input
    finally:
        # Flush and release the log files even when a step raises
        if writer is not None:
            writer.close()

    return results


def training_step(model: nn.Module,
                  dataloader: DataLoader,
                  loss_fn: nn.Module,
                  metric_fn: Callable[[torch.Tensor, torch.Tensor], torch.Tensor],
                  optimiser: torch.optim.Optimizer,
                  device: torch.device = 'cuda' if torch.cuda.is_available() else 'cpu',
                  show_progress_bar = True,
                  epoch: int = None) -> tuple[float, float]:
    '''Trains a PyTorch model for a single epoch.

    Turns a target PyTorch model to training mode and then runs through all of the required training steps
    (forward pass, loss calculation, optimizer step, metric calculation).

    :param model: A PyTorch model to be trained
    :param dataloader: A DataLoader instance for the model to be trained on
    :param loss_fn: A loss function taking in only prediction and target tensors and returning a tensor
        (.item() is used where appropriate)
    :param metric_fn: A performance metric function taking in only prediction and target tensors and returning
        a tensor (.item() is used where appropriate)
    :param optimiser: A PyTorch optimizer to help minimize the loss function
    :param device: A target device to compute on (e.g. 'cuda' or 'cpu')
    :param show_progress_bar: Show a progress bar for the training loop over batches
    :param epoch: The current epoch number, only used to label the progress bar (None to omit it)
    :return: A tuple of training loss and training metric, each averaged over the batches
    :raises ValueError: If the dataloader yields no batches
    '''
    model.train()

    # Wrapping the dataloader itself (rather than enumerate(...)) lets tqdm display the total when it is known
    progress_bar = tqdm(dataloader,
                        desc = f'{"T" if epoch is None else f"Epoch {epoch} t"}raining batches',
                        disable = not show_progress_bar)

    train_loss, train_metric = 0.0, 0.0
    n_batches = 0  # counted rather than taken from len(dataloader), which not every dataloader provides
    for n_batches, (X, y) in enumerate(progress_bar, start = 1):
        X, y = X.to(device), y.to(device)

        y_pred = model(X)
        loss = loss_fn(y_pred, y)
        train_loss += loss.item()

        optimiser.zero_grad()  # set_to_none is True by default
        loss.backward()
        optimiser.step()

        train_metric += metric_fn(y_pred, y).item()
        progress_bar.set_postfix(dict(train_loss = train_loss / n_batches,
                                      train_metric = train_metric / n_batches))

    if n_batches == 0:
        raise ValueError('The training dataloader did not yield any batches')

    return train_loss / n_batches, train_metric / n_batches  # batch mean of the metrics


def testing_step(model: nn.Module,
                 dataloader: DataLoader,
                 loss_fn: nn.Module,
                 metric_fn: Callable[[torch.Tensor, torch.Tensor], torch.Tensor],
                 device: torch.device = 'cuda' if torch.cuda.is_available() else 'cpu',
                 show_progress_bar = True,
                 epoch: int = None) -> tuple[float, float]:
    '''Tests a PyTorch model for a single epoch.

    Turns a target PyTorch model to 'eval' mode and then performs a forward pass on a testing dataset.

    :param model: A PyTorch model to be tested
    :param dataloader: A DataLoader instance for the model to be tested on
    :param loss_fn: A loss function taking in only prediction and target tensors and returning a tensor
        (.item() is used where appropriate)
    :param metric_fn: A performance metric function taking in only prediction and target tensors and returning
        a tensor (.item() is used where appropriate)
    :param device: A target device to compute on (e.g. 'cuda' or 'cpu')
    :param show_progress_bar: Show a progress bar for the testing loop over batches
    :param epoch: The current epoch number, only used to label the progress bar (None to omit it)
    :return: A tuple of testing loss and testing metric, each averaged over the batches
    :raises ValueError: If the dataloader yields no batches
    '''
    model.eval()

    # Wrapping the dataloader itself (rather than enumerate(...)) lets tqdm display the total when it is known
    progress_bar = tqdm(dataloader,
                        desc = f'{"T" if epoch is None else f"Epoch {epoch} t"}esting batches',
                        disable = not show_progress_bar)

    test_loss, test_metric = 0.0, 0.0
    n_batches = 0  # counted rather than taken from len(dataloader), which not every dataloader provides
    with torch.inference_mode():
        for n_batches, (X, y) in enumerate(progress_bar, start = 1):
            X, y = X.to(device), y.to(device)

            y_pred = model(X)
            test_loss += loss_fn(y_pred, y).item()
            test_metric += metric_fn(y_pred, y).item()

            progress_bar.set_postfix(dict(test_loss = test_loss / n_batches,
                                          test_metric = test_metric / n_batches))

    if n_batches == 0:
        raise ValueError('The testing dataloader did not yield any batches')

    return test_loss / n_batches, test_metric / n_batches  # batch mean of the metrics


#### I/O Functions ####

def save_model(model: nn.Module, target_dir: str, model_name: str) -> Path:
    '''Saves a PyTorch model's state_dict to a target directory.

    :param model: A target PyTorch model to save
    :param target_dir: A directory for saving the model to (created if missing)
    :param model_name: A filename for the saved model; should include either '.pth' or '.pt' as the file extension
    :return: The path the state_dict was saved to
    :raises AssertionError: If model_name does not end with '.pt' or '.pth'
    '''
    # Validate the file name first so that a bad name does not leave a freshly created directory behind
    assert model_name.endswith(('.pth', '.pt')), 'model_name should end with ".pt" or ".pth"'

    target_dir_path = Path(target_dir)
    target_dir_path.mkdir(parents = True, exist_ok = True)
    model_save_path = target_dir_path / model_name

    print(f'[INFO] Saving model to: {model_save_path}')
    torch.save(obj = model.state_dict(), f = model_save_path)
    return model_save_path


def tensorboard_writer(experiment_name: str, model_name: str, extra: str = None) -> SummaryWriter:
    '''Creates a torch.utils.tensorboard.writer.SummaryWriter() instance saving to a directory constructed
    from the inputs; equivalent to SummaryWriter(log_dir = 'runs/YYYY-MM-DD/experiment_name/model_name/extra')

    :param experiment_name: Name of experiment
    :param model_name: Name of model
    :param extra: Anything extra to add to the directory; defaults to None
    :return: Instance of a writer saving to log_dir
    '''
    timestamp = datetime.now().strftime('%Y-%m-%d')
    log_dir = os.path.join('runs', timestamp, experiment_name, model_name)
    if extra:
        log_dir = os.path.join(log_dir, extra)

    print(f'[INFO] Created SummaryWriter, saving to: {log_dir}...')
    return SummaryWriter(log_dir = log_dir)


def download_unzip(source: str, destination: str, remove_source: bool = True) -> Path:
    '''Downloads a zipped dataset from source and unzips it at destination.

    Nothing is downloaded if the destination directory (under ./data) already exists.

    :param source: A link to a zipped file containing data
    :param destination: A target directory (relative to ./data) to unzip data to
    :param remove_source: Whether to remove the downloaded archive after extracting it
    :return: pathlib.Path to downloaded data
    :raises requests.HTTPError: If the server answers with an error status
    '''
    data_path = Path('data/')
    image_path = data_path / destination

    if image_path.is_dir():
        print(f'[INFO] {image_path} directory exists, skipping download.')
        return image_path

    print(f'[INFO] Did not find {image_path} directory, creating one...')
    data_path.mkdir(parents = True, exist_ok = True)

    # Download before creating the destination: a failed download must not leave a directory
    # behind, otherwise the next call would wrongly skip the download
    target_file = Path(source).name
    archive_path = data_path / target_file
    print(f'[INFO] Downloading {target_file} from {source}...')
    response = requests.get(source, timeout = 60)
    response.raise_for_status()  # do not save an HTTP error page as if it were the archive
    archive_path.write_bytes(response.content)

    try:
        image_path.mkdir(parents = True, exist_ok = True)
        with zipfile.ZipFile(archive_path, 'r') as zip_ref:
            print(f'[INFO] Unzipping {target_file} data...')
            zip_ref.extractall(image_path)
    except Exception:
        # Remove the partially extracted destination (created above) so that a retry downloads again
        shutil.rmtree(image_path, ignore_errors = True)
        raise

    if remove_source:
        os.remove(archive_path)

    return image_path


#### Info Functions ####

def summ(model: nn.Module, input_size: tuple = (32, 3, 224, 224)):
    '''Shorthand for typical summary specification

    :param model: The model to summarise
    :param input_size: The input shape handed to torchinfo's summary()
    :return: Whatever torchinfo's summary() returns
    '''
    return summary(model = model,
                   input_size = input_size,
                   col_names = ['input_size', 'output_size', 'num_params', 'trainable'],
                   col_width = 20,
                   row_settings = ['var_names'])


#### Plotting Functions ####

def plot_predictions(train_data, train_labels, test_data, test_labels, predictions = None):
    '''Plots (matplotlib) linear training data and test data and compares predictions.

    Training data is in blue, test data in green, and predictions in red (if present).

    :param train_data: x values of the training points
    :param train_labels: y values of the training points
    :param test_data: x values of the testing points
    :param test_labels: y values of the testing points
    :param predictions: Optional predicted y values, plotted against test_data
    '''
    plt.figure(figsize = (10, 7))
    plt.scatter(train_data, train_labels, c = 'b', s = 4, label = 'Training data')
    plt.scatter(test_data, test_labels, c = 'g', s = 4, label = 'Testing data')
    if predictions is not None:
        plt.scatter(test_data, predictions, c = 'r', s = 4, label = 'Predictions')
    plt.legend(prop = {'size': 14})


def plot_loss_curves(train_loss: list, train_metric: list, test_loss: list, test_metric: list):
    '''Plots (matplotlib) training (and testing) curves from lists of values.

    :param train_loss: Training loss per epoch
    :param train_metric: Training metric per epoch
    :param test_loss: Testing loss per epoch
    :param test_metric: Testing metric per epoch
    '''
    # Epochs are numbered from 1, as in the values printed and logged by fit()
    epochs = range(1, len(train_loss) + 1)

    plt.figure(figsize = (15, 7))

    # Loss
    plt.subplot(1, 2, 1)
    plt.plot(epochs, train_loss, label = 'train_loss')
    plt.plot(epochs, test_loss, label = 'test_loss')
    plt.title('Loss')
    plt.xlabel('Epochs')
    plt.legend()

    # Performance metric
    plt.subplot(1, 2, 2)
    plt.plot(epochs, train_metric, label = 'train_metric')
    plt.plot(epochs, test_metric, label = 'test_metric')
    plt.title('Performance Metric')
    plt.xlabel('Epochs')
    plt.legend()


def plot_decision_boundary(model: nn.Module, X: torch.Tensor, y: torch.Tensor):
    '''Plots (matplotlib) decision boundaries of model predicting on X in comparison to y.

    Source - https://madewithml.com/courses/foundations/neural-networks/ (with modifications)

    Note that the model is moved to the CPU and put in eval mode as a side effect.

    :param model: The model whose predictions are drawn; it is called on a grid of 2-feature points
    :param X: Feature tensor; columns 0 and 1 are used as the plot axes
    :param y: Labels for X, used to colour the scatter points and to choose between the
        multi-class (more than 2 unique labels) and binary treatment of the model output
    '''
    # Put everything to CPU (works better with NumPy + Matplotlib)
    model.to('cpu')
    X, y = X.to('cpu'), y.to('cpu')

    # Setup prediction boundaries and grid (bounds handed to NumPy as plain floats)
    x_min, x_max = (X[:, 0].min() - 0.1).item(), (X[:, 0].max() + 0.1).item()
    y_min, y_max = (X[:, 1].min() - 0.1).item(), (X[:, 1].max() + 0.1).item()
    xx, yy = np.meshgrid(np.linspace(x_min, x_max, 101), np.linspace(y_min, y_max, 101))

    # Make features
    X_to_pred_on = torch.from_numpy(np.column_stack((xx.ravel(), yy.ravel()))).float()

    # Make predictions
    model.eval()
    with torch.inference_mode():
        y_logits = model(X_to_pred_on)

    # Test for multi-class or binary and adjust logits to prediction labels
    if len(torch.unique(y)) > 2:
        y_pred = torch.softmax(y_logits, dim = 1).argmax(dim = 1)
    else:
        y_pred = torch.round(torch.sigmoid(y_logits))

    # Reshape preds and plot
    y_pred = y_pred.reshape(xx.shape).detach().numpy()
    plt.contourf(xx, yy, y_pred, cmap = plt.cm.RdYlBu, alpha = 0.7)
    plt.scatter(X[:, 0], X[:, 1], c = y, s = 40, cmap = plt.cm.RdYlBu)
    plt.xlim(xx.min(), xx.max())
    plt.ylim(yy.min(), yy.max())