import os
from pathlib import Path
from typing import Callable, Optional, Union

import torch
import torch.nn as nn
import transformers
from nltk.tokenize import sent_tokenize
from tqdm import tqdm
from transformers import PretrainedConfig


class AlignScoreCS(transformers.XLMRobertaModel):
    """
    AlignScoreCS

    Description:
        The AlignScoreCS model was trained following the AlignScore paper for 3 days on 4 GPUs
        (3 epochs, learning rate 1e-5, AdamW eps 1e-6, batch size 32, warmup ratio 0.06, weight decay 0.1).
        - XLM-RoBERTa-large model with 3 classification heads {regression, binary, 3-way} sharing one encoder
        - trained on 7M documents covering various NLP tasks (QA, STS, summarization, fact verification,
          information retrieval, NLI, paraphrase, ...)
        - English datasets and their Czech translations

    TRY: .show_examples() to see some examples

    USAGE (AlignScore.py):
        - .from_pretrained - loads the model; use it like any transformers model

        - .score(context, claim)
          returns the probability of the Aligned class from the 3-way classification head, as in the paper.

        - .classify(context, claim)
          returns the predicted class from the binary classification head, as in the paper.

        alignScoreCS = AlignScoreCS.from_pretrained("krotima1/AlignScoreCS")
        alignScoreCS.score(context, claim)

    To use a different classification head, pass the task_name parameter:
        - task_name = "re"   : regression head
        - task_name = "bin"  : binary classification head
        - task_name = "3way" : 3-way classification head
    """

    _regression_model = "re_model"
    _binary_class_model = "bin_model"
    _3way_class_model = "3way_model"

    def __init__(self, encoder, taskmodels_dict, model_name="xlm-roberta-large", **kwargs):
        super().__init__(transformers.XLMRobertaConfig(), **kwargs)
        self.encoder = encoder
        self.taskmodels_dict = nn.ModuleDict(taskmodels_dict)
        self.model_name = model_name
        self.tokenizer = None
        self.inferencer = None
        self.init_inferencer(device="cpu")

    def init_inferencer(self, device="cuda"):
        self.tokenizer = self.tokenizer or transformers.AutoTokenizer.from_pretrained(self.model_name)
        self.inferencer = self.InferenceHandler(self, self.tokenizer, device)

""" |
|
Score: scores the context and claim with Aligned probabitlity of given classification head |
|
- using altered code inferencer from ALignScore |
|
|
|
context : list or str |
|
claim : list or str |
|
eval_mode : {nli, bin, re, nli_sp, bin_sp or re_sp} |
|
nli - 3way head |
|
bin - 2way head |
|
re - regression head |
|
(sp - indicates whether to apply alignscore function chunk context and split claim into sentences |
|
otherwise it truncates the text and returns probability of Aligned from that) |
|
eval_question_answer : list or str representing question if you want to evaluate context-answer question |
|
DEFAULT: nli_sp |
|
Returns the consistency score (probability of Aligned class of 3-way head) between context text and claim text |
|
- using 2way classification head |
|
|
|
""" |
|
    def score(self, context, claim, eval_mode="nli_sp", eval_question_answer=None, **kwargs):
        scores = self.inferencer.nlg_eval(context, claim, eval_mode=eval_mode, question=eval_question_answer)
        return scores
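
    # A minimal usage sketch (the hub id "krotima1/AlignScoreCS" is taken from the
    # from_pretrained docstring below; the texts are illustrative):
    #
    #   model = AlignScoreCS.from_pretrained("krotima1/AlignScoreCS")
    #   score = model.score(context="Praha je hlavní město České republiky.",
    #                       claim="Hlavním městem Česka je Praha.")
    #   print(score.tolist()[0])  # probability of the Aligned class, 3-way head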

    """
    Classify: classifies the context and claim into a class label given the eval mode

    context : list or str
    claim : list or str
    eval_mode : {nli, bin, re, nli_sp, bin_sp, re_sp}
        nli - 3-way head
        bin - 2-way head
        re  - regression head
        (the sp suffix applies the AlignScore classification function: chunk the context and split
        the claim into sentences; without it the text is truncated and the class is predicted directly)
    DEFAULT: bin_sp

    Returns a class from {Contradict, Aligned} for the context text and the claim text
    - using the 2-way classification head by default
    """
    def classify(self, context, claim, eval_mode="bin_sp", **kwargs):
        if "cls" not in eval_mode and "class" not in eval_mode:
            eval_mode = eval_mode + "_cls"
        scores = self.inferencer.nlg_eval(context, claim, eval_mode=eval_mode)
        return scores
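
    # A minimal usage sketch (illustrative texts; under the label order assumed by
    # score(), which reads column 1 as Aligned, 0 = Contradict and 1 = Aligned):
    #
    #   label = model.classify(context="Jágr střelil gól.", claim="Jágr nestřelil gól.")
    #   print(label.tolist()[0])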

    def forward(self, task_name="3way", **kwargs):
        return self.taskmodels_dict[task_name](**kwargs)

    def __call__(self, task_name, **kwargs):
        return self.taskmodels_dict[task_name](**kwargs)

    def to(self, device, **kwargs):
        self.init_inferencer(device=device)
        return super().to(device)

    @classmethod
    def get_encoder_attr_name(cls, model):
        """
        The encoder transformer is named differently in each model architecture.
        This method returns the name of the encoder attribute.
        """
        model_class_name = model.__class__.__name__
        if model_class_name.startswith("XLMRoberta"):
            return "roberta"
        raise KeyError(f"Add support for new model {model_class_name}")

""" |
|
pretrained_model_name_or_path :str "krotima1/AlignScoreCS" // but it is possible to use another NLI model but specify load_specific_head to 3way |
|
- path to the directory of AlignScoreCS |
|
- or pass "build_new" to create new multitask AlignScore architecture. |
|
load_specific_head :str ["re", "bin", "3way"] or None // use this, and it will load only one architecture |
|
load_another_model |
|
""" |
|
    @classmethod
    def from_pretrained(
        cls,
        pretrained_model_name_or_path: Optional[Union[str, os.PathLike]],
        model_name: str = "xlm-roberta-large",
        load_specific_head=None,
        *model_args,
        config: Optional[Union[PretrainedConfig, str, os.PathLike]] = None,
        cache_dir: Optional[Union[str, os.PathLike]] = None,
        ignore_mismatched_sizes: bool = False,
        force_download: bool = False,
        local_files_only: bool = False,
        token: Optional[Union[str, bool]] = None,
        revision: str = "main",
        use_safetensors: Optional[bool] = None,
        **kwargs,
    ):
        if load_specific_head is None:
            architectures = [cls._3way_class_model, cls._regression_model, cls._binary_class_model]
        else:
            architectures = {
                "re": [cls._regression_model],
                "bin": [cls._binary_class_model],
                "3way": [cls._3way_class_model],
            }[load_specific_head]
        is_3way_included = load_specific_head == "3way" or load_specific_head is None

        # Deprecated local layout: every head (including 3way) lives in its own subdirectory.
        if all(os.path.exists(os.path.join(pretrained_model_name_or_path, model_dir)) for model_dir in architectures):
            transformers.logging.set_verbosity_error()
            shared_encoder = None
            taskmodels_dict = {}
            for path_name in tqdm(architectures, desc='DEPRECATED: Loading architectures from a local directory'):
                task_name = path_name.split("_")[0]
                task_config = transformers.XLMRobertaConfig.from_json_file(
                    "{}/{}/config.json".format(pretrained_model_name_or_path, path_name))
                model = transformers.XLMRobertaForSequenceClassification.from_pretrained(
                    model_name, config=task_config, *model_args, **kwargs)
                model.load_state_dict(torch.load(
                    "{}/{}/pytorch_model.bin".format(pretrained_model_name_or_path, path_name),
                    map_location=torch.device('cuda' if torch.cuda.is_available() else 'cpu')))
                if shared_encoder is None:
                    shared_encoder = getattr(model, AlignScoreCS.get_encoder_attr_name(model))
                else:
                    setattr(model, AlignScoreCS.get_encoder_attr_name(model), shared_encoder)
                taskmodels_dict[task_name] = model
            return AlignScoreCS(encoder=shared_encoder, taskmodels_dict=taskmodels_dict, model_name=model_name)

        # Current local layout: the 3way model sits in the root directory, bin/re in subdirectories.
        if all(os.path.exists(os.path.join(pretrained_model_name_or_path,
                                           {"3way_model": "pytorch_model.bin"}.get(model_dir, model_dir)))
               for model_dir in architectures):
            shared_encoder = None
            taskmodels_dict = {}
            for path_name in tqdm(architectures, desc='Loading architectures from a local directory'):
                task_name = path_name.split("_")[0]
                load_path = pretrained_model_name_or_path if task_name == "3way" \
                    else "{}/{}".format(pretrained_model_name_or_path, path_name)
                task_config = transformers.XLMRobertaConfig.from_json_file("{}/config.json".format(load_path))
                model = transformers.XLMRobertaForSequenceClassification.from_pretrained(
                    load_path, config=task_config, *model_args, **kwargs)
                if shared_encoder is None:
                    shared_encoder = getattr(model, AlignScoreCS.get_encoder_attr_name(model))
                else:
                    setattr(model, AlignScoreCS.get_encoder_attr_name(model), shared_encoder)
                taskmodels_dict[task_name] = model
            return AlignScoreCS(encoder=shared_encoder, taskmodels_dict=taskmodels_dict, model_name=model_name)

        # Build a fresh multitask architecture from the base model.
        if pretrained_model_name_or_path == "build_new":
            shared_encoder = None
            taskmodels_dict = {}
            for path_name in tqdm([cls._3way_class_model, cls._regression_model, cls._binary_class_model],
                                  desc=f'Building new architectures from {model_name}'):
                task_name = path_name.split("_")[0]
                # Each head needs its own label count (3-way: 3, binary: 2, regression: 1).
                task_config = transformers.XLMRobertaConfig.from_pretrained(
                    model_name, num_labels={"3way": 3, "bin": 2, "re": 1}[task_name])
                model = transformers.XLMRobertaForSequenceClassification.from_pretrained(
                    model_name, config=task_config, *model_args, **kwargs)
                if shared_encoder is None:
                    shared_encoder = getattr(model, AlignScoreCS.get_encoder_attr_name(model))
                else:
                    setattr(model, AlignScoreCS.get_encoder_attr_name(model), shared_encoder)
                taskmodels_dict[task_name] = model
            return AlignScoreCS(encoder=shared_encoder, taskmodels_dict=taskmodels_dict, model_name=model_name)

        # Otherwise load from the Hugging Face Hub: 3way in the repo root, bin/re in subfolders.
        shared_encoder = None
        taskmodels_dict = {}
        for model_dir in tqdm(architectures, desc='Loading from huggingface HUB'):
            task_name = model_dir.split("_")[0]
            subfolder = '' if task_name == "3way" else model_dir
            config = transformers.XLMRobertaConfig.from_pretrained(pretrained_model_name_or_path, subfolder=subfolder)
            model = transformers.XLMRobertaForSequenceClassification.from_pretrained(
                pretrained_model_name_or_path, config=config, subfolder=subfolder)
            if shared_encoder is None:
                shared_encoder = getattr(model, AlignScoreCS.get_encoder_attr_name(model))
            else:
                setattr(model, AlignScoreCS.get_encoder_attr_name(model), shared_encoder)
            taskmodels_dict[task_name] = model
        alignScoreCS = AlignScoreCS(encoder=shared_encoder, taskmodels_dict=taskmodels_dict, model_name=model_name)
        return alignScoreCS
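
    # Loading sketches for the supported sources (the hub id comes from the docstring
    # above; the local path is illustrative):
    #
    #   m = AlignScoreCS.from_pretrained("krotima1/AlignScoreCS")      # HF Hub
    #   m = AlignScoreCS.from_pretrained("/path/to/AlignScoreCS")      # local directory
    #   m = AlignScoreCS.from_pretrained("build_new")                  # fresh, untrained heads
    #   m = AlignScoreCS.from_pretrained("krotima1/AlignScoreCS", load_specific_head="bin")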

    """
    Saves the architectures into a directory. The model with the 3-way head goes into the main
    directory, while the binary and regression heads go into subfolders (bin_model, re_model).
    """
    def save_pretrained(
        self,
        save_directory: Union[str, os.PathLike],
        is_main_process: bool = True,
        state_dict: Optional[dict] = None,
        save_function: Callable = torch.save,
        push_to_hub: bool = False,
        max_shard_size: Union[int, str] = "10GB",
        safe_serialization: bool = False,
        variant: Optional[str] = None,
        token: Optional[Union[str, bool]] = None,
        save_peft_format: bool = True,
        **kwargs,
    ):
        for task_name, model_type in self.taskmodels_dict.items():
            final_directory = Path(save_directory, task_name + "_model") if task_name in ["re", "bin"] \
                else Path(save_directory)
            model_type.save_pretrained(
                save_directory=final_directory,
                is_main_process=is_main_process,
                state_dict=state_dict,
                save_function=save_function,
                push_to_hub=push_to_hub,
                max_shard_size=max_shard_size,
                safe_serialization=safe_serialization,
                variant=variant,
                token=token,
                save_peft_format=save_peft_format,
                **kwargs)
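
    # Round-trip sketch: save_pretrained writes exactly the layout that the second
    # local branch of from_pretrained reads back (directory name is illustrative):
    #
    #   model.save_pretrained("./alignscore_cs_ckpt")
    #   reloaded = AlignScoreCS.from_pretrained("./alignscore_cs_ckpt")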

    """
    This piece of code is copied and modified from the AlignScore inference code:
    https://github.com/yuh-zha/AlignScore/blob/main/src/alignscore/inference.py

    ### eval_mode ###
    ## sp ## chunk the context into roughly 300-token pieces and split the claim into sentences,
             then apply the AlignScore function to get a consistency score
    - nli_sp - ## DEFAULT ## consistency score of the Aligned class from the 3-way head, with chunking
    - bin_sp - consistency score of the Aligned class from the 2-way head, with chunking
    - re_sp  - output of the regression head, with chunking
    ## simple ## without chunking and splitting
    - nli - probability of the Aligned class from the 3-way head
    - bin - probability of the Aligned class from the 2-way head
    - re  - output of the regression head
    ## sp_cls ## chunk the context and split the claim into sentences, then apply the AlignScore
             classification function to get a class
    - nli_sp_cls - class from the 3-way head, with chunking
    - bin_sp_cls - class from the 2-way head, with chunking
    - re_sp_cls  - output of the regression head, with chunking
    ## simple cls ## without chunking and splitting
    - nli_cls - class from the 3-way head
    - bin_cls - class from the 2-way head
    - re_cls  - output of the regression head
    """

    class InferenceHandler:
        def __init__(self, model, tokenizer, device="cuda", batch_size=32, verbose=False):
            # Map user-facing mode aliases to the internal mode names.
            self.input_evalmode_handler = {
                "3way_sp": "nli_sp", "3way_sp_class": "nli_sp_cls", "3way": "nli", "3-way": "nli", "3way_class": "nli_cls",
                "2way_sp": "bin_sp", "2way_sp_class": "bin_sp_cls", "2way": "bin", "2-way": "bin", "2way_class": "bin_cls",
                "reg_sp": "re_sp", "reg_sp_class": "re_sp_cls", "reg": "re", "reg_class": "re_cls"}
            self.taskname_handler = lambda eval_mode: "3way" if "nli" in eval_mode else ("bin" if "bin" in eval_mode else "re")

            self.nlg_eval_mode = "nli_sp"
            self.task_name = "3way"

            self.model = model
            self.device = device
            self.tokenizer = tokenizer

            self.model.eval()

            self.batch_size = batch_size
            self.verbose = verbose
            # Initialize here so inference() can read the flag even when it is called
            # directly (the non-sp modes skip inference_example_batch, which sets it).
            self.disable_progress_bar_in_inference = False

            self.softmax = nn.Softmax(dim=-1)

        def nlg_eval(self, premise, hypo, eval_mode="nli_sp", question=None):
            if isinstance(premise, str) and isinstance(hypo, str):
                premise = [premise]
                hypo = [hypo]
            if isinstance(question, str):
                question = [question]
            if question is None:
                question = [None] * len(premise)

            self.nlg_eval_mode = self.input_evalmode_handler.get(eval_mode, eval_mode)
            self.task_name = self.taskname_handler(self.nlg_eval_mode)
            assert self.nlg_eval_mode in set(self.input_evalmode_handler.values()), \
                f"eval_mode {self.nlg_eval_mode} is invalid; please use nli_sp or another mode listed in the comments."

            if "sp" in self.nlg_eval_mode:
                return self.inference_example_batch(premise, hypo, question)
            return self.inference(premise, hypo)

        def inference_example_batch(self, premise: list, hypo: list, question: list):
            """
            Infer a batch of examples.
            premise: list
            hypo: list
            Uses self.inference to batch the process.
            SummaC-style aggregation.
            """
            self.disable_progress_bar_in_inference = True
            assert len(premise) == len(hypo), "Premise must have the same length as hypothesis!"

            out_score = []
            for one_pre, one_hypo, one_quest in tqdm(zip(premise, hypo, question), desc="Evaluating",
                                                     total=len(premise), disable=(not self.verbose)):
                out_score.append(self.inference_per_example(one_pre, one_hypo, one_quest))

            return torch.tensor(out_score)

        def inference_per_example(self, premise: str, hypo: str, quest=None):
            """
            Infer a single example.
            premise: string
            hypo: string
            Uses self.inference to batch the process.
            """
            def chunks(lst, n):
                """Yield successive n-sized chunks from lst."""
                for i in range(0, len(lst), n):
                    yield ' '.join(lst[i:i + n])

            premise_sents = sent_tokenize(premise)
            premise_sents = premise_sents or ['']

            # Split the premise into chunks of roughly 350 words each.
            n_chunk = len(premise.strip().split()) // 350 + 1
            n_chunk = max(len(premise_sents) // n_chunk, 1)
            premise_sents = [each for each in chunks(premise_sents, n_chunk)]

            hypo_sents = sent_tokenize(hypo)

            # For QA evaluation, prepend the question to every answer sentence.
            if quest is not None:
                hypo_sents = [quest + " " + sent for sent in hypo_sents]

            # Build the full chunk x sentence cross product.
            premise_sent_mat = []
            hypo_sents_mat = []
            for i in range(len(premise_sents)):
                for j in range(len(hypo_sents)):
                    premise_sent_mat.append(premise_sents[i])
                    hypo_sents_mat.append(hypo_sents[j])

            output_score = self.inference(premise_sent_mat, hypo_sents_mat)
            if "cls" in self.nlg_eval_mode:
                # Average the class distributions over all chunk-sentence pairs, then argmax.
                output_score = output_score.view(len(premise_sents), len(hypo_sents), -1).mean(1).mean(0).argmax().item()
            else:
                # For each claim sentence take the best-supporting chunk, then average over sentences.
                output_score = output_score.view(len(premise_sents), len(hypo_sents)).max(dim=0).values.mean().item()
            return output_score
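
        # Worked sketch of the aggregation above: with 2 premise chunks and 3 claim
        # sentences, inference() returns 6 Aligned probabilities, viewed as a 2x3
        # matrix. E.g. [[0.9, 0.2, 0.8], [0.1, 0.7, 0.6]] -> max over chunks gives
        # [0.9, 0.7, 0.8], and their mean, 0.8, is the example's consistency score.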

        def inference(self, premise, hypo):
            """
            Infer a list of premises and hypotheses.
            Standard aggregation.
            """
            if isinstance(premise, str) and isinstance(hypo, str):
                premise = [premise]
                hypo = [hypo]

            batch = self.batch_tokenize(premise, hypo)
            output_score = []

            for mini_batch in tqdm(batch, desc="Evaluating",
                                   disable=(not self.verbose) or self.disable_progress_bar_in_inference):
                mini_batch = mini_batch.to(self.device)
                with torch.no_grad():
                    model_output = self.model.forward(task_name=self.task_name, **mini_batch)
                    model_output = model_output.logits
                    if self.task_name == "re":
                        # Regression head: return its raw output directly.
                        model_output = model_output.cpu()[:, 0]
                    else:
                        model_output = self.softmax(model_output).cpu()
                        if "cls" in self.nlg_eval_mode:
                            if "sp" not in self.nlg_eval_mode:
                                model_output = model_output.argmax(-1)
                        else:
                            # Probability of the Aligned class (label index 1).
                            model_output = model_output[:, 1]
                output_score.append(model_output)
            output_score = torch.cat(output_score)

            return output_score

        def batch_tokenize(self, premise, hypo):
            """
            Input premise and hypo are lists.
            """
            assert isinstance(premise, list) and isinstance(hypo, list)
            assert len(premise) == len(hypo), "premise and hypo should have the same length."

            batch = []
            for mini_batch_pre, mini_batch_hypo in zip(self.chunks(premise, self.batch_size),
                                                       self.chunks(hypo, self.batch_size)):
                try:
                    mini_batch = self.tokenizer(mini_batch_pre, mini_batch_hypo, truncation='only_first',
                                                padding='max_length', max_length=self.tokenizer.model_max_length,
                                                return_tensors='pt')
                except Exception:
                    # only_first cannot truncate enough when the claim itself is too long;
                    # fall back to truncating both texts.
                    print('text_b too long...')
                    mini_batch = self.tokenizer(mini_batch_pre, mini_batch_hypo, truncation=True,
                                                padding='max_length', max_length=self.tokenizer.model_max_length,
                                                return_tensors='pt')
                batch.append(mini_batch)

            return batch

        def chunks(self, lst, n):
            """Yield successive n-sized chunks from lst."""
            for i in range(0, len(lst), n):
                yield lst[i:i + n]

    def show_examples(self):
        self.to("cuda" if torch.cuda.is_available() else "cpu")
        # Czech demo data; the context says: "Jaromír Jágr (68) scored the last goal of
        # the season past Dominik Hašek. The crowds went wild even after the whole game."
        contexts = ["Jaromír Jágr (68) střelil poslední gól sezóny do branky Dominika Haška. Davy šílely dokonce i po celém zápase."]
        claims = ["Dav šílel, když Jarda (68) střelil gól.", "Dav šílel, když Jarda (78) střelil gól.",
                  "Dav šílel jen při zápase, když Jarda (68) střelil gól.", "Dominik Hašek nedokázal chytit poslední střelu od Jágra.",
                  "Dominik Jágr (68) střelil poslední gól sezóny do branky Jaromíra Haška.",
                  "Dominik Jágr (68) střelil poslední gól sezóny do branky Dominika Haška.", "Jaromír jágr nestřelil gól v sezóně.",
                  "Davy šílely, když střelily gól do branky Dominika Haška.", "Davy šílely, když davy střelily gól do branky Dominika Haška.",
                  "Dav šílel. Jarda střelil gól.", "Dav šílel. Jarda nestřelil gól.",
                  "Dneska odevzdávám diplomovou práci a koukám na hokej.", "Téma pojednává o hokeji", "Téma pojednává o baletu",
                  "Dominik hašek je brankář", "Dominik hašek je útočník", "Jaromír Jágr je střelec", "Jaromír Jágr je hokejový útočník",
                  "Jaromír Jágr je hokejový brankář", "Na utkání se dívaly davy lidí, které byly potichu.",
                  "Na utkání se dívaly davy lidí, které šílely."]
        print("EXAMPLES:")
        print("context:", contexts[0])
        print("SCORE: ", "claims:")
        for co, cl in zip(contexts * len(claims), claims):
            print(round(self.score(co, cl, eval_mode="nli_sp").tolist()[0], 5), cl)
        print("EXAMPLES QA:")
        print("SCORE: ", "q-a pairs:")
        claims = [("Kdo střelil gól?", "Jaromír Jágr."), ("Kdo střelil gól?", "Domink Hašek."),
                  ("Kdo nechytil střelu?", "Jaromír Jágr."), ("Kdo nechytil střelu?", "Domink Hašek."),
                  ("Jaký má číslo drezu Jaromír Jágr?", "Jaromír Jágr má číslo drezu 68."),
                  ("Kolik je Jaromíru Jágrovi let?", "Jaromíru Jágrovi je 68."),
                  ("Kolik je Jaromíru Jágrovi let?", "Jaromíru Jágrovi je 67."),
                  ("Co udělali lidi, když Jágr střelil gól?", "Lidi začali šílet. Dokonce šílely i po zápase."),
                  ("Co udělali lidi, když Jágr střelil gól?", "Šli dát góla Haškovi")]
        for co, cl in zip(contexts * len(claims), claims):
            print(round(self.score(co, cl[1], eval_mode="nli_sp", eval_question_answer=cl[0]).tolist()[0], 5), " ".join(cl))