# Copyright (c) OpenMMLab. All rights reserved.
import itertools
import os.path as osp
import tempfile
import warnings
from collections import OrderedDict
from typing import Dict, List, Optional, Sequence, Union

import numpy as np
from mmengine.fileio import get_local_path
from mmengine.logging import MMLogger
from terminaltables import AsciiTable

from mmdet.registry import METRICS
from mmdet.structures.mask import encode_mask_results
from ..functional import eval_recalls
from .coco_metric import CocoMetric

try:
    import lvis
    if getattr(lvis, '__version__', '0') >= '10.5.3':
        warnings.warn(
            'mmlvis is deprecated, please install official lvis-api by "pip install git+https://github.com/lvis-dataset/lvis-api.git"',  # noqa: E501
            UserWarning)
    from lvis import LVIS, LVISEval, LVISResults
except ImportError:
    lvis = None
    LVISEval = None
    LVISResults = None


@METRICS.register_module()
class LVISMetric(CocoMetric):
"""LVIS evaluation metric. | |
Args: | |
ann_file (str, optional): Path to the coco format annotation file. | |
If not specified, ground truth annotations from the dataset will | |
be converted to coco format. Defaults to None. | |
metric (str | List[str]): Metrics to be evaluated. Valid metrics | |
include 'bbox', 'segm', 'proposal', and 'proposal_fast'. | |
Defaults to 'bbox'. | |
classwise (bool): Whether to evaluate the metric class-wise. | |
Defaults to False. | |
proposal_nums (Sequence[int]): Numbers of proposals to be evaluated. | |
Defaults to (100, 300, 1000). | |
iou_thrs (float | List[float], optional): IoU threshold to compute AP | |
and AR. If not specified, IoUs from 0.5 to 0.95 will be used. | |
Defaults to None. | |
metric_items (List[str], optional): Metric result names to be | |
recorded in the evaluation result. Defaults to None. | |
format_only (bool): Format the output results without perform | |
evaluation. It is useful when you want to format the result | |
to a specific format and submit it to the test server. | |
Defaults to False. | |
outfile_prefix (str, optional): The prefix of json files. It includes | |
the file path and the prefix of filename, e.g., "a/b/prefix". | |
If not specified, a temp file will be created. Defaults to None. | |
collect_device (str): Device name used for collecting results from | |
different ranks during distributed training. Must be 'cpu' or | |
'gpu'. Defaults to 'cpu'. | |
prefix (str, optional): The prefix that will be added in the metric | |
names to disambiguate homonymous metrics of different evaluators. | |
If prefix is not provided in the argument, self.default_prefix | |
will be used instead. Defaults to None. | |
file_client_args (dict, optional): Arguments to instantiate the | |
corresponding backend in mmdet <= 3.0.0rc6. Defaults to None. | |
backend_args (dict, optional): Arguments to instantiate the | |
corresponding backend. Defaults to None. | |
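
    Examples:
        A minimal construction sketch. It assumes the usual
        ``mmdet.evaluation`` export of this class, and the annotation path
        below is a placeholder for a local LVIS v1 annotation file, not a
        file shipped with this module.

        >>> from mmdet.evaluation import LVISMetric
        >>> lvis_metric = LVISMetric(
        ...     ann_file='data/lvis_v1/annotations/lvis_v1_val.json',
        ...     metric=['bbox', 'segm'],
        ...     classwise=True)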
""" | |
default_prefix: Optional[str] = 'lvis' | |
def __init__(self, | |
ann_file: Optional[str] = None, | |
metric: Union[str, List[str]] = 'bbox', | |
classwise: bool = False, | |
proposal_nums: Sequence[int] = (100, 300, 1000), | |
iou_thrs: Optional[Union[float, Sequence[float]]] = None, | |
metric_items: Optional[Sequence[str]] = None, | |
format_only: bool = False, | |
outfile_prefix: Optional[str] = None, | |
collect_device: str = 'cpu', | |
prefix: Optional[str] = None, | |
file_client_args: dict = None, | |
backend_args: dict = None) -> None: | |
if lvis is None: | |
raise RuntimeError( | |
'Package lvis is not installed. Please run "pip install ' | |
'git+https://github.com/lvis-dataset/lvis-api.git".') | |
super().__init__(collect_device=collect_device, prefix=prefix) | |
# coco evaluation metrics | |
self.metrics = metric if isinstance(metric, list) else [metric] | |
allowed_metrics = ['bbox', 'segm', 'proposal', 'proposal_fast'] | |
for metric in self.metrics: | |
if metric not in allowed_metrics: | |
raise KeyError( | |
"metric should be one of 'bbox', 'segm', 'proposal', " | |
f"'proposal_fast', but got {metric}.") | |
# do class wise evaluation, default False | |
self.classwise = classwise | |
# proposal_nums used to compute recall or precision. | |
self.proposal_nums = list(proposal_nums) | |
# iou_thrs used to compute recall or precision. | |
if iou_thrs is None: | |
iou_thrs = np.linspace( | |
.5, 0.95, int(np.round((0.95 - .5) / .05)) + 1, endpoint=True) | |
self.iou_thrs = iou_thrs | |
self.metric_items = metric_items | |
self.format_only = format_only | |
        if self.format_only:
            assert outfile_prefix is not None, \
                'outfile_prefix must not be None when format_only is True, ' \
                'otherwise the result files will be saved to a temp ' \
                'directory which will be cleaned up at the end.'
        self.outfile_prefix = outfile_prefix
        self.backend_args = backend_args
        if file_client_args is not None:
            raise RuntimeError(
                'The `file_client_args` is deprecated, '
                'please use `backend_args` instead, please refer to '
                'https://github.com/open-mmlab/mmdetection/blob/main/configs/_base_/datasets/coco_detection.py'  # noqa: E501
            )

        # if ann_file is not specified,
        # initialize lvis api with the converted dataset
        if ann_file is not None:
            with get_local_path(
                    ann_file, backend_args=self.backend_args) as local_path:
                self._lvis_api = LVIS(local_path)
        else:
            self._lvis_api = None

        # handle dataset lazy init
        self.cat_ids = None
        self.img_ids = None

    def fast_eval_recall(self,
                         results: List[dict],
                         proposal_nums: Sequence[int],
                         iou_thrs: Sequence[float],
                         logger: Optional[MMLogger] = None) -> np.ndarray:
        """Evaluate proposal recall with LVIS's fast_eval_recall.

        Args:
            results (List[dict]): Results of the dataset.
            proposal_nums (Sequence[int]): Proposal numbers used for
                evaluation.
            iou_thrs (Sequence[float]): IoU thresholds used for evaluation.
            logger (MMLogger, optional): Logger used for logging the recall
                summary.

        Returns:
            np.ndarray: Averaged recall results.
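
        Examples:
            A rough sketch of a direct call. It assumes the metric's LVIS
            api and ``img_ids`` have already been initialized (as done in
            :meth:`compute_metrics`) and that ``results`` is a list of dicts
            carrying predicted ``'bboxes'`` arrays:

            >>> ar = metric.fast_eval_recall(
            ...     results, proposal_nums=(100, 300, 1000),
            ...     iou_thrs=np.arange(0.5, 0.96, 0.05))
            >>> # ar[i] is the recall for proposal_nums[i], averaged over
            >>> # the given IoU thresholds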
""" | |
gt_bboxes = [] | |
pred_bboxes = [result['bboxes'] for result in results] | |
for i in range(len(self.img_ids)): | |
ann_ids = self._lvis_api.get_ann_ids(img_ids=[self.img_ids[i]]) | |
ann_info = self._lvis_api.load_anns(ann_ids) | |
if len(ann_info) == 0: | |
gt_bboxes.append(np.zeros((0, 4))) | |
continue | |
bboxes = [] | |
for ann in ann_info: | |
x1, y1, w, h = ann['bbox'] | |
bboxes.append([x1, y1, x1 + w, y1 + h]) | |
bboxes = np.array(bboxes, dtype=np.float32) | |
if bboxes.shape[0] == 0: | |
bboxes = np.zeros((0, 4)) | |
gt_bboxes.append(bboxes) | |
recalls = eval_recalls( | |
gt_bboxes, pred_bboxes, proposal_nums, iou_thrs, logger=logger) | |
ar = recalls.mean(axis=1) | |
return ar | |
# TODO: data_batch is no longer needed, consider adjusting the | |
# parameter position | |
def process(self, data_batch: dict, data_samples: Sequence[dict]) -> None: | |
"""Process one batch of data samples and predictions. The processed | |
results should be stored in ``self.results``, which will be used to | |
compute the metrics when all batches have been processed. | |
Args: | |
data_batch (dict): A batch of data from the dataloader. | |
data_samples (Sequence[dict]): A batch of data samples that | |
contain annotations and predictions. | |
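
        Examples:
            A minimal sketch of the fields this method reads from each
            sample. The tensors and shapes below are illustrative only, and
            ``metric`` is assumed to be an ``LVISMetric`` constructed with a
            valid ``ann_file`` (otherwise an ``'instances'`` key with ground
            truth is required):

            >>> import torch
            >>> data_sample = dict(
            ...     img_id=0,
            ...     ori_shape=(480, 640),
            ...     pred_instances=dict(
            ...         bboxes=torch.rand(5, 4),
            ...         scores=torch.rand(5),
            ...         labels=torch.randint(0, 10, (5, ))))
            >>> metric.process(data_batch={}, data_samples=[data_sample])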
""" | |
for data_sample in data_samples: | |
result = dict() | |
pred = data_sample['pred_instances'] | |
result['img_id'] = data_sample['img_id'] | |
result['bboxes'] = pred['bboxes'].cpu().numpy() | |
result['scores'] = pred['scores'].cpu().numpy() | |
result['labels'] = pred['labels'].cpu().numpy() | |
# encode mask to RLE | |
if 'masks' in pred: | |
result['masks'] = encode_mask_results( | |
pred['masks'].detach().cpu().numpy()) | |
# some detectors use different scores for bbox and mask | |
if 'mask_scores' in pred: | |
result['mask_scores'] = pred['mask_scores'].cpu().numpy() | |
# parse gt | |
gt = dict() | |
gt['width'] = data_sample['ori_shape'][1] | |
gt['height'] = data_sample['ori_shape'][0] | |
gt['img_id'] = data_sample['img_id'] | |
if self._lvis_api is None: | |
# TODO: Need to refactor to support LoadAnnotations | |
assert 'instances' in data_sample, \ | |
'ground truth is required for evaluation when ' \ | |
'`ann_file` is not provided' | |
gt['anns'] = data_sample['instances'] | |
# add converted result to the results list | |
self.results.append((gt, result)) | |
def compute_metrics(self, results: list) -> Dict[str, float]: | |
"""Compute the metrics from processed results. | |
Args: | |
results (list): The processed results of each batch. | |
Returns: | |
Dict[str, float]: The computed metrics. The keys are the names of | |
the metrics, and the values are corresponding results. | |
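                For 'bbox' and 'segm', AP-style items are prefixed with the
                metric name (e.g. ``'bbox_AP'``, ``'segm_APr'``), while
                'proposal' and 'proposal_fast' record recall keys such as
                ``'AR@300'`` and ``'AR@100'``, as collected below.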
""" | |
logger: MMLogger = MMLogger.get_current_instance() | |
# split gt and prediction list | |
gts, preds = zip(*results) | |
tmp_dir = None | |
if self.outfile_prefix is None: | |
tmp_dir = tempfile.TemporaryDirectory() | |
outfile_prefix = osp.join(tmp_dir.name, 'results') | |
else: | |
outfile_prefix = self.outfile_prefix | |
if self._lvis_api is None: | |
# use converted gt json file to initialize coco api | |
logger.info('Converting ground truth to coco format...') | |
coco_json_path = self.gt_to_coco_json( | |
gt_dicts=gts, outfile_prefix=outfile_prefix) | |
self._lvis_api = LVIS(coco_json_path) | |
# handle lazy init | |
if self.cat_ids is None: | |
self.cat_ids = self._lvis_api.get_cat_ids() | |
if self.img_ids is None: | |
self.img_ids = self._lvis_api.get_img_ids() | |
# convert predictions to coco format and dump to json file | |
result_files = self.results2json(preds, outfile_prefix) | |
eval_results = OrderedDict() | |
if self.format_only: | |
logger.info('results are saved in ' | |
f'{osp.dirname(outfile_prefix)}') | |
return eval_results | |
lvis_gt = self._lvis_api | |
for metric in self.metrics: | |
logger.info(f'Evaluating {metric}...') | |
# TODO: May refactor fast_eval_recall to an independent metric? | |
# fast eval recall | |
if metric == 'proposal_fast': | |
ar = self.fast_eval_recall( | |
preds, self.proposal_nums, self.iou_thrs, logger=logger) | |
log_msg = [] | |
for i, num in enumerate(self.proposal_nums): | |
eval_results[f'AR@{num}'] = ar[i] | |
log_msg.append(f'\nAR@{num}\t{ar[i]:.4f}') | |
log_msg = ''.join(log_msg) | |
logger.info(log_msg) | |
continue | |
try: | |
lvis_dt = LVISResults(lvis_gt, result_files[metric]) | |
except IndexError: | |
logger.info( | |
'The testing results of the whole dataset is empty.') | |
break | |
iou_type = 'bbox' if metric == 'proposal' else metric | |
lvis_eval = LVISEval(lvis_gt, lvis_dt, iou_type) | |
lvis_eval.params.imgIds = self.img_ids | |
metric_items = self.metric_items | |
if metric == 'proposal': | |
lvis_eval.params.useCats = 0 | |
lvis_eval.params.maxDets = list(self.proposal_nums) | |
lvis_eval.evaluate() | |
lvis_eval.accumulate() | |
lvis_eval.summarize() | |
if metric_items is None: | |
metric_items = ['AR@300', 'ARs@300', 'ARm@300', 'ARl@300'] | |
for k, v in lvis_eval.get_results().items(): | |
if k in metric_items: | |
val = float('{:.3f}'.format(float(v))) | |
eval_results[k] = val | |
else: | |
lvis_eval.evaluate() | |
lvis_eval.accumulate() | |
lvis_eval.summarize() | |
lvis_results = lvis_eval.get_results() | |
if self.classwise: # Compute per-category AP | |
# Compute per-category AP | |
# from https://github.com/facebookresearch/detectron2/ | |
precisions = lvis_eval.eval['precision'] | |
# precision: (iou, recall, cls, area range, max dets) | |
assert len(self.cat_ids) == precisions.shape[2] | |
results_per_category = [] | |
for idx, catId in enumerate(self.cat_ids): | |
# area range index 0: all area ranges | |
# max dets index -1: typically 100 per image | |
# the dimensions of precisions are | |
# [num_thrs, num_recalls, num_cats, num_area_rngs] | |
nm = self._lvis_api.load_cats([catId])[0] | |
precision = precisions[:, :, idx, 0] | |
precision = precision[precision > -1] | |
if precision.size: | |
ap = np.mean(precision) | |
else: | |
ap = float('nan') | |
results_per_category.append( | |
(f'{nm["name"]}', f'{float(ap):0.3f}')) | |
eval_results[f'{nm["name"]}_precision'] = round(ap, 3) | |
num_columns = min(6, len(results_per_category) * 2) | |
results_flatten = list( | |
itertools.chain(*results_per_category)) | |
headers = ['category', 'AP'] * (num_columns // 2) | |
results_2d = itertools.zip_longest(*[ | |
results_flatten[i::num_columns] | |
for i in range(num_columns) | |
]) | |
table_data = [headers] | |
table_data += [result for result in results_2d] | |
table = AsciiTable(table_data) | |
logger.info('\n' + table.table) | |
if metric_items is None: | |
metric_items = [ | |
'AP', 'AP50', 'AP75', 'APs', 'APm', 'APl', 'APr', | |
'APc', 'APf' | |
] | |
for k, v in lvis_results.items(): | |
if k in metric_items: | |
key = '{}_{}'.format(metric, k) | |
val = float('{:.3f}'.format(float(v))) | |
eval_results[key] = val | |
lvis_eval.print_results() | |
if tmp_dir is not None: | |
tmp_dir.cleanup() | |
return eval_results | |
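

# A minimal config sketch (not part of this module; the paths below are
# placeholders) showing how the registered metric can be pointed at a local
# LVIS v1 annotation file and used with ``format_only`` to dump result files
# for offline submission instead of running evaluation:
#
#     test_evaluator = dict(
#         type='LVISMetric',
#         ann_file='data/lvis_v1/annotations/lvis_v1_val.json',
#         metric=['bbox', 'segm'],
#         format_only=True,
#         outfile_prefix='./work_dirs/lvis_detection/test')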