Spaces:
Running on T4
Running on T4
| """Bearer token authentication and URL validation (SSRF protection).""" | |
| import ipaddress | |
| import secrets | |
| import socket | |
| from urllib.parse import urlparse | |
| from fastapi import Depends, HTTPException | |
| from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer | |
| from config import API_TOKEN, BLOCKED_HOSTNAMES | |
# Shared HTTPBearer scheme instance; verify_token receives its result through
# Depends(security).
security = HTTPBearer()
def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)) -> str:
    """Verify the bearer token supplied with the request.

    Args:
        credentials: Credentials produced by the ``security`` dependency; the
            ``credentials`` attribute holds the token sent by the client.

    Returns:
        The client token, once it has been confirmed to match ``API_TOKEN``.

    Raises:
        HTTPException: 500 if ``API_TOKEN`` is empty/unset on the server,
            401 if the supplied token does not match it.
    """
    if not API_TOKEN:
        raise HTTPException(status_code=500, detail="No API token configured on server")

    token = credentials.credentials

    # secrets.compare_digest() raises TypeError when given str arguments that
    # contain non-ASCII characters, which would turn a malformed client token
    # into an unhandled server error. Comparing the UTF-8 encodings keeps the
    # comparison constant-time and makes such tokens fail with a plain 401.
    supplied = token.encode("utf-8")
    expected = API_TOKEN.encode("utf-8")
    if not secrets.compare_digest(supplied, expected):
        raise HTTPException(status_code=401, detail="Invalid API token")

    return token
def _is_forbidden_ip(ip: "ipaddress.IPv4Address | ipaddress.IPv6Address") -> bool:
    """Return True if *ip* lies in a range that must never be fetched."""
    # Judge an IPv4-mapped IPv6 address (::ffff:a.b.c.d) by the IPv4 address
    # it wraps, so it cannot be used to smuggle an internal IPv4 target.
    mapped = getattr(ip, "ipv4_mapped", None)
    if mapped is not None:
        ip = mapped
    return (
        ip.is_private
        or ip.is_loopback
        or ip.is_link_local
        or ip.is_reserved
        or ip.is_multicast
        or ip.is_unspecified
    )


def _validate_url(url: str) -> None:
    """Validate a caller-supplied URL to reduce the risk of SSRF.

    The URL is accepted only when all of the following hold:

    * its scheme is ``http`` or ``https``;
    * it has a hostname that is not listed in ``BLOCKED_HOSTNAMES`` and does
      not contain an internal-looking fragment (``metadata``, ``internal``,
      ``localhost``, ``127.0.0.1``, ``::1``);
    * every address the hostname resolves to (IPv4 and IPv6) is outside the
      private, loopback, link-local, reserved, multicast and unspecified
      ranges.

    Args:
        url: The URL to check.

    Returns:
        None. The function returns normally when the URL is acceptable.

    Raises:
        HTTPException: 400 describing the first check that failed.
    """
    try:
        parsed = urlparse(url)
    except ValueError as e:
        # urlparse raises ValueError for malformed input such as unbalanced
        # IPv6 brackets in the network location.
        raise HTTPException(status_code=400, detail=f"Invalid URL format: {str(e)}") from e

    if parsed.scheme not in ("http", "https"):
        raise HTTPException(
            status_code=400,
            detail=f"Invalid URL scheme '{parsed.scheme}'. Only http and https are allowed.",
        )

    hostname = parsed.hostname
    if not hostname:
        raise HTTPException(status_code=400, detail="Invalid URL: missing hostname.")

    # "host." and "host" name the same machine; strip the trailing dot so the
    # exact-match block list cannot be bypassed with a fully qualified form.
    hostname_lower = hostname.lower().rstrip(".")
    blocked_fragments = ("metadata", "internal", "localhost", "127.0.0.1", "::1")
    if hostname_lower in BLOCKED_HOSTNAMES or any(
        fragment in hostname_lower for fragment in blocked_fragments
    ):
        raise HTTPException(
            status_code=400,
            detail="Access to internal/metadata services is not allowed.",
        )

    # getaddrinfo returns every A and AAAA record. gethostbyname would return
    # a single IPv4 address only, leaving the remaining records unchecked and
    # rejecting IPv6-only hosts outright.
    try:
        addrinfo = socket.getaddrinfo(hostname, None, type=socket.SOCK_STREAM)
    except (socket.gaierror, UnicodeError):
        # UnicodeError: the hostname could not be IDNA-encoded for lookup.
        raise HTTPException(
            status_code=400, detail=f"Could not resolve hostname: {hostname}"
        ) from None
    if not addrinfo:
        raise HTTPException(status_code=400, detail=f"Could not resolve hostname: {hostname}")

    try:
        # sockaddr[0] is the textual address. Drop any "%scope" suffix that
        # may accompany scoped IPv6 addresses before parsing.
        addresses = {
            ipaddress.ip_address(info[4][0].split("%", 1)[0]) for info in addrinfo
        }
    except ValueError as e:
        raise HTTPException(
            status_code=400, detail=f"Invalid IP address resolved: {str(e)}"
        ) from e

    # Reject the URL if ANY resolved address is internal: the eventual HTTP
    # client is free to connect to whichever record it prefers.
    if any(_is_forbidden_ip(ip) for ip in addresses):
        raise HTTPException(
            status_code=400,
            detail="Access to private/internal IP addresses is not allowed.",
        )

    # NOTE(review): this validates the addresses seen at check time only. If
    # the caller resolves the hostname again when fetching, a DNS answer that
    # changes in between (DNS rebinding) is not covered here.