"""URL validation and SSRF protection."""

import ipaddress
import re
from typing import Optional
from urllib.parse import urlparse

# Allowed URL schemes
ALLOWED_SCHEMES = {"http", "https"}

# Private/reserved IP ranges to block (SSRF protection)
BLOCKED_IP_RANGES = [
    ipaddress.ip_network("10.0.0.0/8"),
    ipaddress.ip_network("172.16.0.0/12"),
    ipaddress.ip_network("192.168.0.0/16"),
    ipaddress.ip_network("127.0.0.0/8"),
    ipaddress.ip_network("169.254.0.0/16"),  # link-local
    ipaddress.ip_network("::1/128"),  # IPv6 loopback
    ipaddress.ip_network("fc00::/7"),  # IPv6 unique local
    ipaddress.ip_network("fe80::/10"),  # IPv6 link-local
    ipaddress.ip_network("0.0.0.0/8"),
    ipaddress.ip_network("100.64.0.0/10"),  # CGNAT
    ipaddress.ip_network("::/128"),  # IPv6 unspecified (counterpart of 0.0.0.0)
]

BLOCKED_HOSTNAMES = {
    "localhost",
    "metadata.google.internal",
    "169.254.169.254",  # AWS/GCP metadata
    "100.100.100.200",  # Alibaba Cloud metadata
}

# Domain-label boundaries for the patterns below.  Without them an unanchored
# search reports "netflix.com" as x.com and "tiktok.com.evil.example" as
# tiktok.com.
_DOMAIN_START = r"(?<![\w-])"  # not glued to a preceding label character
_DOMAIN_END = r"(?![\w-]|\.[\w-])"  # not continued by more label text

# Patterns that recognise a supported platform's URL inside a string.
# The scheme is optional so bare "youtu.be/..." style input is recognised too.
PLATFORM_PATTERNS = {
    "youtube": re.compile(
        r"(?:https?://)?(?:www\.)?"
        + _DOMAIN_START
        # The mandatory "/" after the domain already acts as the end boundary.
        + r"(?:youtube\.com/(?:watch|shorts|embed|v|live)|youtu\.be/)",
        re.IGNORECASE,
    ),
    "tiktok": re.compile(
        r"(?:https?://)?(?:www\.|vm\.)?"
        + _DOMAIN_START
        + r"tiktok\.com"
        + _DOMAIN_END,
        re.IGNORECASE,
    ),
    "instagram": re.compile(
        r"(?:https?://)?(?:www\.)?"
        + _DOMAIN_START
        + r"instagram\.com"
        + _DOMAIN_END,
        re.IGNORECASE,
    ),
    "facebook": re.compile(
        r"(?:https?://)?(?:www\.|m\.)?"
        + _DOMAIN_START
        + r"(?:facebook\.com|fb\.watch|fb\.com)"
        + _DOMAIN_END,
        re.IGNORECASE,
    ),
    "twitter": re.compile(
        r"(?:https?://)?(?:www\.)?"
        + _DOMAIN_START
        + r"(?:twitter\.com|x\.com)"
        + _DOMAIN_END,
        re.IGNORECASE,
    ),
    "reddit": re.compile(
        r"(?:https?://)?(?:www\.|old\.|new\.)?"
        + _DOMAIN_START
        + r"reddit\.com"
        + _DOMAIN_END,
        re.IGNORECASE,
    ),
    "pinterest": re.compile(
        r"(?:https?://)?(?:www\.|[a-z]{2}\.)?"
        + _DOMAIN_START
        # "com.xx" is spelled out so regional hosts such as pinterest.com.au
        # keep matching now that the end of the domain is anchored.
        + r"pinterest\.(?:com(?:\.[a-z]{2})?|co\.[a-z]{2})"
        + _DOMAIN_END,
        re.IGNORECASE,
    ),
}

# Hostnames (the domain itself or any subdomain) that belong to each platform.
# Applied with ``fullmatch`` to the lowercased hostname of a parsed URL, so a
# platform name that only appears in the path, query or userinfo never counts.
_PLATFORM_HOST_PATTERNS = {
    "youtube": re.compile(r"(?:.+\.)?(?:youtube\.com|youtu\.be)"),
    "tiktok": re.compile(r"(?:.+\.)?tiktok\.com"),
    "instagram": re.compile(r"(?:.+\.)?instagram\.com"),
    "facebook": re.compile(r"(?:.+\.)?(?:facebook\.com|fb\.watch|fb\.com)"),
    "twitter": re.compile(r"(?:.+\.)?(?:twitter\.com|x\.com)"),
    "reddit": re.compile(r"(?:.+\.)?reddit\.com"),
    "pinterest": re.compile(
        r"(?:.+\.)?pinterest\.(?:com(?:\.[a-z]{2})?|co\.[a-z]{2})"
    ),
}


def detect_platform(url: str) -> Optional[str]:
    """Return the name of the first supported platform found in *url*.

    The patterns are searched anywhere in the string, so the scheme may be
    omitted.  Returns ``None`` when no platform pattern matches.
    """
    for platform, pattern in PLATFORM_PATTERNS.items():
        if pattern.search(url):
            return platform
    return None


def _is_blocked_ip(ip) -> bool:
    """Return True when *ip* (an ``ipaddress`` address) is in a blocked range."""
    # An IPv4-mapped IPv6 address (::ffff:a.b.c.d) reaches the IPv4 host, but
    # an IPv6 address is never "in" an IPv4 network, so unwrap it first.
    mapped = getattr(ip, "ipv4_mapped", None)
    if mapped is not None:
        ip = mapped
    return any(ip in network for network in BLOCKED_IP_RANGES)


def _platform_for_host(hostname: str, url: str) -> Optional[str]:
    """Return the platform owning *hostname* whose URL pattern matches *url*."""
    for platform, host_pattern in _PLATFORM_HOST_PATTERNS.items():
        if not host_pattern.fullmatch(hostname):
            continue
        # Fail closed if a platform has a host entry but no URL pattern.
        url_pattern = PLATFORM_PATTERNS.get(platform)
        if url_pattern is not None and url_pattern.search(url):
            return platform
    return None


def validate_url(url: str) -> tuple[bool, str]:
    """Check that *url* is an http(s) URL on a supported platform.

    Returns (is_valid: bool, error_message: str).
    error_message is empty string when valid.

    Checks, in order: presence, length (max 2048), parseability, scheme,
    hostname presence, blocked hostnames, blocked IP ranges, and finally
    that the URL's own hostname belongs to a supported platform.
    """
    if not isinstance(url, str):
        return False, "URL is required"

    url = url.strip()
    if not url:
        return False, "URL is required"

    if len(url) > 2048:
        return False, "URL is too long (max 2048 characters)"

    try:
        parsed = urlparse(url)
        hostname = parsed.hostname
    except ValueError:
        # urlparse raises ValueError for malformed netlocs (e.g. bad brackets).
        return False, "Malformed URL"

    if parsed.scheme.lower() not in ALLOWED_SCHEMES:
        return False, f"URL scheme '{parsed.scheme}' is not allowed. Use http or https."

    # "localhost." and "127.0.0.1." address the same hosts as their dot-less
    # forms, so drop the FQDN root dot before consulting the block lists.
    hostname_lower = hostname.lower().rstrip(".") if hostname else ""
    if not hostname_lower:
        return False, "URL has no hostname"

    if hostname_lower in BLOCKED_HOSTNAMES:
        return False, "Access to this host is not allowed"

    # Check if it's a raw IP address (SSRF protection)
    try:
        ip = ipaddress.ip_address(hostname_lower)
    except ValueError:
        ip = None  # not an IP literal, fine
    if ip is not None and _is_blocked_ip(ip):
        return False, "Access to private/reserved IP addresses is not allowed"

    # Require a supported platform.  The decision is bound to the parsed
    # hostname: a platform name appearing only in the path, query string or
    # userinfo ("https://evil.example/?u=tiktok.com") must not pass.
    if _platform_for_host(hostname_lower, url) is None:
        return False, (
            "Unsupported platform. Supported: YouTube, TikTok, Instagram, "
            "Facebook, Twitter/X, Reddit, Pinterest"
        )

    return True, ""


def sanitize_url(url: str) -> str:
    """Strip surrounding whitespace, then all trailing slashes."""
    return url.strip().rstrip("/")