|
import argparse |
|
import json |
|
import scipy |
|
import numpy as np |
|
import sklearn |
|
import nltk |
|
from nltk import word_tokenize |
|
|
|
|
|
def pairwise_meteor(candidate, reference):
    """Return the METEOR score between ``candidate`` and ``reference``.

    Both strings are split with NLTK's ``word_tokenize``. The tokenized
    reference is passed as the first argument of ``single_meteor_score`` and
    the tokenized candidate as the second.
    """
    reference_tokens = word_tokenize(reference)
    candidate_tokens = word_tokenize(candidate)
    return nltk.translate.meteor_score.single_meteor_score(
        reference_tokens, candidate_tokens
    )
|
|
|
|
|
def compute_all_pairwise_scores(src_data, tgt_data, metric):
    """Build the matrix of ``metric(src, tgt)`` for every source/target pair.

    Args:
        src_data: Sized sequence of source items (one matrix row each).
        tgt_data: Sized sequence of target items (one matrix column each).
        metric: Callable taking ``(src, tgt)`` and returning a number.

    Returns:
        A float array of shape ``(len(src_data), len(tgt_data))`` whose entry
        ``[i, j]`` is ``metric(src_data[i], tgt_data[j])``.
    """
    n_src, n_tgt = len(src_data), len(tgt_data)
    rows = [[metric(src, tgt) for tgt in tgt_data] for src in src_data]
    # The reshape keeps the (n_src, n_tgt) shape even when either side is empty.
    return np.array(rows, dtype=float).reshape(n_src, n_tgt)
|
|
|
|
|
def print_with_space(left, right, left_space=45):
    """Print ``left`` padded with spaces to ``left_space`` columns, then ``right``.

    When ``left`` is already at least ``left_space`` characters long no
    padding is added and ``right`` follows it directly.
    """
    padded_left = left.ljust(left_space)
    print(padded_left + right)
|
|
|
|
|
class AVeriTeCEvaluator:
    """Evaluate AVeriTeC predictions against reference annotations.

    Evidence is scored by computing a pairwise text metric between predicted
    and reference strings, finding the one-to-one assignment that maximizes
    the total score, and multiplying that total by ``1 / len(reference
    strings)``. Verdict labels are scored with accuracy and F1.
    """

    # Verdict labels handed to f1_score; their order is the order of the
    # per-class results in evaluate_veracity.
    verdicts = [
        "Supported",
        "Refuted",
        "Not Enough Evidence",
        "Conflicting Evidence/Cherrypicking",
    ]
    # Callable invoked as metric(predicted_string, reference_string);
    # assigned in __init__.
    pairwise_metric = None
    # At most this many predicted evidence items are scored per example.
    max_questions = 10
    # Name of the pairwise metric, as passed to __init__.
    metric = None
    # Evidence-score cut-offs used by evaluate_averitec_score.
    averitec_reporting_levels = [0.1, 0.2, 0.25, 0.3, 0.4, 0.5]

    def __init__(self, metric="meteor"):
        """Select the pairwise metric by name.

        Only ``"meteor"`` is wired up; for any other name ``pairwise_metric``
        keeps its class-level value (``None``).
        """
        self.metric = metric
        if metric == "meteor":
            self.pairwise_metric = pairwise_meteor

    def _assignment_utility(self, src_strings, tgt_strings):
        """Return the best-assignment score of ``src_strings`` vs ``tgt_strings``.

        The pairwise score matrix is solved as a maximizing linear sum
        assignment; the summed score of the matched pairs is multiplied by
        ``1 / len(tgt_strings)``. An empty ``tgt_strings`` yields ``0.0``
        instead of dividing by zero.
        """
        if not tgt_strings:
            return 0.0

        # Import the submodule explicitly rather than relying on a bare
        # ``import scipy`` having made ``scipy.optimize`` available.
        from scipy.optimize import linear_sum_assignment

        pairwise_scores = compute_all_pairwise_scores(
            src_strings, tgt_strings, self.pairwise_metric
        )
        row_idx, col_idx = linear_sum_assignment(pairwise_scores, maximize=True)
        assignment_utility = pairwise_scores[row_idx, col_idx].sum()

        # Normalizing by the number of reference strings means unmatched
        # references lower the score.
        reweight_term = 1 / float(len(tgt_strings))
        assignment_utility *= reweight_term
        return assignment_utility

    def evaluate_averitec_veracity_by_type(self, srcs, tgts, threshold=0.25):
        """Average the thresholded evidence score per claim type.

        For each (prediction, reference) pair the evidence score is computed
        and replaced by 0 when it is ``<= threshold``; the value is then
        recorded under every entry of the reference's ``"claim_types"``.

        Returns:
            Dict mapping each claim type to the mean of its recorded values.
        """
        # NOTE(review): unlike evaluate_averitec_score, this averages the
        # evidence score itself and never compares "pred_label" with "label".
        # Confirm that this is the intended per-type metric.
        types = {}
        for src, tgt in zip(srcs, tgts):
            score = self.compute_pairwise_evidence_score(src, tgt)
            if score <= threshold:
                score = 0

            for claim_type in tgt["claim_types"]:
                types.setdefault(claim_type, []).append(score)

        return {t: np.mean(v) for t, v in types.items()}

    def evaluate_averitec_score(self, srcs, tgts):
        """Compute label accuracy gated by evidence quality.

        For every level in ``averitec_reporting_levels`` an example counts as
        correct only if its evidence score is strictly above the level and
        ``src["pred_label"] == tgt["label"]``.

        Returns:
            Array with one mean score per reporting level.
        """
        scores = []
        for src, tgt in zip(srcs, tgts):
            score = self.compute_pairwise_evidence_score(src, tgt)
            # Labels are only looked up when the evidence clears the level.
            scores.append(
                [
                    float(src["pred_label"] == tgt["label"]) if score > level else 0.0
                    for level in self.averitec_reporting_levels
                ]
            )

        return np.mean(np.array(scores), axis=0)

    def evaluate_veracity(self, src, tgt):
        """Score predicted verdicts against reference verdicts.

        Args:
            src: Predictions, each with a ``"pred_label"`` entry.
            tgt: References, each with a ``"label"`` entry.

        Returns:
            Dict with one F1 value per entry of ``verdicts``, plus ``"macro"``
            (macro-averaged F1) and ``"acc"`` (accuracy).
        """
        # Import the submodule explicitly rather than relying on a bare
        # ``import sklearn`` having made ``sklearn.metrics`` available.
        from sklearn.metrics import f1_score

        src_labels = [x["pred_label"] for x in src]
        tgt_labels = [x["label"] for x in tgt]

        acc = np.mean([s == t for s, t in zip(src_labels, tgt_labels)])

        per_class = f1_score(
            tgt_labels, src_labels, labels=self.verdicts, average=None
        )
        f1 = dict(zip(self.verdicts, per_class))
        f1["macro"] = f1_score(
            tgt_labels, src_labels, labels=self.verdicts, average="macro"
        )
        f1["acc"] = acc
        return f1

    def evaluate_questions_only(self, srcs, tgts):
        """Return the mean assignment score computed on questions alone.

        Predicted questions come from ``src["evidence"]`` when that key is
        present, otherwise from the prediction's comparison strings; both are
        capped at ``max_questions``. Reference questions come from
        ``tgt["questions"]``.
        """
        all_utils = []
        for src, tgt in zip(srcs, tgts):
            if "evidence" in src:
                src_questions = [
                    qa["question"] for qa in src["evidence"][: self.max_questions]
                ]
            else:
                src_questions = self.extract_full_comparison_strings(
                    src, is_target=False
                )[: self.max_questions]
            tgt_questions = [qa["question"] for qa in tgt["questions"]]

            all_utils.append(self._assignment_utility(src_questions, tgt_questions))

        return np.mean(all_utils)

    def get_n_best_qau(self, srcs, tgts, n=3):
        """Return the ``n`` examples with the highest evidence score.

        Returns:
            List of ``(predicted_evidence, tgt["questions"], score)`` tuples in
            descending score order. ``predicted_evidence`` is the prediction's
            ``"questions"`` entry if present, else ``"string_evidence"``, else
            ``"evidence"``.
        """
        all_utils = [
            self.compute_pairwise_evidence_score(src, tgt)
            for src, tgt in zip(srcs, tgts)
        ]

        idxs = np.argsort(all_utils)[::-1][:n]

        examples = []
        for i in idxs:
            src = srcs[i]
            if "questions" in src:
                shown = src["questions"]
            elif "string_evidence" in src:
                shown = src["string_evidence"]
            else:
                # Predictions in the {"evidence": [...]} format carry neither
                # of the keys above.
                shown = src["evidence"]
            examples.append((shown, tgts[i]["questions"], all_utils[i]))

        return examples

    def compute_pairwise_evidence_score(self, src, tgt):
        """Score one prediction's evidence against one reference.

        Different keys are used for the reference data and the prediction.
        For the prediction, the format is
        {"evidence": [
            {
                "question": "What does the increased federal medical assistance percentage mean for you?",
                "answer": "Appendix A: Applicability of the Increased Federal Medical Assistance Percentage ",
                "url": "https://www.medicaid.gov/federal-policy-guidance/downloads/smd21003.pdf"
            }],
        "pred_label": "Supported"}

        And for the data with fold label:
        {"questions": [
            {
                "question": "Where was the claim first published",
                "answers": [
                    {
                        "answer": "It was first published on Sccopertino",
                        "answer_type": "Abstractive",
                        "source_url": "https://web.archive.org/web/20201129141238/https://scoopertino.com/exposed-the-imac-disaster-that-almost-was/",
                        "source_medium": "Web text",
                        "cached_source_url": "https://web.archive.org/web/20201129141238/https://scoopertino.com/exposed-the-imac-disaster-that-almost-was/"
                    }
                ]
            }],
        "label": "Refuted"}

        Only the first ``max_questions`` prediction strings are used. Returns
        ``0.0`` when the reference yields no comparison strings.
        """
        src_strings = self.extract_full_comparison_strings(src, is_target=False)[
            : self.max_questions
        ]
        tgt_strings = self.extract_full_comparison_strings(tgt)
        return self._assignment_utility(src_strings, tgt_strings)

    def evaluate_questions_and_answers(self, srcs, tgts):
        """Return the mean evidence score over all (prediction, reference) pairs."""
        all_utils = [
            self.compute_pairwise_evidence_score(src, tgt)
            for src, tgt in zip(srcs, tgts)
        ]
        return np.mean(all_utils)

    def extract_full_comparison_strings(self, example, is_target=True):
        """Flatten an example into the strings used for pairwise comparison.

        Args:
            example: A reference (``is_target=True``) or prediction dict.
            is_target: Selects which schema to read.

        Returns:
            For a reference: one ``"<question> <answer>"`` string per answer
            (with ``". <boolean_explanation>"`` appended for Boolean answers),
            or ``"<question> No answer could be found."`` for a question
            without answers. For a prediction: one ``"<question> <answer>"``
            string per ``"evidence"`` item. In both cases every entry of
            ``"string_evidence"``, if present, is appended afterwards.

        The input ``example`` is not modified.
        """
        example_strings = []

        if is_target:
            for evidence in example.get("questions", []):
                # Normalize a single answer to a list locally, so the caller's
                # data is left untouched.
                answers = evidence["answers"]
                if not isinstance(answers, list):
                    answers = [answers]

                for answer in answers:
                    text = evidence["question"] + " " + answer["answer"]
                    if answer.get("answer_type") == "Boolean":
                        text += ". " + answer["boolean_explanation"]
                    example_strings.append(text)

                if not answers:
                    example_strings.append(
                        evidence["question"] + " No answer could be found."
                    )
        else:
            for evidence in example.get("evidence", []):
                example_strings.append(
                    evidence["question"] + " " + evidence["answer"]
                )

        if "string_evidence" in example:
            example_strings.extend(example["string_evidence"])
        return example_strings
|
|
|
|
|
if __name__ == "__main__":
    # Command-line entry point: load a prediction file and a label file, then
    # print question-only, question-answer, veracity and AVeriTeC scores.
    parser = argparse.ArgumentParser(description="Evaluate the veracity prediction.")
    parser.add_argument(
        "-i",
        "--prediction_file",
        default="data_store/dev_veracity.json",
        help="Json file with claim, evidence, and veracity prediction.",
    )
    parser.add_argument(
        "--label_file",
        default="data/dev.json",
        help="Json file with labels.",
    )
    args = parser.parse_args()

    # Read JSON as UTF-8 explicitly so the result does not depend on the
    # platform's locale encoding.
    with open(args.prediction_file, encoding="utf-8") as f:
        predictions = json.load(f)

    with open(args.label_file, encoding="utf-8") as f:
        references = json.load(f)

    scorer = AVeriTeCEvaluator()
    q_score = scorer.evaluate_questions_only(predictions, references)
    print_with_space(f"Question-only score (HU-{scorer.metric}):", str(q_score))
    p_score = scorer.evaluate_questions_and_answers(predictions, references)
    print_with_space(f"Question-answer score (HU-{scorer.metric}):", str(p_score))
    print("====================")

    v_score = scorer.evaluate_veracity(predictions, references)
    print("Veracity F1 scores:")
    for k, v in v_score.items():
        print_with_space(f" * {k}:", str(v))

    print("--------------------")
    print("AVeriTeC scores:")

    # One score per entry of scorer.averitec_reporting_levels, in order.
    v_score = scorer.evaluate_averitec_score(predictions, references)

    for level, level_score in zip(scorer.averitec_reporting_levels, v_score):
        print_with_space(
            f" * Veracity scores ({scorer.metric} @ {str(level)}):",
            str(level_score),
        )
    print("--------------------")
    print("AVeriTeC scores by type @ 0.25:")
    type_scores = scorer.evaluate_averitec_veracity_by_type(
        predictions, references, threshold=0.25
    )
    for t, v in type_scores.items():
        print_with_space(f" * Veracity scores ({t}):", str(v))
|
|