presidio commited on
Commit
384d9d6
·
1 Parent(s): 6ca5077

Upload presidio_streamlit.py

Browse files
Files changed (1) hide show
  1. presidio_streamlit.py +293 -0
presidio_streamlit.py ADDED
@@ -0,0 +1,293 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Streamlit app for Presidio."""
2
+
3
+ from json import JSONEncoder
4
+ from typing import List
5
+
6
+ import pandas as pd
7
+ import spacy
8
+ import streamlit as st
9
+ from annotated_text import annotated_text
10
+ from presidio_analyzer import AnalyzerEngine, RecognizerResult, RecognizerRegistry
11
+ from presidio_analyzer.nlp_engine import NlpEngineProvider
12
+ from presidio_anonymizer import AnonymizerEngine
13
+ from presidio_anonymizer.entities import OperatorConfig
14
+
15
+ from transformers_rec import (
16
+ STANFORD_COFIGURATION,
17
+ TransformersRecognizer,
18
+ BERT_DEID_CONFIGURATION,
19
+ )
20
+
21
+
22
+ # Helper methods
23
+ @st.cache_resource
24
+ def analyzer_engine(model_path: str):
25
+ """Return AnalyzerEngine.
26
+
27
+ :param model_path: Which model to use for NER:
28
+ "StanfordAIMI/stanford-deidentifier-base",
29
+ "obi/deid_roberta_i2b2",
30
+ "en_core_web_lg"
31
+ """
32
+
33
+ registry = RecognizerRegistry()
34
+ registry.load_predefined_recognizers()
35
+
36
+ # Set up NLP Engine according to the model of choice
37
+ if model_path == "en_core_web_lg":
38
+ if not spacy.util.is_package("en_core_web_lg"):
39
+ spacy.cli.download("en_core_web_lg")
40
+ nlp_configuration = {
41
+ "nlp_engine_name": "spacy",
42
+ "models": [{"lang_code": "en", "model_name": "en_core_web_lg"}],
43
+ }
44
+ else:
45
+ if not spacy.util.is_package("en_core_web_sm"):
46
+ spacy.cli.download("en_core_web_sm")
47
+ # Using a small spaCy model + a HF NER model
48
+ transformers_recognizer = TransformersRecognizer(model_path=model_path)
49
+
50
+ if model_path == "StanfordAIMI/stanford-deidentifier-base":
51
+ transformers_recognizer.load_transformer(**STANFORD_COFIGURATION)
52
+ elif model_path == "obi/deid_roberta_i2b2":
53
+ transformers_recognizer.load_transformer(**BERT_DEID_CONFIGURATION)
54
+
55
+ # Use small spaCy model, no need for both spacy and HF models
56
+ # The transformers model is used here as a recognizer, not as an NlpEngine
57
+ nlp_configuration = {
58
+ "nlp_engine_name": "spacy",
59
+ "models": [{"lang_code": "en", "model_name": "en_core_web_sm"}],
60
+ }
61
+
62
+ registry.add_recognizer(transformers_recognizer)
63
+
64
+ nlp_engine = NlpEngineProvider(nlp_configuration=nlp_configuration).create_engine()
65
+
66
+ analyzer = AnalyzerEngine(nlp_engine=nlp_engine, registry=registry)
67
+ return analyzer
68
+
69
+
70
+ @st.cache_resource
71
+ def anonymizer_engine():
72
+ """Return AnonymizerEngine."""
73
+ return AnonymizerEngine()
74
+
75
+
76
+ @st.cache_data
77
+ def get_supported_entities():
78
+ """Return supported entities from the Analyzer Engine."""
79
+ return analyzer_engine(st_model).get_supported_entities()
80
+
81
+
82
+ @st.cache_data
83
+ def analyze(**kwargs):
84
+ """Analyze input using Analyzer engine and input arguments (kwargs)."""
85
+ if "entities" not in kwargs or "All" in kwargs["entities"]:
86
+ kwargs["entities"] = None
87
+ return analyzer_engine(st_model).analyze(**kwargs)
88
+
89
+
90
+ def anonymize(text: str, analyze_results: List[RecognizerResult]):
91
+ """Anonymize identified input using Presidio Anonymizer.
92
+
93
+ :param text: Full text
94
+ :param analyze_results: list of results from presidio analyzer engine
95
+ """
96
+
97
+ if st_operator == "mask":
98
+ operator_config = {
99
+ "type": "mask",
100
+ "masking_char": st_mask_char,
101
+ "chars_to_mask": st_number_of_chars,
102
+ "from_end": False,
103
+ }
104
+
105
+ elif st_operator == "encrypt":
106
+ operator_config = {"key": st_encrypt_key}
107
+ elif st_operator == "highlight":
108
+ operator_config = {"lambda": lambda x: x}
109
+ else:
110
+ operator_config = None
111
+
112
+ if st_operator == "highlight":
113
+ operator = "custom"
114
+ else:
115
+ operator = st_operator
116
+
117
+ res = anonymizer_engine().anonymize(
118
+ text,
119
+ analyze_results,
120
+ operators={"DEFAULT": OperatorConfig(operator, operator_config)},
121
+ )
122
+ return res
123
+
124
+
125
+ def annotate(text: str, analyze_results: List[RecognizerResult]):
126
+ """
127
+ Highlights every identified entity on top of the text.
128
+ :param text: full text
129
+ :param analyze_results: list of analyzer results.
130
+ """
131
+ tokens = []
132
+
133
+ # Use the anonymizer to resolve overlaps
134
+ results = anonymize(text, analyze_results)
135
+
136
+ # sort by start index
137
+ results = sorted(results.items, key=lambda x: x.start)
138
+ for i, res in enumerate(results):
139
+ if i == 0:
140
+ tokens.append(text[: res.start])
141
+
142
+ # append entity text and entity type
143
+ tokens.append((text[res.start: res.end], res.entity_type))
144
+
145
+ # if another entity coming i.e. we're not at the last results element, add text up to next entity
146
+ if i != len(results) - 1:
147
+ tokens.append(text[res.end: results[i + 1].start])
148
+ # if no more entities coming, add all remaining text
149
+ else:
150
+ tokens.append(text[res.end:])
151
+ return tokens
152
+
153
+
154
+ st.set_page_config(page_title="Presidio demo", layout="wide")
155
+
156
+ # Sidebar
157
+ st.sidebar.header(
158
+ """
159
+ PII De-Identification with Microsoft Presidio
160
+ """
161
+ )
162
+
163
+ st.sidebar.info(
164
+ "Presidio is an open source customizable framework for PII detection and de-identification\n"
165
+ "[Code](https://aka.ms/presidio) | "
166
+ "[Tutorial](https://microsoft.github.io/presidio/tutorial/) | "
167
+ "[Installation](https://microsoft.github.io/presidio/installation/) | "
168
+ "[FAQ](https://microsoft.github.io/presidio/faq/)",
169
+ icon="ℹ️",
170
+ )
171
+
172
+ st.sidebar.markdown(
173
+ "[![Pypi Downloads](https://img.shields.io/pypi/dm/presidio-analyzer.svg)](https://img.shields.io/pypi/dm/presidio-analyzer.svg)"
174
+ "[![MIT license](https://img.shields.io/badge/license-MIT-brightgreen.svg)](http://opensource.org/licenses/MIT)"
175
+ "![GitHub Repo stars](https://img.shields.io/github/stars/microsoft/presidio?style=social)"
176
+ )
177
+
178
+ st_model = st.sidebar.selectbox(
179
+ "NER model",
180
+ [
181
+ "StanfordAIMI/stanford-deidentifier-base",
182
+ "obi/deid_roberta_i2b2",
183
+ "en_core_web_lg",
184
+ ],
185
+ index=1,
186
+ )
187
+ st.sidebar.markdown("> Note: Models might take some time to download. ")
188
+
189
+ st_operator = st.sidebar.selectbox(
190
+ "De-identification approach",
191
+ ["redact", "replace", "mask", "hash", "encrypt", "highlight"],
192
+ index=1,
193
+ )
194
+
195
+ if st_operator == "mask":
196
+ st_number_of_chars = st.sidebar.number_input(
197
+ "number of chars", value=15, min_value=0, max_value=100
198
+ )
199
+ st_mask_char = st.sidebar.text_input("Mask character", value="*", max_chars=1)
200
+ elif st_operator == "encrypt":
201
+ st_encrypt_key = st.sidebar.text_input("AES key", value="WmZq4t7w!z%C&F)J")
202
+
203
+ st_threshold = st.sidebar.slider(
204
+ label="Acceptance threshold", min_value=0.0, max_value=1.0, value=0.35
205
+ )
206
+
207
+ st_return_decision_process = st.sidebar.checkbox(
208
+ "Add analysis explanations to findings", value=False
209
+ )
210
+
211
+ st_entities = st.sidebar.multiselect(
212
+ label="Which entities to look for?",
213
+ options=get_supported_entities(),
214
+ default=list(get_supported_entities()),
215
+ )
216
+
217
+ # Main panel
218
+ analyzer_load_state = st.info("Starting Presidio analyzer...")
219
+ engine = analyzer_engine(model_path=st_model)
220
+ analyzer_load_state.empty()
221
+
222
+ # Read default text
223
+ with open("demo_text.txt") as f:
224
+ demo_text = f.readlines()
225
+
226
+ # Create two columns for before and after
227
+ col1, col2 = st.columns(2)
228
+
229
+ # Before:
230
+ col1.subheader("Input string:")
231
+ st_text = col1.text_area(
232
+ label="Enter text",
233
+ value="".join(demo_text),
234
+ height=400,
235
+ )
236
+
237
+ st_analyze_results = analyze(
238
+ text=st_text,
239
+ entities=st_entities,
240
+ language="en",
241
+ score_threshold=st_threshold,
242
+ return_decision_process=st_return_decision_process,
243
+ )
244
+
245
+ # After
246
+ if st_operator != "highlight":
247
+ with col2:
248
+ st.subheader(f"Output")
249
+ st_anonymize_results = anonymize(st_text, st_analyze_results)
250
+ st.text_area(label="De-identified", value=st_anonymize_results.text, height=400)
251
+ else:
252
+ st.subheader("Highlighted")
253
+ annotated_tokens = annotate(st_text, st_analyze_results)
254
+ # annotated_tokens
255
+ annotated_text(*annotated_tokens)
256
+
257
+
258
+ # json result
259
+ class ToDictEncoder(JSONEncoder):
260
+ """Encode dict to json."""
261
+
262
+ def default(self, o):
263
+ """Encode to JSON using to_dict."""
264
+ return o.to_dict()
265
+
266
+
267
+ # table result
268
+ st.subheader(
269
+ "Findings" if not st_return_decision_process else "Findings with decision factors"
270
+ )
271
+ if st_analyze_results:
272
+ df = pd.DataFrame.from_records([r.to_dict() for r in st_analyze_results])
273
+ df["text"] = [st_text[res.start: res.end] for res in st_analyze_results]
274
+
275
+ df_subset = df[["entity_type", "text", "start", "end", "score"]].rename(
276
+ {
277
+ "entity_type": "Entity type",
278
+ "text": "Text",
279
+ "start": "Start",
280
+ "end": "End",
281
+ "score": "Confidence",
282
+ },
283
+ axis=1,
284
+ )
285
+ df_subset["Text"] = [st_text[res.start: res.end] for res in st_analyze_results]
286
+ if st_return_decision_process:
287
+ analysis_explanation_df = pd.DataFrame.from_records(
288
+ [r.analysis_explanation.to_dict() for r in st_analyze_results]
289
+ )
290
+ df_subset = pd.concat([df_subset, analysis_explanation_df], axis=1)
291
+ st.dataframe(df_subset.reset_index(drop=True), use_container_width=True)
292
+ else:
293
+ st.text("No findings")