jwyang
first commit
4121bec
# Copyright (c) Facebook, Inc. and its affiliates.
import logging
import numpy as np
from typing import Dict, List, Optional, Tuple
from numpy.lib import pad
import torch
from torch import nn
from torch.nn import functional as F
from random import randint
from detectron2.config import configurable
from detectron2.data.detection_utils import convert_image_to_rgb
from detectron2.structures import ImageList, Instances, Boxes
from detectron2.utils.events import get_event_storage
from detectron2.utils.logger import log_first_n
from ..backbone import Backbone, build_backbone
from ..postprocessing import detector_postprocess
from ..proposal_generator import build_proposal_generator
from ..roi_heads import build_roi_heads
from .build import META_ARCH_REGISTRY
__all__ = ["GeneralizedRCNN", "ProposalNetwork"]
@META_ARCH_REGISTRY.register()
class GeneralizedRCNN(nn.Module):
"""
Generalized R-CNN. Any models that contains the following three components:
1. Per-image feature extraction (aka backbone)
2. Region proposal generation
3. Per-region feature extraction and prediction
"""
@configurable
def __init__(
self,
*,
backbone: Backbone,
proposal_generator: nn.Module,
roi_heads: nn.Module,
pixel_mean: Tuple[float],
pixel_std: Tuple[float],
input_format: Optional[str] = None,
vis_period: int = 0,
use_clip_c4: False,
use_clip_attpool: False,
):
"""
Args:
backbone: a backbone module, must follow detectron2's backbone interface
proposal_generator: a module that generates proposals using backbone features
roi_heads: a ROI head that performs per-region computation
pixel_mean, pixel_std: list or tuple with #channels element, representing
the per-channel mean and std to be used to normalize the input image
input_format: describe the meaning of channels of input. Needed by visualization
vis_period: the period to run visualization. Set to 0 to disable.
"""
super().__init__()
self.backbone = backbone
self.proposal_generator = proposal_generator
self.roi_heads = roi_heads
self.input_format = input_format
self.vis_period = vis_period
if vis_period > 0:
assert input_format is not None, "input_format is required for visualization!"
self.register_buffer("pixel_mean", torch.tensor(pixel_mean).view(-1, 1, 1), False)
self.register_buffer("pixel_std", torch.tensor(pixel_std).view(-1, 1, 1), False)
assert (
self.pixel_mean.shape == self.pixel_std.shape
), f"{self.pixel_mean} and {self.pixel_std} have different shapes!"
if np.sum(pixel_mean) < 3.0: # converrt pixel value to range [0.0, 1.0] by dividing 255.0
assert input_format == 'RGB'
self.div_pixel = True
else: # default setting
self.div_pixel = False
self.use_clip_c4 = use_clip_c4 # if True, use C4 mode where roi_head uses the last resnet layer from backbone
self.use_clip_attpool = use_clip_attpool # if True (C4+text_emb_as_classifier), use att_pool to replace default mean pool
@classmethod
def from_config(cls, cfg):
backbone = build_backbone(cfg)
return {
"backbone": backbone,
"proposal_generator": build_proposal_generator(cfg, backbone.output_shape()),
"roi_heads": build_roi_heads(cfg, backbone.output_shape()),
"input_format": cfg.INPUT.FORMAT,
"vis_period": cfg.VIS_PERIOD,
"pixel_mean": cfg.MODEL.PIXEL_MEAN,
"pixel_std": cfg.MODEL.PIXEL_STD,
"use_clip_c4": cfg.MODEL.BACKBONE.NAME == "build_clip_resnet_backbone",
"use_clip_attpool": cfg.MODEL.ROI_HEADS.NAME == 'CLIPRes5ROIHeads' and cfg.MODEL.CLIP.USE_TEXT_EMB_CLASSIFIER,
}
@property
def device(self):
return self.pixel_mean.device
def visualize_training(self, batched_inputs, proposals):
"""
A function used to visualize images and proposals. It shows ground truth
bounding boxes on the original image and up to 20 top-scoring predicted
object proposals on the original image. Users can implement different
visualization functions for different models.
Args:
batched_inputs (list): a list that contains input to the model.
proposals (list): a list that contains predicted proposals. Both
batched_inputs and proposals should have the same length.
"""
from detectron2.utils.visualizer import Visualizer
storage = get_event_storage()
max_vis_prop = 20
for input, prop in zip(batched_inputs, proposals):
img = input["image"]
img = convert_image_to_rgb(img.permute(1, 2, 0), self.input_format)
v_gt = Visualizer(img, None)
v_gt = v_gt.overlay_instances(boxes=input["instances"].gt_boxes)
anno_img = v_gt.get_image()
box_size = min(len(prop.proposal_boxes), max_vis_prop)
v_pred = Visualizer(img, None)
v_pred = v_pred.overlay_instances(
boxes=prop.proposal_boxes[0:box_size].tensor.cpu().numpy()
)
prop_img = v_pred.get_image()
vis_img = np.concatenate((anno_img, prop_img), axis=1)
vis_img = vis_img.transpose(2, 0, 1)
vis_name = "Left: GT bounding boxes; Right: Predicted proposals"
storage.put_image(vis_name, vis_img)
break # only visualize one image in a batch
def forward(self, batched_inputs: List[Dict[str, torch.Tensor]]):
"""
Args:
batched_inputs: a list, batched outputs of :class:`DatasetMapper` .
Each item in the list contains the inputs for one image.
For now, each item in the list is a dict that contains:
* image: Tensor, image in (C, H, W) format.
* instances (optional): groundtruth :class:`Instances`
* proposals (optional): :class:`Instances`, precomputed proposals.
Other information that's included in the original dicts, such as:
* "height", "width" (int): the output resolution of the model, used in inference.
See :meth:`postprocess` for details.
Returns:
list[dict]:
Each dict is the output for one input image.
The dict contains one key "instances" whose value is a :class:`Instances`.
The :class:`Instances` object has the following keys:
"pred_boxes", "pred_classes", "scores", "pred_masks", "pred_keypoints"
"""
if not self.training:
return self.inference(batched_inputs)
images = self.preprocess_image(batched_inputs)
if "instances" in batched_inputs[0]:
gt_instances = [x["instances"].to(self.device) for x in batched_inputs]
else:
gt_instances = None
# eg: {'p2': torch.Size([b, c, 200, 304]), 'p3': torch.Size([b, c, 100, 152]), 'p4': torch.Size([b, c, 50, 76]), 'p5': torch.Size([b, c, 25, 38]), 'p6': torch.Size([b, c, 13, 19])}
features = self.backbone(images.tensor)
if self.proposal_generator is not None:
proposals, proposal_losses = self.proposal_generator(images, features, gt_instances)
else:
assert "proposals" in batched_inputs[0]
proposals = [x["proposals"].to(self.device) for x in batched_inputs]
proposal_losses = {}
if self.use_clip_c4: # use C4 + resnet weights from CLIP
if self.use_clip_attpool: # use att_pool from CLIP to match dimension
_, detector_losses = self.roi_heads(images, features, proposals, gt_instances, res5=self.backbone.layer4, attnpool=self.backbone.attnpool)
else: # use default mean pool
_, detector_losses = self.roi_heads(images, features, proposals, gt_instances, res5=self.backbone.layer4)
else: # default setting
_, detector_losses = self.roi_heads(images, features, proposals, gt_instances)
if self.vis_period > 0:
storage = get_event_storage()
if storage.iter % self.vis_period == 0:
self.visualize_training(batched_inputs, proposals)
losses = {}
losses.update(detector_losses)
losses.update(proposal_losses)
return losses
def inference(
self,
batched_inputs: List[Dict[str, torch.Tensor]],
detected_instances: Optional[List[Instances]] = None,
do_postprocess: bool = True,
):
"""
Run inference on the given inputs.
Args:
batched_inputs (list[dict]): same as in :meth:`forward`
detected_instances (None or list[Instances]): if not None, it
contains an `Instances` object per image. The `Instances`
object contains "pred_boxes" and "pred_classes" which are
known boxes in the image.
The inference will then skip the detection of bounding boxes,
and only predict other per-ROI outputs.
do_postprocess (bool): whether to apply post-processing on the outputs.
Returns:
When do_postprocess=True, same as in :meth:`forward`.
Otherwise, a list[Instances] containing raw network outputs.
"""
assert not self.training
images = self.preprocess_image(batched_inputs)
features = self.backbone(images.tensor)
if detected_instances is None:
if self.proposal_generator is not None:
proposals, _ = self.proposal_generator(images, features, None)
else:
assert "proposals" in batched_inputs[0]
proposals = [x["proposals"].to(self.device) for x in batched_inputs]
if self.use_clip_c4: # use C4 + resnet weights from CLIP
if self.use_clip_attpool: # use att_pool from CLIP to match dimension
results, _ = self.roi_heads(images, features, proposals, None, res5=self.backbone.layer4, attnpool=self.backbone.attnpool)
else: # use default mean pool
results, _ = self.roi_heads(images, features, proposals, None, res5=self.backbone.layer4)
else: # default setting
results, _ = self.roi_heads(images, features, proposals, None)
else:
detected_instances = [x.to(self.device) for x in detected_instances]
if self.use_clip_c4: # use C4 + resnet weights from CLIP
if self.use_clip_attpool: # use att_pool from CLIP to match dimension
results = self.roi_heads.forward_with_given_boxes(features, detected_instances, res5=self.backbone.layer4, attnpool=self.backbone.attnpool)
else: # use default mean pool
results = self.roi_heads.forward_with_given_boxes(features, detected_instances, res5=self.backbone.layer4)
else: # default setting
results = self.roi_heads.forward_with_given_boxes(features, detected_instances)
#visualize_proposals(batched_inputs, proposals, self.input_format)
if do_postprocess:
assert not torch.jit.is_scripting(), "Scripting is not supported for postprocess."
return GeneralizedRCNN._postprocess(results, batched_inputs, images.image_sizes)
else:
return results
def preprocess_image(self, batched_inputs: List[Dict[str, torch.Tensor]]):
"""
Normalize, pad and batch the input images.
"""
images = [x["image"].to(self.device) for x in batched_inputs]
if self.div_pixel:
images = [((x / 255.0) - self.pixel_mean) / self.pixel_std for x in images]
else:
images = [(x - self.pixel_mean) / self.pixel_std for x in images]
images = ImageList.from_tensors(images, self.backbone.size_divisibility)
return images
@staticmethod
def _postprocess(instances, batched_inputs: List[Dict[str, torch.Tensor]], image_sizes):
"""
Rescale the output instances to the target size.
"""
# note: private function; subject to changes
processed_results = []
for results_per_image, input_per_image, image_size in zip(
instances, batched_inputs, image_sizes
):
height = input_per_image.get("height", image_size[0])
width = input_per_image.get("width", image_size[1])
r = detector_postprocess(results_per_image, height, width)
processed_results.append({"instances": r})
return processed_results
@META_ARCH_REGISTRY.register()
class ProposalNetwork(nn.Module):
"""
A meta architecture that only predicts object proposals.
"""
@configurable
def __init__(
self,
*,
backbone: Backbone,
proposal_generator: nn.Module,
pixel_mean: Tuple[float],
pixel_std: Tuple[float],
input_format: Optional[str] = None,
):
"""
Args:
backbone: a backbone module, must follow detectron2's backbone interface
proposal_generator: a module that generates proposals using backbone features
pixel_mean, pixel_std: list or tuple with #channels element, representing
the per-channel mean and std to be used to normalize the input image
"""
super().__init__()
self.backbone = backbone
self.proposal_generator = proposal_generator
self.register_buffer("pixel_mean", torch.tensor(pixel_mean).view(-1, 1, 1), False)
self.register_buffer("pixel_std", torch.tensor(pixel_std).view(-1, 1, 1), False)
if np.sum(pixel_mean) < 3.0: # converrt pixel value to range [0.0, 1.0] by dividing 255.0
assert input_format == 'RGB'
self.div_pixel = True
else: # default setting
self.div_pixel = False
@classmethod
def from_config(cls, cfg):
backbone = build_backbone(cfg)
return {
"backbone": backbone,
"proposal_generator": build_proposal_generator(cfg, backbone.output_shape()),
"input_format": cfg.INPUT.FORMAT,
"pixel_mean": cfg.MODEL.PIXEL_MEAN,
"pixel_std": cfg.MODEL.PIXEL_STD,
}
@property
def device(self):
return self.pixel_mean.device
def forward(self, batched_inputs):
"""
Args:
Same as in :class:`GeneralizedRCNN.forward`
Returns:
list[dict]:
Each dict is the output for one input image.
The dict contains one key "proposals" whose value is a
:class:`Instances` with keys "proposal_boxes" and "objectness_logits".
"""
images = [x["image"].to(self.device) for x in batched_inputs]
if self.div_pixel:
images = [((x / 255.0) - self.pixel_mean) / self.pixel_std for x in images]
else:
images = [(x - self.pixel_mean) / self.pixel_std for x in images]
images = ImageList.from_tensors(images, self.backbone.size_divisibility)
features = self.backbone(images.tensor)
if "instances" in batched_inputs[0]:
gt_instances = [x["instances"].to(self.device) for x in batched_inputs]
elif "targets" in batched_inputs[0]:
log_first_n(
logging.WARN, "'targets' in the model inputs is now renamed to 'instances'!", n=10
)
gt_instances = [x["targets"].to(self.device) for x in batched_inputs]
else:
gt_instances = None
proposals, proposal_losses = self.proposal_generator(images, features, gt_instances)
# In training, the proposals are not useful at all but we generate them anyway.
# This makes RPN-only models about 5% slower.
if self.training:
return proposal_losses
processed_results = []
for results_per_image, input_per_image, image_size in zip(
proposals, batched_inputs, images.image_sizes
):
height = input_per_image.get("height", image_size[0])
width = input_per_image.get("width", image_size[1])
r = detector_postprocess(results_per_image, height, width)
processed_results.append({"proposals": r})
return processed_results