# Presidio-based helpers for anonymizing prompt inputs and
# deanonymizing model outputs via a token vault.
from typing import Dict, List

from presidio_analyzer import Pattern, PatternRecognizer, RecognizerResult
from presidio_anonymizer import (
    AnonymizerEngine,
    EngineResult,
    OperatorConfig,
)
def deanonymize_output(
    output: str,
    recognized_tokens: List[RecognizerResult],
    regex_vault: Dict[str, str],
) -> EngineResult:
    """
    Replace anonymized placeholder tokens in the output with their original values.

    Args:
        output (str): The anonymized output text.
        recognized_tokens (List[RecognizerResult]): Spans recognized as
            "AnonymizedToken" entities within ``output``.
        regex_vault (Dict[str, str]): Maps placeholder tokens (e.g. "<ID1>")
            back to their original values.

    Returns:
        EngineResult: Presidio engine result whose ``text`` attribute holds the
        deanonymized output. Tokens with no vault entry are left unchanged.
    """
    engine = AnonymizerEngine()

    def deanonymize_operator(anonymized_text: str) -> str:
        # Fall back to the token itself when the vault has no entry for it.
        return regex_vault.get(anonymized_text, anonymized_text)

    result = engine.anonymize(
        text=output,
        analyzer_results=recognized_tokens,
        operators={
            "AnonymizedToken": OperatorConfig(
                "custom", {"lambda": deanonymize_operator}
            )
        },
    )
    return result
def recognize_anonymized_tokens(output: str) -> List[RecognizerResult]:
    """
    Locate anonymized placeholder tokens (e.g. "<StudentNetID1>") in the output.

    Args:
        output (str): The text to analyze.

    Returns:
        List[RecognizerResult]: All spans matching the placeholder pattern.
    """
    # Placeholders look like "<EntityName123>"; score 1.0 marks a certain match.
    token_pattern = Pattern(
        name="AnonymizedToken",
        regex=r"\<[A-Za-z]+\d+\>",
        score=1.0,
    )
    token_recognizer = PatternRecognizer(
        supported_entity="AnonymizedToken",
        patterns=[token_pattern],
    )
    return token_recognizer.analyze(text=output, entities=["AnonymizedToken"])
def process_output_with_llmguard(
    output: str, regex_vault: Dict[str, str]
) -> EngineResult:
    """
    Process the output with Presidio by applying the deanonymization scanner.

    Args:
        output (str): The anonymized output to process.
        regex_vault (dict): The vault to use for deanonymization.

    Returns:
        EngineResult: The deanonymized output.
    """
    # First find the placeholder tokens, then swap each one for its vault value.
    tokens = recognize_anonymized_tokens(output)
    return deanonymize_output(output, tokens, regex_vault)
def anonymize_input(
    input_text: str,
    recognized_patterns: List[RecognizerResult],
    regex_vault: Dict[str, str],
) -> EngineResult:
    """
    Anonymize input text using recognized spans and update the regex vault.

    Each sensitive span is replaced with a token "<{EntityType}{Counter}>" and
    the token -> original-value mapping is stored in ``regex_vault``.

    Args:
        input_text (str): The input text to anonymize.
        recognized_patterns (List[RecognizerResult]): Recognized spans of
            sensitive information (StudentNetID / NNumber / ID entities).
        regex_vault (Dict[str, str]): The vault to store the original values,
            keyed by the generated token. Mutated in place.

    Returns:
        EngineResult: The result of the anonymization process; its ``text``
        attribute holds the tokenized input.

    NOTE(review): counters restart at 1 on every call, so calling this twice
    with the same vault can reuse token names and overwrite earlier vault
    entries — confirm callers use a fresh vault per conversation.
    """
    engine = AnonymizerEngine()
    entity_counters: Dict[str, int] = {}  # Per-entity-type occurrence counter

    def store_record(record: str, entity_type: str) -> str:
        # Some entries come through as the literal string "PII"; skip them
        # rather than vault a meaningless value.
        if record == "PII":
            return record
        # No `nonlocal` needed: the dict is mutated, never rebound.
        entity_counters[entity_type] = entity_counters.get(entity_type, 0) + 1
        # Create token in the format <{EntityType}{Counter}>
        token = f"<{entity_type}{entity_counters[entity_type]}>"
        regex_vault[token] = record
        return token

    # Anonymize the input using specific entities and descriptive tokens
    result = engine.anonymize(
        text=input_text,
        analyzer_results=recognized_patterns,
        operators={
            "StudentNetID": OperatorConfig(
                "custom", {"lambda": lambda x: store_record(x, "StudentNetID")}
            ),
            "NNumber": OperatorConfig(
                "custom", {"lambda": lambda x: store_record(x, "NNumber")}
            ),
            "ID": OperatorConfig("custom", {"lambda": lambda x: store_record(x, "ID")}),
        },
    )
    return result
def recognize_patterns_in_input(input_text: str) -> List[RecognizerResult]:
    """
    Recognize sensitive patterns in the input text using predefined patterns.

    Args:
        input_text (str): The input text to scan for sensitive information.

    Returns:
        List[RecognizerResult]: Recognized spans for the StudentNetID,
        NNumber, and ID entities, concatenated in that order.
    """
    patterns = [
        # NOTE(review): no \b anchors, so this can match inside longer
        # words (e.g. "abc123" within "xabc123y") — confirm intended.
        Pattern(
            name="StudentNetID",
            regex=r"[a-zA-Z]+\d+(?:@[a-zA-Z]+\.[a-zA-Z]+)?",
            score=1.0,
        ),
        Pattern(name="NNumber", regex=r"N\d{8}\b", score=1.0),
        Pattern(
            name="ID",
            regex=r"\b((([A-Za-z]{2})( +))|([A-Za-z]{2}))?\d{6,10}\b",
            score=1.0,  # was `1`; normalized to match the other scores
        ),
    ]
    recognized_results: List[RecognizerResult] = []
    # One single-pattern recognizer per entity type, restricted to that entity.
    for pattern in patterns:
        recognizer = PatternRecognizer(
            supported_entity=pattern.name, patterns=[pattern]
        )
        recognized_results.extend(
            recognizer.analyze(text=input_text, entities=[pattern.name])
        )
    return recognized_results
def process_input_with_llmguard(
    input_text: str, regex_vault: Dict[str, str]
) -> EngineResult:
    """
    Process the input text with Presidio by applying anonymization.

    Args:
        input_text (str): The input to process.
        regex_vault (dict): The vault to store anonymized information.

    Returns:
        EngineResult: The processed input.
    """
    # Detect sensitive spans, then replace each with a vault-backed token.
    detected = recognize_patterns_in_input(input_text)
    return anonymize_input(input_text, detected, regex_vault)