Sparkonix's picture
added Project Report
aea4b1f
import re
import spacy
from typing import List, Dict, Tuple, Any, Optional
from database import EmailDatabase
class Entity:
def __init__(self, start: int, end: int, entity_type: str, value: str):
self.start = start
self.end = end
self.entity_type = entity_type
self.value = value
def to_dict(self):
return {
"position": [self.start, self.end],
"classification": self.entity_type,
"entity": self.value
}
def __repr__(self): # Added for easier debugging
return (
f"Entity(type='{self.entity_type}', value='{self.value}', "
f"start={self.start}, end={self.end})"
)
class PIIMasker:
def __init__(
self,
spacy_model_name: str = "xx_ent_wiki_sm",
db_path: str = None
): # Allow model choice
# Load SpaCy model
try:
self.nlp = spacy.load(spacy_model_name)
except OSError:
print(f"SpaCy model '{spacy_model_name}' not found. Downloading...")
try:
spacy.cli.download(spacy_model_name)
self.nlp = spacy.load(spacy_model_name)
except Exception as e:
print(f"Failed to download or load {spacy_model_name}. Error: {e}")
print("Attempting to load 'en_core_web_sm' as a fallback for English.")
try:
self.nlp = spacy.load("en_core_web_sm")
except OSError:
print("Downloading 'en_core_web_sm'...")
spacy.cli.download("en_core_web_sm")
self.nlp = spacy.load("en_core_web_sm")
# Initialize database connection with SQLite path
self.db = EmailDatabase(connection_string=db_path)
# Initialize regex patterns
self._initialize_patterns()
def _initialize_patterns(self):
# Define regex patterns for different entity types
self.patterns = {
"email": r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b',
# Simplified phone regex to capture both standard and international formats
"phone_number": (
r'\b(?:(?:\+|00)[1-9]\d{0,3}[-\s.]?)?'
r'(?:\(?\d{1,5}\)?[-\s.]?)?\d{1,5}'
r'(?:[-\s.]\d{1,5}){1,4}\b'
),
# Card number regex: common formats, allows optional spaces/hyphens
"credit_debit_no": r'\b(?:(?:\d{4}[\s-]?){3}\d{4}|\d{13,19})\b',
# CVV: 3 or 4 digits, ensuring it's a standalone number (word boundary)
"cvv_no": r'\b\d{3,4}\b',
# Expiry: MM/YY or MM/YYYY, common separators
"expiry_no": r'\b(0[1-9]|1[0-2])[/\s-]([0-9]{2}|20[0-9]{2})\b',
"aadhar_num": r'\b\d{4}\s?\d{4}\s?\d{4}\b',
# DOB: DD/MM/YYYY or DD-MM-YYYY etc.
"dob": (
r'\b(0[1-9]|[12][0-9]|3[01])[/\s-]'
r'(0[1-9]|1[0-2])[/\s-](?:19|20)\d\d\b'
)
}
def detect_regex_entities(self, text: str) -> List[Entity]:
"""Detect entities using regex patterns"""
entities = []
for entity_type, pattern in self.patterns.items():
for match in re.finditer(pattern, text):
start, end = match.span()
value = match.group()
# Specific verifications for each entity type
if entity_type == "credit_debit_no":
if not self.verify_credit_card(text, match):
continue
elif entity_type == "cvv_no":
if not self.verify_cvv(text, match):
continue
elif entity_type == "phone_number":
if not self.verify_phone_number(text, match):
continue
elif entity_type == "dob":
if not self._verify_with_context(
text, start, end, ["birth", "dob", "born"]
):
continue
# Avoid detecting parts of already matched longer entities
# (e.g. year within a DOB)
# This is a simple check; more robust overlap handling is done later
is_substring_of_existing = False
for existing_entity in entities:
if (existing_entity.start <= start
and existing_entity.end >= end
and existing_entity.value != value):
is_substring_of_existing = True
break
if is_substring_of_existing:
continue
entities.append(Entity(start, end, entity_type, value))
return entities
def _verify_with_context(
self, text: str, start: int, end: int, keywords: List[str], window: int = 50
) -> bool:
"""Verify an entity match using surrounding context"""
context_before = text[max(0, start - window):start].lower()
context_after = text[end:min(len(text), end + window)].lower()
for keyword in keywords:
if keyword in context_before or keyword in context_after:
return True
return False
def verify_credit_card(self, text: str, match: re.Match) -> bool:
"""Verify if a match is actually a credit card number using contextual clues"""
context_window = 50
start, end = match.span()
context_before = text[max(0, start - context_window):start].lower()
context_after = text[end:min(len(text), end + context_window)].lower()
card_keywords = [
"card", "credit", "debit", "visa", "mastercard",
"payment", "amex", "account no", "card no"
]
for keyword in card_keywords:
if keyword in context_before or keyword in context_after:
return True
# Basic Luhn algorithm check (optional, can be computationally more intensive)
# For simplicity, we'll rely on context here. If needed, Luhn can be added.
return False
def verify_cvv(self, text: str, match: re.Match) -> bool:
"""Verify if a 3-4 digit number is actually a CVV using contextual clues"""
context_window = 50
start, end = match.span()
value = match.group()
# If it's part of a longer number sequence (like a phone number or ID),
# it's likely not a CVV
# Check character immediately before and after
char_before = text[start - 1:start] if start > 0 else ""
char_after = text[end:end + 1] if end < len(text) else ""
if char_before.isdigit() or char_after.isdigit():
return False # It's part of a larger number
# Only consider 3-4 digit numbers
if not value.isdigit() or len(value) < 3 or len(value) > 4:
return False
context_before = text[max(0, start - context_window):start].lower()
context_after = text[end:min(len(text), end + context_window)].lower()
# Expanded list of CVV-related keywords to improve detection
cvv_keywords = [
"cvv", "cvc", "csc", "security code", "card verification",
"verification no", "security", "security number", "cv2",
"card code", "security value"
]
# Look for CVV context clues
is_cvv_context = any(
keyword in context_before or keyword in context_after
for keyword in cvv_keywords
)
# If explicitly mentioned as a CVV, immediately return true
if is_cvv_context:
return True
# If it looks like a year, reject it
if len(value) == 4 and 1900 <= int(value) <= 2100:
if any(
k in context_before or k in context_after
for k in ["year", "born", "established", "since"]
):
return False
# If in expiry date context, reject it
if re.search(r'\b(0[1-9]|1[0-2])[/\s-]$', context_before.strip()):
return False
# If no context clues but we have a credit card mention nearby,
# it could be a CVV
card_context = any(
k in context_before or k in context_after for k in
["card", "credit", "visa", "mastercard", "amex", "discover"]
)
return is_cvv_context or (card_context and len(value) in [3, 4])
def verify_phone_number(self, text: str, match: re.Match) -> bool:
"""
Verify if a match is actually a phone number using validation rules and context.
"""
value = match.group()
start, end = match.span()
# Extract only digits to count them
digits = ''.join(c for c in value if c.isdigit())
digit_count = len(digits)
# Most phone numbers worldwide have between 7 and 15 digits
if digit_count < 7 or digit_count > 15:
return False
# Check for common phone number indicators
context_window = 50
context_before = text[max(0, start - context_window):start].lower()
context_after = text[end:min(len(text), end + context_window)].lower()
# Expanded phone keywords
phone_keywords = [
"phone", "call", "tel", "telephone", "contact", "dial", "mobile",
"cell", "number", "direct", "office", "fax", "reach me at",
"call me", "contact me", "line", "extension", "ext", "phone number"
]
# Check for phone context
has_phone_context = any(
kw in context_before or kw in context_after for kw in phone_keywords
)
# Check for formatting that indicates a phone number
has_phone_formatting = bool(re.search(r'[-\s.()\\+]', value))
# Check for international prefix
has_intl_prefix = value.startswith('+') or value.startswith('00')
# Return true if any of these conditions are met:
# 1. Has explicit phone context
# 2. Has phone-like formatting AND reasonable digit count
# 3. Has international prefix AND reasonable digit count
# 4. Has 10 digits exactly (common in many countries) with formatting
return (
has_phone_context
or (has_phone_formatting and digit_count >= 7)
or (has_intl_prefix)
or (digit_count == 10 and has_phone_formatting)
)
def detect_name_entities(self, text: str) -> List[Entity]:
"""Detect name entities using SpaCy NER"""
entities = []
doc = self.nlp(text)
for ent in doc.ents:
# Use PER for person, common in many models like xx_ent_wiki_sm
# Also checking for PERSON as some models might use it.
if ent.label_ in ["PER", "PERSON"]:
entities.append(
Entity(ent.start_char, ent.end_char, "full_name", ent.text)
)
return entities
def detect_all_entities(self, text: str) -> List[Entity]:
"""Detect all types of entities in the text"""
# Get regex-based entities first
entities = self.detect_regex_entities(text)
# Add SpaCy-based name entities
# We add them second and let overlap resolution handle conflicts
# This is because NER for names can be more reliable than a generic regex
name_entities = self.detect_name_entities(text)
entities.extend(name_entities)
# Sort entities by their starting position
entities.sort(key=lambda x: x.start)
# Resolve overlaps: prioritize NER entities (like names) or longer regex matches
entities = self._resolve_overlaps(entities)
return entities
def _resolve_overlaps(self, entities: List[Entity]) -> List[Entity]:
"""Resolve overlapping entities.
Prioritize:
1. NER entities (e.g., "full_name") if they overlap with regex.
2. Longer entities over shorter ones.
3. If same length and type, no change (first one encountered).
"""
if not entities:
return []
# A simple greedy approach: iterate and remove/adjust overlaps
# This can be made more sophisticated
resolved_entities: List[Entity] = []
# Process by start, then by longest
for current_entity in sorted(
entities, key=lambda e: (e.start, -(e.end - e.start))
):
is_overlapped_or_contained = False
temp_resolved = []
for i, res_entity in enumerate(resolved_entities):
# Check for overlap:
# Current: |----|
# Res: |----| or |----| or |--| or |------|
overlap = max(
0,
min(current_entity.end, res_entity.end)
- max(current_entity.start, res_entity.start)
)
if overlap > 0:
is_overlapped_or_contained = True
# Preference:
# 1. NER often trump regex if they are the ones causing overlap
# 2. Longer entity wins
current_len = current_entity.end - current_entity.start
res_len = res_entity.end - res_entity.start
# If current is a name and overlaps, and previous is not a name,
# prefer current if it's not fully contained
if (current_entity.entity_type == "full_name" # E501 corrected
and res_entity.entity_type != "full_name"):
# current not fully contained by res
if not (res_entity.start <= current_entity.start
and res_entity.end >= current_entity.end):
# remove res_entity, current will be added later
continue # go to next res_entity, marked for removal
elif (res_entity.entity_type == "full_name"
and current_entity.entity_type != "full_name"):
# res_entity is a name, current is not. Prefer res_entity
# if it's not fully contained
if not (current_entity.start <= res_entity.start
and current_entity.end >= res_entity.end):
# current entity is subsumed or less important,
# so don't add current and keep res_entity
temp_resolved.append(res_entity)
is_overlapped_or_contained = True # Mark current as handled
break # Current is dominated
# General case: longer entity wins
if current_len > res_len:
# current is longer, res_entity is removed from
# consideration for this current_entity
pass # res_entity not added to temp_resolved if fully replaced
elif res_len > current_len:
# res is longer, current is dominated
temp_resolved.append(res_entity)
is_overlapped_or_contained = True # Mark current as handled
break
else: # Same length, keep existing one (res_entity)
temp_resolved.append(res_entity)
is_overlapped_or_contained = True # Mark current as handled
break
else: # No overlap
temp_resolved.append(res_entity)
if not is_overlapped_or_contained:
temp_resolved.append(current_entity)
resolved_entities = sorted(
temp_resolved, key=lambda e: (e.start, -(e.end - e.start))
)
# Final pass to remove fully contained entities if a larger one exists
final_entities = []
if not resolved_entities:
return []
for i, entity in enumerate(resolved_entities):
is_contained = False
for j, other_entity in enumerate(resolved_entities):
if i == j:
continue
# If 'entity' is strictly contained within 'other_entity'
if (other_entity.start <= entity.start
and other_entity.end >= entity.end
and (other_entity.end - other_entity.start
> entity.end - entity.start)):
is_contained = True
break
if not is_contained:
final_entities.append(entity)
return final_entities
def mask_text(self, text: str) -> Tuple[str, List[Dict[str, Any]]]:
"""
Mask PII entities in the text and return masked text and entity information
"""
entities = self.detect_all_entities(text)
entity_info = [entity.to_dict() for entity in entities]
# Sort entities by start position to ensure correct masking,
# longest first at same start to prevent partial masking by shorter entities
entities.sort(key=lambda x: (x.start, -(x.end - x.start)))
new_text_parts = []
current_pos = 0
for entity in entities:
# Add text before the entity
if entity.start > current_pos:
new_text_parts.append(text[current_pos:entity.start])
# Add the mask with entity type in uppercase for better visibility
mask = f"[{entity.entity_type.upper()}]"
new_text_parts.append(mask)
current_pos = entity.end
# Add any remaining text after the last entity
if current_pos < len(text):
new_text_parts.append(text[current_pos:])
return "".join(new_text_parts), entity_info
def process_email(self, email_text: str) -> Dict[str, Any]:
"""
Process an email by detecting and masking PII entities.
The original email is stored in the database for later retrieval if needed.
"""
# Mask the email
masked_email, entity_info = self.mask_text(email_text)
# Store the email in the SQLite database - only get back email_id now
email_id = self.db.store_email(
original_email=email_text,
masked_email=masked_email,
masked_entities=entity_info
)
# Return the processed data with just the email_id
return {
"input_email_body": email_text, # Return original for API compatibility
"list_of_masked_entities": entity_info,
"masked_email": masked_email,
"category_of_the_email": "",
"email_id": email_id
}
def get_original_email(
self, email_id: str, access_key: str
) -> Optional[Dict[str, Any]]:
"""
Retrieve the original email with PII using the email ID and access key.
Args:
email_id: The ID of the stored email
access_key: The security key for accessing the original email
Returns:
The original email data or None if not found or access_key is invalid
"""
return self.db.get_original_email(email_id, access_key)
def get_masked_email_by_id(self, email_id: str) -> Optional[Dict[str, Any]]:
"""
Retrieve a masked email by its ID (without the original PII-containing email).
Args:
email_id: The ID of the stored email
Returns:
The masked email data or None if not found
"""
return self.db.get_email_by_id(email_id)
def get_original_by_masked_email(
self, masked_email: str
) -> Optional[Dict[str, Any]]:
"""
Retrieve the original unmasked email using the masked email content.
Args:
masked_email: The masked version of the email to search for
Returns:
The original email data or None if not found
"""
return self.db.get_email_by_masked_content(masked_email)