File size: 6,935 Bytes
744eb4e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 |
import argparse
import json
import os
import random
random.seed(0)
import nltk
nltk.download('wordnet')
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
from nltk.translate.meteor_score import meteor_score
from rouge import Rouge
from sentence_transformers import SentenceTransformer, util
from scipy.spatial.distance import cosine
from transformers import AutoModel, AutoTokenizer
import torch
import numpy as np
from tqdm import tqdm
class TraditionalMetricEvaluator():
def __init__(self, inputs, output_dir, output_file):
self.results = inputs['results']
self.inference_prompt = inputs['prompt']
self.output_dir = output_dir
self.output_file = output_file
self.rouge = Rouge()
self.response_data = []
self.ground_truths = []
self.generated_captions = []
self.sbert_model = SentenceTransformer('all-mpnet-base-v2')
self.simcse_tokenizer = AutoTokenizer.from_pretrained("princeton-nlp/sup-simcse-roberta-large")
self.simcse_model = AutoModel.from_pretrained("princeton-nlp/sup-simcse-roberta-large")
self.scores = {
'bleu-1': [],
'bleu-2': [],
'bleu-3': [],
'bleu-4': [],
'rouge-1': [],
'rouge-2': [],
'rouge-l': [],
'meteor': [],
'sbert_similarity': [],
'simcse_similarity': []
}
def evaluate_result(self, result):
object_id = result['object_id']
ground_truth = result['ground_truth']
model_output = result['model_output']
if model_output == "":
# * all score should be 0
model_output = "##"
# create a SmoothingFunction object
smoothing_function = SmoothingFunction().method1 # * used to deal with non-overlap n-gram
# calculate BLEU-1 score with smoothing function
bleu_1_score = sentence_bleu([ground_truth.split()], model_output.split(), weights=(1, 0, 0, 0), smoothing_function=smoothing_function)
# calculate BLEU-2, BLEU-3, and BLEU-4 scores
bleu_2_score = sentence_bleu([ground_truth.split()], model_output.split(), weights=(0.5, 0.5, 0, 0), smoothing_function=smoothing_function)
bleu_3_score = sentence_bleu([ground_truth.split()], model_output.split(), weights=(0.33, 0.33, 0.33, 0), smoothing_function=smoothing_function)
bleu_4_score = sentence_bleu([ground_truth.split()], model_output.split(), weights=(0.25, 0.25, 0.25, 0.25), smoothing_function=smoothing_function)
# calculate ROUGE-L score
rouge_scores_l = self.rouge.get_scores(model_output, ground_truth)[0]['rouge-l']
rouge_scores_1 = self.rouge.get_scores(model_output, ground_truth)[0]['rouge-1']
rouge_scores_2 = self.rouge.get_scores(model_output, ground_truth)[0]['rouge-2']
# calculate METEOR score
meteor_scores = meteor_score([ground_truth.split()], model_output.split())
# Calculate SBERT similarity
embeddings = self.sbert_model.encode([ground_truth, model_output])
sbert_similarity = util.cos_sim(embeddings[0], embeddings[1])[0][0].item()
# calculate SimCSE similarity
# Tokenize input texts
inputs = self.simcse_tokenizer([ground_truth, model_output], padding=True, truncation=True, return_tensors="pt")
# Get the embeddings
with torch.no_grad():
embeddings = self.simcse_model(**inputs, output_hidden_states=True, return_dict=True).pooler_output
# Calculate cosine similarity
simcse_similarity = 1 - cosine(embeddings[0], embeddings[1]) # * consine actually calculates consine distance, which is 1 - consine similarity
scores = {
'bleu-1': bleu_1_score * 100,
'bleu-2': bleu_2_score * 100,
'bleu-3': bleu_3_score * 100,
'bleu-4': bleu_4_score * 100,
'rouge-l': rouge_scores_l['f'] * 100,
'rouge-1': rouge_scores_1['f'] * 100,
'rouge-2': rouge_scores_2['f'] * 100,
'meteor': meteor_scores * 100,
'sbert_similarity': sbert_similarity * 100,
'simcse_similarity': simcse_similarity * 100
}
return object_id, model_output, ground_truth, scores
def evaluate(self):
print("Starting evaluation...")
for result in tqdm(self.results, desc="Evaluating"):
object_id, model_output, ground_truth, scores = self.evaluate_result(result)
# save the object_id, model_output, ground_truth, and scores for each result
self.response_data.append({
'object_id': object_id,
'ground_truth': ground_truth,
'model_output': model_output,
'scores': scores,
})
# save the scores for overall results
for metric, score in scores.items():
self.scores[metric].append(score)
print("Evaluation finished.")
self.save_results()
self.print_results()
def save_results(self):
output_path = os.path.join(self.output_dir, self.output_file)
with open(output_path, 'w') as f:
results_to_save = {
'inference_prompt': self.inference_prompt,
'overall_scores': {metric: f"{np.mean(scores):.4f}" for metric, scores in self.scores.items()},
'results': self.response_data,
}
json.dump(results_to_save, f, indent=2)
print(f"Results saved to {output_path}")
def print_results(self):
print('-' * 80)
print("Results:")
for metric, scores in self.scores.items():
print(f"Average {metric.upper()} Score: {np.mean(scores):.4f}")
def start_evaluation(results, output_dir, output_file,
parallel=True, num_workers=20):
"""
Args:
results: dict or file path to the json file containing the dict
output_file: the path the final evaluation results to be saved.
"""
if isinstance(results, str):
with open(results, 'r') as fp:
results = json.load(fp)
evaluator = TraditionalMetricEvaluator(results, output_dir, output_file)
evaluator.evaluate()
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--results_path", type=str, \
default="", help="Path to the results file.")
parser.add_argument("--output_dir", type=str, default=None, help="Path to the output directory.")
args = parser.parse_args()
if args.output_dir is None:
args.output_dir = os.path.dirname(args.results_path)
output_file = os.path.basename(args.results_path).replace(".json", f"_evaluated_traditional.json")
start_evaluation(results=args.results_path, output_dir=args.output_dir, output_file=output_file)
|