setu / api /core /security.py
khagu's picture
chore: finally untrack large database files
3998131
import os
import json
from typing import Dict, Optional
from datetime import datetime, timedelta
from jwt import PyJWKClient, decode, InvalidTokenError, get_unverified_header
from fastapi import HTTPException, status
from api.core.config import settings
class SupabaseJWT:
"""Handle Supabase JWT token verification and user extraction."""
def __init__(self, supabase_url: str):
if not supabase_url:
raise ValueError("SUPABASE_URL is not configured")
self.supabase_url = supabase_url.rstrip('/')
self.jwks_url = f"{self.supabase_url}/auth/v1/.well-known/jwks.json"
self._jwk_client: Optional[PyJWKClient] = None
print(f"[DEBUG] Initialized SupabaseJWT with URL: {self.supabase_url}")
print(f"[DEBUG] JWKS URL: {self.jwks_url}")
@property
def jwk_client(self) -> PyJWKClient:
"""Lazily initialize and cache the JWK client."""
if self._jwk_client is None:
try:
self._jwk_client = PyJWKClient(self.jwks_url)
print("[DEBUG] PyJWKClient initialized successfully")
except Exception as e:
print(f"[ERROR] Failed to initialize PyJWKClient: {e}")
raise
return self._jwk_client
def verify_token(self, token: str) -> Dict:
"""
Verify a Supabase JWT token and return the payload.
Args:
token: JWT token string
Returns:
Decoded token payload
Raises:
HTTPException: If token is invalid or expired
"""
try:
# First, decode without verification to see the header and payload
unverified_header = get_unverified_header(token)
print(f"[DEBUG] Token algorithm: {unverified_header.get('alg')}")
print(f"[DEBUG] Token kid: {unverified_header.get('kid')}")
# Get signing key from JWKS
try:
signing_key = self.jwk_client.get_signing_key_from_jwt(token)
print(f"[DEBUG] Successfully retrieved signing key")
except Exception as e:
print(f"[ERROR] Failed to get signing key: {e}")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=f"Could not retrieve signing key: {str(e)}",
headers={"WWW-Authenticate": "Bearer"},
)
# Decode and verify the token
payload = decode(
token,
signing_key.key,
algorithms=["RS256", "HS256", "ES256"], # Support RS256, HS256, and ES256
options={"verify_aud": False},
)
print("[DEBUG] Token verified successfully")
return payload
except InvalidTokenError as e:
print(f"[ERROR] Invalid token: {e}")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=f"Invalid authentication credentials: {str(e)}",
headers={"WWW-Authenticate": "Bearer"},
)
except HTTPException:
raise
except Exception as e:
print(f"[ERROR] Token verification failed: {e}")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
def extract_user(self, payload: Dict) -> Dict:
"""
Extract user information from token payload.
Args:
payload: Decoded JWT payload
Returns:
User object with id, email, role
"""
return {
"id": payload.get("sub"),
"email": payload.get("email"),
"role": payload.get("role") or payload.get("app_metadata", {}).get("role", "user"),
"phone": payload.get("phone"),
"user_metadata": payload.get("user_metadata", {}),
}
# Initialize Supabase JWT handler
supabase_jwt = None
try:
if settings.supabase_url:
supabase_jwt = SupabaseJWT(settings.supabase_url)
else:
print("[WARNING] SUPABASE_URL not configured")
except Exception as e:
print(f"[ERROR] Failed to initialize Supabase JWT: {e}")
def verify_supabase_token(token: str) -> Dict:
"""Verify and decode a Supabase JWT token."""
if not supabase_jwt:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Supabase not configured",
)
return supabase_jwt.verify_token(token)
def extract_user_from_token(payload: Dict) -> Dict:
"""Extract user info from token payload."""
if not supabase_jwt:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Supabase not configured",
)
user = supabase_jwt.extract_user(payload)
if not user.get("id"):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid token payload",
)
return user