|
|
|
|
|
|
|
import contextlib |
|
import copy |
|
import io |
|
import itertools |
|
import logging |
|
import numpy as np |
|
import os |
|
from collections import OrderedDict |
|
from typing import Dict, Iterable, List, Optional |
|
import pycocotools.mask as mask_utils |
|
import torch |
|
from pycocotools.coco import COCO |
|
from tabulate import tabulate |
|
|
|
from detectron2.config import CfgNode |
|
from detectron2.data import MetadataCatalog |
|
from detectron2.evaluation import DatasetEvaluator |
|
from detectron2.structures import BoxMode |
|
from detectron2.utils.comm import gather, get_rank, is_main_process, synchronize |
|
from detectron2.utils.file_io import PathManager |
|
from detectron2.utils.logger import create_small_table |
|
|
|
from densepose.converters import ToChartResultConverter, ToMaskConverter |
|
from densepose.data.datasets.coco import maybe_filter_and_map_categories_cocoapi |
|
from densepose.structures import ( |
|
DensePoseChartPredictorOutput, |
|
DensePoseEmbeddingPredictorOutput, |
|
quantize_densepose_chart_result, |
|
) |
|
|
|
from .densepose_coco_evaluation import DensePoseCocoEval, DensePoseEvalMode |
|
from .mesh_alignment_evaluator import MeshAlignmentEvaluator |
|
from .tensor_storage import ( |
|
SingleProcessFileTensorStorage, |
|
SingleProcessRamTensorStorage, |
|
SingleProcessTensorStorage, |
|
SizeData, |
|
storage_gather, |
|
) |
|
|
|
|
|
class DensePoseCOCOEvaluator(DatasetEvaluator): |
|
def __init__( |
|
self, |
|
dataset_name, |
|
distributed, |
|
output_dir=None, |
|
evaluator_type: str = "iuv", |
|
min_iou_threshold: float = 0.5, |
|
storage: Optional[SingleProcessTensorStorage] = None, |
|
embedder=None, |
|
should_evaluate_mesh_alignment: bool = False, |
|
mesh_alignment_mesh_names: Optional[List[str]] = None, |
|
): |
|
self._embedder = embedder |
|
self._distributed = distributed |
|
self._output_dir = output_dir |
|
self._evaluator_type = evaluator_type |
|
self._storage = storage |
|
self._should_evaluate_mesh_alignment = should_evaluate_mesh_alignment |
|
|
|
assert not ( |
|
should_evaluate_mesh_alignment and embedder is None |
|
), "Mesh alignment evaluation is activated, but no vertex embedder provided!" |
|
if should_evaluate_mesh_alignment: |
|
self._mesh_alignment_evaluator = MeshAlignmentEvaluator( |
|
embedder, |
|
mesh_alignment_mesh_names, |
|
) |
|
|
|
self._cpu_device = torch.device("cpu") |
|
self._logger = logging.getLogger(__name__) |
|
|
|
self._metadata = MetadataCatalog.get(dataset_name) |
|
self._min_threshold = min_iou_threshold |
|
json_file = PathManager.get_local_path(self._metadata.json_file) |
|
with contextlib.redirect_stdout(io.StringIO()): |
|
self._coco_api = COCO(json_file) |
|
maybe_filter_and_map_categories_cocoapi(dataset_name, self._coco_api) |
|
|
|
def reset(self): |
|
self._predictions = [] |
|
|
|
def process(self, inputs, outputs): |
|
""" |
|
Args: |
|
inputs: the inputs to a COCO model (e.g., GeneralizedRCNN). |
|
It is a list of dict. Each dict corresponds to an image and |
|
contains keys like "height", "width", "file_name", "image_id". |
|
outputs: the outputs of a COCO model. It is a list of dicts with key |
|
"instances" that contains :class:`Instances`. |
|
The :class:`Instances` object needs to have `densepose` field. |
|
""" |
|
for input, output in zip(inputs, outputs): |
|
instances = output["instances"].to(self._cpu_device) |
|
if not instances.has("pred_densepose"): |
|
continue |
|
prediction_list = prediction_to_dict( |
|
instances, |
|
input["image_id"], |
|
self._embedder, |
|
self._metadata.class_to_mesh_name, |
|
self._storage is not None, |
|
) |
|
if self._storage is not None: |
|
for prediction_dict in prediction_list: |
|
dict_to_store = {} |
|
for field_name in self._storage.data_schema: |
|
dict_to_store[field_name] = prediction_dict[field_name] |
|
record_id = self._storage.put(dict_to_store) |
|
prediction_dict["record_id"] = record_id |
|
prediction_dict["rank"] = get_rank() |
|
for field_name in self._storage.data_schema: |
|
del prediction_dict[field_name] |
|
self._predictions.extend(prediction_list) |
|
|
|
def evaluate(self, img_ids=None): |
|
if self._distributed: |
|
synchronize() |
|
predictions = gather(self._predictions) |
|
predictions = list(itertools.chain(*predictions)) |
|
else: |
|
predictions = self._predictions |
|
|
|
multi_storage = storage_gather(self._storage) if self._storage is not None else None |
|
|
|
if not is_main_process(): |
|
return |
|
return copy.deepcopy(self._eval_predictions(predictions, multi_storage, img_ids)) |
|
|
|
def _eval_predictions(self, predictions, multi_storage=None, img_ids=None): |
|
""" |
|
Evaluate predictions on densepose. |
|
Return results with the metrics of the tasks. |
|
""" |
|
self._logger.info("Preparing results for COCO format ...") |
|
|
|
if self._output_dir: |
|
PathManager.mkdirs(self._output_dir) |
|
file_path = os.path.join(self._output_dir, "coco_densepose_predictions.pth") |
|
with PathManager.open(file_path, "wb") as f: |
|
torch.save(predictions, f) |
|
|
|
self._logger.info("Evaluating predictions ...") |
|
res = OrderedDict() |
|
results_gps, results_gpsm, results_segm = _evaluate_predictions_on_coco( |
|
self._coco_api, |
|
predictions, |
|
multi_storage, |
|
self._embedder, |
|
class_names=self._metadata.get("thing_classes"), |
|
min_threshold=self._min_threshold, |
|
img_ids=img_ids, |
|
) |
|
res["densepose_gps"] = results_gps |
|
res["densepose_gpsm"] = results_gpsm |
|
res["densepose_segm"] = results_segm |
|
if self._should_evaluate_mesh_alignment: |
|
res["densepose_mesh_alignment"] = self._evaluate_mesh_alignment() |
|
return res |
|
|
|
def _evaluate_mesh_alignment(self): |
|
self._logger.info("Mesh alignment evaluation ...") |
|
mean_ge, mean_gps, per_mesh_metrics = self._mesh_alignment_evaluator.evaluate() |
|
results = { |
|
"GE": mean_ge * 100, |
|
"GPS": mean_gps * 100, |
|
} |
|
mesh_names = set() |
|
for metric_name in per_mesh_metrics: |
|
for mesh_name, value in per_mesh_metrics[metric_name].items(): |
|
results[f"{metric_name}-{mesh_name}"] = value * 100 |
|
mesh_names.add(mesh_name) |
|
self._print_mesh_alignment_results(results, mesh_names) |
|
return results |
|
|
|
def _print_mesh_alignment_results(self, results: Dict[str, float], mesh_names: Iterable[str]): |
|
self._logger.info("Evaluation results for densepose, mesh alignment:") |
|
self._logger.info(f'| {"Mesh":13s} | {"GErr":7s} | {"GPS":7s} |') |
|
self._logger.info("| :-----------: | :-----: | :-----: |") |
|
for mesh_name in mesh_names: |
|
ge_key = f"GE-{mesh_name}" |
|
ge_str = f"{results[ge_key]:.4f}" if ge_key in results else " " |
|
gps_key = f"GPS-{mesh_name}" |
|
gps_str = f"{results[gps_key]:.4f}" if gps_key in results else " " |
|
self._logger.info(f"| {mesh_name:13s} | {ge_str:7s} | {gps_str:7s} |") |
|
self._logger.info("| :-------------------------------: |") |
|
ge_key = "GE" |
|
ge_str = f"{results[ge_key]:.4f}" if ge_key in results else " " |
|
gps_key = "GPS" |
|
gps_str = f"{results[gps_key]:.4f}" if gps_key in results else " " |
|
self._logger.info(f'| {"MEAN":13s} | {ge_str:7s} | {gps_str:7s} |') |
|
|
|
|
|
def prediction_to_dict(instances, img_id, embedder, class_to_mesh_name, use_storage): |
|
""" |
|
Args: |
|
instances (Instances): the output of the model |
|
img_id (str): the image id in COCO |
|
|
|
Returns: |
|
list[dict]: the results in densepose evaluation format |
|
""" |
|
scores = instances.scores.tolist() |
|
classes = instances.pred_classes.tolist() |
|
raw_boxes_xywh = BoxMode.convert( |
|
instances.pred_boxes.tensor.clone(), BoxMode.XYXY_ABS, BoxMode.XYWH_ABS |
|
) |
|
|
|
if isinstance(instances.pred_densepose, DensePoseEmbeddingPredictorOutput): |
|
results_densepose = densepose_cse_predictions_to_dict( |
|
instances, embedder, class_to_mesh_name, use_storage |
|
) |
|
elif isinstance(instances.pred_densepose, DensePoseChartPredictorOutput): |
|
if not use_storage: |
|
results_densepose = densepose_chart_predictions_to_dict(instances) |
|
else: |
|
results_densepose = densepose_chart_predictions_to_storage_dict(instances) |
|
|
|
results = [] |
|
for k in range(len(instances)): |
|
result = { |
|
"image_id": img_id, |
|
"category_id": classes[k], |
|
"bbox": raw_boxes_xywh[k].tolist(), |
|
"score": scores[k], |
|
} |
|
results.append({**result, **results_densepose[k]}) |
|
return results |
|
|
|
|
|
def densepose_chart_predictions_to_dict(instances): |
|
segmentations = ToMaskConverter.convert( |
|
instances.pred_densepose, instances.pred_boxes, instances.image_size |
|
) |
|
|
|
results = [] |
|
for k in range(len(instances)): |
|
densepose_results_quantized = quantize_densepose_chart_result( |
|
ToChartResultConverter.convert(instances.pred_densepose[k], instances.pred_boxes[k]) |
|
) |
|
densepose_results_quantized.labels_uv_uint8 = ( |
|
densepose_results_quantized.labels_uv_uint8.cpu() |
|
) |
|
segmentation = segmentations.tensor[k] |
|
segmentation_encoded = mask_utils.encode( |
|
np.require(segmentation.numpy(), dtype=np.uint8, requirements=["F"]) |
|
) |
|
segmentation_encoded["counts"] = segmentation_encoded["counts"].decode("utf-8") |
|
result = { |
|
"densepose": densepose_results_quantized, |
|
"segmentation": segmentation_encoded, |
|
} |
|
results.append(result) |
|
return results |
|
|
|
|
|
def densepose_chart_predictions_to_storage_dict(instances): |
|
results = [] |
|
for k in range(len(instances)): |
|
densepose_predictor_output = instances.pred_densepose[k] |
|
result = { |
|
"coarse_segm": densepose_predictor_output.coarse_segm.squeeze(0).cpu(), |
|
"fine_segm": densepose_predictor_output.fine_segm.squeeze(0).cpu(), |
|
"u": densepose_predictor_output.u.squeeze(0).cpu(), |
|
"v": densepose_predictor_output.v.squeeze(0).cpu(), |
|
} |
|
results.append(result) |
|
return results |
|
|
|
|
|
def densepose_cse_predictions_to_dict(instances, embedder, class_to_mesh_name, use_storage): |
|
results = [] |
|
for k in range(len(instances)): |
|
cse = instances.pred_densepose[k] |
|
results.append( |
|
{ |
|
"coarse_segm": cse.coarse_segm[0].cpu(), |
|
"embedding": cse.embedding[0].cpu(), |
|
} |
|
) |
|
return results |
|
|
|
|
|
def _evaluate_predictions_on_coco( |
|
coco_gt, |
|
coco_results, |
|
multi_storage=None, |
|
embedder=None, |
|
class_names=None, |
|
min_threshold: float = 0.5, |
|
img_ids=None, |
|
): |
|
logger = logging.getLogger(__name__) |
|
|
|
densepose_metrics = _get_densepose_metrics(min_threshold) |
|
if len(coco_results) == 0: |
|
logger.warn("No predictions from the model! Set scores to -1") |
|
results_gps = {metric: -1 for metric in densepose_metrics} |
|
results_gpsm = {metric: -1 for metric in densepose_metrics} |
|
results_segm = {metric: -1 for metric in densepose_metrics} |
|
return results_gps, results_gpsm, results_segm |
|
|
|
coco_dt = coco_gt.loadRes(coco_results) |
|
|
|
results = [] |
|
for eval_mode_name in ["GPS", "GPSM", "IOU"]: |
|
eval_mode = getattr(DensePoseEvalMode, eval_mode_name) |
|
coco_eval = DensePoseCocoEval( |
|
coco_gt, coco_dt, "densepose", multi_storage, embedder, dpEvalMode=eval_mode |
|
) |
|
result = _derive_results_from_coco_eval( |
|
coco_eval, eval_mode_name, densepose_metrics, class_names, min_threshold, img_ids |
|
) |
|
results.append(result) |
|
return results |
|
|
|
|
|
def _get_densepose_metrics(min_threshold: float = 0.5): |
|
metrics = ["AP"] |
|
if min_threshold <= 0.201: |
|
metrics += ["AP20"] |
|
if min_threshold <= 0.301: |
|
metrics += ["AP30"] |
|
if min_threshold <= 0.401: |
|
metrics += ["AP40"] |
|
metrics.extend(["AP50", "AP75", "APm", "APl", "AR", "AR50", "AR75", "ARm", "ARl"]) |
|
return metrics |
|
|
|
|
|
def _derive_results_from_coco_eval( |
|
coco_eval, eval_mode_name, metrics, class_names, min_threshold: float, img_ids |
|
): |
|
if img_ids is not None: |
|
coco_eval.params.imgIds = img_ids |
|
coco_eval.params.iouThrs = np.linspace( |
|
min_threshold, 0.95, int(np.round((0.95 - min_threshold) / 0.05)) + 1, endpoint=True |
|
) |
|
coco_eval.evaluate() |
|
coco_eval.accumulate() |
|
coco_eval.summarize() |
|
results = {metric: float(coco_eval.stats[idx] * 100) for idx, metric in enumerate(metrics)} |
|
logger = logging.getLogger(__name__) |
|
logger.info( |
|
f"Evaluation results for densepose, {eval_mode_name} metric: \n" |
|
+ create_small_table(results) |
|
) |
|
if class_names is None or len(class_names) <= 1: |
|
return results |
|
|
|
|
|
|
|
precisions = coco_eval.eval["precision"] |
|
|
|
assert len(class_names) == precisions.shape[2] |
|
|
|
results_per_category = [] |
|
for idx, name in enumerate(class_names): |
|
|
|
|
|
precision = precisions[:, :, idx, 0, -1] |
|
precision = precision[precision > -1] |
|
ap = np.mean(precision) if precision.size else float("nan") |
|
results_per_category.append((f"{name}", float(ap * 100))) |
|
|
|
|
|
n_cols = min(6, len(results_per_category) * 2) |
|
results_flatten = list(itertools.chain(*results_per_category)) |
|
results_2d = itertools.zip_longest(*[results_flatten[i::n_cols] for i in range(n_cols)]) |
|
table = tabulate( |
|
results_2d, |
|
tablefmt="pipe", |
|
floatfmt=".3f", |
|
headers=["category", "AP"] * (n_cols // 2), |
|
numalign="left", |
|
) |
|
logger.info(f"Per-category {eval_mode_name} AP: \n" + table) |
|
|
|
results.update({"AP-" + name: ap for name, ap in results_per_category}) |
|
return results |
|
|
|
|
|
def build_densepose_evaluator_storage(cfg: CfgNode, output_folder: str): |
|
storage_spec = cfg.DENSEPOSE_EVALUATION.STORAGE |
|
if storage_spec == "none": |
|
return None |
|
evaluator_type = cfg.DENSEPOSE_EVALUATION.TYPE |
|
|
|
hout = cfg.MODEL.ROI_DENSEPOSE_HEAD.HEATMAP_SIZE |
|
wout = cfg.MODEL.ROI_DENSEPOSE_HEAD.HEATMAP_SIZE |
|
n_csc = cfg.MODEL.ROI_DENSEPOSE_HEAD.NUM_COARSE_SEGM_CHANNELS |
|
|
|
if evaluator_type == "iuv": |
|
n_fsc = cfg.MODEL.ROI_DENSEPOSE_HEAD.NUM_PATCHES + 1 |
|
schema = { |
|
"coarse_segm": SizeData(dtype="float32", shape=(n_csc, hout, wout)), |
|
"fine_segm": SizeData(dtype="float32", shape=(n_fsc, hout, wout)), |
|
"u": SizeData(dtype="float32", shape=(n_fsc, hout, wout)), |
|
"v": SizeData(dtype="float32", shape=(n_fsc, hout, wout)), |
|
} |
|
elif evaluator_type == "cse": |
|
embed_size = cfg.MODEL.ROI_DENSEPOSE_HEAD.CSE.EMBED_SIZE |
|
schema = { |
|
"coarse_segm": SizeData(dtype="float32", shape=(n_csc, hout, wout)), |
|
"embedding": SizeData(dtype="float32", shape=(embed_size, hout, wout)), |
|
} |
|
else: |
|
raise ValueError(f"Unknown evaluator type: {evaluator_type}") |
|
|
|
if storage_spec == "ram": |
|
storage = SingleProcessRamTensorStorage(schema, io.BytesIO()) |
|
elif storage_spec == "file": |
|
fpath = os.path.join(output_folder, f"DensePoseEvaluatorStorage.{get_rank()}.bin") |
|
PathManager.mkdirs(output_folder) |
|
storage = SingleProcessFileTensorStorage(schema, fpath, "wb") |
|
else: |
|
raise ValueError(f"Unknown storage specification: {storage_spec}") |
|
return storage |
|
|