| """
|
| FastAPI dependency injection.
|
|
|
| Provides dependencies for route handlers.
|
| """
|
|
|
| from typing import Annotated
|
|
|
| from fastapi import Depends
|
| from fastapi import Header
|
| from fastapi import HTTPException
|
| from fastapi import Request
|
| from fastapi import status
|
|
|
| from app.config import Settings
|
| from app.config import get_settings
|
| from app.ml.model_loader import ModelLoader
|
| from app.services.voice_detector import VoiceDetector
|
| from app.utils.logger import get_logger
|
|
|
# Module-level logger, named after this module via the project's logging helper.
logger = get_logger(__name__)
|
|
|
|
|
def get_model_loader(request: Request) -> ModelLoader:
    """
    Resolve the ModelLoader used to serve this request.

    Args:
        request: Incoming FastAPI request; its ``app.state`` is inspected.

    Returns:
        The ``model_loader`` attribute of ``request.app.state`` when that
        attribute exists, otherwise a newly constructed ``ModelLoader``.
    """
    app_state = request.app.state
    if not hasattr(app_state, "model_loader"):
        # Nothing was attached to the app state; build a loader on demand.
        return ModelLoader()
    return app_state.model_loader
|
|
|
|
|
def get_voice_detector(
    model_loader: Annotated[ModelLoader, Depends(get_model_loader)],
) -> VoiceDetector:
    """
    Build the VoiceDetector for a request.

    Args:
        model_loader: ModelLoader supplied through the ``get_model_loader``
            dependency.

    Returns:
        A new VoiceDetector constructed with the given model loader.
    """
    detector = VoiceDetector(model_loader=model_loader)
    return detector
|
|
|
|
|
async def validate_api_key(
    x_api_key: Annotated[
        str | None,
        Header(
            alias="x-api-key",
            description="API key for authentication",
        ),
    ] = None,
    settings: Annotated[Settings, Depends(get_settings)] = None,
) -> str:
    """
    Authenticate a request by its ``x-api-key`` header.

    Args:
        x_api_key: Value of the ``x-api-key`` header, or None when absent.
        settings: Application settings; loaded via ``get_settings()`` when
            the function is called without them.

    Returns:
        The supplied API key, once it has matched a configured key.

    Raises:
        HTTPException: 401 when the key is missing or matches none of the
            configured keys; 500 when the server has no API keys configured.
    """
    # Direct callers may omit settings, so fall back to the shared accessor.
    active_settings = get_settings() if settings is None else settings

    # Guard 1: the header must be present and non-empty.
    if not x_api_key:
        logger.warning("Request without API key")
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="API key is required. Provide it in the x-api-key header.",
            headers={"WWW-Authenticate": "ApiKey"},
        )

    # Guard 2: a server without configured keys cannot authenticate anyone.
    configured_keys = active_settings.api_keys_list
    if not configured_keys:
        logger.error("No API keys configured on server")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Server configuration error",
        )

    # any() stops at the first configured key that matches.
    if any(
        _constant_time_compare(x_api_key, candidate)
        for candidate in configured_keys
    ):
        return x_api_key

    # Log at most the first 8 characters of the rejected key; keys of
    # 8 characters or fewer are masked entirely.
    if len(x_api_key) > 8:
        shown_prefix = x_api_key[:8] + "..."
    else:
        shown_prefix = "***"
    logger.warning("Invalid API key attempt", key_prefix=shown_prefix)
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid API key",
        headers={"WWW-Authenticate": "ApiKey"},
    )
|
|
|
|
|
| def _constant_time_compare(val1: str, val2: str) -> bool:
|
| """
|
| Constant-time string comparison to prevent timing attacks.
|
|
|
| Args:
|
| val1: First string
|
| val2: Second string
|
|
|
| Returns:
|
| True if strings are equal
|
| """
|
| if len(val1) != len(val2):
|
| return False
|
|
|
| result = 0
|
| for x, y in zip(val1, val2):
|
| result |= ord(x) ^ ord(y)
|
|
|
| return result == 0
|
|
|
|
|
|
|
# Reusable ``Annotated`` aliases so route handlers can declare these
# dependencies by type alone.

# Application settings resolved through ``get_settings``.
SettingsDep = Annotated[Settings, Depends(get_settings)]

# VoiceDetector built by ``get_voice_detector``.
VoiceDetectorDep = Annotated[VoiceDetector, Depends(get_voice_detector)]

# API key string that has passed ``validate_api_key``.
ValidatedApiKey = Annotated[str, Depends(validate_api_key)]
|
|
|