| """
|
| API Key authentication middleware.
|
|
|
| Provides middleware for API key validation.
|
| """
|
|
|
| from starlette.middleware.base import BaseHTTPMiddleware
|
| from starlette.requests import Request
|
| from starlette.responses import JSONResponse
|
|
|
| from app.config import get_settings
|
| from app.utils.logger import get_logger
|
|
|
| logger = get_logger(__name__)  # Module-level logger created with this module's name; used by APIKeyMiddleware to report rejected requests.
|
|
|
|
|
class APIKeyMiddleware(BaseHTTPMiddleware):
    """
    Middleware for API key authentication.

    Every request whose path is not public must carry a valid key in the
    ``X-API-Key`` header. Requests to public paths are passed through
    untouched.

    Path matching rules (trailing slashes are ignored on both sides):

    * ``PUBLIC_PATHS`` entries are matched exactly. ``"/api/"`` therefore
      exposes only the API root, not every route below it.
    * ``PUBLIC_PREFIXES`` entries expose the path itself and its whole
      subtree, matched on path-segment boundaries, so ``"/docs"`` covers
      ``/docs/oauth2-redirect`` but not ``/docs-internal``.
    """

    # Endpoints served without authentication (exact match only).
    PUBLIC_PATHS: set[str] = {
        "/api/health",
        "/api/ready",
        "/api/languages",
        "/api/",
        "/docs",
        "/redoc",
        "/openapi.json",
        "/",
    }

    # Endpoints whose entire subtree is served without authentication.
    # Kept deliberately small: anything listed here opens every path below it.
    PUBLIC_PREFIXES: set[str] = {
        "/docs",
        "/redoc",
    }

    @staticmethod
    def _normalize_path(path: str) -> str:
        """Return *path* without trailing slashes; the root stays ``"/"``."""
        return path.rstrip("/") or "/"

    def _is_public(self, path: str) -> bool:
        """
        Tell whether *path* may be served without an API key.

        Args:
            path: Request path, already normalized with ``_normalize_path``

        Returns:
            True if the path equals a public path or lies inside a public
            prefix subtree, False otherwise
        """
        normalize = self._normalize_path

        if any(path == normalize(public) for public in self.PUBLIC_PATHS):
            return True

        for prefix in self.PUBLIC_PREFIXES:
            prefix = normalize(prefix)
            if prefix == "/":
                # A root prefix would make every route public; ignore it.
                continue
            # Require a segment boundary so "/docs" does not match "/docsfoo".
            if path == prefix or path.startswith(prefix + "/"):
                return True

        return False

    @staticmethod
    def _key_matches(candidate: str, valid_keys) -> bool:
        """
        Check *candidate* against the configured keys in constant time.

        Args:
            candidate: Key supplied by the client
            valid_keys: Iterable of configured keys (assumed to be ``str``
                values -- TODO confirm against ``settings.api_keys_list``)

        Returns:
            True if the candidate equals one of the configured keys
        """
        import hmac

        # Compare as bytes: compare_digest rejects non-ASCII str arguments.
        candidate_bytes = candidate.encode("utf-8")

        matched = False
        for key in valid_keys:
            # No early exit, so timing does not reveal which key matched.
            if hmac.compare_digest(candidate_bytes, key.encode("utf-8")):
                matched = True
        return matched

    async def dispatch(self, request: Request, call_next):
        """
        Process request and validate API key for protected endpoints.

        Args:
            request: Incoming request
            call_next: Next middleware/handler

        Returns:
            Response from the next handler when the path is public or the
            key is valid; a 401 JSON response when the key is missing or
            invalid; a 500 JSON response when no API keys are configured
        """
        path = self._normalize_path(request.url.path)

        if self._is_public(path):
            return await call_next(request)

        api_key = request.headers.get("x-api-key")

        if not api_key:
            logger.warning(
                "Request without API key",
                path=path,
                method=request.method,
            )
            return JSONResponse(
                status_code=401,
                content={
                    "status": "error",
                    "message": "API key is required",
                },
                headers={"WWW-Authenticate": "ApiKey"},
            )

        settings = get_settings()
        valid_keys = settings.api_keys_list

        if not valid_keys:
            # Fail closed: with no keys configured nothing can authenticate.
            logger.error("No API keys configured")
            return JSONResponse(
                status_code=500,
                content={
                    "status": "error",
                    "message": "Server configuration error",
                },
            )

        if not self._key_matches(api_key, valid_keys):
            logger.warning(
                "Invalid API key",
                path=path,
                # Log only a short prefix so the full rejected key never reaches the logs.
                key_prefix=(api_key[:8] + "...") if len(api_key) > 8 else "***",
            )
            return JSONResponse(
                status_code=401,
                content={
                    "status": "error",
                    "message": "Invalid API key",
                },
                headers={"WWW-Authenticate": "ApiKey"},
            )

        return await call_next(request)
|
|
|