# nlc-explorer / NLselector.py
# Nathan Butters
# attempt to remediate
# 401217e
# raw
# history blame
# 11.5 kB
#Import the libraries we know we'll need for the Generator.
import pandas as pd, spacy, nltk, numpy as np, re
from spacy.matcher import Matcher
nlp = spacy.load("en_core_web_lg")
import altair as alt
import streamlit as st
from annotated_text import annotated_text as ant
#Import the libraries to support the model and predictions.
from transformers import AutoTokenizer, AutoModelForSequenceClassification, TextClassificationPipeline
import lime
import torch
import torch.nn.functional as F
from lime.lime_text import LimeTextExplainer
#Import WNgen.py
from WNgen import *
# Shared LIME explainer and sentiment model objects used by the functions below.
# The tokenizer and the model are loaded from the same checkpoint.
_SST2_CHECKPOINT = "distilbert-base-uncased-finetuned-sst-2-english"
class_names = ['negative', 'positive']
explainer = LimeTextExplainer(class_names=class_names)
tokenizer = AutoTokenizer.from_pretrained(_SST2_CHECKPOINT)
model = AutoModelForSequenceClassification.from_pretrained(_SST2_CHECKPOINT)
# return_all_scores=True: eval_pred reads one score per label from the pipeline output.
pipe = TextClassificationPipeline(
    model=model,
    tokenizer=tokenizer,
    return_all_scores=True,
)
def predictor(texts):
    '''Return the model's class probabilities for one or more texts.

    This is the classifier function handed to LIME in critical_words.

    texts: a single string or a sequence of strings.
    Returns a numpy array with one row per text, holding the softmax of the model logits.'''
    # The tokenizer takes a str or a list; normalise any other sequence to a list.
    if not isinstance(texts, str):
        texts = list(texts)
    # truncation=True keeps over-long inputs within the model's maximum length instead of failing.
    encoded = tokenizer(texts, return_tensors="pt", padding=True, truncation=True)
    # Inference only: no_grad avoids building an autograd graph for every perturbed sample.
    with torch.no_grad():
        logits = model(**encoded).logits
    return F.softmax(logits, dim=1).numpy()
@st.experimental_singleton
def critical_words(document, options=False):
    '''Select the critical parts of a sentence.

    Critical, in this context, means the part of the sentence that is either: A) a NOUN or PROPN
    that belongs to a recognised entity, B) a NOUN, C) a NOUN + ADJ combination, or D) an ADJ or
    PROPN used to modify other NOUN tokens.

    document: a spaCy Doc, or raw text that is parsed with the module-level `nlp` pipeline.
    options: any truthy value (for example True or "LIME") also runs LIME against the model and
        keeps only the candidates whose words LIME reports as features.

    Returns the list of unique candidates in first-seen order. When `options` is truthy, returns a
    tuple of that list and a DataFrame of the LIME results with the columns "Word" and "Weight".'''
    if not isinstance(document, spacy.tokens.doc.Doc):
        document = nlp(document)
    # Any truthy value switches the LIME filter on, so "LIME" behaves exactly like True.
    use_lime = bool(options)
    pos_options = []
    lime_options = []
    lime_results = None

    #Identify what the model cares about.
    if use_lime:
        exp = explainer.explain_instance(document.text, predictor, num_features=15, num_samples=2000)
        lime_features = exp.as_list()
        lime_options = [feature[0] for feature in lime_features]
        lime_results = pd.DataFrame(lime_features, columns=["Word","Weight"])

    def model_cares_about(words):
        '''True when the LIME filter is off, or when every word is one of the LIME features.'''
        return (not use_lime) or all(word in lime_options for word in words)

    #Identify what we care about "parts of speech"
    # Here I am going to try to pick up pronouns, which are people, and Adjectival Complements.
    for token in document:
        if (token.text in pos_options) or not model_cares_about([token.text]):
            continue
        # The first token has no predecessor; indexing with token.i - 1 == -1 would silently
        # wrap around to the last token of the document.
        prev_dep = document[token.i - 1].dep_ if token.i > 0 else None
        if (token.pos_ in ["ADJ","PROPN"]) and (token.dep_ in ["compound", "amod"]) and (prev_dep in ["compound", "amod"]):
            compound = document[token.i - 1: token.i + 1].text
            pos_options.append(compound)
            print(f'Added {compound} based on "amod" and "compound" adjectives.')
        elif (token.pos_ in ["NOUN"]) and (token.dep_ in ["compound", "amod", "conj"]) and (prev_dep in ["compound"]):
            compound = document[token.i - 1: token.i + 1].text
            pos_options.append(compound)
            print(f'Added {compound} based on "amod" and "compound" and "conj" nouns.')
        elif (token.pos_ == "PROPN") and (token.dep_ in ["prep","amod"]):
            pos_options.append(token.text)
            print(f"Added '{token.text}' based on their adjectival state.")
        elif (token.pos_ == "ADJ") and (token.dep_ in ["acomp","conj","amod"]):
            pos_options.append(token.text)
            print(f"Added '{token.text}' based on their adjectival state.")
        elif (token.pos_ == "PRON") and (len(token.morph) != 0):
            # morph.get returns a list of values, so test membership rather than equality.
            if "Prs" in token.morph.get("PronType"):
                pos_options.append(token.text)
                print(f"Added '{token.text}' because it's a human pronoun.")

    #Noun Chunks parsing
    for chunk in document.noun_chunks:
        #The use of chunk[-1] is due to testing that it appears to always match the root
        root = chunk[-1]
        if root.ent_type_:
            if (len(chunk) > 1) and (chunk[-2].dep_ == "compound"):
                #creates the compound element of the noun
                compound = [x.text for x in chunk if x.dep_ == "compound"]
                print(f"This is the contents of {compound} and it is {all(elem in lime_options for elem in compound)} that all elements are present in {lime_options}.") #for QA
                #checks to see all elements in the compound are important to the model or use the compound if not checking importance.
                if model_cares_about(compound):
                    #creates a span for the entirety of the compound noun and adds it to the list.
                    span = -1 * (1 + len(compound))
                    pos_options.append(chunk[span:].text)
                    # NOTE(review): the chunk's ADJ/NOUN/PROPN words used to be collected here and
                    # then discarded; confirm whether they should be added individually as well.
                else:
                    print(f"The elmenents in {compound} could not be added to the final list because they are not all relevant to the model.")
            else:
                cur_values = [token.text for token in chunk if (token.ent_type_) or (token.pos_ == "ADJ")]
                if model_cares_about(cur_values):
                    pos_options.extend(cur_values)
                    print(f"From {chunk.text}, {cur_values} added to pos_options due to entity recognition.") #for QA
        elif len(chunk) >= 1:
            cur_values = [token.text for token in chunk if token.pos_ in ["NOUN","ADJ","PROPN"]]
            if model_cares_about(cur_values):
                pos_options.extend(cur_values)
                print(f"From {chunk.text}, {cur_values} added to pos_options due to wildcard.") #for QA
        else:
            print(f"No options added for \'{chunk.text}\' ")

    # dict.fromkeys removes duplicates while keeping first-seen order, so the result is stable between runs.
    pos_options = list(dict.fromkeys(pos_options))
    if use_lime:
        return pos_options, lime_results
    return pos_options
# Return the Viz of elements critical to LIME.
def lime_viz(df):
    '''Build the layered Altair chart of LIME word weights.

    df: a DataFrame with the columns "Word" and "Weight", or any data that
        pd.DataFrame can turn into those two columns.
    Returns an alt.LayerChart: one bar per word on a fixed -1..1 weight axis, coloured by the sign
    of the weight, with each word drawn as a text label on top of the bars.'''
    if not isinstance(df, pd.DataFrame):
        df = pd.DataFrame(df, columns=["Word","Weight"])
    # Bars: words are sorted by their weight and the y axis is hidden, since the text layer labels them.
    viz = alt.Chart(df).encode(
        alt.X('Weight:Q', scale=alt.Scale(domain=(-1, 1))),
        alt.Y('Word:N', sort='x', axis=None),
        # Threshold scale at 0 so negative and positive weights get opposite ends of the scheme.
        color=alt.Color("Weight", scale=alt.Scale(scheme='blueorange', domain=[0], type="threshold", range='diverging'), legend=None),
        tooltip = ("Word","Weight")
    ).mark_bar().properties(title ="Importance of individual words")
    # Labels: reuse the bar encoding so every word is positioned with its bar.
    text = viz.mark_text(
        fill="black",
        align='right',
        baseline='middle'
    ).encode(
        text='Word:N'
    )
    limeplot = alt.LayerChart(layer=[viz,text], width = 300).configure_axis(grid=False).configure_view(strokeWidth=0)
    return limeplot
# Evaluate Predictions using the model and pipe.
def eval_pred(text, return_all = False):
    '''Evaluate the model's prediction for `text` and turn it into a visualization friendly number.

    text: the sentence to classify with the module-level `pipe`.
    return_all: when True, also return the winning label.

    Returns the winning score, negated when the negative label wins. With return_all=True, returns
    the tuple (score, label) instead.'''
    preds = pipe(text)
    # preds[0] holds one {'label', 'score'} entry per class; this file reads index 0 as the
    # negative class and index 1 as the positive class.
    negative, positive = preds[0][0], preds[0][1]
    if positive['score'] > negative['score']:
        prediction = positive['score']
        sentiment = positive['label']
    else:
        # An exact tie also lands here, so a label is always returned instead of 0 and ''.
        prediction = -1 * negative['score']
        sentiment = negative['label']
    if return_all:
        return prediction, sentiment
    return prediction
def construct_nlexp(text,sentiment,probability):
    '''Render an annotated sentence stating the predicted sentiment and its probability.

    text: not used by this function; kept so existing calls keep working.
    sentiment: "NEGATIVE" or "POSITIVE".
    probability: the prediction score; its absolute value is shown as a percentage rounded to
        two decimals.

    Returns whatever the annotated_text call returns.
    Raises ValueError for any other sentiment label.'''
    # (sign tag for the label, tag for the probability, highlight colour) per sentiment.
    styles = {
        "NEGATIVE": ("-", "neg", "#FFA44F"),
        "POSITIVE": ("+", "pos", "#50A9FF"),
    }
    if sentiment not in styles:
        # Fail with a clear message instead of an unbound local variable further down.
        raise ValueError(f"Unsupported sentiment label: {sentiment!r}")
    sign, tag, colour = styles[sentiment]
    prob = str(np.round(100 * abs(probability),2))
    color_sent = ant('The model predicts the sentiment of the sentence you provided is ', (sentiment, sign, colour), ' with a probability of ', (prob, tag, colour),"%.")
    return color_sent
def get_min_max(df, seed):
    '''Return the texts of the alternatives with the highest and the lowest spaCy similarity scores.

    Scores of .9999 and above are ignored when looking for the highest score, and scores of .0001
    and below are ignored when looking for the lowest one. As similarity is based on vectorization
    of words and documents this may not be the best way to identify bias. `seed` is accepted but
    not used.
    text2 = Most Similar
    text3 = Least Similar'''
    scores = df['similarity']
    highest = scores[scores < .9999].max()
    lowest = scores[scores > .0001].min()
    # When several rows share the score, the first one in df wins.
    text2 = df.loc[scores == highest, 'text'].iloc[0]
    text3 = df.loc[scores == lowest, 'text'].iloc[0]
    return text2, text3
# Inspired by https://stackoverflow.com/questions/17758023/return-rows-in-a-dataframe-closest-to-a-user-defined-number/17758115#17758115
def abs_dif(df,seed):
    '''Return the alternatives whose predictions are closest to and farthest from the seed's prediction.

    The seed is normalised with process_text before it is looked up in df['Words'].
    text2 = Nearest Prediction
    text3 = Farthest Prediction'''
    seed = process_text(seed)
    seed_pred = df[df['Words'] == seed].pred.iloc[0]
    others = df[df['Words'] != seed].reset_index()
    # Row positions of the alternatives, ordered by absolute distance from the seed's prediction.
    ranking = (others.pred - seed_pred).abs().argsort()
    nearest_pos = ranking.iloc[0]
    farthest_pos = ranking.iloc[-1]
    text2 = others.text.iloc[nearest_pos]
    text3 = others.text.iloc[farthest_pos]
    return text2, text3
#@st.experimental_singleton #I've enabled this to prevent it from triggering every time the code runs... which could get very messy
def sampled_alts(df, seed, fixed=False):
    '''Pick two counterfactuals at random instead of by spaCy similarity or absolute difference.

    This is an alternate way of choosing which counterfactuals are shown for MultiNLC,
    MultiNLC + Lime, and VizNLC. Rows whose 'Words' value equals the seed are left out.
    With fixed=True the draw uses the random state 2052, so it is repeatable.
    Both samples are random.'''
    candidates = df.loc[df['Words'] != seed]
    # random_state=None is pandas' default, i.e. a fresh draw on every call.
    picked = candidates.sample(n=2, random_state=2052 if fixed else None)
    texts = picked.text
    return texts.iloc[0], texts.iloc[1]
def gen_cf_country(df,_document,selection):
    '''Generate the counterfactual sentences for every entry in df['Words'].

    df: DataFrame with a 'Words' column; the new columns are added to it in place.
    _document: the parsed sentence; its .text is the template for the substitutions.
    selection: the word in the sentence that is swapped for each alternative.

    Returns df with these columns added:
    text = the sentence with `selection` replaced by the row's word
    pred = eval_pred of that sentence
    seed = 'seed' for the row equal to `selection`, otherwise 'alternative'
    similarity = spaCy similarity between `selection` and the row's word'''
    # Escape the selection so characters such as "." or "(" are matched literally.
    pattern = re.compile(r'\b' + re.escape(selection) + r'\b')
    source_text = _document.text
    # Parse the selection once instead of once per row.
    selection_doc = nlp(selection)
    # A function replacement inserts the word as-is, without backslash or group expansion.
    df['text'] = df['Words'].apply(lambda word: pattern.sub(lambda _match: word, source_text))
    df['pred'] = df['text'].apply(eval_pred)
    df['seed'] = df['Words'].apply(lambda word: 'seed' if word == selection else 'alternative')
    df['similarity'] = df['Words'].apply(lambda word: selection_doc.similarity(nlp(word)))
    return df
def gen_cf_profession(df,_document,selection):
    '''Generate the counterfactual sentences for the professions in the same 'Major' group as the selection.

    df: DataFrame with the columns 'Words' and 'Major'; it is not modified.
    _document: the parsed sentence; its .text is the template for the substitutions.
    selection: the word in the sentence that is swapped for each alternative; it must be present
        in df['Words'].

    Returns a new DataFrame limited to the selection's 'Major' group, with these columns added:
    text = the sentence with `selection` replaced by the row's word
    pred = eval_pred of that sentence
    seed = 'seed' for the row equal to `selection`, otherwise 'alternative'
    similarity = spaCy similarity between `selection` and the row's word'''
    category = df.loc[df['Words'] == selection, 'Major'].iloc[0]
    # Work on an explicit copy so the column assignments below do not target a slice of df.
    df = df[df.Major == category].copy()
    # Escape the selection so characters such as "." or "(" are matched literally.
    pattern = re.compile(r'\b' + re.escape(selection) + r'\b')
    source_text = _document.text
    # Parse the selection once instead of once per row.
    selection_doc = nlp(selection)
    # A function replacement inserts the word as-is, without backslash or group expansion.
    df['text'] = df['Words'].apply(lambda word: pattern.sub(lambda _match: word, source_text))
    df['pred'] = df['text'].apply(eval_pred)
    df['seed'] = df['Words'].apply(lambda word: 'seed' if word == selection else 'alternative')
    df['similarity'] = df['Words'].apply(lambda word: selection_doc.similarity(nlp(word)))
    return df