princemaxp committed on
Commit
3bd60c1
·
verified ·
1 Parent(s): 4038d7a

Update body_analyzer.py

Browse files
Files changed (1) hide show
  1. body_analyzer.py +82 -82
body_analyzer.py CHANGED
@@ -1,4 +1,4 @@
1
- # body_analyzer_v2.py
2
  import os
3
  import re
4
  import requests
@@ -8,31 +8,45 @@ HF_API_KEY = os.getenv("HF_API_KEY")
8
  HF_HEADERS = {"Authorization": f"Bearer {HF_API_KEY}"} if HF_API_KEY else {}
9
  HF_TIMEOUT = 20 # seconds
10
 
11
- # Hugging Face model names
12
- PHISHING_MODELS = [
13
- "cybersectony/phishing-email-detection-distilbert_v2.4.1",
14
- "ealvaradob/bert-finetuned-phishing"
15
- ]
16
  ZERO_SHOT_MODEL = "facebook/bart-large-mnli" # for intent/behavior
17
 
18
  # Suspicious phrase patterns
19
  SUSPICIOUS_PATTERNS = [
20
- "verify your account", "urgent action", "click here", "reset password",
21
- "confirm your identity", "bank account", "invoice", "payment required",
22
- "unauthorized login", "compromised", "final reminder", "account suspended",
23
- "account deactivated", "update your information", "legal action",
24
- "limited time offer", "claim your prize", "verify immediately",
25
- "verify now", "verify your credentials",
 
 
 
 
 
 
 
 
 
 
 
 
 
 
26
  ]
27
 
28
- # zero-shot candidate labels for message behavior
29
  BEHAVIOR_LABELS = [
30
- "credential harvesting", "invoice/payment fraud", "marketing",
31
- "benign", "malware", "account takeover",
 
 
 
 
32
  ]
33
 
34
  def _call_hf_text_model(model_name: str, text: str):
35
- """Call HF Inference API for text classification"""
36
  if not HF_API_KEY:
37
  return None
38
  try:
@@ -48,7 +62,6 @@ def _call_hf_text_model(model_name: str, text: str):
48
  return None
49
 
50
  def _call_hf_zero_shot(text: str, candidate_labels: List[str]):
51
- """Zero-shot classification for email behavior/intent"""
52
  if not HF_API_KEY:
53
  return None
54
  try:
@@ -63,35 +76,39 @@ def _call_hf_zero_shot(text: str, candidate_labels: List[str]):
63
  except Exception:
64
  return None
65
 
66
- def _parse_hf_model_output(result):
67
- """Extract label and confidence from HF output"""
68
  if not result:
69
- return None, 0.0
70
- if isinstance(result, list) and len(result) > 0 and isinstance(result[0], dict):
71
- label = result[0].get("label")
72
- score = result[0].get("score", 0.0)
73
- return label, float(score or 0.0)
74
- if isinstance(result, dict) and "labels" in result and "scores" in result:
75
- return result["labels"][0], float(result["scores"][0])
76
- return None, 0.0
 
 
 
 
 
 
77
 
78
  def analyze_body(subject: str, body: str, urls: list, images: list):
79
  findings = []
80
  score = 0
81
- highlighted_body = body or ""
82
- combined_text = f"{subject}\n{body}".lower()
83
 
84
- # 1) Basic heuristics: suspicious phrases
85
  for pattern in SUSPICIOUS_PATTERNS:
86
- if pattern in combined_text:
87
  findings.append(f"Suspicious phrase detected: \"{pattern}\"")
88
- score += 30 if pattern in (subject or "").lower() else 18
89
  try:
90
  highlighted_body = re.sub(re.escape(pattern), f"<mark>{pattern}</mark>", highlighted_body, flags=re.IGNORECASE)
91
  except Exception:
92
  pass
93
 
94
- # 2) URL heuristics
95
  for u in urls or []:
96
  findings.append(f"Suspicious URL detected: {u}")
97
  score += 10
@@ -99,47 +116,41 @@ def analyze_body(subject: str, body: str, urls: list, images: list):
99
  highlighted_body = re.sub(re.escape(u), f"<mark>{u}</mark>", highlighted_body, flags=re.IGNORECASE)
100
  except Exception:
101
  pass
102
- domain_match = re.search(r"https?://([^/]+)/?", u)
103
- if domain_match:
104
- domain = domain_match.group(1)
105
- if len(domain) > 25 or any(ch.isdigit() for ch in domain.split(".")[0]):
106
- findings.append(f"URL: suspicious-looking domain {domain}")
107
- score += 10
108
-
109
- # 3) ML Phishing detection using multiple HF models
110
- ml_labels = []
111
- ml_confidences = []
112
- model_input = "\n".join([subject or "", body or ""] + (urls or []))
113
- for phish_model in PHISHING_MODELS:
114
- if HF_API_KEY and model_input:
115
- result = _call_hf_text_model(phish_model, model_input)
116
- label, conf = _parse_hf_model_output(result)
117
- if label:
118
- findings.append(f"HF phishing model ({phish_model}) → {label} (conf {conf:.2f})")
119
- ml_labels.append(label)
120
- ml_confidences.append(conf)
121
- # Take the max confidence phishing prediction
122
- if ml_confidences:
123
- max_idx = ml_confidences.index(max(ml_confidences))
124
- if "phish" in (ml_labels[max_idx] or "").lower():
125
- score += int(ml_confidences[max_idx] * 100 * 0.9)
126
-
127
- # 4) Zero-shot intent/behavior classification
128
- behavior_label = None
129
  behavior_conf = 0.0
130
  if HF_API_KEY and model_input:
131
  zs = _call_hf_zero_shot(model_input, BEHAVIOR_LABELS)
132
- if isinstance(zs, dict) and "labels" in zs and "scores" in zs:
133
- behavior_label = zs["labels"][0]
134
- behavior_conf = float(zs["scores"][0])
135
- findings.append(f"Behavior inference → {behavior_label} (conf {behavior_conf:.2f})")
136
- if behavior_conf >= 0.7:
137
- score += int(behavior_conf * 30)
 
 
 
 
 
 
138
 
139
- # 5) Final score clamping
140
- score = max(0, min(score, 100))
141
 
142
- # 6) Verdict
143
  if score >= 70:
144
  verdict = "🚨 Malicious"
145
  elif 50 <= score < 70:
@@ -150,16 +161,5 @@ def analyze_body(subject: str, body: str, urls: list, images: list):
150
  verdict = "✅ Safe"
151
  findings.append("No strong phishing signals detected by models/heuristics.")
152
 
153
- # 7) Richer textual summary (like your example)
154
- summary = f"""
155
- Email analysis summary:
156
- - Subject: {subject}
157
- - Body length: {len(body)} chars
158
- - Detected behavior/intent: {behavior_label} (conf {behavior_conf:.2f})
159
- - Top phishing alert: {ml_labels[max_idx] if ml_labels else 'None'}
160
- - Suspicious phrases found: {len([f for f in findings if 'Suspicious phrase' in f])}
161
- - Total score: {score}/100
162
- Verdict: {verdict}
163
- """
164
-
165
- return findings, score, highlighted_body, verdict, summary
 
1
+ # body_analyzer.py
2
  import os
3
  import re
4
  import requests
 
8
  HF_HEADERS = {"Authorization": f"Bearer {HF_API_KEY}"} if HF_API_KEY else {}
9
  HF_TIMEOUT = 20 # seconds
10
 
11
+ # ML model names
12
+ PHISHING_MODEL = "cybersectony/phishing-email-detection-distilbert_v2.4.1"
 
 
 
13
  ZERO_SHOT_MODEL = "facebook/bart-large-mnli" # for intent/behavior
14
 
15
  # Suspicious phrase patterns
16
  SUSPICIOUS_PATTERNS = [
17
+ "verify your account",
18
+ "urgent action",
19
+ "click here",
20
+ "reset password",
21
+ "confirm your identity",
22
+ "bank account",
23
+ "invoice",
24
+ "payment required",
25
+ "unauthorized login",
26
+ "compromised",
27
+ "final reminder",
28
+ "account suspended",
29
+ "account deactivated",
30
+ "update your information",
31
+ "legal action",
32
+ "limited time offer",
33
+ "claim your prize",
34
+ "verify immediately",
35
+ "verify now",
36
+ "verify your credentials",
37
  ]
38
 
39
+ # Zero-shot candidate labels for intent/behavior
40
  BEHAVIOR_LABELS = [
41
+ "credential harvesting",
42
+ "invoice/payment fraud",
43
+ "marketing",
44
+ "benign",
45
+ "malware",
46
+ "account takeover",
47
  ]
48
 
49
  def _call_hf_text_model(model_name: str, text: str):
 
50
  if not HF_API_KEY:
51
  return None
52
  try:
 
62
  return None
63
 
64
  def _call_hf_zero_shot(text: str, candidate_labels: List[str]):
 
65
  if not HF_API_KEY:
66
  return None
67
  try:
 
76
  except Exception:
77
  return None
78
 
79
+ def _parse_hf_phishing_model_output(result):
 
80
  if not result:
81
+ return None, 0.0, {}
82
+ if isinstance(result, list) and result and isinstance(result[0], dict):
83
+ r0 = result[0]
84
+ label = r0.get("label")
85
+ score = r0.get("score", 0.0)
86
+ return label, float(score), {label: float(score)}
87
+ if isinstance(result, dict):
88
+ labels = result.get("labels") or result.get("label") or []
89
+ scores = result.get("scores") or result.get("score") or []
90
+ if isinstance(labels, list) and isinstance(scores, list) and labels and scores:
91
+ all_probs = {lab: float(sc) for lab, sc in zip(labels, scores)}
92
+ max_lab = max(all_probs.items(), key=lambda x: x[1])
93
+ return max_lab[0], float(max_lab[1]), all_probs
94
+ return None, 0.0, {}
95
 
96
  def analyze_body(subject: str, body: str, urls: list, images: list):
97
  findings = []
98
  score = 0
99
+ highlighted_body = (body or "")
 
100
 
101
+ combined_lower = ((subject or "") + "\n" + (body or "")).lower()
102
  for pattern in SUSPICIOUS_PATTERNS:
103
+ if pattern in combined_lower:
104
  findings.append(f"Suspicious phrase detected: \"{pattern}\"")
105
+ score += 18
106
  try:
107
  highlighted_body = re.sub(re.escape(pattern), f"<mark>{pattern}</mark>", highlighted_body, flags=re.IGNORECASE)
108
  except Exception:
109
  pass
110
 
111
+ # URL checks
112
  for u in urls or []:
113
  findings.append(f"Suspicious URL detected: {u}")
114
  score += 10
 
116
  highlighted_body = re.sub(re.escape(u), f"<mark>{u}</mark>", highlighted_body, flags=re.IGNORECASE)
117
  except Exception:
118
  pass
119
+
120
+ # ML phishing model
121
+ ml_label = None
122
+ ml_conf = 0.0
123
+ model_input = "\n".join([subject or "", body or "", "\n".join(urls or [])]).strip()
124
+ if model_input and HF_API_KEY:
125
+ raw = _call_hf_text_model(PHISHING_MODEL, model_input)
126
+ label, conf, _ = _parse_hf_phishing_model_output(raw)
127
+ if label:
128
+ ml_label = label
129
+ ml_conf = conf
130
+ findings.append(f"HuggingFace phishing model → {label} (conf {conf:.2f})")
131
+ score += int(conf * 100 * 0.9)
132
+
133
+ # Zero-shot behavior
134
+ behavior = None
 
 
 
 
 
 
 
 
 
 
 
135
  behavior_conf = 0.0
136
  if HF_API_KEY and model_input:
137
  zs = _call_hf_zero_shot(model_input, BEHAVIOR_LABELS)
138
+ try:
139
+ if isinstance(zs, dict) and "labels" in zs and "scores" in zs:
140
+ behavior = zs["labels"][0]
141
+ behavior_conf = float(zs["scores"][0])
142
+ findings.append(f"Behavior inference → {behavior} (conf {behavior_conf:.2f})")
143
+ if behavior_conf >= 0.7:
144
+ score += int(behavior_conf * 30)
145
+ except Exception:
146
+ pass
147
+
148
+ if ml_conf >= 0.8 and ("phishing" in (ml_label or "").lower()):
149
+ score = max(score, 80)
150
 
151
+ score = int(max(0, min(score, 100)))
 
152
 
153
+ # Verdict
154
  if score >= 70:
155
  verdict = "🚨 Malicious"
156
  elif 50 <= score < 70:
 
161
  verdict = "✅ Safe"
162
  findings.append("No strong phishing signals detected by models/heuristics.")
163
 
164
+ # Return exactly 4 values
165
+ return findings, score, highlighted_body, verdict