File size: 11,316 Bytes
1850baa |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 |
# Copyright (c) Facebook, Inc. and its affiliates.
import numpy as np
from typing import List
import pycocotools.mask as mask_util
from detectron2.structures import Instances
from detectron2.utils.visualizer import (
ColorMode,
Visualizer,
_create_text_labels,
_PanopticPrediction,
)
from .colormap import random_color, random_colors
class _DetectedInstance:
"""
Used to store data about detected objects in video frame,
in order to transfer color to objects in the future frames.
Attributes:
label (int):
bbox (tuple[float]):
mask_rle (dict):
color (tuple[float]): RGB colors in range (0, 1)
ttl (int): time-to-live for the instance. For example, if ttl=2,
the instance color can be transferred to objects in the next two frames.
"""
__slots__ = ["label", "bbox", "mask_rle", "color", "ttl"]
def __init__(self, label, bbox, mask_rle, color, ttl):
self.label = label
self.bbox = bbox
self.mask_rle = mask_rle
self.color = color
self.ttl = ttl
class VideoVisualizer:
def __init__(self, metadata, instance_mode=ColorMode.IMAGE):
"""
Args:
metadata (MetadataCatalog): image metadata.
"""
self.metadata = metadata
self._old_instances = []
assert instance_mode in [
ColorMode.IMAGE,
ColorMode.IMAGE_BW,
], "Other mode not supported yet."
self._instance_mode = instance_mode
self._max_num_instances = self.metadata.get("max_num_instances", 74)
self._assigned_colors = {}
self._color_pool = random_colors(self._max_num_instances, rgb=True, maximum=1)
self._color_idx_set = set(range(len(self._color_pool)))
def draw_instance_predictions(self, frame, predictions):
"""
Draw instance-level prediction results on an image.
Args:
frame (ndarray): an RGB image of shape (H, W, C), in the range [0, 255].
predictions (Instances): the output of an instance detection/segmentation
model. Following fields will be used to draw:
"pred_boxes", "pred_classes", "scores", "pred_masks" (or "pred_masks_rle").
Returns:
output (VisImage): image object with visualizations.
"""
frame_visualizer = Visualizer(frame, self.metadata)
num_instances = len(predictions)
if num_instances == 0:
return frame_visualizer.output
boxes = predictions.pred_boxes.tensor.numpy() if predictions.has("pred_boxes") else None
scores = predictions.scores if predictions.has("scores") else None
classes = predictions.pred_classes.numpy() if predictions.has("pred_classes") else None
keypoints = predictions.pred_keypoints if predictions.has("pred_keypoints") else None
colors = predictions.COLOR if predictions.has("COLOR") else [None] * len(predictions)
periods = predictions.ID_period if predictions.has("ID_period") else None
period_threshold = self.metadata.get("period_threshold", 0)
visibilities = (
[True] * len(predictions)
if periods is None
else [x > period_threshold for x in periods]
)
if predictions.has("pred_masks"):
masks = predictions.pred_masks
# mask IOU is not yet enabled
# masks_rles = mask_util.encode(np.asarray(masks.permute(1, 2, 0), order="F"))
# assert len(masks_rles) == num_instances
else:
masks = None
if not predictions.has("COLOR"):
if predictions.has("ID"):
colors = self._assign_colors_by_id(predictions)
else:
# ToDo: clean old assign color method and use a default tracker to assign id
detected = [
_DetectedInstance(classes[i], boxes[i], mask_rle=None, color=colors[i], ttl=8)
for i in range(num_instances)
]
colors = self._assign_colors(detected)
labels = _create_text_labels(classes, scores, self.metadata.get("thing_classes", None))
if self._instance_mode == ColorMode.IMAGE_BW:
# any() returns uint8 tensor
frame_visualizer.output.reset_image(
frame_visualizer._create_grayscale_image(
(masks.any(dim=0) > 0).numpy() if masks is not None else None
)
)
alpha = 0.3
else:
alpha = 0.5
labels = (
None
if labels is None
else [y[0] for y in filter(lambda x: x[1], zip(labels, visibilities))]
) # noqa
assigned_colors = (
None
if colors is None
else [y[0] for y in filter(lambda x: x[1], zip(colors, visibilities))]
) # noqa
frame_visualizer.overlay_instances(
boxes=None if masks is not None else boxes[visibilities], # boxes are a bit distracting
masks=None if masks is None else masks[visibilities],
labels=labels,
keypoints=None if keypoints is None else keypoints[visibilities],
assigned_colors=assigned_colors,
alpha=alpha,
)
return frame_visualizer.output
def draw_sem_seg(self, frame, sem_seg, area_threshold=None):
"""
Args:
sem_seg (ndarray or Tensor): semantic segmentation of shape (H, W),
each value is the integer label.
area_threshold (Optional[int]): only draw segmentations larger than the threshold
"""
# don't need to do anything special
frame_visualizer = Visualizer(frame, self.metadata)
frame_visualizer.draw_sem_seg(sem_seg, area_threshold=None)
return frame_visualizer.output
def draw_panoptic_seg_predictions(
self, frame, panoptic_seg, segments_info, area_threshold=None, alpha=0.5
):
frame_visualizer = Visualizer(frame, self.metadata)
pred = _PanopticPrediction(panoptic_seg, segments_info, self.metadata)
if self._instance_mode == ColorMode.IMAGE_BW:
frame_visualizer.output.reset_image(
frame_visualizer._create_grayscale_image(pred.non_empty_mask())
)
# draw mask for all semantic segments first i.e. "stuff"
for mask, sinfo in pred.semantic_masks():
category_idx = sinfo["category_id"]
try:
mask_color = [x / 255 for x in self.metadata.stuff_colors[category_idx]]
except AttributeError:
mask_color = None
frame_visualizer.draw_binary_mask(
mask,
color=mask_color,
text=self.metadata.stuff_classes[category_idx],
alpha=alpha,
area_threshold=area_threshold,
)
all_instances = list(pred.instance_masks())
if len(all_instances) == 0:
return frame_visualizer.output
# draw mask for all instances second
masks, sinfo = list(zip(*all_instances))
num_instances = len(masks)
masks_rles = mask_util.encode(
np.asarray(np.asarray(masks).transpose(1, 2, 0), dtype=np.uint8, order="F")
)
assert len(masks_rles) == num_instances
category_ids = [x["category_id"] for x in sinfo]
detected = [
_DetectedInstance(category_ids[i], bbox=None, mask_rle=masks_rles[i], color=None, ttl=8)
for i in range(num_instances)
]
colors = self._assign_colors(detected)
labels = [self.metadata.thing_classes[k] for k in category_ids]
frame_visualizer.overlay_instances(
boxes=None,
masks=masks,
labels=labels,
keypoints=None,
assigned_colors=colors,
alpha=alpha,
)
return frame_visualizer.output
def _assign_colors(self, instances):
"""
Naive tracking heuristics to assign same color to the same instance,
will update the internal state of tracked instances.
Returns:
list[tuple[float]]: list of colors.
"""
# Compute iou with either boxes or masks:
is_crowd = np.zeros((len(instances),), dtype=bool)
if instances[0].bbox is None:
assert instances[0].mask_rle is not None
# use mask iou only when box iou is None
# because box seems good enough
rles_old = [x.mask_rle for x in self._old_instances]
rles_new = [x.mask_rle for x in instances]
ious = mask_util.iou(rles_old, rles_new, is_crowd)
threshold = 0.5
else:
boxes_old = [x.bbox for x in self._old_instances]
boxes_new = [x.bbox for x in instances]
ious = mask_util.iou(boxes_old, boxes_new, is_crowd)
threshold = 0.6
if len(ious) == 0:
ious = np.zeros((len(self._old_instances), len(instances)), dtype="float32")
# Only allow matching instances of the same label:
for old_idx, old in enumerate(self._old_instances):
for new_idx, new in enumerate(instances):
if old.label != new.label:
ious[old_idx, new_idx] = 0
matched_new_per_old = np.asarray(ious).argmax(axis=1)
max_iou_per_old = np.asarray(ious).max(axis=1)
# Try to find match for each old instance:
extra_instances = []
for idx, inst in enumerate(self._old_instances):
if max_iou_per_old[idx] > threshold:
newidx = matched_new_per_old[idx]
if instances[newidx].color is None:
instances[newidx].color = inst.color
continue
# If an old instance does not match any new instances,
# keep it for the next frame in case it is just missed by the detector
inst.ttl -= 1
if inst.ttl > 0:
extra_instances.append(inst)
# Assign random color to newly-detected instances:
for inst in instances:
if inst.color is None:
inst.color = random_color(rgb=True, maximum=1)
self._old_instances = instances[:] + extra_instances
return [d.color for d in instances]
def _assign_colors_by_id(self, instances: Instances) -> List:
colors = []
untracked_ids = set(self._assigned_colors.keys())
for id in instances.ID:
if id in self._assigned_colors:
colors.append(self._color_pool[self._assigned_colors[id]])
untracked_ids.remove(id)
else:
assert (
len(self._color_idx_set) >= 1
), f"Number of id exceeded maximum, \
max = {self._max_num_instances}"
idx = self._color_idx_set.pop()
color = self._color_pool[idx]
self._assigned_colors[id] = idx
colors.append(color)
for id in untracked_ids:
self._color_idx_set.add(self._assigned_colors[id])
del self._assigned_colors[id]
return colors
|