File size: 4,244 Bytes
d423f18 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 |
from .stream import Stream
from .operator import SingleStreamOperator, StreamInstanceOperator
from dataclasses import dataclass, field
from abc import abstractmethod, ABC
from typing import List, Dict, Any
def absrtact_factory():
    """Produce the default value used for abstract fields: a new, empty dict.

    A fresh dict is created on every call. The (misspelled) name is kept
    unchanged because ``abstract_field`` refers to this function by it.
    """
    return dict()
def abstract_field():
    """Return a dataclass ``field`` whose default is built by ``absrtact_factory``."""
    placeholder = field(default_factory=absrtact_factory)
    return placeholder
class UpdateStream(StreamInstanceOperator):
    """Instance operator that merges a fixed mapping into each instance it processes."""

    # Key/value pairs written into every instance; keys already present are overwritten.
    update: dict

    def process(self, instance: Dict[str, Any], stream_name: str = None) -> Dict[str, Any]:
        """Merge ``self.update`` into ``instance`` in place and return that same object.

        ``stream_name`` is accepted but not used by this operator.
        """
        patched = instance
        patched.update(self.update)
        return patched
class Metric(ABC):
    """Abstract base for metrics; concrete subclasses must provide ``main_score``."""

    @property
    @abstractmethod
    def main_score(self):
        """Key of the computed result that is also reported under ``"score"``."""
class GlobalMetric(SingleStreamOperator, Metric):
    """Metric whose score is computed over all instances of a stream together.

    Subclasses implement ``compute`` and provide ``main_score``. ``process``
    scores every instance on its own (by calling ``compute`` with
    single-element lists) and then scores the whole stream in one call.
    """

    def process(self, stream: Stream):
        """Score every instance and the stream as a whole, then yield the instances.

        Each instance must carry ``"references"`` and ``"prediction"`` entries.
        Per-instance results are merged into ``instance["score"]["instance"]``;
        the stream-level result is merged into a dict that every yielded
        instance exposes as ``instance["score"]["global"]``.

        The stream is fully consumed before the first instance is yielded.
        An empty stream yields nothing and ``compute`` is never called.
        """
        references = []
        predictions = []
        global_score = {}
        instances = []

        for instance in stream:
            if "score" not in instance:
                instance["score"] = {"global": global_score, "instance": {}}
            else:
                # Keep accumulating into the global scores already attached upstream.
                global_score = instance["score"]["global"]

            refs, pred = instance["references"], instance["prediction"]

            # Per-instance score: the same computation on a one-element corpus.
            instance_score = self._compute([refs], [pred])
            instance["score"]["instance"].update(instance_score)

            references.append(refs)
            predictions.append(pred)
            instances.append(instance)

        if not instances:
            # Nothing to aggregate and nothing to yield; do not call compute() on
            # empty inputs, which metric implementations may not support.
            return

        result = self._compute(references, predictions)
        global_score.update(result)

        for instance in instances:
            # Point every instance at the same, final global score dict.
            instance["score"]["global"] = global_score
            yield instance

    def _compute(self, references: List[List[str]], predictions: List[str]) -> dict:
        """Run ``compute`` and copy the main score into the ``"score"`` entry.

        Raises:
            KeyError: if the result of ``compute`` has no ``self.main_score`` key.
        """
        result = self.compute(references, predictions)
        result["score"] = result[self.main_score]
        return result

    @abstractmethod
    def compute(self, references: List[List[str]], predictions: List[str]) -> dict:
        """Return a dict of scores for the given references and predictions.

        ``references[i]`` holds the references for ``predictions[i]``. The
        returned dict must contain the key named by ``main_score``.
        """
class InstanceMetric(SingleStreamOperator, Metric):
    """Metric computed per instance and then reduced into stream-level scores.

    Subclasses implement ``compute`` and provide ``main_score`` and
    ``reduction_map``.
    """

    # Names of the reductions that ``process`` knows how to apply.
    implemented_reductions: List[str] = field(default_factory=lambda: ["mean"])

    @property
    @abstractmethod
    def reduction_map(self) -> dict:
        """Mapping from reduction name (e.g. ``"mean"``) to the score fields it aggregates."""

    def process(self, stream: Stream):
        """Score every instance, reduce the scores over the stream, then yield the instances.

        Each instance must carry ``"references"`` and ``"prediction"`` entries.
        Per-instance results are merged into ``instance["score"]["instance"]``;
        reduced values are written into a dict that every yielded instance
        exposes as ``instance["score"]["global"]``. The reduced value of the
        ``main_score`` field is also stored under ``"score"``.

        The stream is fully consumed before the first instance is yielded.
        An empty stream yields nothing; reductions are validated but not applied.

        Raises:
            AssertionError: if ``reduction_map`` names a reduction that is not
                listed in ``implemented_reductions``.
        """
        from statistics import mean

        global_score = {}
        instances = []

        for instance in stream:
            refs, pred = instance["references"], instance["prediction"]

            instance_score = self._compute(refs, pred)

            if "score" not in instance:
                instance["score"] = {"global": global_score, "instance": {}}
            else:
                # Keep accumulating into the global scores already attached upstream.
                global_score = instance["score"]["global"]

            instance["score"]["instance"].update(instance_score)

            instances.append(instance)

        for reduction, fields in self.reduction_map.items():
            assert (
                reduction in self.implemented_reductions
            ), f"Reduction {reduction} is not implemented, use one of {self.implemented_reductions}"

            if not instances:
                # mean() raises StatisticsError on empty data; nothing to aggregate.
                continue

            if reduction == "mean":
                for field_name in fields:
                    global_score[field_name] = mean(
                        [instance["score"]["instance"][field_name] for instance in instances]
                    )
                    if field_name == self.main_score:
                        global_score["score"] = global_score[field_name]

        for instance in instances:
            # Point every instance at the same, final global score dict, so that
            # instances seen before a pre-scored one do not keep a stale dict.
            instance["score"]["global"] = global_score
            yield instance

    def _compute(self, references: List[str], predictions: str) -> dict:
        """Run ``compute`` for one instance and copy the main score into ``"score"``.

        Despite its plural name, ``predictions`` is the single prediction of
        the instance; it is passed through to ``compute`` unchanged.

        Raises:
            KeyError: if the result of ``compute`` has no ``self.main_score`` key.
        """
        result = self.compute(references, predictions)
        result["score"] = result[self.main_score]
        return result

    @abstractmethod
    def compute(self, references: List[str], prediction: str) -> dict:
        """Return a dict of scores for one prediction against its references.

        The returned dict must contain the key named by ``main_score``.
        """
class SingleReferenceInstanceMetric(InstanceMetric):
    """Instance metric that scores a prediction against the first reference only."""

    def _compute(self, references: List[str], prediction: str) -> dict:
        """Score ``prediction`` against ``references[0]`` and mirror the main score.

        The value stored under ``self.main_score`` in the result of ``compute``
        is copied to the ``"score"`` key before the result is returned.
        """
        first_reference = references[0]
        scores = self.compute(first_reference, prediction)
        scores["score"] = scores[self.main_score]
        return scores

    @abstractmethod
    def compute(self, reference, prediction: str) -> dict:
        """Return a dict of scores for one prediction against one reference."""
class Accuracy(SingleReferenceInstanceMetric):
    """Exact-match accuracy, averaged over the stream."""

    # The per-instance "accuracy" values are reduced with the mean.
    reduction_map = {"mean": ["accuracy"]}
    main_score = "accuracy"

    def compute(self, reference, prediction: str) -> dict:
        """Return ``{"accuracy": 1.0}`` if both values have equal string forms, else ``0.0``."""
        is_match = str(reference) == str(prediction)
        return {"accuracy": 1.0 if is_match else 0.0}
|