import os
import re
import socket
import json
import traceback
import math
import logging
from datetime import datetime, timedelta
from urllib.parse import urlparse
from collections import Counter
import requests
import numpy as np
import tensorflow as tf
import pickle
import h5py # For working with H5 files
from flask import Flask, jsonify, request, render_template, session, flash, redirect, url_for, send_file
from werkzeug.middleware.proxy_fix import ProxyFix
import ssl
from sklearn.preprocessing import StandardScaler
from typing import Dict, List, Tuple, Optional, Union, Any
from difflib import SequenceMatcher
import sys
import flask
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Try to import whois for domain registration data
try:
import whois
whois_available = True
logger.info("python-whois is available for domain registration checks")
except ImportError:
whois_available = False
logger.warning("python-whois not available, domain age features will be limited")
# Import model service - using direct path instead of package import
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from model_service import get_model, get_scaler, get_status, predict
# Add Beautiful Soup import
try:
from bs4 import BeautifulSoup
BeautifulSoup_available = True
logger.info("BeautifulSoup is available for HTML analysis")
except ImportError:
BeautifulSoup_available = False
logger.warning("BeautifulSoup not available, HTML security checks will be limited")
BeautifulSoup = None
# Initialize Flask app
app = Flask(__name__)
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1, x_port=1)
app.secret_key = os.environ.get('FLASK_SECRET_KEY', 'default-secret-key')
# Global variables for model and scaler access
def get_model_instance():
return get_model()
def get_scaler_instance():
return get_scaler()
def is_ip(domain):
"""
Check if the domain is an IP address
Args:
domain (str): Domain to check
Returns:
bool: True if the domain is an IP address, False otherwise
"""
# IPv4 pattern
pattern = r"^(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})$"
match = re.match(pattern, domain)
if not match:
return False
# Check that each octet is valid (0-255)
for i in range(1, 5):
octet = int(match.group(i))
if octet < 0 or octet > 255:
return False
return True
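# Illustrative behavior (follows directly from the checks above): is_ip("192.168.1.1")
# returns True, while is_ip("example.com") and is_ip("999.1.1.1") both return False.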
def calculate_entropy(string):
"""
Calculate the Shannon entropy of a string to measure randomness
Args:
string (str): Input string
Returns:
float: Shannon entropy value
"""
if not string:
return 0
# Count character occurrences
counts = Counter(string)
# Calculate frequencies
frequencies = [count/len(string) for count in counts.values()]
# Calculate entropy
entropy = -sum(f * math.log2(f) for f in frequencies)
return entropy
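# Illustrative values: calculate_entropy("aaaa") is 0.0 (one repeated character), while
# calculate_entropy("x7k9qz2f") is log2(8) = 3.0 because all eight characters are distinct.
# High domain entropy is later used as a signal of random, auto-generated domain names.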
def check_suspicious_patterns(url):
"""Check for suspicious patterns in a URL that may indicate phishing"""
suspicious_patterns = []
try:
# Parse URL
parsed_url = urlparse(url)
domain = parsed_url.netloc.lower()
path = parsed_url.path.lower()
query = parsed_url.query.lower()
# Check for HTTP instead of HTTPS
if parsed_url.scheme == 'http':
suspicious_patterns.append({
"pattern": "Insecure HTTP protocol",
"severity": "high",
"explanation": "The site uses HTTP instead of HTTPS, which means the connection is not encrypted.",
"risk_score": 15
})
# Check for suspicious TLDs
suspicious_tlds = ['tk', 'ml', 'ga', 'cf', 'gq', 'top', 'xyz', 'online', 'site', 'club', 'icu', 'pw', 'rest', 'zip']
tld = domain.split('.')[-1] if '.' in domain else ''
if tld in suspicious_tlds:
suspicious_patterns.append({
"pattern": f"Suspicious TLD: '{tld}'",
"severity": "medium",
"explanation": f"The domain uses a TLD ('{tld}') that is commonly associated with free domains and frequently used in phishing attacks.",
"risk_score": 10
})
# Check for numeric subdomain or long subdomain
subdomain_parts = domain.split('.')
if len(subdomain_parts) > 2:
subdomain = '.'.join(subdomain_parts[:-2])
if subdomain.isdigit() or re.match(r'^\d+-\d+-\d+', subdomain):
suspicious_patterns.append({
"pattern": "Numeric subdomain pattern",
"severity": "medium",
"explanation": "The URL uses a numeric pattern in the subdomain, which is often seen in automatically generated phishing domains.",
"risk_score": 10
})
elif len(subdomain) > 20:
suspicious_patterns.append({
"pattern": "Unusually long subdomain",
"severity": "medium",
"explanation": "The subdomain is unusually long, which is often a characteristic of phishing URLs trying to obscure their true nature.",
"risk_score": 5
})
# Check for URL shortening services
shortening_services = ['bit.ly', 'tinyurl.com', 'goo.gl', 't.co', 'is.gd',
'buff.ly', 'ow.ly', 'rebrand.ly', 'tr.im']
# Modified check to prevent false positives
is_shortener = False
domain_parts = domain.split('.')
base_domain = '.'.join(domain_parts[-2:]) if len(domain_parts) > 1 else domain
# First check exact domain match
if any(base_domain == service for service in shortening_services):
is_shortener = True
# Then check subdomain match (e.g., sub.bit.ly)
elif any(domain.endswith('.' + service) for service in shortening_services):
is_shortener = True
if is_shortener:
suspicious_patterns.append({
"pattern": "URL shortening service",
"severity": "medium",
"explanation": "The URL uses a shortening service, which can hide the actual destination.",
"risk_score": 8
})
# Check for suspicious words in URL
suspicious_words = ['login', 'signin', 'verify', 'secure', 'account', 'update', 'confirm',
'password', 'credential', 'wallet', 'authenticate', 'verification',
'banking', 'security', 'alert', 'suspended', 'unusual']
found_words = [word for word in suspicious_words if word in url.lower()]
if found_words:
words_str = ', '.join(found_words)
suspicious_patterns.append({
"pattern": f"Suspicious keywords: {words_str}",
"severity": "medium",
"explanation": f"The URL contains words often associated with phishing attempts that try to create urgency or request credentials.",
"risk_score": 12
})
# Check for IP address as domain
if re.match(r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$', domain):
suspicious_patterns.append({
"pattern": "IP address used as domain",
"severity": "high",
"explanation": "The URL uses an IP address instead of a domain name, which is rarely done for legitimate websites and often indicates phishing.",
"risk_score": 25
})
# Check for excessive number of dots in domain
if domain.count('.') > 3:
suspicious_patterns.append({
"pattern": "Excessive subdomains",
"severity": "medium",
"explanation": "The URL contains an unusually high number of subdomains, which can be an attempt to confuse users.",
"risk_score": 8
})
# Check for excessive URL length
if len(url) > 100:
suspicious_patterns.append({
"pattern": "Excessively long URL",
"severity": "medium",
"explanation": "The URL is unusually long, which can be an attempt to hide suspicious elements.",
"risk_score": 5
})
# Check for presence of @ symbol in URL
if '@' in url:
suspicious_patterns.append({
"pattern": "@ symbol in URL",
"severity": "high",
"explanation": "The URL contains an @ symbol, which can be used to trick users by hiding the actual destination.",
"risk_score": 20
})
# Check for excessive number of special characters
special_char_count = sum(c in '!@#$%^&*()_+-={}[]|\\:;"\'<>,.?/' for c in url)
if special_char_count > 15:
suspicious_patterns.append({
"pattern": "Excessive special characters",
"severity": "medium",
"explanation": "The URL contains an unusually high number of special characters, which can be an attempt to obfuscate malicious content.",
"risk_score": 10
})
# If no patterns were found but domain can't be resolved
if not suspicious_patterns:
try:
socket.gethostbyname(domain)
except Exception:
suspicious_patterns.append({
"pattern": "Domain does not resolve",
"severity": "high",
"explanation": "The domain cannot be resolved to an IP address, which means it may not exist or may be newly registered for phishing.",
"risk_score": 20
})
logger.info(f"Suspicious patterns found for {url}: {len(suspicious_patterns)}")
return suspicious_patterns
except Exception as e:
logger.error(f"Error checking suspicious patterns: {e}")
return []
def rule_based_prediction(url, scaled_features=None):
"""
Rule-based prediction when model is unavailable
Args:
url: URL to analyze
scaled_features: Optional feature array
Returns:
float: Risk score (0-100)
"""
try:
# Parse the URL
parsed_url = urlparse(url)
domain = parsed_url.netloc.lower()
path = parsed_url.path.lower()
# Initialize risk score
risk_score = 0
risk_factors = {}
# 1. Basic protocol check (part of URL features - 40%)
if parsed_url.scheme != 'https':
risk_score += 20
risk_factors["insecure_protocol"] = {
"description": "The site uses HTTP instead of HTTPS",
"impact": "high",
"contribution": 20
}
# 2. Domain-based checks (part of URL features - 40%)
if re.match(r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$', domain):
# IP address as domain
risk_score += 25
risk_factors["ip_as_domain"] = {
"description": "IP address used as domain instead of a domain name",
"impact": "high",
"contribution": 25
}
# Check for suspicious TLDs
suspicious_tlds = ['tk', 'ml', 'ga', 'cf', 'gq', 'top', 'xyz', 'online', 'site']
tld = domain.split('.')[-1] if '.' in domain else ''
if tld in suspicious_tlds:
risk_score += 15
risk_factors["suspicious_tld"] = {
"description": f"Domain uses suspicious TLD (.{tld})",
"impact": "medium",
"contribution": 15
}
# Check domain length
if len(domain) > 30:
risk_score += 10
risk_factors["long_domain"] = {
"description": "Unusually long domain name",
"impact": "medium",
"contribution": 10
}
# Check for excessive subdomains
if domain.count('.') > 3:
risk_score += 15
risk_factors["excessive_subdomains"] = {
"description": f"Domain has {domain.count('.')} subdomains",
"impact": "medium",
"contribution": 15
}
# 3. URL structure checks (part of URL features - 40%)
if len(url) > 100:
risk_score += 10
risk_factors["long_url"] = {
"description": "Excessively long URL",
"impact": "medium",
"contribution": 10
}
# Check for suspicious keywords
suspicious_words = ['login', 'signin', 'verify', 'secure', 'account', 'update', 'confirm',
'password', 'credential', 'wallet', 'authenticate', 'verification']
keyword_count = 0
for word in suspicious_words:
if word in url.lower():
keyword_count += 1
risk_score += 5
# Cap the total keyword penalty (not the overall risk score) at 30
if keyword_count * 5 >= 30:
break
if keyword_count > 0:
risk_factors["suspicious_keywords"] = {
"description": f"URL contains {keyword_count} suspicious keywords",
"impact": "medium",
"contribution": min(keyword_count * 5, 30)
}
# Check special characters
special_char_count = sum(c in '!@#$%^&*()_+-={}[]|\\:;"\'<>,.?/' for c in url)
risk_score += min(special_char_count, 15)
if special_char_count > 5:
risk_factors["special_chars"] = {
"description": f"URL contains {special_char_count} special characters",
"impact": "low" if special_char_count < 10 else "medium",
"contribution": min(special_char_count, 15)
}
# Check for URL shortening services
shortening_services = ['bit.ly', 'tinyurl.com', 'goo.gl', 't.co', 'is.gd']
if any(service in domain for service in shortening_services):
risk_score += 15
risk_factors["url_shortener"] = {
"description": "Uses URL shortening service",
"impact": "medium",
"contribution": 15
}
# 4. Check if trusted domain
if is_trusted_domain(url):
risk_score = max(0, risk_score - 40) # Significant reduction for trusted domains
risk_factors["trusted_domain"] = {
"description": "Domain is in trusted list",
"impact": "positive",
"contribution": -40
}
# 5. Add results from suspicious patterns check (30%)
suspicious_patterns = check_suspicious_patterns(url)
pattern_risk = sum(p.get("risk_score", 0) for p in suspicious_patterns)
risk_score += pattern_risk
if pattern_risk > 0:
risk_factors["suspicious_patterns"] = {
"description": f"Found {len(suspicious_patterns)} suspicious patterns",
"impact": "high" if pattern_risk > 20 else "medium",
"contribution": pattern_risk
}
# 6. Try to resolve domain (part of domain information - 10%)
domain_info = get_domain_info(url)
domain_penalty = 0
if domain_info.get("ip_address") == "Could not resolve":
# Domain cannot be resolved, apply significant penalty
domain_penalty = 10 # 10% of total score as penalty
risk_score += domain_penalty
risk_factors["unresolvable_domain"] = {
"description": "Domain could not be resolved to an IP address",
"impact": "high",
"contribution": domain_penalty
}
else:
# Check country risk if domain could be resolved
high_risk_countries = ["RU", "CN", "IR", "KP", "NG"]
country = domain_info.get("country", "Unknown")
if country in high_risk_countries:
country_penalty = 5
risk_score += country_penalty
risk_factors["high_risk_country"] = {
"description": f"Domain hosted in high-risk country ({country})",
"impact": "medium",
"contribution": country_penalty
}
# 7. Consider HTML content risk if available (20%)
try:
html_security = check_html_security(url)
html_risk = html_security.get("content_score", 0) / 5 # Scale down from 0-100 to 0-20
risk_score += html_risk
if html_risk > 0:
risk_factors["html_content"] = {
"description": f"HTML content has suspicious elements",
"impact": "high" if html_risk > 10 else "medium",
"contribution": html_risk
}
except Exception as e:
logger.error(f"Error checking HTML security: {e}")
# Ensure final score is within 0-100 range
final_score = max(0, min(100, risk_score))
# Create the result dictionary
result = {
"status": "success",
"url": url,
"score": final_score,
"risk_level": get_risk_level(final_score),
"risk_factors": risk_factors,
"using_fallback": True,
"domain_info": domain_info,
"suspicious_patterns": suspicious_patterns
}
return result
except Exception as e:
logger.error(f"Error in rule_based_prediction: {e}")
# Default moderate risk on error
return {
"status": "error",
"url": url,
"score": 50, # Default moderate risk
"risk_level": get_risk_level(50),
"using_fallback": True,
"error": str(e)
}
def is_trusted_domain(url):
"""
Check if a URL belongs to a trusted domain
Args:
url (str): URL to check
Returns:
bool: True if the domain is trusted, False otherwise
"""
try:
# Parse the URL to extract the domain
parsed_url = urlparse(url)
domain = parsed_url.netloc.lower()
# Remove www. prefix if present
if domain.startswith('www.'):
domain = domain[4:]
# List of trusted domains
trusted_domains = [
'google.com', 'gmail.com', 'youtube.com',
'facebook.com', 'instagram.com', 'twitter.com', 'x.com',
'microsoft.com', 'office.com', 'outlook.com', 'linkedin.com',
'apple.com', 'icloud.com', 'amazon.com', 'paypal.com',
'github.com', 'dropbox.com', 'netflix.com', 'spotify.com',
'wikipedia.org', 'adobe.com', 'cloudflare.com',
'wordpress.com', 'yahoo.com', 'twitch.tv',
'reddit.com', 'pinterest.com', 'ebay.com',
'zoom.us', 'slack.com', 'shopify.com'
]
# Check if domain ends with any trusted domain
return any(domain == td or domain.endswith('.' + td) for td in trusted_domains)
except Exception as e:
logger.error(f"Error in is_trusted_domain: {e}")
return False
# Create a custom InputLayer that can handle batch_shape
class CompatibleInputLayer(tf.keras.layers.InputLayer):
def __init__(self, **kwargs):
# Handle the batch_shape case
if 'batch_shape' in kwargs:
input_shape = kwargs.pop('batch_shape')
if input_shape is not None and len(input_shape) > 1:
kwargs['input_shape'] = input_shape[1:]
super().__init__(**kwargs)
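# Background: models saved by newer Keras versions serialize InputLayer with a
# `batch_shape` argument that older tf.keras builds do not accept; this subclass
# translates it to `input_shape`. A minimal sketch of how it could be passed when
# loading such a model (the filename here is illustrative):
#
#   model = tf.keras.models.load_model(
#       "model.h5",
#       custom_objects={"InputLayer": CompatibleInputLayer},
#       compile=False,
#   )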
def tld_risk_score(tld: str) -> float:
"""
Calculate risk score for top-level domains.
Some TLDs are more associated with fraudulent activity than others.
Args:
tld: Top-level domain (e.g., 'com', 'org')
Returns:
float: Risk score between 0 and 1
"""
risky_tlds = {
'xyz': 0.7, 'top': 0.65, 'loan': 0.85, 'bid': 0.8,
'online': 0.75, 'site': 0.7, 'club': 0.65, 'stream': 0.8,
'icu': 0.75, 'live': 0.6, 'vip': 0.7, 'fit': 0.6,
'tk': 0.8, 'ml': 0.75, 'ga': 0.75, 'cf': 0.7
}
return risky_tlds.get(tld.lower(), 0.2)
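# Illustrative lookups from the table above: tld_risk_score("loan") -> 0.85,
# tld_risk_score("COM") -> 0.2 (unlisted TLDs fall back to the 0.2 default; the
# lookup is case-insensitive via .lower()).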
def extract_features(url: str):
"""
Extract features from a URL for machine learning prediction
Args:
url: URL to analyze
Returns:
tuple: (feature_dict, feature_array)
"""
logger.info(f"Extracting features for URL: {url}")
try:
# Parse the URL
parsed_url = urlparse(url)
# Basic URL components
domain = parsed_url.netloc.lower()
path = parsed_url.path.lower()
query = parsed_url.query.lower()
fragment = parsed_url.fragment.lower()
# Basic Feature extraction (original features)
# Length-based features
url_length = len(url)
domain_length = len(domain)
path_length = len(path)
query_length = len(query)
fragment_length = len(fragment)
# Domain-based features
subdomain_count = domain.count('.') - 1 if '.' in domain else 0
subdomain_count = max(0, subdomain_count) # Ensure non-negative
# Path-based features
path_depth = path.count('/') if path else 0
# Get TLD risk score
tld = domain.split('.')[-1] if '.' in domain else ''
tld_score = tld_risk_score(tld)
# Calculate entropy as a measure of randomness
domain_entropy = calculate_entropy(domain)
# Security features
https_present = 1 if parsed_url.scheme == 'https' else 0
# Character-based features
special_char_count = sum(c in '!@#$%^&*()_+-={}[]|\\:;"\'<>,.?/' for c in url)
digit_count = sum(c.isdigit() for c in url)
letter_count = sum(c.isalpha() for c in url)
digit_percentage = (digit_count / len(url)) * 100 if len(url) > 0 else 0
letter_percentage = (letter_count / len(url)) * 100 if len(url) > 0 else 0
# Check if path is all numeric
numeric_path = 1 if path and all(c.isdigit() or c == '/' for c in path) else 0
# Suspicious patterns
ip_url = 1 if re.match(r'\d+\.\d+\.\d+\.\d+', domain) else 0
# Looking for suspicious keywords
suspicious_keywords = ['login', 'signin', 'account', 'secure', 'update', 'verify',
'confirm', 'banking', 'payment', 'wallet', 'ebay', 'paypal']
keyword_count = sum(1 for keyword in suspicious_keywords if keyword in url.lower())
# Create a dictionary of basic feature names and values
basic_features = {
"url_length": url_length,
"domain_length": domain_length,
"path_length": path_length,
"query_length": query_length,
"fragment_length": fragment_length,
"subdomain_count": subdomain_count,
"path_depth": path_depth,
"tld_score": tld_score,
"domain_entropy": domain_entropy,
"https_present": https_present,
"special_char_count": special_char_count,
"digit_percentage": digit_percentage,
"letter_percentage": letter_percentage,
"numeric_path": numeric_path,
"ip_url": ip_url,
"keyword_count": keyword_count
}
# Get domain information for additional features
domain_info = get_domain_info(url)
ip_address = domain_info.get("ip_address", "Unknown")
# NEW: Extract enhanced features
whois_features = extract_whois_features(domain)
nlp_features = extract_nlp_features(domain)
reputation_features = extract_reputation_features(domain, ip_address)
# Try to get content features - might fail if site is down
content_features = {}
html_security = {}
try:
# Reuse HTML content if we can get it once
response = requests.get(url, timeout=10,
headers={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'})
html_content = response.text
# Extract content features from the HTML
content_features = extract_content_features(url, html_content)
# Get HTML security data
if BeautifulSoup_available:
html_security_data = check_html_security(url, html_content)
html_security = {
"security_score": html_security_data.get("content_score", 0),
"risk_factor_count": len(html_security_data.get("risk_factors", [])),
"has_password_field": 1 if any("password" in rf.lower() for rf in html_security_data.get("risk_factors", [])) else 0,
"has_obfuscated_js": 1 if any("obfuscated" in rf.lower() for rf in html_security_data.get("risk_factors", [])) else 0
}
except Exception as content_error:
logger.warning(f"Could not extract content features: {content_error}")
content_features = extract_content_features(url) # Falls back to zeroed defaults if the page cannot be fetched
html_security = {"security_score": 0, "risk_factor_count": 0, "has_password_field": 0, "has_obfuscated_js": 0}
# Try to get certificate transparency log features
ct_features = extract_ct_log_features(domain)
# Combine all features into a single dictionary
all_features = {**basic_features}
# Add new feature groups with prefixes to avoid name collisions
for key, value in whois_features.items():
all_features[f"whois_{key}"] = value
for key, value in nlp_features.items():
all_features[f"nlp_{key}"] = value
for key, value in reputation_features.items():
all_features[f"rep_{key}"] = value
for key, value in content_features.items():
all_features[f"content_{key}"] = value
for key, value in html_security.items():
all_features[f"html_{key}"] = value
for key, value in ct_features.items():
all_features[f"ct_{key}"] = value
# Add additional domain info features
all_features["geo_suspicious_country"] = 1 if domain_info.get("country") in ["RU", "CN", "IR", "KP"] else 0
# Convert feature dictionary to array for the model
# Extract values in a stable order for the model
basic_feature_values = list(basic_features.values())
# Create a list of values for the additional features
additional_values = []
for key in sorted(all_features.keys()):
if key not in basic_features:
additional_values.append(all_features[key])
# Full feature array - basic features plus new features
full_features = basic_feature_values + additional_values
# Convert to numpy array
base_features = np.array(full_features, dtype=np.float32)
# Pad to expected size for model compatibility (should be 96 for your model)
# Adjust padding as needed based on your model's expectations
padding_size = max(0, 96 - len(full_features))
if padding_size > 0:
padding = np.zeros(padding_size, dtype=np.float32)
feature_array = np.concatenate([base_features, padding])
else:
# If we have more features than expected, truncate to 96
feature_array = base_features[:96]
# Log feature count
logger.info(f"Extracted {len(full_features)} features, adjusted to {len(feature_array)} for model compatibility")
return all_features, feature_array
except Exception as e:
logger.error(f"Error extracting features: {e}")
logger.error(traceback.format_exc())
# Return default values in case of error
feature_dict = {"error": str(e)}
feature_array = np.zeros(96, dtype=np.float32)
return feature_dict, feature_array
def check_html_security(url, html_content=None):
"""
Check HTML content for suspicious or malicious patterns
Args:
url: URL to analyze
html_content: Optional pre-fetched HTML content
Returns:
dict: Dictionary with security information
"""
if not BeautifulSoup_available:
return {
"error": "BeautifulSoup not available",
"content_score": 0,
"risk_factors": ["Unable to analyze HTML content - BeautifulSoup not installed"]
}
try:
# Get the HTML content if not provided
if html_content is None:
response = requests.get(url, timeout=10,
headers={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'})
html_content = response.text
# Parse HTML
soup = BeautifulSoup(html_content, 'html.parser')
# Initialize security data
security_data = {
"content_score": 0, # 0-100 scale, higher means more risky
"risk_factors": [], # List of risk factors found
"security_checks": [] # List of security checks passed
}
# Check 1: Forms without HTTPS action
forms = soup.find_all("form")
insecure_forms = [f for f in forms if f.get('action') and f.get('action').startswith('http://')]
if insecure_forms:
security_data["content_score"] += 30
security_data["risk_factors"].append(f"Found {len(insecure_forms)} form(s) submitting to insecure HTTP")
# Check 2: Password inputs
password_inputs = soup.find_all("input", {"type": "password"})
if password_inputs:
security_data["content_score"] += 15
security_data["risk_factors"].append(f"Found {len(password_inputs)} password input(s)")
# Check if password input is in an insecure form
for p_input in password_inputs:
parent_form = p_input.find_parent("form")
if parent_form and parent_form.get('action') and parent_form.get('action').startswith('http://'):
security_data["content_score"] += 25
security_data["risk_factors"].append("Password being submitted over insecure HTTP")
break
# Check 3: Hidden inputs with suspicious names
suspicious_hidden = soup.find_all("input", {"type": "hidden", "name": re.compile(r'user|email|account|pass|auth|token|id|login', re.I)})
if suspicious_hidden:
security_data["content_score"] += 10
security_data["risk_factors"].append(f"Found {len(suspicious_hidden)} hidden fields with suspicious names")
# Check 4: Scripts with suspicious URLs or obfuscated code
scripts = soup.find_all("script")
obfuscated_scripts = 0
suspicious_urls = 0
for script in scripts:
if script.string:
# Check for obfuscated code patterns
if re.search(r'eval\(', script.string) or re.search(r'\\x[0-9a-f]{2}', script.string):
obfuscated_scripts += 1
# Check for suspicious URLs in scripts
if re.search(r'(https?://[^\'"]+\.(xyz|tk|ml|ga|cf|gq|top))', script.string):
suspicious_urls += 1
if obfuscated_scripts > 0:
security_data["content_score"] += 20
security_data["risk_factors"].append(f"Found {obfuscated_scripts} script(s) with potentially obfuscated code")
if suspicious_urls > 0:
security_data["content_score"] += 15
security_data["risk_factors"].append(f"Found {suspicious_urls} script(s) with suspicious URLs")
# Check 5: Excessive use of iframes
iframes = soup.find_all("iframe")
if len(iframes) > 3:
security_data["content_score"] += 10
security_data["risk_factors"].append(f"Excessive use of iframes ({len(iframes)} found)")
# Add passed security checks
if not insecure_forms:
security_data["security_checks"].append("No insecure forms found")
if not password_inputs:
security_data["security_checks"].append("No password inputs found")
if security_data["content_score"] < 20:
security_data["security_checks"].append("Low-risk HTML content")
# Add HTTPS security check if URL uses HTTPS
if url.startswith("https://"):
security_data["security_checks"].append("HTTPS protocol used")
return security_data
except Exception as e:
logger.error(f"Error checking HTML security: {e}")
return {
"error": str(e),
"content_score": 0,
"risk_factors": [f"Error analyzing HTML content: {str(e)}"]
}
def predict_with_model(url, features=None):
"""
Make a prediction using the loaded model.
If model is not available, falls back to rule-based prediction.
Args:
url: URL to predict
features: Optional pre-computed features
Returns:
dict: Prediction result with risk score and details
"""
try:
logger.info(f"Making prediction for URL: {url}")
# Extract features if not provided
if features is None:
logger.info("No features provided, extracting new features")
features, feature_vector = extract_features(url)
else:
logger.info(f"Features provided, type: {type(features)}")
# The feature parameter might be just the dictionary without the feature_vector
# Always re-extract to get the proper numpy array
_, feature_vector = extract_features(url)
# Initialize response
result = {
"status": "success",
"url": url,
"score": 0,
"risk_level": "Unknown",
"feature_contributions": [],
"risk_factors": {},
"domain_info": {},
"suspicious_patterns": []
}
# Check if model is available
if get_model_instance() is not None and get_scaler_instance() is not None:
try:
# Ensure feature_vector is a numpy array before reshaping
if not isinstance(feature_vector, np.ndarray):
logger.error(f"feature_vector is not a numpy array: {type(feature_vector)}")
# Fall back to rule-based prediction
return rule_based_prediction(url, features)
# Prepare feature vector for prediction
features_reshaped = feature_vector.reshape(1, -1)
logger.info(f"Feature shape: {features_reshaped.shape}")
# Scale features if scaler is available
scaled_features = get_scaler_instance().transform(features_reshaped)
# Make prediction
prediction = get_model_instance().predict(scaled_features)
raw_score = float(prediction[0][0]) if hasattr(prediction, 'shape') else float(prediction)
score = raw_score * 100 # Convert to percentage
logger.info(f"Model prediction raw score: {raw_score}, scaled: {score}")
# Set result fields
result["score"] = score
result["raw_score"] = raw_score
result["risk_level"] = get_risk_level(score)
# Handle unresolvable domains - apply domain information penalty (10% of total score)
domain_info = get_domain_info(url)
if domain_info.get("ip_address") == "Could not resolve":
# Apply domain information penalty (add up to 10 points to the risk score)
domain_penalty = 10.0 # Maximum penalty for unresolvable domains (10% of total score)
original_score = score
score = min(100, score + domain_penalty) # Cap at 100
result["score"] = score
logger.info(f"Domain could not be resolved, applying penalty: {original_score} -> {score}")
# Add a risk factor for unresolvable domain
if "risk_factors" not in result:
result["risk_factors"] = {}
result["risk_factors"]["unresolvable_domain"] = {
"description": "Domain could not be resolved to an IP address",
"impact": "high",
"contribution": domain_penalty
}
# Feature name mapping to user-friendly names
feature_name_map = {
"url_length": "URL Length",
"domain_length": "Domain Length",
"path_length": "Path Length",
"query_length": "Query Parameters Length",
"fragment_length": "Fragment Length",
"subdomain_count": "Number of Subdomains",
"path_depth": "Path Depth",
"tld_score": "Risky TLD Score",
"domain_entropy": "Domain Entropy",
"https_present": "Security Weights",
"special_char_count": "Special Characters",
"digit_percentage": "Digit Percentage",
"letter_percentage": "Letter Percentage",
"numeric_path": "Numeric Path Present",
"ip_url": "IP as Domain",
"keyword_count": "Suspicious Keywords",
# Content feature friendly names
"content_page_size_bytes": "Page Size",
"content_external_resources_count": "External Resource Count",
"content_form_count": "Form Count",
"content_password_field_count": "Password Fields",
"content_js_to_html_ratio": "JavaScript to HTML Ratio",
"content_title_brand_mismatch": "Title-Domain Mismatch",
"content_favicon_exists": "Favicon Present",
"content_similar_domain_redirect": "Similar Domain Redirect",
# HTML security feature friendly names
"html_security_score": "HTML Security Score",
"html_risk_factor_count": "Security Risk Factor Count",
"html_has_password_field": "Contains Password Field",
"html_has_obfuscated_js": "Contains Obfuscated JavaScript",
# SSL certificate feature friendly names
"ct_suspicious_cert_pattern": "Suspicious Certificate Pattern",
# Geographic feature friendly names
"geo_suspicious_country": "Suspicious Country"
}
# Add feature contributions
result["feature_contributions"] = []
if isinstance(features, dict):
for name, value in features.items():
if name != "error":
# Estimate contribution based on feature value and type
contribution = 0.0
section = "Key Risk Factors" # Default section
# ====== Core URL & Domain Features (High Impact) ======
if name == "url_length" and value > 50:
contribution = 0.1 * (value / 100)
section = "Key Risk Factors"
elif name == "domain_length" and value > 15:
contribution = 0.15 * (value / 30)
section = "Key Risk Factors"
elif name == "domain_entropy" and value > 0:
contribution = 0.1 * min(value / 3.0, 1.0)
section = "Key Risk Factors"
elif name == "special_char_count" and value > 3:
contribution = 0.1 * (value / 10)
section = "Key Risk Factors"
elif name == "tld_score" and value > 0:
contribution = 0.15 * value / 0.5 # Scale based on value
section = "Key Risk Factors"
elif name == "https_present" and value < 1:
contribution = 24.6 # Fixed percentage for consistency
section = "Key Risk Factors"
# ====== Domain Reputation & WHOIS Features (Important) ======
elif name == "rep_domain_age_category" and value < 2:
contribution = 0.15 * (2 - value) / 2 # Newer domains are riskier
section = "Domain Information"
elif name == "rep_suspicious_tld_category" and value > 0:
contribution = 0.15 * value # TLD category risk
section = "Domain Information"
elif name == "rep_suspicious_country" and value > 0:
contribution = 0.15 # Suspicious country
section = "Domain Information"
elif name == "whois_recently_registered" and value > 0:
contribution = 0.2 # Recently registered domains are highly suspicious
section = "Domain Information"
# ====== Critical HTML Content Features (Highest Impact) ======
elif name == "content_form_count" and value > 0:
contribution = 0.15 * min(value / 2, 1.0) # Forms are key phishing indicators
section = "Suspicious Patterns"
elif name == "content_password_field_count" and value > 0:
contribution = 0.3 * min(value / 2.0, 1.0) # Password fields are critical for phishing
section = "Suspicious Patterns"
elif name == "content_external_resources_count" and value > 3:
contribution = 0.12 * min(value / 15, 1.0) # External resources
section = "Suspicious Patterns"
elif name == "content_js_to_html_ratio" and value > 0.3:
contribution = 0.15 * min(value / 0.5, 1.0) # High JS ratio can indicate obfuscation
section = "Suspicious Patterns"
elif name == "content_title_brand_mismatch" and value > 0:
contribution = 0.2 # Title not matching domain is suspicious
section = "Suspicious Patterns"
elif name == "content_similar_domain_redirect" and value > 0:
contribution = 0.35 # Redirects to similar domains are highly suspicious
section = "Suspicious Patterns"
elif name == "content_favicon_exists" and value < 1:
contribution = 0.08 # Missing favicon often indicates phishing
section = "Key Risk Factors"
# ====== HTML Security Metrics (High Impact) ======
elif name == "html_security_score" and value > 0:
contribution = 0.2 * min(value / 50, 1.0) # Overall security score
section = "Suspicious Patterns"
elif name == "html_risk_factor_count" and value > 0:
contribution = 0.15 * min(value / 3, 1.0) # Number of risks found
section = "Suspicious Patterns"
elif name == "html_has_password_field" and value > 0:
contribution = 0.25 # Password fields in HTML are suspicious
section = "Suspicious Patterns"
elif name == "html_has_obfuscated_js" and value > 0:
contribution = 0.3 # Obfuscated JavaScript is highly suspicious
section = "Suspicious Patterns"
# ====== SSL Certificate Features (Medium Impact) ======
elif name == "ct_suspicious_cert_pattern" and value > 0:
contribution = 0.15 # Suspicious certificate patterns
section = "Domain Information"
# ====== Geographic Features (Medium Impact) ======
elif name == "geo_suspicious_country" and value > 0:
contribution = 0.15 # Suspicious country
section = "Domain Information"
# Use friendly name if available
display_name = feature_name_map.get(name, name.replace("_", " ").title())
# Determine color based on contribution
color_class = "success" # Default green
if contribution > 60:
color_class = "danger" # Red for high risk
elif contribution > 20:
color_class = "warning" # Orange for medium risk
result["feature_contributions"].append({
"name": name,
"value": value,
"contribution": contribution,
"direction": "increases" if contribution > 0 else "decreases",
"percentage": contribution, # No need to convert for HTTPS present
"feature_name": display_name,
"color_class": color_class,
"section": section # Add section to each feature
})
# Normalize contributions to match total risk score, but preserve HTTPS percentage
if result["feature_contributions"]:
# Sort by contribution (descending)
result["feature_contributions"].sort(key=lambda x: -x["percentage"])
# Get total of all contributions
total_contribution = sum(item["percentage"] for item in result["feature_contributions"]
if item["name"] != "https_present")
https_contribution = next((item["percentage"] for item in result["feature_contributions"]
if item["name"] == "https_present"), 0)
# Calculate what's left for other features
remaining_score = max(0, score - https_contribution)
# If total is > 0, normalize the remaining features
if total_contribution > 0 and remaining_score > 0:
normalization_factor = remaining_score / total_contribution
for item in result["feature_contributions"]:
if item["name"] != "https_present":
item["percentage"] = round(item["percentage"] * normalization_factor, 1)
# Calculate section totals based on fixed weights
# URL features (40%), Domain info (10%), Suspicious patterns (50%)
section_weights = {
"Key Risk Factors": 40.0, # URL features (40%)
"Domain Information": 10.0, # Domain information (10%)
"Suspicious Patterns": 50.0 # Suspicious patterns (50%)
}
# Use fixed weights but distribute actual feature contributions within them
total_feature_impact = sum(item["percentage"] for item in result["feature_contributions"])
if total_feature_impact > 0:
# Normalize all feature impacts to a 0-100 scale
normalization_factor = score / total_feature_impact
for item in result["feature_contributions"]:
item["percentage"] = round(item["percentage"] * normalization_factor, 1)
# Calculate actual section distribution based on feature categorization
actual_section_totals = {
"Key Risk Factors": 0,
"Domain Information": 0,
"Suspicious Patterns": 0
}
for item in result["feature_contributions"]:
section = item["section"]
if section in actual_section_totals:
actual_section_totals[section] += item["percentage"]
# Ensure the overall risk score is preserved
result["section_totals"] = {
# Use fixed weights but make sure they sum to the overall score
"Key Risk Factors": round((section_weights["Key Risk Factors"] / 100) * score, 1),
"Domain Information": round((section_weights["Domain Information"] / 100) * score, 1),
"Suspicious Patterns": round((section_weights["Suspicious Patterns"] / 100) * score, 1)
}
# Get suspicious patterns
suspicious_patterns = check_suspicious_patterns(url)
result["suspicious_patterns"] = suspicious_patterns
# Get domain information with more detail
domain_info = get_domain_info(url)
# Try to enhance domain info with more details if possible
try:
# Parse URL to get domain
parsed_url = urlparse(url)
domain = parsed_url.netloc
# Try to get more domain info using socket
if not domain_info.get("organization"):
try:
ip = socket.gethostbyname(domain)
domain_info["ip_address"] = ip
# Try to determine organization and location from IP
# Note: In a real implementation, you'd use a GeoIP service here
domain_info["organization"] = "Unknown Organization"
domain_info["country"] = "Unknown Country"
domain_info["city"] = "Unknown City"
except Exception as e:
logger.warning(f"Could not enhance domain info: {e}")
except Exception as e:
logger.warning(f"Error enhancing domain info: {e}")
result["domain_info"] = domain_info
# Add HTML security data if available
html_security = None
try:
html_security = check_html_security(url)
result["html_security"] = html_security
except Exception as e:
logger.error(f"Error checking HTML security: {e}")
# Explicitly add feature_table for UI
result['feature_table'] = []
# Process features and organize by category
for key, value in features.items():
if key != "error":
# Find the corresponding contribution
impact = 0.0
color_class = "success"
for contrib in result["feature_contributions"]:
if contrib["name"] == key:
impact = contrib["percentage"]
color_class = contrib["color_class"]
break
# Use friendly name if available
display_name = feature_name_map.get(key, key.replace("_", " ").title())
# Always include HTTPS with fixed impact
if key == "https_present" and value < 1:
result['feature_table'].append({
'feature': "Security Weights",
'value': "No" if value < 1 else "Yes",
'impact': 24.6, # Fixed percentage
'color_class': "danger"
})
# Only include features with significant impact or specifically important ones
elif impact > 3 or key in ["tld_score", "content_password_field_count",
"content_form_count", "html_security_score",
"domain_entropy", "content_favicon_exists",
"rep_domain_age_category"]:
# Format value based on type
formatted_value = value # Default value
if isinstance(value, bool) or (isinstance(value, (int, float)) and value in [0, 1]):
formatted_value = "No" if value == 0 or value is False else "Yes"
elif isinstance(value, float) and value < 1:
formatted_value = round(value, 2)
# Append to feature table
result['feature_table'].append({
'feature': display_name,
'value': formatted_value,
'impact': impact,
'color_class': color_class
})
# Sort feature_table by impact (descending)
result['feature_table'] = sorted(
result['feature_table'],
key=lambda x: -x['impact']
)
return result
except Exception as e:
logger.error(f"Error making prediction with model: {e}")
logger.error(traceback.format_exc())
# Fall back to rule-based prediction
logger.info("Using rule-based prediction as fallback")
return rule_based_prediction(url, features)
except Exception as e:
logger.error(f"Unexpected error in predict_with_model: {e}")
logger.error(traceback.format_exc())
return {
"status": "error",
"url": url,
"message": f"Error making prediction: {str(e)}",
"using_fallback": True,
"score": 50, # Default moderate risk
"risk_level": "moderate",
"domain_info": get_domain_info(url),
"suspicious_patterns": check_suspicious_patterns(url)
}
def get_risk_level(score):
"""
Convert numerical risk score to categorical risk level
Args:
score: Numerical risk score (0-100)
Returns:
str: Risk level category
"""
if score < 20:
return "low"
elif score < 50:
return "moderate"
elif score < 75:
return "high"
else:
return "critical"
def get_domain_info(url):
"""
Get information about a domain
Args:
url: URL to get domain info for
Returns:
dict: Domain information including IP, organization, location
"""
try:
# Parse the URL to extract domain
parsed_url = urlparse(url)
domain = parsed_url.netloc
# Extract domain without port if present
if ':' in domain:
domain = domain.split(':')[0]
# Initialize domain info
domain_info = {
"domain": domain,
"ip_address": "Unknown",
"organization": "Unknown",
"country": "Unknown",
"city": "Unknown",
"created": "Unknown",
"expires": "Unknown",
"latitude": 0,
"longitude": 0
}
# Try to get IP address
try:
ip_address = socket.gethostbyname(domain)
domain_info["ip_address"] = ip_address
# Use ip-api.com for geolocation data
try:
geo_response = requests.get(f"http://ip-api.com/json/{ip_address}", timeout=5)
if geo_response.status_code == 200:
geo_data = geo_response.json()
if geo_data.get("status") == "success":
domain_info["country"] = geo_data.get("country", "Unknown")
domain_info["city"] = geo_data.get("city", "Unknown")
domain_info["latitude"] = geo_data.get("lat", 0)
domain_info["longitude"] = geo_data.get("lon", 0)
domain_info["organization"] = geo_data.get("org", "Unknown") or geo_data.get("isp", "Unknown")
domain_info["region"] = geo_data.get("regionName", "Unknown")
domain_info["timezone"] = geo_data.get("timezone", "Unknown")
domain_info["as"] = geo_data.get("as", "Unknown")
logger.info(f"Retrieved geolocation data for {ip_address}: {geo_data}")
else:
logger.warning(f"Failed to get geolocation data: {geo_data}")
# Fall back to default coordinates if geolocation fails
domain_info["latitude"] = 40.7128 # Default latitude (New York)
domain_info["longitude"] = -74.0060 # Default longitude (New York)
else:
logger.warning(f"Failed to get geolocation data, status code: {geo_response.status_code}")
# Fall back to default coordinates if geolocation fails
domain_info["latitude"] = 40.7128
domain_info["longitude"] = -74.0060
except Exception as geo_error:
logger.error(f"Error getting geolocation data: {geo_error}")
# Fall back to default coordinates if geolocation fails
domain_info["latitude"] = 40.7128
domain_info["longitude"] = -74.0060
except socket.gaierror:
domain_info["ip_address"] = "Could not resolve"
return domain_info
except Exception as e:
logger.error(f"Error getting domain info: {e}")
return {
"domain": urlparse(url).netloc,
"error": str(e),
"ip_address": "Error",
"organization": "Unknown",
"country": "Unknown",
"latitude": 0,
"longitude": 0
}
def check_ssl_certificate(domain):
"""
Check SSL certificate information for a domain
Args:
domain: Domain to check SSL for
Returns:
dict: SSL certificate information
"""
ssl_info = {
"has_ssl": False,
"issuer": "Unknown",
"valid_from": "Unknown",
"valid_until": "Unknown",
"days_until_expiry": 0
}
try:
# Try to connect with TLS/SSL
context = ssl.create_default_context()
with socket.create_connection((domain, 443), timeout=5) as sock:
with context.wrap_socket(sock, server_hostname=domain) as ssock:
# Get certificate
cert = ssock.getpeercert()
ssl_info["has_ssl"] = True
# Extract certificate details
if cert:
# Get issuer
issuer = dict(x[0] for x in cert['issuer'])
ssl_info["issuer"] = issuer.get('organizationName', 'Unknown')
# Get validity dates
ssl_info["valid_from"] = cert.get('notBefore', 'Unknown')
ssl_info["valid_until"] = cert.get('notAfter', 'Unknown')
# Calculate days until expiry
if ssl_info["valid_until"] != 'Unknown':
expiry_date = datetime.strptime(ssl_info["valid_until"], '%b %d %H:%M:%S %Y %Z')
days_until_expiry = (expiry_date - datetime.now()).days
ssl_info["days_until_expiry"] = max(0, days_until_expiry)
except Exception as e:
ssl_info["error"] = str(e)
return ssl_info
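# Shape of a successful result (values are illustrative, not real certificate data):
#   {"has_ssl": True, "issuer": "Let's Encrypt", "valid_from": "Jan  1 00:00:00 2025 GMT",
#    "valid_until": "Apr  1 00:00:00 2025 GMT", "days_until_expiry": 60}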
def extract_whois_features(domain):
"""Extract features from WHOIS data for a domain"""
whois_features = {
"domain_age_days": 0,
"expiration_remaining_days": 0,
"recently_registered": 0,
"privacy_protected": 0,
"suspicious_registrar": 0
}
if not whois_available:
return whois_features
try:
w = whois.whois(domain)
# Calculate domain age
if w.creation_date:
creation_date = w.creation_date
if isinstance(creation_date, list):
creation_date = creation_date[0]
domain_age = (datetime.now() - creation_date).days
whois_features["domain_age_days"] = domain_age
whois_features["recently_registered"] = 1 if domain_age < 60 else 0
# Calculate expiration time
if w.expiration_date:
expiry_date = w.expiration_date
if isinstance(expiry_date, list):
expiry_date = expiry_date[0]
days_until_expiry = (expiry_date - datetime.now()).days
whois_features["expiration_remaining_days"] = max(0, days_until_expiry)
# Check for privacy protection
if w.registrar and "privacy" in str(w.registrar).lower():
whois_features["privacy_protected"] = 1
# Check for suspicious registrars
suspicious_registrars = ["namecheap", "namesilo", "porkbun"]
if w.registrar and any(r in str(w.registrar).lower() for r in suspicious_registrars):
whois_features["suspicious_registrar"] = 1
return whois_features
except Exception as e:
logger.error(f"Error getting WHOIS data: {e}")
return whois_features
def extract_ct_log_features(domain):
"""Extract features from Certificate Transparency logs"""
ct_features = {
"cert_count": 0,
"recent_cert_count": 0,
"suspicious_cert_pattern": 0
}
try:
# Use crt.sh API to check certificate history
response = requests.get(f"https://crt.sh/?q={domain}&output=json", timeout=5)
if response.status_code == 200:
try:
certs = response.json()
# Total certificates
ct_features["cert_count"] = len(certs)
# Recent certificates (last 30 days)
thirty_days_ago = (datetime.now() - timedelta(days=30)).strftime("%Y-%m-%d")
recent_certs = [c for c in certs if c.get("not_before", "") > thirty_days_ago]
ct_features["recent_cert_count"] = len(recent_certs)
# Check for suspicious patterns in certificate names
for cert in certs:
common_name = cert.get("common_name", "").lower()
if any(p in common_name for p in ["secure", "login", "banking", "verify"]):
ct_features["suspicious_cert_pattern"] = 1
break
except json.JSONDecodeError:
logger.warning("Failed to parse certificate data as JSON")
return ct_features
except Exception as e:
logger.error(f"Error getting certificate data: {e}")
return ct_features
def extract_content_features(url, html_content=None):
"""Extract features from webpage content"""
content_features = {
"page_size_bytes": 0,
"external_resources_count": 0,
"form_count": 0,
"password_field_count": 0,
"js_to_html_ratio": 0,
"title_brand_mismatch": 0,
"favicon_exists": 0,
"similar_domain_redirect": 0
}
if not BeautifulSoup_available:
return content_features
try:
# Get the HTML content if not provided
if html_content is None:
try:
response = requests.get(url, timeout=10,
headers={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'})
html_content = response.text
content_features["page_size_bytes"] = len(html_content)
except Exception as req_error:
logger.error(f"Error fetching HTML content: {req_error}")
return content_features
# Parse HTML
soup = BeautifulSoup(html_content, 'html.parser')
# Count forms and password fields
content_features["form_count"] = len(soup.find_all("form"))
content_features["password_field_count"] = len(soup.find_all("input", {"type": "password"}))
# External resources
external_resources = 0
parsed_url = urlparse(url)
base_domain = parsed_url.netloc
for tag in soup.find_all(["script", "img", "iframe", "link"], src=True):
src = tag.get("src", "")
if src and not src.startswith(('/', '#', 'data:')):
if base_domain not in src:
external_resources += 1
for tag in soup.find_all("link", href=True):
href = tag.get("href", "")
if href and not href.startswith(('/', '#', 'data:')):
if base_domain not in href:
external_resources += 1
content_features["external_resources_count"] = external_resources
# JS to HTML ratio
js_content = 0
for script in soup.find_all("script"):
if script.string:
js_content += len(script.string)
if len(html_content) > 0:
content_features["js_to_html_ratio"] = js_content / len(html_content)
# Title brand mismatch
if soup.title and soup.title.string:
title = soup.title.string.lower()
domain_parts = base_domain.lower().split(".")
brand_name = domain_parts[0] if domain_parts[0] != "www" else domain_parts[1]
if title and brand_name not in title:
content_features["title_brand_mismatch"] = 1
# Check for favicon
if soup.find("link", rel="icon") or soup.find("link", rel="shortcut icon"):
content_features["favicon_exists"] = 1
# Check for redirects to similar domains
meta_refresh = soup.find("meta", {"http-equiv": "refresh"})
if meta_refresh and "content" in meta_refresh.attrs:
content = meta_refresh["content"]
if "url=" in content.lower():
redirect_url = content.split("url=")[1].strip()
redirect_domain = urlparse(redirect_url).netloc
# Check if redirect domain is similar but different
similarity = SequenceMatcher(None, base_domain, redirect_domain).ratio()
if 0.6 < similarity < 0.9: # Similar but not identical
content_features["similar_domain_redirect"] = 1
return content_features
except Exception as e:
logger.error(f"Error extracting content features: {e}")
return content_features
def extract_nlp_features(domain):
"""Extract NLP-based features from the domain name"""
nlp_features = {
"character_distribution": 0,
"vowel_consonant_ratio": 0,
"contains_digits": 0,
"contains_repeated_chars": 0,
"ngram_score": 0,
"word_length_avg": 0
}
try:
# Remove TLD for analysis
domain_parts = domain.split('.')
domain_without_tld = '.'.join(domain_parts[:-1]) if len(domain_parts) > 1 else domain_parts[0]
# Character distribution (normalized entropy)
entropy = calculate_entropy(domain_without_tld)
nlp_features["character_distribution"] = entropy / 4.7 # Normalize, 4.7 is max entropy for English text
# Vowel to consonant ratio
vowels = sum(c.lower() in 'aeiou' for c in domain_without_tld)
consonants = sum(c.lower() in 'bcdfghjklmnpqrstvwxyz' for c in domain_without_tld)
nlp_features["vowel_consonant_ratio"] = vowels / consonants if consonants > 0 else 0
# Contains digits
nlp_features["contains_digits"] = 1 if any(c.isdigit() for c in domain_without_tld) else 0
# Contains repeated characters (3 or more)
if re.search(r'(.)\1{2,}', domain_without_tld):
nlp_features["contains_repeated_chars"] = 1
# N-gram probability score (approximated)
common_english_bigrams = ["th", "he", "in", "er", "an", "re", "on", "at", "en", "nd", "ti", "es", "or"]
bigram_count = sum(domain_without_tld.lower().count(bigram) for bigram in common_english_bigrams)
domain_length = len(domain_without_tld)
nlp_features["ngram_score"] = bigram_count / (domain_length - 1) if domain_length > 1 else 0
# Average word length if domain has words
words = re.findall(r'[a-zA-Z]+', domain_without_tld)
if words:
avg_word_length = sum(len(word) for word in words) / len(words)
nlp_features["word_length_avg"] = avg_word_length
return nlp_features
except Exception as e:
logger.error(f"Error extracting NLP features: {e}")
return nlp_features
def extract_reputation_features(domain, ip_address):
"""Extract reputation-based features from various sources"""
reputation_features = {
"domain_age_category": 0, # 0: unknown, 1: new, 2: medium, 3: established
"ip_blacklisted": 0,
"domain_blacklisted": 0,
"suspicious_tld_category": 0,
"suspicious_country": 0
}
try:
# Domain age categorization (if whois is available)
if whois_available:
try:
w = whois.whois(domain)
if w.creation_date:
creation_date = w.creation_date
if isinstance(creation_date, list):
creation_date = creation_date[0]
domain_age_days = (datetime.now() - creation_date).days
if domain_age_days < 30:
reputation_features["domain_age_category"] = 1 # New
elif domain_age_days < 180:
reputation_features["domain_age_category"] = 2 # Medium
else:
reputation_features["domain_age_category"] = 3 # Established
except Exception as whois_error:
logger.warning(f"Whois error for reputation features: {whois_error}")
# Check for blacklisted IP (simplified - would use an actual API)
high_risk_countries = ["RU", "CN", "IR", "KP", "NG"]
suspicious_asn_orgs = ["Cloudflare", "OVH", "DigitalOcean", "Amazon"]
# Get IP geolocation
if ip_address and ip_address != "Unknown" and ip_address != "Could not resolve":
try:
geo_response = requests.get(f"http://ip-api.com/json/{ip_address}", timeout=5)
if geo_response.status_code == 200:
geo_data = geo_response.json()
if geo_data.get("status") == "success":
# Check country risk
if geo_data.get("countryCode") in high_risk_countries:
reputation_features["suspicious_country"] = 1
# Check ASN risk
asn_org = geo_data.get("org", "").lower()
if any(org.lower() in asn_org for org in suspicious_asn_orgs):
reputation_features["ip_blacklisted"] = 0.5 # Partial flag
except Exception as geo_error:
logger.warning(f"Error getting geolocation for reputation: {geo_error}")
# Check TLD risk category
tld = domain.split('.')[-1] if '.' in domain else ''
high_risk_tlds = ['tk', 'ml', 'ga', 'cf', 'gq', 'xyz', 'top', 'icu', 'rest', 'zip']
medium_risk_tlds = ['online', 'site', 'club', 'live', 'vip', 'fit', 'pw']
if tld in high_risk_tlds:
reputation_features["suspicious_tld_category"] = 2
elif tld in medium_risk_tlds:
reputation_features["suspicious_tld_category"] = 1
return reputation_features
except Exception as e:
logger.error(f"Error extracting reputation features: {e}")
return reputation_features
def analyze_url(url):
"""
Comprehensive URL analysis function that combines multiple checks
Args:
url: URL to analyze
Returns:
dict: Comprehensive analysis result
"""
logger.info(f"Analyzing URL: {url}")
# Ensure URL has a scheme
if not url.startswith(('http://', 'https://')):
url = 'http://' + url
logger.info(f"Added scheme to URL: {url}")
try:
# Extract features and make prediction
features, feature_vector = extract_features(url)
prediction_result = predict_with_model(url)
# Get suspicious patterns
suspicious_patterns = check_suspicious_patterns(url)
# Check HTML security
html_security = check_html_security(url)
# Parse URL components for display
parsed_url = urlparse(url)
domain = parsed_url.netloc
scheme = parsed_url.scheme
# Get domain information if available
domain_info = get_domain_info(url)
# Create comprehensive analysis result
result = {
"status": "success",
"url": url,
"domain": domain,
"protocol": scheme,
"analysis_date": datetime.now().isoformat(),
"score": prediction_result.get("score", 0),
"fraud_score": prediction_result.get("score", 0), # Duplicate for UI compatibility
"risk_level": prediction_result.get("risk_level", "unknown"),
"is_suspicious": prediction_result.get("score", 0) > 50,
"suspicious_patterns": suspicious_patterns,
"html_security": html_security,
"risk_factors": prediction_result.get("risk_factors", {}),
"feature_values": features,
"domain_info": domain_info,
"feature_contributions": prediction_result.get("feature_contributions", []),
"feature_table": prediction_result.get("feature_table", []),
"section_totals": prediction_result.get("section_totals", {})
}
# Ensure section totals are set using fixed weights if missing
if not result["section_totals"]:
score = result["score"]
result["section_totals"] = {
"Key Risk Factors": round(0.4 * score, 1), # URL features (40%)
"Domain Information": round(0.1 * score, 1), # Domain information (10%)
"Suspicious Patterns": round(0.5 * score, 1) # Suspicious patterns + HTML content (50%)
}
# Special handling for trusted domains - reduce Suspicious Patterns section score
# when no actual suspicious patterns were found
parsed_url = urlparse(url)
domain = parsed_url.netloc.lower()
# If no suspicious patterns were found, set that section to 0%
# regardless of whether it's a trusted domain or not
if not suspicious_patterns:
# Set Suspicious Patterns to 0% since none were found
original_suspicious_patterns_score = result["section_totals"]["Suspicious Patterns"]
result["section_totals"]["Suspicious Patterns"] = 0.0
# Recalculate overall score by removing the suspicious patterns contribution
original_score = result["score"]
# When Suspicious Patterns is set to 0, recalculate the total score
# by considering only the remaining sections (Key Risk Factors + Domain Information)
key_risk_score = result["section_totals"]["Key Risk Factors"]
domain_info_score = result["section_totals"]["Domain Information"]
# Set the adjusted score to be just the sum of the remaining sections
adjusted_score = key_risk_score + domain_info_score
# Update the overall score
result["score"] = adjusted_score
result["fraud_score"] = adjusted_score
result["risk_level"] = get_risk_level(adjusted_score)
logger.info(f"Adjusted score due to no suspicious patterns: {original_score} -> {adjusted_score}")
# Add SSL info if available
try:
ssl_info = check_ssl_certificate(domain)
result["ssl_info"] = ssl_info
except Exception as e:
logger.warning(f"Unable to check SSL certificate: {str(e)}")
result["ssl_info"] = {"error": str(e)}
logger.info(f"Analysis complete for {url} - Risk score: {result['score']}")
return result
except Exception as e:
logger.error(f"Error analyzing URL: {str(e)}")
logger.error(traceback.format_exc())
return {
"status": "error",
"url": url,
"message": f"Error analyzing URL: {str(e)}",
"error": str(e),
"traceback": traceback.format_exc(),
"domain_info": get_domain_info(url),
"suspicious_patterns": check_suspicious_patterns(url)
}
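# Example call (illustrative only; field values depend on the model and on network access):
# analyze_url("example.com") returns a dict with keys such as "score", "risk_level",
# "suspicious_patterns", "html_security", "domain_info", "section_totals" and "ssl_info".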
@app.route("/")
def home():
logger.info("Home route accessed")
try:
return render_template("index.html")
except Exception as e:
logger.error(f"Error rendering index.html: {e}")
return f"Error: {str(e)}", 500
@app.route("/about")
def about():
return render_template("about.html")
@app.route("/features")
def features():
return render_template("features.html")
@app.route("/health-check")
def health_check():
"""Health check endpoint for the integrated application"""
return jsonify({
"status": "healthy",
"message": "Integrated Flask app is running",
"model_loaded": get_model_instance() is not None,
"scaler_loaded": get_scaler_instance() is not None
})
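# Example check (assuming the default port 7860 used at the bottom of this file):
# curl http://localhost:7860/health-check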
@app.route("/predict", methods=["POST", "OPTIONS"])
def predict():
# Handle CORS preflight requests
if request.method == 'OPTIONS':
response = jsonify({'status': 'success'})
response.headers.add('Access-Control-Allow-Origin', '*')
response.headers.add('Access-Control-Allow-Headers', 'Content-Type,Authorization')
response.headers.add('Access-Control-Allow-Methods', 'POST,OPTIONS')
return response
if request.method == 'POST':
try:
# Log request headers for debugging
logger.info(f"Request headers: {dict(request.headers)}")
logger.info(f"Request content type: {request.content_type}")
logger.info(f"Raw request data: {request.data.decode('utf-8', errors='replace') if request.data else 'None'}")
# Extract URL from request
url = None
# Try different methods to extract the URL
if request.is_json:
data = request.get_json(force=True)
logger.info(f"JSON data: {data}")
url = data.get('url', '')
elif request.form:
logger.info(f"Form data: {dict(request.form)}")
url = request.form.get('url', '')
elif request.data:
try:
data = json.loads(request.data.decode('utf-8'))
logger.info(f"Parsed JSON from raw data: {data}")
url = data.get('url', '')
except json.JSONDecodeError as e:
logger.error(f"Failed to parse raw data as JSON: {e}")
logger.info(f"Extracted URL: {url}")
if not url or not url.strip():
logger.error("No URL provided in request")
return jsonify({
"status": "error",
"message": "No URL provided",
"details": "Please enter a valid URL to analyze"
}), 400
# Ensure URL has a scheme
if not url.startswith(('http://', 'https://')):
url = 'http://' + url
logger.info(f"Added http:// prefix to URL: {url}")
# Process the URL directly without backend API call
logger.info("Processing prediction request directly")
# Extract features
features, feature_vector = extract_features(url)
# Get prediction
result = predict_with_model(url, features)
# For debugging the feature display issue
logger.info(f"Feature contributions: {result.get('feature_contributions', [])}")
# Explicitly add this field for the UI
if 'feature_table' not in result:
result['feature_table'] = []
# Populate feature_table from feature_contributions only while it is still empty,
# so any entries already supplied by predict_with_model are not duplicated
if not result['feature_table'] and result.get('feature_contributions'):
for contrib in result['feature_contributions']:
result['feature_table'].append({
'feature': contrib['name'],
'value': contrib['value'],
'impact': contrib['contribution'] * 100 # Convert to percentage
})
# Sort feature_table: non-zero values in ascending order, zero values at the bottom
result['feature_table'] = sorted(
result['feature_table'],
key=lambda x: (x['value'] == 0, x['value'])
)
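# Example of the sort key (hypothetical values): entries with values [0, 3, 1, 0]
# are reordered so the values appear as [1, 3, 0, 0] - non-zero features ascending,
# zero-valued features last.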
logger.info(f"Feature table: {result.get('feature_table', [])}")
logger.info(f"Prediction result: {result}")
return jsonify(result)
except Exception as e:
logger.error(f"Unexpected error in predict route: {e}")
logger.error(traceback.format_exc())
return jsonify({
"status": "error",
"message": "An unexpected error occurred",
"details": str(e)
}), 500
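# Example request (assuming the default port 7860):
# curl -X POST http://localhost:7860/predict -H "Content-Type: application/json" -d '{"url": "example.com"}'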
@app.route("/login", methods=['GET', 'POST'])
def login():
if request.method == 'POST':
username = request.form.get('username')
password = request.form.get('password')
# Just simulate successful login since we're not connecting to a real DB
session['user_id'] = 1
session['username'] = username
flash('Login successful', 'success')
return redirect(url_for('home'))
return render_template('weblogin.html')
@app.route('/register', methods=['GET', 'POST'])
def register():
if request.method == 'POST':
username = request.form.get('username')
email = request.form.get('email')
# Just simulate successful registration
flash('Registration successful! Please log in.', 'success')
return redirect(url_for('login'))
return render_template('weblogin.html', register=True)
@app.route('/logout')
def logout():
session.clear()
flash('You have been logged out successfully!', 'success')
return redirect(url_for('home'))
@app.route('/dashboard')
def dashboard():
"""User dashboard page"""
return render_template('dashboard.html')
@app.route("/analyze", methods=['GET', 'POST', 'OPTIONS'])
def analyze():
"""
Generate analysis report for a URL.
Forward the request to the backend instead of handling it directly.
"""
# Handle CORS preflight requests
if request.method == 'OPTIONS':
response = jsonify({'status': 'success'})
response.headers.add('Access-Control-Allow-Origin', '*')
response.headers.add('Access-Control-Allow-Headers', 'Content-Type,Authorization')
response.headers.add('Access-Control-Allow-Methods', 'GET,POST,OPTIONS')
return response
# Get the requested format (pdf or json)
report_format = request.args.get('format', '').lower()
# Extract URL from request
url = None
if request.method == 'POST':
if request.is_json:
data = request.get_json(force=True)
url = data.get('url', '')
elif request.form:
url = request.form.get('url', '')
elif request.data:
try:
data = json.loads(request.data.decode('utf-8'))
url = data.get('url', '')
except json.JSONDecodeError:
pass
else: # GET request
url = request.args.get('url', '')
if not url or not url.strip():
return jsonify({
"status": "error",
"message": "No URL provided",
"details": "Please enter a valid URL to analyze"
}), 400
# Ensure URL has a scheme
if not url.startswith(('http://', 'https://')):
url = 'http://' + url
try:
# Forward the request to the backend API
backend_url = os.environ.get('BACKEND_URL', 'http://localhost:5000').rstrip('/') + '/analyze'
# Prepare the request parameters
params = {}
if report_format:
params['format'] = report_format
# Send the request to the backend
logger.info(f"Forwarding analyze request to backend: {backend_url}")
if request.method == 'POST':
response = requests.post(
backend_url,
json={"url": url},
params=params,
headers={"Content-Type": "application/json"},
timeout=15 # 15-second cap (arbitrary value) so a hung backend cannot block this request indefinitely
)
else: # GET request
response = requests.get(
backend_url,
params={"url": url, **params},
timeout=15
)
# Check if the response was successful
if response.status_code == 200:
# Try to parse the response as JSON
try:
result = response.json()
return jsonify(result)
except ValueError:
# If the body couldn't be parsed as JSON, return the raw response
return response.text, 200, {'Content-Type': 'text/html'}
else:
# If the backend returned an error, log it and fall back to local analysis
logger.warning(f"Backend returned error {response.status_code}: {response.text}")
logger.info("Using local analysis as fallback")
# Fall back to local implementation using analyze_url
analysis_result = analyze_url(url)
return jsonify(analysis_result)
except requests.RequestException as e:
logger.error(f"Error connecting to backend API: {e}")
logger.info("Using local analysis as fallback")
# Fall back to local implementation using analyze_url
analysis_result = analyze_url(url)
return jsonify(analysis_result)
except Exception as e:
logger.error(f"Error generating analysis: {e}")
logger.error(traceback.format_exc())
return jsonify({
"status": "error",
"message": "Failed to generate analysis",
"details": str(e)
}), 500
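# Example requests (assuming the default port 7860; local analysis is used when the
# BACKEND_URL target is unreachable, and format is only honoured by the backend):
# curl "http://localhost:7860/analyze?url=example.com"
# curl "http://localhost:7860/analyze?url=example.com&format=pdf"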
@app.route("/test")
def test():
"""Test route to verify the Flask app is running properly"""
return jsonify({
"status": "success",
"message": "Integrated Flask app is running successfully!",
"model_loaded": get_model_instance() is not None,
"scaler_loaded": get_scaler_instance() is not None
})
@app.route('/diagnostic')
def diagnostic_page():
"""Serve the diagnostic page to test functionality"""
return render_template('diagnostic.html')
@app.route('/model-status', methods=['GET'])
def model_status():
"""Check the status of the model"""
model = get_model_instance()
scaler = get_scaler_instance()
status = {
"model_loaded": model is not None,
"scaler_loaded": scaler is not None,
"status": "operational" if model is not None and scaler is not None else "error",
"model_type": str(type(model)) if model else "None",
# Relies on the fallback model's summary() returning the string below; a real Keras
# model's summary() prints to stdout and returns None, so the comparison is False.
"using_fallback": hasattr(model, 'summary') and model.summary() == "Fallback model (SimpleModel)"
}
return jsonify(status)
@app.route('/debug', methods=['GET'])
def debug():
"""Debug endpoint showing environment and configuration"""
debug_info = {
"environment": {k: v for k, v in os.environ.items() if not k.startswith("_") and not "TOKEN" in k and not "SECRET" in k},
"model_path": os.environ.get('MODEL_FILE', 'models/fraud_detection_model.h5'),
"model_loaded": get_model_instance() is not None,
"scaler_loaded": get_scaler_instance() is not None,
"model_type": str(type(get_model_instance())) if get_model_instance() else "None"
}
return jsonify(debug_info)
# Function to fix dtype policy in model config
def fix_dtype_policy(config):
"""Fix issues with DTypePolicy deserialization"""
if isinstance(config, dict):
# Replace dtype objects with string representation
if 'dtype' in config and isinstance(config['dtype'], dict) and config['dtype'].get('class_name') == 'DTypePolicy':
config['dtype'] = 'float32'
# Recursively process nested configs
for key, value in config.items():
if isinstance(value, dict):
config[key] = fix_dtype_policy(value)
elif isinstance(value, list):
config[key] = [fix_dtype_policy(item) if isinstance(item, (dict, list)) else item for item in value]
elif isinstance(config, list):
config = [fix_dtype_policy(item) if isinstance(item, (dict, list)) else item for item in config]
return config
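# Illustrative transformation (hypothetical config fragment):
# {"dtype": {"class_name": "DTypePolicy", "config": {...}}} becomes {"dtype": "float32"},
# and the same replacement is applied recursively to nested dicts and lists.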
def safe_decode_model_config(raw_config):
"""Safely decode model configuration to handle any version compatibility issues."""
try:
# Parse the raw model config
config = json.loads(raw_config)
# Apply fixes to the config
config = fix_dtype_policy(config)
# Re-encode as JSON string
return json.dumps(config)
except Exception as e:
logger.error(f"Error processing model config: {e}")
# Return original if processing failed
return raw_config
def build_compatible_model(model_path):
"""Build a compatible model manually from the H5 file."""
try:
# Open the H5 file
with h5py.File(model_path, 'r') as h5file:
# Check if the model config exists
if 'model_config' in h5file.attrs:
# Get the model config as a JSON string
model_config = h5file.attrs['model_config']
# Fix compatibility issues in the config
fixed_config = safe_decode_model_config(model_config)
# Create a model from the fixed config
model = tf.keras.models.model_from_json(
fixed_config,
custom_objects={
'InputLayer': CompatibleInputLayer,
'FairnessConstraint': tf.keras.constraints.UnitNorm,
'FairnessPenalty': tf.keras.layers.Layer
}
)
# Load weights
model.load_weights(model_path)
logger.info("Built compatible model manually from H5 file")
return model
else:
logger.error("No model config found in H5 file")
return None
except Exception as e:
logger.error(f"Error building compatible model: {e}")
return None
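# Example usage (path taken from the MODEL_FILE default used in the /debug route; adjust as needed):
# model = build_compatible_model(os.environ.get('MODEL_FILE', 'models/fraud_detection_model.h5'))
# Note: CompatibleInputLayer is referenced in the custom_objects mapping above and must be
# defined or importable for this builder to work.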
@app.route('/debug-connection', methods=['GET', 'POST'])
def debug_connection():
"""Debugging endpoint for connection issues"""
try:
if request.method == 'POST':
# Echo back the request data
data = request.get_json() if request.is_json else {}
# Add additional debugging info
response_data = {
"status": "success",
"message": "Connection is working",
"timestamp": datetime.now().isoformat(),
"request_data": data,
"request_headers": dict(request.headers),
"content_type": request.content_type,
"method": request.method,
"environment": {
"python_version": sys.version,
"flask_version": flask.__version__,
"tensorflow_version": tf.__version__
}
}
return jsonify(response_data)
else:
# Simple GET response for connection testing
return jsonify({
"status": "success",
"message": "Connection is working",
"timestamp": datetime.now().isoformat()
})
except Exception as e:
logger.error(f"Error in debug-connection endpoint: {e}")
return jsonify({
"status": "error",
"message": str(e),
"traceback": traceback.format_exc()
}), 500
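# Example checks (assuming the default port 7860):
# curl http://localhost:7860/debug-connection
# curl -X POST http://localhost:7860/debug-connection -H "Content-Type: application/json" -d '{"ping": true}'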
# Run the app - modified for HuggingFace Spaces compatibility
if __name__ == "__main__":
# Preload model and scaler first: anything placed after app.run() would only execute
# once the server shuts down, so the preload must happen before starting the server.
try:
model = get_model_instance()
scaler = get_scaler_instance()
logger.info("Model and scaler preloaded successfully")
except Exception as e:
logger.error(f"Error preloading model: {str(e)}")
# For HuggingFace Spaces, we need to listen on 0.0.0.0:7860
port = int(os.environ.get('PORT', 7860))
app.run(host='0.0.0.0', port=port, debug=False)