Spaces:
Sleeping
Sleeping
""" | |
Helper methods for the Presidio Streamlit app | |
""" | |
from typing import List, Optional, Tuple | |
import logging | |
import streamlit as st | |
from presidio_analyzer import ( | |
AnalyzerEngine, | |
RecognizerResult, | |
RecognizerRegistry, | |
PatternRecognizer, | |
Pattern, | |
) | |
from presidio_analyzer.nlp_engine import NlpEngine | |
from presidio_anonymizer import AnonymizerEngine | |
from presidio_anonymizer.entities import OperatorConfig | |
# from openai_fake_data_generator import ( | |
# set_openai_params, | |
# call_completion_model, | |
# create_prompt, | |
# OpenAIParams, | |
# ) | |
from presidio_nlp_engine_config import ( | |
create_nlp_engine_with_spacy, | |
# create_nlp_engine_with_flair, | |
create_nlp_engine_with_transformers, | |
# create_nlp_engine_with_azure_text_analytics, | |
) | |
logger = logging.getLogger("presidio-streamlit") | |
def nlp_engine_and_registry( | |
model_family: str, | |
model_path: str, | |
ta_key: Optional[str] = None, | |
ta_endpoint: Optional[str] = None, | |
) -> Tuple[NlpEngine, RecognizerRegistry]: | |
"""Create the NLP Engine instance based on the requested model. | |
:param model_family: Which model package to use for NER. | |
:param model_path: Which model to use for NER. E.g., | |
"StanfordAIMI/stanford-deidentifier-base", | |
"obi/deid_roberta_i2b2", | |
"en_core_web_lg" | |
:param ta_key: Key to the Text Analytics endpoint (only if model_path = "Azure Text Analytics") | |
:param ta_endpoint: Endpoint of the Text Analytics instance (only if model_path = "Azure Text Analytics") | |
""" | |
# Set up NLP Engine according to the model of choice | |
if "spaCy" in model_family: | |
return create_nlp_engine_with_spacy(model_path) | |
# elif "flair" in model_family: | |
# return create_nlp_engine_with_flair(model_path) | |
elif "HuggingFace" in model_family: | |
return create_nlp_engine_with_transformers(model_path) | |
# elif "Azure Text Analytics" in model_family: | |
# return create_nlp_engine_with_azure_text_analytics(ta_key, ta_endpoint) | |
# else: | |
# raise ValueError(f"Model family {model_family} not supported") | |
def analyzer_engine( | |
model_family: str, | |
model_path: str, | |
ta_key: Optional[str] = None, | |
ta_endpoint: Optional[str] = None, | |
) -> AnalyzerEngine: | |
"""Create the NLP Engine instance based on the requested model. | |
:param model_family: Which model package to use for NER. | |
:param model_path: Which model to use for NER: | |
"StanfordAIMI/stanford-deidentifier-base", | |
"obi/deid_roberta_i2b2", | |
"en_core_web_lg" | |
:param ta_key: Key to the Text Analytics endpoint (only if model_path = "Azure Text Analytics") | |
:param ta_endpoint: Endpoint of the Text Analytics instance (only if model_path = "Azure Text Analytics") | |
""" | |
nlp_engine, registry = nlp_engine_and_registry( | |
model_family, model_path, ta_key, ta_endpoint | |
) | |
analyzer = AnalyzerEngine(nlp_engine=nlp_engine, registry=registry, supported_languages=['fr', 'en']) | |
return analyzer | |
def anonymizer_engine(): | |
"""Return AnonymizerEngine.""" | |
return AnonymizerEngine() | |
def get_supported_entities( | |
model_family: str, model_path: str, ta_key: str, ta_endpoint: str | |
): | |
"""Return supported entities from the Analyzer Engine.""" | |
# return analyzer_engine( | |
# model_family, model_path, ta_key, ta_endpoint | |
# ).get_supported_entities() + ["GENERIC_PII"] | |
return ["PERSON", "IBAN_CODE", "PHONE_NUMBER", "CREDIT_CARD", "CRYPTO", "DATE_TIME", "EMAIL_ADDRESS", "IP_ADDRESS", "NRP", "LOCATION", "URL", "FRENCH_SSN", "FRENCH_PASS", "FRENCH_NID"] | |
# | |
def analyze( | |
model_family: str, model_path: str, ta_key: str, ta_endpoint: str, **kwargs | |
): | |
"""Analyze input using Analyzer engine and input arguments (kwargs).""" | |
if "entities" not in kwargs or "All" in kwargs["entities"]: | |
kwargs["entities"] = None | |
if "deny_list" in kwargs and kwargs["deny_list"] is not None: | |
ad_hoc_recognizer = create_ad_hoc_deny_list_recognizer(kwargs["deny_list"]) | |
kwargs["ad_hoc_recognizers"] = [ad_hoc_recognizer] if ad_hoc_recognizer else [] | |
del kwargs["deny_list"] | |
if "regex_params" in kwargs and len(kwargs["regex_params"]) > 0: | |
ad_hoc_recognizer = create_ad_hoc_regex_recognizer(*kwargs["regex_params"]) | |
kwargs["ad_hoc_recognizers"] = [ad_hoc_recognizer] if ad_hoc_recognizer else [] | |
del kwargs["regex_params"] | |
return analyzer_engine(model_family, model_path, ta_key, ta_endpoint).analyze( | |
**kwargs | |
) | |
def anonymize( | |
text: str, | |
operator: str, | |
analyze_results: List[RecognizerResult], | |
mask_char: Optional[str] = None, | |
number_of_chars: Optional[str] = None, | |
encrypt_key: Optional[str] = None, | |
): | |
"""Anonymize identified input using Presidio Anonymizer. | |
:param text: Full text | |
:param operator: Operator name | |
:param mask_char: Mask char (for mask operator) | |
:param number_of_chars: Number of characters to mask (for mask operator) | |
:param encrypt_key: Encryption key (for encrypt operator) | |
:param analyze_results: list of results from presidio analyzer engine | |
""" | |
if operator == "mask": | |
operator_config = { | |
"type": "mask", | |
"masking_char": mask_char, | |
"chars_to_mask": number_of_chars, | |
"from_end": False, | |
} | |
# Define operator config | |
elif operator == "encrypt": | |
operator_config = {"key": encrypt_key} | |
elif operator == "highlight": | |
operator_config = {"lambda": lambda x: x} | |
else: | |
operator_config = None | |
# Change operator if needed as intermediate step | |
if operator == "highlight": | |
operator = "custom" | |
elif operator == "synthesize": | |
operator = "replace" | |
else: | |
operator = operator | |
res = anonymizer_engine().anonymize( | |
text, | |
analyze_results, | |
operators={"DEFAULT": OperatorConfig(operator, operator_config)}, | |
) | |
return res | |
def annotate(text: str, analyze_results: List[RecognizerResult]): | |
"""Highlight the identified PII entities on the original text | |
:param text: Full text | |
:param analyze_results: list of results from presidio analyzer engine | |
""" | |
tokens = [] | |
# Use the anonymizer to resolve overlaps | |
results = anonymize( | |
text=text, | |
operator="highlight", | |
analyze_results=analyze_results, | |
) | |
# sort by start index | |
results = sorted(results.items, key=lambda x: x.start) | |
for i, res in enumerate(results): | |
if i == 0: | |
tokens.append(text[: res.start]) | |
# append entity text and entity type | |
tokens.append((text[res.start : res.end], res.entity_type)) | |
# if another entity coming i.e. we're not at the last results element, add text up to next entity | |
if i != len(results) - 1: | |
tokens.append(text[res.end : results[i + 1].start]) | |
# if no more entities coming, add all remaining text | |
else: | |
tokens.append(text[res.end :]) | |
return tokens | |
# def create_fake_data( | |
# text: str, | |
# analyze_results: List[RecognizerResult], | |
# openai_params: OpenAIParams, | |
# ): | |
# """Creates a synthetic version of the text using OpenAI APIs""" | |
# if not openai_params.openai_key: | |
# return "Please provide your OpenAI key" | |
# results = anonymize(text=text, operator="replace", analyze_results=analyze_results) | |
# set_openai_params(openai_params) | |
# prompt = create_prompt(results.text) | |
# print(f"Prompt: {prompt}") | |
# fake = call_openai_api( | |
# prompt=prompt, | |
# openai_model_name=openai_params.model, | |
# openai_deployment_name=openai_params.deployment_name, | |
# ) | |
# return fake | |
# @st.cache_data | |
# def call_openai_api( | |
# prompt: str, openai_model_name: str, openai_deployment_name: Optional[str] = None | |
# ) -> str: | |
# fake_data = call_completion_model( | |
# prompt, model=openai_model_name, deployment_id=openai_deployment_name | |
# ) | |
# return fake_data | |
def create_ad_hoc_deny_list_recognizer( | |
deny_list=Optional[List[str]], | |
) -> Optional[PatternRecognizer]: | |
if not deny_list: | |
return None | |
deny_list_recognizer = PatternRecognizer( | |
supported_entity="GENERIC_PII", deny_list=deny_list | |
) | |
return deny_list_recognizer | |
def create_ad_hoc_regex_recognizer( | |
regex: str, entity_type: str, score: float, context: Optional[List[str]] = None | |
) -> Optional[PatternRecognizer]: | |
if not regex: | |
return None | |
pattern = Pattern(name="Regex pattern", regex=regex, score=score) | |
regex_recognizer = PatternRecognizer( | |
supported_entity=entity_type, patterns=[pattern], context=context | |
) | |
return regex_recognizer | |