cat > app.py <<'PY' import re, csv, json from pathlib import Path import streamlit as st st.set_page_config(page_title="LLM Prompt Injection: Attack & Defense", layout="wide") ROOT = Path(__file__).resolve().parent CFG_PATH = ROOT / "config" / "policy.json" DEFAULT_CFG = { "tool_allowlist": { "read_files": ["data/policy.txt"] }, "egress_block_pii": True, "reidentify": ["NAME","COMPANY"] } def load_cfg(): if CFG_PATH.exists(): return json.loads(CFG_PATH.read_text(encoding="utf-8")) return DEFAULT_CFG # --- simple redaction & re-ID --- PII_PATTERNS = [ ("EMAIL", re.compile(r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b")), ("PHONE", re.compile(r"(?:(?:\+?\d{1,3})?[\s\-\.]?)?(?:\(?\d{2,4}\)?[\s\-\.]?)?\d{3,4}[\s\-\.]?\d{4}")), ] NAME_HINT = re.compile(r"\b(?:my name is|i am|i'm)\s+([A-Z][a-z]+(?:\s[A-Z][a-z]+)+)\b", re.IGNORECASE) COMPANY_HINT = re.compile(r"\bfrom\s+([A-Z][A-Za-z0-9&\-\.\s]{1,40})(?:\b|,)", re.IGNORECASE) def redact(text: str): findings = [] for label, rx in PII_PATTERNS: for m in rx.finditer(text or ""): findings.append((label, m.group(0))) for m in NAME_HINT.finditer(text or ""): findings.append(("NAME", m.group(1))) for m in COMPANY_HINT.finditer(text or ""): findings.append(("COMPANY", m.group(1).strip())) masked = text or "" mapping, counts = {}, {} for label, value in findings: counts[label] = counts.get(label, 0) + 1 token = f"[{label}_{counts[label]}]" mapping[token] = {"label": label, "value": value, "reidentifiable": label in {"NAME","COMPANY"}} masked = masked.replace(value, token, 1) return masked, mapping def reidentify(text: str, mapping: dict, allow_types: set): out = text or "" for token, info in mapping.items(): if info["label"] in allow_types and info["reidentifiable"]: out = out.replace(token, info["value"]) return out # --- defenses: allow-list, canary, DLP --- CANARY = "C4NARY{do-not-expose}" def add_canary_to_context(context: str) -> str: return (context or "") + f"\n\n# internal-note: {CANARY}" def canary_tripped(text: str) -> bool: return CANARY in (text or "") PII_RXS = [ ("EMAIL", re.compile(r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b")), ("PHONE", re.compile(r"(?:(?:\+?\d{1,3})?[\s\-\.]?)?(?:\(?\d{2,4}\)?[\s\-\.]?)?\d{3,4}[\s\-\.]?\d{4}")), ("AWS_KEY", re.compile(r"\bAKIA[0-9A-Z]{16}\b")), ] def dlp_block_or_pass(text: str, logs: list) -> str: hits = [] for label, rx in PII_RXS: for m in rx.finditer(text or ""): hits.append((label, m.group(0))) if hits: logs.append(f"[DLP] Egress findings: {hits} — BLOCK (ATT&CK TA0010; Mitigation M1057)") return "[BLOCKED — sensitive data detected in output]" return text class Tooling: def __init__(self, protected: bool, cfg): self.protected = protected self.cfg = cfg def read_file(self, path: str, logs: list): from pathlib import PurePosixPath safe = ROOT / PurePosixPath(path) if not self.protected: try: logs.append(f"[TOOL] read_file('{path}') — allowed (UNPROTECTED)") return safe.read_text(encoding="utf-8") except Exception as e: logs.append(f"[TOOL ERROR] {e}") return None allowed = set(self.cfg.get("tool_allowlist", {}).get("read_files", [])) if path in allowed: try: logs.append(f"[TOOL] read_file('{path}') — allowed by policy") return safe.read_text(encoding="utf-8") except Exception as e: logs.append(f"[TOOL ERROR] {e}") return None else: logs.append(f"[POLICY] BLOCK read_file('{path}') — not in allow-list (ATT&CK TA0009/T1213)") return None def mock_llm(masked_prompt: str, tools: Tooling, logs: list, context: str) -> str: output = ["DEMO_ASSISTANT:"] mp = (masked_prompt or "").lower() if " """ content = st.text_area("Content", value=sample, height=220) uploaded = st.file_uploader("...or upload HTML/TXT", type=["html","txt"]) if uploaded is not None: content = uploaded.read().decode("utf-8", errors="ignore") if st.button("Run Attack ▶️"): logs = [] protected = (mode == "Protected") tools = Tooling(protected, cfg) # Ingress redaction (privacy) if protected: masked, mapping = redact(content) logs.append("[PRIVACY] Ingress redaction applied (model sees placeholders)") else: masked, mapping = content, {} # Internal context + canary try: policy_text = (ROOT / "data" / "policy.txt").read_text(encoding="utf-8") except Exception: policy_text = "No policy file found." context = add_canary_to_context(policy_text) # Model step assistant = mock_llm(masked, tools, logs, context) # Canary detection → block covert leakage if canary_tripped(assistant): logs.append("[CANARY] Leak detected — blocking output (maps to Exfiltration TA0010)") assistant = "[BLOCKED — attempted exfiltration detected]" # Egress DLP (protected mode) if protected and cfg.get("egress_block_pii", True): assistant = dlp_block_or_pass(assistant, logs) # Optional re-ID if mapping: assistant = reidentify(assistant, mapping, set(reid)) st.markdown("### Model Output") st.code(assistant) st.markdown("### Logs (for narration)") st.code("\n".join(logs)) st.markdown("---") st.caption("ATLAS: LLM Prompt Injection → LLM Data Leakage. ATT&CK: Collection (T1213) → Exfiltration (TA0010). Mitigations: M1057 (DLP), M1037 (Allow-list).") PY