|
|
import re
|
|
|
import joblib
|
|
|
import pandas as pd
|
|
|
import numpy as np
|
|
|
from urllib.parse import urlparse
|
|
|
from typing import Dict, Any
|
|
|
|
|
|
# Substrings that are searched for (case-insensitively) in the raw URL; each
# one becomes its own ``has_<token>`` indicator column in _engineer_features.
_SUSPICIOUS_TOKENS = [
    "login",
    "verify",
    "secure",
    "update",
    "bank",
    "pay",
    "account",
    "webscr",
]

# Dotted-quad matcher used for the ``has_ip`` feature. It only checks the
# shape (four groups of 1-3 digits); octets are not range-validated.
_IPV4_PATTERN = re.compile(r"(?:\d{1,3}\.){3}\d{1,3}")

# Brand names that second-level domains are fuzzily compared against.
_BRAND_NAMES = [
    "facebook",
    "paypal",
    "google",
    "amazon",
    "apple",
    "microsoft",
    "instagram",
    "netflix",
    "bank",
    "hsbc",
    "linkedin",
    "yahoo",
    "outlook",
]

# Top-level domains that set the ``tld_suspicious`` feature.
_SUSPICIOUS_TLDS = {
    "zip",
    "xyz",
    "top",
    "ru",
    "kim",
    "support",
    "ltd",
    "work",
    "gq",
    "tk",
    "ml",
}
|
|
|
|
|
|
# Pick the string-similarity backend once, at import time: rapidfuzz when it
# can be imported, otherwise the standard library's difflib.
# NOTE(review): the two backends are different algorithms, so scores for the
# same pair of strings may not match across environments -- confirm the model
# was trained with the same backend that is used for serving.
try:
    from rapidfuzz import fuzz
except Exception:
    import difflib

    def _sim(a: str, b: str) -> float:
        """Return the difflib SequenceMatcher ratio of ``a`` and ``b`` (0.0-1.0)."""
        return difflib.SequenceMatcher(None, a, b).ratio()

else:

    def _sim(a: str, b: str) -> float:
        """Return rapidfuzz's ``fuzz.ratio`` of ``a`` and ``b`` rescaled to 0.0-1.0."""
        return fuzz.ratio(a, b) / 100.0
|
|
|
|
|
|
|
|
|
def _ensure_scheme(u: str) -> str:
    """Return ``u`` unchanged if it starts with ``<scheme>://``, else prefix ``http://``.

    A scheme here is a letter followed by letters, digits, ``+``, ``.`` or
    ``-``. Adding a default scheme lets ``urlparse`` treat bare inputs such
    as ``www.example.com`` as having a network location.
    """
    if re.match(r'^[a-zA-Z][a-zA-Z0-9+.\-]*://', u):
        return u
    return 'http://' + u
|
|
|
|
|
|
|
|
|
def _get_hostname(u: str) -> str:
    """Extract the lower-cased hostname from a URL, or ``''`` on any failure.

    The URL is given a default scheme when it has none, parsed with
    ``urlparse``, and the resulting hostname is IDNA-decoded on a best-effort
    basis (the undecoded hostname is kept if decoding fails).
    """
    # Any parsing problem (malformed URL, non-string input, ...) yields ''.
    try:
        host = urlparse(_ensure_scheme(u)).hostname or ''
    except Exception:
        return ''

    # Best-effort IDNA decode; hosts that are not pure ASCII or do not decode
    # cleanly are left exactly as parsed.
    # NOTE(review): a successful decode turns ``xn--`` labels into Unicode, so
    # the 'xn--' substring check done on this value in _engineer_features
    # would not match such hosts -- confirm this is intended.
    try:
        host = host.encode('ascii').decode('idna')
    except Exception:
        pass

    return host.lower()
|
|
|
|
|
|
|
|
|
def _get_sld(host: str) -> str:
    """Return the label immediately left of the last dot in ``host``.

    A host without any dot is returned unchanged. The split is purely
    positional, so for ``example.co.uk`` the result is ``co``.
    """
    # Only the last two labels matter, so split at most twice from the right.
    labels = host.rsplit('.', 2)
    if len(labels) < 2:
        return host
    return labels[-2]
|
|
|
|
|
|
|
|
|
def _get_tld(host: str) -> str:
    """Return the text after the last dot in ``host``, or ``''`` if there is no dot."""
    _, dot, suffix = host.rpartition('.')
    if not dot:
        return ''
    return suffix
|
|
|
|
|
|
|
|
|
def _shannon_entropy(s: str) -> float:
    """Return the Shannon entropy, in bits, of the characters of ``s``.

    The empty string has entropy ``0.0``.
    """
    if not s:
        return 0.0

    # Tally occurrences per character, keeping first-seen order.
    tally = dict.fromkeys(s, 0)
    for ch in s:
        tally[ch] += 1

    freq = np.fromiter(tally.values(), dtype=float)
    p = freq / freq.sum()
    return float(-np.sum(p * np.log2(p)))
|
|
|
|
|
|
|
|
|
def _clean_for_brand(s: str) -> str:
    """Lower-case ``s`` and keep only the ASCII letters ``a``-``z``.

    Digits, hyphens, dots and every other character are dropped, so
    ``"Face123-Book"`` becomes ``"facebook"``. This is the normalisation
    applied before a name is compared against brand names.
    """
    # One pass is enough: the negated class already removes digits, so a
    # separate digit-stripping substitution would be redundant work.
    return re.sub(r'[^a-z]', '', s.lower())
|
|
|
|
|
|
|
|
|
def _engineer_features(url_series: pd.Series) -> pd.DataFrame:
    """Build the numeric feature table for a series of URLs.

    Args:
        url_series: URLs to featurise. Values are converted with
            ``astype(str)`` before any processing.

    Returns:
        A DataFrame sharing ``url_series``' index, with columns created in
        this order:

        * raw-URL lexical features: ``url_len``, ``count_dot``,
          ``count_hyphen``, ``count_digit``, ``count_at``, ``count_qmark``,
          ``count_eq``, ``count_slash``, ``digit_ratio``, ``has_ip``, one
          ``has_<token>`` per entry of ``_SUSPICIOUS_TOKENS``,
          ``starts_https``, ``ends_with_exe``, ``ends_with_zip``;
        * host features: ``host_len``, ``subdomain_count``,
          ``tld_suspicious``, ``has_punycode``;
        * second-level-domain (SLD) features: ``sld_len``,
          ``sld_digit_ratio``, ``sld_entropy``;
        * brand features: ``max_brand_sim``, ``like_facebook``,
          ``is_official_brand_domain``, ``brand_digit_insertion``,
          ``max_brand_sim_leet``, ``like_brand_leet``,
          ``sld_contains_brand_extra``, ``brand_impersonation``;
        * ``sld_has_hyphen``, ``sld_has_digits``.
    """
    s = url_series.astype(str)
    out = pd.DataFrame(index=s.index)

    # ---- Lexical features computed on the raw URL text -------------------
    out["url_len"] = s.str.len().fillna(0)
    out["count_dot"] = s.str.count(r"\.")
    out["count_hyphen"] = s.str.count("-")
    out["count_digit"] = s.str.count(r"\d")
    out["count_at"] = s.str.count("@")
    # Raw string: "\?" in a plain literal is an invalid escape sequence.
    out["count_qmark"] = s.str.count(r"\?")
    out["count_eq"] = s.str.count("=")
    out["count_slash"] = s.str.count("/")
    # Zero-length URLs are mapped to NaN first so the division yields 0, not inf.
    out["digit_ratio"] = (out["count_digit"] / out["url_len"].replace(0, np.nan)).fillna(0)
    out["has_ip"] = s.str.contains(_IPV4_PATTERN).astype(int)
    for tok in _SUSPICIOUS_TOKENS:
        out[f"has_{tok}"] = s.str.contains(tok, case=False, regex=False).astype(int)
    out["starts_https"] = s.str.startswith("https").astype(int)
    out["ends_with_exe"] = s.str.endswith(".exe").astype(int)
    out["ends_with_zip"] = s.str.endswith(".zip").astype(int)

    # ---- Host-level features ---------------------------------------------
    host = s.apply(_get_hostname)
    sld = host.apply(_get_sld)
    tld = host.apply(_get_tld)

    out['host_len'] = host.str.len().fillna(0)
    # Dots in the host minus one, floored at zero.
    sub_count = host.str.count(r'\.') - 1
    out['subdomain_count'] = sub_count.fillna(0).clip(lower=0).astype(int)
    out['tld_suspicious'] = tld.isin(_SUSPICIOUS_TLDS).astype(int)
    # NOTE(review): `host` comes from _get_hostname, which IDNA-decodes the
    # hostname when it can; decoded hosts would no longer contain 'xn--'.
    # Confirm this column behaves as intended for punycode domains.
    out['has_punycode'] = host.str.contains('xn--', na=False).astype(int)

    # ---- Second-level-domain features ------------------------------------
    out['sld_len'] = sld.str.len().fillna(0)
    sld_digit_count = sld.str.count(r'\d')
    out['sld_digit_ratio'] = (sld_digit_count / out['sld_len'].replace(0, np.nan)).fillna(0)
    out['sld_entropy'] = sld.apply(_shannon_entropy).astype(float)

    # ---- Brand-similarity features ---------------------------------------
    sld_clean = sld.apply(_clean_for_brand)

    def _best_brand(name: str):
        """Return ``(brand, similarity)`` for the closest entry of _BRAND_NAMES.

        Empty or non-string input, or a name with zero similarity to every
        brand, gives ``('', 0.0)``. Ties keep the earlier brand in the list.
        """
        if not isinstance(name, str) or not name:
            return '', 0.0
        best_b, best_s = '', 0.0
        for b in _BRAND_NAMES:
            sc = _sim(name, b)
            if sc > best_s:
                best_b, best_s = b, sc
        return best_b, float(best_s)

    def _max_brand_sim(name: str) -> float:
        """Return only the similarity part of ``_best_brand(name)``."""
        return _best_brand(name)[1]

    # One scan over the brand list per row supplies both the best-matching
    # brand and its score.
    brand_best_and_sim = sld_clean.apply(_best_brand)
    brand_best = brand_best_and_sim.apply(lambda x: x[0])
    brand_best_sim = brand_best_and_sim.apply(lambda x: x[1])

    out['max_brand_sim'] = brand_best_sim.astype(float)
    out['like_facebook'] = sld_clean.apply(lambda x: 1 if _sim(x, 'facebook') >= 0.82 else 0).astype(int)

    # Registered domains accepted as genuine for each brand. 'bank' appears in
    # _BRAND_NAMES but has no entry here, so it can never count as official.
    OFFICIAL_DOMAINS = {
        'facebook': ['facebook.com'],
        'paypal': ['paypal.com'],
        'google': ['google.com'],
        'amazon': ['amazon.com'],
        'apple': ['apple.com'],
        'microsoft': ['microsoft.com'],
        'instagram': ['instagram.com'],
        'netflix': ['netflix.com'],
        'hsbc': ['hsbc.com'],
        'linkedin': ['linkedin.com'],
        'yahoo': ['yahoo.com'],
        'outlook': ['outlook.com'],
    }

    # Digit -> look-alike letter substitutions; built once instead of per row.
    leet_table = str.maketrans(
        {'0': 'o', '1': 'l', '3': 'e', '4': 'a', '5': 's', '7': 't', '2': 'z', '8': 'b'}
    )

    def _normalize_leet(name: str) -> str:
        """Replace leetspeak digits with their look-alike letters."""
        if not isinstance(name, str):
            return ''
        return name.translate(leet_table)

    def _get_etld1(h: str) -> str:
        """Return the last two dot-separated labels of ``h`` (or ``h`` itself)."""
        parts = h.split('.') if isinstance(h, str) else []
        if len(parts) >= 2:
            return parts[-2] + '.' + parts[-1]
        return h

    etld1 = host.apply(_get_etld1)

    out['is_official_brand_domain'] = [
        1 if bb and et in OFFICIAL_DOMAINS.get(bb, []) else 0
        for bb, et in zip(brand_best, etld1)
    ]

    # The SLD equals a brand once non-letters are stripped, yet it contained digits.
    out['brand_digit_insertion'] = ((sld_clean == brand_best) & (sld.str.contains(r'\d'))).astype(int)

    # Same similarity measure, applied after undoing leetspeak substitutions.
    sld_leet_norm = sld.apply(_normalize_leet).apply(_clean_for_brand)
    out['max_brand_sim_leet'] = sld_leet_norm.apply(_max_brand_sim).astype(float)
    out['like_brand_leet'] = (out['max_brand_sim_leet'] >= 0.88).astype(int)

    def _contains_brand_extra(name: str) -> int:
        """Return 1 if ``name`` contains a brand as a proper substring, else 0."""
        if not isinstance(name, str) or not name:
            return 0
        return int(any(name != b and b in name for b in _BRAND_NAMES))

    out['sld_contains_brand_extra'] = sld_clean.apply(_contains_brand_extra).astype(int)

    # Any brand-likeness signal on a domain that is not the brand's own.
    out['brand_impersonation'] = (
        ((brand_best_sim >= 0.88) | (out['like_brand_leet'] == 1) | (out['sld_contains_brand_extra'] == 1))
        & (out['is_official_brand_domain'] == 0)
    ).astype(int)

    out['sld_has_hyphen'] = sld.str.contains('-', na=False).astype(int)
    out['sld_has_digits'] = (sld_digit_count > 0).astype(int)

    return out
|
|
|
|
|
|
|
|
|
def load_bundle(path: str) -> Dict[str, Any]:
    """Load the saved joblib bundle produced by the notebook.

    Args:
        path: Location of the ``.joblib`` file.

    Returns:
        The loaded bundle, which contains at least the keys ``model``,
        ``feature_cols``, ``url_col``, ``label_col`` and ``model_type``.

    Raises:
        ValueError: If any of those keys is absent from the bundle.
    """
    # NOTE: joblib files are pickle-based, so loading one can execute
    # arbitrary code -- only load bundles from a trusted source.
    loaded = joblib.load(path)

    expected_keys = {"model", "feature_cols", "url_col", "label_col", "model_type"}
    missing = expected_keys - set(loaded.keys())
    if not missing:
        return loaded
    raise ValueError(f"Bundle missing keys: {missing}")
|
|
|
|
|
|
|
|
|
def predict_url(url: str, bundle: Dict[str, Any], threshold: float = 0.5) -> Dict[str, Any]:
    """Predict phishing probability for a single URL using the saved bundle.

    On top of the model score, a rule-based typosquatting guard flags URLs
    such as face123book.com even when the model probability is low.

    Args:
        url: The URL to score.
        bundle: Bundle dict; ``url_col``, ``feature_cols`` and ``model`` are
            required, ``trained_feature_cols`` and ``model_type`` optional
            (``model_type`` defaults to ``"xgboost_bst"``).
        threshold: Probability at or above which the URL is labelled 1.

    Returns:
        A dict with ``url``, ``phishing_probability``, ``predicted_label``
        and ``backend``; ``rule`` is added (``"typosquat_guard"``) whenever
        the guard fired.

    Raises:
        RuntimeError: For a ``cuml_rf`` bundle when the cudf-based prediction
            path fails for any reason.
    """
    url_col = bundle["url_col"]
    feature_cols = bundle["feature_cols"]
    trained_feature_cols = bundle.get("trained_feature_cols")
    model_type = bundle.get("model_type", "xgboost_bst")
    model = bundle["model"]

    # Featurise the URL as a one-row frame, then align to the model's columns;
    # columns the model expects but that were not generated are filled with 0.
    frame = pd.DataFrame({url_col: [url]})
    feats_full = _engineer_features(frame[url_col])
    if trained_feature_cols is not None:
        desired_cols = list(trained_feature_cols)
    else:
        desired_cols = list(feature_cols)
    feats = feats_full.reindex(columns=desired_cols, fill_value=0)

    # ---- Model score, per backend ----------------------------------------
    if model_type == "xgboost_bst":
        import xgboost as xgb

        proba = float(model.predict(xgb.DMatrix(feats))[0])
    elif model_type == "cuml_rf":
        try:
            import cudf

            gpu_feats = cudf.DataFrame.from_pandas(feats)
            proba = float(model.predict_proba(gpu_feats)[:, 1].to_pandas().values[0])
        except Exception as e:
            raise RuntimeError("cudf/cuml required for this bundle but not available") from e
    else:
        proba = float(model.predict_proba(feats)[:, 1][0])

    # ---- Typosquatting guard ---------------------------------------------
    # The guard reads the full engineered row, not only the model's columns.
    first_row = feats_full.iloc[0]
    available = feats_full.columns

    def _flag(feature: str, default: int = 0) -> int:
        """Return the feature as 0/1; 0 when the column does not exist."""
        if feature not in available:
            return 0
        return int(bool(first_row.get(feature, default)))

    def _value(feature: str, default: float = 0.0) -> float:
        """Return the feature as a float; ``default`` when the column does not exist."""
        if feature not in available:
            return default
        return float(first_row.get(feature, default))

    like_brand = any(
        [
            _flag('brand_impersonation') == 1,
            _flag('like_brand_leet') == 1,
            _value('max_brand_sim_leet') >= 0.90,
            _value('max_brand_sim') >= 0.90,
            _flag('sld_contains_brand_extra') == 1,
        ]
    )
    host_is_odd = any(
        [
            _flag('sld_has_digits') == 1,
            _flag('sld_has_hyphen') == 1,
            _flag('tld_suspicious') == 1,
            _flag('has_punycode') == 1,
        ]
    )
    risky_host = _flag('is_official_brand_domain') == 0 and host_is_odd
    rule_triggered = bool(like_brand and risky_host)

    pred = int(proba >= threshold)
    if rule_triggered and not pred:
        # The guard overrides a benign verdict and lifts the score to >= 0.9.
        pred = 1
        proba = max(proba, 0.9)

    result = {
        "url": url,
        "phishing_probability": proba,
        "predicted_label": pred,
        "backend": model_type,
    }
    if rule_triggered:
        result["rule"] = "typosquat_guard"
    return result
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Smoke test: score one typosquatted URL with the bundled model, if the
    # model file can be found relative to the current working directory.
    try:
        bundle = load_bundle("models/rf_url_phishing_xgboost_bst.joblib")
        print(predict_url("www.face123book.com", bundle=bundle))
    except FileNotFoundError:
        print("Bundle not found in current directory. This is expected inside the source repo.")
|
|
|
|
|
|
|
|
|
|