File size: 6,995 Bytes
2f044c1 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 |
from typing import Dict, List
from lightning.pytorch.callbacks import Callback
from relik.reader.data.relik_reader_sample import RelikReaderSample
from relik.reader.relik_reader_predictor import RelikReaderPredictor
from relik.reader.utils.metrics import f1_measure, safe_divide
from relik.reader.utils.special_symbols import NME_SYMBOL
class StrongMatching:
def __call__(self, predicted_samples: List[RelikReaderSample]) -> Dict:
# accumulators
correct_predictions = 0
correct_predicted_entities = 0
correct_predictions_at_k = 0
total_predictions = 0
total_gold = 0
total_entities_predictions = 0
total_entities_gold = 0
correct_span_predictions = 0
miss_due_to_candidates = 0
# prediction index stats
avg_correct_predicted_index = []
avg_wrong_predicted_index = []
less_index_predictions = []
# collect data from samples
for sample in predicted_samples:
predicted_annotations = sample.predicted_window_labels_chars
predicted_annotations_probabilities = sample.probs_window_labels_chars
gold_annotations = {
(ss, se, entity)
for ss, se, entity in sample.window_labels
if entity != NME_SYMBOL
}
total_predictions += len(predicted_annotations)
total_gold += len(gold_annotations)
gold_entities = {
entity
for ss, se, entity in sample.window_labels
if entity != NME_SYMBOL
}
total_entities_gold += len(gold_entities)
pred_entities = {
entity
for ss, se, entity in predicted_annotations
if entity != NME_SYMBOL
}
total_entities_predictions += len(pred_entities)
# correct named entity detection
predicted_spans = {(s, e) for s, e, _ in predicted_annotations}
gold_spans = {(s, e) for s, e, _ in gold_annotations}
correct_span_predictions += len(predicted_spans.intersection(gold_spans))
# correct entity linking
correct_predictions += len(
predicted_annotations.intersection(gold_annotations)
)
for ss, se, ge in gold_annotations.difference(predicted_annotations):
if ge not in sample.window_candidates:
miss_due_to_candidates += 1
if ge in predicted_annotations_probabilities.get((ss, se), set()):
correct_predictions_at_k += 1
# correct entity disambiguation
correct_predicted_entities += len(
pred_entities.intersection(gold_entities)
)
# indices metrics
predicted_spans_index = {
(ss, se): ent for ss, se, ent in predicted_annotations
}
gold_spans_index = {(ss, se): ent for ss, se, ent in gold_annotations}
for pred_span, pred_ent in predicted_spans_index.items():
gold_ent = gold_spans_index.get(pred_span)
if pred_span not in gold_spans_index:
continue
# missing candidate
if gold_ent not in sample.window_candidates:
continue
gold_idx = sample.window_candidates.index(gold_ent)
if gold_idx is None:
continue
pred_idx = sample.window_candidates.index(pred_ent)
if gold_ent != pred_ent:
avg_wrong_predicted_index.append(pred_idx)
if gold_idx is not None:
if pred_idx > gold_idx:
less_index_predictions.append(0)
else:
less_index_predictions.append(1)
else:
avg_correct_predicted_index.append(pred_idx)
# compute NED metrics
span_precision = safe_divide(correct_span_predictions, total_predictions)
span_recall = safe_divide(correct_span_predictions, total_gold)
span_f1 = f1_measure(span_precision, span_recall)
# compute EL metrics
precision = safe_divide(correct_predictions, total_predictions)
recall = safe_divide(correct_predictions, total_gold)
recall_at_k = safe_divide(
(correct_predictions + correct_predictions_at_k), total_gold
)
f1 = f1_measure(precision, recall)
# comput ED metrics
precision_entities = safe_divide(correct_predicted_entities, total_entities_predictions)
recall_entities = safe_divide(correct_predicted_entities, total_entities_gold)
span_entities_f1 = f1_measure(precision_entities, recall_entities)
wrong_for_candidates = safe_divide(miss_due_to_candidates, total_gold)
out_dict = {
"span_precision": span_precision,
"span_recall": span_recall,
"span_f1": span_f1,
"entities_precision": precision_entities,
"entities_recall": recall_entities,
"entities_f1": span_entities_f1,
"core_precision": precision,
"core_recall": recall,
"core_recall-at-k": recall_at_k,
"core_f1": round(f1, 4),
"wrong-for-candidates": wrong_for_candidates,
"index_errors_avg-index": safe_divide(
sum(avg_wrong_predicted_index), len(avg_wrong_predicted_index)
),
"index_correct_avg-index": safe_divide(
sum(avg_correct_predicted_index), len(avg_correct_predicted_index)
),
"index_avg-index": safe_divide(
sum(avg_correct_predicted_index + avg_wrong_predicted_index),
len(avg_correct_predicted_index + avg_wrong_predicted_index),
),
"index_percentage-favoured-smaller-idx": safe_divide(
sum(less_index_predictions), len(less_index_predictions)
),
}
return {k: round(v, 5) for k, v in out_dict.items()}
class ELStrongMatchingCallback(Callback):
def __init__(self, dataset_path: str, dataset_conf) -> None:
super().__init__()
self.dataset_path = dataset_path
self.dataset_conf = dataset_conf
self.strong_matching_metric = StrongMatching()
def on_validation_epoch_start(self, trainer, pl_module) -> None:
relik_reader_predictor = RelikReaderPredictor(pl_module.relik_reader_core_model)
predicted_samples = relik_reader_predictor.predict(
self.dataset_path,
samples=None,
dataset_conf=self.dataset_conf,
)
predicted_samples = list(predicted_samples)
for k, v in self.strong_matching_metric(predicted_samples).items():
pl_module.log(f"val_{k}", v)
|