# coding=utf-8
# Copyright 2021 The Deeplab2 Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""This file contains functions to post-process Panoptic-DeepLab results."""

import functools
from typing import Tuple, Dict, Text

import tensorflow as tf

from deeplab2 import common
from deeplab2 import config_pb2
from deeplab2.data import dataset
from deeplab2.model import utils
from deeplab2.tensorflow_ops.python.ops import merge_semantic_and_instance_maps_op as merge_ops


def _get_semantic_predictions(semantic_logits: tf.Tensor) -> tf.Tensor:
  """Computes the semantic classes from the predictions.

  Args:
    semantic_logits: A tf.tensor of shape [batch, height, width, classes].

  Returns:
    A tf.Tensor containing the semantic class prediction of shape
      [batch, height, width].
  """
  return tf.argmax(semantic_logits, axis=-1, output_type=tf.int32)


def _get_instance_centers_from_heatmap(
    center_heatmap: tf.Tensor,
    center_threshold: float,
    nms_kernel_size: int,
    keep_k_centers: int) -> Tuple[tf.Tensor, tf.Tensor]:
  """Computes a list of instance centers.

  The heatmap is thresholded, reduced to its local maxima with a max-pooling
  based non-maximum suppression, and optionally restricted to the highest
  scoring centers.

  Args:
    center_heatmap: A tf.Tensor of shape [height, width, 1].
    center_threshold: A float setting the threshold for the center heatmap.
    nms_kernel_size: An integer specifying the nms kernel size.
    keep_k_centers: An integer specifying the number of centers to keep (K).
      Non-positive values will keep all centers. Note that the top-K filter
      keeps the centers whose score is strictly greater than the K-th largest
      score, so fewer than K centers can be returned.

  Returns:
    A tuple of
    - tf.Tensor of shape [N, 2] containing N center coordinates (after
      non-maximum suppression) in (y, x) order.
    - tf.Tensor of shape [height, width] containing the center heatmap after
      non-maximum suppression. The top-K filter is not applied to this map.
  """
  # Threshold center map.
  center_heatmap = tf.where(
      tf.greater(center_heatmap, center_threshold), center_heatmap, 0.0)

  # Non-maximum suppression: a pixel survives only if it equals the maximum of
  # its (nms_kernel_size x nms_kernel_size) neighborhood.
  # NOTE(review): presumably `utils.add_zero_padding` pads such that the
  # 'valid' pooling output has the same spatial size as the input -- confirm.
  padded_map = utils.add_zero_padding(center_heatmap, nms_kernel_size, rank=3)
  pooled_center_heatmap = tf.keras.backend.pool2d(
      tf.expand_dims(padded_map, 0),
      pool_size=(nms_kernel_size, nms_kernel_size),
      strides=(1, 1),
      padding='valid',
      pool_mode='max')
  # The pooled map carries a leading batch dimension of 1, so the comparison
  # broadcasts to rank 4; the extra dimensions are squeezed away right after.
  center_heatmap = tf.where(
      tf.equal(pooled_center_heatmap, center_heatmap), center_heatmap, 0.0)
  center_heatmap = tf.squeeze(center_heatmap, axis=[0, 3])

  # `centers` is of shape (N, 2) with (y, x) order of the second dimension.
  centers = tf.where(tf.greater(center_heatmap, 0.0))

  if keep_k_centers > 0 and tf.shape(centers)[0] > keep_k_centers:
    topk_scores, _ = tf.math.top_k(
        tf.reshape(center_heatmap, [-1]), keep_k_centers, sorted=False)
    # With `sorted=False` the order of `topk_scores` is unspecified, so the
    # smallest of the top-K scores has to be computed explicitly rather than
    # read from the last position.
    kth_score = tf.reduce_min(topk_scores)
    centers = tf.where(tf.greater(center_heatmap, kth_score))

  return centers, center_heatmap


def _find_closest_center_per_pixel(centers: tf.Tensor,
                                   center_offsets: tf.Tensor) -> tf.Tensor:
  """Assigns all pixels to their closest center.

  Each pixel votes for a center location (its own coordinate plus its predicted
  offset) and is assigned to the center closest to that vote.

  Args:
    centers: A tf.Tensor of shape [N, 2] containing N centers with coordinate
      order (y, x).
    center_offsets: A tf.Tensor of shape [height, width, 2].

  Returns:
    A tf.Tensor of shape [height, width] containing the index of the closest
      center, per pixel.
  """
  height = tf.shape(center_offsets)[0]
  width = tf.shape(center_offsets)[1]

  x_coord, y_coord = tf.meshgrid(tf.range(width), tf.range(height))
  coord = tf.stack([y_coord, x_coord], axis=-1)

  center_per_pixel = tf.cast(coord, tf.float32) + center_offsets

  # centers: [N, 2] -> [N, 1, 2].
  # center_per_pixel: [H, W, 2] -> [1, H*W, 2].
  centers = tf.cast(tf.expand_dims(centers, 1), tf.float32)
  center_per_pixel = tf.reshape(center_per_pixel, [height*width, 2])
  center_per_pixel = tf.expand_dims(center_per_pixel, 0)

  # distances: [N, H*W].
  distances = tf.norm(centers - center_per_pixel, axis=-1)

  return tf.reshape(tf.argmin(distances, axis=0), [height, width])


def _get_instances_from_heatmap_and_offset(
    semantic_segmentation: tf.Tensor, center_heatmap: tf.Tensor,
    center_offsets: tf.Tensor, center_threshold: float,
    thing_class_ids: tf.Tensor, nms_kernel_size: int,
    keep_k_centers: int) -> Tuple[tf.Tensor, tf.Tensor, tf.Tensor]:
  """Computes the instance assignment per pixel.

  Args:
    semantic_segmentation: A tf.Tensor containing the semantic labels of shape
      [height, width].
    center_heatmap: A tf.Tensor of shape [height, width, 1].
    center_offsets: A tf.Tensor of shape [height, width, 2].
    center_threshold: A float setting the threshold for the center heatmap.
    thing_class_ids: A tf.Tensor of shape [N] containing N thing indices.
    nms_kernel_size: An integer specifying the nms kernel size.
    keep_k_centers: An integer specifying the number of centers to keep.
      Non-positive values will keep all centers.

  Returns:
    A tuple of:
    - tf.Tensor containing the instance segmentation (filtered with the `thing`
      segmentation from the semantic segmentation output) with shape
      [height, width].
    - tf.Tensor containing the processed centermap with shape [height, width].
    - tf.Tensor containing instance scores (where higher "score" is a reasonable
      signal of a higher confidence detection.) Will be of shape [height, width]
      with the score for a pixel being the score of the instance it belongs to.
      The scores will be zero for pixels in background/"stuff" regions.
  """
  # Binary mask (same dtype as the semantic map) that is 1 wherever the
  # predicted semantic class is one of the `thing` classes.
  thing_segmentation = tf.zeros_like(semantic_segmentation)
  for thing_id in thing_class_ids:
    thing_segmentation = tf.where(tf.equal(semantic_segmentation, thing_id),
                                  1,
                                  thing_segmentation)

  centers, processed_center_heatmap = _get_instance_centers_from_heatmap(
      center_heatmap, center_threshold, nms_kernel_size, keep_k_centers)
  # Without any center there is nothing to assign pixels to: return an
  # all-zero instance map and an all-zero score map.
  if tf.shape(centers)[0] == 0:
    return (tf.zeros_like(semantic_segmentation), processed_center_heatmap,
            tf.zeros_like(processed_center_heatmap))

  instance_center_index = _find_closest_center_per_pixel(
      centers, center_offsets)
  # Instance IDs should start with 1. So we use the index into the centers, but
  # shifted by 1.
  instance_segmentation = tf.cast(instance_center_index, tf.int32) + 1

  # The value of the heatmap at an instance's center is used as the score
  # for that instance.
  instance_scores = tf.gather_nd(processed_center_heatmap, centers)
  tf.debugging.assert_shapes([
      (centers, ('N', 2)),
      (instance_scores, ('N',)),
  ])
  # This will map the instance scores back to the image space: where each pixel
  # has a value equal to the score of its instance.
  flat_center_index = tf.reshape(instance_center_index, [-1])
  instance_score_map = tf.gather(instance_scores, flat_center_index)
  instance_score_map = tf.reshape(instance_score_map,
                                  tf.shape(instance_segmentation))
  # Zero out the scores outside of `thing` regions.
  instance_score_map *= tf.cast(thing_segmentation, tf.float32)

  return (thing_segmentation * instance_segmentation, processed_center_heatmap,
          instance_score_map)


@tf.function
def _get_panoptic_predictions(
    semantic_logits: tf.Tensor, center_heatmap: tf.Tensor,
    center_offsets: tf.Tensor, center_threshold: float,
    thing_class_ids: tf.Tensor, label_divisor: int, stuff_area_limit: int,
    void_label: int, nms_kernel_size: int, keep_k_centers: int,
    merge_semantic_and_instance_with_tf_op: bool
) -> Tuple[tf.Tensor, tf.Tensor, tf.Tensor, tf.Tensor, tf.Tensor]:
  """Computes the semantic class and instance ID per pixel.

  Args:
    semantic_logits: A tf.Tensor of shape [batch, height, width, classes].
    center_heatmap: A tf.Tensor of shape [batch, height, width, 1].
    center_offsets: A tf.Tensor of shape [batch, height, width, 2].
    center_threshold: A float setting the threshold for the center heatmap.
    thing_class_ids: A tf.Tensor of shape [N] containing N thing indices.
    label_divisor: An integer specifying the label divisor of the dataset.
    stuff_area_limit: An integer specifying the number of pixels that stuff
      regions need to have at least. The stuff region will be included in the
      panoptic prediction, only if its area is larger than the limit; otherwise,
      it will be re-assigned as void_label.
    void_label: An integer specifying the void label.
    nms_kernel_size: An integer specifying the nms kernel size.
    keep_k_centers: An integer specifying the number of centers to keep.
      Non-positive values will keep all centers.
    merge_semantic_and_instance_with_tf_op: Boolean, specifying the merging
      operation uses TensorFlow (CUDA kernel) implementation (True) or
      tf.py_function implementation (False). Note the tf.py_function
      implementation is simply used as a backup solution when you could not
      successfully compile the provided TensorFlow implementation. To reproduce
      our results, please use the provided TensorFlow implementation `merge_ops`
      (i.e., set to True).

  Returns:
    A tuple of:
    - the panoptic prediction as tf.Tensor with shape [batch, height, width].
    - the semantic prediction as tf.Tensor with shape [batch, height, width].
    - the instance prediction as tf.Tensor with shape [batch, height, width].
    - the centermap prediction as tf.Tensor with shape [batch, height, width].
    - the instance score maps as tf.Tensor with shape [batch, height, width].
  """
  semantic_prediction = _get_semantic_predictions(semantic_logits)
  batch_size = tf.shape(semantic_logits)[0]

  # Per-image results are collected in TensorArrays because the batch size is
  # only known as a tensor (the loop below runs over `tf.range(batch_size)`).
  instance_map_lists = tf.TensorArray(
      tf.int32, size=batch_size, dynamic_size=False)
  center_map_lists = tf.TensorArray(
      tf.float32, size=batch_size, dynamic_size=False)
  instance_score_map_lists = tf.TensorArray(
      tf.float32, size=batch_size, dynamic_size=False)

  for i in tf.range(batch_size):
    (instance_map, center_map,
     instance_score_map) = _get_instances_from_heatmap_and_offset(
         semantic_prediction[i, ...], center_heatmap[i, ...],
         center_offsets[i, ...], center_threshold, thing_class_ids,
         nms_kernel_size, keep_k_centers)
    instance_map_lists = instance_map_lists.write(i, instance_map)
    center_map_lists = center_map_lists.write(i, center_map)
    instance_score_map_lists = instance_score_map_lists.write(
        i, instance_score_map)

  # This does not work with unknown shapes.
  instance_maps = instance_map_lists.stack()
  center_maps = center_map_lists.stack()
  instance_score_maps = instance_score_map_lists.stack()

  if merge_semantic_and_instance_with_tf_op:
    panoptic_prediction = merge_ops.merge_semantic_and_instance_maps(
        semantic_prediction, instance_maps, thing_class_ids, label_divisor,
        stuff_area_limit, void_label)
  else:
    panoptic_prediction = _merge_semantic_and_instance_maps(
        semantic_prediction, instance_maps, thing_class_ids, label_divisor,
        stuff_area_limit, void_label)
  return (panoptic_prediction, semantic_prediction, instance_maps, center_maps,
          instance_score_maps)


@tf.function
def _merge_semantic_and_instance_maps(
    semantic_prediction: tf.Tensor,
    instance_maps: tf.Tensor,
    thing_class_ids: tf.Tensor,
    label_divisor: int,
    stuff_area_limit: int,
    void_label: int) -> tf.Tensor:
  """Merges semantic and instance maps to obtain panoptic segmentation.

  This function merges the semantic segmentation and class-agnostic
  instance segmentation to form the panoptic segmentation. In particular,
  the class label of each instance mask is inferred from the majority
  votes from the corresponding pixels in the semantic segmentation. This
  operation is first proposed in the DeeperLab paper and adopted by the
  Panoptic-DeepLab.

  - DeeperLab: Single-Shot Image Parser, T-J Yang, et al. arXiv:1902.05093.
  - Panoptic-DeepLab, B. Cheng, et al. In CVPR, 2020.

  Note that this function only supports batch = 1 for simplicity. Additionally,
  this function has a slightly different implementation from the provided
  TensorFlow implementation `merge_ops` but with a similar performance. This
  function is mainly used as a backup solution when you could not successfully
  compile the provided TensorFlow implementation. To reproduce our results,
  please use the provided TensorFlow implementation (i.e., not use this
  function, but the `merge_ops.merge_semantic_and_instance_maps`).

  Args:
    semantic_prediction: A tf.Tensor of shape [batch, height, width].
    instance_maps: A tf.Tensor of shape [batch, height, width].
    thing_class_ids: A tf.Tensor of shape [N] containing N thing indices.
    label_divisor: An integer specifying the label divisor of the dataset.
    stuff_area_limit: An integer specifying the number of pixels that stuff
      regions need to have at least. The stuff region will be included in the
      panoptic prediction, only if its area is at least the limit; otherwise,
      it will be re-assigned as void_label.
    void_label: An integer specifying the void label.

  Returns:
    panoptic_prediction: A tf.Tensor with shape [batch, height, width].
  """
  prediction_shape = semantic_prediction.get_shape().as_list()
  # This implementation only supports batch size of 1. Since model construction
  # might lose batch size information (and leave it to None), override it here.
  prediction_shape[0] = 1
  semantic_prediction = tf.ensure_shape(semantic_prediction, prediction_shape)
  instance_maps = tf.ensure_shape(instance_maps, prediction_shape)

  # Default panoptic_prediction to have semantic label = void_label.
  panoptic_prediction = tf.ones_like(
      semantic_prediction) * void_label * label_divisor

  # Start to paste predicted `thing` regions to panoptic_prediction.
  # Infer `thing` segmentation regions from semantic prediction.
  semantic_thing_segmentation = tf.zeros_like(semantic_prediction,
                                              dtype=tf.bool)
  for thing_class in thing_class_ids:
    semantic_thing_segmentation = tf.math.logical_or(
        semantic_thing_segmentation,
        semantic_prediction == thing_class)
  # Keep track of how many instances for each semantic label.
  num_instance_per_semantic_label = tf.TensorArray(
      tf.int32, size=0, dynamic_size=True, clear_after_read=False)
  instance_ids, _ = tf.unique(tf.reshape(instance_maps, [-1]))
  for instance_id in instance_ids:
    # Instance ID 0 is reserved for crowd region.
    if instance_id == 0:
      continue
    thing_mask = tf.math.logical_and(instance_maps == instance_id,
                                     semantic_thing_segmentation)
    # Skip instances that do not overlap any `thing` pixel.
    if tf.reduce_sum(tf.cast(thing_mask, tf.int32)) == 0:
      continue
    # Majority vote over the semantic labels covered by this instance.
    semantic_bin_counts = tf.math.bincount(
        tf.boolean_mask(semantic_prediction, thing_mask))
    semantic_majority = tf.cast(
        tf.math.argmax(semantic_bin_counts), tf.int32)

    # Grow the per-class counter array (zero-initialized) until it has an
    # entry for the majority class.
    while num_instance_per_semantic_label.size() <= semantic_majority:
      num_instance_per_semantic_label = num_instance_per_semantic_label.write(
          num_instance_per_semantic_label.size(), 0)

    # Instance IDs are re-numbered per semantic class, starting from 1.
    new_instance_id = (
        num_instance_per_semantic_label.read(semantic_majority) + 1)
    num_instance_per_semantic_label = num_instance_per_semantic_label.write(
        semantic_majority, new_instance_id)
    panoptic_prediction = tf.where(
        thing_mask,
        tf.ones_like(panoptic_prediction) * semantic_majority * label_divisor
        + new_instance_id,
        panoptic_prediction)

  # Done with `num_instance_per_semantic_label` tensor array.
  num_instance_per_semantic_label.close()

  # Start to paste predicted `stuff` regions to panoptic prediction.
  instance_stuff_regions = instance_maps == 0
  semantic_ids, _ = tf.unique(tf.reshape(semantic_prediction, [-1]))
  for semantic_id in semantic_ids:
    # `thing` classes were handled above; only `stuff` classes are pasted here.
    if tf.reduce_sum(tf.cast(thing_class_ids == semantic_id, tf.int32)) > 0:
      continue
    # Check stuff area.
    stuff_mask = tf.math.logical_and(semantic_prediction == semantic_id,
                                     instance_stuff_regions)
    stuff_area = tf.reduce_sum(tf.cast(stuff_mask, tf.int32))
    if stuff_area >= stuff_area_limit:
      panoptic_prediction = tf.where(
          stuff_mask,
          tf.ones_like(panoptic_prediction) * semantic_id * label_divisor,
          panoptic_prediction)

  return panoptic_prediction


class SemanticOnlyPostProcessor(tf.keras.layers.Layer):
  """This class contains code of a semantic only post-processor."""

  def __init__(self):
    """Initializes a semantic only post-processor."""
    super().__init__(name='SemanticOnlyPostProcessor')

  def call(self, result_dict: Dict[Text, tf.Tensor]) -> Dict[Text, tf.Tensor]:
    """Performs the post-processing given model predicted results.

    Args:
      result_dict: A dictionary of tf.Tensor containing model results. The dict
      has to contain
       - common.PRED_SEMANTIC_PROBS_KEY,

    Returns:
      The post-processed dict of tf.Tensor, containing the following:
       - common.PRED_SEMANTIC_KEY,
    """
    processed_dict = {}
    processed_dict[common.PRED_SEMANTIC_KEY] = _get_semantic_predictions(
        result_dict[common.PRED_SEMANTIC_PROBS_KEY])
    return processed_dict


class PostProcessor(tf.keras.layers.Layer):
  """This class contains code of a Panoptic-Deeplab post-processor."""

  def __init__(
      self,
      config: config_pb2.ExperimentOptions,
      dataset_descriptor: dataset.DatasetDescriptor):
    """Initializes a Panoptic-Deeplab post-processor.

    Args:
      config: A config_pb2.ExperimentOptions configuration.
      dataset_descriptor: A dataset.DatasetDescriptor.
    """
    super().__init__(name='PostProcessor')
    # Bind all configuration-dependent arguments once, so that `call` only has
    # to pass the three model outputs.
    self._post_processor = functools.partial(
        _get_panoptic_predictions,
        center_threshold=config.evaluator_options.center_score_threshold,
        thing_class_ids=tf.convert_to_tensor(
            dataset_descriptor.class_has_instances_list),
        label_divisor=dataset_descriptor.panoptic_label_divisor,
        stuff_area_limit=config.evaluator_options.stuff_area_limit,
        void_label=dataset_descriptor.ignore_label,
        nms_kernel_size=config.evaluator_options.nms_kernel,
        keep_k_centers=config.evaluator_options.keep_k_centers,
        merge_semantic_and_instance_with_tf_op=(
            config.evaluator_options.merge_semantic_and_instance_with_tf_op),
    )

  def call(self, result_dict: Dict[Text, tf.Tensor]) -> Dict[Text, tf.Tensor]:
    """Performs the post-processing given model predicted results.

    Args:
      result_dict: A dictionary of tf.Tensor containing model results. The dict
      has to contain
       - common.PRED_SEMANTIC_PROBS_KEY,
       - common.PRED_CENTER_HEATMAP_KEY,
       - common.PRED_OFFSET_MAP_KEY,

    Returns:
      The post-processed dict of tf.Tensor, containing the following:
       - common.PRED_SEMANTIC_KEY,
       - common.PRED_INSTANCE_KEY,
       - common.PRED_PANOPTIC_KEY,
       - common.PRED_INSTANCE_CENTER_KEY,
       - common.PRED_INSTANCE_SCORES_KEY,
    """
    processed_dict = {}
    (processed_dict[common.PRED_PANOPTIC_KEY],
     processed_dict[common.PRED_SEMANTIC_KEY],
     processed_dict[common.PRED_INSTANCE_KEY],
     processed_dict[common.PRED_INSTANCE_CENTER_KEY],
     processed_dict[common.PRED_INSTANCE_SCORES_KEY]
     ) = self._post_processor(
         result_dict[common.PRED_SEMANTIC_PROBS_KEY],
         result_dict[common.PRED_CENTER_HEATMAP_KEY],
         result_dict[common.PRED_OFFSET_MAP_KEY])
    return processed_dict