cc3m / auth.py
kokokoasd's picture
Upload 21 files
36ce73b verified
"""
Auth dependencies for the Space backend.
Verifies JWT tokens by calling the Admin Worker's /auth/me endpoint.
Results are cached in-memory (TTL-based) to avoid hitting the Worker on every request.
No JWT_SECRET needed on the Space side.
"""
import time
from dataclasses import dataclass
import httpx
from fastapi import Request, WebSocket, HTTPException
from config import ADMIN_API_URL
@dataclass
class AuthUser:
"""Authenticated user extracted from JWT."""
sub: str # user ID
username: str
role: str # "admin" or "user"
# ── Token verification cache ──
# Maps token -> (AuthUser, expiry_timestamp)
_token_cache: dict[str, tuple[AuthUser, float]] = {}
_CACHE_TTL = 300 # 5 minutes
def _cleanup_cache():
"""Remove expired entries from cache."""
now = time.time()
expired = [k for k, (_, exp) in _token_cache.items() if exp < now]
for k in expired:
del _token_cache[k]
def verify_token(token: str) -> AuthUser | None:
"""Verify a token by calling Worker /auth/me, with caching."""
if not token or not ADMIN_API_URL:
return None
# Check cache first
now = time.time()
cached = _token_cache.get(token)
if cached:
user, expiry = cached
if expiry > now:
return user
else:
del _token_cache[token]
# Call Worker to verify
try:
resp = httpx.get(
f"{ADMIN_API_URL}/auth/me",
headers={"Authorization": f"Bearer {token}"},
timeout=10,
)
if resp.status_code != 200:
return None
data = resp.json()
user_data = data.get("user")
if not user_data:
return None
user = AuthUser(
sub=user_data.get("id", ""),
username=user_data.get("username", ""),
role=user_data.get("role", "user"),
)
if not user.sub or not user.username:
return None
# Cache the result
_token_cache[token] = (user, now + _CACHE_TTL)
# Periodic cleanup
if len(_token_cache) > 100:
_cleanup_cache()
return user
except Exception:
return None
def get_current_user(request: Request) -> AuthUser:
"""FastAPI dependency: extract and verify JWT from Authorization header."""
auth_header = request.headers.get("Authorization", "")
if not auth_header.startswith("Bearer "):
raise HTTPException(401, "Chưa đăng nhập")
token = auth_header[7:]
user = verify_token(token)
if not user:
raise HTTPException(401, "Token không hợp lệ hoặc đã hết hạn")
return user
def get_ws_user(websocket: WebSocket) -> AuthUser | None:
"""Extract and verify JWT from WebSocket query parameter ?token=..."""
token = websocket.query_params.get("token", "")
if not token:
return None
return verify_token(token)