|
|
""" |
|
|
REST API server for BackgroundFX Pro. |
|
|
Provides HTTP endpoints for all processing functionality. |
|
|
""" |
|
|
|
|
|
import asyncio
import base64
import io
import os
import shutil
import tempfile
import time
import uuid
from concurrent.futures import ThreadPoolExecutor
from contextlib import asynccontextmanager
from datetime import datetime, timedelta
from enum import Enum
from pathlib import Path
from typing import Dict, List, Optional, Union, Any

import aiofiles
import cv2
import jwt
import numpy as np
import redis
from fastapi import FastAPI, File, UploadFile, Form, HTTPException, BackgroundTasks, Depends, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, StreamingResponse, JSONResponse
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel, Field, validator

from ..utils.logger import setup_logger
from .pipeline import ProcessingPipeline, PipelineConfig, ProcessingMode
from .video_processor import VideoProcessorAPI, StreamConfig, VideoStreamMode
from .batch_processor import BatchProcessor, BatchConfig, BatchItem, BatchPriority
|
|
|
|
|
# Module-wide logger, created by the project's logging helper and keyed by
# this module's import name.
logger = setup_logger(__name__)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ServerConfig:
    """Static settings for the API server.

    All values are plain class attributes; a single shared instance is
    created right below the class and used throughout the module.
    """

    # --- Network -----------------------------------------------------------
    HOST: str = "0.0.0.0"
    PORT: int = 8000

    # --- File handling -----------------------------------------------------
    UPLOAD_DIR: str = "uploads"
    OUTPUT_DIR: str = "outputs"
    TEMP_DIR: str = "temp"
    MAX_UPLOAD_SIZE: int = 500 * 1024 * 1024  # 500 MiB, in bytes
    ALLOWED_EXTENSIONS: List[str] = [".jpg", ".jpeg", ".png", ".mp4", ".avi", ".mov"]

    # --- Authentication ----------------------------------------------------
    # The JWT signing key must not live in source control. It is read from the
    # BACKGROUNDFX_SECRET_KEY environment variable; the historical placeholder
    # is kept only as a fallback so existing development setups keep working.
    SECRET_KEY: str = os.environ.get(
        "BACKGROUNDFX_SECRET_KEY", "your-secret-key-change-in-production"
    )
    ALGORITHM: str = "HS256"
    ACCESS_TOKEN_EXPIRE_MINUTES: int = 30

    # --- Redis job cache ---------------------------------------------------
    REDIS_URL: str = "redis://localhost:6379"
    CACHE_TTL: int = 3600  # seconds a job record is kept in Redis

    # --- Rate limiting -----------------------------------------------------
    # NOTE(review): these two values are not read anywhere in the visible
    # part of this module - confirm where rate limiting is enforced.
    RATE_LIMIT_REQUESTS: int = 100
    RATE_LIMIT_WINDOW: int = 60

    # --- Processing --------------------------------------------------------
    MAX_WORKERS: int = 4
    ENABLE_GPU: bool = True


# Shared configuration instance used by the rest of the module.
config = ServerConfig()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class BackgroundType(str, Enum):
    """Names of the background choices a request may ask for.

    Members are also ``str`` instances, so they compare equal to their raw
    string value.
    """

    BLUR = "blur"
    OFFICE = "office"
    GRADIENT = "gradient"
    NATURE = "nature"
    CUSTOM = "custom"  # paired with ``background_url`` on the request
    NONE = "none"  # the processing tasks pass no background for this value
|
|
|
|
|
|
|
|
class QualityPreset(str, Enum):
    """Named quality levels a request can select.

    Members double as plain strings equal to their value.
    """

    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    ULTRA = "ultra"
|
|
|
|
|
|
|
|
class ProcessingRequest(BaseModel):
    """Options shared by the image and video processing requests."""

    # Which background to apply; defaults to blurring.
    background: BackgroundType = BackgroundType.BLUR
    # Location of a user-supplied background, relevant for BackgroundType.CUSTOM.
    background_url: Optional[str] = None
    quality: QualityPreset = QualityPreset.HIGH
    # NOTE(review): not read anywhere in the visible part of this module.
    preserve_original: bool = False

    class Config:
        # Example payload shown in the generated API schema.
        schema_extra = {
            "example": {
                "background": "office",
                "quality": "high",
                "preserve_original": False
            }
        }
|
|
|
|
|
|
|
|
class ImageProcessingRequest(ProcessingRequest):
    """Request options for processing a single image."""

    # NOTE(review): presumably (width, height); not read in the visible code.
    resize: Optional[tuple[int, int]] = None
    # Effect names; forwarded to PipelineConfig by the image task.
    apply_effects: List[str] = Field(default_factory=list)
    # Used as the file extension of the written result.
    output_format: str = "png"
|
|
|
|
|
|
|
|
class VideoProcessingRequest(ProcessingRequest):
    """Request options for processing a video file.

    NOTE(review): none of the fields declared here are read by the video task
    in the visible part of this module; only the inherited ``background`` is.
    """

    start_time: Optional[float] = None
    end_time: Optional[float] = None
    fps: Optional[float] = None
    resolution: Optional[tuple[int, int]] = None
    codec: str = "h264"
|
|
|
|
|
|
|
|
class BatchProcessingRequest(BaseModel):
    """Request body for submitting several files as one batch job."""

    # One dict per file. The batch task requires 'input_path' and
    # 'output_path' and optionally reads 'id', 'file_type' and 'background'.
    items: List[Dict[str, Any]]
    # When False the batch task limits the batch to a single worker.
    parallel: bool = True
    # Upper-cased and looked up as a BatchPriority member name.
    priority: str = "normal"
    # NOTE(review): accepted but no callback is sent in the visible code.
    callback_url: Optional[str] = None
|
|
|
|
|
|
|
|
class StreamingRequest(BaseModel):
    """Request body for starting a streaming session."""

    # Passed through unchanged as the stream configuration's source.
    source: str
    # Upper-cased and looked up as a VideoStreamMode member name.
    stream_type: str = "webcam"
    output_format: str = "hls"
    # NOTE(review): not read by the stream endpoint in the visible code.
    quality: QualityPreset = QualityPreset.MEDIUM
|
|
|
|
|
|
|
|
class ProcessingResponse(BaseModel):
    """State of one processing job; also the record kept by the job manager."""

    job_id: str
    # Values used in this module: "pending", "processing", "completed",
    # "failed", "cancelled".
    status: str
    # The tasks in this module report 0.1 on start and 1.0 on completion.
    progress: float = 0.0
    message: Optional[str] = None
    # Download path of the result, filled in when a job completes.
    result_url: Optional[str] = None
    metadata: Dict[str, Any] = Field(default_factory=dict)
    # Local (naive) timestamp taken when the record is created.
    created_at: datetime = Field(default_factory=datetime.now)
    completed_at: Optional[datetime] = None
|
|
|
|
|
|
|
|
class JobStatus(BaseModel):
    """Detailed status report for a job.

    NOTE(review): this model is not referenced by any endpoint in the visible
    part of this module.
    """

    job_id: str
    status: str
    progress: float
    current_stage: Optional[str] = None
    time_elapsed: float
    time_remaining: Optional[float] = None
    errors: List[str] = Field(default_factory=list)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class JobManager:
    """Track processing jobs in memory and mirror them to Redis when reachable.

    The in-memory dict is the primary store. Redis is a best-effort mirror:
    every Redis failure is logged and otherwise ignored so that job handling
    keeps working without it.
    """

    def __init__(self):
        """Create the job table, the worker pool and (if possible) a Redis client."""
        self.jobs: Dict[str, ProcessingResponse] = {}
        self.executor = ThreadPoolExecutor(max_workers=config.MAX_WORKERS)
        self.redis_client = None
        try:
            client = redis.from_url(config.REDIS_URL)
            # from_url() only builds a lazy client; ping once so that an
            # unreachable server is detected here instead of on every call.
            client.ping()
            self.redis_client = client
        except Exception:
            logger.warning("Redis not available, using in-memory storage")

    def create_job(self) -> str:
        """Register a new job in the "pending" state and return its ID."""
        job_id = str(uuid.uuid4())
        self.jobs[job_id] = ProcessingResponse(
            job_id=job_id,
            status="pending"
        )
        return job_id

    def update_job(self, job_id: str, **kwargs):
        """Set the given fields on a known job and mirror the record to Redis.

        Unknown job IDs and keyword names that are not attributes of the job
        record are ignored.
        """
        job = self.jobs.get(job_id)
        if job is None:
            return

        for key, value in kwargs.items():
            if hasattr(job, key):
                setattr(job, key, value)

        if self.redis_client:
            try:
                self.redis_client.setex(
                    f"job:{job_id}",
                    config.CACHE_TTL,
                    job.json()
                )
            except Exception as e:
                # Best effort: the in-memory record is already up to date.
                logger.warning(f"Could not store job {job_id} in Redis: {e}")

    def get_job(self, job_id: str) -> Optional[ProcessingResponse]:
        """Return the job record, checking memory first and Redis second.

        Returns None when the job is found in neither place.
        """
        job = self.jobs.get(job_id)
        if job is not None:
            return job

        if self.redis_client:
            try:
                data = self.redis_client.get(f"job:{job_id}")
                if data:
                    return ProcessingResponse.parse_raw(data)
            except Exception as e:
                logger.warning(f"Could not load job {job_id} from Redis: {e}")

        return None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Set up shared services before the app serves requests and tear them down after.

    Startup creates the working directories and attaches the pipeline, video
    processor, batch processor and job manager to ``app.state``. Teardown runs
    in a ``finally`` block so the services are released even when the
    application exits through an exception.
    """
    logger.info("Starting BackgroundFX Pro API Server")

    # Make sure every working directory exists.
    for dir_path in [config.UPLOAD_DIR, config.OUTPUT_DIR, config.TEMP_DIR]:
        Path(dir_path).mkdir(parents=True, exist_ok=True)

    # Shared service instances used by the endpoints.
    app.state.pipeline = ProcessingPipeline(
        PipelineConfig(use_gpu=config.ENABLE_GPU)
    )
    app.state.video_processor = VideoProcessorAPI()
    app.state.batch_processor = BatchProcessor()
    app.state.job_manager = JobManager()

    try:
        yield
    finally:
        logger.info("Shutting down BackgroundFX Pro API Server")
        app.state.pipeline.shutdown()
        app.state.video_processor.cleanup()
        app.state.batch_processor.cleanup()
        # The job manager owns a thread pool that nothing else releases.
        app.state.job_manager.executor.shutdown(wait=False)
|
|
|
|
|
|
|
|
# The ASGI application; startup and shutdown are handled by ``lifespan``.
app = FastAPI(
    lifespan=lifespan,
    title="BackgroundFX Pro API",
    description="Professional background removal and replacement API",
    version="1.0.0",
)

# CORS: every origin, method and header is accepted.
# NOTE(review): wildcard origins combined with allow_credentials=True is a
# very permissive setup - confirm this is intended outside development.
_cors_options = {
    "allow_origins": ["*"],
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_cors_options)

# Bearer-token scheme consumed by ``verify_token``.
security = HTTPBearer()
|
|
|
|
|
|
|
|
def create_access_token(data: dict) -> str:
    """Return a signed JWT carrying ``data`` plus an expiry claim.

    The caller's dict is not modified. The ``exp`` claim is set to the current
    UTC time plus ``config.ACCESS_TOKEN_EXPIRE_MINUTES``.
    """
    lifetime = timedelta(minutes=config.ACCESS_TOKEN_EXPIRE_MINUTES)
    claims = dict(data)
    claims["exp"] = datetime.utcnow() + lifetime
    return jwt.encode(claims, config.SECRET_KEY, algorithm=config.ALGORITHM)
|
|
|
|
|
|
|
|
def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)) -> str:
    """Validate the bearer token and return the user name from its ``sub`` claim.

    Raises:
        HTTPException: 401 when the token cannot be decoded or verified, or
            when it carries no ``sub`` claim.
    """
    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid authentication credentials",
    )

    try:
        claims = jwt.decode(
            credentials.credentials,
            config.SECRET_KEY,
            algorithms=[config.ALGORITHM],
        )
    except jwt.PyJWTError:
        raise unauthorized

    subject: str = claims.get("sub")
    if subject is None:
        raise unauthorized
    return subject
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.get("/")
async def root():
    """Describe the service and list its main endpoint paths."""
    endpoint_index = {
        "health": "/health",
        "docs": "/docs",
        "process_image": "/api/v1/process/image",
        "process_video": "/api/v1/process/video",
        "batch": "/api/v1/batch",
        "stream": "/api/v1/stream"
    }
    return {
        "name": "BackgroundFX Pro API",
        "version": "1.0.0",
        "status": "running",
        "endpoints": endpoint_index
    }
|
|
|
|
|
|
|
|
@app.get("/health")
async def health_check():
    """Report service health.

    The pipeline, video processor and batch processor are always reported as
    "ready"; only the Redis entry reflects state, namely whether the job
    manager holds a Redis client.
    """
    timestamp = datetime.now().isoformat()
    if app.state.job_manager.redis_client:
        redis_state = "connected"
    else:
        redis_state = "disconnected"

    return {
        "status": "healthy",
        "timestamp": timestamp,
        "services": {
            "pipeline": "ready",
            "video_processor": "ready",
            "batch_processor": "ready",
            "redis": redis_state
        }
    }
|
|
|
|
|
|
|
|
@app.get("/api/v1/stats")
async def get_statistics(current_user: str = Depends(verify_token)):
    """Collect statistics from the pipeline, video processor and batch processor.

    Requires a valid bearer token.
    """
    services = app.state
    pipeline_stats = services.pipeline.get_statistics()
    video_stats = services.video_processor.get_stats()
    batch_status = services.batch_processor.get_status()
    return {
        "pipeline": pipeline_stats,
        "video": video_stats,
        "batch": batch_status
    }
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.post("/api/v1/process/image", response_model=ProcessingResponse)
async def process_image(
    background_tasks: BackgroundTasks,
    file: UploadFile = File(...),
    request: ImageProcessingRequest = Depends(),
    current_user: str = Depends(verify_token)
):
    """Accept an uploaded image, store it and queue it for background processing.

    Returns a ``ProcessingResponse`` with the new job ID and status
    "processing"; the result is produced later by ``process_image_task``.

    Raises:
        HTTPException: 400 when the file name is missing or its extension is
            not in ``config.ALLOWED_EXTENSIONS``; 413 when the upload exceeds
            ``config.MAX_UPLOAD_SIZE``.
    """
    # Keep only the final path component of the client-supplied name so it
    # cannot point outside the upload directory ("../../x", "C:\\x", ...).
    safe_name = Path((file.filename or "").replace("\\", "/")).name
    if not safe_name.lower().endswith(tuple(config.ALLOWED_EXTENSIONS)):
        raise HTTPException(400, "Invalid file format")

    # ``size`` may be unknown (None); the real length is checked after reading.
    if file.size is not None and file.size > config.MAX_UPLOAD_SIZE:
        raise HTTPException(413, "File too large")

    content = await file.read()
    if len(content) > config.MAX_UPLOAD_SIZE:
        raise HTTPException(413, "File too large")

    job_id = app.state.job_manager.create_job()

    # Store the upload under a name prefixed with the job ID.
    upload_path = Path(config.UPLOAD_DIR) / f"{job_id}_{safe_name}"
    async with aiofiles.open(upload_path, 'wb') as f:
        await f.write(content)

    # The actual processing runs after the response has been sent.
    background_tasks.add_task(
        process_image_task,
        app.state,
        job_id,
        str(upload_path),
        request
    )

    return ProcessingResponse(
        job_id=job_id,
        status="processing",
        message="Image processing started"
    )
|
|
|
|
|
|
|
|
async def process_image_task(app_state, job_id: str, input_path: str, request: ImageProcessingRequest):
    """Process one stored image and record the outcome on the job.

    Args:
        app_state: Application state holding ``job_manager`` and ``pipeline``.
        job_id: ID of the job to update.
        input_path: Path of the uploaded image on disk.
        request: Options submitted with the upload.

    Any exception is logged and turns the job into status "failed" with the
    exception text as its message; nothing is re-raised.
    """
    try:
        app_state.job_manager.update_job(job_id, status="processing", progress=0.1)

        # cv2.imread signals an unreadable file by returning None.
        image = cv2.imread(input_path)
        if image is None:
            raise ValueError("Could not read uploaded image")

        background = None
        if request.background == BackgroundType.CUSTOM and request.background_url:
            # Loading a custom background from a URL is not implemented yet;
            # the image is processed without a background in that case.
            pass
        elif request.background != BackgroundType.NONE:
            background = request.background.value

        # This used to be bound to the name ``config``, which shadowed the
        # module-level server config and broke the OUTPUT_DIR lookup below.
        # NOTE(review): the object is still not handed to the pipeline -
        # confirm how per-request settings are meant to be applied.
        pipeline_config = PipelineConfig(
            quality_preset=request.quality.value,
            apply_effects=request.apply_effects
        )

        result = app_state.pipeline.process_image(image, background)

        if not result.success:
            app_state.job_manager.update_job(
                job_id,
                status="failed",
                message="Processing failed"
            )
            return

        # The format becomes part of a file name, so refuse anything that is
        # not a plain extension.
        if not request.output_format.isalnum():
            raise ValueError(f"Invalid output format: {request.output_format!r}")

        output_filename = f"{job_id}_output.{request.output_format}"
        output_path = Path(config.OUTPUT_DIR) / output_filename
        # cv2.imwrite reports failure through its return value.
        if not cv2.imwrite(str(output_path), result.output_image):
            raise ValueError("Could not write output image")

        app_state.job_manager.update_job(
            job_id,
            status="completed",
            progress=1.0,
            result_url=f"/api/v1/download/{output_filename}",
            completed_at=datetime.now(),
            metadata={
                "quality_score": result.quality_score,
                "processing_time": result.processing_time
            }
        )

    except Exception as e:
        logger.error(f"Image processing failed for job {job_id}: {e}")
        app_state.job_manager.update_job(
            job_id,
            status="failed",
            message=str(e)
        )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.post("/api/v1/process/video", response_model=ProcessingResponse)
async def process_video(
    background_tasks: BackgroundTasks,
    file: UploadFile = File(...),
    request: VideoProcessingRequest = Depends(),
    current_user: str = Depends(verify_token)
):
    """Accept an uploaded video, store it and queue it for background processing.

    Returns a ``ProcessingResponse`` with the new job ID and status
    "processing"; the result is produced later by ``process_video_task``.

    Raises:
        HTTPException: 400 when the file name is missing or is not an
            .mp4/.avi/.mov/.mkv file; 413 when the upload exceeds
            ``config.MAX_UPLOAD_SIZE``.
    """
    # Keep only the final path component of the client-supplied name so it
    # cannot point outside the upload directory.
    safe_name = Path((file.filename or "").replace("\\", "/")).name
    if not safe_name.lower().endswith(('.mp4', '.avi', '.mov', '.mkv')):
        raise HTTPException(400, "Invalid video format")

    # Same upload limit as the image endpoint.
    content = await file.read()
    if len(content) > config.MAX_UPLOAD_SIZE:
        raise HTTPException(413, "File too large")

    # The job manager lives on ``app.state``; there is no ``app_state`` name
    # in this scope.
    job_id = app.state.job_manager.create_job()

    upload_path = Path(config.UPLOAD_DIR) / f"{job_id}_{safe_name}"
    async with aiofiles.open(upload_path, 'wb') as f:
        await f.write(content)

    # The actual processing runs after the response has been sent.
    background_tasks.add_task(
        process_video_task,
        app.state,
        job_id,
        str(upload_path),
        request
    )

    return ProcessingResponse(
        job_id=job_id,
        status="processing",
        message="Video processing started"
    )
|
|
|
|
|
|
|
|
async def process_video_task(app_state, job_id: str, input_path: str, request: VideoProcessingRequest):
    """Process one stored video and record progress and outcome on the job.

    Args:
        app_state: Application state holding ``job_manager`` and ``video_processor``.
        job_id: ID of the job to update.
        input_path: Path of the uploaded video on disk.
        request: Options submitted with the upload; only ``background`` is used.

    Any exception is logged and turns the job into status "failed" with the
    exception text as its message; nothing is re-raised.
    """
    try:
        def report_progress(progress: float, info: Dict):
            # Forward processor progress to the job record.
            app_state.job_manager.update_job(job_id, progress=progress, metadata=info)

        target = Path(config.OUTPUT_DIR) / f"{job_id}_output.mp4"

        # BackgroundType.NONE means "no background" for the processor.
        if request.background != BackgroundType.NONE:
            chosen_background = request.background.value
        else:
            chosen_background = None

        stats = await app_state.video_processor.process_video_async(
            input_path,
            str(target),
            background=chosen_background,
            progress_callback=report_progress
        )

        summary = {
            "frames_processed": stats.frames_processed,
            "processing_fps": stats.processing_fps,
            "avg_quality": stats.avg_quality_score
        }
        app_state.job_manager.update_job(
            job_id,
            status="completed",
            progress=1.0,
            result_url=f"/api/v1/download/{target.name}",
            completed_at=datetime.now(),
            metadata=summary
        )
    except Exception as e:
        logger.error(f"Video processing failed for job {job_id}: {e}")
        app_state.job_manager.update_job(job_id, status="failed", message=str(e))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.post("/api/v1/batch", response_model=ProcessingResponse)
async def process_batch(
    background_tasks: BackgroundTasks,
    request: BatchProcessingRequest,
    current_user: str = Depends(verify_token)
):
    """Register a batch job and queue it for background processing.

    Returns a ``ProcessingResponse`` with the new job ID and status
    "processing"; the work itself is done by ``process_batch_task``.
    """
    services = app.state
    job_id = services.job_manager.create_job()

    # Runs after the response has been sent.
    background_tasks.add_task(process_batch_task, services, job_id, request)

    item_count = len(request.items)
    return ProcessingResponse(
        job_id=job_id,
        status="processing",
        message=f"Batch processing started for {item_count} items"
    )
|
|
|
|
|
|
|
|
async def process_batch_task(app_state, job_id: str, request: BatchProcessingRequest):
    """Run a batch of items through a fresh BatchProcessor and record the outcome.

    Args:
        app_state: Application state holding ``job_manager``.
        job_id: ID of the job to update.
        request: The batch request; each item dict must contain 'input_path'
            and 'output_path'.

    Any exception (including a missing item key or an unknown priority name)
    is logged and turns the job into status "failed"; nothing is re-raised.
    """
    try:
        # Translate the raw item dicts into BatchItem objects.
        batch_items = [
            BatchItem(
                id=raw.get('id', str(uuid.uuid4())),
                input_path=raw['input_path'],
                output_path=raw['output_path'],
                file_type=raw.get('file_type', 'image'),
                priority=BatchPriority[request.priority.upper()],
                background=raw.get('background')
            )
            for raw in request.items
        ]

        def report_progress(progress: float, info: Dict):
            # Forward batch progress to the job record.
            app_state.job_manager.update_job(job_id, progress=progress, metadata=info)

        # A non-parallel batch is limited to one worker.
        worker_count = config.MAX_WORKERS if request.parallel else 1
        batch_config = BatchConfig(
            progress_callback=report_progress,
            max_workers=worker_count
        )

        report = BatchProcessor(batch_config).process_batch(batch_items)

        summary = {
            "total_items": report.total_items,
            "successful_items": report.successful_items,
            "failed_items": report.failed_items,
            "avg_quality": report.quality_metrics.get('avg_quality', 0)
        }
        app_state.job_manager.update_job(
            job_id,
            status="completed",
            progress=1.0,
            completed_at=datetime.now(),
            metadata=summary
        )

        if request.callback_url:
            # Notifying the callback URL is not implemented yet.
            pass

    except Exception as e:
        logger.error(f"Batch processing failed for job {job_id}: {e}")
        app_state.job_manager.update_job(job_id, status="failed", message=str(e))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.post("/api/v1/stream/start")
async def start_stream(
    request: StreamingRequest,
    current_user: str = Depends(verify_token)
):
    """Start a streaming session on the shared video processor.

    Returns a dict with the status, the stream URL and a message.

    Raises:
        HTTPException: 400 when ``stream_type`` does not name a
            ``VideoStreamMode`` member; 500 when the processor reports that
            the stream could not be started.
    """
    # An unknown stream type is a client error, not a server fault.
    try:
        stream_mode = VideoStreamMode[request.stream_type.upper()]
    except KeyError:
        raise HTTPException(400, f"Unsupported stream type: {request.stream_type}")

    stream_config = StreamConfig(
        source=request.source,
        stream_mode=stream_mode,
        output_format=request.output_format,
        output_path=f"{config.OUTPUT_DIR}/stream_{uuid.uuid4()}"
    )

    success = app.state.video_processor.start_stream_processing(
        stream_config,
        background=None
    )

    if not success:
        raise HTTPException(500, "Failed to start streaming")

    return {
        "status": "streaming",
        "stream_url": f"/api/v1/stream/live/{stream_config.output_path}",
        "message": "Streaming started"
    }
|
|
|
|
|
|
|
|
@app.get("/api/v1/stream/stop")
async def stop_stream(current_user: str = Depends(verify_token)):
    """Ask the shared video processor to stop stream processing.

    Always answers with a "stopped" status once the call returns.
    """
    processor = app.state.video_processor
    processor.stop_stream_processing()
    return {
        "status": "stopped",
        "message": "Streaming stopped",
    }
|
|
|
|
|
|
|
|
@app.get("/api/v1/stream/preview")
async def get_stream_preview(current_user: str = Depends(verify_token)):
    """Return the current stream preview frame as a JPEG image.

    Raises:
        HTTPException: 404 when the video processor has no preview frame;
            500 when the frame cannot be encoded as JPEG.
    """
    frame = app.state.video_processor.get_preview_frame()
    if frame is None:
        raise HTTPException(404, "No preview available")

    # imencode returns a success flag alongside the encoded buffer.
    encoded_ok, buffer = cv2.imencode('.jpg', frame)
    if not encoded_ok:
        raise HTTPException(500, "Failed to encode preview frame")

    return StreamingResponse(
        io.BytesIO(buffer.tobytes()),
        media_type="image/jpeg"
    )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.get("/api/v1/job/{job_id}", response_model=ProcessingResponse)
async def get_job_status(
    job_id: str,
    current_user: str = Depends(verify_token)
):
    """Return the stored record for ``job_id``.

    Raises:
        HTTPException: 404 when the job manager has no such job.
    """
    record = app.state.job_manager.get_job(job_id)
    if not record:
        raise HTTPException(404, "Job not found")
    return record
|
|
|
|
|
|
|
|
@app.get("/api/v1/jobs")
async def list_jobs(
    current_user: str = Depends(verify_token),
    limit: int = 10,
    offset: int = 0
):
    """Return one page of the jobs held in memory.

    Jobs are listed in the order they were created. ``total`` is the number
    of all in-memory jobs, not the page size. Negative ``limit`` or
    ``offset`` values are treated as 0 so they cannot select an arbitrary
    slice from the end of the list.
    """
    limit = max(limit, 0)
    offset = max(offset, 0)

    jobs = list(app.state.job_manager.jobs.values())
    return {
        "total": len(jobs),
        "jobs": jobs[offset:offset + limit]
    }
|
|
|
|
|
|
|
|
@app.delete("/api/v1/job/{job_id}")
async def cancel_job(
    job_id: str,
    current_user: str = Depends(verify_token)
):
    """Mark a job as cancelled.

    Only the job's status is changed.
    NOTE(review): the background tasks in this module do not check for the
    "cancelled" status, so running work is not interrupted - confirm whether
    that is acceptable.

    Raises:
        HTTPException: 404 when the job is not known to the job manager.
    """
    job_manager = app.state.job_manager
    # update_job silently ignores unknown IDs, so check first instead of
    # reporting success for a job that does not exist.
    if job_id not in job_manager.jobs:
        raise HTTPException(404, "Job not found")

    job_manager.update_job(job_id, status="cancelled")
    return {"message": "Job cancelled"}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.get("/api/v1/download/{filename}")
async def download_file(
    filename: str,
    current_user: str = Depends(verify_token)
):
    """Send a processed file from the output directory as a download.

    Raises:
        HTTPException: 404 when ``filename`` does not name a regular file
            located directly inside ``config.OUTPUT_DIR``.
    """
    output_dir = Path(config.OUTPUT_DIR).resolve()
    file_path = (output_dir / filename).resolve()

    # Reject anything that resolves outside the output directory ("..",
    # absolute paths) as well as directories, which exists() would accept.
    if file_path.parent != output_dir or not file_path.is_file():
        raise HTTPException(404, "File not found")

    return FileResponse(
        path=file_path,
        filename=filename,
        media_type='application/octet-stream'
    )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
from fastapi import WebSocket, WebSocketDisconnect |
|
|
|
|
|
@app.websocket("/ws/job/{job_id}")
async def websocket_job_updates(websocket: WebSocket, job_id: str):
    """Push the job record to the client once per second until the job ends.

    The stream stops when the job reaches "completed", "failed" or
    "cancelled", or right away with an error message when the job does not
    exist; the socket is then closed.

    NOTE(review): unlike the HTTP endpoints, this socket performs no token
    check - confirm whether that is intended.
    """
    await websocket.accept()

    try:
        while True:
            job = app.state.job_manager.get_job(job_id)

            if job is None:
                # Without this the loop would poll forever for a job that
                # was never created.
                await websocket.send_json({"job_id": job_id, "error": "Job not found"})
                break

            # The record contains datetime fields, which the plain JSON
            # encoder behind send_json cannot serialise; let the model
            # produce the JSON text instead.
            await websocket.send_text(job.json())

            if job.status in ["completed", "failed", "cancelled"]:
                break

            await asyncio.sleep(1)

        await websocket.close()

    except WebSocketDisconnect:
        logger.info(f"WebSocket disconnected for job {job_id}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Development entry point: serve the app with uvicorn using the
    # configured host and port.
    import uvicorn

    server_options = {
        "host": config.HOST,
        "port": config.PORT,
        "log_level": "info",
    }
    uvicorn.run(app, **server_options)