# student-llm-guard: LLM/LLMGuard/GuardProcessor.py
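"""
Helpers for anonymizing user input before it is sent to an LLM and for
restoring (deanonymizing) the original values in the LLM's output. Sensitive
patterns are detected with Presidio pattern recognizers, and a shared
"regex vault" maps placeholder tokens such as <StudentNetID1> back to the
original text.
"""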

from typing import Dict, List

from presidio_analyzer import Pattern, PatternRecognizer, RecognizerResult
from presidio_anonymizer import (
    AnonymizerEngine,
    EngineResult,
    OperatorConfig,
)


def deanonymize_output(
    output: str,
    recognized_tokens: List[RecognizerResult],
    regex_vault: Dict[str, str],
) -> EngineResult:
    """
    Deanonymize the provided output using the recognized tokens and the regex vault.

    Args:
        output (str): The anonymized output text.
        recognized_tokens (list): RecognizerResults marking the anonymized tokens.
        regex_vault (dict): Mapping from anonymized tokens to their original values.

    Returns:
        EngineResult: The result whose text is the deanonymized output.
    """
    engine = AnonymizerEngine()

    def deanonymize_operator(anonymized_text: str) -> str:
        # Look up the original value for this token; if the token is not in
        # the vault, leave it unchanged.
        return regex_vault.get(anonymized_text, anonymized_text)

    # Run the anonymizer with a custom operator that swaps each recognized
    # token back to its original value, reversing the earlier anonymization.
    result = engine.anonymize(
        text=output,
        analyzer_results=recognized_tokens,
        operators={
            "AnonymizedToken": OperatorConfig(
                "custom", {"lambda": deanonymize_operator}
            )
        },
    )
    return result


def recognize_anonymized_tokens(output: str) -> List[RecognizerResult]:
    """
    Recognize anonymized tokens in the output using a predefined pattern.

    Args:
        output (str): The text to analyze.

    Returns:
        list: Recognized anonymized tokens in the output.
    """
    # Anonymized tokens have the form <EntityTypeN>, e.g. <StudentNetID1>.
    anonymized_pattern = Pattern(
        name="AnonymizedToken",
        regex=r"\<[A-Za-z]+\d+\>",
        score=1.0,
    )
    recognizer = PatternRecognizer(
        supported_entity="AnonymizedToken", patterns=[anonymized_pattern]
    )
    return recognizer.analyze(text=output, entities=["AnonymizedToken"])


def process_output_with_llmguard(
    output: str, regex_vault: Dict[str, str]
) -> EngineResult:
    """
    Process the output with Presidio by applying the deanonymization scanner.

    Args:
        output (str): The anonymized output to process.
        regex_vault (dict): The vault to use for deanonymization.

    Returns:
        EngineResult: The deanonymized output.
    """
    recognized_anonymized_tokens = recognize_anonymized_tokens(output)
    deanonymized_output = deanonymize_output(
        output, recognized_anonymized_tokens, regex_vault
    )
    return deanonymized_output


def anonymize_input(
    input_text: str,
    recognized_patterns: List[RecognizerResult],
    regex_vault: Dict[str, str],
) -> EngineResult:
    """
    Anonymize input text using predefined patterns and update the regex vault.

    Args:
        input_text (str): The input text to anonymize.
        recognized_patterns (list): Recognized patterns for sensitive information.
        regex_vault (dict): The vault that stores the anonymized values.

    Returns:
        EngineResult: The result of the anonymization process.
    """
    engine = AnonymizerEngine()
    entity_counters = {}  # Per-entity-type counters used to number the tokens

    def store_record(record: str, entity_type: str) -> str:
        # Some matches come through as the literal string "PII"; skip those
        # rather than storing them in the vault.
        if record == "PII":
            return record
        # Initialize the counter for this entity type if it is not set yet
        if entity_type not in entity_counters:
            entity_counters[entity_type] = 0
        entity_counters[entity_type] += 1
        # Create a token in the format <{EntityType}{Counter}> and remember
        # the original value so it can be restored later
        token = f"<{entity_type}{entity_counters[entity_type]}>"
        regex_vault[token] = record
        return token

    # Anonymize the input using specific entities and descriptive tokens
    result = engine.anonymize(
        text=input_text,
        analyzer_results=recognized_patterns,
        operators={
            "StudentNetID": OperatorConfig(
                "custom", {"lambda": lambda x: store_record(x, "StudentNetID")}
            ),
            "NNumber": OperatorConfig(
                "custom", {"lambda": lambda x: store_record(x, "NNumber")}
            ),
            "ID": OperatorConfig(
                "custom", {"lambda": lambda x: store_record(x, "ID")}
            ),
        },
    )
    return result


def recognize_patterns_in_input(input_text: str) -> List[RecognizerResult]:
    """
    Recognize sensitive patterns in the input text using predefined patterns.

    Args:
        input_text (str): The input text to scan for sensitive information.

    Returns:
        list: Recognized patterns in the input.
    """
    patterns = [
        # NetIDs such as "abc123", optionally followed by an email domain
        Pattern(
            name="StudentNetID",
            regex=r"[a-zA-Z]+\d+(?:@[a-zA-Z]+\.[a-zA-Z]+)?",
            score=1.0,
        ),
        # N-numbers: the letter N followed by eight digits
        Pattern(name="NNumber", regex=r"N\d{8}\b", score=1.0),
        # Generic numeric IDs of 6-10 digits, optionally prefixed by two letters
        Pattern(
            name="ID",
            regex=r"\b((([A-Za-z]{2})( +))|([A-Za-z]{2}))?\d{6,10}\b",
            score=1.0,
        ),
    ]
    recognized_results = []
    for pattern in patterns:
        recognizer = PatternRecognizer(
            supported_entity=pattern.name, patterns=[pattern]
        )
        recognized_results.extend(
            recognizer.analyze(text=input_text, entities=[pattern.name])
        )
    return recognized_results


def process_input_with_llmguard(
    input_text: str, regex_vault: Dict[str, str]
) -> EngineResult:
    """
    Process the input text with Presidio by applying anonymization.

    Args:
        input_text (str): The input to process.
        regex_vault (dict): The vault to store anonymized information.

    Returns:
        EngineResult: The processed input.
    """
    recognized_patterns = recognize_patterns_in_input(input_text)
    sanitized_prompt = anonymize_input(input_text, recognized_patterns, regex_vault)
    return sanitized_prompt
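

# Illustrative usage sketch (not part of the original pipeline): anonymize a
# sample prompt, simulate an LLM response that echoes the placeholder tokens,
# and restore the original values. The sample NetID/ID values are invented
# for demonstration only.
if __name__ == "__main__":
    vault: Dict[str, str] = {}

    prompt = "Student abc123 with ID 12345678 asked about the exam."
    sanitized = process_input_with_llmguard(prompt, vault)
    print("Sanitized prompt:", sanitized.text)

    # Simulate an LLM response that echoes the anonymized tokens verbatim.
    llm_response = "Here is what I found for " + " and ".join(vault.keys()) + "."
    restored = process_output_with_llmguard(llm_response, vault)
    print("Restored output:", restored.text)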