# NOTE(review): the three lines below were stray paste artifacts
# ("Spaces:" / "Runtime error") that made the file invalid Python;
# preserved here as comments.
# Spaces:
# Runtime error
# Runtime error
#!/usr/bin/env python3 | |
import numpy as np | |
import torch | |
import cv2 | |
import pyclipper | |
from shapely.geometry import Polygon | |
from maskrcnn_benchmark.structures.bounding_box import BoxList | |
from maskrcnn_benchmark.structures.boxlist_ops import cat_boxlist, cat_boxlist_gt | |
from maskrcnn_benchmark.structures.boxlist_ops import remove_small_boxes | |
from maskrcnn_benchmark.structures.segmentation_mask import SegmentationMask | |
import random | |
import time | |
class SEGPostProcessor(torch.nn.Module):
    """
    Performs post-processing on the outputs of the segmentation branch,
    before feeding the proposals to the heads: binarizes the predicted
    score map, extracts contours from the binary map, and turns them into
    horizontal boxes, rotated boxes and polygons (optionally augmented
    with jittered copies and ground-truth boxes during training).
    """

    def __init__(
        self,
        top_n,
        binary_thresh,
        box_thresh,
        min_size,
        cfg,
    ):
        """
        Arguments:
            top_n (int): maximum number of proposals kept per image
            binary_thresh (float): threshold used to binarize the score map
            box_thresh (float): minimum mean score for a box to survive
                (applied at test time only)
            min_size (int): minimum short side of a candidate box, in
                score-map pixels
            cfg: global config node (MODEL.SEG / ROI_*_HEAD flags are read)
        """
        super(SEGPostProcessor, self).__init__()
        self.top_n = top_n
        self.binary_thresh = binary_thresh
        self.box_thresh = box_thresh
        self.min_size = min_size
        self.cfg = cfg

    def add_gt_proposals(self, proposals, targets):
        """
        Append the ground-truth boxes of every image to its proposal list.

        Arguments:
            proposals: list[BoxList]
            targets: list[BoxList]
        Returns:
            list[BoxList], one per image, with GT boxes concatenated
        """
        # Carry the 'masks' field only when a downstream consumer needs it.
        if self.cfg.MODEL.SEG.USE_SEG_POLY or self.cfg.MODEL.ROI_BOX_HEAD.USE_MASKED_FEATURE or self.cfg.MODEL.ROI_MASK_HEAD.USE_MASKED_FEATURE:
            gt_boxes = [target.copy_with_fields(['masks']) for target in targets]
        else:
            gt_boxes = [target.copy_with_fields([]) for target in targets]
        proposals = [
            cat_boxlist_gt([proposal, gt_box])
            for proposal, gt_box in zip(proposals, gt_boxes)
        ]
        return proposals

    def aug_tensor_proposals(self, boxes):
        """
        Jitter every box three times (random rescale or random translation)
        and return the originals followed by the three jittered copies.

        Arguments:
            boxes: tensor of shape (N, 4) in xyxy order
        Returns:
            float tensor of shape (4 * N, 4)
        """
        # Clone so the caller's tensor is never mutated in place
        # (`.float()` returns the same storage when the input is already float).
        boxes = boxes.clone().float()
        N = boxes.shape[0]
        device = boxes.device
        aug_boxes = torch.zeros((4, N, 4), device=device)
        aug_boxes[0, :, :] = boxes.clone()
        xmin, ymin, xmax, ymax = boxes[:, 0], boxes[:, 1], boxes[:, 2], boxes[:, 3]
        x_center = (xmin + xmax) / 2.
        y_center = (ymin + ymax) / 2.
        width = xmax - xmin
        height = ymax - ymin
        for i in range(3):
            choice = random.random()
            if choice < 0.5:
                # Shrink or expand: uniform scale factor in [0.5, 2).
                # torch.rand (uniform) mirrors random.random() in
                # aug_proposals; randn (normal) could yield negative scales.
                ratio = (torch.rand((N,), device=device) * 3 + 1) / 2.
                height = height * ratio
                ratio = (torch.rand((N,), device=device) * 3 + 1) / 2.
                width = width * ratio
            else:
                # Translate by a uniform offset in [-2, 2) box sizes.
                move_x = width * (torch.rand((N,), device=device) * 4 - 2)
                move_y = height * (torch.rand((N,), device=device) * 4 - 2)
                x_center += move_x
                y_center += move_y
            boxes[:, 0] = x_center - width / 2
            boxes[:, 2] = x_center + width / 2
            boxes[:, 1] = y_center - height / 2
            boxes[:, 3] = y_center + height / 2
            aug_boxes[i + 1, :, :] = boxes.clone()
        return aug_boxes.reshape((-1, 4))

    def forward_for_single_feature_map(self, pred, image_shapes):
        """
        Arguments:
            pred: tensor of size N, 1, H, W — predicted score maps
            image_shapes: per-image (height, width) the boxes are mapped to
        Returns:
            (boxes_batch, rotated_boxes_batch, polygons_batch, scores_batch),
            each a per-image list
        """
        device = pred.device
        bitmap = self.binarize(pred)
        N, height, width = pred.shape[0], pred.shape[2], pred.shape[3]
        # Contour extraction below runs on CPU via OpenCV.
        bitmap_numpy = bitmap.cpu().numpy()
        pred_map_numpy = pred.cpu().numpy()
        boxes_batch = []
        rotated_boxes_batch = []
        polygons_batch = []
        scores_batch = []
        for batch_index in range(N):
            image_shape = image_shapes[batch_index]
            boxes, scores, rotated_boxes, polygons = self.boxes_from_bitmap(
                pred_map_numpy[batch_index],
                bitmap_numpy[batch_index], width, height)
            boxes = boxes.to(device)
            if self.training and self.cfg.MODEL.SEG.AUG_PROPOSALS:
                boxes = self.aug_tensor_proposals(boxes)
            if boxes.shape[0] > self.top_n:
                # NOTE(review): scores is not truncated alongside boxes, so
                # len(scores) may exceed len(boxes) here — confirm callers
                # never zip the two together.
                boxes = boxes[:self.top_n, :]
            boxlist = BoxList(boxes, (image_shape[1], image_shape[0]), mode="xyxy")
            if self.cfg.MODEL.SEG.USE_SEG_POLY or self.cfg.MODEL.ROI_BOX_HEAD.USE_MASKED_FEATURE or self.cfg.MODEL.ROI_MASK_HEAD.USE_MASKED_FEATURE:
                masks = SegmentationMask(polygons, (image_shape[1], image_shape[0]))
                boxlist.add_field('masks', masks)
            boxlist = boxlist.clip_to_image(remove_empty=False)
            boxes_batch.append(boxlist)
            rotated_boxes_batch.append(rotated_boxes)
            polygons_batch.append(polygons)
            scores_batch.append(scores)
        return boxes_batch, rotated_boxes_batch, polygons_batch, scores_batch

    def forward(self, seg_output, image_shapes, targets=None):
        """
        Arguments:
            seg_output: tensor of size N, 1, H, W
            image_shapes: per-image (height, width)
            targets: list[BoxList], training only
        Returns:
            at test time, the raw (boxes, rotated_boxes, polygons, scores)
            batches; at training time, list[BoxList] proposals with the
            ground-truth boxes appended.
        """
        sampled_boxes = []
        boxes_batch, rotated_boxes_batch, polygons_batch, scores_batch = self.forward_for_single_feature_map(seg_output, image_shapes)
        if not self.training:
            return boxes_batch, rotated_boxes_batch, polygons_batch, scores_batch
        # Single pseudo feature level; transpose to per-image lists.
        sampled_boxes.append(boxes_batch)
        boxlists = list(zip(*sampled_boxes))
        boxlists = [cat_boxlist(boxlist) for boxlist in boxlists]
        # append ground-truth bboxes to proposals
        if self.training and targets is not None:
            boxlists = self.add_gt_proposals(boxlists, targets)
        return boxlists

    def binarize(self, pred):
        """
        Threshold the score map. With USE_MULTIPLE_THRESH, one boolean map
        per threshold is produced and concatenated along the channel dim.
        """
        if self.cfg.MODEL.SEG.USE_MULTIPLE_THRESH:
            binary_maps = []
            for thre in self.cfg.MODEL.SEG.MULTIPLE_THRESH:
                binary_map = pred > thre
                binary_maps.append(binary_map)
            return torch.cat(binary_maps, dim=1)
        else:
            return pred > self.binary_thresh

    def boxes_from_bitmap(self, pred, bitmap, dest_width, dest_height):
        """
        Extract boxes from the binarized map of a single image.

        Arguments:
            pred: numpy array of shape (1, H, W), raw scores
            bitmap: numpy array of shape (C, H, W), values binarized as
                {0, 1} (C > 1 when multiple thresholds are used)
            dest_width, dest_height: target size the boxes are rescaled to
        Returns:
            (boxes, scores, rotated_boxes, polygons); boxes is a tensor of
            horizontal xyxy boxes (a single all-zero box when none survive)
        """
        pred = pred[0]
        height, width = bitmap.shape[1], bitmap.shape[2]
        boxes = []
        scores = []
        rotated_boxes = []
        polygons = []
        contours_all = []
        for i in range(bitmap.shape[0]):
            try:
                # OpenCV 3.x returns (image, contours, hierarchy).
                _, contours, _ = cv2.findContours(
                    (bitmap[i] * 255).astype(np.uint8),
                    cv2.RETR_LIST,
                    cv2.CHAIN_APPROX_NONE,
                )
            except ValueError:
                # OpenCV 4.x returns (contours, hierarchy); the 3-way
                # unpacking above raises ValueError in that case.
                contours, _ = cv2.findContours(
                    (bitmap[i] * 255).astype(np.uint8),
                    cv2.RETR_LIST,
                    cv2.CHAIN_APPROX_NONE,
                )
            contours_all.extend(contours)
        for contour in contours_all:
            # Approximate the contour with a coarser polygon (1% of arc length).
            epsilon = 0.01 * cv2.arcLength(contour, True)
            approx = cv2.approxPolyDP(contour, epsilon, True)
            polygon = approx.reshape((-1, 2))
            points, sside = self.get_mini_boxes(contour)
            if sside < self.min_size:
                continue
            points = np.array(points)
            score = self.box_score_fast(pred, points)
            if not self.training and self.box_thresh > score:
                continue
            if polygon.shape[0] > 2:
                polygon = self.unclip(polygon, expand_ratio=self.cfg.MODEL.SEG.EXPAND_RATIO)
                # Unclipping may split the shape into several paths; keep
                # only single-path results.
                if len(polygon) > 1:
                    continue
            else:
                continue
            polygon = polygon.reshape(-1)
            box = self.unclip(points, expand_ratio=self.cfg.MODEL.SEG.BOX_EXPAND_RATIO).reshape(-1, 2)
            box = np.array(box)
            # Rescale from score-map coordinates to the destination size.
            box[:, 0] = np.clip(np.round(box[:, 0] / width * dest_width), 0, dest_width)
            box[:, 1] = np.clip(
                np.round(box[:, 1] / height * dest_height), 0, dest_height
            )
            min_x, min_y = min(box[:, 0]), min(box[:, 1])
            max_x, max_y = max(box[:, 0]), max(box[:, 1])
            horizontal_box = torch.from_numpy(np.array([min_x, min_y, max_x, max_y]))
            boxes.append(horizontal_box)
            scores.append(score)
            rotated_box, _ = self.get_mini_boxes(box.reshape(-1, 1, 2))
            rotated_box = np.array(rotated_box)
            rotated_boxes.append(rotated_box)
            polygons.append([polygon])
        if len(boxes) == 0:
            # Dummy box so downstream code always receives a non-empty tensor.
            boxes = [torch.from_numpy(np.array([0, 0, 0, 0]))]
            scores = [0.]
        boxes = torch.stack(boxes)
        scores = torch.from_numpy(np.array(scores))
        return boxes, scores, rotated_boxes, polygons

    def aug_proposals(self, box):
        """
        Jitter a single [xmin, ymin, xmax, ymax] box once: either rescale
        both sides by a uniform factor in [0.5, 2) or translate the center
        by up to +/-2 box sizes. Returns a new list of 4 ints.
        """
        xmin, ymin, xmax, ymax = box[0], box[1], box[2], box[3]
        x_center = int((xmin + xmax) / 2.)
        y_center = int((ymin + ymax) / 2.)
        width = xmax - xmin
        height = ymax - ymin
        choice = random.random()
        if choice < 0.5:
            # shrink or expand
            ratio = (random.random() * 3 + 1) / 2.
            height = height * ratio
            ratio = (random.random() * 3 + 1) / 2.
            width = width * ratio
        else:
            move_x = width * (random.random() * 4 - 2)
            move_y = height * (random.random() * 4 - 2)
            x_center += move_x
            y_center += move_y
        xmin = int(x_center - width / 2)
        xmax = int(x_center + width / 2)
        ymin = int(y_center - height / 2)
        ymax = int(y_center + height / 2)
        return [xmin, ymin, xmax, ymax]

    def unclip(self, box, expand_ratio=1.5):
        """
        Expand `box` outwards by a distance proportional to its
        area / perimeter ratio (the DB-style unclip operation).
        Returns a numpy array of the offset path(s).
        """
        poly = Polygon(box)
        distance = poly.area * expand_ratio / poly.length
        offset = pyclipper.PyclipperOffset()
        offset.AddPath(box, pyclipper.JT_ROUND, pyclipper.ET_CLOSEDPOLYGON)
        expanded = np.array(offset.Execute(distance))
        return expanded

    def get_mini_boxes(self, contour):
        """
        Return the minimum-area rectangle of `contour` as four corner
        points ordered top-left, top-right, bottom-right, bottom-left,
        plus the rectangle's short side length.
        """
        bounding_box = cv2.minAreaRect(contour)
        # Sort corners by x, then disambiguate top/bottom within each pair.
        points = sorted(list(cv2.boxPoints(bounding_box)), key=lambda x: x[0])
        index_1, index_2, index_3, index_4 = 0, 1, 2, 3
        if points[1][1] > points[0][1]:
            index_1 = 0
            index_4 = 1
        else:
            index_1 = 1
            index_4 = 0
        if points[3][1] > points[2][1]:
            index_2 = 2
            index_3 = 3
        else:
            index_2 = 3
            index_3 = 2
        box = [points[index_1], points[index_2], points[index_3], points[index_4]]
        return box, min(bounding_box[1])

    def box_score(self, bitmap, box):
        """
        Naive version of box score computation: mean of `bitmap` inside
        the filled quadrilateral. Kept only to document the principle;
        box_score_fast is the one actually used.
        """
        mask = np.zeros_like(bitmap, dtype=np.uint8)
        cv2.fillPoly(mask, box.reshape(1, 4, 2).astype(np.int32), 1)
        return cv2.mean(bitmap, mask)[0]

    def box_score_fast(self, bitmap, _box):
        """
        Mean score of `bitmap` inside the quadrilateral `_box` (4 x 2),
        computed on the box's bounding-rect crop only.
        """
        h, w = bitmap.shape[:2]
        box = _box.copy()
        # np.int was removed in NumPy 1.24; use the explicit np.int64.
        xmin = np.clip(np.floor(box[:, 0].min()).astype(np.int64), 0, w - 1)
        xmax = np.clip(np.ceil(box[:, 0].max()).astype(np.int64), 0, w - 1)
        ymin = np.clip(np.floor(box[:, 1].min()).astype(np.int64), 0, h - 1)
        ymax = np.clip(np.ceil(box[:, 1].max()).astype(np.int64), 0, h - 1)
        mask = np.zeros((ymax - ymin + 1, xmax - xmin + 1), dtype=np.uint8)
        box[:, 0] = box[:, 0] - xmin
        box[:, 1] = box[:, 1] - ymin
        cv2.fillPoly(mask, box.reshape(1, 4, 2).astype(np.int32), 1)
        return cv2.mean(bitmap[ymin : ymax + 1, xmin : xmax + 1], mask)[0]
def make_seg_postprocessor(config, is_train):
    """Build the SEG post-processor from the global config.

    The proposal budget (TOP_N) differs between training and testing;
    every other knob is read straight from ``config.MODEL.SEG``.
    """
    seg_cfg = config.MODEL.SEG
    top_n = seg_cfg.TOP_N_TRAIN if is_train else seg_cfg.TOP_N_TEST
    return SEGPostProcessor(
        top_n=top_n,
        binary_thresh=seg_cfg.BINARY_THRESH,
        box_thresh=seg_cfg.BOX_THRESH,
        min_size=seg_cfg.MIN_SIZE,
        cfg=config,
    )