"""
Implementation of YOLOv3 architecture
"""

import os
import warnings

import pandas as pd
import pytorch_lightning as pl
import seaborn as sn
import torch
import torch.cuda.amp as amp
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
from IPython.core.display import display
# from pl_bolts.datamodules import CIFAR10DataModule
# from pl_bolts.transforms.dataset_normalizations import cifar10_normalization
from pytorch_lightning import LightningModule, Trainer, seed_everything
from pytorch_lightning.callbacks import LearningRateMonitor, ModelCheckpoint
from pytorch_lightning.callbacks.progress import TQDMProgressBar
from pytorch_lightning.loggers import CSVLogger
from torch.optim.lr_scheduler import OneCycleLR
from torch.utils.data import DataLoader
from torch_lr_finder import LRFinder
from torchmetrics.functional import accuracy
from tqdm import tqdm

import config
from dataset import YOLODataset
from loss import YoloLoss
# from model import YOLOv3
from utils import (
    mean_average_precision,
    cells_to_bboxes,
    get_evaluation_bboxes,
    save_checkpoint,
    load_checkpoint,
    check_class_accuracy,
    get_loaders,
    plot_couple_examples,
)

"""
Information about the architecture config:
A tuple is structured as (filters, kernel_size, stride).
Every conv is a "same" convolution.
A list is structured as "B" (a residual block) followed by the number of repeats.
"S" is a scale prediction block, whose output is used to compute the YOLO loss.
"U" upsamples the feature map and concatenates it with a previous layer.
"""
config_1 = [
    (32, 3, 1),
    (64, 3, 2),
    ["B", 1],
    (128, 3, 2),
    ["B", 2],
    (256, 3, 2),
    ["B", 8],
    (512, 3, 2),
    ["B", 8],
    (1024, 3, 2),
    ["B", 4],  # To this point is Darknet-53
    (512, 1, 1),
    (1024, 3, 1),
    "S",
    (256, 1, 1),
    "U",
    (256, 1, 1),
    (512, 3, 1),
    "S",
    (128, 1, 1),
    "U",
    (128, 1, 1),
    (256, 3, 1),
    "S",
]


class CNNBlock(nn.Module):
    """Conv2d, optionally followed by BatchNorm2d and LeakyReLU(0.1).

    Args:
        in_channels: Number of input channels of the convolution.
        out_channels: Number of output channels of the convolution.
        bn_act: When True, apply batch norm and leaky ReLU after the conv;
            when False, return the raw convolution output.
        **kwargs: Forwarded to ``nn.Conv2d`` (kernel_size, stride, padding, ...).
    """

    def __init__(self, in_channels, out_channels, bn_act=True, **kwargs):
        super().__init__()
        # The conv bias is only enabled when batch norm is skipped.
        self.conv = nn.Conv2d(in_channels, out_channels, bias=not bn_act, **kwargs)
        # NOTE: bn/leaky are created even when bn_act is False. They are left
        # in place on purpose so that state_dict keys do not change.
        self.bn = nn.BatchNorm2d(out_channels)
        self.leaky = nn.LeakyReLU(0.1)
        self.use_bn_act = bn_act

    def forward(self, x):
        """Apply the convolution and, if enabled, batch norm + leaky ReLU."""
        if self.use_bn_act:
            return self.leaky(self.bn(self.conv(x)))
        return self.conv(x)


class ResidualBlock(nn.Module):
    """Stack of (1x1 conv -> 3x3 conv) pairs with an optional skip connection.

    Args:
        channels: Input and output channel count; the 1x1 conv halves it and
            the 3x3 conv restores it.
        use_residual: When True, each pair's output is added to its input.
        num_repeats: Number of conv pairs in the stack.
    """

    def __init__(self, channels, use_residual=True, num_repeats=1):
        super().__init__()
        self.layers = nn.ModuleList()
        for _ in range(num_repeats):
            self.layers.append(
                nn.Sequential(
                    CNNBlock(channels, channels // 2, kernel_size=1),
                    CNNBlock(channels // 2, channels, kernel_size=3, padding=1),
                )
            )

        self.use_residual = use_residual
        # Read by YOLOv3.forward to decide which outputs become route connections.
        self.num_repeats = num_repeats

    def forward(self, x):
        """Run every conv pair in order, adding the skip connection if enabled."""
        for layer in self.layers:
            x = x + layer(x) if self.use_residual else layer(x)
        return x


class ScalePrediction(nn.Module):
    """Prediction head for one scale: 3 anchors x (num_classes + 5) values per cell.

    Args:
        in_channels: Channel count of the incoming feature map.
        num_classes: Number of object classes.
    """

    def __init__(self, in_channels, num_classes):
        super().__init__()
        self.pred = nn.Sequential(
            CNNBlock(in_channels, 2 * in_channels, kernel_size=3, padding=1),
            # Final 1x1 conv emits raw predictions, so no batch norm / activation.
            CNNBlock(2 * in_channels, (num_classes + 5) * 3, bn_act=False, kernel_size=1),
        )
        self.num_classes = num_classes

    def forward(self, x):
        """Return predictions shaped (batch, 3, height, width, num_classes + 5)."""
        batch, _, height, width = x.shape
        return (
            self.pred(x)
            .reshape(batch, 3, self.num_classes + 5, height, width)
            .permute(0, 1, 3, 4, 2)
        )


class YOLOv3(LightningModule):
    """YOLOv3 network assembled from the module-level ``config_1`` description.

    Args:
        in_channels: Number of channels of the input image.
        num_classes: Number of object classes predicted at every scale.
    """

    def __init__(self, in_channels=3, num_classes=80):
        super().__init__()
        self.num_classes = num_classes
        self.in_channels = in_channels
        self.layers = self._create_conv_layers()

    def forward(self, x):
        """Return a list with one prediction tensor per ScalePrediction layer."""
        outputs = []  # for each scale
        route_connections = []
        for layer in self.layers:
            if isinstance(layer, ScalePrediction):
                # Prediction heads branch off; x itself continues unchanged.
                outputs.append(layer(x))
                continue

            x = layer(x)

            if isinstance(layer, ResidualBlock) and layer.num_repeats == 8:
                # Outputs of the 8-repeat residual blocks are saved for later concatenation.
                route_connections.append(x)
            elif isinstance(layer, nn.Upsample):
                # Concatenate with the most recently saved route (last in, first out).
                x = torch.cat([x, route_connections.pop()], dim=1)

        return outputs

    def _create_conv_layers(self):
        """Translate ``config_1`` into an ``nn.ModuleList`` of layers."""
        layers = nn.ModuleList()
        in_channels = self.in_channels

        for module in config_1:
            if isinstance(module, tuple):
                out_channels, kernel_size, stride = module
                layers.append(
                    CNNBlock(
                        in_channels,
                        out_channels,
                        kernel_size=kernel_size,
                        stride=stride,
                        padding=1 if kernel_size == 3 else 0,
                    )
                )
                in_channels = out_channels

            elif isinstance(module, list):
                num_repeats = module[1]
                layers.append(ResidualBlock(in_channels, num_repeats=num_repeats))

            elif isinstance(module, str):
                if module == "S":
                    layers.extend(
                        [
                            ResidualBlock(in_channels, use_residual=False, num_repeats=1),
                            CNNBlock(in_channels, in_channels // 2, kernel_size=1),
                            ScalePrediction(in_channels // 2, num_classes=self.num_classes),
                        ]
                    )
                    in_channels = in_channels // 2

                elif module == "U":
                    layers.append(nn.Upsample(scale_factor=2))
                    # forward() concatenates the upsampled map (C channels) with a
                    # saved route that has 2C channels, giving 3C in total.
                    in_channels = in_channels * 3

        return layers


class YoloVersion3(LightningModule):
    """Lightning training wrapper around :class:`YOLOv3`.

    All settings (learning rate, class count, dataset location, anchors, grid
    sizes, device, thresholds) are read from the project ``config`` module.
    """

    def __init__(self):
        super().__init__()
        self.save_hyperparameters()

        # Set our init args as class attributes
        self.learning_rate = config.LEARNING_RATE
        # self.config = config
        self.num_classes = config.NUM_CLASSES
        self.train_csv = config.DATASET + "/train.csv"
        self.test_csv = config.DATASET + "/test.csv"
        self.loss_fn = YoloLoss()
        # NOTE(review): not referenced anywhere in this file; kept because code
        # outside this file may read the attribute.
        self.scaler = amp.GradScaler()
        # self.train_transform_function = config.train_transforms
        # self.in_channels = 3
        self.model = YOLOv3(num_classes=config.NUM_CLASSES).to(config.DEVICE)

        scaled_anchors = (
            torch.tensor(config.ANCHORS)
            * torch.tensor(config.S).unsqueeze(1).unsqueeze(1).repeat(1, 3, 2)
        ).to(config.DEVICE)
        # Registered as a buffer so it moves together with the module across
        # devices; non-persistent so checkpoint/state_dict keys stay unchanged.
        self.register_buffer("scaled_anchors", scaled_anchors, persistent=False)

        self.training_step_outputs = []

    def forward(self, x):
        """Delegate to the wrapped YOLOv3 network."""
        return self.model(x)

    def training_step(self, batch, batch_idx):
        """Compute the summed YOLO loss over the three prediction scales.

        Args:
            batch: ``(x, y)`` where ``y`` is indexable and holds one target per
                scale at positions 0, 1 and 2.
            batch_idx: Index of the batch (unused).

        Returns:
            The total loss for this batch.
        """
        x, y = batch
        out = self(x)
        loss = (
            self.loss_fn(out[0], y[0], self.scaled_anchors[0])
            + self.loss_fn(out[1], y[1], self.scaled_anchors[1])
            + self.loss_fn(out[2], y[2], self.scaled_anchors[2])
        )

        # Logging the training loss for visualization
        self.log("train_loss", loss, on_epoch=True, prog_bar=True, logger=True)
        # Store a detached copy: keeping the live loss would hold every step's
        # autograd graph in memory until the end of the epoch.
        self.training_step_outputs.append(loss.detach())
        return loss

    def on_train_epoch_end(self):
        """Report epoch statistics, periodically plot examples, and compute mAP.

        Accuracy is measured on the train loader every epoch, example plots are
        produced every 6th epoch, and mAP on the test loader is computed once
        the last epoch (``trainer.max_epochs``) has finished.
        """
        print(f"\nCurrently epoch {self.current_epoch}")
        train_epoch_average = torch.stack(self.training_step_outputs).mean()
        self.training_step_outputs.clear()
        print(f"Train loss {train_epoch_average}")
        print("On Train Eval loader:")
        print("On Train loader:")
        class_accuracy, no_obj_accuracy, obj_accuracy = check_class_accuracy(
            self.model, self.train_loader, threshold=config.CONF_THRESHOLD
        )
        self.log("class_accuracy", class_accuracy, on_epoch=True, prog_bar=True, logger=True)
        self.log("no_obj_accuracy", no_obj_accuracy, on_epoch=True, prog_bar=True, logger=True)
        self.log("obj_accuracy", obj_accuracy, on_epoch=True, prog_bar=True, logger=True)

        epoch_number = self.current_epoch + 1

        # Plot a couple of examples on every 6th epoch.
        if self.current_epoch > 0 and epoch_number % 6 == 0:
            # NOTE(review): 0.6 / 0.5 are passed positionally; presumably the
            # confidence and IoU thresholds - confirm against utils.
            plot_couple_examples(self.model, self.test_loader, 0.6, 0.5, self.scaled_anchors)

        # mAP calculation after the last epoch.
        if self.current_epoch > 0 and epoch_number == self.trainer.max_epochs:
            check_class_accuracy(self.model, self.test_loader, threshold=config.CONF_THRESHOLD)
            pred_boxes, true_boxes = get_evaluation_bboxes(
                self.test_loader,
                self.model,
                iou_threshold=config.NMS_IOU_THRESH,
                anchors=config.ANCHORS,
                threshold=config.CONF_THRESHOLD,
            )
            mapval = mean_average_precision(
                pred_boxes,
                true_boxes,
                iou_threshold=config.MAP_IOU_THRESH,
                box_format="midpoint",
                num_classes=config.NUM_CLASSES,
            )
            print(f"MAP: {mapval.item()}")
            self.log("MAP", mapval.item(), on_epoch=True, prog_bar=True, logger=True)

    def configure_optimizers(self):
        """Create the Adam optimizer and a per-step OneCycleLR schedule.

        Returns:
            A dict with the optimizer and the scheduler configuration, so that
            Lightning actually steps the scheduler after every batch.
        """
        optimizer = optim.Adam(
            self.parameters(),
            lr=self.learning_rate,
            weight_decay=config.WEIGHT_DECAY,
        )

        # The train dataloader must exist to know the number of steps per epoch.
        self.trainer.fit_loop.setup_data()
        dataloader = self.trainer.train_dataloader

        # NOTE: the schedule length is derived from config.NUM_EPOCHS; this
        # assumes the Trainer is run with the same max_epochs.
        EPOCHS = config.NUM_EPOCHS
        lr_scheduler = OneCycleLR(
            optimizer,
            max_lr=1E-3,
            steps_per_epoch=len(dataloader),
            epochs=EPOCHS,
            pct_start=5 / EPOCHS,  # warm up over the first 5 epochs
            div_factor=100,
            three_phase=False,
            final_div_factor=100,
            anneal_strategy='linear',
        )
        scheduler = {"scheduler": lr_scheduler, "interval": "step"}

        # Previously only the optimizer was returned, which silently dropped
        # the scheduler built above.
        return {"optimizer": optimizer, "lr_scheduler": scheduler}

    def setup(self, stage=None):
        """Build the train, test and train-eval loaders from the CSV paths."""
        self.train_loader, self.test_loader, self.train_eval_loader = get_loaders(
            train_csv_path=self.train_csv,
            test_csv_path=self.test_csv,
        )

    def train_dataloader(self):
        """Return the training loader created in :meth:`setup`."""
        return self.train_loader

    def val_dataloader(self):
        """Return the train-eval loader created in :meth:`setup`."""
        return self.train_eval_loader

    def test_dataloader(self):
        """Return the test loader created in :meth:`setup`."""
        return self.test_loader


# if __name__ == "__main__":
#     model = YoloVersion3()
#     checkpoint = ModelCheckpoint(filename='last_epoch', save_last=True)
#     lr_rate_monitor = LearningRateMonitor(logging_interval="epoch")
#     trainer = pl.Trainer(
#         max_epochs=config.NUM_EPOCHS,
#         deterministic=True,
#         logger=True,
#         default_root_dir="/content/drive/MyDrive/sunandini/Checkpoint/",
#         callbacks=[lr_rate_monitor],
#         enable_model_summary=False,
#         log_every_n_steps=1,
#         precision="16-mixed"
#     )
#     print("---- Training Started ---- Sunandini ----")
#     trainer.fit(model)
#     torch.save(model.state_dict(), 'YOLOv3.pth')


if __name__ == "__main__":
    # Shape sanity check: one forward pass, three scales (strides 32, 16, 8).
    num_classes = 20
    IMAGE_SIZE = 416
    model = YOLOv3(num_classes=num_classes)
    x = torch.randn((2, 3, IMAGE_SIZE, IMAGE_SIZE))
    with torch.no_grad():
        out = model(x)

    assert out[0].shape == (2, 3, IMAGE_SIZE//32, IMAGE_SIZE//32, num_classes + 5)
    assert out[1].shape == (2, 3, IMAGE_SIZE//16, IMAGE_SIZE//16, num_classes + 5)
    assert out[2].shape == (2, 3, IMAGE_SIZE//8, IMAGE_SIZE//8, num_classes + 5)
    print("Success!")