# code/sagemaker/classifier.py
# 24122168-collab
# Add application file
# 6ba100e
"""
classifier.py — Production Rule-Based Email Classifier
=======================================================
Shared by SageMaker inference.py and Lambda handler.py.
Zero heavy dependencies — no numpy, no gymnasium.

Key fix vs lambda/classifier.py:
    "legal" removed from _LEGAL_SECURITY_KW — it is a deception keyword
    in phishing emails (TC-H-09), not a routing signal. Context field
    is the authoritative source for legal routing.
"""
# ── Label maps ────────────────────────────────────────────────────────────────
# Integer action code → human-readable label, one map per output head.
URGENCY_LABELS = {
    0: "General",
    1: "Billing",
    2: "Security Breach",
}
ROUTING_LABELS = {
    0: "AI Auto-Reply",
    1: "Tech Support",
    2: "Legal",
}
RESOLUTION_LABELS = {
    0: "Archive",
    1: "Draft Reply",
    2: "Escalate",
}

# Keywords that send a security email to Legal routing
# (ransomware / extortion / IP theft).
# NOTE: "legal" is intentionally excluded — it appears in phishing
# deception text.
_LEGAL_SECURITY_KW = {
    "lawsuit",
    "attorney",
    "sue",
    "ransomware",
    "extortion",
}

# Only "refund" escalates billing to Legal — "overdue" stays routine.
_BILLING_ESCALATE_KW = {"refund"}

# Canonical keyword vocabulary (must match environment.py KEYWORD_VOCAB).
# Order is significant: one row per topic group, kept exactly as-is.
KEYWORD_VOCAB = [
    # billing
    "invoice", "payment", "overdue", "refund",
    # security
    "hacked", "breach", "unauthorized", "password",
    # tech
    "crash", "error", "bug", "slow",
    # legal
    "lawsuit", "legal", "attorney", "sue",
    # spam
    "spam", "offer", "win", "free",
    # tone
    "urgent", "critical", "angry", "threat",
]

# Words counted as negative hits during sentiment scoring.
_NEG_WORDS = {
    "angry",
    "threat",
    "hacked",
    "breach",
    "lawsuit",
    "overdue",
    "unauthorized",
    "ransomware",
    "critical",
    "urgent",
    "error",
    "crash",
    "bug",
    "refund",
}

# Words counted as positive hits during sentiment scoring.
_POS_WORDS = {
    "win",
    "free",
    "offer",
    "congratulations",
    "prize",
}
# ── Feature extraction ────────────────────────────────────────────────────────
def extract_features(subject: str, body: str) -> dict:
    """
    Parse raw email text → feature dict {keywords, sentiment, context}.

    Used when the caller does not supply pre-computed features.

    Args:
        subject: Email subject line. None is treated as empty.
        body: Email body text. None is treated as empty.

    Returns:
        dict with three keys:
          "keywords"  — KEYWORD_VOCAB entries found in the text, in vocab order
          "sentiment" — "negative", "positive" or "neutral"
          "context"   — "security", "legal", "billing", "tech", "spam"
                        or "general"
    """
    import re  # function-scope: this module has no top-level import block

    text = f"{subject or ''} {body or ''}".lower()
    # Tokenise on runs of word characters rather than whitespace, so that
    # punctuation glued to a word ("hacked!", "refund.", "urgent:") does not
    # hide the keyword.
    tokens = set(re.findall(r"\w+", text))

    keywords = [kw for kw in KEYWORD_VOCAB if kw in tokens]
    kw_set = set(keywords)

    # Sentiment: negative wins only on a strict majority; a tie that has
    # at least one positive hit counts as positive.
    neg_hits = len(tokens & _NEG_WORDS)
    pos_hits = len(tokens & _POS_WORDS)
    if neg_hits > pos_hits:
        sentiment = "negative"
    elif pos_hits > 0:
        sentiment = "positive"
    else:
        sentiment = "neutral"

    # Context — priority order matches the classifier decision tree.
    # The security check runs on the raw tokens: "ransomware" is not part of
    # KEYWORD_VOCAB, so testing it against kw_set could never match.
    if tokens & {"hacked", "breach", "unauthorized", "ransomware"}:
        context = "security"
    elif kw_set & {"lawsuit", "attorney", "sue"}:
        context = "legal"
    elif kw_set & {"invoice", "payment", "overdue", "refund"}:
        context = "billing"
    elif kw_set & {"crash", "error", "bug", "slow", "password"}:
        context = "tech"
    elif kw_set & {"spam", "offer", "win", "free"}:
        context = "spam"
    else:
        context = "general"

    return {"keywords": keywords, "sentiment": sentiment, "context": context}
# ── Classifier ────────────────────────────────────────────────────────────────
def classify(email: dict) -> tuple[int, int, int]:
    """
    Deterministic rule-based classifier.

    Returns (urgency, routing, resolution) as plain ints.

    Decision tree — first match wins:
      Rule 1   legal context OR lawsuit/attorney/sue keywords  → (2, 2, 2)
      Rule 2a  security + ransomware/extortion/hacked+breach   → (2, 2, 2)
      Rule 2b  security (account-level attack)                 → (2, 1, 2)
      Rule 3   billing + refund keyword                        → (1, 2, 2)
      Rule 4   billing routine                                 → (1, 0, 1)
      Rule 5   tech context or crash/error/bug/slow            → (0, 1, 1)
      Rule 6   spam / default                                  → (0, 0, 0)

    Args:
        email: Feature dict with optional "keywords" (iterable of str) and
            "context" (str, compared case-insensitively). A missing key or
            an explicit None value is treated as empty.
    """
    # `or` fallbacks instead of .get() defaults: a key that is present but
    # None (e.g. JSON null) must not crash set() or .lower().
    kw = set(email.get("keywords") or ())
    context = (email.get("context") or "").lower()

    # Rule 1 — Legal
    if context == "legal" or kw & {"lawsuit", "attorney", "sue"}:
        return (2, 2, 2)

    # Rule 2 — Security
    if context == "security":
        if kw & _LEGAL_SECURITY_KW or ("hacked" in kw and "breach" in kw):
            return (2, 2, 2)  # ransomware / extortion → Legal
        return (2, 1, 2)  # account-level attack → Tech Support

    # Rule 3 & 4 — Billing
    if context == "billing":
        if kw & _BILLING_ESCALATE_KW:
            return (1, 2, 2)
        return (1, 0, 1)

    # Rule 5 — Tech
    if context == "tech" or kw & {"crash", "error", "bug", "slow"}:
        return (0, 1, 1)

    # Rule 6 — Spam / default
    return (0, 0, 0)
# ── Decoder ───────────────────────────────────────────────────────────────────
def decode(urgency: int, routing: int, resolution: int) -> dict:
    """Translate the three integer action codes into their label strings.

    Each code is looked up in its module-level label map
    (URGENCY_LABELS, ROUTING_LABELS, RESOLUTION_LABELS); a code that is not
    a key of its map raises KeyError.

    Returns:
        dict with keys "urgency", "routing" and "resolution".
    """
    urgency_label = URGENCY_LABELS[urgency]
    routing_label = ROUTING_LABELS[routing]
    resolution_label = RESOLUTION_LABELS[resolution]

    labels = {}
    labels["urgency"] = urgency_label
    labels["routing"] = routing_label
    labels["resolution"] = resolution_label
    return labels
# ── Batch helper ──────────────────────────────────────────────────────────────
def classify_batch(emails: list[dict]) -> list[dict]:
    """
    Classify a list of email dicts in one call.

    Each dict may contain pre-computed features OR raw subject+body: an
    email without a truthy "context" value is run through
    extract_features() on its "subject" and "body" first; otherwise the
    dict itself is passed to classify() as the feature dict.

    Returns:
        One dict per input email, in input order: the decode() labels plus
        "urgency_code", "routing_code" and "resolution_code".
    """
    results = []
    for email in emails:
        if email.get("context"):
            features = email
        else:
            # `or ""` rather than a .get() default: it also covers keys that
            # are present but None (e.g. a JSON null subject), which would
            # otherwise break the string handling in extract_features().
            features = extract_features(
                email.get("subject") or "",
                email.get("body") or "",
            )

        urgency, routing, resolution = classify(features)
        results.append(
            {
                **decode(urgency, routing, resolution),
                "urgency_code": urgency,
                "routing_code": routing,
                "resolution_code": resolution,
            }
        )
    return results