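"""Presidio-based helpers for anonymizing LLM prompts and deanonymizing responses.

Sensitive student identifiers (NetIDs, N-numbers, and generic IDs) in the input
are replaced with placeholder tokens such as <StudentNetID1>; the original
values are kept in a regex vault so they can be restored in the model output.
"""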
from presidio_analyzer import PatternRecognizer, Pattern, RecognizerResult
from presidio_anonymizer import (
    AnonymizerEngine,
    EngineResult,
    OperatorConfig,
)
from typing import Dict, List


def deanonymize_output(
    output: str, recognized_tokens: List[RecognizerResult], regex_vault: Dict[str, str]
) -> EngineResult:
    """
    Deanonymize the provided output using the recognized tokens and regex vault.

    Args:
        output (str): The anonymized output text.
        recognized_tokens (list): Anonymized-token spans recognized in the output.
        regex_vault (dict): Dictionary mapping anonymized tokens to their original values.

    Returns:
        EngineResult: The deanonymized output text.
    """
    engine = AnonymizerEngine()

    def deanonymize_operator(anonymized_text: str) -> str:
        # Look up the placeholder token in the vault; if it was never stored,
        # leave it unchanged.
        return regex_vault.get(anonymized_text, anonymized_text)

    result = engine.anonymize(
        text=output,
        analyzer_results=recognized_tokens,
        operators={
            "AnonymizedToken": OperatorConfig(
                "custom", {"lambda": deanonymize_operator}
            )
        },
    )

    return result


def recognize_anonymized_tokens(output: str) -> List[RecognizerResult]:
    """
    Recognize anonymized tokens in the output using a predefined pattern.

    Args:
        output (str): The text to analyze.

    Returns:
        list: Recognized anonymized tokens in the output.
    """
    anonymized_pattern = Pattern(
        name="AnonymizedToken",
        regex=r"\<[A-Za-z]+\d+\>",
        score=1.0,
    )
    recognizer = PatternRecognizer(
        supported_entity="AnonymizedToken", patterns=[anonymized_pattern]
    )

    return recognizer.analyze(text=output, entities=["AnonymizedToken"])


def process_output_with_llmguard(
    output: str, regex_vault: Dict[str, str]
) -> EngineResult:
    """
    Process the output with Presidio: recognize anonymized tokens and restore
    their original values from the regex vault.

    Args:
        output (str): The anonymized output to process.
        regex_vault (dict): The vault to use for deanonymization.

    Returns:
        EngineResult: The deanonymized output.
    """
    recognized_anonymized_tokens = recognize_anonymized_tokens(output)
    deanonymized_output = deanonymize_output(
        output, recognized_anonymized_tokens, regex_vault
    )

    return deanonymized_output


def anonymize_input(
    input_text: str,
    recognized_patterns: List[RecognizerResult],
    regex_vault: Dict[str, str],
) -> EngineResult:
    """
    Anonymize input text using predefined patterns and update the regex vault.

    Args:
        input_text (str): The input text to anonymize.
        recognized_patterns: Recognized patterns for sensitive information.
        regex_vault (dict): The vault to store the anonymized values.

    Returns:
        EngineResult: The result of the anonymization process.
    """
    engine = AnonymizerEngine()
    entity_counters = {}  # Store counters for each entity type

    def store_record(record: str, entity_type: str) -> str:
        # Presidio's custom operator appears to probe the lambda with the
        # literal string "PII" during validation; skip it so the probe value
        # is not stored in the vault.
        if record == "PII":
            return record

        # Count matches per entity type so tokens are numbered sequentially
        if entity_type not in entity_counters:
            entity_counters[entity_type] = 0
        entity_counters[entity_type] += 1

        # Create token in the format <{EntityType}{Counter}>
        token = f"<{entity_type}{entity_counters[entity_type]}>"
        regex_vault[token] = record
        return token

    # Anonymize the input using specific entities and descriptive tokens
    result = engine.anonymize(
        text=input_text,
        analyzer_results=recognized_patterns,
        operators={
            "StudentNetID": OperatorConfig(
                "custom", {"lambda": lambda x: store_record(x, "StudentNetID")}
            ),
            "NNumber": OperatorConfig(
                "custom", {"lambda": lambda x: store_record(x, "NNumber")}
            ),
            "ID": OperatorConfig("custom", {"lambda": lambda x: store_record(x, "ID")}),
        },
    )

    return result


def recognize_patterns_in_input(input_text: str) -> List[RecognizerResult]:
    """
    Recognize sensitive patterns in the input text using predefined patterns.

    Args:
        input_text (str): The input text to scan for sensitive information.

    Returns:
        list: Recognized patterns in the input.
    """
    patterns = [
        Pattern(
            name="StudentNetID",
            regex=r"[a-zA-Z]+\d+(?:@[a-zA-Z]+\.[a-zA-Z]+)?",
            score=1.0,
        ),
        Pattern(name="NNumber", regex=r"N\d{8}\b", score=1.0),
        Pattern(
            name="ID",
            regex=r"\b((([A-Za-z]{2})( +))|([A-Za-z]{2}))?\d{6,10}\b",
            score=1.0,
        ),
    ]

    recognized_results = []
    for pattern in patterns:
        recognizer = PatternRecognizer(
            supported_entity=pattern.name, patterns=[pattern]
        )
        recognized_results.extend(
            recognizer.analyze(text=input_text, entities=[pattern.name])
        )

    return recognized_results


def process_input_with_llmguard(
    input_text: str, regex_vault: Dict[str, str]
) -> EngineResult:
    """
    Process the input text with Presidio by applying anonymization.

    Args:
        input_text (str): The input to process.
        regex_vault (dict): The vault to store anonymized information.

    Returns:
        EngineResult: The processed input.
    """
    recognized_patterns = recognize_patterns_in_input(input_text)
    sanitized_prompt = anonymize_input(input_text, recognized_patterns, regex_vault)

    return sanitized_prompt
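

# --- Illustrative usage ----------------------------------------------------
# A minimal round-trip sketch, not part of the module's API: the sample NetID
# ("abc123") and N-number ("N12345678") below are made up. In real use, the
# sanitized prompt would be sent to an LLM and the model's response, still
# carrying the placeholder tokens, would be deanonymized afterwards.
if __name__ == "__main__":
    vault: Dict[str, str] = {}

    raw_prompt = "Student abc123 (N12345678) asked for a transcript."
    sanitized = process_input_with_llmguard(raw_prompt, vault)
    print("Sanitized prompt:", sanitized.text)
    print("Vault:", vault)

    # Pretend the model echoed the placeholder tokens back in its answer;
    # deanonymization swaps the stored values back in.
    model_output = sanitized.text
    restored = process_output_with_llmguard(model_output, vault)
    print("Restored output:", restored.text)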