import json
import logging
import os
import tempfile
from collections import OrderedDict

import numpy as np
import torch
from tqdm import tqdm

from maskrcnn_benchmark.modeling.roi_heads.mask_head.inference import Masker
from maskrcnn_benchmark.structures.bounding_box import BoxList
from maskrcnn_benchmark.structures.boxlist_ops import boxlist_iou


def do_od_evaluation(
    dataset,
    predictions,
    box_only,
    output_folder,
    iou_types,
    expected_results,
    expected_results_sigma_tol,
):
    logger = logging.getLogger("maskrcnn_benchmark.inference")

    if box_only:
        logger.info("Evaluating bbox proposals")
        if dataset.coco is None and output_folder:
            json_results = prepare_for_tsv_detection(predictions, dataset)
            with open(os.path.join(output_folder, "box_proposals.json"), "w") as f:
                json.dump(json_results, f)
            return None
        areas = {"all": "", "small": "s", "medium": "m", "large": "l"}
        res = COCOResults("box_proposal")
        for limit in [100, 1000]:
            for area, suffix in areas.items():
                stats = evaluate_box_proposals(predictions, dataset, area=area, limit=limit)
                key = "AR{}@{:d}".format(suffix, limit)
                res.results["box_proposal"][key] = stats["ar"].item()
        logger.info(res)
        check_expected_results(res, expected_results, expected_results_sigma_tol)
        if output_folder:
            torch.save(res, os.path.join(output_folder, "box_proposals.pth"))
        return res, predictions

    logger.info("Preparing results for COCO format")
    coco_results = {}
    if "bbox" in iou_types:
        logger.info("Preparing bbox results")
        if dataset.coco is None:
            coco_results["bbox"] = prepare_for_tsv_detection(predictions, dataset)
        else:
            coco_results["bbox"] = prepare_for_coco_detection(predictions, dataset)
    if "segm" in iou_types:
        logger.info("Preparing segm results")
        coco_results["segm"] = prepare_for_coco_segmentation(predictions, dataset)
    if "keypoints" in iou_types:
        logger.info("Preparing keypoints results")
        coco_results["keypoints"] = prepare_for_coco_keypoint(predictions, dataset)

    results = COCOResults(*iou_types)
    logger.info("Evaluating predictions")
    for iou_type in iou_types:
        with tempfile.NamedTemporaryFile() as f:
            file_path = f.name
            if output_folder:
                file_path = os.path.join(output_folder, iou_type + ".json")
            if dataset.coco:
                res = evaluate_predictions_on_coco(dataset.coco, coco_results[iou_type], file_path, iou_type)
                results.update(res)
            elif output_folder:
                with open(file_path, "w") as f:
                    json.dump(coco_results[iou_type], f)
    logger.info(results)
    check_expected_results(results, expected_results, expected_results_sigma_tol)
    if output_folder:
        torch.save(results, os.path.join(output_folder, "coco_results.pth"))
    return results, coco_results
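
# Usage sketch (illustrative only; not called anywhere in this module). It assumes the
# maskrcnn_benchmark conventions: `dataset` is a COCO-style dataset object and
# `predictions` is a list of BoxList objects indexed the same way as the dataset.
# The output folder name is a hypothetical placeholder.
#
#   results, coco_results = do_od_evaluation(
#       dataset=dataset,
#       predictions=predictions,
#       box_only=False,
#       output_folder="output/eval",
#       iou_types=("bbox",),
#       expected_results=[],
#       expected_results_sigma_tol=4,
#   )
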
def prepare_for_tsv_detection(predictions, dataset):
    # assert isinstance(dataset, COCODataset)
    proposal_results = []
    image_list = []
    for im_id, prediction in enumerate(predictions):
        image_info = dataset.get_img_info(im_id)
        if len(prediction) == 0:
            continue

        # TODO replace with get_img_info?
        image_id = image_info["id"]
        image_width = image_info["width"]
        image_height = image_info["height"]
        prediction = prediction.resize((image_width, image_height))
        prediction = prediction.convert("xywh")

        boxes = prediction.bbox.tolist()
        scores = prediction.get_field("scores").tolist()
        labels = prediction.get_field("labels").tolist()
        if prediction.has_field("centers"):
            centers = prediction.get_field("centers")
        else:
            centers = None

        for k, box in enumerate(boxes):
            proposal = {
                "image_id": image_id,
                "category_id": labels[k],
                "bbox": box,
                "score": scores[k],
                "area": image_width * image_height,
                "iscrowd": 0,
            }
            if centers is not None:
                proposal.update(center=centers[k].tolist())
            proposal_results.append(proposal)

        image_list.append(image_info)

    # categories = [{'supercategory': 'proposal', 'id': 0, 'name': 'proposal'}]
    return dict(images=image_list, annotations=proposal_results)


def prepare_for_coco_detection(predictions, dataset):
    # assert isinstance(dataset, COCODataset)
    coco_results = []
    for image_id, prediction in enumerate(predictions):
        original_id = dataset.id_to_img_map[image_id]
        if len(prediction) == 0:
            continue

        # TODO replace with get_img_info?
        image_width = dataset.coco.imgs[original_id]["width"]
        image_height = dataset.coco.imgs[original_id]["height"]
        prediction = prediction.resize((image_width, image_height))
        prediction = prediction.convert("xywh")

        boxes = prediction.bbox.tolist()
        scores = prediction.get_field("scores").tolist()
        labels = prediction.get_field("labels").tolist()

        for k, box in enumerate(boxes):
            if labels[k] in dataset.contiguous_category_id_to_json_id:
                coco_results.append(
                    {
                        "image_id": original_id,
                        "category_id": dataset.contiguous_category_id_to_json_id[labels[k]],
                        "bbox": box,
                        "score": scores[k],
                    }
                )
    return coco_results


def prepare_for_coco_segmentation(predictions, dataset):
    import pycocotools.mask as mask_util

    masker = Masker(threshold=0.5, padding=1)
    # assert isinstance(dataset, COCODataset)
    coco_results = []
    for image_id, prediction in tqdm(enumerate(predictions)):
        original_id = dataset.id_to_img_map[image_id]
        if len(prediction) == 0:
            continue

        # TODO replace with get_img_info?
        image_width = dataset.coco.imgs[original_id]["width"]
        image_height = dataset.coco.imgs[original_id]["height"]
        prediction = prediction.resize((image_width, image_height))
        masks = prediction.get_field("mask")
        # Masker is necessary only if masks haven't been already resized.
        if list(masks.shape[-2:]) != [image_height, image_width]:
            masks = masker(masks.expand(1, -1, -1, -1, -1), prediction)
            masks = masks[0]

        scores = prediction.get_field("scores").tolist()
        labels = prediction.get_field("labels").tolist()

        rles = [mask_util.encode(np.array(mask[0, :, :, np.newaxis], order="F"))[0] for mask in masks]
        for rle in rles:
            rle["counts"] = rle["counts"].decode("utf-8")

        mapped_labels = [dataset.contiguous_category_id_to_json_id[i] for i in labels]

        coco_results.extend(
            [
                {
                    "image_id": original_id,
                    "category_id": mapped_labels[k],
                    "segmentation": rle,
                    "score": scores[k],
                }
                for k, rle in enumerate(rles)
            ]
        )
    return coco_results


def prepare_for_coco_keypoint(predictions, dataset):
    # assert isinstance(dataset, COCODataset)
    coco_results = []
    for image_id, prediction in enumerate(predictions):
        original_id = dataset.id_to_img_map[image_id]
        if len(prediction.bbox) == 0:
            continue

        # TODO replace with get_img_info?
        image_width = dataset.coco.imgs[original_id]["width"]
        image_height = dataset.coco.imgs[original_id]["height"]
        prediction = prediction.resize((image_width, image_height))
        prediction = prediction.convert("xywh")

        boxes = prediction.bbox.tolist()
        scores = prediction.get_field("scores").tolist()
        labels = prediction.get_field("labels").tolist()
        keypoints = prediction.get_field("keypoints")
        keypoints = keypoints.resize((image_width, image_height))
        keypoints = keypoints.to_coco_format()

        mapped_labels = [dataset.contiguous_category_id_to_json_id[i] for i in labels]

        coco_results.extend(
            [
                {
                    "image_id": original_id,
                    "category_id": mapped_labels[k],
                    "keypoints": keypoint,
                    "score": scores[k],
                }
                for k, keypoint in enumerate(keypoints)
            ]
        )
    return coco_results
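
# For reference, each entry produced by prepare_for_coco_detection above is a plain dict
# in the standard COCO results format (the values below are made up for illustration):
#
#   {"image_id": 397133, "category_id": 18, "bbox": [x, y, w, h], "score": 0.97}
#
# "bbox" is in xywh order because predictions are converted with
# prediction.convert("xywh") before serialization.
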
# inspired from Detectron
def evaluate_box_proposals(predictions, dataset, thresholds=None, area="all", limit=None):
    """Evaluate detection proposal recall metrics. This function is a much
    faster alternative to the official COCO API recall evaluation code.
    However, it produces slightly different results.
    """
    # Record max overlap value for each gt box
    # Return vector of overlap values
    areas = {
        "all": 0,
        "small": 1,
        "medium": 2,
        "large": 3,
        "96-128": 4,
        "128-256": 5,
        "256-512": 6,
        "512-inf": 7,
    }
    area_ranges = [
        [0**2, 1e5**2],  # all
        [0**2, 32**2],  # small
        [32**2, 96**2],  # medium
        [96**2, 1e5**2],  # large
        [96**2, 128**2],  # 96-128
        [128**2, 256**2],  # 128-256
        [256**2, 512**2],  # 256-512
        [512**2, 1e5**2],  # 512-inf
    ]
    assert area in areas, "Unknown area range: {}".format(area)
    area_range = area_ranges[areas[area]]
    gt_overlaps = []
    num_pos = 0

    for image_id, prediction in enumerate(predictions):
        original_id = dataset.id_to_img_map[image_id]

        # TODO replace with get_img_info?
        image_width = dataset.coco.imgs[original_id]["width"]
        image_height = dataset.coco.imgs[original_id]["height"]
        prediction = prediction.resize((image_width, image_height))

        # sort predictions in descending order
        # TODO maybe remove this and make it explicit in the documentation
        if prediction.has_field("objectness"):
            inds = prediction.get_field("objectness").sort(descending=True)[1]
        else:
            inds = prediction.get_field("scores").sort(descending=True)[1]
        prediction = prediction[inds]

        ann_ids = dataset.coco.getAnnIds(imgIds=original_id)
        anno = dataset.coco.loadAnns(ann_ids)
        gt_boxes = [obj["bbox"] for obj in anno if obj["iscrowd"] == 0]
        gt_boxes = torch.as_tensor(gt_boxes).reshape(-1, 4)  # guard against no boxes
        gt_boxes = BoxList(gt_boxes, (image_width, image_height), mode="xywh").convert("xyxy")
        gt_areas = torch.as_tensor([obj["area"] for obj in anno if obj["iscrowd"] == 0])

        if len(gt_boxes) == 0:
            continue

        valid_gt_inds = (gt_areas >= area_range[0]) & (gt_areas <= area_range[1])
        gt_boxes = gt_boxes[valid_gt_inds]

        num_pos += len(gt_boxes)

        if len(gt_boxes) == 0:
            continue

        if len(prediction) == 0:
            continue

        if limit is not None and len(prediction) > limit:
            prediction = prediction[:limit]

        overlaps = boxlist_iou(prediction, gt_boxes)

        _gt_overlaps = torch.zeros(len(gt_boxes))
        for j in range(min(len(prediction), len(gt_boxes))):
            # find which proposal box maximally covers each gt box
            # and get the iou amount of coverage for each gt box
            max_overlaps, argmax_overlaps = overlaps.max(dim=0)
            # find which gt box is 'best' covered (i.e. 'best' = most iou)
            gt_ovr, gt_ind = max_overlaps.max(dim=0)
            assert gt_ovr >= 0
            # find the proposal box that covers the best covered gt box
            box_ind = argmax_overlaps[gt_ind]
            # record the iou coverage of this gt box
            _gt_overlaps[j] = overlaps[box_ind, gt_ind]
            assert _gt_overlaps[j] == gt_ovr
            # mark the proposal box and the gt box as used
            overlaps[box_ind, :] = -1
            overlaps[:, gt_ind] = -1

        # append recorded iou coverage level
        gt_overlaps.append(_gt_overlaps)

    if len(gt_overlaps) == 0:
        return {
            "ar": torch.zeros(1),
            "recalls": torch.zeros(1),
            "thresholds": thresholds,
            "gt_overlaps": gt_overlaps,
            "num_pos": num_pos,
        }

    gt_overlaps = torch.cat(gt_overlaps, dim=0)
    gt_overlaps, _ = torch.sort(gt_overlaps)

    if thresholds is None:
        step = 0.05
        thresholds = torch.arange(0.5, 0.95 + 1e-5, step, dtype=torch.float32)
    recalls = torch.zeros_like(thresholds)
    # compute recall for each iou threshold
    for i, t in enumerate(thresholds):
        recalls[i] = (gt_overlaps >= t).float().sum() / float(num_pos)
    # ar = 2 * np.trapz(recalls, thresholds)
    ar = recalls.mean()
    return {
        "ar": ar,
        "recalls": recalls,
        "thresholds": thresholds,
        "gt_overlaps": gt_overlaps,
        "num_pos": num_pos,
    }
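
# Illustrative sketch (not part of the original module): how the AR value returned above
# relates to the rest of the stats dict. `stats` is assumed to come from
# evaluate_box_proposals with the default thresholds (0.50, 0.55, ..., 0.95).
#
#   stats = evaluate_box_proposals(predictions, dataset, area="all", limit=1000)
#   recalls = [
#       (stats["gt_overlaps"] >= t).float().sum() / float(stats["num_pos"])
#       for t in stats["thresholds"]
#   ]
#   # stats["ar"] is simply the mean of these per-threshold recalls
#   assert torch.isclose(stats["ar"], torch.stack(recalls).mean())
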
def evaluate_predictions_on_coco(coco_gt, coco_results, json_result_file, iou_type="bbox"):
    with open(json_result_file, "w") as f:
        json.dump(coco_results, f)

    from pycocotools.coco import COCO
    from pycocotools.cocoeval import COCOeval

    coco_dt = coco_gt.loadRes(str(json_result_file)) if coco_results else COCO()
    # coco_dt = coco_gt.loadRes(coco_results)
    if iou_type == "keypoints":
        coco_gt = filter_valid_keypoints(coco_gt, coco_dt)
    coco_eval = COCOeval(coco_gt, coco_dt, iou_type)
    coco_eval.evaluate()
    coco_eval.accumulate()
    coco_eval.summarize()
    if iou_type == "bbox":
        summarize_per_category(coco_eval, json_result_file.replace(".json", ".csv"))
    return coco_eval
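
# Illustrative call (not executed here); the result path is a hypothetical placeholder:
#
#   coco_eval = evaluate_predictions_on_coco(
#       dataset.coco, coco_results["bbox"], "output/eval/bbox.json", iou_type="bbox"
#   )
#   # coco_eval.stats then holds the 12 standard COCO summary numbers
#   # (AP, AP50, AP75, APs, APm, APl, AR@1, AR@10, AR@100, ARs, ARm, ARl).
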
def summarize_per_category(coco_eval, csv_output=None):
    """
    Compute and display summary metrics for evaluation results.
    Note this function can *only* be applied with the default parameter setting.
    """

    def _summarize(iouThr=None, areaRng="all", maxDets=100):
        p = coco_eval.params
        titleStr = "Average Precision"
        typeStr = "(AP)"
        iouStr = (
            "{:0.2f}:{:0.2f}".format(p.iouThrs[0], p.iouThrs[-1])
            if iouThr is None
            else "{:0.2f}".format(iouThr)
        )
        result_str = " {:<18} {} @[ IoU={:<9} | area={:>6s} | maxDets={:>3d} ], ".format(
            titleStr, typeStr, iouStr, areaRng, maxDets
        )

        aind = [i for i, aRng in enumerate(p.areaRngLbl) if aRng == areaRng]
        mind = [i for i, mDet in enumerate(p.maxDets) if mDet == maxDets]

        # dimension of precision: [TxRxKxAxM]
        s = coco_eval.eval["precision"]
        # IoU
        if iouThr is not None:
            t = np.where(iouThr == p.iouThrs)[0]
            s = s[t]
        s = s[:, :, :, aind, mind]

        if len(s[s > -1]) == 0:
            mean_s = -1
        else:
            mean_s = np.mean(s[s > -1])

        # calculate AP (average precision) for each category
        num_classes = len(p.catIds)
        avg_ap = 0.0
        for i in range(0, num_classes):
            result_str += "{}, ".format(np.mean(s[:, :, i, :]))
            avg_ap += np.mean(s[:, :, i, :])
        result_str += "{} \n".format(avg_ap / num_classes)
        return result_str

    id2name = {}
    for _, cat in coco_eval.cocoGt.cats.items():
        id2name[cat["id"]] = cat["name"]

    title_str = "metric, "
    for cid in coco_eval.params.catIds:
        title_str += "{}, ".format(id2name[cid])
    title_str += "avg \n"

    results = [title_str]
    results.append(_summarize())
    results.append(_summarize(iouThr=0.5, maxDets=coco_eval.params.maxDets[2]))
    results.append(_summarize(areaRng="small", maxDets=coco_eval.params.maxDets[2]))
    results.append(_summarize(areaRng="medium", maxDets=coco_eval.params.maxDets[2]))
    results.append(_summarize(areaRng="large", maxDets=coco_eval.params.maxDets[2]))

    # guard against a missing output path so the summary computation alone never crashes
    if csv_output:
        with open(csv_output, "w") as f:
            for result in results:
                f.writelines(result)


def filter_valid_keypoints(coco_gt, coco_dt):
    kps = coco_dt.anns[1]["keypoints"]
    for id, ann in coco_gt.anns.items():
        ann["keypoints"][2::3] = [a * b for a, b in zip(ann["keypoints"][2::3], kps[2::3])]
        ann["num_keypoints"] = sum(ann["keypoints"][2::3])
    return coco_gt


class COCOResults(object):
    METRICS = {
        "bbox": ["AP", "AP50", "AP75", "APs", "APm", "APl"],
        "segm": ["AP", "AP50", "AP75", "APs", "APm", "APl"],
        "box_proposal": [
            "AR@100",
            "ARs@100",
            "ARm@100",
            "ARl@100",
            "AR@1000",
            "ARs@1000",
            "ARm@1000",
            "ARl@1000",
        ],
        "keypoints": ["AP", "AP50", "AP75", "APm", "APl"],
    }

    def __init__(self, *iou_types):
        allowed_types = ("box_proposal", "bbox", "segm", "keypoints")
        assert all(iou_type in allowed_types for iou_type in iou_types)
        results = OrderedDict()
        for iou_type in iou_types:
            results[iou_type] = OrderedDict([(metric, -1) for metric in COCOResults.METRICS[iou_type]])
        self.results = results

    def update(self, coco_eval):
        if coco_eval is None:
            return
        from pycocotools.cocoeval import COCOeval

        assert isinstance(coco_eval, COCOeval)
        s = coco_eval.stats
        iou_type = coco_eval.params.iouType
        res = self.results[iou_type]
        metrics = COCOResults.METRICS[iou_type]
        for idx, metric in enumerate(metrics):
            res[metric] = s[idx]

    def __repr__(self):
        # TODO make it pretty
        return repr(self.results)
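
# Note on COCOResults.update above: it relies on the fixed ordering of COCOeval.stats
# after summarize(). For "bbox"/"segm" the first six entries are
# AP@[.50:.95], AP50, AP75, APs, APm, APl, which matches the METRICS order used here;
# for "keypoints" the first five entries are AP, AP50, AP75, APm, APl.
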
def check_expected_results(results, expected_results, sigma_tol):
    if not expected_results:
        return

    logger = logging.getLogger("maskrcnn_benchmark.inference")
    for task, metric, (mean, std) in expected_results:
        actual_val = results.results[task][metric]
        lo = mean - sigma_tol * std
        hi = mean + sigma_tol * std
        ok = (lo < actual_val) and (actual_val < hi)
        msg = (
            "{} > {} sanity check (actual vs. expected): "
            "{:.3f} vs. mean={:.4f}, std={:.4}, range=({:.4f}, {:.4f})"
        ).format(task, metric, actual_val, mean, std, lo, hi)
        if not ok:
            msg = "FAIL: " + msg
            logger.error(msg)
        else:
            msg = "PASS: " + msg
            logger.info(msg)
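
# Minimal, self-contained sketch of the expected-results check above. The numbers are
# made up purely for illustration; real runs typically take the
# (task, metric, (mean, std)) tuples and sigma tolerance from the test config.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    demo = COCOResults("bbox")
    demo.results["bbox"]["AP"] = 0.362  # pretend evaluation output
    check_expected_results(
        demo,
        expected_results=[("bbox", "AP", (0.36, 0.003))],
        sigma_tol=4,
    )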