| |
| import logging |
| import math |
| from typing import List, Tuple, Union |
| import torch |
|
|
| from detectron2.layers import batched_nms, cat, move_device_like |
| from detectron2.structures import Boxes, Instances |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| def _is_tracing(): |
| |
| if torch.jit.is_scripting(): |
| |
| return False |
| else: |
| return torch.jit.is_tracing() |
|
|
|
|
| def find_top_rpn_proposals( |
| proposals: List[torch.Tensor], |
| pred_objectness_logits: List[torch.Tensor], |
| image_sizes: List[Tuple[int, int]], |
| nms_thresh: float, |
| pre_nms_topk: int, |
| post_nms_topk: int, |
| min_box_size: float, |
| training: bool, |
| ): |
| """ |
| For each feature map, select the `pre_nms_topk` highest scoring proposals, |
| apply NMS, clip proposals, and remove small boxes. Return the `post_nms_topk` |
| highest scoring proposals among all the feature maps for each image. |
| |
| Args: |
| proposals (list[Tensor]): A list of L tensors. Tensor i has shape (N, Hi*Wi*A, 4). |
| All proposal predictions on the feature maps. |
| pred_objectness_logits (list[Tensor]): A list of L tensors. Tensor i has shape (N, Hi*Wi*A). |
| image_sizes (list[tuple]): sizes (h, w) for each image |
| nms_thresh (float): IoU threshold to use for NMS |
| pre_nms_topk (int): number of top k scoring proposals to keep before applying NMS. |
| When RPN is run on multiple feature maps (as in FPN) this number is per |
| feature map. |
| post_nms_topk (int): number of top k scoring proposals to keep after applying NMS. |
| When RPN is run on multiple feature maps (as in FPN) this number is total, |
| over all feature maps. |
| min_box_size (float): minimum proposal box side length in pixels (absolute units |
| wrt input images). |
| training (bool): True if proposals are to be used in training, otherwise False. |
| This arg exists only to support a legacy bug; look for the "NB: Legacy bug ..." |
| comment. |
| |
| Returns: |
| list[Instances]: list of N Instances. The i-th Instances |
| stores post_nms_topk object proposals for image i, sorted by their |
| objectness score in descending order. |
| """ |
| num_images = len(image_sizes) |
| device = ( |
| proposals[0].device |
| if torch.jit.is_scripting() |
| else ("cpu" if torch.jit.is_tracing() else proposals[0].device) |
| ) |
|
|
| |
| topk_scores = [] |
| topk_proposals = [] |
| level_ids = [] |
| batch_idx = move_device_like(torch.arange(num_images, device=device), proposals[0]) |
| for level_id, (proposals_i, logits_i) in enumerate(zip(proposals, pred_objectness_logits)): |
| Hi_Wi_A = logits_i.shape[1] |
| if isinstance(Hi_Wi_A, torch.Tensor): |
| num_proposals_i = torch.clamp(Hi_Wi_A, max=pre_nms_topk) |
| else: |
| num_proposals_i = min(Hi_Wi_A, pre_nms_topk) |
|
|
| topk_scores_i, topk_idx = logits_i.topk(num_proposals_i, dim=1) |
|
|
| |
| topk_proposals_i = proposals_i[batch_idx[:, None], topk_idx] |
|
|
| topk_proposals.append(topk_proposals_i) |
| topk_scores.append(topk_scores_i) |
| level_ids.append( |
| move_device_like( |
| torch.full((num_proposals_i,), level_id, dtype=torch.int64, device=device), |
| proposals[0], |
| ) |
| ) |
|
|
| |
| topk_scores = cat(topk_scores, dim=1) |
| topk_proposals = cat(topk_proposals, dim=1) |
| level_ids = cat(level_ids, dim=0) |
|
|
| |
| results: List[Instances] = [] |
| for n, image_size in enumerate(image_sizes): |
| boxes = Boxes(topk_proposals[n]) |
| scores_per_img = topk_scores[n] |
| lvl = level_ids |
|
|
| valid_mask = torch.isfinite(boxes.tensor).all(dim=1) & torch.isfinite(scores_per_img) |
| if not valid_mask.all(): |
| if training: |
| raise FloatingPointError( |
| "Predicted boxes or scores contain Inf/NaN. Training has diverged." |
| ) |
| boxes = boxes[valid_mask] |
| scores_per_img = scores_per_img[valid_mask] |
| lvl = lvl[valid_mask] |
| boxes.clip(image_size) |
|
|
| |
| keep = boxes.nonempty(threshold=min_box_size) |
| if _is_tracing() or keep.sum().item() != len(boxes): |
| boxes, scores_per_img, lvl = boxes[keep], scores_per_img[keep], lvl[keep] |
|
|
| keep = batched_nms(boxes.tensor, scores_per_img, lvl, nms_thresh) |
| |
| |
| |
| |
| |
| |
| |
| keep = keep[:post_nms_topk] |
|
|
| res = Instances(image_size) |
| res.proposal_boxes = boxes[keep] |
| res.objectness_logits = scores_per_img[keep] |
| results.append(res) |
| return results |
|
|
|
|
| def add_ground_truth_to_proposals( |
| gt: Union[List[Instances], List[Boxes]], proposals: List[Instances] |
| ) -> List[Instances]: |
| """ |
| Call `add_ground_truth_to_proposals_single_image` for all images. |
| |
| Args: |
| gt(Union[List[Instances], List[Boxes]): list of N elements. Element i is a Instances |
| representing the ground-truth for image i. |
| proposals (list[Instances]): list of N elements. Element i is a Instances |
| representing the proposals for image i. |
| |
| Returns: |
| list[Instances]: list of N Instances. Each is the proposals for the image, |
| with field "proposal_boxes" and "objectness_logits". |
| """ |
| assert gt is not None |
|
|
| if len(proposals) != len(gt): |
| raise ValueError("proposals and gt should have the same length as the number of images!") |
| if len(proposals) == 0: |
| return proposals |
|
|
| return [ |
| add_ground_truth_to_proposals_single_image(gt_i, proposals_i) |
| for gt_i, proposals_i in zip(gt, proposals) |
| ] |
|
|
|
|
| def add_ground_truth_to_proposals_single_image( |
| gt: Union[Instances, Boxes], proposals: Instances |
| ) -> Instances: |
| """ |
| Augment `proposals` with `gt`. |
| |
| Args: |
| Same as `add_ground_truth_to_proposals`, but with gt and proposals |
| per image. |
| |
| Returns: |
| Same as `add_ground_truth_to_proposals`, but for only one image. |
| """ |
| if isinstance(gt, Boxes): |
| |
| gt = Instances(proposals.image_size, gt_boxes=gt) |
|
|
| gt_boxes = gt.gt_boxes |
| device = proposals.objectness_logits.device |
| |
| |
| gt_logit_value = math.log((1.0 - 1e-10) / (1 - (1.0 - 1e-10))) |
| gt_logits = gt_logit_value * torch.ones(len(gt_boxes), device=device) |
|
|
| |
| gt_proposal = Instances(proposals.image_size, **gt.get_fields()) |
| gt_proposal.proposal_boxes = gt_boxes |
| gt_proposal.objectness_logits = gt_logits |
|
|
| for key in proposals.get_fields().keys(): |
| assert gt_proposal.has( |
| key |
| ), "The attribute '{}' in `proposals` does not exist in `gt`".format(key) |
|
|
| |
| |
| new_proposals = Instances.cat([proposals, gt_proposal]) |
|
|
| return new_proposals |
|
|