ABSA / src /utils /admin_endpoints.py
parthnuwal7's picture
Adding Mongo+Redis concept
4b62d23
"""
Admin analytics endpoints for monitoring and metrics.
"""
import os
import logging
from typing import Optional
from datetime import datetime, timedelta, timezone
from fastapi import APIRouter, HTTPException, Header, Depends
from .mongodb_service import get_mongodb_service
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/admin", tags=["admin"])
def verify_admin_token(authorization: Optional[str] = Header(None)) -> bool:
"""
Verify admin token from Authorization header.
Args:
authorization: Authorization header value
Returns:
True if valid, raises HTTPException otherwise
"""
admin_token = os.getenv("ADMIN_TOKEN")
if not admin_token:
logger.error("ADMIN_TOKEN not configured")
raise HTTPException(status_code=500, detail="Admin authentication not configured")
if not authorization:
raise HTTPException(status_code=401, detail="Authorization header required")
# Expected format: "Bearer <token>"
try:
scheme, token = authorization.split()
if scheme.lower() != "bearer":
raise HTTPException(status_code=401, detail="Invalid authorization scheme")
if token != admin_token:
raise HTTPException(status_code=401, detail="Invalid admin token")
return True
except ValueError:
raise HTTPException(status_code=401, detail="Invalid authorization header format")
@router.get("/metrics/summary")
async def get_metrics_summary(authorized: bool = Depends(verify_admin_token)):
"""
Get summary metrics for all events.
Returns:
- Total events by type
- Unique devices
- Unique users
- Rate limit hits
- Time range of data
"""
mongodb_service = get_mongodb_service()
try:
# Aggregate events by type
pipeline = [
{
"$group": {
"_id": "$event_type",
"count": {"$sum": 1}
}
},
{
"$sort": {"count": -1}
}
]
events_by_type = mongodb_service.aggregate_events(pipeline)
# Count unique devices
pipeline_devices = [
{
"$group": {
"_id": "$device_id"
}
},
{
"$count": "total"
}
]
unique_devices = mongodb_service.aggregate_events(pipeline_devices)
device_count = unique_devices[0]["total"] if unique_devices else 0
# Count unique users
pipeline_users = [
{
"$match": {"user_id": {"$ne": None}}
},
{
"$group": {
"_id": "$user_id"
}
},
{
"$count": "total"
}
]
unique_users = mongodb_service.aggregate_events(pipeline_users)
user_count = unique_users[0]["total"] if unique_users else 0
# Get time range
pipeline_timerange = [
{
"$group": {
"_id": None,
"first_event": {"$min": "$created_at"},
"last_event": {"$max": "$created_at"}
}
}
]
timerange = mongodb_service.aggregate_events(pipeline_timerange)
# Format response
summary = {
"events_by_type": {item["_id"]: item["count"] for item in events_by_type},
"unique_devices": device_count,
"unique_users": user_count,
"time_range": timerange[0] if timerange else None
}
return {"status": "success", "data": summary}
except Exception as e:
logger.error(f"Failed to get metrics summary: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/metrics/events")
async def get_events_timeline(
event_type: Optional[str] = None,
days: int = 7,
authorized: bool = Depends(verify_admin_token)
):
"""
Get events timeline (events per day/hour).
Args:
event_type: Filter by event type
days: Number of days to include (default 7)
Returns:
Events grouped by date
"""
mongodb_service = get_mongodb_service()
try:
# Calculate date range
end_date = datetime.now(timezone.utc)
start_date = end_date - timedelta(days=days)
# Build match stage
match_stage = {
"created_at": {
"$gte": start_date,
"$lte": end_date
}
}
if event_type:
match_stage["event_type"] = event_type
# Aggregate by day
pipeline = [
{"$match": match_stage},
{
"$group": {
"_id": {
"year": {"$year": "$created_at"},
"month": {"$month": "$created_at"},
"day": {"$dayOfMonth": "$created_at"},
"event_type": "$event_type"
},
"count": {"$sum": 1}
}
},
{
"$sort": {
"_id.year": 1,
"_id.month": 1,
"_id.day": 1
}
}
]
results = mongodb_service.aggregate_events(pipeline)
# Format results
timeline = []
for item in results:
timeline.append({
"date": f"{item['_id']['year']}-{item['_id']['month']:02d}-{item['_id']['day']:02d}",
"event_type": item["_id"]["event_type"],
"count": item["count"]
})
return {
"status": "success",
"data": {
"timeline": timeline,
"start_date": start_date.isoformat(),
"end_date": end_date.isoformat()
}
}
except Exception as e:
logger.error(f"Failed to get events timeline: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/metrics/funnel")
async def get_funnel_analysis(
days: int = 7,
authorized: bool = Depends(verify_admin_token)
):
"""
Get funnel analysis: DASHBOARD_VIEW → ANALYSIS_REQUEST → TASK_COMPLETED.
Args:
days: Number of days to analyze (default 7)
Returns:
Funnel metrics with conversion rates
"""
mongodb_service = get_mongodb_service()
try:
# Calculate date range
end_date = datetime.now(timezone.utc)
start_date = end_date - timedelta(days=days)
# Count each stage
funnel_stages = {
"DASHBOARD_VIEW": 0,
"ANALYSIS_REQUEST": 0,
"TASK_QUEUED": 0,
"TASK_COMPLETED": 0
}
for event_type in funnel_stages.keys():
pipeline = [
{
"$match": {
"event_type": event_type,
"created_at": {
"$gte": start_date,
"$lte": end_date
}
}
},
{
"$count": "total"
}
]
result = mongodb_service.aggregate_events(pipeline)
funnel_stages[event_type] = result[0]["total"] if result else 0
# Calculate conversion rates
dashboard_views = funnel_stages["DASHBOARD_VIEW"]
analysis_requests = funnel_stages["ANALYSIS_REQUEST"]
tasks_queued = funnel_stages["TASK_QUEUED"]
tasks_completed = funnel_stages["TASK_COMPLETED"]
conversions = {
"view_to_request": (
(analysis_requests / dashboard_views * 100)
if dashboard_views > 0 else 0
),
"request_to_queued": (
(tasks_queued / analysis_requests * 100)
if analysis_requests > 0 else 0
),
"queued_to_completed": (
(tasks_completed / tasks_queued * 100)
if tasks_queued > 0 else 0
),
"overall_completion": (
(tasks_completed / dashboard_views * 100)
if dashboard_views > 0 else 0
)
}
# Get unique devices at each stage
device_counts = {}
for event_type in funnel_stages.keys():
pipeline = [
{
"$match": {
"event_type": event_type,
"created_at": {
"$gte": start_date,
"$lte": end_date
}
}
},
{
"$group": {
"_id": "$device_id"
}
},
{
"$count": "total"
}
]
result = mongodb_service.aggregate_events(pipeline)
device_counts[event_type] = result[0]["total"] if result else 0
return {
"status": "success",
"data": {
"funnel_stages": funnel_stages,
"conversion_rates": conversions,
"unique_devices_per_stage": device_counts,
"date_range": {
"start": start_date.isoformat(),
"end": end_date.isoformat(),
"days": days
}
}
}
except Exception as e:
logger.error(f"Failed to get funnel analysis: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/metrics/rate-limits")
async def get_rate_limit_stats(
days: int = 7,
authorized: bool = Depends(verify_admin_token)
):
"""
Get rate limit hit statistics.
Args:
days: Number of days to analyze (default 7)
Returns:
Rate limit statistics
"""
mongodb_service = get_mongodb_service()
try:
end_date = datetime.now(timezone.utc)
start_date = end_date - timedelta(days=days)
# Count total rate limit hits
pipeline_total = [
{
"$match": {
"event_type": "RATE_LIMIT_HIT",
"created_at": {
"$gte": start_date,
"$lte": end_date
}
}
},
{
"$count": "total"
}
]
total_result = mongodb_service.aggregate_events(pipeline_total)
total_hits = total_result[0]["total"] if total_result else 0
# Group by device
pipeline_devices = [
{
"$match": {
"event_type": "RATE_LIMIT_HIT",
"created_at": {
"$gte": start_date,
"$lte": end_date
}
}
},
{
"$group": {
"_id": "$device_id",
"count": {"$sum": 1}
}
},
{
"$sort": {"count": -1}
},
{
"$limit": 10
}
]
top_devices = mongodb_service.aggregate_events(pipeline_devices)
# Timeline
pipeline_timeline = [
{
"$match": {
"event_type": "RATE_LIMIT_HIT",
"created_at": {
"$gte": start_date,
"$lte": end_date
}
}
},
{
"$group": {
"_id": {
"year": {"$year": "$created_at"},
"month": {"$month": "$created_at"},
"day": {"$dayOfMonth": "$created_at"}
},
"count": {"$sum": 1}
}
},
{
"$sort": {
"_id.year": 1,
"_id.month": 1,
"_id.day": 1
}
}
]
timeline = mongodb_service.aggregate_events(pipeline_timeline)
return {
"status": "success",
"data": {
"total_hits": total_hits,
"top_devices": [
{"device_id": item["_id"], "hits": item["count"]}
for item in top_devices
],
"timeline": [
{
"date": f"{item['_id']['year']}-{item['_id']['month']:02d}-{item['_id']['day']:02d}",
"count": item["count"]
}
for item in timeline
]
}
}
except Exception as e:
logger.error(f"Failed to get rate limit stats: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))