awacke1's picture
Update app.py
be8431a verified
raw
history blame contribute delete
9.23 kB
import logging
import os
import base64
import datetime
from datetime import datetime
import dotenv
import pandas as pd
import streamlit as st
from streamlit_tags import st_tags
from PyPDF2 import PdfReader, PdfWriter
from presidio_analyzer import AnalyzerEngine, RecognizerRegistry, PatternRecognizer, RecognizerResult
from presidio_anonymizer import AnonymizerEngine
from presidio_anonymizer.entities import OperatorConfig
import tempfile
import pytz
st.set_page_config(page_title="Presidio PHI De-identification", layout="wide", initial_sidebar_state="expanded", menu_items={"About": "https://microsoft.github.io/presidio/"})
dotenv.load_dotenv()
logger = logging.getLogger("presidio-streamlit")
def get_timestamp_prefix_old() -> str:
"""πŸ•’ Stamps time with Central swagger!"""
central = pytz.timezone("US/Central")
return datetime.now(central).strftime("%I%M%p_%d-%m-%y").upper()
def get_timestamp_prefix() -> str:
central = pytz.timezone("US/Central")
return datetime.datetime.now(central).strftime("%I%M%p_%d-%m-%y").upper()
def nlp_engine_and_registry(model_family: str, model_path: str) -> tuple:
"""πŸ€– Sparks NLP models with a wink!"""
registry = RecognizerRegistry()
registry.load_predefined_recognizers()
if model_family.lower() == "flair":
from flair.models import SequenceTagger
tagger = SequenceTagger.load(model_path)
logger.info(f"Flair model loaded: {model_path}")
return tagger, registry
elif model_family.lower() == "huggingface":
from transformers import pipeline
nlp = pipeline("ner", model=model_path, tokenizer=model_path)
logger.info(f"HuggingFace model loaded: {model_path}")
return nlp, registry
raise ValueError(f"Model family {model_family} unsupported")
def analyzer_engine(model_family: str, model_path: str) -> AnalyzerEngine:
"""πŸ” Unleashes the PHI-hunting beast!"""
nlp_engine, registry = nlp_engine_and_registry(model_family, model_path)
return AnalyzerEngine(registry=registry)
def get_supported_entities(model_family: str, model_path: str) -> list[str]:
"""πŸ“‹ Spills the beans on PHI targets!"""
return ["PERSON", "LOCATION", "ORGANIZATION", "DATE_TIME"] if model_family.lower() == "huggingface" else ["PERSON", "LOCATION", "ORGANIZATION"]
# Feature Spotlight: πŸ•΅οΈβ€β™‚οΈ PHI Hunt Kicks Off!
# Models dive into PDFs, sniffing out sensitive bits with ninja vibes! 😎
def analyze(analyzer: AnalyzerEngine, text: str, entities: list[str], language: str, score_threshold: float, return_decision_process: bool, allow_list: list[str], deny_list: list[str]) -> list:
"""🦸 Zaps PHI with eagle-eye precision!"""
results = analyzer.analyze(text=text, entities=entities, language=language, score_threshold=score_threshold, return_decision_process=return_decision_process)
filtered = []
for result in results:
snippet = text[result.start:result.end].lower()
if any(word.lower() in snippet for word in allow_list):
continue
if any(word.lower() in snippet for word in deny_list) or not deny_list:
filtered.append(result)
return filtered
def anonymize(text: str, operator: str, analyze_results: list, mask_char: str = "*", number_of_chars: int = 15) -> dict:
"""πŸ•΅οΈβ€β™€οΈ Hides PHI with a magician’s flair!"""
anonymizer = AnonymizerEngine()
config = {"DEFAULT": OperatorConfig(operator, {})}
if operator == "mask":
config["DEFAULT"] = OperatorConfig(operator, {"masking_char": mask_char, "chars_to_mask": number_of_chars})
return anonymizer.anonymize(text=text, analyzer_results=analyze_results, operators=config)
def create_ad_hoc_deny_list_recognizer(deny_list: list[str] = None) -> PatternRecognizer:
"""🚨 Sets traps for sneaky PHI rogues!"""
return None if not deny_list else PatternRecognizer(supported_entity="GENERIC_PII", deny_list=deny_list)
def save_pdf(pdf_input) -> str:
"""πŸ’Ύ Stashes PDFs in a temp vault!"""
if pdf_input.size > 200 * 1024 * 1024:
logger.error(f"Upload rejected: {pdf_input.name} exceeds 200MB")
st.error("PDF exceeds 200MB limit")
raise ValueError("PDF too big")
with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf", dir="/tmp") as tmp:
tmp.write(pdf_input.read())
logger.info(f"Uploaded PDF to {tmp.name}, size: {pdf_input.size} bytes")
return tmp.name
# Feature Spotlight: πŸ“„ PDF Wizardry Unleashed!
# Uploads zip through, PHI vanishes, and out pops a safe PDF with timestamp pizzazz! ✨
def read_pdf(pdf_path: str) -> str:
"""πŸ“– Gobbles PDF text like candy!"""
reader = PdfReader(pdf_path)
text = "".join(page.extract_text() or "" + "\n" for page in reader.pages)
logger.info(f"Extracted {len(text)} chars from {pdf_path}")
return text
def create_pdf(text: str, input_path: str, output_filename: str) -> str:
"""πŸ–¨οΈ Spins a new PDF with PHI-proof charm!"""
reader = PdfReader(input_path)
writer = PdfWriter()
for page in reader.pages:
writer.add_page(page)
with open(output_filename, "wb") as f:
writer.write(f)
logger.info(f"Created PDF: {output_filename}")
return output_filename
# Sidebar
st.sidebar.header("PHI De-identification with Presidio")
model_list = [
("flair/ner-english-large", "https://huggingface.co/flair/ner-english-large"),
("HuggingFace/obi/deid_roberta_i2b2", "https://huggingface.co/obi/deid_roberta_i2b2"),
("HuggingFace/StanfordAIMI/stanford-deidentifier-base", "https://huggingface.co/StanfordAIMI/stanford-deidentifier-base"),
]
st_model = st.sidebar.selectbox("NER model", [m[0] for m in model_list], 0)
st.sidebar.markdown(f"[View model]({next(url for m, url in model_list if m == st_model)})")
st_model_package = st_model.split("/")[0]
st_model = st_model if st_model_package.lower() != "huggingface" else "/".join(st_model.split("/")[1:])
analyzer_params = (st_model_package, st_model)
st.sidebar.warning("Models may snooze briefly!")
st_operator = st.sidebar.selectbox("De-id approach", ["replace", "redact", "mask"], 0)
st_threshold = st.sidebar.slider("Threshold", 0.0, 1.0, 0.35)
st_return_decision_process = st.sidebar.checkbox("Show analysis", False)
with st.sidebar.expander("Allow/Deny lists"):
st_allow_list = st_tags(label="Allowlist", text="Add word, hit enter")
st_deny_list = st_tags(label="Denylist", text="Add word, hit enter")
# Main
col1, col2 = st.columns(2)
with col1:
st.subheader("Input")
uploaded_file = st.file_uploader("Upload PDF", type=["pdf"], help="Max 200MB")
if uploaded_file:
try:
logger.info(f"Upload: {uploaded_file.name}, size: {uploaded_file.size} bytes")
pdf_path = save_pdf(uploaded_file)
text = read_pdf(pdf_path)
if not text:
st.error("No text extracted")
raise ValueError("Empty PDF")
analyzer = analyzer_engine(*analyzer_params)
st_analyze_results = analyze(
analyzer=analyzer,
text=text,
entities=get_supported_entities(*analyzer_params),
language="en",
score_threshold=st_threshold,
return_decision_process=st_return_decision_process,
allow_list=st_allow_list,
deny_list=st_deny_list,
)
phi_types = set(res.entity_type for res in st_analyze_results)
if phi_types:
st.success(f"Zapped PHI: {', '.join(phi_types)}")
else:
st.info("No PHI found")
anonymized_result = anonymize(text=text, operator=st_operator, analyze_results=st_analyze_results)
timestamp = get_timestamp_prefix()
output_filename = f"{timestamp}_{uploaded_file.name}"
create_pdf(anonymized_result.text, pdf_path, output_filename)
with open(output_filename, "rb") as f:
b64 = base64.b64encode(f.read()).decode()
st.markdown(f'<a href="data:application/pdf;base64,{b64}" download="{output_filename}">Download de-identified PDF</a>', unsafe_allow_html=True)
with col2:
st.subheader("Findings")
if st_analyze_results:
df = pd.DataFrame([r.to_dict() for r in st_analyze_results])
df["text"] = [text[r.start:r.end] for r in st_analyze_results]
df_subset = df[["entity_type", "text", "start", "end", "score"]].rename(
{"entity_type": "Type", "text": "Text", "start": "Start", "end": "End", "score": "Confidence"}, axis=1
)
if st_return_decision_process:
df_subset = pd.concat([df_subset, pd.DataFrame([r.analysis_explanation.to_dict() for r in st_analyze_results])], axis=1)
st.dataframe(df_subset.reset_index(drop=True), use_container_width=True)
else:
st.text("No findings")
os.remove(pdf_path)
except Exception as e:
st.error(f"Oops: {str(e)}")
logger.error(f"Error: {str(e)}")