import json import os import numpy as np import pandas as pd from transformers import (AutoModel, AutoModelForMaskedLM, AutoTokenizer, RobertaModel, pipeline) class MLMTest(): def __init__(self, config_file="mlm_test_config.csv", full_text_file="mlm_full_text.csv", targeted_text_file="mlm_targeted_text.csv"): self.config_df = pd.read_csv(os.path.join(os.path.dirname(os.path.realpath(__file__)), config_file)) self.config_df.fillna("", inplace=True) self.full_text_df = pd.read_csv(os.path.join(os.path.dirname(os.path.realpath(__file__)), full_text_file)) self.targeted_text_df = pd.read_csv(os.path.join(os.path.dirname(os.path.realpath(__file__)), targeted_text_file)) self.full_text_results = [] self.targeted_text_results = [] def _run_full_test_row(self, text, print_debug=False): return_data = [] data = text.split() for i in range(0, len(data)): masked_text = " ".join(data[:i]) + " "+self.nlp.tokenizer.mask_token+" " + " ".join(data[i+1:]) expected_result = data[i] result = self.nlp(masked_text) self.full_text_results.append({"text": masked_text, "result": result[0]["token_str"], "true_output": expected_result}) if print_debug: print(masked_text) print([x["token_str"] for x in result]) print("-"*20) return_data.append({"prediction": result[0]["token_str"], "true_output": expected_result}) return return_data def _run_targeted_test_row(self, text, expected_result, print_debug=False): return_data = [] result = self.nlp(text.replace("", self.nlp.tokenizer.mask_token)) self.targeted_text_results.append({"text": text, "result": result[0]["token_str"], "true_output": expected_result}) if print_debug: print(text) print([x["token_str"] for x in result]) print("-"*20) return_data.append({"prediction": result[0]["token_str"], "true_output": expected_result}) return return_data def _compute_acc(self, results): ctr = 0 for row in results: try: z = json.loads(row["true_output"]) if isinstance(z, list): if row["prediction"] in z: ctr+=1 except: if row["prediction"] == row["true_output"]: ctr+=1 return float(ctr/len(results)) def run_full_test(self, exclude_user_ids=[], print_debug=False): df = pd.DataFrame() for idx, row in self.config_df.iterrows(): self.full_text_results = [] model_name = row["model_name"] display_name = row["display_name"] if row["display_name"] else row["model_name"] revision = row["revision"] if row["revision"] else "main" from_flax = row["from_flax"] if from_flax: model = AutoModelForMaskedLM.from_pretrained(model_name, from_flax=True, revision=revision) tokenizer = AutoTokenizer.from_pretrained(model_name) tokenizer.save_pretrained('exported_pytorch_model') model.save_pretrained('exported_pytorch_model') self.nlp = pipeline('fill-mask', model="exported_pytorch_model") else: self.nlp = pipeline('fill-mask', model=model_name) accs = [] try: for idx, row in self.full_text_df.iterrows(): if row["user_id"] in exclude_user_ids: continue results = self._run_full_test_row(row["text"], print_debug=print_debug) acc = self._compute_acc(results) accs.append(acc) except: print("Error for", display_name) continue print(display_name, " Average acc:", sum(accs)/len(accs)) if df.empty: df = pd.DataFrame(self.full_text_results) df.rename(columns={"result": display_name}, inplace=True) else: preds = [x["result"] for x in self.full_text_results] df[display_name] = preds df.to_csv("full_text_results.csv", index=False) print("Results saved to full_text_results.csv") def run_targeted_test(self, exclude_user_ids=[], print_debug=False): df = pd.DataFrame() for idx, row in self.config_df.iterrows(): self.targeted_text_results = [] model_name = row["model_name"] display_name = row["display_name"] if row["display_name"] else row["model_name"] revision = row["revision"] if row["revision"] else "main" from_flax = row["from_flax"] if from_flax: model = AutoModelForMaskedLM.from_pretrained(model_name, from_flax=True, revision=revision) tokenizer = AutoTokenizer.from_pretrained(model_name) tokenizer.save_pretrained('exported_pytorch_model') model.save_pretrained('exported_pytorch_model') self.nlp = pipeline('fill-mask', model="exported_pytorch_model") else: self.nlp = pipeline('fill-mask', model=model_name) accs = [] try: for idx, row2 in self.targeted_text_df.iterrows(): if row2["user_id"] in exclude_user_ids: continue results = self._run_targeted_test_row(row2["text"], row2["output"], print_debug=print_debug) acc = self._compute_acc(results) accs.append(acc) except: import traceback print(traceback.format_exc()) print("Error for", display_name) continue print(display_name, " Average acc:", sum(accs)/len(accs)) if df.empty: df = pd.DataFrame(self.targeted_text_results) df.rename(columns={"result": display_name}, inplace=True) else: preds = [x["result"] for x in self.targeted_text_results] df[display_name] = preds df.to_csv("targeted_text_results.csv", index=False) print("Results saved to targeted_text_results.csv")