# hassiahk's picture
# Model changes and code formatting
# 666b7aa
import json
import os
import numpy as np
import pandas as pd
from transformers import (AutoModel, AutoModelForMaskedLM, AutoTokenizer,
RobertaModel, pipeline)
class MLMTest():
    """Evaluate fill-mask models listed in a config CSV against two test sets.

    The config CSV is read for the columns ``model_name``, ``display_name``,
    ``revision`` and ``from_flax``. The full-text CSV is read for ``user_id``
    and ``text``; the targeted CSV for ``user_id``, ``text`` and ``output``.
    All three files are resolved relative to the directory of this module.

    Attributes set by this class:
        config_df: model configuration rows (missing values replaced by "").
        full_text_df / targeted_text_df: the two test sets.
        full_text_results / targeted_text_results: per-example records
            (``text``, ``result``, ``true_output``) for the model currently
            being evaluated; reset at the start of every model.
        nlp: the fill-mask pipeline of the model currently being evaluated
            (only exists once one of the ``run_*`` methods has loaded a model).
    """

    def __init__(
        self,
        config_file="mlm_test_config.csv",
        full_text_file="mlm_full_text.csv",
        targeted_text_file="mlm_targeted_text.csv",
    ):
        """Load the config and both test CSVs from this module's directory."""
        base_dir = os.path.dirname(os.path.realpath(__file__))
        self.config_df = pd.read_csv(os.path.join(base_dir, config_file))
        # Empty cells become "" so the optional columns can be tested for
        # truthiness when choosing defaults.
        self.config_df.fillna("", inplace=True)
        self.full_text_df = pd.read_csv(os.path.join(base_dir, full_text_file))
        self.targeted_text_df = pd.read_csv(os.path.join(base_dir, targeted_text_file))
        self.full_text_results = []
        self.targeted_text_results = []

    def _run_full_test_row(self, text, print_debug=False):
        """Mask every whitespace-separated word of ``text`` in turn and predict it.

        For each word a record is appended to ``self.full_text_results`` and a
        ``{"prediction", "true_output"}`` dict is added to the returned list.
        Only the first candidate returned by the pipeline is used as the
        prediction. Returns an empty list when ``text`` contains no words.
        """
        return_data = []
        words = text.split()
        mask_token = self.nlp.tokenizer.mask_token
        for i, expected_result in enumerate(words):
            # Kept exactly as the original builds it (including the leading /
            # trailing space when the first / last word is masked) so the
            # model input is unchanged.
            masked_text = " ".join(words[:i]) + " " + mask_token + " " + " ".join(words[i + 1:])
            result = self.nlp(masked_text)
            prediction = result[0]["token_str"]
            self.full_text_results.append(
                {"text": masked_text, "result": prediction, "true_output": expected_result}
            )
            if print_debug:
                print(masked_text)
                print([x["token_str"] for x in result])
                print("-" * 20)
            return_data.append({"prediction": prediction, "true_output": expected_result})
        return return_data

    def _run_targeted_test_row(self, text, expected_result, print_debug=False):
        """Predict the single ``<mask>`` placeholder in ``text``.

        The literal ``<mask>`` is replaced by the tokenizer's own mask token
        before the pipeline is called. One record is appended to
        ``self.targeted_text_results`` (with the original, unreplaced text) and
        a one-element list of ``{"prediction", "true_output"}`` is returned.
        """
        result = self.nlp(text.replace("<mask>", self.nlp.tokenizer.mask_token))
        prediction = result[0]["token_str"]
        self.targeted_text_results.append(
            {"text": text, "result": prediction, "true_output": expected_result}
        )
        if print_debug:
            print(text)
            print([x["token_str"] for x in result])
            print("-" * 20)
        return [{"prediction": prediction, "true_output": expected_result}]

    @staticmethod
    def _is_correct(prediction, true_output):
        """Return True when ``prediction`` matches ``true_output``.

        ``true_output`` may be a JSON-encoded list of accepted answers, in
        which case membership is tested. Anything else -- including values
        that happen to parse as non-list JSON such as ``"2021"`` or ``"true"``
        -- is compared to the prediction directly.
        """
        try:
            accepted = json.loads(true_output)
        except (TypeError, ValueError):
            # TypeError: not a str/bytes value; ValueError: not valid JSON.
            accepted = None
        if isinstance(accepted, list):
            return prediction in accepted
        return prediction == true_output

    def _compute_acc(self, results):
        """Return the fraction of correct rows in ``results`` as a float.

        Each row is a dict with ``prediction`` and ``true_output`` keys.
        An empty ``results`` yields 0.0 instead of dividing by zero.
        """
        if not results:
            return 0.0
        correct = sum(
            1 for row in results if self._is_correct(row["prediction"], row["true_output"])
        )
        return correct / len(results)

    def _build_pipeline(self, model_name, revision, from_flax):
        """Create the fill-mask pipeline for one config row.

        Flax checkpoints are loaded, re-saved under ``exported_pytorch_model``
        together with their tokenizer, and the pipeline is built from that
        directory. Other models are handed to ``pipeline`` by name.
        """
        if from_flax:
            model = AutoModelForMaskedLM.from_pretrained(model_name, from_flax=True, revision=revision)
            # NOTE(review): ``revision`` is only applied to the Flax model
            # load; the tokenizer here and the non-Flax pipeline below use the
            # default revision -- confirm this is intended.
            tokenizer = AutoTokenizer.from_pretrained(model_name)
            tokenizer.save_pretrained('exported_pytorch_model')
            model.save_pretrained('exported_pytorch_model')
            return pipeline('fill-mask', model="exported_pytorch_model")
        return pipeline('fill-mask', model=model_name)

    def _evaluate_models(self, test_df, results_attr, run_row, output_file, exclude_user_ids, print_debug):
        """Run ``run_row`` over ``test_df`` for every configured model.

        Args:
            test_df: test set; must provide a ``user_id`` column.
            results_attr: name of the instance attribute that ``run_row``
                appends its per-example records to; reset for every model.
            run_row: callable ``(test_row, print_debug) -> list of result dicts``.
            output_file: CSV path the combined predictions are written to.
            exclude_user_ids: container of user ids to skip, or None.
            print_debug: forwarded to ``run_row``.

        A model whose evaluation raises is reported (with traceback) and left
        out of the output; the remaining models are still evaluated.
        """
        excluded = () if exclude_user_ids is None else exclude_user_ids
        df = pd.DataFrame()
        for _, config_row in self.config_df.iterrows():
            setattr(self, results_attr, [])
            model_name = config_row["model_name"]
            display_name = config_row["display_name"] or model_name
            revision = config_row["revision"] or "main"
            self.nlp = self._build_pipeline(model_name, revision, config_row["from_flax"])

            accs = []
            try:
                for _, test_row in test_df.iterrows():
                    if test_row["user_id"] in excluded:
                        continue
                    results = run_row(test_row, print_debug)
                    if not results:
                        # Nothing was evaluated for this row (e.g. blank
                        # text); it must not abort or skew the model's score.
                        continue
                    accs.append(self._compute_acc(results))
            except Exception:
                import traceback
                print(traceback.format_exc())
                print("Error for", display_name)
                continue

            if accs:
                print(display_name, " Average acc:", sum(accs) / len(accs))
            else:
                print(display_name, " Average acc: n/a (no rows evaluated)")

            model_results = getattr(self, results_attr)
            if df.empty:
                # First successful model provides the text / true_output
                # columns; its predictions are stored under its display name.
                df = pd.DataFrame(model_results)
                df.rename(columns={"result": display_name}, inplace=True)
            else:
                df[display_name] = [x["result"] for x in model_results]

        # Written once, after all models have been evaluated.
        df.to_csv(output_file, index=False)
        print("Results saved to " + output_file)

    def run_full_test(self, exclude_user_ids=None, print_debug=False):
        """Evaluate every configured model on the full-text set.

        Every word of every ``text`` is masked in turn. The per-model average
        accuracy is printed and all predictions are written to
        ``full_text_results.csv``.

        Args:
            exclude_user_ids: user ids whose rows are skipped (default: none).
            print_debug: print each masked input and the candidate tokens.
        """
        self._evaluate_models(
            self.full_text_df,
            "full_text_results",
            lambda test_row, debug: self._run_full_test_row(test_row["text"], print_debug=debug),
            "full_text_results.csv",
            exclude_user_ids,
            print_debug,
        )

    def run_targeted_test(self, exclude_user_ids=None, print_debug=False):
        """Evaluate every configured model on the targeted set.

        Each row's ``text`` contains a ``<mask>`` placeholder and ``output``
        holds the expected answer (or a JSON list of accepted answers). The
        per-model average accuracy is printed and all predictions are written
        to ``targeted_text_results.csv``.

        Args:
            exclude_user_ids: user ids whose rows are skipped (default: none).
            print_debug: print each input and the candidate tokens.
        """
        self._evaluate_models(
            self.targeted_text_df,
            "targeted_text_results",
            lambda test_row, debug: self._run_targeted_test_row(
                test_row["text"], test_row["output"], print_debug=debug
            ),
            "targeted_text_results.csv",
            exclude_user_ids,
            print_debug,
        )