import streamlit as st
import pandas as pd
import numpy as np
from nltk.tokenize import sent_tokenize


# Split the text into sentences. Necessary for NLI models trained on sentence pairs.
def split_sentences(text):
    return sent_tokenize(text)


###### Prompting

def query_model_prompting(model, text, prompt_with_mask, top_k, targets):
    """Query the prompting model.

    :param model: Prompting model object
    :type model: Huggingface pipeline object
    :param text: Event description (context)
    :type text: str
    :param prompt_with_mask: Prompt with a mask
    :type prompt_with_mask: str
    :param top_k: Number of tokens to output
    :type top_k: int
    :param targets: Restrict the answer to these possible tokens
    :type targets: list
    :return: Results of the prompting model
    :rtype: list of dict
    """
    sequence = text + prompt_with_mask
    output_tokens = model(sequence, top_k=top_k, targets=targets)
    return output_tokens


def do_sentence_entailment(sentence, hypothesis, model):
    """Concatenate context and hypothesis, then perform entailment.

    :param sentence: Event description (context), one sentence
    :type sentence: str
    :param hypothesis: Mask filled with a token
    :type hypothesis: str
    :param model: NLI model
    :type model: Huggingface pipeline
    :return: DataFrame containing the result of the entailment
    :rtype: pandas DataFrame
    """
    # Join context and hypothesis with a space so the two sentences stay separated.
    text = sentence + ' ' + hypothesis
    res = model(text, return_all_scores=True)
    df_res = pd.DataFrame(res[0])
    df_res['label'] = df_res['label'].apply(lambda x: x.lower())
    df_res.columns = ["Label", "Score"]
    return df_res


def softmax(x):
    """Compute softmax values for each set of scores in x."""
    return np.exp(x) / np.sum(np.exp(x), axis=0)


######### NLI + PROMPTING

def do_text_entailment(text, hypothesis, model):
    """
    Perform entailment on each sentence of the event description,
    as the model was trained on sentence pairs.

    :param text: Event description (context)
    :type text: str
    :param hypothesis: Mask filled with a token
    :type hypothesis: str
    :param model: NLI model
    :type model: Huggingface pipeline
    :return: List of entailment results for each sentence of the text
    :rtype: list
    """
    text_entailment_results = []
    for i, sentence in enumerate(split_sentences(text)):
        df_score = do_sentence_entailment(sentence, hypothesis, model)
        text_entailment_results.append((sentence, hypothesis, df_score))
    return text_entailment_results
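
# --- Usage sketch (illustrative only, not called by the app) ----------------
# A minimal example of the two steps above: query the masked-LM for candidate
# tokens, then test one candidate against the context with sentence-level
# entailment. The checkpoint names and the prompt wording are assumptions for
# illustration, not necessarily what the app loads.
def _example_prompt_and_entailment():
    from transformers import pipeline
    model_prompting = pipeline("fill-mask", model="bert-base-uncased")       # assumed checkpoint
    nli_model = pipeline("text-classification", model="roberta-large-mnli")  # assumed checkpoint
    text = "Protesters clashed with police in the capital on Monday."
    prompt = " This event involved {}."  # assumed wording; note the leading space
    prompt_masked = prompt.format(model_prompting.tokenizer.mask_token)
    # Top-k candidate tokens, e.g. [{'token_str': 'violence', 'score': ...}, ...]
    candidates = query_model_prompting(model_prompting, text, prompt_masked, top_k=5, targets=None)
    # Entail the first candidate against every sentence of the context
    hypothesis = prompt.format(candidates[0]['token_str'])
    for sentence, _hyp, df_score in do_text_entailment(text, hypothesis, nli_model):
        print(sentence, "->", df_score.to_dict("records"))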
""" true_hypothesis_list = [] max_score = 0 for sentence_entailment in text_entailment_results: df_score = sentence_entailment[2] score = df_score[df_score["Label"] == 'entailment']["Score"].values.max() if score > max_score: max_score = score if max_score > nli_limit: true_hypothesis_list.append((sentence_entailment[1], np.round(max_score,2))) return list(set(true_hypothesis_list)) def prompt_to_nli(text, prompt, model_prompting, nli_model, nlp, top_k=10, nli_limit=0.5, remove_lemma=False): """ Apply the PR-ENT pipeline :param text: Event description :type text: str :param prompt: Prompt with mask :type prompt: str :param model_prompting: Prompting Model :type model_prompting: Huggingface pipeline :param nli_model: NLI Model :type nli_model: Huggingface pipeline :param top_k: Number of words output by the prompting model :type top_k: int :param nli_limit: Entailment threshold :type nli_limit: float :return: Results of the pipeline :rtype: list """ prompt_masked = prompt.format(model_prompting.tokenizer.mask_token) label = [] output_prompting = query_model_prompting(model_prompting, text, prompt_masked, top_k, targets=None) if remove_lemma: output_prompting = filter_prompt_output_by_lemma(prompt, output_prompting, nlp) for token in output_prompting: hypothesis = prompt.format(token['token_str']) text_entailment_results = do_text_entailment(text, hypothesis, nli_model) true_hypothesis_list = get_true_entailment(text_entailment_results, nli_limit) if len(true_hypothesis_list) > 0: label.append(((token['token_str'], token['score']), true_hypothesis_list[0])) return label def display_nli_pr_results_as_list(title, list_results): """ Display the list of entailment results as a streamlit choice list """ st.markdown( """ """, unsafe_allow_html=True, ) prompt_list = st.multiselect( title, list_results , list_results, key='results_mix') ##### QA def question_answering(model, text, questions_list, to_print=True): """ Apply question answering model :param model: QA Model :type model: Huggingface pipeline :param text: Event description (context) :type text: str :param question: Question to answer :type question: str :return: Tuple containing the answer and the confidence score :rtype: tuple """ for question in questions_list: QA_input = { 'question': question, 'context': text} res = model(QA_input, handle_impossible_answer=False) if to_print: st.write("Question: {}".format(question)) st.write("Answer: {}".format(res["answer"])) return res["answer"], res["score"] ### Prompt + NLI + QA def get_who_what_whom_qa(text, tokens, model_qa): who_what_whom = [] if not tokens: res_dict = {"Actor":'', "Action":'', "Target": ''} st.write("No entailed tokens.") else: for token in tokens: # res_dict = {"who":'', "did_what":token, "to_whom": '', "qa_score": []} res_dict = {"Actor":'', "Action":token, "Target": ''} if token[-3:] == 'ing': perp,score_p = question_answering(model_qa, text, ["Who was {}?".format(token)], to_print=False) else: perp,score_p = question_answering(model_qa, text, ["Who {} people?".format(token)], to_print=False) if perp: res_dict["Actor"] = perp + ' [' + str(np.round(score_p*100,1)) + '%]' else: res_dict["Actor"] = 'N/A' + ' [' + str(np.round(score_p*100,1)) + '%]' victim,score_v = question_answering(model_qa, text, ["Who was {}?".format(token)], to_print=False) if victim: res_dict["Target"] = victim + ' [' + str(np.round(score_v*100,1)) + '%]' else: res_dict["Target"] = 'N/A' + ' [' + str(np.round(score_v*100,1)) + '%]' who_what_whom.append(res_dict) return who_what_whom def 
def remove_similar_lemma_from_list(prompt, list_words, nlp):
    """
    Deduplicate tokens that share a lemma, keeping the shortest surface form.
    """
    ## Compute a dictionary with the lemma of each token.
    ## If a lemma is duplicated, the dictionary value is the list of corresponding tokens.
    lemma_dict = {}
    for each in list_words:
        mask_filled = nlp(prompt.strip('.').format(each))
        # The mask is the last word of the prompt, so take the last token's lemma
        lemma_dict.setdefault([x.lemma_ for x in mask_filled][-1], []).append(each)
    ## Get back the list of tokens.
    ## If multiple tokens map to the same lemma, take the shortest one.
    new_token_list = []
    for key in lemma_dict.keys():
        if len(lemma_dict[key]) >= 1:
            new_token_list.append(min(lemma_dict[key], key=len))
        else:
            raise ValueError("Lemma dict has 0 corresponding words")
    return new_token_list


def filter_prompt_output_by_lemma(prompt, output_prompting, nlp):
    """
    Remove tokens with a duplicated lemma from the prompt output (e.g. "protest", "protests").
    """
    list_words = [x['token_str'] for x in output_prompting]
    new_token_list = remove_similar_lemma_from_list(prompt, list_words, nlp)
    return [x for x in output_prompting if x['token_str'] in new_token_list]
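
# --- Usage sketch (illustrative only, not called by the app) ----------------
# "protest" and "protests" lemmatize to the same lemma, so only the shorter
# surface form survives. The spaCy model name and the scores are assumptions.
def _example_lemma_filtering():
    import spacy
    nlp = spacy.load("en_core_web_sm")   # assumed spaCy model
    prompt = " This event involved {}."  # the mask must be the last word
    output_prompting = [
        {'token_str': 'protests', 'score': 0.30},  # hypothetical prompting output
        {'token_str': 'protest', 'score': 0.20},
        {'token_str': 'riots', 'score': 0.10},
    ]
    filtered = filter_prompt_output_by_lemma(prompt, output_prompting, nlp)
    print([x['token_str'] for x in filtered])  # expected: ['protest', 'riots']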