File size: 2,806 Bytes
e8292e5
 
d730802
 
 
e8292e5
 
d730802
 
 
 
e8292e5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d730802
e8292e5
 
 
 
 
113addb
e8292e5
 
 
 
 
113addb
e8292e5
d730802
e8292e5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
40dfd73
e8292e5
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
from functools import singledispatch
from typing import List, Optional

import pandas as pd

from .artifact import verbosed_fetch_artifact
from .metric_utils import get_remote_metrics_endpoint, get_remote_metrics_names
from .operator import SequentialOperator
from .stream import MultiStream


@singledispatch
def evaluate(
    dataset, metric_names: List[str], compute_conf_intervals: Optional[bool] = False
):
    """Placeholder for overloading the function, supporting both dataframe input and list input."""
    pass


@evaluate.register
def _(
    dataset: list,
    metric_names: List[str],
    compute_conf_intervals: Optional[bool] = False,
):
    global_scores = {}
    remote_metrics = get_remote_metrics_names()
    for metric_name in metric_names:
        multi_stream = MultiStream.from_iterables({"test": dataset}, copying=True)
        if metric_name in remote_metrics:
            metric = verbosed_fetch_artifact(metric_name)
            metric_step = as_remote_metric(metric)
        else:
            # The SequentialOperator below will handle the load of the metric from its name
            metric_step = metric_name
        metrics_operator = SequentialOperator(steps=[metric_step])

        if not compute_conf_intervals:
            first_step = metrics_operator.steps[0]
            first_step.disable_confidence_interval_calculation()

        instances = list(metrics_operator(multi_stream)["test"])
        for entry, instance in zip(dataset, instances):
            entry[metric_name] = instance["score"]["instance"]["score"]

        if len(instances) > 0:
            global_scores[metric_name] = instances[0]["score"].get("global", {})

    return dataset, global_scores


@evaluate.register
def _(
    dataset: pd.DataFrame,
    metric_names: List[str],
    compute_conf_intervals: Optional[bool] = False,
):
    results, global_scores = evaluate(
        dataset.to_dict("records"),
        metric_names=metric_names,
        compute_conf_intervals=compute_conf_intervals,
    )
    return pd.DataFrame(results), pd.DataFrame(global_scores)


def as_remote_metric(metric):
    """Wrap a metric with a RemoteMetric.

    Currently supported is wrapping the inner metric within a MetricPipeline.
    """
    from .metrics import MetricPipeline, RemoteMetric

    remote_metrics_endpoint = get_remote_metrics_endpoint()
    if isinstance(metric, MetricPipeline):
        metric = RemoteMetric.wrap_inner_metric_pipeline_metric(
            metric_pipeline=metric,
            remote_metrics_endpoint=remote_metrics_endpoint,
        )
    else:
        raise ValueError(
            f"Unexpected remote metric type {type(metric)} for the metric named '{metric.__id__}'. "
            f"Remotely executed metrics should be MetricPipeline objects."
        )
    return metric