"""
API Route Handlers
"""
import inspect
import json
import uuid
from datetime import datetime, timezone
from typing import Optional

from fastapi import APIRouter, UploadFile, File, Form, HTTPException, status
from loguru import logger
from rq import Queue
from rq.exceptions import NoSuchJobError
from rq.job import Job

from app.models import (
    TaskCreateResponse,
    TaskStatusResponse,
    TaskStatus,
    HealthResponse,
    ErrorResponse,
    Level
)
from app.core.redis_client import get_redis_client
from app.core.storage import get_storage_manager
from app.config import settings

# Create router
router = APIRouter()

# Uploads are consumed in slices of this size so that an oversized file can be
# rejected before it has been buffered in memory in full.
_UPLOAD_CHUNK_BYTES = 1024 * 1024

# NOTE(review): the Redis/RQ calls below are made synchronously from `async`
# handlers, so they run on the event loop thread. Confirm this is acceptable
# for the expected load before relying on it.


def _utcnow() -> datetime:
    """Return the current UTC time as a naive ``datetime``.

    Equivalent to the deprecated ``datetime.utcnow()``; the value stays naive
    so that the ISO strings stored in Redis and returned to clients keep the
    same format as before.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


def _as_naive_utc(moment: datetime) -> datetime:
    """Convert *moment* to naive UTC; naive inputs are returned unchanged."""
    if moment.tzinfo is None:
        return moment
    return moment.astimezone(timezone.utc).replace(tzinfo=None)


async def _read_upload_limited(upload: UploadFile, max_bytes: int) -> bytes:
    """Read *upload* chunk by chunk, refusing more than *max_bytes*.

    Raises:
        HTTPException: 413 as soon as the accumulated size exceeds the limit.
    """
    pieces = []
    total = 0
    while True:
        piece = await upload.read(_UPLOAD_CHUNK_BYTES)
        if not piece:
            break
        total += len(piece)
        if total > max_bytes:
            raise HTTPException(
                status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE,
                detail=f"Video size exceeds maximum allowed size of {settings.MAX_VIDEO_SIZE_MB}MB"
            )
        pieces.append(piece)
    return b"".join(pieces)


async def _delete_video_file(storage, video_path) -> None:
    """Delete a stored video through the storage manager.

    ``delete_video`` is defined outside this module; the result is awaited
    only if it turns out to be awaitable, so both a sync and an async
    implementation are handled. Errors propagate to the caller.
    """
    outcome = storage.delete_video(video_path)
    if inspect.isawaitable(outcome):
        await outcome


async def _rollback_failed_submission(storage, video_path, redis_client, task_key) -> None:
    """Best-effort removal of what a failed ``analyze_video`` call left behind.

    Each step is attempted independently and failures are only logged, so the
    original error is the one reported to the client.
    """
    if redis_client is not None and task_key is not None:
        try:
            redis_client.delete(task_key)
        except Exception as exc:
            logger.warning(f"Could not remove task record {task_key} during rollback: {exc}")
    if storage is not None and video_path is not None:
        try:
            await _delete_video_file(storage, video_path)
        except Exception as exc:
            logger.warning(f"Could not remove video {video_path} during rollback: {exc}")


def _overlay_job_state(task_data: dict, job: Job) -> None:
    """Update *task_data* in place from the live state of its RQ *job*."""
    if job.is_finished:
        task_data["status"] = TaskStatus.COMPLETED.value
    elif job.is_failed:
        task_data["status"] = TaskStatus.FAILED.value
        task_data["error"] = str(job.exc_info) if job.exc_info else "Unknown error"
    elif job.is_started:
        task_data["status"] = TaskStatus.PROCESSING.value
        return
    else:
        return

    # Finished or failed: report when the job actually ended so the value is
    # stable across polls; fall back to "now" only if RQ has no end time.
    ended_at = getattr(job, "ended_at", None)
    finished = _as_naive_utc(ended_at) if ended_at else _utcnow()
    task_data["completed_at"] = finished.isoformat()

    if job.is_finished:
        outcome = job.result
        # `is not None` keeps legitimate falsy results such as {} or 0.
        if outcome is not None:
            task_data["result"] = outcome


@router.get("/", tags=["Root"])
async def root():
    """Root endpoint"""
    return {
        "message": "SWARA API - AI-Powered Public Speaking Evaluation",
        "version": "1.0.0",
        "docs": "/docs"
    }


@router.get("/health", response_model=HealthResponse, tags=["Health"])
async def health_check():
    """
    Health check endpoint

    Checks:
    - API status
    - Redis connection
    """
    redis_connected = get_redis_client().is_connected()

    return HealthResponse(
        status="healthy" if redis_connected else "degraded",
        version=settings.API_VERSION,
        redis_connected=redis_connected
    )


@router.post(
    "/api/v1/analyze",
    response_model=TaskCreateResponse,
    status_code=status.HTTP_202_ACCEPTED,
    tags=["Analysis"]
)
async def analyze_video(
    video: UploadFile = File(..., description="Video file to analyze"),
    level: int = Form(..., ge=1, le=5, description="Public speaking level (1-5)"),
    user_id: Optional[str] = Form(None, description="Optional user ID")
):
    """
    Upload video for analysis

    This endpoint accepts a video file and queues it for processing.
    Returns a task_id that can be used to check the analysis status.

    **Parameters:**
    - **video**: Video file (MP4 format recommended, max 50MB, max 1 minute)
    - **level**: Public speaking level (1-5)
    - **user_id**: Optional user identifier for tracking

    **Returns:**
    - task_id: Unique identifier to check task status
    - status: Task status (pending)
    - created_at: Task creation timestamp
    """
    # Tracked outside the try block so the error path knows what to roll back.
    storage = None
    video_path = None
    redis_client = None
    task_key = None

    try:
        # Validate file type
        if not video.content_type or not video.content_type.startswith("video/"):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Invalid file type. Please upload a video file."
            )

        # Read video content; the size limit is enforced while reading
        video_content = await _read_upload_limited(video, settings.max_video_size_bytes)
        video_size = len(video_content)

        # Save video to temporary storage
        storage = get_storage_manager()
        video_path = await storage.save_video(video_content)

        # Generate task ID
        task_id = uuid.uuid4().hex

        # One timestamp for both the stored record and the response
        created_at = _utcnow()

        # Create task metadata
        task_data = {
            "task_id": task_id,
            "status": TaskStatus.PENDING.value,
            "video_path": video_path,
            "level": level,
            "user_id": user_id,
            "video_size": video_size,
            "original_filename": video.filename,
            "created_at": created_at.isoformat()
        }

        # Store task in Redis
        redis_client = get_redis_client().get_client()
        task_key = f"task:{task_id}"
        redis_client.setex(
            task_key,
            settings.TASK_RESULT_TTL_SECONDS,
            json.dumps(task_data)
        )

        # Queue the task for processing; job_id == task_id lets the status
        # endpoint look the job up directly.
        queue = Queue(settings.TASK_QUEUE_NAME, connection=redis_client)
        queue.enqueue(
            'app.tasks.process_video_task',
            task_id,
            video_path,
            level,
            job_timeout=settings.TASK_TIMEOUT_SECONDS,
            result_ttl=settings.TASK_RESULT_TTL_SECONDS,
            job_id=task_id
        )

        logger.info(f"✓ Task created: {task_id} (Level {level}, Size: {video_size} bytes)")

        return TaskCreateResponse(
            task_id=task_id,
            status=TaskStatus.PENDING,
            message="Video uploaded successfully. Processing has been queued.",
            created_at=created_at
        )

    except HTTPException:
        # Validation errors are raised before anything is persisted.
        raise
    except Exception as e:
        logger.exception(f"✗ Error creating task: {e}")
        # Do not leave an orphaned file or a forever-pending task record.
        await _rollback_failed_submission(storage, video_path, redis_client, task_key)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to create analysis task: {str(e)}"
        ) from e


@router.get(
    "/api/v1/task/{task_id}",
    response_model=TaskStatusResponse,
    tags=["Analysis"]
)
async def get_task_status(task_id: str):
    """
    Get task status and results

    Check the status of a video analysis task. If the task is completed,
    this endpoint returns the full analysis results.

    **Parameters:**
    - **task_id**: Task identifier returned from the analyze endpoint

    **Returns:**
    - Task status (pending, processing, completed, failed)
    - Progress information (if processing)
    - Analysis results (if completed)
    """
    try:
        redis_client = get_redis_client().get_client()

        # Get task data from Redis
        task_key = f"task:{task_id}"
        task_data_str = redis_client.get(task_key)

        if not task_data_str:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Task {task_id} not found or has expired"
            )

        task_data = json.loads(task_data_str)

        # Get RQ job status; any failure here falls back to the stored status
        try:
            job = Job.fetch(task_id, connection=redis_client)
            _overlay_job_state(task_data, job)
        except NoSuchJobError:
            # Job not found in queue, use stored status
            pass
        except Exception as exc:
            logger.warning(f"Could not read RQ job state for task {task_id}: {exc}")

        # Parse response
        completed_raw = task_data.get("completed_at")
        return TaskStatusResponse(
            task_id=task_data["task_id"],
            status=TaskStatus(task_data["status"]),
            progress=task_data.get("progress"),
            result=task_data.get("result"),
            error=task_data.get("error"),
            created_at=datetime.fromisoformat(task_data["created_at"]),
            completed_at=datetime.fromisoformat(completed_raw) if completed_raw else None
        )

    except HTTPException:
        raise
    except Exception as e:
        logger.exception(f"✗ Error getting task status: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to get task status: {str(e)}"
        ) from e


@router.delete(
    "/api/v1/task/{task_id}",
    tags=["Analysis"]
)
async def delete_task(task_id: str):
    """
    Delete task and cleanup associated files

    **Parameters:**
    - **task_id**: Task identifier to delete

    **Returns:**
    - Success message
    """
    try:
        redis_client = get_redis_client().get_client()
        storage = get_storage_manager()

        # Get task data
        task_key = f"task:{task_id}"
        task_data_str = redis_client.get(task_key)

        if not task_data_str:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Task {task_id} not found"
            )

        task_data = json.loads(task_data_str)

        # Best effort: drop a job that has not started yet so a worker does
        # not later pick it up and find its video gone.
        try:
            job = Job.fetch(task_id, connection=redis_client)
            if not job.is_started:
                job.delete()
        except NoSuchJobError:
            pass
        except Exception as exc:
            logger.warning(f"Could not remove RQ job for task {task_id}: {exc}")

        # Delete video file
        if "video_path" in task_data:
            await _delete_video_file(storage, task_data["video_path"])

        # Delete task from Redis
        redis_client.delete(task_key)

        logger.info(f"✓ Task deleted: {task_id}")

        return {"message": f"Task {task_id} deleted successfully"}

    except HTTPException:
        raise
    except Exception as e:
        logger.exception(f"✗ Error deleting task: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to delete task: {str(e)}"
        ) from e