metric / metric.py
Elron's picture
Upload metric.py with huggingface_hub
8977100
raw
history blame
7.76 kB
from typing import Dict, Iterable, List
import evaluate
from datasets import Features, Value
from .artifact import __file__ as _
from .blocks import __file__ as _
from .card import __file__ as _
from .catalog import __file__ as _
from .collections import __file__ as _
from .dataclass import __file__ as _
from .dict_utils import __file__ as _
from .file_utils import __file__ as _
from .formats import __file__ as _
from .fusion import __file__ as _
from .generator_utils import __file__ as _
from .hf_utils import __file__ as _
from .instructions import __file__ as _
from .load import __file__ as _
from .loaders import __file__ as _
from .metrics import __file__ as _
from .normalizers import __file__ as _
from .operator import (MultiStreamOperator, SequentialOperator,
SequentialOperatorInitilizer, StreamInitializerOperator)
from .operator import __file__ as _
from .operators import (Apply, ApplyMetric, ApplyOperatorsField,
ApplyStreamOperatorsField, FlattenInstances,
MergeStreams, SplitByValue)
from .operators import __file__ as _
from .processors import __file__ as _
from .random_utils import __file__ as _
from .recipe import __file__ as _
from .register import __file__ as _
from .register import _reset_env_local_catalogs, register_all_artifacts
from .renderers import __file__ as _
from .schema import UNITXT_DATASET_SCHEMA
from .schema import __file__ as _
from .split_utils import __file__ as _
from .splitters import __file__ as _
from .standard import __file__ as _
from .stream import MultiStream, Stream
from .stream import __file__ as _
from .task import __file__ as _
from .templates import __file__ as _
from .text_utils import __file__ as _
from .type_utils import __file__ as _
from .utils import __file__ as _
from .validate import __file__ as _
from .version import __file__ as _
class MultiStreamScoreMean(MultiStreamOperator):
    """Write the mean of the per-stream global scores onto every instance.

    Every instance is expected to carry ``instance["score"]["global"]["score"]``.
    The mean of that value across all streams is stored under
    ``instance["score"]["global"]["groups_mean_score"]``.
    """

    def aggegate_results(self, multi_stream: MultiStream):
        """Return the mean of the global score read from each stream.

        One instance per stream is obtained through ``stream.peak()`` and its
        ``["score"]["global"]["score"]`` value is collected.
        """
        per_stream_scores = [
            stream.peak()["score"]["global"]["score"]
            for stream in multi_stream.values()
        ]

        from statistics import mean

        return mean(per_stream_scores)

    def spread_results(self, stream: Stream, score: float):
        """Yield the instances of ``stream`` with ``groups_mean_score`` set to ``score``."""
        for instance in stream:
            global_scores = instance["score"]["global"]
            global_scores["groups_mean_score"] = score
            yield instance

    def spread_results_one_stream(self, stream: Stream):
        """Yield the instances of ``stream`` with ``groups_mean_score`` copied
        from each instance's own global score."""
        for instance in stream:
            global_scores = instance["score"]["global"]
            global_scores["groups_mean_score"] = global_scores["score"]
            yield instance

    def process(self, multi_stream: MultiStream) -> MultiStream:
        """Return a new ``MultiStream`` whose instances carry ``groups_mean_score``."""
        # Optimization: with a single stream, copy each instance's own global
        # score instead of aggregating first, which avoids calculating the
        # metrics twice.
        if len(multi_stream) == 1:
            return MultiStream(
                {
                    name: Stream(
                        self.spread_results_one_stream, gen_kwargs={"stream": stream}
                    )
                    for name, stream in multi_stream.items()
                }
            )

        mean_score = self.aggegate_results(multi_stream)
        return MultiStream(
            {
                name: Stream(
                    self.spread_results,
                    gen_kwargs={"stream": stream, "score": mean_score},
                )
                for name, stream in multi_stream.items()
            }
        )
class FromPredictionsAndOriginalData(StreamInitializerOperator):
    """Build a single-split ``MultiStream`` pairing predictions with their
    original records."""

    def zip(self, predictions, references):
        """Yield a copy of each original record with a ``"prediction"`` key added.

        Items are paired positionally with the builtin ``zip``, so iteration
        stops at the shorter of the two inputs.
        """
        for predicted, source_record in zip(predictions, references):
            combined = {**source_record}
            combined["prediction"] = predicted
            yield combined

    def process(
        self, predictions: List[str], references: Iterable, split_name: str = "all"
    ) -> MultiStream:
        """Wrap the paired records in a ``MultiStream`` under ``split_name``."""
        generator_args = {"predictions": predictions, "references": references}
        paired_stream = Stream(self.zip, gen_kwargs=generator_args)
        return MultiStream({split_name: paired_stream})
# The additional_inputs field in the schema is defined as
# Sequence({"key": Value(dtype="string"), "value": Value("string")})
# When receiving instances from this schema, the keys and values are returned as two separate
# lists, which are then converted to a dictionary.
def _from_key_value_pairs(key_value_list: Dict[str, list]) -> Dict[str, str]:
return dict(zip(key_value_list["key"], key_value_list["value"]))
class MetricRecipe(SequentialOperatorInitilizer):
    """Sequential pipeline that turns predictions and references into scored
    instances."""

    # Forwarded to ApplyMetric when the pipeline is built.
    calc_confidence_intervals: bool = True

    def prepare(self):
        """Register all artifacts and assemble the ordered list of pipeline steps."""
        register_all_artifacts()

        # Pair each prediction with its original record.
        build_instances = FromPredictionsAndOriginalData()
        # Convert the key/value lists of "additional_inputs" into a dictionary.
        restore_additional_inputs = Apply(
            "additional_inputs",
            function=_from_key_value_pairs,
            to_field="additional_inputs",
        )
        # Run the operators named in each instance's "postprocessors" field
        # over the prediction and the references.
        postprocess = ApplyOperatorsField(
            inputs_fields=["prediction", "references"],
            fields_to_treat_as_list=["references"],
            operators_field="postprocessors",
            default_operators=["processors.to_string_stripped"],
        )
        split_by_group = SplitByValue(["group"])
        score = ApplyMetric(
            "metrics",
            calc_confidence_intervals=self.calc_confidence_intervals,
        )
        add_groups_mean = MultiStreamScoreMean()
        merge = MergeStreams()

        self.steps = [
            build_instances,
            restore_additional_inputs,
            postprocess,
            split_by_group,
            score,
            add_groups_mean,
            merge,
        ]
# Input schema of the metric: one string prediction per instance, with the
# reference being a record laid out according to the unitxt dataset schema.
UNITXT_METRIC_SCHEMA = Features(
    {
        "predictions": Value(dtype="string"),
        "references": dict(UNITXT_DATASET_SCHEMA),
    }
)
def _compute(
    predictions: List[str],
    references: Iterable,
    flatten: bool = False,
    split_name: str = "all",
    calc_confidence_intervals: bool = True,
):
    """Run the metric recipe over the predictions and return the scored instances.

    Args:
        predictions: Predictions, paired positionally with ``references``.
        references: The original records the predictions belong to.
        flatten: When true, pass the result through ``FlattenInstances``
            before returning it.
        split_name: Name of the stream the instances are placed in and read
            back from.
        calc_confidence_intervals: Forwarded to ``MetricRecipe``.

    Returns:
        The instances of the ``split_name`` stream as a list.
    """
    _reset_env_local_catalogs()
    register_all_artifacts()

    metric_recipe = MetricRecipe(calc_confidence_intervals=calc_confidence_intervals)
    scored_streams = metric_recipe(
        predictions=predictions, references=references, split_name=split_name
    )

    if flatten:
        scored_streams = FlattenInstances()(scored_streams)

    return list(scored_streams[split_name])
# TODO: currently we have two classes with this name: metric.Metric and metrics.Metric...
# @evaluate.utils.file_utils.add_start_docstrings(_DESCRIPTION, _KWARGS_DESCRIPTION)
class Metric(evaluate.Metric):
    """``evaluate.Metric`` implementation that delegates scoring to ``_compute``."""

    # Forwarded to whichever ``_compute`` implementation ends up being called.
    calc_confidence_intervals: bool = True

    def _info(self):
        """Return the metric's ``evaluate.MetricInfo`` (text fields are placeholders)."""
        placeholder_url = "https://"
        return evaluate.MetricInfo(
            description="_DESCRIPTION",
            citation="_CITATION",
            # inputs_description=_KWARGS_DESCRIPTION,
            features=UNITXT_METRIC_SCHEMA,
            codebase_urls=[placeholder_url],
            reference_urls=[placeholder_url, placeholder_url],
        )

    def _compute(
        self,
        predictions: List[str],
        references: Iterable,
        flatten: bool = False,
        split_name: str = "all",
    ):
        """Score the predictions, using the installed ``unitxt`` package if present.

        If ``unitxt.dataset`` can be imported, the call goes to
        ``unitxt.metric._compute``; otherwise it goes to the module-level
        ``_compute`` of this file. Both receive the same arguments.
        """
        try:
            # Imported only to find out whether the unitxt package is installed.
            from unitxt.dataset import \
                get_dataset_artifact as _installed_probe  # noqa: F401
        except ImportError:
            compute_fn = _compute
        else:
            # Kept outside the ``try`` so that an import failure here is not
            # mistaken for "unitxt is not installed".
            from unitxt.metric import _compute as compute_fn

        return compute_fn(
            predictions=predictions,
            references=references,
            flatten=flatten,
            split_name=split_name,
            calc_confidence_intervals=self.calc_confidence_intervals,
        )