# Copyright 2020 The HuggingFace Datasets Authors and the current dataset script contributor.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""TODO: Add a description here."""
import concurrent.futures
import math
import statistics
from typing import List, Optional, Union
import datasets
import evaluate
import numpy as np
# TODO: Add BibTeX citation
_CITATION = """\
@InProceedings{huggingface:module,
title = {A great new module},
authors={huggingface, Inc.},
year={2020}
}
"""
_DESCRIPTION = """\
Matching Series evaluates a set of generated time series against a set of reference time series.
It matches generations and references by their pointwise distance (MSE, MAE, or RMSE) and reports
matching-based precision/recall/F1 distances, matching rates, and coverage under curve (CUC),
both overall and macro-averaged over features.
"""
_KWARGS_DESCRIPTION = """
Calculates how well a set of generated time series matches a set of reference time series.
Args:
    predictions: list of generated time series.
        shape: (num_generation, num_timesteps, num_features)
    references: list of reference time series.
        shape: (num_reference, num_timesteps, num_features)
Returns:
    Dictionary of matching-based scores, including "precision_distance", "recall_distance",
    "f1_distance", "mean_distance", "index_distance", "matching_precision", "matching_recall",
    "matching_f1", "cuc", and their macro (per-feature averaged) variants.
Examples:
    >>> metric = evaluate.load("bowdbeg/matching_series")
    >>> results = metric.compute(references=[[[0.0, 1.0]], [[1.0, 2.0]]], predictions=[[[0.1, 1.1]], [[1.1, 2.1]]])
    >>> print(results["matching_f1"])
    1.0
"""
@evaluate.utils.file_utils.add_start_docstrings(_DESCRIPTION, _KWARGS_DESCRIPTION)
class matching_series(evaluate.Metric):
"""TODO: Short description of my evaluation module."""
def _info(self):
        # Specifies the evaluate.MetricInfo object
return evaluate.MetricInfo(
# This is the description that will appear on the modules page.
module_type="metric",
description=_DESCRIPTION,
citation=_CITATION,
inputs_description=_KWARGS_DESCRIPTION,
# This defines the format of each prediction and reference
features=datasets.Features(
{
"predictions": datasets.Sequence(datasets.Sequence(datasets.Value("float"))),
"references": datasets.Sequence(datasets.Sequence(datasets.Value("float"))),
}
),
# Homepage of the module for documentation
homepage="https://huggingface.co/spaces/bowdbeg/matching_series",
# Additional links to the codebase or references
codebase_urls=["http://github.com/path/to/codebase/of/new_module"],
reference_urls=["http://path.to.reference.url/new_module"],
)
def _download_and_prepare(self, dl_manager):
"""Optional: download external resources useful to compute the scores"""
pass
def compute(self, *, predictions=None, references=None, **kwargs) -> Optional[dict]:
"""Compute the evaluation module.
Usage of positional arguments is not allowed to prevent mistakes.
Args:
predictions (`list/array/tensor`, *optional*):
Predictions.
references (`list/array/tensor`, *optional*):
References.
            **kwargs (optional):
                Keyword arguments that will be forwarded to this module's `_compute` method
                (see details in its docstring).
Return:
`dict` or `None`
- Dictionary with the results if this evaluation module is run on the main process (`process_id == 0`).
- `None` if the evaluation module is not run on the main process (`process_id != 0`).
        ```py
        >>> import evaluate
        >>> metric = evaluate.load("bowdbeg/matching_series")
        >>> results = metric.compute(predictions=[[[0.1, 1.1]], [[1.1, 2.1]]], references=[[[0.0, 1.0]], [[1.0, 2.0]]])
        ```
"""
all_kwargs = {"predictions": predictions, "references": references, **kwargs}
if predictions is None and references is None:
missing_kwargs = {k: None for k in self._feature_names() if k not in all_kwargs}
all_kwargs.update(missing_kwargs)
else:
missing_inputs = [k for k in self._feature_names() if k not in all_kwargs]
if missing_inputs:
raise ValueError(
f"Evaluation module inputs are missing: {missing_inputs}. All required inputs are {list(self._feature_names())}"
)
inputs = {input_name: all_kwargs[input_name] for input_name in self._feature_names()}
compute_kwargs = {k: kwargs[k] for k in kwargs if k not in self._feature_names()}
return self._compute(**inputs, **compute_kwargs)
def _compute(
self,
predictions: Union[List, np.ndarray],
references: Union[List, np.ndarray],
batch_size: Optional[int] = None,
cuc_n_calculation: int = 3,
cuc_n_samples: Union[List[int], str] = "auto",
metric: str = "mse",
num_process: int = 1,
return_distance: bool = False,
return_matching: bool = False,
return_each_features: bool = False,
return_coverages: bool = False,
return_all: bool = False,
dtype=np.float32,
):
"""
Compute the scores of the module given the predictions and references
Args:
predictions: list of generated time series.
shape: (num_generation, num_timesteps, num_features)
references: list of reference
shape: (num_reference, num_timesteps, num_features)
batch_size: batch size to use for the computation. If None, the whole dataset is processed at once.
cuc_n_calculation: number of Coverage Under Curve calculate times
cuc_n_samples: number of samples to use for Coverage Under Curve calculation. If "auto", it uses the number of samples of the predictions.
Returns:
"""
if return_all:
return_distance = True
return_matching = True
return_each_features = True
return_coverages = True
predictions = np.array(predictions).astype(dtype)
references = np.array(references).astype(dtype)
        if predictions.shape[1:] != references.shape[1:]:
            raise ValueError(
                "The (num_timesteps, num_features) shape of the predictions and references should be the same. predictions: {}, references: {}".format(
                    predictions.shape[1:], references.shape[1:]
                )
            )
        # distance between predictions and references for all example pairs, per feature
        # shape: (num_generation, num_reference, num_features)
if batch_size is not None:
if num_process > 1:
distance = np.zeros((len(predictions), len(references), predictions.shape[-1]), dtype=dtype)
                idxs = [
                    (i, j)
                    for i in range(0, len(predictions), batch_size)
                    for j in range(0, len(references), batch_size)
                ]
args = [
(predictions[i : i + batch_size, None], references[None, j : j + batch_size], metric, -2)
for i, j in idxs
]
with concurrent.futures.ProcessPoolExecutor(max_workers=num_process) as executor:
results = executor.map(
self._compute_metric,
*zip(*args),
)
for (i, j), d in zip(idxs, results):
distance[i : i + batch_size, j : j + batch_size] = d
else:
distance = np.zeros((len(predictions), len(references), predictions.shape[-1]), dtype=dtype)
# iterate over the predictions and references in batches
                for i in range(0, len(predictions), batch_size):
                    for j in range(0, len(references), batch_size):
d = self._compute_metric(
predictions[i : i + batch_size, None],
references[None, j : j + batch_size],
metric=metric,
axis=-2,
)
distance[i : i + batch_size, j : j + batch_size] = d
        else:
            # reduce over the timestep axis so the distance has shape (num_generation, num_reference, num_features)
            distance = self._compute_metric(predictions[:, None], references, metric=metric, axis=-2)
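        # average distance between the prediction and the reference at the same index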
index_distance = distance.diagonal(axis1=0, axis2=1).mean().item()
# matching scores
distance_mean = distance.mean(axis=-1)
# best match for each generated time series
# shape: (num_generation,)
best_match = np.argmin(distance_mean, axis=-1)
# matching distance
# shape: (num_generation,)
precision_distance = distance_mean[np.arange(len(best_match)), best_match].mean().item()
# best match for each reference time series
# shape: (num_reference,)
best_match_inv = np.argmin(distance_mean, axis=0)
recall_distance = distance_mean[best_match_inv, np.arange(len(best_match_inv))].mean().item()
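        # harmonic and arithmetic means of the precision and recall distances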
f1_distance = 2 / (1 / precision_distance + 1 / recall_distance)
mean_distance = (precision_distance + recall_distance) / 2
# matching precision, recall and f1
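        # recall: fraction of references that are the best match of at least one generation
        # precision: fraction of generations that are the best match of at least one reference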
matching_recall = np.unique(best_match).size / len(best_match_inv)
matching_precision = np.unique(best_match_inv).size / len(best_match)
matching_f1 = 2 / (1 / matching_precision + 1 / matching_recall)
# take matching for each feature and compute metrics for them
precision_distance_features = []
recall_distance_features = []
f1_distance_features = []
mean_distance_features = []
matching_precision_features = []
matching_recall_features = []
matching_f1_features = []
index_distance_features = []
coverages_features = []
cuc_features = []
for f in range(predictions.shape[-1]):
distance_f = distance[:, :, f]
index_distance_f = (distance_f.diagonal(axis1=0, axis2=1).mean()).item()
best_match_f = np.argmin(distance_f, axis=-1)
precision_distance_f = (distance_f[np.arange(len(best_match_f)), best_match_f].mean()).item()
best_match_inv_f = np.argmin(distance_f, axis=0)
recall_distance_f = (distance_f[best_match_inv_f, np.arange(len(best_match_inv_f))].mean()).item()
f1_distance_f = 2 / (1 / precision_distance_f + 1 / recall_distance_f)
mean_distance_f = (precision_distance_f + recall_distance_f) / 2
precision_distance_features.append(precision_distance_f)
recall_distance_features.append(recall_distance_f)
f1_distance_features.append(f1_distance_f)
index_distance_features.append(index_distance_f)
mean_distance_features.append(mean_distance_f)
            # denominators follow the global definitions above: references for recall, generations for precision
            matching_recall_f = np.unique(best_match_f).size / len(best_match_inv_f)
            matching_precision_f = np.unique(best_match_inv_f).size / len(best_match_f)
matching_f1_f = 2 / (1 / matching_precision_f + 1 / matching_recall_f)
matching_precision_features.append(matching_precision_f)
matching_recall_features.append(matching_recall_f)
matching_f1_features.append(matching_f1_f)
coverages_f, cuc_f = self.compute_cuc(best_match_f, len(references), cuc_n_calculation, cuc_n_samples)
coverages_features.append(coverages_f)
cuc_features.append(cuc_f)
macro_precision_distance = statistics.mean(precision_distance_features)
macro_recall_distance = statistics.mean(recall_distance_features)
macro_f1_distance = statistics.mean(f1_distance_features)
macro_mean_distance = statistics.mean(mean_distance_features)
macro_index_distance = statistics.mean(index_distance_features)
macro_matching_precision = statistics.mean(matching_precision_features)
macro_matching_recall = statistics.mean(matching_recall_features)
macro_matching_f1 = statistics.mean(matching_f1_features)
# cuc
coverages, cuc = self.compute_cuc(best_match, len(references), cuc_n_calculation, cuc_n_samples)
macro_cuc = statistics.mean(cuc_features)
macro_coverages = [statistics.mean(c) for c in zip(*coverages_features)]
out = {
"precision_distance": precision_distance,
"f1_distance": f1_distance,
"recall_distance": recall_distance,
"mean_distance": mean_distance,
"index_distance": index_distance,
"macro_precision_distance": macro_precision_distance,
"macro_recall_distance": macro_recall_distance,
"macro_f1_distance": macro_f1_distance,
"macro_mean_distance": macro_mean_distance,
"macro_index_distance": macro_index_distance,
"matching_precision": matching_precision,
"matching_recall": matching_recall,
"matching_f1": matching_f1,
"macro_matching_precision": macro_matching_precision,
"macro_matching_recall": macro_matching_recall,
"macro_matching_f1": macro_matching_f1,
"cuc": cuc,
"macro_cuc": macro_cuc,
}
if return_distance:
out["distance"] = distance
if return_matching:
out["match"] = best_match
out["match_inv"] = best_match_inv
if return_each_features:
if return_distance:
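                # feature-averaged distance matrix, shape: (num_generation, num_reference)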
out["distance_features"] = distance_mean
            out.update(
                {
                    "precision_distance_features": precision_distance_features,
                    "f1_distance_features": f1_distance_features,
                    "recall_distance_features": recall_distance_features,
                    "mean_distance_features": mean_distance_features,
                    "index_distance_features": index_distance_features,
                    "matching_precision_features": matching_precision_features,
                    "matching_recall_features": matching_recall_features,
                    "matching_f1_features": matching_f1_features,
                    "cuc_features": cuc_features,
                    "coverages_features": coverages_features,
                }
            )
if return_coverages:
out["coverages"] = coverages
out["macro_coverages"] = macro_coverages
return out
def compute_cuc(
self,
match: np.ndarray,
n_reference: int,
n_calculation: int,
n_samples: Union[List[int], str],
):
"""
Compute Coverage Under Curve
Args:
match: best match for each generated time series
n_reference: number of reference time series
n_calculation: number of Coverage Under Curve calculate times
n_samples: number of samples to use for Coverage Under Curve calculation. If "auto", it uses the number of samples of the predictions.
Returns:
"""
        n_generation = len(match)
        if n_samples == "auto":
            exp = int(math.log2(n_generation))
            n_samples = [int(2**i) for i in range(exp)]
            n_samples.append(n_generation)
assert isinstance(n_samples, list) and all(isinstance(n, int) for n in n_samples)
coverages = []
for n_sample in n_samples:
coverage = 0
for _ in range(n_calculation):
sample = np.random.choice(match, size=n_sample, replace=False) # type: ignore
coverage += len(np.unique(sample)) / n_reference
coverages.append(coverage / n_calculation)
        # area under the coverage curve, normalized by the number of sample sizes and the largest sample size
        # (np.trapz was renamed to np.trapezoid in NumPy 2.0)
        trapezoid = getattr(np, "trapezoid", None) or getattr(np, "trapz")
        cuc = (trapezoid(coverages, n_samples) / len(n_samples) / max(n_samples)).item()
return coverages, cuc
@staticmethod
def _compute_metric(x, y, metric: str = "mse", axis: int = -1):
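        """Pointwise distance between broadcastable arrays x and y, reduced over the given axis."""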
if metric.lower() == "mse":
return np.mean((x - y) ** 2, axis=axis)
elif metric.lower() == "mae":
return np.mean(np.abs(x - y), axis=axis)
elif metric.lower() == "rmse":
return np.sqrt(np.mean((x - y) ** 2, axis=axis))
else:
raise ValueError("Unknown metric: {}".format(metric))
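

if __name__ == "__main__":
    # Minimal smoke-test sketch (an illustration, not part of the metric). It assumes the module
    # can be instantiated directly, which is what evaluate.load does after importing this file.
    rng = np.random.default_rng(0)
    # 8 generated and 8 reference series, 16 timesteps, 2 features
    predictions = rng.normal(size=(8, 16, 2)).tolist()
    references = rng.normal(size=(8, 16, 2)).tolist()
    metric = matching_series()
    results = metric.compute(predictions=predictions, references=references, metric="mse")
    for key in ("precision_distance", "recall_distance", "f1_distance", "matching_f1", "cuc"):
        print(key, results[key])  # type: ignore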