|
|
|
|
|
|
|
|
|
|
|
|
|
from __future__ import annotations

from collections import OrderedDict
from dataclasses import dataclass
from functools import cached_property
from typing import Any

import evaluate as HF_evaluate
import numpy as np
import numpy.typing as npt
import pandas as pd
import scipy.sparse
import scipy.special
import sklearn.utils.validation
from sklearn import metrics as skm

ArrayType = npt.NDArray[np.floating]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def f1_w(y_true, p_hat, y_hat=None):
    """Weighted-average F1 score, predicting the argmax class when y_hat is not given."""
    if y_hat is None:
        y_hat = np.argmax(p_hat, axis=-1)
    return skm.f1_score(y_true, y_hat, average="weighted")


def f1_micro(y_true, p_hat, y_hat=None):
    """Micro-averaged F1 score, predicting the argmax class when y_hat is not given."""
    if y_hat is None:
        y_hat = np.argmax(p_hat, axis=-1)
    return skm.f1_score(y_true, y_hat, average="micro")


def f1_macro(y_true, p_hat, y_hat=None):
    """Macro-averaged F1 score, predicting the argmax class when y_hat is not given."""
    if y_hat is None:
        y_hat = np.argmax(p_hat, axis=-1)
    return skm.f1_score(y_true, y_hat, average="macro")
|
|
|
|
|
|
|
|
|
|
|
def brier_loss(y_true, p_hat):
    r"""Brier score.

    If the true label is k, while the predicted vector of probabilities is
    [y_1, ..., y_K], then the Brier score is equal to
    \sum_{i != k} y_i^2 + (y_k - 1)^2.

    The smaller the Brier score, the better, hence the naming with "loss".
    Across all items in a set of N predictions, the Brier score measures the
    mean squared difference between (1) the predicted probability assigned
    to the possible outcomes for item i and (2) the actual outcome.
    Therefore, the lower the Brier score is for a set of predictions, the
    better the predictions are calibrated. Note that each squared difference
    lies between zero and one, since a predicted probability must be between
    zero and one and the actual outcome can take on values of only 0 and 1.
    The Brier loss decomposes into refinement loss and calibration loss.
    """
    N = len(y_true)
    K = p_hat.shape[-1]

    # One-hot encode the labels unless they already match p_hat's shape.
    if y_true.shape != p_hat.shape:
        onehot = scipy.sparse.lil_matrix((N, K))
        for i in range(N):
            onehot[i, y_true[i]] = 1
    else:
        onehot = y_true

    # Heuristic: if the rows do not sum to one, assume raw logits and apply a softmax.
    if not np.isclose(np.sum(p_hat), len(p_hat)):
        p_hat = scipy.special.softmax(p_hat, axis=-1)

    return np.mean(np.sum(np.asarray(p_hat - onehot) ** 2, axis=1))
|
|
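# Worked example for brier_loss (hypothetical values, as a sanity check): for
# y_true = np.array([0, 1]) and p_hat = np.array([[0.9, 0.1], [0.2, 0.8]]),
# the per-item scores are (0.9 - 1)**2 + 0.1**2 = 0.02 and
# 0.2**2 + (0.8 - 1)**2 = 0.08, so brier_loss(y_true, p_hat) returns their mean, 0.05.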
|
|
|
def nll(y_true, p_hat):
    r"""Multi-class negative log likelihood.

    If the true label is k, while the predicted vector of probabilities is
    [p_1, ..., p_K], then the negative log likelihood is -log(p_k).
    The labels do not need to be one-hot encoded.
    """
    # Heuristic (as in brier_loss): if the rows do not sum to one, assume raw
    # logits and apply a softmax before computing the log loss.
    if not np.isclose(np.sum(p_hat), len(p_hat)):
        p_hat = scipy.special.softmax(p_hat, axis=-1)
    labels = np.arange(p_hat.shape[-1])
    return skm.log_loss(y_true, p_hat, labels=labels)
|
|
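# Worked example for nll (hypothetical values): for y_true = np.array([0, 1])
# and p_hat = np.array([[0.9, 0.1], [0.2, 0.8]]), the negative log likelihood
# is -(log(0.9) + log(0.8)) / 2 ≈ 0.164.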
|
|
|
def accuracy(y_true, p_hat):
    """Top-1 accuracy of the argmax prediction."""
    y_pred = np.argmax(p_hat, axis=-1)
    return skm.accuracy_score(y_true=y_true, y_pred=y_pred)
|
|
|
|
|
AURC_DISPLAY_SCALE = 1 |
|
|
|
""" |
|
From: https://web.stanford.edu/class/archive/cs/cs224n/cs224n.1204/reports/custom/report52.pdf |
|
|
|
The risk-coverage (RC) curve [28, 16] is a measure of the trade-off between the |
|
coverage (the proportion of test data encountered), and the risk (the error rate under this coverage). Since each |
|
prediction comes with a confidence score, given a list of prediction correctness Z paired up with the confidence |
|
scores C, we sort C in reverse order to obtain sorted C' |
|
, and its corresponding correctness Z' |
|
. Note that the correctness is computed based on Exact Match (EM) as described in [22]. The RC curve is then obtained by |
|
computing the risk of the coverage from the beginning of Z' |
|
(most confident) to the end (least confident). In particular, these metrics evaluate |
|
the relative order of the confidence score, which means that we want wrong |
|
answers have lower confidence score than the correct ones, ignoring their absolute values. |
|
|
|
Source: https://github.com/kjdhfg/fd-shifts |
|
|
|
References: |
|
----------- |
|
|
|
[1] Jaeger, P.F., Lüth, C.T., Klein, L. and Bungert, T.J., 2022. A Call to Reflect on Evaluation Practices for Failure Detection in Image Classification. arXiv preprint arXiv:2211.15259. |
|
|
|
[2] Kamath, A., Jia, R. and Liang, P., 2020. Selective Question Answering under Domain Shift. In Proceedings of the 58th Annual Meeting of the Association for Computational Linguistics (pp. 5684-5696). |
|
|
|
""" |
|
|
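# Illustration of the RC curve construction described above (hypothetical
# values): with confidences C = [0.9, 0.2, 0.7] and correctness Z = [1, 0, 1],
# sorting C in descending order gives C' = [0.9, 0.7, 0.2] and Z' = [1, 1, 0];
# the risk at coverage 2/3 is 0, and at full coverage it is 1/3.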
|
|
|
@dataclass |
|
class StatsCache: |
|
"""Cache for stats computed by scikit used by multiple metrics. |
|
|
|
Attributes: |
|
confids (array_like): Confidence values |
|
correct (array_like): Boolean array (best converted to int) where predictions were correct |
|
""" |
|
|
|
confids: npt.NDArray[Any] |
|
correct: npt.NDArray[Any] |
|
|
|
@cached_property |
|
def roc_curve_stats(self) -> tuple[npt.NDArray[Any], npt.NDArray[Any]]: |
|
fpr, tpr, _ = skm.roc_curve(self.correct, self.confids) |
|
return fpr, tpr |
|
|
|
@property |
|
def residuals(self) -> npt.NDArray[Any]: |
|
return 1 - self.correct |
|
|
|
    @cached_property
    def rc_curve_stats(self) -> tuple[list[float], list[float], list[float]]:
        """Coverages, selective risks, and weights of the risk-coverage curve."""
        coverages = []
        risks = []

        n_residuals = len(self.residuals)
        idx_sorted = np.argsort(self.confids)

        # Start at full coverage: every prediction is accepted.
        coverage = n_residuals
        error_sum = sum(self.residuals[idx_sorted])

        coverages.append(coverage / n_residuals)
        risks.append(error_sum / n_residuals)

        weights = []

        tmp_weight = 0
        # Drop predictions one at a time, least confident first, and record the
        # selective risk whenever the confidence value changes.
        for i in range(0, len(idx_sorted) - 1):
            coverage = coverage - 1
            error_sum = error_sum - self.residuals[idx_sorted[i]]
            selective_risk = error_sum / (n_residuals - 1 - i)
            tmp_weight += 1
            if i == 0 or self.confids[idx_sorted[i]] != self.confids[idx_sorted[i - 1]]:
                coverages.append(coverage / n_residuals)
                risks.append(selective_risk)
                weights.append(tmp_weight / n_residuals)
                tmp_weight = 0

        # Close the curve at zero coverage if trailing ties kept the last point
        # from being recorded.
        if tmp_weight > 0:
            coverages.append(0)
            risks.append(risks[-1])
            weights.append(tmp_weight / n_residuals)
        return coverages, risks, weights
|
|
|
|
|
def aurc(stats_cache: StatsCache):
    """Area under the risk-coverage curve (AURC).

    Args:
        stats_cache (StatsCache): StatsCache object

    Returns:
        metric value

    Important for assessment: LOWER is better!
    """
    _, risks, weights = stats_cache.rc_curve_stats
    # Trapezoidal integration of the risk-coverage curve.
    return sum((risks[i] + risks[i + 1]) * 0.5 * weights[i] for i in range(len(weights))) * AURC_DISPLAY_SCALE
|
|
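# Usage sketch for aurc (hypothetical values): with a perfect confidence
# ranking the only error has the lowest confidence, so the selective risk drops
# to zero as soon as that item is dropped and the AURC is small:
#
#   cache = StatsCache(confids=np.array([0.9, 0.8, 0.3, 0.1]),
#                      correct=np.array([1, 1, 1, 0]))
#   aurc(cache)  # 0.03125 with AURC_DISPLAY_SCALE = 1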
|
|
|
def aurc_logits(references, predictions, plot=False, get_cache=False, use_as_is=False):
    if not use_as_is:
        # Heuristic: treat references as class labels and convert them to binary
        # correctness indicators unless they already sum to their length.
        if not np.isclose(np.sum(references), len(references)):
            references = (np.argmax(predictions, -1) == references).astype(int)

        # Heuristic: apply a softmax if the rows do not already sum to one.
        if not np.isclose(np.sum(predictions), len(predictions)):
            predictions = scipy.special.softmax(predictions, axis=-1)

        # Use the maximum class probability as the confidence score.
        if predictions.ndim == 2:
            predictions = np.max(predictions, -1)

    cache = StatsCache(confids=predictions, correct=references)

    if plot:
        coverages, risks, weights = cache.rc_curve_stats
        pd.options.plotting.backend = "plotly"
        df = pd.DataFrame(zip(coverages, risks, weights), columns=["% Coverage", "% Risk", "weights"])
        fig = df.plot(x="% Coverage", y="% Risk")
        fig.show()
    if get_cache:
        return {"aurc": aurc(cache), "cache": cache}
    return aurc(cache)
|
|
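# Usage sketch for aurc_logits (assuming raw logits and integer class labels):
#
#   score = aurc_logits(references=labels, predictions=logits)
#   out = aurc_logits(references=labels, predictions=logits, get_cache=True)
#   out["aurc"], out["cache"]  # metric value plus the reusable StatsCache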
|
|
|
def multi_aurc_plot(caches, names, aurcs=None, verbose=False): |
|
pd.options.plotting.backend = "plotly" |
|
df = pd.DataFrame() |
|
for cache, name in zip(caches, names): |
|
coverages, risks, weights = cache.rc_curve_stats |
|
df[name] = pd.Series(risks, index=coverages) |
|
if verbose: |
|
print(df.head(), df.index, df.columns) |
|
fig = df.plot() |
|
title = "" |
|
if aurcs is not None: |
|
title = "AURC: " + " - ".join([str(round(aurc, 4)) for aurc in aurcs]) |
|
fig.update_layout(title=title, xaxis_title="% Coverage", yaxis_title="% Risk") |
|
fig.show() |
|
|
|
|
|
def ece_logits(references, predictions, use_as_is=False):
    if not use_as_is:
        # Heuristic: apply a softmax if the rows do not already sum to one.
        if not np.isclose(np.sum(predictions), len(predictions)):
            predictions = scipy.special.softmax(predictions, axis=-1)
|
|
|
metric = HF_evaluate.load("jordyvl/ece") |
|
kwargs = dict( |
|
n_bins=min(len(predictions) - 1, 100), |
|
scheme="equal-mass", |
|
bin_range=[0, 1], |
|
proxy="upper-edge", |
|
p=1, |
|
detail=False, |
|
) |
|
|
|
ece_result = metric.compute( |
|
references=references, |
|
predictions=predictions, |
|
**kwargs, |
|
) |
|
return ece_result["ECE"] |
|
|
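# Usage sketch for ece_logits (assuming raw logits; the jordyvl/ece metric is
# loaded through the Hugging Face evaluate hub, so it must be available):
#
#   ece = ece_logits(references=labels, predictions=logits)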
|
|
|
METRICS = [accuracy, brier_loss, nll, f1_w, f1_macro, ece_logits, aurc_logits] |
|
|
|
|
|
def apply_metrics(y_true, y_probs, metrics=METRICS):
    predictive_performance = OrderedDict()
    for metric in metrics:
        try:
            predictive_performance[metric.__name__.replace("_logits", "")] = metric(y_true, y_probs)
        except Exception as e:
            print(f"{metric.__name__} failed: {e}")

    return predictive_performance
|
|
|
|
|
def evaluate_coverages(
    logits, labels, confidence, coverages=[100, 99, 98, 97, 95, 90, 85, 80, 75, 70, 60, 50, 40, 30, 20, 10]
):
    correctness = np.equal(logits.argmax(-1), labels)

    # Rank predictions by the abstention score (ascending) and report accuracy
    # on the first `coverage` percent of items for each coverage level.
    abstention_results = list(zip(list(confidence), list(correctness)))
    abstention_results.sort(key=lambda x: x[0])

    sorted_correct = list(map(lambda x: int(x[1]), abstention_results))
    size = len(sorted_correct)
    print("Abstention Logit: accuracy of coverage ")
    for coverage in coverages:
        covered_correct = sorted_correct[: round(size / 100 * coverage)]
        print("{:.0f}: {:.3f}, ".format(coverage, sum(covered_correct) / len(covered_correct) * 100.0), end="")
    print("")

    # Rank predictions by the maximum logit (descending) and report accuracy
    # on the first `coverage` percent of items for each coverage level.
    sr_results = list(zip(list(logits.max(-1)), list(correctness)))
    sr_results.sort(key=lambda x: -x[0])

    sorted_correct = list(map(lambda x: int(x[1]), sr_results))
    size = len(sorted_correct)
    print("Softmax Response: accuracy of coverage ")
    for coverage in coverages:
        covered_correct = sorted_correct[: round(size / 100 * coverage)]
        print("{:.0f}: {:.3f}, ".format(coverage, sum(covered_correct) / len(covered_correct) * 100.0), end="")
    print("")
|
|
|
|
|
def compute_metrics(eval_preds):
    logits, labels = eval_preds
    # Predictions may arrive as a (logits, confidence) tuple; when there is one
    # confidence score per example, also report the coverage/accuracy tables.
    if isinstance(logits, tuple):
        confidence = logits[1]
        logits = logits[0]
        if confidence.size == logits.shape[0]:
            evaluate_coverages(logits, labels, confidence)
    results = apply_metrics(labels, logits)
    return results
|
|
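if __name__ == "__main__":
    # Minimal smoke test (hypothetical data, a sketch rather than a benchmark):
    # random logits for a 3-class problem. Note that ece_logits downloads the
    # jordyvl/ece metric on first use, so it may require network access.
    rng = np.random.default_rng(0)
    demo_logits = rng.normal(size=(32, 3))
    demo_labels = rng.integers(0, 3, size=32)
    print(apply_metrics(demo_labels, demo_logits))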