|
import logging |
|
from fastapi import Depends, HTTPException |
|
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials |
|
import jwt |
|
from jwt import PyJWKClient |
|
from config import JWKS_URL |
|
|
|
security = HTTPBearer() |
|
|
|
def get_public_key(token: str): |
|
try: |
|
jwks_client = PyJWKClient(JWKS_URL) |
|
signing_key = jwks_client.get_signing_key_from_jwt(token) |
|
return signing_key.key |
|
except Exception as e: |
|
logging.error(f"Error fetching public key: {e}") |
|
raise |
|
|
|
def token_required(credentials: HTTPAuthorizationCredentials = Depends(security)): |
|
token = credentials.credentials |
|
try: |
|
public_key = get_public_key(token) |
|
decoded = jwt.decode( |
|
token, |
|
public_key, |
|
algorithms=['RS256'], |
|
issuer="https://assuring-lobster-64.clerk.accounts.dev" |
|
) |
|
customer_id = decoded.get('org_id') |
|
user_id = decoded.get('sub') |
|
logging.info(f"Customer/Org ID: {customer_id}, User ID: {user_id}") |
|
if not customer_id: |
|
logging.error("Customer ID is missing in the token!") |
|
raise HTTPException(status_code=401, detail="Customer ID is missing in the token!") |
|
return customer_id, user_id |
|
except jwt.ExpiredSignatureError: |
|
logging.error("Token has expired") |
|
raise HTTPException(status_code=401, detail="Token has expired") |
|
except jwt.InvalidTokenError as e: |
|
logging.error(f"Invalid token: {e}") |
|
raise HTTPException(status_code=401, detail="Invalid token") |
|
except Exception as e: |
|
logging.error(f"Error decoding token: {e}") |
|
raise HTTPException(status_code=401, detail=str(e)) |
|
|