Paul Engstler
Initial commit
92f0e98
from typing import Callable, Iterable, Tuple
import torch
import numpy as np
import PIL.Image
import cv2
import wandb
from tqdm import tqdm
from pytorch_grad_cam import GradCAM
from utils.val_loop_hook import ValidationLoopHook
def _get_grad_cam_target(model):
"""
Determines the appropriate GradCAM target.
"""
# very naive check
if hasattr(model, "features"):
return getattr(model, "features")
pooling = [torch.nn.AdaptiveAvgPool1d, torch.nn.AvgPool1d, torch.nn.MaxPool1d, torch.nn.AdaptiveMaxPool1d,
torch.nn.AdaptiveAvgPool2d, torch.nn.AvgPool2d, torch.nn.MaxPool2d, torch.nn.AdaptiveMaxPool2d,
torch.nn.AdaptiveAvgPool3d, torch.nn.AvgPool3d, torch.nn.MaxPool3d, torch.nn.AdaptiveMaxPool3d]
convolutions = [torch.nn.Conv1d, torch.nn.Conv2d, torch.nn.Conv3d]
# reverse search starting from the final module
inverted_modules = list(model.modules())[::-1]
for i, module in enumerate(inverted_modules):
if any([isinstance(module, po) for po in pooling]):
# if a pooling layer was hit, pick the module directly before it
return inverted_modules[i+1]
elif any([isinstance(module, co) for co in convolutions]):
# if a convolution was hit (but no pooling layer), pick that one instead
return module
elif isinstance(module, torch.nn.Sequential):
# if a sequential module is hit, explore it
for child in list(module.children())[::-1]:
sequential_result = _get_grad_cam_target(child)
if sequential_result is not None:
return sequential_result
def _show_cam_on_image(img: np.ndarray, mask: np.ndarray, use_rgb: bool = False, colormap: int = cv2.COLORMAP_JET) -> np.ndarray:
""" This function overlays the cam mask on the image as an heatmap.
By default the heatmap is in BGR format.
:param img: The base image in RGB or BGR format.
:param mask: The cam mask.
:param use_rgb: Whether to use an RGB or BGR heatmap, this should be set to True if 'img' is in RGB format.
:param colormap: The OpenCV colormap to be used.
:returns: The default image with the cam overlay.
"""
heatmap = cv2.applyColorMap(np.uint8(255 * mask), colormap)
if use_rgb:
heatmap = cv2.cvtColor(heatmap, cv2.COLOR_BGR2RGB)
heatmap = np.float32(heatmap) / 255
normalize = lambda x: (x - np.min(x))/np.ptp(x)
cam = 0.6 * heatmap + normalize(img)
cam = cam / np.max(cam)
return np.uint8(255 * cam)
def _strip_image_from_grid_row(row, gap=5, bg=255):
strip = torch.full(
(row.shape[0] * (row.shape[3] + gap) - gap,
row.shape[1] * (row.shape[3] + gap) - gap,
row.shape[4]), bg, dtype=row.dtype)
for i in range(0, row.shape[0] * row.shape[1]):
strip[(i // row.shape[1]) * (row.shape[2] + gap) : ((i // row.shape[1])+1) * (row.shape[2] + gap) - gap,
(i % row.shape[1]) * (row.shape[3] + gap) : ((i % row.shape[1])+1) * (row.shape[3] + gap) - gap,
:] = row[i // row.shape[1]][i % row.shape[1]]
return PIL.Image.fromarray(strip.numpy())
class GradCAMBuilder(ValidationLoopHook):
def __init__(self, image_shape: Iterable[int], target_category: int = None, num_images: int = 5):
self.image_shape = image_shape
self.target_category = target_category
self.num_images = num_images
self.targets = torch.zeros(self.num_images)
self.activations = torch.zeros(self.num_images)
self.images = torch.zeros(torch.Size([self.num_images]) + torch.Size(self.image_shape))
def process(self, batch, target_batch, logits_batch, prediction_batch):
image_batch = batch["image"]
with torch.no_grad():
if self.target_category is not None:
local_activations = logits_batch[:, self.target_category]
else:
local_activations = torch.amax(logits_batch, dim=-1)
# filter samples where the prediction lines up with the target
target_match = (prediction_batch == target_batch)
# filter public dataset samples
public = torch.tensor(["verse" in id for id in batch["verse_id"]]).type_as(target_match)
mask = target_match & public
if torch.max(mask) == False:
# no samples match criteria in this batch, skip
return
# identify better activations and replace them accordingly
local_top_idx = torch.argsort(local_activations, descending=True)
# filter samples
local_top_idx = local_top_idx[mask[local_top_idx]]
current_idx = 0
while current_idx < self.num_images and local_activations[local_top_idx[current_idx]] > torch.min(self.activations):
# next item in local batch matches criteria and has a higher activation than one in the global batch, replace it
idx_to_replace = torch.argsort(self.activations)[0]
self.activations[idx_to_replace] = local_activations[local_top_idx[current_idx]]
self.images[idx_to_replace] = image_batch[local_top_idx[current_idx]]
self.targets[idx_to_replace] = target_batch[local_top_idx[current_idx]]
current_idx += 1
def trigger(self, module):
model = module.backbone
module.eval()
# determine the Grad-CAM target module/layer
grad_cam_target = _get_grad_cam_target(model)
cam = GradCAM(model, [grad_cam_target], use_cuda=torch.cuda.is_available())
# determine final order such that the highest activations are placed on top
sorted_idx = torch.argsort(self.activations, descending=True)
self.activations = self.activations[sorted_idx]
self.images = self.images[sorted_idx]
self.targets = self.targets[sorted_idx]
# if a polyaxon experiment crashes here, remove the GradCAMBuilder instance from the
# model.validation_hooks list
grad_cams = cam(input_tensor=self.images, target_category=self.target_category)
module.train()
if len(self.images.shape) == 5:
# 3D, visualize slices
ld_res = grad_cams.shape[-1]
img_res = self.images.shape[-1]
img_slices = torch.linspace(int(img_res/ld_res/2), img_res-int(img_res/ld_res/2), ld_res, dtype=torch.long)
# Show all images slices in a larger combined image
grad_cams_image = _strip_image_from_grid_row(
torch.stack([
torch.stack([
torch.tensor(
_show_cam_on_image((self.images[i, 0, ..., img_slices[s]]).unsqueeze(-1).repeat(1, 1, 3).numpy(), grad_cams[i, ..., s], use_rgb=True)
)
for s in range(grad_cams.shape[-1])])
for i in range(self.num_images if self.num_images < grad_cams.shape[0] else grad_cams.shape[0])])
)
elif len(self.images.shape) == 4:
# 2D
grad_cams_image = _strip_image_from_grid_row(
torch.stack([
torch.stack([
torch.tensor(
_show_cam_on_image((self.images[i, 0, ...]).unsqueeze(-1).repeat(1, 1, 3).numpy(), grad_cams[i, ...], use_rgb=True)
)
])
for i in range(self.num_images if self.num_images < grad_cams.shape[0] else grad_cams.shape[0])])
)
else:
raise RuntimeError("Attempting to build Grad-CAMs for data that is neither 2D nor 3D")
module.logger.experiment.log({
"val/grad_cam": wandb.Image(grad_cams_image)
})
def reset(self):
self.targets = torch.zeros(self.num_images)
self.activations = torch.zeros(self.num_images)
self.images = torch.zeros(torch.Size([self.num_images]) + torch.Size(self.image_shape))