Spaces:
Running
Running
# Copyright (c) OpenMMLab. All rights reserved. | |
import datetime | |
import itertools | |
import os.path as osp | |
import tempfile | |
from collections import OrderedDict | |
from typing import Dict, List, Optional, Sequence, Union | |
import numpy as np | |
import torch | |
from mmengine.evaluator import BaseMetric | |
from mmengine.fileio import FileClient, dump, load | |
from mmengine.logging import MMLogger | |
from terminaltables import AsciiTable | |
from mmdet.datasets.api_wrappers import COCO, COCOeval | |
from mmdet.registry import METRICS | |
from mmdet.structures.mask import encode_mask_results | |
# from ..functional import eval_recalls | |
from mmdet.evaluation.metrics import CocoMetric | |
class AnimeMangaMetric(CocoMetric): | |
def __init__(self, | |
manga109_annfile=None, | |
animeins_annfile=None, | |
ann_file: Optional[str] = None, | |
metric: Union[str, List[str]] = 'bbox', | |
classwise: bool = False, | |
proposal_nums: Sequence[int] = (100, 300, 1000), | |
iou_thrs: Optional[Union[float, Sequence[float]]] = None, | |
metric_items: Optional[Sequence[str]] = None, | |
format_only: bool = False, | |
outfile_prefix: Optional[str] = None, | |
file_client_args: dict = dict(backend='disk'), | |
collect_device: str = 'cpu', | |
prefix: Optional[str] = None, | |
sort_categories: bool = False) -> None: | |
super().__init__(ann_file, metric, classwise, proposal_nums, iou_thrs, metric_items, format_only, outfile_prefix, file_client_args, collect_device, prefix, sort_categories) | |
self.manga109_img_ids = set() | |
if manga109_annfile is not None: | |
with self.file_client.get_local_path(manga109_annfile) as local_path: | |
self._manga109_coco_api = COCO(local_path) | |
if sort_categories: | |
# 'categories' list in objects365_train.json and | |
# objects365_val.json is inconsistent, need sort | |
# list(or dict) before get cat_ids. | |
cats = self._manga109_coco_api.cats | |
sorted_cats = {i: cats[i] for i in sorted(cats)} | |
self._manga109_coco_api.cats = sorted_cats | |
categories = self._manga109_coco_api.dataset['categories'] | |
sorted_categories = sorted( | |
categories, key=lambda i: i['id']) | |
self._manga109_coco_api.dataset['categories'] = sorted_categories | |
self.manga109_img_ids = set(self._manga109_coco_api.get_img_ids()) | |
else: | |
self._manga109_coco_api = None | |
self.animeins_img_ids = set() | |
if animeins_annfile is not None: | |
with self.file_client.get_local_path(animeins_annfile) as local_path: | |
self._animeins_coco_api = COCO(local_path) | |
if sort_categories: | |
# 'categories' list in objects365_train.json and | |
# objects365_val.json is inconsistent, need sort | |
# list(or dict) before get cat_ids. | |
cats = self._animeins_coco_api.cats | |
sorted_cats = {i: cats[i] for i in sorted(cats)} | |
self._animeins_coco_api.cats = sorted_cats | |
categories = self._animeins_coco_api.dataset['categories'] | |
sorted_categories = sorted( | |
categories, key=lambda i: i['id']) | |
self._animeins_coco_api.dataset['categories'] = sorted_categories | |
self.animeins_img_ids = set(self._animeins_coco_api.get_img_ids()) | |
else: | |
self._animeins_coco_api = None | |
if self._animeins_coco_api is not None: | |
self._coco_api = self._animeins_coco_api | |
else: | |
self._coco_api = self._manga109_coco_api | |
def compute_metrics(self, results: list) -> Dict[str, float]: | |
# split gt and prediction list | |
gts, preds = zip(*results) | |
manga109_gts, animeins_gts = [], [] | |
manga109_preds, animeins_preds = [], [] | |
for gt, pred in zip(gts, preds): | |
if gt['img_id'] in self.manga109_img_ids: | |
manga109_gts.append(gt) | |
manga109_preds.append(pred) | |
else: | |
animeins_gts.append(gt) | |
animeins_preds.append(pred) | |
tmp_dir = None | |
if self.outfile_prefix is None: | |
tmp_dir = tempfile.TemporaryDirectory() | |
outfile_prefix = osp.join(tmp_dir.name, 'results') | |
else: | |
outfile_prefix = self.outfile_prefix | |
eval_results = OrderedDict() | |
if len(manga109_gts) > 0: | |
metrics = [] | |
for m in self.metrics: | |
if m != 'segm': | |
metrics.append(m) | |
self.cat_ids = self._manga109_coco_api.get_cat_ids(cat_names=self.dataset_meta['classes']) | |
self.img_ids = self._manga109_coco_api.get_img_ids() | |
rst = self._compute_metrics(metrics, self._manga109_coco_api, manga109_preds, outfile_prefix, tmp_dir) | |
for key, item in rst.items(): | |
eval_results['manga109_'+key] = item | |
if len(animeins_gts) > 0: | |
self.cat_ids = self._animeins_coco_api.get_cat_ids(cat_names=self.dataset_meta['classes']) | |
self.img_ids = self._animeins_coco_api.get_img_ids() | |
rst = self._compute_metrics(self.metrics, self._animeins_coco_api, animeins_preds, outfile_prefix, tmp_dir) | |
for key, item in rst.items(): | |
eval_results['animeins_'+key] = item | |
return eval_results | |
def results2json(self, results: Sequence[dict], | |
outfile_prefix: str) -> dict: | |
"""Dump the detection results to a COCO style json file. | |
There are 3 types of results: proposals, bbox predictions, mask | |
predictions, and they have different data types. This method will | |
automatically recognize the type, and dump them to json files. | |
Args: | |
results (Sequence[dict]): Testing results of the | |
dataset. | |
outfile_prefix (str): The filename prefix of the json files. If the | |
prefix is "somepath/xxx", the json files will be named | |
"somepath/xxx.bbox.json", "somepath/xxx.segm.json", | |
"somepath/xxx.proposal.json". | |
Returns: | |
dict: Possible keys are "bbox", "segm", "proposal", and | |
values are corresponding filenames. | |
""" | |
bbox_json_results = [] | |
segm_json_results = [] if 'masks' in results[0] else None | |
for idx, result in enumerate(results): | |
image_id = result.get('img_id', idx) | |
labels = result['labels'] | |
bboxes = result['bboxes'] | |
scores = result['scores'] | |
# bbox results | |
for i, label in enumerate(labels): | |
data = dict() | |
data['image_id'] = image_id | |
data['bbox'] = self.xyxy2xywh(bboxes[i]) | |
data['score'] = float(scores[i]) | |
data['category_id'] = self.cat_ids[label] | |
bbox_json_results.append(data) | |
if segm_json_results is None: | |
continue | |
# segm results | |
masks = result['masks'] | |
mask_scores = result.get('mask_scores', scores) | |
for i, label in enumerate(labels): | |
data = dict() | |
data['image_id'] = image_id | |
data['bbox'] = self.xyxy2xywh(bboxes[i]) | |
data['score'] = float(mask_scores[i]) | |
data['category_id'] = self.cat_ids[label] | |
if isinstance(masks[i]['counts'], bytes): | |
masks[i]['counts'] = masks[i]['counts'].decode() | |
data['segmentation'] = masks[i] | |
segm_json_results.append(data) | |
logger: MMLogger = MMLogger.get_current_instance() | |
logger.info('dumping predictions ... ') | |
result_files = dict() | |
result_files['bbox'] = f'{outfile_prefix}.bbox.json' | |
result_files['proposal'] = f'{outfile_prefix}.bbox.json' | |
dump(bbox_json_results, result_files['bbox']) | |
if segm_json_results is not None: | |
result_files['segm'] = f'{outfile_prefix}.segm.json' | |
dump(segm_json_results, result_files['segm']) | |
return result_files | |
def _compute_metrics(self, metrics, tgt_api, preds, outfile_prefix, tmp_dir): | |
logger: MMLogger = MMLogger.get_current_instance() | |
result_files = self.results2json(preds, outfile_prefix) | |
eval_results = OrderedDict() | |
if self.format_only: | |
logger.info('results are saved in ' | |
f'{osp.dirname(outfile_prefix)}') | |
return eval_results | |
for metric in metrics: | |
logger.info(f'Evaluating {metric}...') | |
# TODO: May refactor fast_eval_recall to an independent metric? | |
# fast eval recall | |
if metric == 'proposal_fast': | |
ar = self.fast_eval_recall( | |
preds, self.proposal_nums, self.iou_thrs, logger=logger) | |
log_msg = [] | |
for i, num in enumerate(self.proposal_nums): | |
eval_results[f'AR@{num}'] = ar[i] | |
log_msg.append(f'\nAR@{num}\t{ar[i]:.4f}') | |
log_msg = ''.join(log_msg) | |
logger.info(log_msg) | |
continue | |
# evaluate proposal, bbox and segm | |
iou_type = 'bbox' if metric == 'proposal' else metric | |
if metric not in result_files: | |
raise KeyError(f'{metric} is not in results') | |
try: | |
predictions = load(result_files[metric]) | |
if iou_type == 'segm': | |
# Refer to https://github.com/cocodataset/cocoapi/blob/master/PythonAPI/pycocotools/coco.py#L331 # noqa | |
# When evaluating mask AP, if the results contain bbox, | |
# cocoapi will use the box area instead of the mask area | |
# for calculating the instance area. Though the overall AP | |
# is not affected, this leads to different | |
# small/medium/large mask AP results. | |
for x in predictions: | |
x.pop('bbox') | |
coco_dt = tgt_api.loadRes(predictions) | |
except IndexError: | |
logger.error( | |
'The testing results of the whole dataset is empty.') | |
break | |
coco_eval = COCOeval(tgt_api, coco_dt, iou_type) | |
coco_eval.params.catIds = self.cat_ids | |
coco_eval.params.imgIds = self.img_ids | |
coco_eval.params.maxDets = list(self.proposal_nums) | |
coco_eval.params.iouThrs = self.iou_thrs | |
# mapping of cocoEval.stats | |
coco_metric_names = { | |
'mAP': 0, | |
'mAP_50': 1, | |
'mAP_75': 2, | |
'mAP_s': 3, | |
'mAP_m': 4, | |
'mAP_l': 5, | |
'AR@100': 6, | |
'AR@300': 7, | |
'AR@1000': 8, | |
'AR_s@1000': 9, | |
'AR_m@1000': 10, | |
'AR_l@1000': 11 | |
} | |
metric_items = self.metric_items | |
if metric_items is not None: | |
for metric_item in metric_items: | |
if metric_item not in coco_metric_names: | |
raise KeyError( | |
f'metric item "{metric_item}" is not supported') | |
if metric == 'proposal': | |
coco_eval.params.useCats = 0 | |
coco_eval.evaluate() | |
coco_eval.accumulate() | |
coco_eval.summarize() | |
if metric_items is None: | |
metric_items = [ | |
'AR@100', 'AR@300', 'AR@1000', 'AR_s@1000', | |
'AR_m@1000', 'AR_l@1000' | |
] | |
for item in metric_items: | |
val = float( | |
f'{coco_eval.stats[coco_metric_names[item]]:.3f}') | |
eval_results[item] = val | |
else: | |
coco_eval.evaluate() | |
coco_eval.accumulate() | |
coco_eval.summarize() | |
if self.classwise: # Compute per-category AP | |
# Compute per-category AP | |
# from https://github.com/facebookresearch/detectron2/ | |
precisions = coco_eval.eval['precision'] | |
# precision: (iou, recall, cls, area range, max dets) | |
assert len(self.cat_ids) == precisions.shape[2] | |
results_per_category = [] | |
for idx, cat_id in enumerate(self.cat_ids): | |
# area range index 0: all area ranges | |
# max dets index -1: typically 100 per image | |
nm = tgt_api.loadCats(cat_id)[0] | |
precision = precisions[:, :, idx, 0, -1] | |
precision = precision[precision > -1] | |
if precision.size: | |
ap = np.mean(precision) | |
else: | |
ap = float('nan') | |
results_per_category.append( | |
(f'{nm["name"]}', f'{round(ap, 3)}')) | |
eval_results[f'{nm["name"]}_precision'] = round(ap, 3) | |
num_columns = min(6, len(results_per_category) * 2) | |
results_flatten = list( | |
itertools.chain(*results_per_category)) | |
headers = ['category', 'AP'] * (num_columns // 2) | |
results_2d = itertools.zip_longest(*[ | |
results_flatten[i::num_columns] | |
for i in range(num_columns) | |
]) | |
table_data = [headers] | |
table_data += [result for result in results_2d] | |
table = AsciiTable(table_data) | |
logger.info('\n' + table.table) | |
if metric_items is None: | |
metric_items = [ | |
'mAP', 'mAP_50', 'mAP_75', 'mAP_s', 'mAP_m', 'mAP_l' | |
] | |
for metric_item in metric_items: | |
key = f'{metric}_{metric_item}' | |
val = coco_eval.stats[coco_metric_names[metric_item]] | |
eval_results[key] = float(f'{round(val, 3)}') | |
ap = coco_eval.stats[:6] | |
logger.info(f'{metric}_mAP_copypaste: {ap[0]:.3f} ' | |
f'{ap[1]:.3f} {ap[2]:.3f} {ap[3]:.3f} ' | |
f'{ap[4]:.3f} {ap[5]:.3f}') | |
if tmp_dir is not None: | |
tmp_dir.cleanup() | |
return eval_results |