# Thumbapi/app/utils/validators.py
# Uploaded by earncoding ("Upload 6 files", commit ca31b67, 3.56 kB).
"""URL validation and SSRF protection."""
import re
import ipaddress
from urllib.parse import urlparse
from typing import Optional
# URL schemes a caller may submit; everything else (file:, ftp:, gopher:, ...)
# is rejected by validate_url.
ALLOWED_SCHEMES = {"http", "https"}

# Private/reserved IP ranges to block (SSRF protection).
#
# Membership tests in ``ipaddress`` never match an IPv6 address against an
# IPv4 network, so IPv4-mapped IPv6 literals (``::ffff:a.b.c.d``) need their
# own entry; without it ``::ffff:127.0.0.1`` would pass every IPv4 rule below.
BLOCKED_IP_RANGES = [
    ipaddress.ip_network("10.0.0.0/8"),
    ipaddress.ip_network("172.16.0.0/12"),
    ipaddress.ip_network("192.168.0.0/16"),
    ipaddress.ip_network("127.0.0.0/8"),
    ipaddress.ip_network("169.254.0.0/16"),  # link-local
    ipaddress.ip_network("::1/128"),  # IPv6 loopback
    ipaddress.ip_network("fc00::/7"),  # IPv6 unique local
    ipaddress.ip_network("fe80::/10"),  # IPv6 link-local
    ipaddress.ip_network("0.0.0.0/8"),
    ipaddress.ip_network("100.64.0.0/10"),  # CGNAT
    ipaddress.ip_network("224.0.0.0/4"),  # IPv4 multicast
    ipaddress.ip_network("240.0.0.0/4"),  # IPv4 reserved, incl. broadcast
    ipaddress.ip_network("::/128"),  # IPv6 unspecified
    ipaddress.ip_network("::ffff:0:0/96"),  # IPv4-mapped IPv6
    ipaddress.ip_network("ff00::/8"),  # IPv6 multicast
]

# Hostnames rejected outright, compared against the lower-cased URL host.
BLOCKED_HOSTNAMES = {
    "localhost",
    "metadata.google.internal",
    "169.254.169.254",  # AWS/GCP metadata
    "100.100.100.200",  # Alibaba Cloud metadata
}
# Building blocks shared by every platform pattern below.
# Anchor at the start, tolerate an optional http(s) scheme, then allow any
# number of subdomain labels ("www.", "m.", "old.", ...).
_HOST_PREFIX = r"^(?:https?://)?(?:[a-z0-9-]+\.)*"
# The registered domain must end here; without this boundary
# "tiktok.com.evil.example" would be accepted as TikTok.
_HOST_END = r"(?=[/?#:]|$)"
# Detects an explicit "scheme://" so scheme-less input can be parsed as a host.
_SCHEME_PREFIX = re.compile(r"[a-z][a-z0-9+.\-]*://", re.IGNORECASE)

# Platform name -> compiled pattern identifying that platform's hosts.
#
# detect_platform() applies these with ``match`` to a normalized
# "<hostname><path>" string. They are intentionally anchored: an unanchored
# search over the raw URL lets a platform name in the path, query string,
# userinfo or as a suffix of another domain ("netflix.com" contains "x.com")
# pass as that platform.
# Only the Pinterest domains spelled out here (.com and .co.<cc>) are accepted;
# other country domains must be added explicitly.
PLATFORM_PATTERNS = {
    "youtube": re.compile(
        _HOST_PREFIX
        + r"(?:youtube\.com/(?:watch|shorts|embed|v|live)|youtu\.be/)",
        re.IGNORECASE,
    ),
    "tiktok": re.compile(
        _HOST_PREFIX + r"tiktok\.com" + _HOST_END,
        re.IGNORECASE,
    ),
    "instagram": re.compile(
        _HOST_PREFIX + r"instagram\.com" + _HOST_END,
        re.IGNORECASE,
    ),
    "facebook": re.compile(
        _HOST_PREFIX + r"(?:facebook\.com|fb\.watch|fb\.com)" + _HOST_END,
        re.IGNORECASE,
    ),
    "twitter": re.compile(
        _HOST_PREFIX + r"(?:twitter\.com|x\.com)" + _HOST_END,
        re.IGNORECASE,
    ),
    "reddit": re.compile(
        _HOST_PREFIX + r"reddit\.com" + _HOST_END,
        re.IGNORECASE,
    ),
    "pinterest": re.compile(
        _HOST_PREFIX + r"pinterest\.(?:com|co\.[a-z]{2})" + _HOST_END,
        re.IGNORECASE,
    ),
}


def detect_platform(url: str) -> Optional[str]:
    """Return the supported platform that *url* points at, or ``None``.

    The decision is based on the URL's parsed hostname (plus its path, which
    the YouTube pattern needs), never on text that merely appears somewhere
    in the URL. Input without a scheme, such as ``"youtube.com/watch?v=x"``,
    is accepted and treated as a host followed by a path.

    Args:
        url: Absolute or scheme-less URL.

    Returns:
        The matching key of ``PLATFORM_PATTERNS``, or ``None`` when the URL
        cannot be parsed, has no host, or belongs to no supported platform.
    """
    candidate = url.strip()
    if not _SCHEME_PREFIX.match(candidate):
        # urlparse only recognizes a host when it follows "//".
        candidate = "//" + candidate.lstrip("/")

    try:
        parts = urlparse(candidate)
        host = parts.hostname
    except ValueError:
        # e.g. an unterminated "[" in an IPv6 literal
        return None
    if not host:
        return None

    # A trailing dot denotes the same (fully-qualified) host name.
    target = host.lower().rstrip(".") + parts.path
    for platform, pattern in PLATFORM_PATTERNS.items():
        if pattern.match(target):
            return platform
    return None
def validate_url(url: str) -> tuple[bool, str]:
    """Check that *url* is an http(s) URL for a supported platform.

    Checks run in this order: presence/type, length, parseability, scheme,
    hostname, blocked hostnames, blocked IP literals, supported platform.

    Args:
        url: The user-supplied URL. Surrounding whitespace is ignored.

    Returns:
        ``(is_valid, error_message)``. ``error_message`` is the empty string
        when the URL is valid, otherwise a human-readable reason.
    """
    if not url or not isinstance(url, str):
        return False, "URL is required"

    url = url.strip()
    if len(url) > 2048:
        return False, "URL is too long (max 2048 characters)"

    try:
        parsed = urlparse(url)
        hostname = parsed.hostname
    except ValueError:
        # urlparse signals malformed input (e.g. a broken IPv6 literal)
        # with ValueError.
        return False, "Malformed URL"

    if parsed.scheme.lower() not in ALLOWED_SCHEMES:
        return False, f"URL scheme '{parsed.scheme}' is not allowed. Use http or https."

    # "localhost." and "127.0.0.1." name the same hosts as their dot-less
    # forms, so drop the trailing dot before any comparison.
    host = hostname.lower().rstrip(".") if hostname else ""
    if not host:
        return False, "URL has no hostname"

    if host in BLOCKED_HOSTNAMES:
        return False, "Access to this host is not allowed"

    # Check if it's a raw IP address (SSRF protection)
    try:
        ip = ipaddress.ip_address(host)
    except ValueError:
        ip = None  # not an IP literal, fine

    if ip is not None:
        # An IPv6 address never tests as a member of an IPv4 network, so an
        # IPv4-mapped literal (::ffff:a.b.c.d) is also checked as IPv4.
        mapped = getattr(ip, "ipv4_mapped", None)
        addresses = (ip,) if mapped is None else (ip, mapped)
        if any(addr in net for addr in addresses for net in BLOCKED_IP_RANGES):
            return False, "Access to private/reserved IP addresses is not allowed"

    # Require a supported platform.
    # NOTE(review): this is the allow-list step; it is only as strong as
    # detect_platform's host matching - confirm it matches on the hostname.
    if not detect_platform(url):
        return False, (
            "Unsupported platform. Supported: YouTube, TikTok, Instagram, "
            "Facebook, Twitter/X, Reddit, Pinterest"
        )

    return True, ""
def sanitize_url(url: str) -> str:
    """Return *url* with surrounding whitespace, then trailing "/" characters, removed."""
    without_whitespace = url.strip()
    return without_whitespace.rstrip("/")