File size: 18,798 Bytes
cc0dd3c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
# Copyright (c) OpenMMLab. All rights reserved.
from typing import List, Optional, Sequence, Union

import mmengine
import numpy as np
import torch
from mmengine.evaluator import BaseMetric
from mmengine.utils import is_seq_of

from mmpretrain.registry import METRICS
from mmpretrain.structures import label_to_onehot
from .single_label import to_tensor


@METRICS.register_module()
class RetrievalRecall(BaseMetric):
    r"""Recall evaluation metric for image retrieval.

    Args:
        topk (int | Sequence[int]): If the ground truth label matches one of
            the best **k** predictions, the sample will be regard as a positive
            prediction. If the parameter is a tuple, all of top-k recall will
            be calculated and outputted together. Defaults to 1.
        collect_device (str): Device name used for collecting results from
            different ranks during distributed training. Must be 'cpu' or
            'gpu'. Defaults to 'cpu'.
        prefix (str, optional): The prefix that will be added in the metric
            names to disambiguate homonymous metrics of different evaluators.
            If prefix is not provided in the argument, self.default_prefix
            will be used instead. Defaults to None.

    Examples:
        Use in the code:

        >>> import torch
        >>> from mmpretrain.evaluation import RetrievalRecall
        >>> # -------------------- The Basic Usage --------------------
        >>> y_pred = [[0], [1], [2], [3]]
        >>> y_true = [[0, 1], [2], [1], [0, 3]]
        >>> RetrievalRecall.calculate(
        >>>     y_pred, y_true, topk=1, pred_indices=True, target_indices=True)
        [tensor([50.])]
        >>> # Calculate the recall@1 and recall@5 for non-indices input.
        >>> y_score = torch.rand((1000, 10))
        >>> import torch.nn.functional as F
        >>> y_true = F.one_hot(torch.arange(0, 1000) % 10, num_classes=10)
        >>> RetrievalRecall.calculate(y_score, y_true, topk=(1, 5))
        [tensor(9.3000), tensor(48.4000)]
        >>>
        >>> # ------------------- Use with Evalutor -------------------
        >>> from mmpretrain.structures import DataSample
        >>> from mmengine.evaluator import Evaluator
        >>> data_samples = [
        ...     DataSample().set_gt_label([0, 1]).set_pred_score(
        ...     torch.rand(10))
        ...     for i in range(1000)
        ... ]
        >>> evaluator = Evaluator(metrics=RetrievalRecall(topk=(1, 5)))
        >>> evaluator.process(data_samples)
        >>> evaluator.evaluate(1000)
        {'retrieval/Recall@1': 20.700000762939453,
         'retrieval/Recall@5': 78.5999984741211}

        Use in OpenMMLab configs:

        .. code:: python

            val_evaluator = dict(type='RetrievalRecall', topk=(1, 5))
            test_evaluator = val_evaluator
    """
    default_prefix: Optional[str] = 'retrieval'

    def __init__(self,
                 topk: Union[int, Sequence[int]],
                 collect_device: str = 'cpu',
                 prefix: Optional[str] = None) -> None:
        topk = (topk, ) if isinstance(topk, int) else topk

        for k in topk:
            if k <= 0:
                raise ValueError('`topk` must be a ingter larger than 0 '
                                 'or seq of ingter larger than 0.')

        self.topk = topk
        super().__init__(collect_device=collect_device, prefix=prefix)

    def process(self, data_batch: Sequence[dict],
                data_samples: Sequence[dict]):
        """Process one batch of data and predictions.

        The processed results should be stored in ``self.results``, which will
        be used to computed the metrics when all batches have been processed.

        Args:
            data_batch (Sequence[dict]): A batch of data from the dataloader.
            predictions (Sequence[dict]): A batch of outputs from the model.
        """
        for data_sample in data_samples:
            pred_score = data_sample['pred_score'].clone()
            gt_label = data_sample['gt_label']

            if 'gt_score' in data_sample:
                target = data_sample.get('gt_score').clone()
            else:
                num_classes = pred_score.size()[-1]
                target = label_to_onehot(gt_label, num_classes)

            # Because the retrieval output logit vector will be much larger
            # compared to the normal classification, to save resources, the
            # evaluation results are computed each batch here and then reduce
            #  all results at the end.
            result = RetrievalRecall.calculate(
                pred_score.unsqueeze(0), target.unsqueeze(0), topk=self.topk)
            self.results.append(result)

    def compute_metrics(self, results: List):
        """Compute the metrics from processed results.

        Args:
            results (list): The processed results of each batch.

        Returns:
            Dict: The computed metrics. The keys are the names of the metrics,
            and the values are corresponding results.
        """
        result_metrics = dict()
        for i, k in enumerate(self.topk):
            recall_at_k = sum([r[i].item() for r in results]) / len(results)
            result_metrics[f'Recall@{k}'] = recall_at_k

        return result_metrics

    @staticmethod
    def calculate(pred: Union[np.ndarray, torch.Tensor],
                  target: Union[np.ndarray, torch.Tensor],
                  topk: Union[int, Sequence[int]],
                  pred_indices: (bool) = False,
                  target_indices: (bool) = False) -> float:
        """Calculate the average recall.

        Args:
            pred (torch.Tensor | np.ndarray | Sequence): The prediction
                results. A :obj:`torch.Tensor` or :obj:`np.ndarray` with
                shape ``(N, M)`` or a sequence of index/onehot
                format labels.
            target (torch.Tensor | np.ndarray | Sequence): The prediction
                results. A :obj:`torch.Tensor` or :obj:`np.ndarray` with
                shape ``(N, M)`` or a sequence of index/onehot
                format labels.
            topk (int, Sequence[int]): Predictions with the k-th highest
                scores are considered as positive.
            pred_indices (bool): Whether the ``pred`` is a sequence of
                category index labels. Defaults to False.
            target_indices (bool): Whether the ``target`` is a sequence of
                category index labels. Defaults to False.

        Returns:
            List[float]: the average recalls.
        """
        topk = (topk, ) if isinstance(topk, int) else topk
        for k in topk:
            if k <= 0:
                raise ValueError('`topk` must be a ingter larger than 0 '
                                 'or seq of ingter larger than 0.')

        max_keep = max(topk)
        pred = _format_pred(pred, max_keep, pred_indices)
        target = _format_target(target, target_indices)

        assert len(pred) == len(target), (
            f'Length of `pred`({len(pred)}) and `target` ({len(target)}) '
            f'must be the same.')

        num_samples = len(pred)
        results = []
        for k in topk:
            recalls = torch.zeros(num_samples)
            for i, (sample_pred,
                    sample_target) in enumerate(zip(pred, target)):
                sample_pred = np.array(to_tensor(sample_pred).cpu())
                sample_target = np.array(to_tensor(sample_target).cpu())
                recalls[i] = int(np.in1d(sample_pred[:k], sample_target).max())
            results.append(recalls.mean() * 100)
        return results


@METRICS.register_module()
class RetrievalAveragePrecision(BaseMetric):
    r"""Calculate the average precision for image retrieval.

    Args:
        topk (int, optional): Predictions with the k-th highest scores are
            considered as positive.
        mode (str, optional): The mode to calculate AP, choose from
                'IR'(information retrieval) and 'integrate'. Defaults to 'IR'.
        collect_device (str): Device name used for collecting results from
            different ranks during distributed training. Must be 'cpu' or
            'gpu'. Defaults to 'cpu'.
        prefix (str, optional): The prefix that will be added in the metric
            names to disambiguate homonymous metrics of different evaluators.
            If prefix is not provided in the argument, self.default_prefix
            will be used instead. Defaults to None.

    Note:
        If the ``mode`` set to 'IR', use the stanford AP calculation of
        information retrieval as in wikipedia page[1]; if set to 'integrate',
        the method implemented integrates over the precision-recall curve
        by averaging two adjacent precision points, then multiplying by the
        recall step like mAP in Detection task. This is the convention for
        the Revisited Oxford/Paris datasets[2].

    References:
        [1] `Wikipedia entry for the Average precision <https://en.wikipedia.
        org/wiki/Evaluation_measures_(information_retrieval)#Average_precision>`_

        [2] `The Oxford Buildings Dataset
        <https://www.robots.ox.ac.uk/~vgg/data/oxbuildings/>`_

    Examples:
        Use in code:

        >>> import torch
        >>> import numpy as np
        >>> from mmcls.evaluation import RetrievalAveragePrecision
        >>> # using index format inputs
        >>> pred = [ torch.Tensor([idx for idx in range(100)]) ] * 3
        >>> target = [[0, 3, 6, 8, 35], [1, 2, 54, 105], [2, 42, 205]]
        >>> RetrievalAveragePrecision.calculate(pred, target, 10, True, True)
        29.246031746031747
        >>> # using tensor format inputs
        >>> pred = np.array([np.linspace(0.95, 0.05, 10)] * 2)
        >>> target = torch.Tensor([[1, 0, 1, 0, 0, 1, 0, 0, 1, 1]] * 2)
        >>> RetrievalAveragePrecision.calculate(pred, target, 10)
        62.222222222222214

        Use in OpenMMLab config files:

        .. code:: python

            val_evaluator = dict(type='RetrievalAveragePrecision', topk=100)
            test_evaluator = val_evaluator
    """

    default_prefix: Optional[str] = 'retrieval'

    def __init__(self,
                 topk: Optional[int] = None,
                 mode: Optional[str] = 'IR',
                 collect_device: str = 'cpu',
                 prefix: Optional[str] = None) -> None:
        if topk is None or (isinstance(topk, int) and topk <= 0):
            raise ValueError('`topk` must be a ingter larger than 0.')

        mode_options = ['IR', 'integrate']
        assert mode in mode_options, \
            f'Invalid `mode` argument, please specify from {mode_options}.'

        self.topk = topk
        self.mode = mode
        super().__init__(collect_device=collect_device, prefix=prefix)

    def process(self, data_batch: Sequence[dict],
                data_samples: Sequence[dict]):
        """Process one batch of data and predictions.

        The processed results should be stored in ``self.results``, which will
        be used to computed the metrics when all batches have been processed.
        Args:
            data_batch (Sequence[dict]): A batch of data from the dataloader.
            predictions (Sequence[dict]): A batch of outputs from the model.
        """
        for data_sample in data_samples:
            pred_score = data_sample.get('pred_score').clone()

            if 'gt_score' in data_sample:
                target = data_sample.get('gt_score').clone()
            else:
                gt_label = data_sample.get('gt_label')
                num_classes = pred_score.size()[-1]
                target = label_to_onehot(gt_label, num_classes)

            # Because the retrieval output logit vector will be much larger
            # compared to the normal classification, to save resources, the
            # evaluation results are computed each batch here and then reduce
            #  all results at the end.
            result = RetrievalAveragePrecision.calculate(
                pred_score.unsqueeze(0),
                target.unsqueeze(0),
                self.topk,
                mode=self.mode)
            self.results.append(result)

    def compute_metrics(self, results: List):
        """Compute the metrics from processed results.

        Args:
            results (list): The processed results of each batch.
        Returns:
            Dict: The computed metrics. The keys are the names of the metrics,
            and the values are corresponding results.
        """
        result_metrics = dict()
        result_metrics[f'mAP@{self.topk}'] = np.mean(self.results).item()

        return result_metrics

    @staticmethod
    def calculate(pred: Union[np.ndarray, torch.Tensor],
                  target: Union[np.ndarray, torch.Tensor],
                  topk: Optional[int] = None,
                  pred_indices: (bool) = False,
                  target_indices: (bool) = False,
                  mode: str = 'IR') -> float:
        """Calculate the average precision.
        Args:
            pred (torch.Tensor | np.ndarray | Sequence): The prediction
                results. A :obj:`torch.Tensor` or :obj:`np.ndarray` with
                shape ``(N, M)`` or a sequence of index/onehot
                format labels.
            target (torch.Tensor | np.ndarray | Sequence): The prediction
                results. A :obj:`torch.Tensor` or :obj:`np.ndarray` with
                shape ``(N, M)`` or a sequence of index/onehot
                format labels.
            topk (int, optional): Predictions with the k-th highest scores
                 are considered as positive.
            pred_indices (bool): Whether the ``pred`` is a sequence of
                category index labels. Defaults to False.
            target_indices (bool): Whether the ``target`` is a sequence of
                category index labels. Defaults to False.
            mode (Optional[str]): The mode to calculate AP, choose from
                'IR'(information retrieval) and 'integrate'. Defaults to 'IR'.

        Note:
            If the ``mode`` set to 'IR', use the stanford AP calculation of
            information retrieval as in wikipedia page; if set to 'integrate',
            the method implemented integrates over the precision-recall curve
            by averaging two adjacent precision points, then multiplying by the
            recall step like mAP in Detection task. This is the convention for
            the Revisited Oxford/Paris datasets.

        Returns:
            float: the average precision of the query image.

        References:
            [1] `Wikipedia entry for Average precision(information_retrieval)
            <https://en.wikipedia.org/wiki/Evaluation_measures_

            (information_retrieval)#Average_precision>`_
            [2] `The Oxford Buildings Dataset <https://www.robots.ox.ac.uk/
            ~vgg/data/oxbuildings/`_
        """
        if topk is None or (isinstance(topk, int) and topk <= 0):
            raise ValueError('`topk` must be a ingter larger than 0.')

        mode_options = ['IR', 'integrate']
        assert mode in mode_options, \
            f'Invalid `mode` argument, please specify from {mode_options}.'

        pred = _format_pred(pred, topk, pred_indices)
        target = _format_target(target, target_indices)

        assert len(pred) == len(target), (
            f'Length of `pred`({len(pred)}) and `target` ({len(target)}) '
            f'must be the same.')

        num_samples = len(pred)
        aps = np.zeros(num_samples)
        for i, (sample_pred, sample_target) in enumerate(zip(pred, target)):
            aps[i] = _calculateAp_for_sample(sample_pred, sample_target, mode)

        return aps.mean()


def _calculateAp_for_sample(pred, target, mode):
    pred = np.array(to_tensor(pred).cpu())
    target = np.array(to_tensor(target).cpu())

    num_preds = len(pred)

    # TODO: use ``torch.isin`` in torch1.10.
    positive_ranks = np.arange(num_preds)[np.in1d(pred, target)]

    ap = 0
    for i, rank in enumerate(positive_ranks):
        if mode == 'IR':
            precision = (i + 1) / (rank + 1)
            ap += precision
        elif mode == 'integrate':
            # code are modified from https://www.robots.ox.ac.uk/~vgg/data/oxbuildings/compute_ap.cpp # noqa:
            old_precision = i / rank if rank > 0 else 1
            cur_precision = (i + 1) / (rank + 1)
            prediction = (old_precision + cur_precision) / 2
            ap += prediction
    ap = ap / len(target)

    return ap * 100


def _format_pred(label, topk=None, is_indices=False):
    """format various label to List[indices]."""
    if is_indices:
        assert isinstance(label, Sequence),  \
                '`pred` must be Sequence of indices when' \
                f' `pred_indices` set to True, but get {type(label)}'
        for i, sample_pred in enumerate(label):
            assert is_seq_of(sample_pred, int) or isinstance(
                sample_pred, (np.ndarray, torch.Tensor)), \
                '`pred` should be Sequence of indices when `pred_indices`' \
                f'set to True. but pred[{i}] is {sample_pred}'
            if topk:
                label[i] = sample_pred[:min(topk, len(sample_pred))]
        return label
    if isinstance(label, np.ndarray):
        label = torch.from_numpy(label)
    elif not isinstance(label, torch.Tensor):
        raise TypeError(f'The pred must be type of torch.tensor, '
                        f'np.ndarray or Sequence but get {type(label)}.')
    topk = topk if topk else label.size()[-1]
    _, indices = label.topk(topk)
    return indices


def _format_target(label, is_indices=False):
    """format various label to List[indices]."""
    if is_indices:
        assert isinstance(label, Sequence),  \
                '`target` must be Sequence of indices when' \
                f' `target_indices` set to True, but get {type(label)}'
        for i, sample_gt in enumerate(label):
            assert is_seq_of(sample_gt, int) or isinstance(
                sample_gt, (np.ndarray, torch.Tensor)), \
                '`target` should be Sequence of indices when ' \
                f'`target_indices` set to True. but target[{i}] is {sample_gt}'
        return label

    if isinstance(label, np.ndarray):
        label = torch.from_numpy(label)
    elif isinstance(label, Sequence) and not mmengine.is_str(label):
        label = torch.tensor(label)
    elif not isinstance(label, torch.Tensor):
        raise TypeError(f'The pred must be type of torch.tensor, '
                        f'np.ndarray or Sequence but get {type(label)}.')

    indices = [sample_gt.nonzero().squeeze(-1) for sample_gt in label]
    return indices