|
import itertools |
|
import logging |
|
import os.path as osp |
|
import tempfile |
|
from collections import OrderedDict |
|
|
|
import mmcv |
|
import numpy as np |
|
import pycocotools |
|
from mmcv.utils import print_log |
|
from pycocotools.coco import COCO |
|
from pycocotools.cocoeval import COCOeval |
|
from terminaltables import AsciiTable |
|
|
|
from mmdet.core import eval_recalls |
|
from .builder import DATASETS |
|
from .custom import CustomDatasetLocal |
|
|
|
|
|
def bounding_box(points): |
|
"""returns a list containing the bottom left and the top right |
|
points in the sequence |
|
Here, we traverse the collection of points only once, |
|
to find the min and max for x and y |
|
""" |
|
bot_left_x, bot_left_y = float('inf'), float('inf') |
|
top_right_x, top_right_y = float('-inf'), float('-inf') |
|
for point in points: |
|
x = point[0] |
|
y = point[1] |
|
if x < 0 or y < 0: |
|
continue |
|
bot_left_x = min(bot_left_x, x) |
|
bot_left_y = min(bot_left_y, y) |
|
top_right_x = max(top_right_x, x) |
|
top_right_y = max(top_right_y, y) |
|
|
|
return [bot_left_x, bot_left_y, top_right_x, top_right_y] |
|
|
|
lines = [[0,1],[1,3],[0,2],[3,2],[0,4],[1,5],[2,6],[3,7],[4,5],[5,7],[4,6],[7,6]] |
|
|
|
def get_boundingbox2d3d(cameraname, gt_data, extrinsics_path): |
|
f = open(extrinsics_path,"r") |
|
while True: |
|
a = f.readline() |
|
print(cameraname, a.split('\n')[0].split(' ')[0]) |
|
if cameraname in a.split('\n')[0].split(' ')[0]: |
|
a = a.split('\n')[0].split(' ') |
|
break |
|
|
|
K = np.reshape(np.array(a[1:10]),[3,3]).astype(float) |
|
R = np.reshape(a[10:19], [3,3]) |
|
T = np.array([[a[19]],[a[20]],[a[21]]]) |
|
RT = np.hstack((R,T)).astype(float) |
|
KRT = np.matmul(K, RT) |
|
bb_3d_connected = [] |
|
bb_3d_all = [] |
|
bb_2d_all = [] |
|
bb_3d_proj_all = [] |
|
|
|
for indice, keypoints_3d in enumerate(gt_data['arr_0'][1]): |
|
parking_space = gt_data['arr_0'][0][indice][0] |
|
|
|
if gt_data['arr_0'][0][indice][1] == 0: |
|
continue |
|
points2d_all = [] |
|
parking_space = np.vstack([parking_space, parking_space+[0,0,2]]) |
|
parking_space_tranformed = [] |
|
for point in parking_space: |
|
point = [point[0], point[1], point[2], 1] |
|
point = np.matmul(RT, point) |
|
parking_space_tranformed.append(list(point)) |
|
point2d = np.matmul(K, point) |
|
if point2d[2] < 0: |
|
points2d_all.append([-100,-100,1]) |
|
continue |
|
point2d = point2d/point2d[2] |
|
if point2d[0] < 0 or point2d[0] >2048: |
|
points2d_all.append([-100,-100,1]) |
|
continue |
|
if point2d[1] < 0 or point2d[1] >2048: |
|
points2d_all.append([-100,-100,1]) |
|
continue |
|
|
|
points2d_all.append(point2d) |
|
|
|
bb_3d_proj_all.append(points2d_all) |
|
bbox = bounding_box(points2d_all) |
|
if float('inf') in bbox: |
|
continue |
|
bb_2d_all.append(bbox) |
|
bb_3d_all.append(parking_space) |
|
|
|
|
|
|
|
|
|
return bb_3d_all, bb_2d_all, bb_3d_proj_all |
|
|
|
|
|
@DATASETS.register_module() |
|
class Walt3DDataset(CustomDatasetLocal): |
|
|
|
CLASSES = ('person', 'bicycle', 'car', 'motorcycle', 'airplane', 'bus', |
|
'train', 'truck', 'boat', 'traffic light', 'fire hydrant', |
|
'stop sign', 'parking meter', 'bench', 'bird', 'cat', 'dog', |
|
'horse', 'sheep', 'cow', 'elephant', 'bear', 'zebra', 'giraffe', |
|
'backpack', 'umbrella', 'handbag', 'tie', 'suitcase', 'frisbee', |
|
'skis', 'snowboard', 'sports ball', 'kite', 'baseball bat', |
|
'baseball glove', 'skateboard', 'surfboard', 'tennis racket', |
|
'bottle', 'wine glass', 'cup', 'fork', 'knife', 'spoon', 'bowl', |
|
'banana', 'apple', 'sandwich', 'orange', 'broccoli', 'carrot', |
|
'hot dog', 'pizza', 'donut', 'cake', 'chair', 'couch', |
|
'potted plant', 'bed', 'dining table', 'toilet', 'tv', 'laptop', |
|
'mouse', 'remote', 'keyboard', 'cell phone', 'microwave', |
|
'oven', 'toaster', 'sink', 'refrigerator', 'book', 'clock', |
|
'vase', 'scissors', 'teddy bear', 'hair drier', 'toothbrush') |
|
|
|
def load_annotations(self, ann_file): |
|
import glob |
|
count = 0 |
|
data_infos = [] |
|
self.data_annotations = [] |
|
for i in glob.glob(ann_file + '*'): |
|
gt_data = np.load(i , allow_pickle = True) |
|
for img_folder in glob.glob(ann_file.replace('GT_data','images') + '/*'): |
|
cam_name = img_folder.split('/')[-1] |
|
img_name = i.split('/')[-1].replace('.npz','.png') |
|
info = dict(license=3, height=2048, width=2048, file_name = cam_name+'/' + img_name, date_captured = i.split('/')[-1].split('.')[0], id = count, filename = cam_name+'/' + img_name) |
|
|
|
|
|
count = count+1 |
|
data_infos.append(info) |
|
bb_3d_all, bb_2d_all, bb_3d_proj_all = get_boundingbox2d3d(cam_name, gt_data, ann_file.replace('GT_data','Extrinsics') + '/frame_par.txt') |
|
self.data_annotations.append([bb_3d_all, bb_2d_all, bb_3d_proj_all]) |
|
break |
|
return data_infos |
|
|
|
|
|
def get_ann_info(self, idx): |
|
data = self.data_annotations[idx] |
|
gt_bboxes = np.array(data[1]) |
|
gt_bboxes_3d = np.array(data[0]) |
|
gt_bboxes_3d_proj = np.array(data[2]) |
|
|
|
|
|
ann = dict( |
|
bboxes=gt_bboxes, |
|
bboxes_3d = gt_bboxes_3d, |
|
bboxes_3d_proj = gt_bboxes_3d_proj, |
|
labels = (np.zeros(len(gt_bboxes))+2).astype(int), |
|
bboxes_ignore=np.zeros((0, 4), dtype=np.float32), |
|
|
|
seg_map=np.array([])) |
|
return ann |
|
|
|
def get_cat_ids(self, idx): |
|
data = self.data_annotations[idx] |
|
gt_bboxes = np.array(data[1]) |
|
return (np.zeros(len(gt_bboxes))+2).astype(int) |
|
|
|
|
|
def _filter_imgs(self, min_size=32): |
|
"""Filter images too small or without ground truths.""" |
|
valid_inds = [] |
|
for data_info in self.data_infos: |
|
valid_inds.append(data_info['id']) |
|
print(valid_inds) |
|
|
|
return valid_inds |
|
|
|
|
|
def xyxy2xywh(self, bbox): |
|
"""Convert ``xyxy`` style bounding boxes to ``xywh`` style for COCO |
|
evaluation. |
|
|
|
Args: |
|
bbox (numpy.ndarray): The bounding boxes, shape (4, ), in |
|
``xyxy`` order. |
|
|
|
Returns: |
|
list[float]: The converted bounding boxes, in ``xywh`` order. |
|
""" |
|
|
|
_bbox = bbox.tolist() |
|
return [ |
|
_bbox[0], |
|
_bbox[1], |
|
_bbox[2] - _bbox[0], |
|
_bbox[3] - _bbox[1], |
|
] |
|
|
|
def _proposal2json(self, results): |
|
"""Convert proposal results to COCO json style.""" |
|
json_results = [] |
|
for idx in range(len(self)): |
|
img_id = self.img_ids[idx] |
|
bboxes = results[idx] |
|
for i in range(bboxes.shape[0]): |
|
data = dict() |
|
data['image_id'] = img_id |
|
data['bbox'] = self.xyxy2xywh(bboxes[i]) |
|
data['score'] = float(bboxes[i][4]) |
|
data['category_id'] = 1 |
|
json_results.append(data) |
|
return json_results |
|
|
|
def _det2json(self, results): |
|
"""Convert detection results to COCO json style.""" |
|
json_results = [] |
|
for idx in range(len(self)): |
|
img_id = self.img_ids[idx] |
|
result = results[idx] |
|
for label in range(len(result)): |
|
bboxes = result[label] |
|
for i in range(bboxes.shape[0]): |
|
data = dict() |
|
data['image_id'] = img_id |
|
data['bbox'] = self.xyxy2xywh(bboxes[i]) |
|
data['score'] = float(bboxes[i][4]) |
|
data['category_id'] = self.cat_ids[label] |
|
json_results.append(data) |
|
return json_results |
|
|
|
def _segm2json(self, results): |
|
"""Convert instance segmentation results to COCO json style.""" |
|
bbox_json_results = [] |
|
segm_json_results = [] |
|
for idx in range(len(self)): |
|
img_id = self.img_ids[idx] |
|
det, seg = results[idx] |
|
for label in range(len(det)): |
|
|
|
bboxes = det[label] |
|
for i in range(bboxes.shape[0]): |
|
data = dict() |
|
data['image_id'] = img_id |
|
data['bbox'] = self.xyxy2xywh(bboxes[i]) |
|
data['score'] = float(bboxes[i][4]) |
|
data['category_id'] = self.cat_ids[label] |
|
bbox_json_results.append(data) |
|
|
|
|
|
|
|
if isinstance(seg, tuple): |
|
segms = seg[0][label] |
|
mask_score = seg[1][label] |
|
else: |
|
segms = seg[label] |
|
mask_score = [bbox[4] for bbox in bboxes] |
|
for i in range(bboxes.shape[0]): |
|
data = dict() |
|
data['image_id'] = img_id |
|
data['bbox'] = self.xyxy2xywh(bboxes[i]) |
|
data['score'] = float(mask_score[i]) |
|
data['category_id'] = self.cat_ids[label] |
|
if isinstance(segms[i]['counts'], bytes): |
|
segms[i]['counts'] = segms[i]['counts'].decode() |
|
data['segmentation'] = segms[i] |
|
segm_json_results.append(data) |
|
return bbox_json_results, segm_json_results |
|
|
|
def results2json(self, results, outfile_prefix): |
|
"""Dump the detection results to a COCO style json file. |
|
|
|
There are 3 types of results: proposals, bbox predictions, mask |
|
predictions, and they have different data types. This method will |
|
automatically recognize the type, and dump them to json files. |
|
|
|
Args: |
|
results (list[list | tuple | ndarray]): Testing results of the |
|
dataset. |
|
outfile_prefix (str): The filename prefix of the json files. If the |
|
prefix is "somepath/xxx", the json files will be named |
|
"somepath/xxx.bbox.json", "somepath/xxx.segm.json", |
|
"somepath/xxx.proposal.json". |
|
|
|
Returns: |
|
dict[str: str]: Possible keys are "bbox", "segm", "proposal", and \ |
|
values are corresponding filenames. |
|
""" |
|
result_files = dict() |
|
if isinstance(results[0], list): |
|
json_results = self._det2json(results) |
|
result_files['bbox'] = f'{outfile_prefix}.bbox.json' |
|
result_files['proposal'] = f'{outfile_prefix}.bbox.json' |
|
mmcv.dump(json_results, result_files['bbox']) |
|
elif isinstance(results[0], tuple): |
|
json_results = self._segm2json(results) |
|
result_files['bbox'] = f'{outfile_prefix}.bbox.json' |
|
result_files['proposal'] = f'{outfile_prefix}.bbox.json' |
|
result_files['segm'] = f'{outfile_prefix}.segm.json' |
|
mmcv.dump(json_results[0], result_files['bbox']) |
|
mmcv.dump(json_results[1], result_files['segm']) |
|
elif isinstance(results[0], np.ndarray): |
|
json_results = self._proposal2json(results) |
|
result_files['proposal'] = f'{outfile_prefix}.proposal.json' |
|
mmcv.dump(json_results, result_files['proposal']) |
|
else: |
|
raise TypeError('invalid type of results') |
|
return result_files |
|
|
|
def fast_eval_recall(self, results, proposal_nums, iou_thrs, logger=None): |
|
gt_bboxes = [] |
|
for i in range(len(self.img_ids)): |
|
ann_ids = self.coco.get_ann_ids(img_ids=self.img_ids[i]) |
|
ann_info = self.coco.load_anns(ann_ids) |
|
if len(ann_info) == 0: |
|
gt_bboxes.append(np.zeros((0, 4))) |
|
continue |
|
bboxes = [] |
|
for ann in ann_info: |
|
if ann.get('ignore', False) or ann['iscrowd']: |
|
continue |
|
x1, y1, w, h = ann['bbox'] |
|
bboxes.append([x1, y1, x1 + w, y1 + h]) |
|
bboxes = np.array(bboxes, dtype=np.float32) |
|
if bboxes.shape[0] == 0: |
|
bboxes = np.zeros((0, 4)) |
|
gt_bboxes.append(bboxes) |
|
|
|
recalls = eval_recalls( |
|
gt_bboxes, results, proposal_nums, iou_thrs, logger=logger) |
|
ar = recalls.mean(axis=1) |
|
return ar |
|
|
|
def format_results(self, results, jsonfile_prefix=None, **kwargs): |
|
"""Format the results to json (standard format for COCO evaluation). |
|
|
|
Args: |
|
results (list[tuple | numpy.ndarray]): Testing results of the |
|
dataset. |
|
jsonfile_prefix (str | None): The prefix of json files. It includes |
|
the file path and the prefix of filename, e.g., "a/b/prefix". |
|
If not specified, a temp file will be created. Default: None. |
|
|
|
Returns: |
|
tuple: (result_files, tmp_dir), result_files is a dict containing \ |
|
the json filepaths, tmp_dir is the temporal directory created \ |
|
for saving json files when jsonfile_prefix is not specified. |
|
""" |
|
assert isinstance(results, list), 'results must be a list' |
|
assert len(results) == len(self), ( |
|
'The length of results is not equal to the dataset len: {} != {}'. |
|
format(len(results), len(self))) |
|
|
|
if jsonfile_prefix is None: |
|
tmp_dir = tempfile.TemporaryDirectory() |
|
jsonfile_prefix = osp.join(tmp_dir.name, 'results') |
|
else: |
|
tmp_dir = None |
|
result_files = self.results2json(results, jsonfile_prefix) |
|
return result_files, tmp_dir |
|
|
|
def evaluate(self, |
|
results, |
|
metric='bbox', |
|
logger=None, |
|
jsonfile_prefix=None, |
|
classwise=False, |
|
proposal_nums=(100, 300, 1000), |
|
iou_thrs=None, |
|
metric_items=None): |
|
"""Evaluation in COCO protocol. |
|
|
|
Args: |
|
results (list[list | tuple]): Testing results of the dataset. |
|
metric (str | list[str]): Metrics to be evaluated. Options are |
|
'bbox', 'segm', 'proposal', 'proposal_fast'. |
|
logger (logging.Logger | str | None): Logger used for printing |
|
related information during evaluation. Default: None. |
|
jsonfile_prefix (str | None): The prefix of json files. It includes |
|
the file path and the prefix of filename, e.g., "a/b/prefix". |
|
If not specified, a temp file will be created. Default: None. |
|
classwise (bool): Whether to evaluating the AP for each class. |
|
proposal_nums (Sequence[int]): Proposal number used for evaluating |
|
recalls, such as recall@100, recall@1000. |
|
Default: (100, 300, 1000). |
|
iou_thrs (Sequence[float], optional): IoU threshold used for |
|
evaluating recalls/mAPs. If set to a list, the average of all |
|
IoUs will also be computed. If not specified, [0.50, 0.55, |
|
0.60, 0.65, 0.70, 0.75, 0.80, 0.85, 0.90, 0.95] will be used. |
|
Default: None. |
|
metric_items (list[str] | str, optional): Metric items that will |
|
be returned. If not specified, ``['AR@100', 'AR@300', |
|
'AR@1000', 'AR_s@1000', 'AR_m@1000', 'AR_l@1000' ]`` will be |
|
used when ``metric=='proposal'``, ``['mAP', 'mAP_50', 'mAP_75', |
|
'mAP_s', 'mAP_m', 'mAP_l']`` will be used when |
|
``metric=='bbox' or metric=='segm'``. |
|
|
|
Returns: |
|
dict[str, float]: COCO style evaluation metric. |
|
""" |
|
|
|
metrics = metric if isinstance(metric, list) else [metric] |
|
allowed_metrics = ['bbox', 'segm', 'proposal', 'proposal_fast'] |
|
for metric in metrics: |
|
if metric not in allowed_metrics: |
|
raise KeyError(f'metric {metric} is not supported') |
|
if iou_thrs is None: |
|
iou_thrs = np.linspace( |
|
.5, 0.95, int(np.round((0.95 - .5) / .05)) + 1, endpoint=True) |
|
if metric_items is not None: |
|
if not isinstance(metric_items, list): |
|
metric_items = [metric_items] |
|
|
|
result_files, tmp_dir = self.format_results(results, jsonfile_prefix) |
|
|
|
eval_results = OrderedDict() |
|
cocoGt = self.coco |
|
for metric in metrics: |
|
msg = f'Evaluating {metric}...' |
|
if logger is None: |
|
msg = '\n' + msg |
|
print_log(msg, logger=logger) |
|
|
|
if metric == 'proposal_fast': |
|
ar = self.fast_eval_recall( |
|
results, proposal_nums, iou_thrs, logger='silent') |
|
log_msg = [] |
|
for i, num in enumerate(proposal_nums): |
|
eval_results[f'AR@{num}'] = ar[i] |
|
log_msg.append(f'\nAR@{num}\t{ar[i]:.4f}') |
|
log_msg = ''.join(log_msg) |
|
print_log(log_msg, logger=logger) |
|
continue |
|
|
|
if metric not in result_files: |
|
raise KeyError(f'{metric} is not in results') |
|
try: |
|
cocoDt = cocoGt.loadRes(result_files[metric]) |
|
except IndexError: |
|
print_log( |
|
'The testing results of the whole dataset is empty.', |
|
logger=logger, |
|
level=logging.ERROR) |
|
break |
|
|
|
iou_type = 'bbox' if metric == 'proposal' else metric |
|
cocoEval = COCOeval(cocoGt, cocoDt, iou_type) |
|
cocoEval.params.catIds = self.cat_ids |
|
cocoEval.params.imgIds = self.img_ids |
|
cocoEval.params.maxDets = list(proposal_nums) |
|
cocoEval.params.iouThrs = iou_thrs |
|
|
|
coco_metric_names = { |
|
'mAP': 0, |
|
'mAP_50': 1, |
|
'mAP_75': 2, |
|
'mAP_s': 3, |
|
'mAP_m': 4, |
|
'mAP_l': 5, |
|
'AR@100': 6, |
|
'AR@300': 7, |
|
'AR@1000': 8, |
|
'AR_s@1000': 9, |
|
'AR_m@1000': 10, |
|
'AR_l@1000': 11 |
|
} |
|
if metric_items is not None: |
|
for metric_item in metric_items: |
|
if metric_item not in coco_metric_names: |
|
raise KeyError( |
|
f'metric item {metric_item} is not supported') |
|
|
|
if metric == 'proposal': |
|
cocoEval.params.useCats = 0 |
|
cocoEval.evaluate() |
|
cocoEval.accumulate() |
|
cocoEval.summarize() |
|
if metric_items is None: |
|
metric_items = [ |
|
'AR@100', 'AR@300', 'AR@1000', 'AR_s@1000', |
|
'AR_m@1000', 'AR_l@1000' |
|
] |
|
|
|
for item in metric_items: |
|
val = float( |
|
f'{cocoEval.stats[coco_metric_names[item]]:.3f}') |
|
eval_results[item] = val |
|
else: |
|
cocoEval.evaluate() |
|
cocoEval.accumulate() |
|
cocoEval.summarize() |
|
if classwise: |
|
|
|
|
|
precisions = cocoEval.eval['precision'] |
|
|
|
assert len(self.cat_ids) == precisions.shape[2] |
|
|
|
results_per_category = [] |
|
for idx, catId in enumerate(self.cat_ids): |
|
|
|
|
|
nm = self.coco.loadCats(catId)[0] |
|
precision = precisions[:, :, idx, 0, -1] |
|
precision = precision[precision > -1] |
|
if precision.size: |
|
ap = np.mean(precision) |
|
else: |
|
ap = float('nan') |
|
results_per_category.append( |
|
(f'{nm["name"]}', f'{float(ap):0.3f}')) |
|
|
|
num_columns = min(6, len(results_per_category) * 2) |
|
results_flatten = list( |
|
itertools.chain(*results_per_category)) |
|
headers = ['category', 'AP'] * (num_columns // 2) |
|
results_2d = itertools.zip_longest(*[ |
|
results_flatten[i::num_columns] |
|
for i in range(num_columns) |
|
]) |
|
table_data = [headers] |
|
table_data += [result for result in results_2d] |
|
table = AsciiTable(table_data) |
|
print_log('\n' + table.table, logger=logger) |
|
|
|
if metric_items is None: |
|
metric_items = [ |
|
'mAP', 'mAP_50', 'mAP_75', 'mAP_s', 'mAP_m', 'mAP_l' |
|
] |
|
|
|
for metric_item in metric_items: |
|
key = f'{metric}_{metric_item}' |
|
val = float( |
|
f'{cocoEval.stats[coco_metric_names[metric_item]]:.3f}' |
|
) |
|
eval_results[key] = val |
|
ap = cocoEval.stats[:6] |
|
eval_results[f'{metric}_mAP_copypaste'] = ( |
|
f'{ap[0]:.3f} {ap[1]:.3f} {ap[2]:.3f} {ap[3]:.3f} ' |
|
f'{ap[4]:.3f} {ap[5]:.3f}') |
|
if tmp_dir is not None: |
|
tmp_dir.cleanup() |
|
return eval_results |
|
|