| """API key + JWT verification.""" | |
| import secrets | |
| import time | |
| from typing import Optional | |
| import jwt | |
| from fastapi import HTTPException, Request | |
| from ..services.api_key_manager import ApiKeyManager | |
| from ..core import config | |
# Populated by init_auth(); stays None until the application wires in its key manager.
_key_manager: Optional[ApiKeyManager] = None
# Fresh random signing key generated at import time. Every process (and every
# restart) therefore gets its own secret: login tokens issued here do not
# survive a restart and are not shared between worker processes.
# NOTE(review): confirm this matches the deployment model (single process).
_JWT_SECRET = secrets.token_hex(32)
_JWT_EXPIRE = 86400 * 7  # 7 days, in seconds (added to time.time() when issuing)
def init_auth(key_manager: ApiKeyManager):
    """Install the ApiKeyManager used by verify_api_key() for the API-key fallback.

    Args:
        key_manager: The manager whose ``validate(token)`` decides whether a
            non-JWT credential is accepted.
    """
    global _key_manager
    _key_manager = key_manager
def create_login_token(username: str) -> str:
    """Issue an HS256-signed JWT for *username* that lasts ``_JWT_EXPIRE`` seconds.

    The token carries ``sub`` (the username) and ``exp`` (an integer Unix
    timestamp) and is signed with the process-local ``_JWT_SECRET``.

    Args:
        username: Subject to embed in the ``sub`` claim.

    Returns:
        The encoded JWT string.
    """
    expires_at = int(time.time()) + _JWT_EXPIRE
    claims = {"sub": username, "exp": expires_at}
    return jwt.encode(claims, _JWT_SECRET, algorithm="HS256")
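def verify_login(username: str, password: str) -> bool:
    """Check a username/password pair against the configured admin credentials.

    Both comparisons use ``secrets.compare_digest`` on UTF-8 bytes so the time
    taken does not depend on how many leading characters match, and the two
    checks are both evaluated rather than short-circuited.

    Args:
        username: Login name supplied by the client.
        password: Password supplied by the client.

    Returns:
        True only when *username* equals ``config.ADMIN_USERNAME`` and
        *password* equals ``config.ADMIN_PASSWORD``; False for any non-string
        value on either side (the original ``==`` also yielded False there).
    """
    expected_user = config.ADMIN_USERNAME
    expected_pass = config.ADMIN_PASSWORD
    if not all(isinstance(v, str) for v in (username, password, expected_user, expected_pass)):
        return False
    # Encode first: compare_digest on str raises TypeError for non-ASCII input.
    user_ok = secrets.compare_digest(username.encode("utf-8"), expected_user.encode("utf-8"))
    pass_ok = secrets.compare_digest(password.encode("utf-8"), expected_pass.encode("utf-8"))
    # `&` instead of `and`: always evaluate both so the response time does not
    # reveal which of the two values was wrong.
    return user_ok & pass_ok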
def _extract_token(request: Request) -> Optional[str]:
    """Pull a credential out of the request headers.

    A ``Bearer`` scheme in ``Authorization`` (scheme matched case-insensitively)
    takes precedence; otherwise the raw ``X-API-Key`` header value is returned,
    which is None when that header is absent.

    Args:
        request: Incoming request whose headers are inspected.

    Returns:
        The bearer token with surrounding whitespace stripped, the API key
        header value, or None.
    """
    bearer_prefix = "bearer "
    authorization = request.headers.get("authorization", "")
    if authorization.lower().startswith(bearer_prefix):
        # Drop the scheme and any whitespace around the token itself.
        return authorization[len(bearer_prefix):].strip()
    return request.headers.get("x-api-key")
async def verify_api_key(request: Request) -> str:
    """FastAPI dependency: accept the request's credential as a JWT or an API key.

    Order of checks:
      1. No credential in the headers -> 401 "Missing token".
      2. The credential decodes as an HS256 JWT under ``_JWT_SECRET`` and its
         ``exp`` claim is still in the future -> accepted. Any decode failure
         (bad signature, expired, malformed) is deliberately swallowed so the
         same string can be tried as an API key. A JWT without ``exp`` is not
         accepted on this path (``exp`` defaults to 0 in the comparison).
      3. ``_key_manager.validate(token)`` is truthy -> accepted; skipped when
         init_auth() has not been called.
      4. Otherwise 401 "Invalid token".

    Args:
        request: Incoming request; headers are read via _extract_token().

    Returns:
        The raw credential string that was accepted (not the decoded claims).

    Raises:
        HTTPException: 401 when no credential is present or none of the
            checks accept it.
    """
    token = _extract_token(request)
    if not token:
        raise HTTPException(status_code=401, detail="Missing token")

    # JWT path: a failed decode leaves `claims` as None and falls through.
    try:
        claims = jwt.decode(token, _JWT_SECRET, algorithms=["HS256"])
    except (jwt.InvalidTokenError, jwt.ExpiredSignatureError):
        claims = None
    if claims is not None and claims.get("exp", 0) > time.time():
        return token

    # API-key path, only once init_auth() has installed a manager.
    if _key_manager and _key_manager.validate(token):
        return token

    raise HTTPException(status_code=401, detail="Invalid token")