# PR-ENT_Dashboard / helpers.py
import streamlit as st
import pandas as pd
import numpy as np
from nltk.tokenize import sent_tokenize
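# NOTE: sent_tokenize requires the NLTK "punkt" tokenizer data; if it is
# missing, download it once with nltk.download('punkt').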
# Split the text into sentences. Necessary for NLI models
def split_sentences(text):
    return sent_tokenize(text)
###### Prompting
def query_model_prompting(model, text, prompt_with_mask, top_k, targets):
    """Query the prompting model.

    :param model: Prompting model object
    :type model: Huggingface pipeline object
    :param text: Event description (context)
    :type text: str
    :param prompt_with_mask: Prompt containing a mask token
    :type prompt_with_mask: str
    :param top_k: Number of tokens to output
    :type top_k: int
    :param targets: Restrict the answers to these possible tokens (None for no restriction)
    :type targets: list
    :return: Results of the prompting model
    :rtype: list of dict
    """
    sequence = text + prompt_with_mask
    output_tokens = model(sequence, top_k=top_k, targets=targets)
    return output_tokens
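# Usage sketch (the checkpoint name is an illustrative assumption, not
# necessarily the one used by the dashboard):
#   from transformers import pipeline
#   model = pipeline("fill-mask", model="bert-base-uncased")
#   prompt = "People were {}.".format(model.tokenizer.mask_token)
#   query_model_prompting(model, "Protesters were shot. ", prompt, top_k=5, targets=None)
#   -> [{'token_str': 'killed', 'score': 0.3, ...}, ...]  (scores illustrative)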
def do_sentence_entailment(sentence, hypothesis, model):
    """Concatenate context and hypothesis, then perform entailment.

    :param sentence: Event description (context), a single sentence
    :type sentence: str
    :param hypothesis: Prompt with the mask filled by a candidate token
    :type hypothesis: str
    :param model: NLI model
    :type model: Huggingface pipeline
    :return: DataFrame containing the result of the entailment
    :rtype: pandas DataFrame
    """
    # '</s></s>' is the RoBERTa-style separator between premise and hypothesis
    text = sentence + '</s></s>' + hypothesis
    # return_all_scores=True keeps the scores of all NLI labels
    # (deprecated in newer transformers in favor of top_k=None)
    res = model(text, return_all_scores=True)
    df_res = pd.DataFrame(res[0])
    df_res['label'] = df_res['label'].apply(lambda x: x.lower())
    df_res.columns = ["Label", "Score"]
    return df_res
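# Usage sketch (assumes a RoBERTa-style NLI checkpoint that uses the
# '</s></s>' separator; the model name is illustrative):
#   nli_model = pipeline("text-classification", model="roberta-large-mnli")
#   do_sentence_entailment("Protesters were shot.", "People were killed.", nli_model)
#   -> DataFrame with columns ["Label", "Score"] and one row per label
#      (contradiction / neutral / entailment)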
def softmax(x):
    """Compute softmax values for each set of scores in x."""
    return np.exp(x) / np.sum(np.exp(x), axis=0)
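# e.g. softmax(np.array([1.0, 2.0])) -> array([0.26894142, 0.73105858])
# (kept as a small utility; not referenced elsewhere in this file)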
######### NLI + PROMPTING
def do_text_entailment(text, hypothesis, model):
    """
    Run entailment on each sentence of the event description separately,
    as the NLI model was trained on sentence pairs.

    :param text: Event description (context)
    :type text: str
    :param hypothesis: Prompt with the mask filled by a candidate token
    :type hypothesis: str
    :param model: NLI model
    :type model: Huggingface pipeline
    :return: List of (sentence, hypothesis, scores) tuples, one per sentence
    :rtype: list
    """
    text_entailment_results = []
    for sentence in split_sentences(text):
        df_score = do_sentence_entailment(sentence, hypothesis, model)
        text_entailment_results.append((sentence, hypothesis, df_score))
    return text_entailment_results
def get_true_entailment(text_entailment_results, nli_limit):
    """
    From the per-sentence entailment results, extract the maximum entailment
    score and keep the hypothesis if the score exceeds the entailment threshold.

    :param text_entailment_results: Output of do_text_entailment
    :type text_entailment_results: list
    :param nli_limit: Entailment threshold
    :type nli_limit: float
    :return: List of (hypothesis, score) tuples passing the threshold
    :rtype: list
    """
    true_hypothesis_list = []
    max_score = 0
    # Maximum entailment score over all sentences of the text
    for sentence_entailment in text_entailment_results:
        df_score = sentence_entailment[2]
        score = df_score[df_score["Label"] == 'entailment']["Score"].values.max()
        if score > max_score:
            max_score = score
    if max_score > nli_limit:
        # The hypothesis is identical for every sentence, so take the first one
        true_hypothesis_list.append((text_entailment_results[0][1], np.round(max_score, 2)))
    return list(set(true_hypothesis_list))
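# Sketch: if the best per-sentence entailment score for the hypothesis
# "People were shot." is 0.83 and nli_limit=0.5, this returns
# [("People were shot.", 0.83)]; with a best score of 0.4 it returns [].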
def prompt_to_nli(text, prompt, model_prompting, nli_model, nlp, top_k=10, nli_limit=0.5, remove_lemma=False):
    """
    Apply the PR-ENT pipeline.

    :param text: Event description
    :type text: str
    :param prompt: Prompt containing a "{}" placeholder for the mask
    :type prompt: str
    :param model_prompting: Prompting model
    :type model_prompting: Huggingface pipeline
    :param nli_model: NLI model
    :type nli_model: Huggingface pipeline
    :param nlp: spaCy language model, used for lemma filtering
    :type nlp: spacy.language.Language
    :param top_k: Number of words output by the prompting model
    :type top_k: int
    :param nli_limit: Entailment threshold
    :type nli_limit: float
    :param remove_lemma: If True, drop candidate tokens that share a lemma
    :type remove_lemma: bool
    :return: Results of the pipeline
    :rtype: list
    """
    prompt_masked = prompt.format(model_prompting.tokenizer.mask_token)
    label = []
    output_prompting = query_model_prompting(model_prompting, text, prompt_masked, top_k, targets=None)
    if remove_lemma:
        output_prompting = filter_prompt_output_by_lemma(prompt, output_prompting, nlp)
    for token in output_prompting:
        hypothesis = prompt.format(token['token_str'])
        text_entailment_results = do_text_entailment(text, hypothesis, nli_model)
        true_hypothesis_list = get_true_entailment(text_entailment_results, nli_limit)
        if len(true_hypothesis_list) > 0:
            label.append(((token['token_str'], token['score']), true_hypothesis_list[0]))
    return label
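# End-to-end usage sketch. Model names are illustrative assumptions, not
# necessarily the checkpoints deployed by the dashboard:
#   import spacy
#   from transformers import pipeline
#   model_prompting = pipeline("fill-mask", model="bert-base-uncased")
#   nli_model = pipeline("text-classification", model="roberta-large-mnli")
#   nlp = spacy.load("en_core_web_sm")
#   text = "Protesters were shot by the police during the march."
#   prompt_to_nli(text, "People were {}.", model_prompting, nli_model, nlp,
#                 top_k=10, nli_limit=0.5, remove_lemma=True)
#   -> e.g. [(('shot', 0.41), ('People were shot.', 0.98))]  (scores illustrative)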
def display_nli_pr_results_as_list(title, list_results):
    """
    Display the list of entailment results as a Streamlit multiselect.
    """
    st.markdown(
        """
        <style>
        span[data-baseweb="tag"] {
            background-color: red !important;
        }
        </style>
        """,
        unsafe_allow_html=True,
    )
    # The widget both shows the results and lets the user deselect entries;
    # the current selection is returned by Streamlit but not used further here.
    prompt_list = st.multiselect(
        title,
        list_results,
        list_results,
        key='results_mix',
    )
##### QA
def question_answering(model, text, questions_list, to_print=True):
    """
    Apply a question answering model.

    :param model: QA model
    :type model: Huggingface pipeline
    :param text: Event description (context)
    :type text: str
    :param questions_list: Questions to answer
    :type questions_list: list of str
    :param to_print: If True, write each question/answer pair to Streamlit
    :type to_print: bool
    :return: Tuple containing the answer and the confidence score
        of the last question in the list
    :rtype: tuple
    """
    for question in questions_list:
        QA_input = {
            'question': question,
            'context': text}
        res = model(QA_input, handle_impossible_answer=False)
        if to_print:
            st.write("Question: {}".format(question))
            st.write("Answer: {}".format(res["answer"]))
    return res["answer"], res["score"]
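# Usage sketch (the checkpoint name is an illustrative assumption):
#   model_qa = pipeline("question-answering", model="deepset/roberta-base-squad2")
#   question_answering(model_qa, "Protesters were shot by the police.",
#                      ["Who shot people?"], to_print=False)
#   -> ("the police", 0.9)  (score illustrative)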
### Prompt + NLI + QA
def get_who_what_whom_qa(text, tokens, model_qa):
    """
    Build "who did what to whom" rows by asking a QA model about each
    entailed token.

    :param text: Event description (context)
    :type text: str
    :param tokens: Entailed tokens from the PR-ENT pipeline
    :type tokens: list of str
    :param model_qa: QA model
    :type model_qa: Huggingface pipeline
    :return: One dict per token with "Actor", "Action" and "Target" fields
    :rtype: list of dict
    """
    who_what_whom = []
    if not tokens:
        res_dict = {"Actor": '', "Action": '', "Target": ''}
        st.write("No entailed tokens.")
        who_what_whom.append(res_dict)
    else:
        for token in tokens:
            res_dict = {"Actor": '', "Action": token, "Target": ''}
            # Phrase the actor question according to the verb form: gerunds
            # ("killing") -> "Who was killing?", otherwise assume a past
            # participle ("killed") -> "Who killed people?"
            if token[-3:] == 'ing':
                perp, score_p = question_answering(model_qa, text, ["Who was {}?".format(token)], to_print=False)
            else:
                perp, score_p = question_answering(model_qa, text, ["Who {} people?".format(token)], to_print=False)
            if perp:
                res_dict["Actor"] = perp + ' [' + str(np.round(score_p * 100, 1)) + '%]'
            else:
                res_dict["Actor"] = 'N/A' + ' [' + str(np.round(score_p * 100, 1)) + '%]'
            # The passive question targets the victim, e.g. "Who was killed?"
            victim, score_v = question_answering(model_qa, text, ["Who was {}?".format(token)], to_print=False)
            if victim:
                res_dict["Target"] = victim + ' [' + str(np.round(score_v * 100, 1)) + '%]'
            else:
                res_dict["Target"] = 'N/A' + ' [' + str(np.round(score_v * 100, 1)) + '%]'
            who_what_whom.append(res_dict)
    return who_what_whom
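# Sketch: for text="Protesters were shot by the police." and tokens=["shot"],
# the actor question becomes "Who shot people?" and the target question
# "Who was shot?", yielding e.g.
#   [{"Actor": "the police [90.0%]", "Action": "shot", "Target": "Protesters [85.3%]"}]
# (answers and scores illustrative).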
def remove_similar_lemma_from_list(prompt, list_words, nlp):
    ## Compute a dictionary mapping each lemma to all tokens that share it
    ## (e.g. "protest" -> ["protest", "protests"])
    lemma_dict = {}
    for each in list_words:
        mask_filled = nlp(prompt.strip('.').format(each))
        # Lemma of the last token, i.e. the word filling the mask
        lemma_dict.setdefault([x.lemma_ for x in mask_filled][-1], []).append(each)
    ## Get back the list of tokens
    ## If multiple tokens share a lemma, keep the shortest one
    new_token_list = []
    for key in lemma_dict.keys():
        if len(lemma_dict[key]) >= 1:
            new_token_list.append(min(lemma_dict[key], key=len))
        else:
            raise ValueError("Lemma dict has 0 corresponding words")
    return new_token_list
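# Sketch (assumes a loaded spaCy model, e.g. nlp = spacy.load("en_core_web_sm")):
#   remove_similar_lemma_from_list("People were {}.", ["killed", "killing", "shot"], nlp)
#   -> ["killed", "shot"]  ("killed" and "killing" share the lemma "kill";
#      the shorter surface form is kept)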
def filter_prompt_output_by_lemma(prompt, output_prompting, nlp):
    """
    Remove tokens with duplicate lemmas from the prompt output
    (e.g. keep only one of "protest"/"protests").
    """
    list_words = [x['token_str'] for x in output_prompting]
    new_token_list = remove_similar_lemma_from_list(prompt, list_words, nlp)
    return [x for x in output_prompting if x['token_str'] in new_token_list]
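# Usage sketch, with illustrative fill-mask output:
#   output_prompting = [{'token_str': 'protest', 'score': 0.31},
#                       {'token_str': 'protests', 'score': 0.22}]
#   filter_prompt_output_by_lemma("There was a {}.", output_prompting, nlp)
#   -> [{'token_str': 'protest', 'score': 0.31}]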