anderson-ufrj
fix: correct User model import path
477f960
"""
Module: api.routes.api_keys
Description: API routes for API key management
Author: Anderson H. Silva
Date: 2025-01-25
License: Proprietary - All rights reserved
"""
from typing import Optional, List
from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException, Query, BackgroundTasks
from pydantic import BaseModel, Field, EmailStr
from src.core import get_logger
from src.api.dependencies import get_current_user, get_db, require_admin
from src.services.api_key_service import APIKeyService
from src.models.api_key import APIKeyTier, APIKeyStatus
from src.api.auth import User
logger = get_logger(__name__)
router = APIRouter(prefix="/api-keys", tags=["API Keys"])
class CreateAPIKeyRequest(BaseModel):
"""Request model for creating API key."""
name: str = Field(..., description="Key name/description")
client_id: str = Field(..., description="Client identifier")
client_name: Optional[str] = Field(None, description="Client display name")
client_email: Optional[EmailStr] = Field(None, description="Client email")
tier: APIKeyTier = Field(APIKeyTier.FREE, description="API key tier")
expires_in_days: Optional[int] = Field(None, ge=1, le=365, description="Days until expiration")
rotation_period_days: int = Field(90, ge=0, le=365, description="Rotation period (0=disabled)")
allowed_ips: Optional[List[str]] = Field(None, description="Allowed IP addresses")
allowed_origins: Optional[List[str]] = Field(None, description="Allowed CORS origins")
scopes: Optional[List[str]] = Field(None, description="API scopes/permissions")
metadata: Optional[dict] = Field(None, description="Additional metadata")
class APIKeyResponse(BaseModel):
"""Response model for API key."""
id: str
name: str
key_prefix: str
status: str
tier: str
client_id: str
client_name: Optional[str]
expires_at: Optional[datetime]
last_used_at: Optional[datetime]
is_active: bool
needs_rotation: bool
rate_limits: dict
total_requests: int
created_at: datetime
class APIKeyCreateResponse(APIKeyResponse):
"""Response with the actual key (only shown once)."""
api_key: str = Field(..., description="The actual API key (save this!)")
class UpdateRateLimitsRequest(BaseModel):
"""Request model for updating rate limits."""
per_minute: Optional[int] = Field(None, ge=0, description="Requests per minute")
per_hour: Optional[int] = Field(None, ge=0, description="Requests per hour")
per_day: Optional[int] = Field(None, ge=0, description="Requests per day")
@router.post("", response_model=APIKeyCreateResponse)
async def create_api_key(
request: CreateAPIKeyRequest,
background_tasks: BackgroundTasks,
db=Depends(get_db),
current_user: User = Depends(require_admin)
) -> APIKeyCreateResponse:
"""
Create a new API key.
**Note**: The API key is only shown once. Save it securely!
Requires admin privileges.
"""
service = APIKeyService(db)
try:
api_key, plain_key = await service.create_api_key(
name=request.name,
client_id=request.client_id,
client_name=request.client_name,
client_email=request.client_email,
tier=request.tier,
expires_in_days=request.expires_in_days,
rotation_period_days=request.rotation_period_days,
allowed_ips=request.allowed_ips,
allowed_origins=request.allowed_origins,
scopes=request.scopes,
metadata=request.metadata
)
logger.info(
"api_key_created_via_api",
user=current_user.email,
client_id=request.client_id,
key_id=str(api_key.id)
)
return APIKeyCreateResponse(
**api_key.to_dict(),
api_key=plain_key
)
except Exception as e:
logger.error(
"api_key_creation_failed",
user=current_user.email,
error=str(e)
)
raise HTTPException(
status_code=500,
detail=f"Failed to create API key: {str(e)}"
)
@router.get("/{api_key_id}", response_model=APIKeyResponse)
async def get_api_key(
api_key_id: str,
db=Depends(get_db),
current_user: User = Depends(require_admin)
) -> APIKeyResponse:
"""
Get API key details.
Requires admin privileges.
"""
service = APIKeyService(db)
api_key = await service.get_by_id(api_key_id)
if not api_key:
raise HTTPException(
status_code=404,
detail=f"API key {api_key_id} not found"
)
return APIKeyResponse(**api_key.to_dict())
@router.get("/client/{client_id}", response_model=List[APIKeyResponse])
async def get_client_keys(
client_id: str,
include_inactive: bool = Query(False, description="Include inactive keys"),
db=Depends(get_db),
current_user: User = Depends(require_admin)
) -> List[APIKeyResponse]:
"""
Get all API keys for a client.
Requires admin privileges.
"""
service = APIKeyService(db)
api_keys = await service.get_by_client(client_id, include_inactive)
return [
APIKeyResponse(**key.to_dict())
for key in api_keys
]
@router.post("/{api_key_id}/rotate", response_model=APIKeyCreateResponse)
async def rotate_api_key(
api_key_id: str,
background_tasks: BackgroundTasks,
reason: str = Query(..., description="Rotation reason"),
grace_period_hours: int = Query(24, ge=1, le=168, description="Grace period in hours"),
db=Depends(get_db),
current_user: User = Depends(require_admin)
) -> APIKeyCreateResponse:
"""
Rotate an API key.
The old key will remain valid for the grace period.
**Note**: The new API key is only shown once. Save it securely!
Requires admin privileges.
"""
service = APIKeyService(db)
try:
api_key, new_plain_key = await service.rotate_api_key(
api_key_id=api_key_id,
reason=reason,
initiated_by=current_user.email,
grace_period_hours=grace_period_hours
)
logger.info(
"api_key_rotated_via_api",
user=current_user.email,
key_id=api_key_id,
reason=reason
)
return APIKeyCreateResponse(
**api_key.to_dict(),
api_key=new_plain_key
)
except Exception as e:
logger.error(
"api_key_rotation_failed",
user=current_user.email,
key_id=api_key_id,
error=str(e)
)
raise HTTPException(
status_code=500,
detail=f"Failed to rotate API key: {str(e)}"
)
@router.delete("/{api_key_id}")
async def revoke_api_key(
api_key_id: str,
reason: str = Query(..., description="Revocation reason"),
db=Depends(get_db),
current_user: User = Depends(require_admin)
) -> dict:
"""
Revoke an API key.
The key will be immediately invalidated.
Requires admin privileges.
"""
service = APIKeyService(db)
try:
api_key = await service.revoke_api_key(
api_key_id=api_key_id,
reason=reason,
revoked_by=current_user.email
)
logger.warning(
"api_key_revoked_via_api",
user=current_user.email,
key_id=api_key_id,
reason=reason
)
return {
"message": "API key revoked successfully",
"api_key_id": api_key_id,
"status": api_key.status
}
except Exception as e:
logger.error(
"api_key_revocation_failed",
user=current_user.email,
key_id=api_key_id,
error=str(e)
)
raise HTTPException(
status_code=500,
detail=f"Failed to revoke API key: {str(e)}"
)
@router.put("/{api_key_id}/rate-limits", response_model=APIKeyResponse)
async def update_rate_limits(
api_key_id: str,
request: UpdateRateLimitsRequest,
db=Depends(get_db),
current_user: User = Depends(require_admin)
) -> APIKeyResponse:
"""
Update custom rate limits for an API key.
Set to null to use tier defaults.
Requires admin privileges.
"""
service = APIKeyService(db)
try:
api_key = await service.update_rate_limits(
api_key_id=api_key_id,
per_minute=request.per_minute,
per_hour=request.per_hour,
per_day=request.per_day
)
return APIKeyResponse(**api_key.to_dict())
except Exception as e:
logger.error(
"rate_limit_update_failed",
user=current_user.email,
key_id=api_key_id,
error=str(e)
)
raise HTTPException(
status_code=500,
detail=f"Failed to update rate limits: {str(e)}"
)
@router.get("/{api_key_id}/usage", response_model=dict)
async def get_usage_stats(
api_key_id: str,
days: int = Query(30, ge=1, le=365, description="Days of history"),
db=Depends(get_db),
current_user: User = Depends(require_admin)
) -> dict:
"""
Get usage statistics for an API key.
Requires admin privileges.
"""
service = APIKeyService(db)
try:
stats = await service.get_usage_stats(api_key_id, days)
return stats
except Exception as e:
logger.error(
"usage_stats_failed",
user=current_user.email,
key_id=api_key_id,
error=str(e)
)
raise HTTPException(
status_code=500,
detail=f"Failed to get usage stats: {str(e)}"
)
@router.post("/rotate-all")
async def rotate_all_due_keys(
background_tasks: BackgroundTasks,
db=Depends(get_db),
current_user: User = Depends(require_admin)
) -> dict:
"""
Rotate all API keys that are due for rotation.
This is typically run as a scheduled job.
Requires admin privileges.
"""
service = APIKeyService(db)
try:
rotated_keys = await service.check_and_rotate_keys()
return {
"message": "Key rotation check completed",
"rotated_count": len(rotated_keys),
"rotated_keys": rotated_keys
}
except Exception as e:
logger.error(
"bulk_rotation_failed",
user=current_user.email,
error=str(e)
)
raise HTTPException(
status_code=500,
detail=f"Failed to rotate keys: {str(e)}"
)
@router.post("/cleanup-expired")
async def cleanup_expired_keys(
db=Depends(get_db),
current_user: User = Depends(require_admin)
) -> dict:
"""
Clean up expired API keys.
This is typically run as a scheduled job.
Requires admin privileges.
"""
service = APIKeyService(db)
try:
cleaned_count = await service.cleanup_expired_keys()
return {
"message": "Expired keys cleanup completed",
"cleaned_count": cleaned_count
}
except Exception as e:
logger.error(
"cleanup_failed",
user=current_user.email,
error=str(e)
)
raise HTTPException(
status_code=500,
detail=f"Failed to cleanup keys: {str(e)}"
)