# CuMo-7b-zero / cumo / eval / calculate_score.py
# jiachenl
# update
# c3f3b0b
import os
import re
import argparse
import pandas as pd
# !pip install python-Levenshtein
from Levenshtein import distance
import json
import sys
sys.path.append('../')
#from utilities import *
def read_json(path):
    """
    Parse the UTF-8 encoded JSON file at `path` and return the decoded object.
    """
    with open(path, mode='r', encoding='utf-8') as handle:
        raw_text = handle.read()
    return json.loads(raw_text)
def save_json(data, path):
    """
    Serialize `data` as JSON (4-space indent) and write it to `path`.

    The file is opened with an explicit UTF-8 encoding so that it is written
    the same way `read_json` reads it, independent of the platform's default
    locale encoding.
    """
    with open(path, 'w', encoding='utf-8') as f:
        json.dump(data, f, indent=4)
def get_most_similar(prediction, choices):
    """
    Return the entry of `choices` with the smallest Levenshtein (edit) distance to `prediction`.

    When several choices are equally close, the earliest one in `choices` wins.
    """
    def edit_cost(candidate):
        return distance(prediction, candidate)

    # min() keeps the first element among equal keys, i.e. the lowest index
    return min(choices, key=edit_cost)
def normalize_extracted_answer(extraction, choices, question_type, answer_type, precision):
    """
    Normalize the extracted answer to match the answer type.

    - question_type == 'multi_choice': the extraction is turned into a string,
      an option letter such as "(A)" or "A" is mapped to the corresponding
      entry of `choices`, and anything else is replaced by the most similar
      choice (see `get_most_similar`).
    - answer_type == 'integer': returns str(int(float(extraction))).
    - answer_type == 'float': returns the value rounded to `precision` digits,
      as a string.
    - answer_type == 'list': returns str(extraction).
    - any other answer type: the extraction is returned unchanged.

    For the numeric and list types, None is returned when the extraction
    cannot be converted.
    """
    if question_type == 'multi_choice':
        # make sure the extraction is a string
        if isinstance(extraction, str):
            extraction = extraction.strip()
        else:
            try:
                extraction = str(extraction)
            except Exception:
                extraction = ""

        # extract "A" from "(A) text"
        letter = re.findall(r'\(([a-zA-Z])\)', extraction)
        if letter:
            extraction = letter[0].upper()

        # option letters "A", "B", ... one per available choice
        options = [chr(ord('A') + i) for i in range(len(choices))]
        if extraction in options:
            # convert option letter to text, e.g. "A" -> "text"
            return choices[options.index(extraction)]
        # select the most similar option
        return get_most_similar(extraction, choices)

    if answer_type == 'integer':
        try:
            return str(int(float(extraction)))
        except (TypeError, ValueError, OverflowError):
            return None

    if answer_type == 'float':
        try:
            # round() rejects a non-integer ndigits (e.g. 2.0, which is what a
            # JSON number like `2.0` decodes to), so coerce it to int first.
            # None is passed through unchanged.
            ndigits = None if precision is None else int(precision)
            return str(round(float(extraction), ndigits))
        except (TypeError, ValueError, OverflowError):
            return None

    if answer_type == 'list':
        try:
            return str(extraction)
        except Exception:
            return None

    return extraction
def safe_equal(prediction, answer):
    """
    Compare `prediction` with `answer` using ==, tolerating mismatched types.

    Returns True only when the comparison is truthy. If the comparison itself
    raises, the error is printed and False is returned.
    """
    try:
        matched = bool(prediction == answer)
    except Exception as err:
        print(err)
        matched = False
    return matched
def get_acc_with_contion(res_pd, key, value):
    """
    Compute the accuracy over the rows of `res_pd` whose `key` column matches `value`.

    For key == 'skills' each cell is treated as a container and a row matches
    when `value` is contained in it; for every other key a row matches when
    the cell equals `value`. A row counts as correct when its 'true_false'
    column equals True.

    Returns a tuple (num_correct, num_total, accuracy) where accuracy is a
    percentage formatted with two decimals. When no row matches, the accuracy
    is reported as "0.00" instead of dividing by zero.
    """
    if key == 'skills':
        mask = res_pd[key].apply(lambda x: value in x)
    else:
        mask = res_pd[key] == value
    total_pd = res_pd[mask]
    # element-wise comparison on purpose: the column may hold plain objects
    correct_pd = total_pd[total_pd['true_false'] == True]  # noqa: E712

    total = len(total_pd)
    correct = len(correct_pd)
    if total == 0:
        return correct, total, "0.00"
    acc = "{:.2f}".format(correct / total * 100)
    return correct, total, acc
if __name__ == '__main__':
    # Command-line driver:
    #   [1] normalize every extracted answer and mark it true/false,
    #   [2] compute the overall accuracy,
    #   [3] compute per-category accuracies and save them,
    #   [4] optionally add the gain over a random-guess baseline.
    parser = argparse.ArgumentParser()
    parser.add_argument('--output_dir', type=str, default='../results')
    parser.add_argument('--output_file', type=str, default='output.json')
    parser.add_argument('--score_file', type=str, default='scores.json')
    parser.add_argument('--gt_file', type=str, default='../data/testmini.json', help='ground truth file')
    parser.add_argument('--number', type=int, default=-1, help='number of problems to run')
    parser.add_argument('--rerun', action='store_true', help='rerun the evaluation')
    parser.add_argument('--caculate_gain', action='store_true', help='caculate the socre gains over random guess')
    parser.add_argument('--random_file', type=str, default='score_random_guess.json')
    args = parser.parse_args()

    # args
    output_file = os.path.join(args.output_dir, args.output_file)

    # read json
    print(f"Reading {output_file}...")
    results = read_json(output_file)

    # read ground truth
    print(f"Reading {args.gt_file}...")
    gts = read_json(args.gt_file)

    # full pids (optionally limited to the first --number problems)
    full_pids = list(results.keys())
    if args.number > 0:
        full_pids = full_pids[:args.number]
    print("Number of testing problems:", len(full_pids))
    if not full_pids:
        # nothing to score; avoids a division by zero further down
        sys.exit(f"No problems found in {output_file}; nothing to score.")

    ## [1] Evaluate if the prediction is true or false
    print("\nEvaluating the predictions...")
    update_json_flag = False
    for pid in full_pids:
        problem = results[pid]

        if args.rerun:
            # drop cached verdicts so they are recomputed and re-saved
            problem.pop('prediction', None)
            problem.pop('true_false', None)

        choices = problem['choices']
        question_type = problem['question_type']
        answer_type = problem['answer_type']
        precision = problem['precision']
        extraction = problem['extraction']

        # fall back to the ground-truth file when the result has no answer
        if 'answer' in problem:
            answer = problem['answer']
        else:
            answer = gts[pid]['answer']
            problem['answer'] = answer

        # normalize the extracted answer to match the answer type
        prediction = normalize_extracted_answer(extraction, choices, question_type, answer_type, precision)

        # verify the prediction is true or false
        true_false = safe_equal(prediction, answer)

        # flag the file for re-saving when a verdict is new or has changed
        if ('true_false' not in problem or true_false != problem['true_false']
                or 'prediction' not in problem or prediction != problem['prediction']):
            update_json_flag = True

        problem['prediction'] = prediction
        problem['true_false'] = true_false

    # save the updated json
    if update_json_flag:
        print("\n!!!Some problems are updated.!!!")
        print(f"\nSaving {output_file}...")
        save_json(results, output_file)

    ## [2] Calculate the average accuracy
    total = len(full_pids)
    correct = sum(1 for pid in full_pids if results[pid]['true_false'])
    accuracy = str(round(correct / total * 100, 2))
    print(f"\nCorrect: {correct}, Total: {total}, Accuracy: {accuracy}%")

    scores = {"average": {"accuracy": accuracy, "correct": correct, "total": total}}

    ## [3] Calculate the fine-grained accuracy scores
    # Merge the 'metadata' attribute into each evaluated problem. Only the
    # problems selected above are included, so that with --number the
    # unevaluated problems are not counted as wrong answers.
    evaluated = {}
    for pid in full_pids:
        record = dict(results[pid])
        record.update(record.pop('metadata'))
        evaluated[pid] = record

    # convert the data to a pandas DataFrame (one row per problem id)
    df = pd.DataFrame(evaluated).T

    print(len(df))
    print("Number of test problems:", len(df))
    # assert len(df) == 1000 # Important!!!

    # assign the target keys for evaluation
    target_keys = ['question_type', 'answer_type', 'language', 'source', 'category', 'task', 'context', 'grade', 'skills']

    for key in target_keys:
        print(f"\nType: [{key}]")
        # get the unique values of the key
        if key == 'skills':
            # each cell is a list; iterate the cells directly instead of
            # indexing the string-labelled Series by integer position
            values = list({skill for skill_list in df[key] for skill in skill_list})
        else:
            values = df[key].unique()

        # calculate the accuracy for each value
        scores[key] = {}
        for value in values:
            sub_correct, sub_total, acc = get_acc_with_contion(df, key, value)
            if sub_total > 0:
                print(f"[{value}]: {acc}% ({sub_correct}/{sub_total})")
                scores[key][value] = {"accuracy": acc, "correct": sub_correct, "total": sub_total}

        # sort the scores by accuracy
        scores[key] = dict(sorted(scores[key].items(), key=lambda item: float(item[1]['accuracy']), reverse=True))

    # save the scores
    scores_file = os.path.join(args.output_dir, args.score_file)
    print(f"\nSaving {scores_file}...")
    save_json(scores, scores_file)
    print("\nDone!")

    # [4] Calculate the score gains over random guess
    if args.caculate_gain:
        random_file = os.path.join(args.output_dir, args.random_file)
        # read_json closes the file handle, unlike a bare json.load(open(...))
        random_scores = read_json(random_file)

        print("\nCalculating the score gains...")
        for key in scores:
            if key == 'average':
                gain = round(float(scores[key]['accuracy']) - float(random_scores[key]['accuracy']), 2)
                # NOTE: the average gain is stored as a number while the
                # per-category gains below are stored as strings; this matches
                # the existing output format and is left unchanged.
                scores[key]['acc_gain'] = gain
            else:
                for sub_key in scores[key]:
                    gain = round(float(scores[key][sub_key]['accuracy']) - float(random_scores[key][sub_key]['accuracy']), 2)
                    scores[key][sub_key]['acc_gain'] = str(gain)

        # save the score gains
        print(f"\nSaving {scores_file}...")
        save_json(scores, scores_file)
        print("\nDone!")