Spaces:
Running
Running
| # Dependencies | |
| import uuid | |
| import shutil | |
| import signal | |
| import uvicorn | |
| import traceback | |
| from typing import List | |
| from typing import Dict | |
| from pathlib import Path | |
| from fastapi import File | |
| from typing import Optional | |
| from fastapi import Request | |
| from fastapi import FastAPI | |
| from fastapi import UploadFile | |
| from fastapi import HTTPException | |
| from utils.logger import get_logger | |
| from config.settings import settings | |
| from fastapi.responses import Response | |
| from config.schemas import APIResponse | |
| from config.schemas import AnalysisResult | |
| from fastapi.responses import HTMLResponse | |
| from fastapi.responses import JSONResponse | |
| from utils.validators import ImageValidator | |
| from fastapi.staticfiles import StaticFiles | |
| from utils.helpers import generate_unique_id | |
| from reporter.csv_reporter import CSVReporter | |
| from reporter.pdf_reporter import PDFReporter | |
| from config.schemas import BatchAnalysisResult | |
| from reporter.json_reporter import JSONReporter | |
| from utils.image_processor import ImageProcessor | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from features.batch_processor import BatchProcessor | |
| from features.threshold_manager import ThresholdManager | |
# Logging
logger = get_logger(__name__)

# FastAPI App Definition
app = FastAPI(title = "ImageScreenAI",
              version = settings.VERSION,
              description = "First-pass AI image screening tool for bulk workflows",
              )

# Serve static assets (if any later)
# NOTE: the directory is resolved relative to the process working directory.
app.mount("/ui", StaticFiles(directory = "ui"), name = "ui")

# CORS (UI + API)
# Wildcard origins must not be combined with credentialed requests: FastAPI's
# CORS documentation states that allow_origins / allow_methods / allow_headers
# cannot be ["*"] when allow_credentials is True. Nothing visible in this module
# relies on cookies or HTTP auth, so credentials are disabled here. If
# cross-origin credentials are ever needed, list the allowed origins explicitly
# and re-enable the flag.
app.add_middleware(CORSMiddleware,
                   allow_origins = ["*"],
                   allow_credentials = False,
                   allow_methods = ["*"],
                   allow_headers = ["*"],
                   )
# Runtime State
# In-memory map of batch_id -> session dict. Handlers in this module store the
# keys "status", "progress", "result" and "error" in each session.
SESSION_STORE: Dict[str, Dict] = {}

# Component Initialization
image_validator = ImageValidator()
image_processor = ImageProcessor()
threshold_manager = ThresholdManager()
batch_processor = BatchProcessor(threshold_manager = threshold_manager)
json_reporter = JSONReporter()
csv_reporter = CSVReporter()
pdf_reporter = PDFReporter()

# Working directories come from the project settings.
UPLOAD_DIR = settings.UPLOAD_DIR
CACHE_DIR = settings.CACHE_DIR
REPORTS_DIR = settings.REPORTS_DIR

# Create the working directories up front so request handlers can write to them.
for d in [UPLOAD_DIR, CACHE_DIR, REPORTS_DIR]:
    d.mkdir(parents = True,
            exist_ok = True,
            )
| # Utility: Progress Callback | |
def _progress_callback(batch_id: str):
    """
    Build a progress reporter bound to one batch.

    The returned callable takes (image_idx, total, filename) and records them
    under the session's "progress" key. Updates are ignored when the batch is
    unknown or its status is anything other than "processing".
    """
    def callback(image_idx: int, total: int, filename: str):
        entry = SESSION_STORE.get(batch_id)

        # Only sessions that exist and are still running accept updates
        if entry and (entry.get("status") == "processing"):
            snapshot = {"current"  : image_idx,
                        "total"    : total,
                        "filename" : filename,
                        }
            entry["progress"] = snapshot

    return callback
| # Utility: Housekeeping | |
def cleanup_temp_files():
    """
    Best-effort removal of the regular files directly inside UPLOAD_DIR and CACHE_DIR.

    Subdirectories are left untouched. This function never raises: every
    failure is logged as a warning, and a failure on one entry does not stop
    cleanup of the remaining entries. The success message is logged only when
    no failure occurred.
    """
    failures = 0

    for folder in (UPLOAD_DIR, CACHE_DIR):
        try:
            entries = list(folder.iterdir())

        except Exception as e:
            # Deliberately broad: this runs from the shutdown path and must not raise
            failures += 1
            logger.warning(f"Cleanup failed: {e}")
            continue

        for item in entries:
            try:
                if item.is_file():
                    item.unlink(missing_ok = True)

            except Exception as e:
                failures += 1
                logger.warning(f"Cleanup failed: {e}")

    if not failures:
        logger.info("Temporary files cleaned")
def shutdown_handler(*_):
    """
    Signal handler: log the shutdown request and purge temporary files.

    The positional arguments passed by the signal machinery are ignored.
    """
    logger.warning("Shutdown signal received — cleaning up")
    cleanup_temp_files()


# NOTE(review): the handler only cleans up; it does not itself stop the
# process. Presumably the ASGI server installs its own handlers at startup
# — confirm that Ctrl+C still terminates when this module runs without it.
for _signum in (signal.SIGINT, signal.SIGTERM):
    signal.signal(_signum, shutdown_handler)
| # Error Handling | |
# NOTE(review): no registration of this handler (decorator or
# app.add_exception_handler) is visible in this view — confirm it is wired up.
async def global_exception_handler(request: Request, exc: Exception):
    """
    Log an unhandled exception and answer with a generic 500 JSON payload.

    Arguments:
        request : the request being served when the error occurred (unused)
        exc     : the exception that was not handled elsewhere

    Returns:
        JSONResponse with status 500 and an APIResponse body whose message is
        "Internal server error"; exception details are logged, not returned.
    """
    logger.error(f"Unhandled error: {exc}")

    # Format the traceback from the exception object itself instead of relying
    # on traceback.format_exc(), which only works while an exception is
    # actively being handled in the calling context.
    logger.debug("".join(traceback.format_exception(type(exc), exc, exc.__traceback__)))

    payload = APIResponse(success = False,
                          message = "Internal server error",
                          ).model_dump()

    return JSONResponse(status_code = 500,
                        content     = payload,
                        )
| # Home | |
def serve_frontend():
    """
    Return the contents of ui/index.html as UTF-8 text.

    The path is relative to the process working directory.

    Raises:
        HTTPException(404) when the file does not exist.
    """
    page = Path("ui/index.html")

    if page.exists():
        return page.read_text(encoding = "utf-8")

    raise HTTPException(status_code = 404,
                        detail      = "UI not found",
                        )
| # Health Check | |
def health():
    """
    Report that the service is up, together with the configured version.
    """
    payload = dict(status  = "ok",
                   version = settings.VERSION,
                   )

    return payload
| # Single Image Analysis | |
async def analyze_single_image(file: UploadFile = File(...)):
    """
    Analyze one uploaded image and return the result.

    The upload is copied to a uniquely named temporary file in UPLOAD_DIR,
    decoded to obtain its dimensions, passed to the batch processor's
    single-image path, and the temporary file is always removed afterwards.

    Arguments:
        file : the uploaded image

    Returns:
        APIResponse with success=True and the analysis result dumped to a dict.
    """
    image_id = generate_unique_id()

    # The client controls file.filename. Keep only its final path component so
    # directory separators (or a missing name) cannot redirect or break the
    # write below; the original name is still what gets validated and reported.
    safe_name  = Path((file.filename or "").replace("\\", "/")).name or "upload"
    image_path = UPLOAD_DIR / f"{image_id}_{safe_name}"

    # NOTE(review): validation runs before the upload is written to image_path,
    # so the validator presumably relies on filename/size only — confirm.
    image_validator.validate_image(file_path = image_path,
                                   filename  = file.filename,
                                   file_size = file.size,
                                   )

    try:
        with open(image_path, "wb") as f:
            shutil.copyfileobj(file.file, f)

        image = image_processor.load_image(image_path)

        # image is a NumPy array → shape = (H, W, C) or (H, W)
        height, width = image.shape[:2]

        result: AnalysisResult = batch_processor.process_single(image      = image_path,
                                                                filename   = file.filename,
                                                                image_size = (width, height),
                                                                )

        return APIResponse(success = True,
                           message = "Image analysis completed",
                           data    = result.model_dump(),
                           )

    finally:
        # Runs on success and failure alike; tolerate a file that was never created
        image_path.unlink(missing_ok = True)
| # Batch Image Analysis | |
async def analyze_batch(files: List[UploadFile] = File(...)):
    """
    Analyze several uploaded images as one batch.

    A session is registered in SESSION_STORE under a fresh batch id, each
    upload is written to UPLOAD_DIR, validated and decoded for its dimensions,
    and the whole set is handed to the batch processor. The session ends as
    "completed" (with the result), "interrupted", or "failed". Every temporary
    file written by this call is removed before returning or raising.

    Arguments:
        files : the uploaded images

    Returns:
        APIResponse with success=True, the batch id and the dumped batch result.

    Raises:
        HTTPException(400) when no files are provided
        HTTPException(499) when processing is interrupted
        HTTPException(500) when processing fails with an unexpected error
        Any HTTPException raised while preparing or processing is re-raised as is.
    """
    if not files:
        raise HTTPException(status_code = 400,
                            detail      = "No files provided",
                            )

    batch_id = str(uuid.uuid4())

    SESSION_STORE[batch_id] = {"status"   : "processing",
                               "progress" : {"current" : 0,
                                             "total"   : len(files),
                                             },
                               }

    image_entries = []

    # Every path this call writes to, tracked separately from image_entries so
    # that a file whose validation or decoding fails is still cleaned up.
    written_paths = []

    try:
        for file in files:
            uid = generate_unique_id()

            # The client controls file.filename: keep only the last path component
            safe_name = Path((file.filename or "").replace("\\", "/")).name or "upload"
            path      = UPLOAD_DIR / f"{uid}_{safe_name}"

            written_paths.append(path)

            with open(path, "wb") as f:
                shutil.copyfileobj(file.file, f)

            # Validate before decoding so a rejected file is never loaded
            image_validator.validate_image(file_path = path,
                                           filename  = file.filename,
                                           file_size = file.size,
                                           )

            image         = image_processor.load_image(path)
            height, width = image.shape[:2]

            image_entries.append({"path"     : path,
                                  "filename" : file.filename,
                                  "size"     : (width, height),
                                  })

        # NOTE(review): this call is synchronous inside an async handler, so the
        # event loop presumably cannot serve progress polls meanwhile — confirm.
        batch_result: BatchAnalysisResult = batch_processor.process_batch(image_files = image_entries,
                                                                          on_progress = _progress_callback(batch_id),
                                                                          )

        SESSION_STORE[batch_id] = {"status"   : "completed",
                                   "progress" : SESSION_STORE[batch_id]["progress"],
                                   "result"   : batch_result,
                                   }

        return APIResponse(success = True,
                           message = "Batch analysis completed",
                           data    = {"batch_id" : batch_id,
                                      "result"   : batch_result.model_dump(),
                                      },
                           )

    except KeyboardInterrupt:
        SESSION_STORE[batch_id] = {"status"   : "interrupted",
                                   "progress" : SESSION_STORE[batch_id]["progress"],
                                   }

        raise HTTPException(status_code = 499,
                            detail      = "Processing interrupted",
                            )

    except HTTPException as e:
        # Keep deliberate HTTP errors intact instead of rewriting them as a 500
        SESSION_STORE[batch_id] = {"status" : "failed",
                                   "error"  : str(e.detail),
                                   }
        raise

    except Exception as e:
        logger.error(f"Batch {batch_id} failed: {e}", exc_info = True)

        SESSION_STORE[batch_id] = {"status" : "failed",
                                   "error"  : str(e),
                                   }

        raise HTTPException(status_code = 500,
                            detail      = "Batch processing failed",
                            )

    finally:
        for written in written_paths:
            Path(written).unlink(missing_ok = True)
| # Batch Progress | |
def batch_progress(batch_id: str):
    """
    Return the stored session dict for a batch.

    Raises:
        HTTPException(404) when no session is stored under batch_id.
    """
    entry = SESSION_STORE.get(batch_id)

    if entry:
        return entry

    raise HTTPException(status_code = 404,
                        detail      = "Batch not found",
                        )
| # Report Downloads | |
def export_csv(batch_id: str):
    """
    Return the detailed CSV report of a finished batch as a file download.

    The report is generated by the CSV reporter, read into memory, and the
    generated file is deleted before the response is returned.

    Raises:
        HTTPException(404) when the batch is unknown or has no stored result.
    """
    session = SESSION_STORE.get(batch_id)

    if (not session or ("result" not in session)):
        raise HTTPException(status_code = 404,
                            detail      = "Batch result not found",
                            )

    # Coerce to Path so the cleanup works whether the reporter returns a str or a Path
    path = Path(csv_reporter.export_batch_detailed(session["result"]))

    try:
        content = path.read_bytes()

    finally:
        # Remove the generated file even when reading it fails
        path.unlink(missing_ok = True)

    return Response(content    = content,
                    media_type = "text/csv",
                    headers    = {"Content-Disposition" : f"attachment; filename=ai_screener_report_{batch_id}.csv",
                                  "Content-Type"        : "text/csv"
                                  }
                    )
def export_pdf(batch_id: str):
    """
    Return the PDF report of a finished batch as a file download.

    The report is generated by the PDF reporter, read into memory, and the
    generated file is deleted before the response is returned.

    Raises:
        HTTPException(404) when the batch is unknown or has no stored result.
    """
    session = SESSION_STORE.get(batch_id)

    if (not session or ("result" not in session)):
        raise HTTPException(status_code = 404,
                            detail      = "Batch result not found",
                            )

    # Coerce to Path so the cleanup works whether the reporter returns a str or a Path
    path = Path(pdf_reporter.export_batch(session["result"]))

    try:
        content = path.read_bytes()

    finally:
        # Remove the generated file even when reading it fails
        path.unlink(missing_ok = True)

    return Response(content    = content,
                    media_type = "application/pdf",
                    headers    = {"Content-Disposition" : f"attachment; filename=ai_screener_report_{batch_id}.pdf",
                                  "Content-Type"        : "application/pdf"
                                  }
                    )
| # ==================== MAIN ==================== | |
if __name__ == "__main__":
    # Explicit startup log (forces log file creation)
    logger.info("Starting AI Image Screener API Server")

    # Collect the server options first, then launch.
    # In debug mode auto-reload is on and a single worker is used; otherwise
    # the worker count comes from settings.
    # NOTE(review): SESSION_STORE is a module-level dict, so with more than one
    # worker each process presumably keeps its own sessions — confirm that
    # progress/export lookups reach the worker that ran the batch.
    server_options = {"host"      : settings.HOST,
                      "port"      : settings.PORT,
                      "reload"    : settings.DEBUG,
                      "log_level" : settings.LOG_LEVEL.lower(),
                      "workers"   : 1 if settings.DEBUG else settings.WORKERS,
                      }

    uvicorn.run("app:app", **server_options)