"""NUBIA: NeUral Based Interchangeability Assessor.

Extracts neural features from a (reference, hypothesis) pair --
semantic similarity (RoBERTa-STS), entailment probabilities
(RoBERTa-MNLI), and GPT-2 language-model loss -- and aggregates them
into a single quality score.
"""
from pytorch_transformers import GPT2Tokenizer, GPT2LMHeadModel
import torch
import os
import wget
import numpy as np
from fairseq.models.roberta import RobertaModel
from joblib import load

pretrained = os.path.join(os.path.dirname(__file__), "pretrained")

ROBERTA_STS_PATH = pretrained + '/roBERTa_STS'
ROBERTA_MNLI_PATH = pretrained + '/roBERTa_MNLI'
AGGREGATOR_DIR = pretrained + '/aggregators/'
AGGREGATOR_2015_2016 = \
    pretrained + '/aggregators/nn_2015_2016_6_dim.joblib'
AGGREGATOR_2015_2017 = \
    pretrained + '/aggregators/nn_2015_2017_6_dim.joblib'
AGGREGATOR_2015_2016_8_dim = \
    pretrained + '/aggregators/nn_2015_2016_8_dim.joblib'
AGGREGATOR_2015_2017_8_dim = \
    pretrained + '/aggregators/nn_2015_2017_8_dim.joblib'

ROBERTA_STS_URL = "https://nubia-nn.s3.amazonaws.com/" \
                  "neural-feature-extractors/checkpoint_best.pt"
ROBERTA_MNLI_URL = "https://nubia-nn.s3.amazonaws.com/" \
                   "neural-feature-extractors/model_mnli.pt"
AGGREGATOR_2015_2016_URL = "https://nubia-nn.s3.amazonaws.com/" \
                           "aggregators/nn_2015_2016_6_dim.joblib"
AGGREGATOR_2015_2017_URL = "https://nubia-nn.s3.amazonaws.com/" \
                           "aggregators/nn_2015_2017_6_dim.joblib"
AGGREGATOR_2015_2016_8_dim_URL = "https://nubia-nn.s3.amazonaws.com/" \
                                 "aggregators/nn_2015_2016_8_dim.joblib"
AGGREGATOR_2015_2017_8_dim_URL = "https://nubia-nn.s3.amazonaws.com/" \
                                 "aggregators/nn_2015_2017_8_dim.joblib"


class Nubia:
    def __init__(self):
        # Make sure the checkpoint directories exist, then download any
        # missing pretrained artifacts from S3 on first use.
        for directory in (AGGREGATOR_DIR, ROBERTA_STS_PATH,
                          ROBERTA_MNLI_PATH):
            if not os.path.exists(directory):
                os.makedirs(directory)
        if not os.path.isfile(AGGREGATOR_2015_2016):
            print("Downloading aggregators from s3...")
            wget.download(AGGREGATOR_2015_2016_URL, AGGREGATOR_2015_2016)
        if not os.path.isfile(AGGREGATOR_2015_2017):
            print("\nDownloading aggregators from s3...")
            wget.download(AGGREGATOR_2015_2017_URL, AGGREGATOR_2015_2017)
        if not os.path.isfile(AGGREGATOR_2015_2016_8_dim):
            print("\nDownloading aggregators from s3...")
            wget.download(AGGREGATOR_2015_2016_8_dim_URL,
                          AGGREGATOR_2015_2016_8_dim)
        if not os.path.isfile(AGGREGATOR_2015_2017_8_dim):
            print("\nDownloading aggregators from s3...")
            wget.download(AGGREGATOR_2015_2017_8_dim_URL,
                          AGGREGATOR_2015_2017_8_dim)
        if not os.path.isfile(ROBERTA_STS_PATH + '/checkpoint_best.pt'):
            print("\nDownloading ROBERTA STS model from s3...")
            wget.download(ROBERTA_STS_URL,
                          ROBERTA_STS_PATH + '/checkpoint_best.pt')
        if not os.path.isfile(ROBERTA_MNLI_PATH + '/model_mnli.pt'):
            print("\nDownloading ROBERTA MNLI model from s3...")
            wget.download(ROBERTA_MNLI_URL,
                          ROBERTA_MNLI_PATH + '/model_mnli.pt')
        print('\n')

        # RoBERTa fine-tuned on STS-B: semantic-similarity feature.
        self.roberta_STS = RobertaModel.from_pretrained(
            checkpoint_file='checkpoint_best.pt',
            model_name_or_path=ROBERTA_STS_PATH)
        self.roberta_STS.eval()

        # RoBERTa fine-tuned on MNLI: contradiction / neutrality /
        # entailment features.
        self.roberta_MNLI = RobertaModel.from_pretrained(
            checkpoint_file='model_mnli.pt',
            model_name_or_path=ROBERTA_MNLI_PATH)
        self.roberta_MNLI.eval()

        # GPT-2 language model: loss-based grammaticality features.
        self.tokenizer = GPT2Tokenizer.from_pretrained('gpt2')
        self.gpt_model = GPT2LMHeadModel.from_pretrained('gpt2')

        # Pretrained aggregators mapping feature vectors to a raw score.
        self.agg_one = load(AGGREGATOR_2015_2016)
        self.agg_two = load(AGGREGATOR_2015_2017)
        self.agg_one_8_dim = load(AGGREGATOR_2015_2016_8_dim)
        self.agg_two_8_dim = load(AGGREGATOR_2015_2017_8_dim)

    @staticmethod
    def _download_progress_bar(current, total, width=80):
        # Optional progress callback for wget.download's `bar` argument.
        print("Downloading: %d%% [%d / %d] bytes" % (
            current / total * 100, current, total))

    def _roberta_tokenizer(self, ref, hyp):
        tokens = self.roberta_STS.encode(ref, hyp)
        return tokens

    def _roberta_similarity(self, tokens):
        # RoBERTa accepts at most 512 positions; truncate longer inputs.
        if len(tokens) > 512:
            tokens = tokens[:512]
        features = self.roberta_STS.extract_features(tokens)
        # The STS head predicts similarity on a 0-5 scale.
        predicted_semantic_distance = 5.0 * \
            self.roberta_STS.model.classification_heads[
                'sentence_classification_head'](features)
        return predicted_semantic_distance
    def _roberta_mnli_all_values(self, tokens):
        if len(tokens) > 512:
            tokens = tokens[:512]
        # Raw logits for the three MNLI classes:
        # contradiction, neutral, entailment.
        prediction = self.roberta_MNLI.predict(
            'mnli', tokens)[0].cpu().detach().numpy()
        return prediction

    def _gpt_score(self, text):
        # Average cross-entropy of the text under GPT-2, prefixed with the
        # end-of-text token; lower loss indicates more fluent text.
        tokenize_input = self.tokenizer.tokenize(text)
        tensor_input = torch.tensor(
            [[self.tokenizer.eos_token_id] +
             self.tokenizer.convert_tokens_to_ids(tokenize_input)])
        with torch.no_grad():
            outputs = self.gpt_model(tensor_input, labels=tensor_input)
        loss, logits = outputs[:2]
        return loss

    def nubia(self, ref, hyp, get_features=False, six_dim=False,
              aggregator="agg_two", gpt_ref=None):
        tokens = self._roberta_tokenizer(ref, hyp)
        sim = float(self._roberta_similarity(tokens)[0])
        mnli_zero, mnli_one, mnli_two = self._roberta_mnli_all_values(tokens)

        if gpt_ref is None:
            gpt_ref = self._gpt_score(ref)
            gpt_hyp = self._gpt_score(hyp)
        else:
            # The caller already scored the reference; reuse it. This branch
            # is taken in the self-similarity pass, where hyp == ref.
            gpt_hyp = gpt_ref

        len_ref = len(ref.split(" "))
        len_hyp = len(hyp.split(" "))

        # Softmax the raw MNLI logits into human-readable probabilities.
        mnli_friendly = torch.nn.functional.softmax(
            torch.tensor([mnli_zero, mnli_one, mnli_two]), dim=0).tolist()

        neural_features_6_dim = np.array(
            [float(sim), float(mnli_zero), float(mnli_one),
             float(mnli_two), float(gpt_ref),
             float(gpt_hyp)])  # 6 neural features

        neural_features_8_dim = np.array(
            [float(sim), float(mnli_zero), float(mnli_one),
             float(mnli_two), float(gpt_ref), float(gpt_hyp),
             float(len_ref), float(len_hyp)])  # 8 neural features

        if aggregator == "agg_one":
            if six_dim:
                nubia_metric = float(self.agg_one.predict(
                    neural_features_6_dim.reshape(1, -1))[0])
            else:
                nubia_metric = float(self.agg_one_8_dim.predict(
                    neural_features_8_dim.reshape(1, -1))[0])
        else:
            if six_dim:
                nubia_metric = float(self.agg_two.predict(
                    neural_features_6_dim.reshape(1, -1))[0])
            else:
                nubia_metric = float(self.agg_two_8_dim.predict(
                    neural_features_8_dim.reshape(1, -1))[0])

        if get_features:
            return {"nubia_score": nubia_metric,
                    "features": {
                        "semantic_relation": min(5.0, sim),
                        "contradiction": mnli_friendly[0] * 100,
                        "irrelevancy": mnli_friendly[1] * 100,
                        "logical_agreement": mnli_friendly[2] * 100,
                        "grammar_ref": gpt_ref.item(),
                        "grammar_hyp": gpt_hyp.item(),
                    }
                    }, gpt_ref
        return nubia_metric, gpt_ref

    def score(self, ref, hyp, verbose=False, get_features=False,
              six_dim=False, aggregator="agg_two"):
        # Empty inputs get a zero score instead of a model call.
        if not ref or not hyp:
            if get_features:
                return {"nubia_score": 0,
                        "features": {
                            "semantic_relation": 0,
                            "contradiction": 0,
                            "irrelevancy": 0,
                            "logical_agreement": 0,
                            "grammar_ref": 0,
                            "grammar_hyp": 0,
                        }
                        }
            return 0

        nubia, gpt_ref = self.nubia(ref, hyp, get_features=True,
                                    six_dim=six_dim, aggregator=aggregator)
        # Score the reference against itself to calibrate: the raw metric
        # is rescaled relative to the model's own ceiling for this input.
        self_similarity, _ = self.nubia(ref, ref, get_features=False,
                                        six_dim=six_dim,
                                        aggregator=aggregator,
                                        gpt_ref=gpt_ref)
        amplitude = abs(self_similarity) + 1
        difference = self_similarity - nubia["nubia_score"]
        calibrated = 1.0 - (float(difference) / float(amplitude))
        # Clamp the calibrated score to [0, 1].
        calibrated = max(0.0, min(1.0, calibrated))

        if verbose:
            print("Semantic relation: " +
                  str(min(5.0, nubia["features"]["semantic_relation"])) +
                  '/5.0')
            print("Percent chance of contradiction: " +
                  str(nubia["features"]["contradiction"]) + "%")
            print("Percent chance of irrelevancy or new information: " +
                  str(nubia["features"]["irrelevancy"]) + "%")
            print("Percent chance of logical agreement: " +
                  str(nubia["features"]["logical_agreement"]) + "%\n")
            print("NUBIA score: " + str(calibrated) + "/1.0")

        nubia["nubia_score"] = calibrated
        if get_features:
            return nubia
        return calibrated
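

if __name__ == "__main__":
    # Minimal usage sketch (illustrative only; the example strings below
    # are assumptions, not part of the original module). Instantiating
    # Nubia downloads the pretrained checkpoints on first run; score()
    # then rates how well the hypothesis preserves the reference, as a
    # calibrated value in [0, 1] plus the individual neural features.
    n = Nubia()
    print(n.score("The dinner was delicious.",
                  "The meal tasted great.",
                  verbose=True, get_features=True))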