# Meteor/eval/create_evaluator.py
import os
import re
import json
import shortuuid
import numpy as np
import pandas as pd
from config import *
from collections import defaultdict
from eval.utils import *
class BaseEvaluator:
    def __init__(self):
        super().__init__()
        # Create the evaluation results folder if it does not already exist
        self.save_dir = os.path.join(DATASET_ROOT, "eval_results")
        os.makedirs(self.save_dir, exist_ok=True)
def reset(self):
# Reset results for new dataset evaluation
self.gen_answers = []
self.inputs = []
def process(self, inputs, outputs):
        # Accumulate one batch of inputs and generated answers (called per batch on each rank)
self.inputs.extend(inputs)
self.gen_answers.extend(outputs)
class Evaluator(BaseEvaluator):
def __init__(self):
"""
Eval Datasets
- VQAv2
- GQA
- SQA-IMG
- VizWiz
- TextVQA
- POPE
- MME
- MMBench
- MMBench-CN
- QBench
- MM-Vet
- MMMU
- MathVista
- AI2D
- HallusionBench
- ChartQA
- SEED
- LLaVA Wild
- BLINK
        - MathVerse
        - MMStar
"""
super().__init__()
def evaluate(self, model, dataset, accel):
        # Gather results from all GPUs onto the main process
self.inputs = accel.gather_for_metrics(self.inputs)
self.gen_answers = accel.gather_for_metrics(self.gen_answers)
if accel.is_main_process:
            # Remove duplicate samples (e.g., padding added by the distributed sampler)
self.inputs, self.gen_answers = remove_duplicate(dataset, self.inputs, self.gen_answers)
# Select evaluation for dataset
if dataset == "vqav2":
return self.evaluate_vqa(model, accel)
elif dataset == "gqa":
return self.evaluate_gqa(model, accel)
elif dataset == "sqa":
return self.evaluate_sqa(model, accel)
elif dataset == "vizwiz":
return self.evaluate_vizwiz(model, accel)
elif dataset == "textvqa":
return self.evaluate_textvqa(model, accel)
elif dataset == "pope":
return self.evaluate_pope(model, accel)
elif dataset == "mme":
return self.evaluate_mme(model, accel)
elif dataset == "mmbench":
return self.evaluate_mmbench(model, accel)
elif dataset == "mmbench_dev":
return self.evaluate_mmbench_dev(model, accel)
elif dataset == "mmbench_cn":
return self.evaluate_mmbench_cn(model, accel)
elif dataset == "mmbench_cn_dev":
return self.evaluate_mmbench_cn_dev(model, accel)
elif dataset == "qbench":
return self.evaluate_qbench(model, accel)
elif dataset == "mm-vet":
return self.evaluate_mmvet(model, accel)
elif dataset == "mmmu":
return self.evaluate_mmmu(model, accel)
elif dataset == "mathvista":
return self.evaluate_mathvista(model, accel)
elif dataset == "ai2d":
return self.evaluate_ai2d(model, accel)
elif dataset == "hallusionbench":
return self.evaluate_hallusionbench(model, accel)
elif dataset == "chartqa":
return self.evaluate_chartqa(model, accel)
elif dataset == "seed":
return self.evaluate_seed(model, accel)
elif dataset == "llava":
return self.evaluate_llava(model, accel)
elif dataset == "blink":
return self.evaluate_blink(model, accel)
elif dataset == "mathverse":
return self.evaluate_mathverse(model, accel)
elif dataset == "mmstar":
return self.evaluate_mmstar(model, accel)
else:
                raise ValueError(f'{dataset} is not a supported dataset.')
else:
return None
def evaluate_vqa(self, model, accel):
# VQAv2 Evaluation for EvalAI server
pred_answers = [{'question_id': inputs['id'], 'answer': answer} for inputs, answer in zip(self.inputs, self.gen_answers)]
pred_pth = os.path.join(self.save_dir, f"{model}_vqav2_results.json")
json.dump(pred_answers, open(pred_pth, "w"))
accel.print(f"Finished evaluating VQAv2. Evaluate the result file saved to {pred_pth} on EvalAI server.")
return
def evaluate_gqa(self, model, accel):
# GQA Evaluation
pred_answers = {inputs['id']: answer for inputs, answer in zip(self.inputs, self.gen_answers)}
# pred_answers = [{'question_id': inputs['id'], 'answer': answer} for inputs, answer in zip(self.inputs, self.gen_answers)]
pred_pth = os.path.join(self.save_dir, f"{model}_gqa_results.json")
json.dump(pred_answers, open(pred_pth, "w"))
accel.print("GQA Results:")
results = eval_gqa(pred_answers, json.load(open(os.path.join(DATASET_ROOT, GQA))))
return results['accuracy']
def evaluate_sqa(self, model, accel):
# SQA Evaluation
pred_answers = [{'question_id': inputs['id'], 'answer': convert_to_choice(answer, inputs['candidates']), 'gt': inputs['gt']} for inputs, answer in zip(self.inputs, self.gen_answers)]
pred_pth = os.path.join(self.save_dir, f"{model}_sqa_results.json")
json.dump(pred_answers, open(pred_pth, "w"))
# Compute accuracy
results = [(answer['answer'] == answer['gt']) for answer in pred_answers]
accel.print (f"SQA Accuracy: {np.mean(results)*100} %")
return np.mean(results)*100
def evaluate_vizwiz(self, model, accel):
# VizWiz Evaluation
evaluator = EvalAIAnswerProcessor()
pred_answers = [{'image': inputs['id'], 'answer': evaluator(answer)} for inputs, answer in zip(self.inputs, self.gen_answers)]
pred_pth = os.path.join(self.save_dir, f"{model}_vizwiz_results.json")
json.dump(pred_answers, open(pred_pth, "w"))
accel.print(f"Finished evaluating VizWiz. Evaluate the result file saved to {pred_pth} on EvalAI server.")
return
def evaluate_textvqa(self, model, accel):
# TextVQA Evaluation
pred_answers = [{'question_id': inputs['id'], 'pred_answer': answer, 'question': inputs['question'], 'gt_answers': inputs['gt']} for inputs, answer in zip(self.inputs, self.gen_answers)]
pred_pth = os.path.join(self.save_dir, f"{model}_textvqa_results.json")
json.dump(pred_answers, open(pred_pth, "w"))
evaluator = TextVQAAccuracyEvaluator()
results = evaluator.eval_pred_list(pred_answers)*100
accel.print (f"TextVQA Accuracy: {results} %")
return results
def evaluate_pope(self, model, accel):
# POPE Evaluation
pred_answers = [{'question_id': inputs['id'], 'answer': answer, 'question': inputs['question'], 'category': inputs['category']} for inputs, answer in zip(self.inputs, self.gen_answers)]
pred_pth = os.path.join(self.save_dir, f"{model}_pope_results.json")
json.dump(pred_answers, open(pred_pth, "w"))
        pope_results = {'adversarial': None, 'popular': None, 'random': None}
        categories = ['adversarial', 'popular', 'random']
        files = [POPE_ADVERSARIAL, POPE_POPULAR, POPE_RANDOM]
for category, file in zip(categories, files):
cur_answers = [x for x in pred_answers if x['category'] == category]
cur_answers = sorted(cur_answers, key=lambda x:x["question_id"])
pope_results[category] = eval_pope(cur_answers, os.path.join(DATASET_ROOT, file))
accel.print (f"POPE Adversarial Accuracy: {pope_results['adversarial']} %")
accel.print (f"POPE Popular Accuracy: {pope_results['popular']} %")
accel.print (f"POPE Random Accuracy: {pope_results['random']} %")
return pope_results
def evaluate_mme(self, model, accel):
# MME Evaluation
pred_answers = [{'question_id': inputs['id'], 'answer': answer, "question": inputs['question'], 'category': inputs['category']} for inputs, answer in zip(self.inputs, self.gen_answers)]
pred_pth = os.path.join(self.save_dir, f"{model}_mme_results.json")
json.dump(pred_answers, open(pred_pth, "w"))
ground_truth = get_gt(data_path=os.path.join(DATASET_ROOT, MME_DIR))
result_dir = os.path.join(self.save_dir, 'mme')
os.makedirs(result_dir, exist_ok=True)
results = defaultdict(list)
for answer in pred_answers:
file = answer['question_id'].split('/')[-1].split('.')[0] + '.txt'
results[answer['category']].append((file, answer['question'], answer['answer']))
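        # The loop below writes one tab-separated answer file per MME category:
        # each line holds the image file name, the question, the ground-truth
        # answer, and the prediction -- the layout read back by
        # MMEEvaluator.process_result() just after.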
for category, cate_tups in results.items():
with open(os.path.join(result_dir, f'{category}.txt'), 'w') as fp:
questions = set() # check for duplicates
for file, prompt, answer in cate_tups:
if 'Answer the question using a single word or phrase.' in prompt:
prompt = prompt.replace('Answer the question using a single word or phrase.', '').strip()
if 'Please answer yes or no.' not in prompt:
prompt = prompt + ' Please answer yes or no.'
                    if (category, file, prompt) not in ground_truth:
                        # some MME ground-truth prompts use a double space before the yes/no suffix
                        prompt = prompt.replace(' Please answer yes or no.', '  Please answer yes or no.')
gt_ans = ground_truth[category, file, prompt]
dup = file, prompt, gt_ans
tup = file, prompt, gt_ans, answer
if dup in questions:
continue
questions.add(dup)
fp.write('\t'.join(tup) + '\n')
evaluator = MMEEvaluator()
scores = evaluator.process_result(result_dir)
accel.print("MME Scores:")
accel.print(scores)
for eval_type, eval_scores in scores.items():
accel.print("===========", eval_type, "===========")
accel.print("total score:", eval_scores['total'], "\n")
for task_name, score in eval_scores.items():
accel.print("\t", task_name, " score:", score)
accel.print("\n")
return scores
def evaluate_mmbench(self, model, accel):
# MMBench Evaluation
df = pd.read_table(os.path.join(DATASET_ROOT, MMBENCH))
cur_df = df.copy()
cur_df = cur_df.drop(columns=['hint', 'category', 'source', 'image', 'comment', 'l2-category'])
cur_df.insert(6, 'prediction', None)
for inputs, answer in zip(self.inputs, self.gen_answers):
cur_df.loc[df['index'] == inputs['id'], 'prediction'] = answer
pred_pth = os.path.join(self.save_dir, f"{model}_mmbench_results.xlsx")
cur_df.to_excel(pred_pth, index=False, engine='openpyxl')
accel.print(f"Finished evaluating MMBench. Change {pred_pth} name to submission.xlsx and evaluate the result file saved to {pred_pth} on OpenCompass server.")
return
def evaluate_mmbench_dev(self, model, accel):
# MMBench Dev Evaluation
df = pd.read_table(os.path.join(DATASET_ROOT, MMBENCH_DEV))
cur_df = df.copy()
cur_df = cur_df.drop(columns=['hint', 'category', 'source', 'image', 'comment', 'l2-category'])
cur_df.insert(6, 'prediction', None)
for inputs, answer in zip(self.inputs, self.gen_answers):
cur_df.loc[df['index'] == inputs['id'], 'prediction'] = answer[0]
pred_pth = os.path.join(self.save_dir, f"{model}_mmbench_dev_results.xlsx")
cur_df.to_excel(pred_pth, index=False, engine='openpyxl')
accuracy = (cur_df['prediction'] == cur_df['answer']).mean()
accel.print(f'MMBench_dev Accuracy: {accuracy:.2%}')
return
def evaluate_mmbench_cn(self, model, accel):
# MMBench_CN Evaluation
df = pd.read_table(os.path.join(DATASET_ROOT, MMBENCH_CN))
cur_df = df.copy()
cur_df = cur_df.drop(columns=['hint', 'category', 'source', 'image', 'comment', 'l2-category'])
cur_df.insert(6, 'prediction', None)
for inputs, answer in zip(self.inputs, self.gen_answers):
cur_df.loc[df['index'] == inputs['id'], 'prediction'] = answer
pred_pth = os.path.join(self.save_dir, f"{model}_mmbench_cn_results.xlsx")
cur_df.to_excel(pred_pth, index=False, engine='openpyxl')
accel.print(f"Finished evaluating MMBench_CN. Change {pred_pth} name to submission.xlsx and evaluate the result file saved to {pred_pth} on OpenCompass server.")
return
def evaluate_mmbench_cn_dev(self, model, accel):
# MMBench_CN Dev Evaluation
df = pd.read_table(os.path.join(DATASET_ROOT, MMBENCH_CN_DEV))
cur_df = df.copy()
cur_df = cur_df.drop(columns=['hint', 'category', 'source', 'image', 'comment', 'l2-category'])
cur_df.insert(6, 'prediction', None)
for inputs, answer in zip(self.inputs, self.gen_answers):
cur_df.loc[df['index'] == inputs['id'], 'prediction'] = answer[0]
pred_pth = os.path.join(self.save_dir, f"{model}_mmbench_cn_dev_results.xlsx")
cur_df.to_excel(pred_pth, index=False, engine='openpyxl')
accuracy = (cur_df['prediction'] == cur_df['answer']).mean()
accel.print(f'MMBench_CN_dev Accuracy: {accuracy:.2%}')
return
def evaluate_qbench(self, model, accel):
# QBench Evaluation
pred_answers = [{'id': inputs['id'], 'answer': convert_to_choice(answer, inputs['candidates']), 'gt': inputs['gt'], 'candidates': inputs['candidates']} for inputs, answer in zip(self.inputs, self.gen_answers)]
pred_pth = os.path.join(self.save_dir, f'{model}_qbench_results.jsonl')
with open(pred_pth, "w") as pf:
pf.write(json.dumps(pred_answers) + "\n")
results = [(pred['candidates'][pred['answer']] == pred['gt']) for pred in pred_answers]
accel.print (f"QBench Accuracy: {np.mean(results)*100} %")
return np.mean(results)*100
def evaluate_mmvet(self, model, accel):
# MM-Vet Evaluation
cur_result = {f"{inputs['id']}": answer for inputs, answer in zip(self.inputs, self.gen_answers)}
pred_pth = os.path.join(self.save_dir, f'{model}_mmvet_results.json')
with open(pred_pth, 'w') as f:
json.dump(cur_result, f, indent=2)
accel.print(f"Finished evaluating MM-Vet. Evaluate the result file saved to {pred_pth}.")
return
def evaluate_mmmu(self, model, accel):
# MMMU Evaluation
predictions = {inputs['id']: answer for inputs, answer in zip(self.inputs, self.gen_answers)}
answers = {inputs['id']: {'ground_truth': inputs['gt'], 'question_type': inputs['question_type']} for inputs, answer in zip(self.inputs, self.gen_answers)}
pred_pth = os.path.join(self.save_dir, f'{model}_mmmu_results.json')
with open(pred_pth, "w") as f:
json.dump(predictions, f, indent=2)
ans_pth = os.path.join(self.save_dir, 'mmmu_answers.json')
with open(ans_pth, "w") as pf:
json.dump(answers, pf, indent=2)
# group by category
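        # MMMU data ids are assumed to follow '<split>_<Subject Name>_<index>'
        # (e.g. 'validation_Accounting_1'), so joining the middle tokens below
        # recovers the subject category keyed in CAT_SHORT2LONG / DOMAIN_CAT2SUB_CAT.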
output_dict_w_cat = {}
for data_id, parsed_pred in predictions.items():
category = "_".join(data_id.split("_")[1:-1])
if category not in output_dict_w_cat:
output_dict_w_cat.update({category: {}})
output_dict_w_cat[category].update({data_id: parsed_pred})
        # group ground-truth answers by category
answer_dict_w_cat = {}
for data_id, parsed_pred in answers.items():
category = "_".join(data_id.split("_")[1:-1])
if category not in answer_dict_w_cat:
answer_dict_w_cat.update({category: {}})
answer_dict_w_cat[category].update({data_id: parsed_pred})
evaluation_result = {}
for category in CAT_SHORT2LONG.values():
accel.print("Evaluating: {}".format(category))
# get cat_outputs and cat_answers
try:
cat_outputs = output_dict_w_cat[category]
cat_answers = answer_dict_w_cat[category]
except KeyError:
accel.print("Skipping {} for not found".format(category))
continue
            examples_to_eval = []
            for data_id, parsed_pred in cat_outputs.items():
                question_type = cat_answers[data_id]['question_type']
                if question_type != 'multiple-choice':
                    parsed_pred = parse_open_response(parsed_pred)  # mainly for type consistency (make it a number, etc.)
                examples_to_eval.append({
                    "id": data_id,
                    "question_type": question_type,
                    "answer": cat_answers[data_id]['ground_truth'],
                    "parsed_pred": parsed_pred
                })
            judge_dict, metric_dict = evaluate(examples_to_eval)
            metric_dict.update({"num_example": len(examples_to_eval)})
evaluation_result[category] = metric_dict
printable_results = {}
        # aggregate results for each domain (subject group)
for domain, in_domain_cats in DOMAIN_CAT2SUB_CAT.items():
in_domain_cat_results = {}
for cat_name in in_domain_cats: # use the order in DOMAIN_CAT2SUB_CAT
                if cat_name in evaluation_result:
                    in_domain_cat_results[cat_name] = evaluation_result[cat_name]
in_domain_ins_acc = calculate_ins_level_acc(in_domain_cat_results)
in_domain_data_num = sum([cat_results['num_example'] for cat_results in in_domain_cat_results.values()])
printable_results['Overall-' + domain] = {"num": int(in_domain_data_num),
"acc": round(in_domain_ins_acc, 3)
}
# add sub category
for cat_name, cat_results in in_domain_cat_results.items():
printable_results[cat_name] = {"num": int(cat_results['num_example']),
"acc": round(cat_results['acc'], 3)
}
all_ins_acc = calculate_ins_level_acc(evaluation_result)
printable_results['Overall'] = {"num": sum([cat_results['num_example'] for cat_results in evaluation_result.values()]),
"acc": round(all_ins_acc, 3)
}
accel.print(printable_results)
return
def evaluate_mathvista(self, model, accel):
# MathVista Evaluation
pred_answers = [{'pid': inputs['id'], 'image': inputs['id'], 'response': answer,
'question_type': inputs['question_type'], 'answer_type': inputs['answer_type'], 'metadata': inputs['metadata'],
'choices': inputs['choices'], 'query': inputs['question'], 'precision': inputs['precision'],} for inputs, answer in zip(self.inputs, self.gen_answers)]
predictions = {pred['pid']: pred for pred in pred_answers}
pred_pth = os.path.join(self.save_dir, f"{model}_mathvista_results.json")
json.dump(predictions, open(pred_pth, "w"))
accel.print(f"Finished evaluating MathVista. Evaluate the result file saved to {pred_pth}.")
return
def evaluate_ai2d(self, model, accel):
# AI2D Evaluation
pred_answers = [{'question_id': inputs['id'], 'answer': answer, 'gt': inputs['gt']} for inputs, answer in zip(self.inputs, self.gen_answers)]
pred_pth = os.path.join(self.save_dir, f"{model}_ai2d_results.json")
json.dump(pred_answers, open(pred_pth, "w"))
# Compute accuracy
pattern = re.compile(r'[A-Z]')
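        # Assumes the generated answer contains a capital choice letter; the first
        # match is mapped to an index by char_to_int (presumably 'A' -> 0, 'B' -> 1, ...)
        # and compared against the integer ground truth.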
results = [(char_to_int(pattern.findall(answer)[0]) == inputs['gt']) for inputs, answer in zip(self.inputs, self.gen_answers)]
accel.print(f"AI2D Accuracy: {np.mean(results)*100} %")
return np.mean(results)*100
def evaluate_hallusionbench(self, model, accel):
# HallusionBench Evaluation
pred_answers = [{'answer': '1' if answer.lower().find('yes') != -1 else '0', 'question': inputs['question'], 'gt': inputs['gt']} for inputs, answer in zip(self.inputs, self.gen_answers)]
pred_pth = os.path.join(self.save_dir, f"{model}_hallusionbench_results.json")
json.dump(pred_answers, open(pred_pth, "w"))
# Compute accuracy
results = [(answer['answer'] == answer['gt']) for answer in pred_answers]
accel.print(f"HallusionBench Accuracy: {np.mean(results)*100} %")
return np.mean(results)*100
def evaluate_chartqa(self, model, accel):
# ChartQA Evaluation
# post processing
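        # If an answer contains digits, keep only its first whitespace-separated token
        # so that the relaxed-accuracy check compares bare numbers; evaluate_relaxed_accuracy
        # (from eval.utils) is assumed to implement the standard ChartQA relaxed match
        # (exact match for text, a small numeric tolerance, typically 5%, for numbers).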
processed_answers = []
for x in self.gen_answers:
if any(i.isdigit() for i in x):
processed_answers.append(x.split(" ")[0])
else:
processed_answers.append(x)
pred_answers = [{'answer': answer, 'question': inputs['question'], 'annotation': inputs['gt']} for inputs, answer in zip(self.inputs, processed_answers)]
pred_pth = os.path.join(self.save_dir, f"{model}_chartqa_results.json")
json.dump(pred_answers, open(pred_pth, "w"))
# Compute accuracy
acc = evaluate_relaxed_accuracy(pred_answers)
accel.print(f"ChartQA Accuracy: {acc*100}%")
return acc
def evaluate_seed(self, model, accel):
# SEED Evaluation
pred_answers = [{'answer': answer, 'question': inputs['question'], 'question_id': inputs['id'], 'gt': inputs['gt'], 'question_type': inputs['question_type']} for inputs, answer in zip(self.inputs, self.gen_answers)]
pred_pth = os.path.join(self.save_dir, f"{model}_seed_results.json")
json.dump(pred_answers, open(pred_pth, "w"))
# Compute accuracy
results = [(answer['answer'] == answer['gt']) for answer in pred_answers]
accel.print (f"SEED Accuracy: {np.mean(results)*100} %")
# Per question type accuracy
for k, v in SEED_TYPES.items():
sub_results = []
for pred in pred_answers:
if pred['question_type'] == k:
sub_results.append(pred['answer'] == pred['gt'])
accel.print (f"{v}: {np.mean(sub_results)*100} %")
return np.mean(results)*100
def evaluate_llava(self, model, accel):
# LLaVA-in-the-Wild Evaluation
pred_answers = [{'question_id': inputs['id'], 'prompt': inputs['question'], 'text': answer, "answer_id": shortuuid.uuid()} for inputs, answer in zip(self.inputs, self.gen_answers)]
sorted_answers = sorted(pred_answers, key=lambda x: x['question_id'])
pred_pth = os.path.join(self.save_dir, f'{model}_llava_results.jsonl')
        with open(pred_pth, "w") as ans_file:
            for pred in sorted_answers:
                ans_file.write(json.dumps(pred) + "\n")
accel.print(f"Finished evaluating LLaVA-in-the-wild. Evaluate the result file saved to {pred_pth}.")
return
def evaluate_blink(self, model, accel):
# BLINK Evaluation
# TODO
return
def evaluate_mathverse(self, model, accel):
# Mathverse Evaluation
        pred_answers = [{'sample_index': inputs['id'], 'problem_index': inputs['problem_index'], 'problem_version': inputs['problem_version'],
                         'question': inputs['origin_question'], 'answer': inputs['gt'], 'question_type': inputs['question_type'],
                         'metadata': inputs['metadata'], 'query_wo': inputs['question'], 'query_cot': inputs['query_cot'], 'model_answer': answer} for inputs, answer in zip(self.inputs, self.gen_answers)]
# answers = [item for item in pred_answers if item['problem_version'] != 'Text_Only']
# text_only_answers = [item for item in pred_answers if item['problem_version'] == 'Text_Only']
pred_pth = os.path.join(self.save_dir, f'{model}_mathverse_results.json')
json.dump(pred_answers, open(pred_pth, "w"))
pred_pth = os.path.join(self.save_dir, f'{model}_mathverse_scores.json')
eval_mathverse(self.save_dir, pred_answers,f'{model}_mathverse_extracts.json', f'{model}_mathverse_scores.json')
accel.print(f"Finished evaluating MathVerse. Evaluate the result file saved to {pred_pth}.")
# TODO
return
def evaluate_mmstar(self, model, accel):
pred_answers = [{'question': inputs['question'],
'answer': inputs['answer'],
'category': inputs['category'],
'l2_category': inputs['l2_category'],
# 'bench': inputs['bench'],
'prediction' : answer} for inputs, answer in zip(self.inputs, self.gen_answers)]
pred_pth = os.path.join(self.save_dir, f'{model}_mmstar_results.json')
json.dump(pred_answers, open(pred_pth, "w"))
df = pd.DataFrame(pred_answers)
eval_mmstar(df, self.save_dir, f'{model}_mmstar_scores.json')
pred_pth = os.path.join(self.save_dir, f'{model}_mmstar_scores.json')
accel.print(f"Finished evaluating MMStar. Evaluate the result file saved to {pred_pth}.")