# https://github.com/maszhongming/UniEval/tree/main
from dataclasses import dataclass, field

from tqdm import tqdm

from graphgen.models.text.text_pair import TextPair


def _add_questions(dimension: str, question: str, answer: str):
    if dimension == "naturalness":
        cur_input = 'question: Is this a natural response in the dialogue? </s> response: ' + answer
    elif dimension == "coherence":
        cur_input = 'question: Is this a coherent response given the dialogue history? </s> response: ' \
                    + answer + ' </s> dialogue history: ' + question
    elif dimension == "understandability":
        cur_input = 'question: Is this an understandable response in the dialogue? </s> response: ' + answer
    else:
        raise NotImplementedError(
            'The input format for this dimension is still undefined. Please customize it first.')
    return cur_input
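
# UniEval casts evaluation as Boolean QA: each dimension is phrased as a yes/no
# question about the response, and the score is P("Yes") / (P("Yes") + P("No"))
# computed from the first decoded token (see process_chunk below).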
@dataclass
class UniEvaluator:
    model_name: str = "MingZhong/unieval-sum"
    dimensions: list = field(default_factory=lambda: ['naturalness', 'coherence', 'understandability'])
    max_length: int = 2560
    results: dict = None

    def __post_init__(self):
        import torch
        self.num_gpus = torch.cuda.device_count()
        self.results = {}

    @staticmethod
    def process_chunk(rank, pairs, model_name, max_length, dimension, return_dict):
        import torch
        from transformers import AutoTokenizer, AutoModelForSeq2SeqLM

        device = f'cuda:{rank}'
        torch.cuda.set_device(rank)

        rank_model = AutoModelForSeq2SeqLM.from_pretrained(model_name)
        tokenizer = AutoTokenizer.from_pretrained(model_name)
        rank_model.to(device)
        rank_model.eval()

        softmax = torch.nn.Softmax(dim=1)
        # First sub-token of "Yes" / "No" in the vocabulary
        pos_id = tokenizer("Yes")["input_ids"][0]
        neg_id = tokenizer("No")["input_ids"][0]

        results = []
        with torch.no_grad():
            for pair in tqdm(pairs):
                text = _add_questions(dimension, pair.question, pair.answer)
                # The label content is irrelevant here: it only drives one decoder
                # step, and the first-step logits depend on the decoder start
                # token alone, not on the label value.
                tgt = "No"
                encoded_src = tokenizer(
                    text,
                    max_length=max_length,
                    truncation=True,
                    padding=True,
                    return_tensors='pt'
                )
                encoded_tgt = tokenizer(
                    tgt,
                    max_length=max_length,
                    truncation=True,
                    padding=True,
                    return_tensors='pt'
                )
                src_tokens = encoded_src['input_ids'].to(device)
                src_mask = encoded_src['attention_mask'].to(device)
                # Keep only the first target token ("Yes"/"No" itself)
                tgt_tokens = encoded_tgt['input_ids'].to(device)[:, 0].unsqueeze(-1)

                output = rank_model(
                    input_ids=src_tokens,
                    attention_mask=src_mask,
                    labels=tgt_tokens,
                    use_cache=False
                )
                logits = output.logits.view(-1, rank_model.config.vocab_size)
                pos_score = softmax(logits)[:, pos_id]  # P("Yes")
                neg_score = softmax(logits)[:, neg_id]  # P("No")
                score = pos_score / (pos_score + neg_score)
                results.append(score.item())
        return_dict[rank] = results

    def evaluate(self, pairs: list[TextPair]) -> list[dict]:
        import torch.multiprocessing as mp

        # CUDA cannot be (re-)initialized in a forked subprocess, so use spawn
        mp.set_start_method('spawn', force=True)

        final_results = []
        for dimension in self.dimensions:
            # Split the pairs evenly across GPUs; the last chunk takes the remainder
            chunk_size = len(pairs) // self.num_gpus
            chunks = []
            for i in range(self.num_gpus):
                start = i * chunk_size
                end = start + chunk_size
                if i == self.num_gpus - 1:
                    end = len(pairs)
                chunks.append(pairs[start:end])

            # One worker process per GPU
            manager = mp.Manager()
            return_dict = manager.dict()
            processes = []
            for rank, chunk in enumerate(chunks):
                p = mp.Process(
                    target=self.process_chunk,
                    args=(rank, chunk, self.model_name, self.max_length, dimension, return_dict)
                )
                p.start()
                processes.append(p)
            for p in processes:
                p.join()

            # Merge the per-rank results back into input order
            results = []
            for rank in range(len(chunks)):
                results.extend(return_dict[rank])

            for p in processes:
                if p.is_alive():
                    p.terminate()
                    p.join()

            final_results.append({
                dimension: results
            })
        return final_results

    def get_average_score(self, pairs: list[TextPair]) -> dict:
        """
        Get the average score of a batch of texts.
        """
        results = self.evaluate(pairs)
        final_results = {}
        for result in results:
            for key, value in result.items():
                final_results[key] = sum(value) / len(value)
                self.results[key] = value
        return final_results

    def get_min_max_score(self, pairs: list[TextPair]) -> dict:
        """
        Get the min and max score of a batch of texts.
        """
        # __post_init__ initializes results to {}, so check for emptiness, not None
        if not self.results:
            self.get_average_score(pairs)
        final_results = {}
        for key, value in self.results.items():
            final_results[key] = (min(value), max(value))
        return final_results
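

# A minimal usage sketch (not part of the original module). It assumes TextPair
# is constructible from keyword arguments matching the .question / .answer
# attributes read in process_chunk, and that at least one CUDA GPU is available.
if __name__ == "__main__":
    sample_pairs = [
        TextPair(
            question="What is the capital of France?",  # dialogue history
            answer="The capital of France is Paris.",   # response being scored
        ),
    ]
    evaluator = UniEvaluator()  # defaults to MingZhong/unieval-sum
    print(evaluator.get_average_score(sample_pairs))  # {'naturalness': ..., ...}
    print(evaluator.get_min_max_score(sample_pairs))  # per-dimension (min, max)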