Spaces:
Build error
Build error
""" | |
rg_utils load helpers methods from python | |
""" | |
import pandas as pd | |
import re | |
import robustnessgym as rg | |
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score | |
def update_pred(dp, model, dp_only=False): | |
""" Updating data panel with model prediction""" | |
model.predict_batch(dp, ["sentence"]) | |
dp = dp.update( | |
lambda x: model.predict_batch(x, ["sentence"]), | |
batch_size=4, | |
is_batched_fn=True, | |
pbar=True, | |
) | |
if dp_only: | |
return dp | |
labels = pd.Series(["Negative Sentiment", "Positive Sentiment"]) | |
probs = pd.Series(dp.__dict__["_data"]["probs"][0]) | |
pred = pd.concat([labels, probs], axis=1) | |
pred.columns = ["Label", "Probability"] | |
return (dp, pred) | |
def remove_slice(bench, slice_name="user_data"): | |
""" Remove a slice from the rg dev bench""" | |
# slices and identifiers are in the same order | |
slice_list = [] | |
slice_identifier = [] | |
for i in bench.__dict__["_slices"]: | |
# look-up the term | |
name = str(i.__dict__["_identifier"]) | |
if not re.search("new_words", name): | |
slice_list = slice_list + [i] | |
slice_identifier = slice_identifier + [name] | |
# metrics put datain a different order | |
metrics = {} | |
for key in bench.metrics["model"].keys(): | |
if not re.search("new_words", key): | |
metrics[key] = bench.metrics["model"][key] | |
# slice table, repeat for sanity check | |
# slice_table = {} | |
# for key in bench.__dict__["_slice_table"].keys(): | |
# key = str(key) | |
# if not re.search("new_words",key): | |
# slice_table[key] = bench.__dict__["_slice_table"][key] | |
bench.__dict__["_slices"] = set(slice_list) | |
bench.__dict__["_slice_identifiers"] = set(slice_identifier) | |
# bench.__dict__["_slice_table"] = set(slice_identifier) | |
bench.metrics["model"] = metrics | |
return bench | |
def add_slice(bench, table, model, slice_name="user_data"): | |
""" Adds a custom slice to RG """ | |
# do it this way or it complains | |
dp = rg.DataPanel( | |
{ | |
"sentence": table["sentence"].tolist(), | |
"label": table["label"].tolist(), | |
"pred": table["pred"].tolist(), | |
} | |
) | |
# dp._identifier = slice_name | |
# get prediction | |
# add to bench | |
# bench.add_slices([dp]) | |
return dp | |
def new_bench(): | |
""" Create new rg dev bench""" | |
bench = rg.DevBench() | |
bench.add_aggregators( | |
{ | |
# Every model can be associated with custom metric calculation functions | |
#'distilbert-base-uncased-finetuned-sst-2-english': { | |
"model": { | |
# This function uses the predictions we stored earlier to calculate accuracy | |
#'accuracy': lambda dp: (dp['label'].round() == dp['pred'].numpy()).mean() | |
#'f1' : lambda dp: f1_score(dp['label'].round(),dp['pred'],average='macro',zero_division=1), | |
"recall": lambda dp: recall_score( | |
dp["label"].round(), dp["pred"], average="macro", zero_division=1 | |
), | |
"precision": lambda dp: precision_score( | |
dp["label"].round(), dp["pred"], average="macro", zero_division=1 | |
), | |
"accuracy": lambda dp: accuracy_score(dp["label"].round(), dp["pred"]), | |
} | |
} | |
) | |
return bench | |
def get_sliceid(slices): | |
""" Because RG stores data in a silly way""" | |
ids = [] | |
for slice in list(slices): | |
ids = ids + [slice._identifier] | |
return ids | |
def get_sliceidx(slice_ids,name): | |
""" get the index from an rg slice""" | |
if name == "xyz_train": | |
idx = [i for i, elem in enumerate(slice_ids) if ("split=train" in str(elem)) ] #and len(str(elem).split("->")) == 1)] | |
elif name == "xyz_test": | |
idx = [i for i, elem in enumerate(slice_ids) if ("split=test" in str(elem)) ] #and len(str(elem).split("->")) == 1)] | |
else: | |
idx = [i for i, elem in enumerate(slice_ids) if name in str(elem)] | |
return idx[0] | |
def get_prob(x,i): | |
""" Helper to get probability""" | |
return(float(x[i])) | |
def slice_to_df(data): | |
""" Convert slice to dataframe""" | |
df = pd.DataFrame( | |
{ | |
"sentence": list(data["sentence"]), | |
"model label": ["Positive Sentiment" if int(round(x)) == 1 else "Negative Sentiment" for x in data["label"]], | |
"model binary": [int(round(x)) for x in data["label"]], | |
} | |
) | |
prob = [] | |
for i in range(0, len(data['probs'])): | |
prob.append(get_prob(data['probs'][i],df["model binary"][i])) | |
df["probability"] = prob | |
return df | |
def metrics_to_dict(metrics, slice_name): | |
""" Convert metrics to dataframe""" | |
all_metrics = {slice_name: {}} | |
all_metrics[slice_name]["metrics"] = metrics[slice_name] | |
all_metrics[slice_name]["source"] = "Custom Slice" | |
return all_metrics | |