"""
PII (Personally Identifiable Information) Detection Extension
Integrates with dual-mode content moderation
"""
|
|
import re
import unicodedata
from enum import Enum
from typing import Dict, List, Tuple
|
|
class PIILabel(Enum):
    """Categories of personally identifiable information.

    Each member's value is the lowercase string form of its name.
    """

    # Nothing sensitive found.
    SAFE = "safe"

    # Direct contact details and identifiers.
    EMAIL = "email"
    PHONE = "phone"
    ADDRESS = "address"
    CREDIT_CARD = "credit_card"
    SSN = "ssn"

    # Links and handles.
    SOCIAL_MEDIA = "social_media"
    URL = "url"
|
|
class UnicodeDeobfuscator:
    """Detect and normalize unicode obfuscation attempts.

    Characters from a few "styled" Unicode ranges (circled letters,
    double-struck letters, fullwidth forms, mathematical alphanumerics) are
    reported as suspicious and, where a plain equivalent is known, replaced
    by it so that downstream pattern matching sees ordinary text.
    """

    # Code-point ranges; ``range`` is end-exclusive.
    CIRCLED_LETTERS = range(0x24B6, 0x24EA)
    MATHEMATICAL_CHARS = range(0x1D400, 0x1D800)
    FULLWIDTH_CHARS = range(0xFF01, 0xFF5F)
    DOUBLE_STRUCK = range(0x2100, 0x2150)
    BOX_DRAWING = range(0x2500, 0x2580)
    BLOCK_ELEMENTS = range(0x2580, 0x25A0)

    # Circled capital letters start at U+24B6 and circled small letters at
    # U+24D0, 26 each. The table is built from code points rather than typed
    # as literals so it cannot be damaged by a re-encoding of this file.
    CIRCLED_MAP = {
        **{chr(0x24B6 + i): chr(ord('A') + i) for i in range(26)},
        **{chr(0x24D0 + i): chr(ord('a') + i) for i in range(26)},
    }

    # Double-struck capitals that have a plain-letter replacement.
    DOUBLE_STRUCK_MAP = {
        '\u2102': 'C',
        '\u210D': 'H',
        '\u2115': 'N',
        '\u2119': 'P',
        '\u211A': 'Q',
        '\u211D': 'R',
        '\u2124': 'Z',
    }

    @classmethod
    def detect_obfuscation(cls, text: str) -> Tuple[bool, List[Tuple[str, str]], str]:
        """Detect unicode obfuscation and build a normalized copy of *text*.

        Args:
            text: The string to inspect.

        Returns:
            ``(is_obfuscated, suspicious, normalized_text)`` where
            ``suspicious`` lists ``(char, type)`` for every flagged character
            in input order (type is one of ``'circled'``, ``'double-struck'``,
            ``'fullwidth'``, ``'mathematical'``), ``is_obfuscated`` is True
            when that list is non-empty, and ``normalized_text`` is *text*
            with each flagged character replaced by its plain equivalent
            when one is known.
        """
        suspicious = []
        normalized = []

        for char in text:
            code = ord(char)

            if char in cls.CIRCLED_MAP:
                suspicious.append((char, 'circled'))
                normalized.append(cls.CIRCLED_MAP[char])

            elif code in cls.DOUBLE_STRUCK:
                # Every character in this range is flagged, but only the
                # mapped capitals are rewritten; the rest pass through as-is.
                suspicious.append((char, 'double-struck'))
                normalized.append(cls.DOUBLE_STRUCK_MAP.get(char, char))

            elif code in cls.FULLWIDTH_CHARS:
                suspicious.append((char, 'fullwidth'))
                # U+FF01..U+FF5E sit at a fixed offset from ASCII 0x21..0x7E.
                normalized.append(chr(code - 0xFEE0))

            elif code in cls.MATHEMATICAL_CHARS:
                suspicious.append((char, 'mathematical'))
                # Compatibility normalization folds styled letters/digits to
                # their plain forms; characters without a compatibility
                # mapping come back unchanged.
                normalized.append(unicodedata.normalize('NFKC', char))

            else:
                normalized.append(char)

        return bool(suspicious), suspicious, ''.join(normalized)

    @classmethod
    def normalize(cls, text: str) -> str:
        """Return only the normalized text from :meth:`detect_obfuscation`."""
        _, _, normalized = cls.detect_obfuscation(text)
        return normalized
|
|
|
|
class PIIDetector:
    """Detect PII in text with context awareness.

    Regex-based detectors find emails, phone numbers, street addresses,
    credit-card numbers, SSNs and social-media links/handles. ``scan``
    combines them with a simple keyword-based grooming check and the
    sender's age to decide whether a message is allowed or blocked.
    """

    # (platform, pattern) pairs for "platform @handle" style mentions.
    # Compiled once at class level instead of on every call.
    _USERNAME_PATTERNS = (
        ("instagram", re.compile(r'\b(?:instagram|ig|insta)[:\s]*@?(\w+)\b', re.IGNORECASE)),
        ("twitter", re.compile(r'\b(?:twitter|x)[:\s]*@?(\w+)\b', re.IGNORECASE)),
        ("discord", re.compile(r'\bdiscord[:\s]*@?(\w+)\b', re.IGNORECASE)),
        # The alternation is grouped so the handle part applies to both
        # spellings, not only to "snap".
        ("snapchat", re.compile(r'\b(?:snapchat|snap)[:\s]*@?(\w+)\b', re.IGNORECASE)),
        ("tiktok", re.compile(r'\btiktok[:\s]*@?(\w+)\b', re.IGNORECASE)),
    )

    # Extracts the host part of a URL already matched by ``url_pattern``.
    _URL_HOST_PATTERN = re.compile(r'https?://([-\w.]+)', re.IGNORECASE)

    def __init__(self):
        # The TLD class is letters only; a "|" inside [...] is a literal
        # pipe, not alternation.
        self.email_pattern = re.compile(
            r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'
        )

        self.phone_patterns = [
            re.compile(r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b'),
            # No \b before "(": a word boundary there would require a word
            # character immediately before the parenthesis.
            re.compile(r'\(\d{3}\)\s?\d{3}[-.]?\d{4}\b'),
            # Either an explicit "+" or a word boundary may start the number.
            re.compile(r'(?:\+|\b)\d{1,3}[-.\s]?\d{3}[-.\s]?\d{3}[-.\s]?\d{4}\b'),
            re.compile(r'\b\d{4}\s?\d{3}\s?\d{3}\b'),
            re.compile(r'\b\d{3}[-.]?\d{4}\b'),
            re.compile(r'\b\d{7,10}\b'),
        ]

        self.address_patterns = [
            re.compile(r'\b\d+\s+\d*[A-Za-z]+(?:\s+[A-Za-z]+)?\s+(?:Street|St|Avenue|Ave|Road|Rd|Boulevard|Blvd|Lane|Ln|Drive|Dr|Court|Ct|Way|Place|Pl|Circle|Cir|Trail|Trl|Parkway|Pkwy)\b', re.IGNORECASE),
            re.compile(r'\b(?:PO|P\.O\.)\s*Box\s*\d+\b', re.IGNORECASE),
            re.compile(r'\b\d+\s+[A-Za-z]+\s+(?:Street|St|Ave|Road|Rd)\b', re.IGNORECASE),
        ]

        self.cc_pattern = re.compile(r'\b(?:\d{4}[-\s]?){3}\d{4}\b|\b\d{16}\b')

        self.ssn_pattern = re.compile(r'\b\d{3}[-\s]?\d{2}[-\s]?\d{4}\b')

        self.social_media_domains = [
            'instagram.com', 'instagr.am',
            'twitter.com', 'x.com',
            'tiktok.com',
            'snapchat.com', 'snap.com',
            'discord.com', 'discord.gg',
            'facebook.com', 'fb.com',
            'reddit.com',
            'youtube.com', 'youtu.be',
            'twitch.tv',
            'steamcommunity.com',
            'roblox.com',
        ]

        # Matched as lowercase substrings; phrases are stored without
        # apostrophes (see detect_grooming_context).
        self.grooming_keywords = [
            'dm me', 'message me privately', 'private chat', 'secret',
            'dont tell your parents', 'our little secret', 'just between us',
            'send me pics', 'send pictures', 'photo of you', 'what do you look like',
            'how old are you', 'where do you live', 'home alone', 'parents gone',
            'meet up', 'meet in person', 'come over', 'visit you',
            'boyfriend', 'girlfriend', 'dating', 'relationship',
            'trust me', 'special friend', 'mature for your age',
            'youre different', 'understand you', 'only one who gets you',
        ]

        self.url_pattern = re.compile(
            r'https?://(?:[-\w.])+(?:[:\d]+)?(?:/(?:[\w/_.])*(?:\?(?:[\w&=%.])*)?(?:#(?:[\w.])*)?)?',
            re.IGNORECASE
        )

    @staticmethod
    def _collect_matches(patterns, text: str) -> List[Tuple[str, int, int]]:
        """Run every pattern over *text*, keeping each (start, end) span once."""
        seen_spans = set()
        results = []
        for pattern in patterns:
            for match in pattern.finditer(text):
                span = match.span()
                # Several patterns overlap by design; report a span only once.
                if span in seen_spans:
                    continue
                seen_spans.add(span)
                results.append((match.group(), span[0], span[1]))
        return results

    def detect_emails(self, text: str) -> List[Tuple[str, int, int]]:
        """Find all emails in text as ``(value, start, end)`` tuples."""
        return [(m.group(), m.start(), m.end()) for m in self.email_pattern.finditer(text)]

    def detect_phones(self, text: str) -> List[Tuple[str, int, int]]:
        """Find phone numbers as ``(value, start, end)``; identical spans are reported once."""
        return self._collect_matches(self.phone_patterns, text)

    def detect_addresses(self, text: str) -> List[Tuple[str, int, int]]:
        """Find street / PO-box addresses as ``(value, start, end)``; identical spans are reported once."""
        return self._collect_matches(self.address_patterns, text)

    def detect_credit_cards(self, text: str) -> List[Tuple[str, int, int]]:
        """Find credit card numbers as ``(value, start, end)`` tuples.

        A match is kept when, with hyphens and spaces removed, it is 13 to
        19 characters long.
        """
        matches = []
        for match in self.cc_pattern.finditer(text):
            card = match.group().replace('-', '').replace(' ', '')
            if 13 <= len(card) <= 19:
                matches.append((match.group(), match.start(), match.end()))
        return matches

    def detect_ssn(self, text: str) -> List[Tuple[str, int, int]]:
        """Find SSN-shaped digit groups (3-2-4) as ``(value, start, end)`` tuples."""
        return [(m.group(), m.start(), m.end()) for m in self.ssn_pattern.finditer(text)]

    def detect_social_media(self, text: str) -> List[Tuple[str, int, int, str]]:
        """Find social media links and handle mentions.

        Returns:
            ``(value, start, end, platform)`` tuples. For URLs, ``platform``
            is the matching entry of ``social_media_domains``; for handle
            mentions it is the platform name of the pattern that matched.
        """
        matches = []

        for url_match in self.url_pattern.finditer(text):
            url = url_match.group()
            host_match = self._URL_HOST_PATTERN.match(url)
            host = host_match.group(1).lower().strip('.') if host_match else ''
            for domain in self.social_media_domains:
                domain = domain.lower()
                # Compare against the host, not the whole URL, so that e.g.
                # "dropbox.com" is not mistaken for "x.com".
                if host == domain or host.endswith('.' + domain):
                    matches.append((url, url_match.start(), url_match.end(), domain))
                    break

        for platform, pattern in self._USERNAME_PATTERNS:
            for match in pattern.finditer(text):
                matches.append((match.group(), match.start(), match.end(), platform))

        return matches

    def detect_grooming_context(self, text: str) -> Tuple[bool, float, List[str]]:
        """Detect if social media sharing has grooming context.

        Returns:
            ``(is_suspicious, risk_score, found_keywords)``. ``risk_score``
            is the number of matched keywords divided by 3, capped at 1.0;
            ``is_suspicious`` is True when the score is at least 0.33.
        """
        text_lower = text.lower()
        # Keywords are stored without apostrophes ("dont", "youre"), so also
        # search a copy of the text with straight and curly apostrophes removed.
        text_plain = text_lower.replace("'", "").replace("\u2019", "")

        found_keywords = [
            keyword
            for keyword in self.grooming_keywords
            if keyword in text_lower or keyword in text_plain
        ]

        risk_score = min(len(found_keywords) / 3.0, 1.0)
        is_suspicious = risk_score >= 0.33

        return is_suspicious, risk_score, found_keywords

    def scan(self, text: str, age: int) -> Dict:
        """Full PII scan with age-appropriate rules.

        Unicode obfuscation is detected first; when present, all detectors
        run on the normalized text and reported offsets refer to it.

        Decision rules, in order:
          * any email / phone / address / credit card / SSN -> "block";
          * otherwise, social media present: "block" under 13, "block" when
            the grooming check is suspicious, else "allow";
          * otherwise "allow".
        The grooming check only runs in the social-media branch.

        Returns:
            {
                "has_pii": bool,
                "pii_types": list of label values, in PIILabel order,
                "details": list of per-match dicts,
                "social_media_allowed": bool,
                "grooming_risk": float,
                "grooming_keywords": list,
                "action": "allow" | "block",
                "reason": str,
                "age": int,
                "obfuscation_detected": bool,
                "obfuscation_chars": list of (char, type),
                "normalized_text": str
            }
        """
        is_obfuscated, suspicious_chars, normalized_text = UnicodeDeobfuscator.detect_obfuscation(text)
        detection_text = normalized_text if is_obfuscated else text

        pii_found = []
        pii_types = set()

        # Detectors whose hits always block the message.
        critical_detectors = (
            (PIILabel.EMAIL, self.detect_emails),
            (PIILabel.PHONE, self.detect_phones),
            (PIILabel.ADDRESS, self.detect_addresses),
            (PIILabel.CREDIT_CARD, self.detect_credit_cards),
            (PIILabel.SSN, self.detect_ssn),
        )
        for label, detect in critical_detectors:
            for value, start, end in detect(detection_text):
                pii_types.add(label)
                pii_found.append({"type": label.value, "value": value, "start": start, "end": end})

        # Ordered list (not a set) so the reason string is stable between runs.
        critical_pii = [label for label, _ in critical_detectors if label in pii_types]

        social_links = self.detect_social_media(detection_text)
        has_social_media = bool(social_links)
        if has_social_media:
            pii_types.add(PIILabel.SOCIAL_MEDIA)
            for link, start, end, platform in social_links:
                pii_found.append({"type": "social_media", "value": link, "platform": platform, "start": start, "end": end})

        grooming_risk = 0.0
        grooming_keywords = []

        if critical_pii:
            action = "block"
            reason = f"PII detected: {', '.join(p.value for p in critical_pii)}"
        elif has_social_media:
            is_grooming, grooming_risk, grooming_keywords = self.detect_grooming_context(detection_text)

            if age < 13:
                action = "block"
                reason = "Social media sharing not permitted under 13"
            elif is_grooming:
                action = "block"
                reason = f"Potential grooming detected (risk: {grooming_risk:.0%})"
            else:
                action = "allow"
                reason = "Social media permitted for 13+ (no grooming signals)"
        else:
            action = "allow"
            reason = "No PII detected"

        social_media_allowed = not (has_social_media and (age < 13 or grooming_risk > 0))

        if is_obfuscated and action == "allow":
            reason = f"Unicode obfuscation detected and normalized. {reason}"

        return {
            "has_pii": bool(pii_types),
            "pii_types": [label.value for label in PIILabel if label in pii_types],
            "details": pii_found,
            "social_media_allowed": social_media_allowed,
            "grooming_risk": grooming_risk,
            "grooming_keywords": grooming_keywords,
            "action": action,
            "reason": reason,
            "age": age,
            "obfuscation_detected": is_obfuscated,
            "obfuscation_chars": list(suspicious_chars) if is_obfuscated else [],
            "normalized_text": detection_text,
        }
|
|
|
|
| |
class CombinedModerationFilter:
    """Combines content moderation + PII detection.

    ``check`` runs the PII scan first and returns immediately when it
    blocks; otherwise the content model's label is tested against the
    blocked-label list for the sender's age group.
    """

    # Label value that is always permitted for ages 13+ (the original code
    # describes it as "Swearing permitted as reaction (13+)").
    _SWEARING_LABEL_VALUE = 2

    def __init__(self, content_model_path="./moderation_model_v2.pkl"):
        """Load the content model and create the PII detector.

        Args:
            content_model_path: Path passed to the moderator's ``load``.
        """
        from enhanced_moderation import EnhancedContentModerator

        self.content_moderator = EnhancedContentModerator()
        # NOTE(review): the default path ends in .pkl; if load() unpickles
        # it, only trusted model files should ever be passed here - confirm.
        self.content_moderator.load(content_model_path)
        self.pii_detector = PIIDetector()

        # Content-label values that block a message, per age group.
        self.under_13_blocked_content = [1, 2, 3, 4, 5]
        self.teen_plus_blocked_content = [1, 3, 4, 5]

    def check(self, text: str, age: int) -> Dict:
        """Full check: content + PII.

        Args:
            text: The message to check.
            age: Sender age; 13 and over uses the teen-plus blocked list.

        Returns:
            A dict with ``allowed`` (bool), ``violation`` ("PII", "CONTENT"
            or None), ``pii_details`` (the PII scan result),
            ``content_details`` (label name and confidence, or None when the
            PII scan blocked first), ``reason`` and ``age``.
        """
        pii_result = self.pii_detector.scan(text, age)

        if pii_result["action"] == "block":
            return {
                "allowed": False,
                "violation": "PII",
                "pii_details": pii_result,
                "content_details": None,
                "reason": pii_result["reason"],
                "age": age
            }

        # Classify the de-obfuscated text from the PII scan so that circled /
        # fullwidth spellings are judged like plain text; fall back to the
        # raw text if the scan result carries no normalized form.
        moderation_text = pii_result.get("normalized_text", text)
        content_label, confidence = self.content_moderator.predict(moderation_text)

        is_teen_plus = age >= 13
        blocked_values = (
            self.teen_plus_blocked_content if is_teen_plus else self.under_13_blocked_content
        )
        content_allowed = content_label.value not in blocked_values

        # For 13+ the swearing label is permitted even if it has been added
        # to the blocked list.
        if not content_allowed and is_teen_plus and content_label.value == self._SWEARING_LABEL_VALUE:
            content_allowed = True

        content_details = {
            "label": content_label.name,
            "confidence": confidence
        }

        if not content_allowed:
            return {
                "allowed": False,
                "violation": "CONTENT",
                "pii_details": pii_result,
                "content_details": content_details,
                "reason": f"{content_label.name} detected",
                "age": age
            }

        return {
            "allowed": True,
            "violation": None,
            "pii_details": pii_result,
            "content_details": content_details,
            "reason": "Content and PII checks passed",
            "age": age
        }
|
|
|
|
| |
if __name__ == "__main__":
    # Small manual demo: scan a few sample messages and print the decision.
    detector = PIIDetector()

    # (message, sender age) pairs.
    test_cases = [
        ("My email is john@example.com", 15),
        ("Call me at 555-123-4567", 16),
        ("I'm at 123 Main Street", 14),
        ("Follow me on instagram @cooluser", 10),
        ("Follow me on instagram @cooluser", 15),
        ("DM me on instagram, don't tell your parents", 15),
        ("Check my tiktok @user", 14),
        ("Send me pics on snapchat, it's our secret", 13),
    ]

    print("PII Detection Tests")
    print("=" * 70)

    for text, age in test_cases:
        result = detector.scan(text, age)
        # Status glyphs (check mark / cross mark) are written as escapes so
        # the literals survive a re-encoding of this file.
        status = "\u2705 ALLOW" if result["action"] == "allow" else "\u274c BLOCK"

        print(f"\nAge {age}: '{text}'")
        print(f"  {status} - {result['reason']}")
        if result["grooming_risk"] > 0:
            print(f"  Grooming risk: {result['grooming_risk']:.0%}")
            print(f"  Keywords: {result['grooming_keywords']}")
|
|