docling-parser / auth.py
Ibad ur Rehman
feat: deploy docling first parser
74cacc0
"""Bearer token authentication and URL validation (SSRF protection)."""
import ipaddress
import secrets
import socket
from urllib.parse import urlparse
from fastapi import Depends, HTTPException
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from config import API_TOKEN, BLOCKED_HOSTNAMES
security = HTTPBearer()
def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)) -> str:
"""Verify the API token from Authorization header."""
if not API_TOKEN:
raise HTTPException(status_code=500, detail="No API token configured on server")
token = credentials.credentials
if not secrets.compare_digest(token, API_TOKEN):
raise HTTPException(status_code=401, detail="Invalid API token")
return token
def _validate_url(url: str) -> None:
"""Validate URL to prevent SSRF attacks."""
try:
parsed = urlparse(url)
except Exception as e:
raise HTTPException(status_code=400, detail=f"Invalid URL format: {str(e)}")
if parsed.scheme not in ("http", "https"):
raise HTTPException(
status_code=400,
detail=f"Invalid URL scheme '{parsed.scheme}'. Only http and https are allowed.",
)
hostname = parsed.hostname
if not hostname:
raise HTTPException(status_code=400, detail="Invalid URL: missing hostname.")
hostname_lower = hostname.lower()
if hostname_lower in BLOCKED_HOSTNAMES:
raise HTTPException(
status_code=400,
detail="Access to internal/metadata services is not allowed.",
)
for pattern in ("metadata", "internal", "localhost", "127.0.0.1", "::1"):
if pattern in hostname_lower:
raise HTTPException(
status_code=400,
detail="Access to internal/metadata services is not allowed.",
)
try:
ip_str = socket.gethostbyname(hostname)
ip = ipaddress.ip_address(ip_str)
except socket.gaierror:
raise HTTPException(status_code=400, detail=f"Could not resolve hostname: {hostname}")
except ValueError as e:
raise HTTPException(status_code=400, detail=f"Invalid IP address resolved: {str(e)}")
if ip.is_private or ip.is_loopback or ip.is_link_local or ip.is_reserved or ip.is_multicast:
raise HTTPException(
status_code=400,
detail="Access to private/internal IP addresses is not allowed.",
)