# IL-TUR benchmark evaluation script: scores a submission against gold data
# for the bail, cjpe, lner, rr, lsi, pcr, summ, and lmt tasks.
import json | |
import re | |
from collections import defaultdict | |
import evaluate | |
# import nltk | |
import numpy as np | |
from nervaluate import Evaluator | |
from rouge_score import rouge_scorer | |
from sacrebleu.metrics import BLEU, CHRF | |
from sklearn.metrics import f1_score | |
from tqdm import tqdm | |
from transformers import AutoTokenizer | |
from ner_helpers import span2bio | |
def load_json(file_path):
    """Read a JSON file from *file_path* and return its parsed content."""
    with open(file_path, "r") as handle:
        return json.load(handle)
def get_micro_at_k(gold, pred, k):
    """Return (overlap count, #unique gold, #unique top-k predictions).

    The prediction list is truncated to its first *k* entries before
    deduplication, so the counts feed a micro-averaged F1@k.
    """
    gold_unique = set(gold)
    topk_unique = set(pred[:k])
    overlap = gold_unique & topk_unique
    return len(overlap), len(gold_unique), len(topk_unique)
def evaluate_bail(gold_data, pred_data):
    """Bail prediction: macro-F1 over document ids in the gold data.

    Documents missing from the submission are scored with label 0.
    Returns the score formatted as a 2-decimal string.
    """
    doc_ids = list(gold_data)
    gold = [gold_data[doc_id] for doc_id in doc_ids]
    pred = [pred_data.get(doc_id, 0) for doc_id in doc_ids]
    macro_f1 = f1_score(gold, pred, average="macro")
    print("Macro-F1 on HLDC-all-districts test set:", macro_f1)
    return f"{macro_f1:.2f}"
def evaluate_cjpe(gold_data, pred_data):
    """Court judgment prediction + explanation.

    Scores the binary predictions with macro-F1 and the explanations
    (expert_1 references) with ROUGE and BLEU. Returns a flat dict with
    keys "cjpe-eval" and "cjpe-exp-eval".
    """
    # --- prediction: macro-F1, missing ids default to label 0 ---
    doc_ids = list(gold_data["prediction"])
    gold = [gold_data["prediction"][doc_id] for doc_id in doc_ids]
    pred = [pred_data["prediction"].get(doc_id, 0) for doc_id in doc_ids]
    macro_f1 = f1_score(gold, pred, average="macro")

    # --- explanation: ROUGE / BLEU against the expert_1 annotations ---
    rouge_metric = evaluate.load("rouge")
    bleu_metric = evaluate.load("bleu")
    references = [entry["expert_1"] for entry in gold_data["explanation"].values()]
    hypotheses = [entry["expert_1"] for entry in pred_data["explanation"].values()]
    rouge_result = rouge_metric.compute(
        predictions=hypotheses, references=references
    )
    bleu_result = bleu_metric.compute(
        predictions=hypotheses, references=references
    )

    return {
        "cjpe-eval": macro_f1,
        "cjpe-exp-eval": {
            "rouge": [rouge_result],
            "bleu": [bleu_result],
        },
    }
def evaluate_lner(gold_data, pred_data, text_data):
    """Legal NER: strict-match macro-F1, averaged over 3 folds.

    Gold/predicted entity spans are converted to BIO sequences via span2bio
    and scored with nervaluate's strict matching. Returns the mean fold
    score formatted as a 2-decimal string.

    Fix: the return used f"{...}:.2f" — the format spec sat OUTSIDE the
    braces, producing strings like "0.5333:.2f" instead of "0.53".
    """
    # Tag inventory for the evaluator, one label per line.
    with open("labels.txt") as f:
        labels = f.read().strip().split("\n")
    results_per_fold = {}
    for fold in range(1, 4):
        gold = gold_data[f"fold_{fold}"]
        pred = pred_data[f"fold_{fold}"]
        text = text_data[f"fold_{fold}"]
        texts, gold_labels, pred_labels = [], [], []
        for id, gold_label in tqdm(gold.items()):
            txt = text[id]
            # A document missing from the submission counts as "no entities".
            pred_label = pred.get(id, [])
            txt_seg, gold_bio = span2bio(txt, gold_label)
            _, pred_bio = span2bio(txt, pred_label)
            texts.append(txt_seg)
            gold_labels.append(gold_bio)
            pred_labels.append(pred_bio)
        evaluator = Evaluator(gold_labels, pred_labels, tags=labels, loader="list")
        results, results_per_tag, _, _ = evaluator.evaluate()
        # Macro-average the strict F1 over the tags actually reported.
        f1_scores = [results_per_tag[l]["strict"]["f1"] for l in results_per_tag]
        avg_f1 = sum(f1_scores) / len(f1_scores)
        print(f"Strict Macro-F1 on Fold {fold}:", avg_f1)
        results_per_fold[f"fold_{fold}"] = avg_f1
    # Format spec belongs inside the f-string braces.
    return {"strict mF1": f"{np.mean(list(results_per_fold.values())):.2f}"}
def evaluate_rr(gold_data, pred_data):
    """Rhetorical roles: sentence-level macro-F1 pooled over all documents.

    Documents missing from the submission are scored as all-"None" labels
    of the same length as the gold sequence.
    """
    flat_gold, flat_pred = [], []
    for doc_id, doc_gold in gold_data.items():
        doc_pred = pred_data.get(doc_id, ["None"] * len(doc_gold))
        flat_gold += doc_gold
        flat_pred += doc_pred
    mf1 = f1_score(flat_gold, flat_pred, average="macro")
    print("Macro-F1 on combined test set:", mf1)
    return {"mF1": f"{mf1:.2f}"}
def evaluate_lsi(gold_data, pred_data):
    """Legal statute identification: multi-label macro-F1.

    Builds binary indicator matrices over the vocabulary loaded from
    lsi_label_vocab.json; labels outside the vocabulary are ignored and
    missing predictions count as the empty label set. Returns the raw float.
    """
    with open("lsi_label_vocab.json") as f:
        label_vocab = json.load(f)
    n_docs, n_labels = len(gold_data), len(label_vocab)
    gold_matrix = np.zeros((n_docs, n_labels))
    pred_matrix = np.zeros((n_docs, n_labels))
    for row, (doc_id, gold_labels) in enumerate(gold_data.items()):
        for lbl in gold_labels:
            if lbl in label_vocab:
                gold_matrix[row, label_vocab[lbl]] = 1
        for lbl in pred_data.get(doc_id, []):
            if lbl in label_vocab:
                pred_matrix[row, label_vocab[lbl]] = 1
    f1 = f1_score(gold_matrix, pred_matrix, average="macro")
    print("Macro-F1 on ILSI test set:", f1)
    return f1
def evaluate_pcr(gold_data, pred_data):
    """Prior-case retrieval: micro-F1@k for k = 1..20, averaged over k.

    Self-references (candidate id equal to the query id) are dropped from
    both sides; missing predictions count as empty candidate lists.
    """
    f1_at_k = []
    for k in range(1, 21):
        hits = gold_count = pred_count = 0
        for query_id, gold_cands in gold_data.items():
            pred_cands = [c for c in pred_data.get(query_id, []) if c != query_id]
            gold_cands = [c for c in gold_cands if c != query_id]
            c, g, p = get_micro_at_k(gold_cands, pred_cands, k)
            hits += c
            gold_count += g
            pred_count += p
        prec = hits / pred_count if pred_count > 0 else 0
        rec = hits / gold_count if gold_count > 0 else 0
        f1 = 2 * prec * rec / (prec + rec) if prec + rec > 0 else 0
        f1_at_k.append(f1)
        print(f"Micro-F1@{k} on IL-PCR test set:", f1)
    return np.mean(f1_at_k)
def evaluate_summ(gold_data, pred_data):
    """Summarization: ROUGE over the ids present in both gold and submission.

    Newlines are flattened and runs of whitespace collapsed before scoring.
    BERTScore is not computed here; a "-" placeholder is returned for it.
    """
    whitespace = re.compile(r"\s+")
    references, hypotheses = [], []
    for doc_id, reference in gold_data.items():
        if doc_id not in pred_data:
            continue
        references.append(
            whitespace.sub(" ", reference.replace("\n", " ")).strip()
        )
        hypotheses.append(
            whitespace.sub(" ", pred_data[doc_id].replace("\n", " ")).strip()
        )
    rouge = evaluate.load("rouge")
    rouge_scores = rouge.compute(predictions=hypotheses, references=references)
    print("Rouge-L:", rouge_scores)
    return {"ROUGE-L": rouge_scores, "BERTSCORE": "-"}
def evaluate_lmt(gold_data, pred_data):
    """Legal machine translation: BLEU, Google-BLEU, and chrF++.

    Texts are tokenized with the IndicBERT tokenizer, grouped per
    (dataset, language) — the language is the second "/"-separated field
    of the id — scored per language, averaged per dataset, then averaged
    across datasets.
    """
    tokenizer = AutoTokenizer.from_pretrained("ai4bharat/indic-bert")
    bleu_metric = BLEU()
    chrf_metric = CHRF(word_order=2)
    gleu_metric = evaluate.load("google_bleu")

    # dataset -> language -> list of space-joined token strings
    refs = defaultdict(lambda: defaultdict(list))
    hyps = defaultdict(lambda: defaultdict(list))
    for ds, items in gold_data.items():
        for doc_id, ref_text in items.items():
            lang = doc_id.split("/")[1].strip()
            refs[ds][lang].append(" ".join(tokenizer.tokenize(ref_text)))
            hyps[ds][lang].append(
                " ".join(tokenizer.tokenize(pred_data[ds][doc_id]))
            )

    per_ds_bleu, per_ds_chrf, per_ds_gleu = [], [], []
    for ds, by_lang in refs.items():
        print("Dataset", ds)
        lang_bleu, lang_chrf, lang_gleu = [], [], []
        for lang, ref_corpus in by_lang.items():
            hyp_corpus = hyps[ds][lang]
            lang_bleu.append(bleu_metric.corpus_score(hyp_corpus, [ref_corpus]).score)
            lang_chrf.append(chrf_metric.corpus_score(hyp_corpus, [ref_corpus]).score)
            lang_gleu.append(
                gleu_metric.compute(predictions=hyp_corpus, references=ref_corpus)[
                    "google_bleu"
                ]
            )
        per_ds_bleu.append(sum(lang_bleu) / len(lang_bleu))
        per_ds_chrf.append(sum(lang_chrf) / len(lang_chrf))
        per_ds_gleu.append(sum(lang_gleu) / len(lang_gleu))

    return {
        "BLEU": sum(per_ds_bleu) / len(per_ds_bleu),
        "GLEU": sum(per_ds_gleu) / len(per_ds_gleu),
        "chrF++": sum(per_ds_chrf) / len(per_ds_chrf),
    }
def create_output_json(evaluation_results):
    """Assemble the leaderboard-format output (a single-entry list).

    Fix: evaluate_cjpe() results are merged into evaluation_results under
    the FLAT keys "cjpe-eval" / "cjpe-exp-eval" (the callers use
    dict.update()), so the original evaluation_results["cjpe"]["mF1"]
    raised KeyError whenever CJPE had actually been evaluated. This version
    accepts both shapes — the nested "cjpe" dict (blank-score fallback) and
    the flat keys — and defaults any missing task or metric to "-".
    """
    cjpe = evaluation_results.get("cjpe")
    if isinstance(cjpe, dict):
        # Nested shape, e.g. the blank-score entry.
        cjpe_mf1 = cjpe.get("mF1", "-")
        cjpe_rouge = cjpe.get("ROUGE-L", "-")
        cjpe_bleu = cjpe.get("BLEU", "-")
    else:
        # Flat shape as produced by evaluate_cjpe() + dict.update().
        cjpe_mf1 = evaluation_results.get("cjpe-eval", "-")
        explanation = evaluation_results.get("cjpe-exp-eval", {})
        if not isinstance(explanation, dict):
            explanation = {}
        cjpe_rouge = explanation.get("rouge", "-")
        cjpe_bleu = explanation.get("bleu", "-")

    def _metric(task, key=None, default="-"):
        # Fetch evaluation_results[task][key] (or the scalar task result
        # when key is None), tolerating missing tasks/metrics.
        result = evaluation_results.get(task, default)
        if key is None:
            return result
        return result.get(key, default) if isinstance(result, dict) else default

    output = {
        "Method": "GPT-5 (2-shot)",
        "Submitted By": "IL-TUR",
        "Github Link": "dummy submission",
        "L-NER": {"strict mF1": _metric("lner", "strict mF1")},
        "RR": {"mF1": _metric("rr", "mF1")},
        "CJPE": {
            "mF1": cjpe_mf1,
            "ROUGE-L": cjpe_rouge,
            "BLEU": cjpe_bleu,
        },
        "BAIL": {"mF1": _metric("bail")},
        "LSI": {"mF1": _metric("lsi")},
        "PCR": {"muF1@K": _metric("pcr")},
        "SUMM": {
            "ROUGE-L": _metric("summ", "ROUGE-L"),
            "BERTSCORE": "-",  # Placeholder: BERTScore is not computed.
        },
        "L-MT": {
            "BLEU": _metric("lmt", "BLEU"),
            "GLEU": _metric("lmt", "GLEU"),
            "chrF++": _metric("lmt", "chrF++"),
        },
    }
    return [output]  # Wrap in a list to match the desired format
def main():
    """Score a submission against the gold file and write the
    leaderboard-format results to evaluation_results.json.

    Fix: the result-formatting pass applied f"{value:.2f}" unconditionally,
    which raised TypeError on the list-valued payloads under
    "cjpe-exp-eval"; formatting is now tolerant of non-numeric values.
    """
    # gold_data = load_json("IL_TUR_eval_gold.json")
    # pred_data = load_json("IL_TUR_eval_submission2.json")
    gold_data = load_json("submissions/baseline/IL_TUR_eval_gold_small.json")
    pred_data = load_json("submissions/baseline/IL_TUR_eval_submission_small.json")
    # NOTE(review): debug override — this scores the gold data against itself
    # and discards the submission loaded above. Remove for real evaluations.
    pred_data = gold_data

    # Tasks whose evaluators share the (gold, pred) -> result signature.
    simple_evaluators = {
        "bail": evaluate_bail,
        "rr": evaluate_rr,
        "lsi": evaluate_lsi,
        "pcr": evaluate_pcr,
        "summ": evaluate_summ,
        "lmt": evaluate_lmt,
    }
    evaluation_results = {}
    for task in pred_data.keys():
        print(f"Task: {task}")
        if task == "cjpe":
            # evaluate_cjpe returns flat keys ("cjpe-eval", "cjpe-exp-eval").
            evaluation_results.update(evaluate_cjpe(gold_data[task], pred_data[task]))
        elif task == "lner":
            text_data = load_json("lner-text.json")
            evaluation_results[task] = evaluate_lner(
                gold_data[task], pred_data[task], text_data
            )
        elif task in simple_evaluators:
            evaluation_results[task] = simple_evaluators[task](
                gold_data[task], pred_data[task]
            )

    def _fmt(value):
        # Numbers -> "0.00"-style strings; strings pass through unchanged.
        # Anything unformattable (e.g. the list payloads under
        # "cjpe-exp-eval") is returned as-is instead of raising TypeError.
        if isinstance(value, str):
            return value
        try:
            return f"{value:.2f}"
        except (TypeError, ValueError):
            return value

    # Convert the evaluation results to the required (string) format,
    # descending two levels to mirror the result structures above.
    for task, result in evaluation_results.items():
        if isinstance(result, dict):
            evaluation_results[task] = {
                sub: (
                    {key: _fmt(val) for key, val in subresult.items()}
                    if isinstance(subresult, dict)
                    else _fmt(subresult)
                )
                for sub, subresult in result.items()
            }
        else:
            evaluation_results[task] = _fmt(result)

    blank_scores = {
        "lner": {"strict mF1": "-"},
        "rr": {"mF1": "-"},
        "cjpe": {"mF1": "-", "ROUGE-L": "-", "BLEU": "-"},
        "bail": {"mF1": "-"},
        "lsi": {"mF1": "-"},
        "pcr": {"muF1@K": "-"},
        "summ": {"ROUGE-L": "-", "BERTSCORE": "-"},
        "lmt": {"BLEU": "-", "GLEU": "-", "chrF++": "-"},
    }
    print("--------------------------Evaluation Summary--------------------------")
    for task, result in evaluation_results.items():
        print(f"{task}: {result}")
    print("---------------------------------------------------------------------")
    # For tasks that were not present in the submission, add blank scores.
    for task in gold_data.keys():
        if task not in pred_data:
            evaluation_results[task] = blank_scores[task]
    # Generate the output JSON.
    output_json = create_output_json(evaluation_results)
    with open("evaluation_results.json", "w") as f:
        json.dump(output_json, f, indent=2)
    print("Evaluation results saved to evaluation_results.json")
def get_evaluation_scores(gold_data, submission_data):
    """Library entry point: score *submission_data* against *gold_data* and
    return the leaderboard-format output JSON (same pipeline as main(),
    but returning the result instead of writing it to disk).

    Fix: the result-formatting pass applied f"{value:.2f}" unconditionally,
    which raised TypeError on the list-valued payloads under
    "cjpe-exp-eval"; formatting is now tolerant of non-numeric values.
    """
    # Tasks whose evaluators share the (gold, pred) -> result signature.
    simple_evaluators = {
        "bail": evaluate_bail,
        "rr": evaluate_rr,
        "lsi": evaluate_lsi,
        "pcr": evaluate_pcr,
        "summ": evaluate_summ,
        "lmt": evaluate_lmt,
    }
    evaluation_results = {}
    for task in submission_data.keys():
        print(f"Task: {task}")
        if task == "cjpe":
            # evaluate_cjpe returns flat keys ("cjpe-eval", "cjpe-exp-eval").
            evaluation_results.update(
                evaluate_cjpe(gold_data[task], submission_data[task])
            )
        elif task == "lner":
            text_data = load_json("lner-text.json")
            evaluation_results[task] = evaluate_lner(
                gold_data[task], submission_data[task], text_data
            )
        elif task in simple_evaluators:
            evaluation_results[task] = simple_evaluators[task](
                gold_data[task], submission_data[task]
            )

    def _fmt(value):
        # Numbers -> "0.00"-style strings; strings pass through unchanged.
        # Anything unformattable (e.g. the list payloads under
        # "cjpe-exp-eval") is returned as-is instead of raising TypeError.
        if isinstance(value, str):
            return value
        try:
            return f"{value:.2f}"
        except (TypeError, ValueError):
            return value

    # Convert the evaluation results to the required (string) format,
    # descending two levels to mirror the result structures above.
    for task, result in evaluation_results.items():
        if isinstance(result, dict):
            evaluation_results[task] = {
                sub: (
                    {key: _fmt(val) for key, val in subresult.items()}
                    if isinstance(subresult, dict)
                    else _fmt(subresult)
                )
                for sub, subresult in result.items()
            }
        else:
            evaluation_results[task] = _fmt(result)

    blank_scores = {
        "lner": {"strict mF1": "-"},
        "rr": {"mF1": "-"},
        "cjpe": {"mF1": "-", "ROUGE-L": "-", "BLEU": "-"},
        "bail": {"mF1": "-"},
        "lsi": {"mF1": "-"},
        "pcr": {"muF1@K": "-"},
        "summ": {"ROUGE-L": "-", "BERTSCORE": "-"},
        "lmt": {"BLEU": "-", "GLEU": "-", "chrF++": "-"},
    }
    # For tasks that were not present in the submission, add blank scores.
    for task in gold_data.keys():
        if task not in submission_data:
            evaluation_results[task] = blank_scores[task]
    print("--------------------------Evaluation Summary--------------------------")
    for task, result in evaluation_results.items():
        print(f"{task}: {result}")
    print("---------------------------------------------------------------------")
    output_json = create_output_json(evaluation_results)
    return output_json
# Script entry point: run the full evaluation only when executed directly.
if __name__ == "__main__":
    main()