Spaces:
Runtime error
Runtime error
import spacy | |
import streamlit as st | |
import re | |
import logging | |
from presidio_anonymizer import AnonymizerEngine | |
from presidio_analyzer import AnalyzerEngine, PatternRecognizer, RecognizerResult, EntityRecognizer | |
from annotated_text import annotated_text | |
from flair_recognizer import FlairRecognizer | |
from detoxify import Detoxify | |
############################### | |
#### Render Streamlit page #### | |
############################### | |
st.title("Anonymise your text!") | |
st.markdown( | |
"This mini-app anonymises text using Flair and Presidio. You can find the code in the Files and versions tab above." | |
) | |
# Configure logger | |
logging.basicConfig(format="\n%(asctime)s\n%(message)s", level=logging.INFO, force=True) | |
############################## | |
###### Define functions ###### | |
############################## | |
def analyzer_engine(): | |
"""Return AnalyzerEngine.""" | |
analyzer = AnalyzerEngine() | |
flair_recognizer = FlairRecognizer() | |
analyzer.registry.add_recognizer(flair_recognizer) | |
return analyzer | |
def analyze(**kwargs): | |
"""Analyze input using Analyzer engine and input arguments (kwargs).""" | |
if "entities" not in kwargs or "All" in kwargs["entities"]: | |
kwargs["entities"] = None | |
# if st.session_state.excluded_words: | |
# deny_list = [i.strip() for i in st.session_state.excluded_words.split(',')] | |
# logging.info( | |
# f"words excluded : {deny_list}\n" | |
# ) | |
# excluded_words_recognizer = PatternRecognizer(supported_entity="MANUAL ADD", | |
# name="Excluded words recognizer", | |
# deny_list=deny_list) | |
# analyzer_engine().registry.add_recognizer(excluded_words_recognizer) | |
results = analyzer_engine().analyze(**kwargs) | |
st.session_state.analyze_results = results | |
def annotate(): | |
text = st.session_state.text | |
analyze_results = st.session_state.analyze_results | |
tokens = [] | |
starts=[] | |
# sort by start index | |
results = sorted(analyze_results, key=lambda x: x.start) | |
for i, res in enumerate(results): | |
# if we already have an entity for this token don't add another | |
if res.start not in starts: | |
if i == 0: | |
tokens.append(text[:res.start]) | |
# append entity text and entity type | |
tokens.append((text[res.start: res.end], res.entity_type)) | |
# if another entity coming i.e. we're not at the last results element, add text up to next entity | |
if i != len(results) - 1: | |
tokens.append(text[res.end:results[i+1].start]) | |
# if no more entities coming, add all remaining text | |
else: | |
tokens.append(text[res.end:]) | |
# append this token to the list so we don't repeat results per token | |
starts.append(res.start) | |
return tokens | |
def get_supported_entities(): | |
"""Return supported entities from the Analyzer Engine.""" | |
return analyzer_engine().get_supported_entities() | |
def analyze_text(): | |
if not st.session_state.text: | |
st.session_state.text_error = "Please enter your text" | |
return | |
toxicity_results = Detoxify('original').predict(st.session_state.text) | |
is_toxic=False | |
for k in toxicity_results.keys(): | |
for k in toxicity_results.keys(): | |
if k!='toxicity': | |
if toxicity_results[k]>0.5: | |
is_toxic=True | |
else: | |
if toxicity_results[k]>0.65: | |
is_toxic=True | |
if is_toxic: | |
st.session_state.text_error = "Your text entry was detected as toxic, please re-write it." | |
return | |
with text_spinner_placeholder: | |
with st.spinner("Please wait while your text is being analysed..."): | |
logging.info(f"This is the text being analysed: {st.session_state.text}") | |
st.session_state.text_error = "" | |
st.session_state.n_requests += 1 | |
analyze( | |
text=st.session_state.text, | |
entities=st_entities, | |
language="en", | |
return_decision_process=False, | |
) | |
if st.session_state.excluded_words: | |
include_manual_input() | |
if st.session_state.allowed_words: | |
exclude_manual_input() | |
logging.info( | |
f"analyse results: {st.session_state.analyze_results}\n" | |
) | |
def include_manual_input(): | |
deny_list = [i.strip() for i in st.session_state.excluded_words.split(',')] | |
def _deny_list_to_regex(deny_list): | |
""" | |
Convert a list of words to a matching regex. | |
To be analyzed by the analyze method as any other regex patterns. | |
:param deny_list: the list of words to detect | |
:return:the regex of the words for detection | |
""" | |
# Escape deny list elements as preparation for regex | |
escaped_deny_list = [re.escape(element) for element in deny_list] | |
regex = r"(?:^|(?<=\W))(" + "|".join(escaped_deny_list) + r")(?:(?=\W)|$)" | |
return regex | |
deny_list_pattern = _deny_list_to_regex(deny_list) | |
matches = re.finditer(deny_list_pattern, st.session_state.text) | |
results = [] | |
for match in matches: | |
start, end = match.span() | |
current_match = st.session_state.text[start:end] | |
# Skip empty results | |
if current_match == "": | |
continue | |
pattern_result = RecognizerResult( | |
entity_type='MANUALLY ADDED', | |
start=start, | |
end=end, | |
score=1.0, | |
) | |
results.append(pattern_result) | |
results = EntityRecognizer.remove_duplicates(results) | |
st.session_state.analyze_results.extend(results) | |
logging.info( | |
f"analyse results after adding excluded words: {st.session_state.analyze_results}\n" | |
) | |
def exclude_manual_input(): | |
analyze_results_fltered=[] | |
for token in st.session_state.analyze_results: | |
if st.session_state.text[token.start:token.end] not in st.session_state.allowed_words: | |
analyze_results_fltered.append(token) | |
logging.info( | |
f"analyse results after removing allowed words: {analyze_results_fltered}\n" | |
) | |
st.session_state.analyze_results = analyze_results_fltered | |
def anonymizer_engine(): | |
"""Return AnonymizerEngine.""" | |
return AnonymizerEngine() | |
def anonymise_text(): | |
if st.session_state.n_requests >= 50: | |
st.session_state.text_error = "Too many requests. Please wait a few seconds before anonymising more text." | |
logging.info(f"Session request limit reached: {st.session_state.n_requests}") | |
st.session_state.n_requests = 1 | |
st.session_state.text_error = "" | |
if not st.session_state.text: | |
st.session_state.text_error = "Please enter your text" | |
return | |
if not st.session_state.analyze_results: | |
analyze_text() | |
with text_spinner_placeholder: | |
with st.spinner("Please wait while your text is being anonymised..."): | |
anon_results = anonymizer_engine().anonymize(st.session_state.text, st.session_state.analyze_results) | |
st.session_state.text_error = "" | |
st.session_state.n_requests += 1 | |
st.session_state.anon_results = anon_results | |
logging.info( | |
f"text anonymised: {st.session_state.anon_results}" | |
) | |
def clear_results(): | |
st.session_state.anon_results="" | |
st.session_state.analyze_results="" | |
# analyzer_engine().registry.remove_recognizer("Excluded words recognizer") | |
####################################### | |
#### Initialize "global" variables #### | |
####################################### | |
if "text_error" not in st.session_state: | |
st.session_state.text_error = "" | |
if "analyze_results" not in st.session_state: | |
st.session_state.analyze_results = "" | |
if "anon_results" not in st.session_state: | |
st.session_state.anon_results = "" | |
if "n_requests" not in st.session_state: | |
st.session_state.n_requests = 0 | |
############################## | |
####### Page arguments ####### | |
############################## | |
# Every widget with a key is automatically added to Session State as a global variable. | |
# In Streamlit, interacting with a widget triggers a rerun and variables defined | |
# in the code get reinitialized after each rerun. | |
# If a callback function is associated with a widget then a change in the widget | |
# triggers the following sequence: First the callback function is executed and then | |
# the app executes from top to bottom. | |
st.text_input( | |
label="Text", | |
placeholder="Write your text here", | |
key='text', | |
on_change=clear_results | |
) | |
st.text_input( | |
label="Data to be redacted (optional)", | |
placeholder="John, Mary, London", | |
key='excluded_words', | |
on_change=clear_results | |
) | |
st.text_input( | |
label="Data to be ignored (optional)", | |
placeholder="NHS, GEL, Lab", | |
key='allowed_words', | |
on_change=clear_results | |
) | |
st_entities = st.sidebar.multiselect( | |
label="Which entities to look for?", | |
options=get_supported_entities(), | |
default=list(get_supported_entities()), | |
) | |
############################## | |
######## Page buttons ######## | |
############################## | |
# button return true when clicked | |
col1, col2 = st.columns(2) | |
with col1: | |
analyze_now = st.button( | |
label="Analyse text", | |
type="primary", | |
on_click=analyze_text, | |
) | |
with col2: | |
anonymise_now = st.button( | |
label="Anonymise text", | |
type="primary", | |
on_click=anonymise_text, | |
) | |
############################## | |
######## Page actions ######## | |
############################## | |
text_spinner_placeholder = st.empty() | |
if st.session_state.text_error: | |
st.error(st.session_state.text_error) | |
with col1: | |
if st.session_state.analyze_results: | |
annotated_tokens=annotate() | |
annotated_text(*annotated_tokens) | |
st.write(st.session_state.analyze_results) | |
with col2: | |
if st.session_state.anon_results: | |
st.write(st.session_state.anon_results.text) |