import asyncio
import io
import time
import uuid
from typing import List

from fastapi import APIRouter, Depends, File, Form, HTTPException, Request, UploadFile

from src.common.utils import get_ip, standardize_category_name, to_list
from src.core.config import IDX_FACES, IDX_OBJECTS, MAX_FILES_PER_UPLOAD
from src.core.logging import log
from src.core.security import get_verified_keys
from src.services.db_client import cld_upload, pinecone_pool

router = APIRouter()
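

# Yield successive fixed-size slices of a sequence; used to batch Pinecone upserts.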
def chunker(seq, size):
    return (seq[pos:pos + size] for pos in range(0, len(seq), size))
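

# Upload endpoint: stores each image in Cloudinary while embedding it for
# face/object search, then bulk-upserts the resulting vectors into Pinecone.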
@router.post("/upload")  # NOTE: route path is an assumption
async def upload_images(
    request: Request,
    files: List[UploadFile] = File(...),
    folder_name: str = Form(...),
    detect_faces: bool = Form(True),
    user_id: str = Form(""),
    keys: dict = Depends(get_verified_keys),
):
    ip = get_ip(request)
    start = time.perf_counter()

    if len(files) > MAX_FILES_PER_UPLOAD:
        raise HTTPException(400, f"Too many files. Max {MAX_FILES_PER_UPLOAD} per request.")
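
    # Fetch this caller's Pinecone client from the shared pool and open the
    # two target indexes (objects and faces).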
    folder = standardize_category_name(folder_name)
    pc = pinecone_pool.get(keys["pinecone_key"])
    idx_obj = pc.Index(IDX_OBJECTS)
    idx_face = pc.Index(IDX_FACES)
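
    # Shared AI pipeline and semaphore from app state; the semaphore caps
    # how many images are embedded at once.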
    ai_manager = request.app.state.ai
    sem = request.app.state.ai_semaphore

    all_face_upserts: list[dict] = []
    all_object_upserts: list[dict] = []
    uploaded_urls: list[str] = []
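
    # Per-file worker: the Cloudinary upload runs in a thread (cld_upload is
    # synchronous) while the AI embedding coroutine runs under the semaphore.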
    async def _process_file(file: UploadFile) -> tuple[str, str, list]:
        file_bytes = await file.read()
        file_id = uuid.uuid4().hex

        async def _run_ai():
            async with sem:
                return await ai_manager.process_image_bytes_async(file_bytes, detect_faces=detect_faces)

        cld_task = asyncio.to_thread(cld_upload, io.BytesIO(file_bytes), folder, keys["cloudinary_creds"])
        ai_task = _run_ai()
        cld_res, vectors = await asyncio.gather(cld_task, ai_task)
        return file_id, cld_res["secure_url"], vectors
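
    # Fan out: all files are processed concurrently.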
    results = await asyncio.gather(*[_process_file(f) for f in files])
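
    # Partition the returned vectors: face embeddings keep extra metadata
    # (crop reference, detection score, face width); everything else goes
    # to the object index.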
    for file_id, image_url, vectors in results:
        uploaded_urls.append(image_url)
        for i, v in enumerate(vectors):
            vector_id = f"{file_id}_{i}"
            if v["type"] == "face":
                all_face_upserts.append({
                    "id": vector_id,
                    "values": to_list(v["vector"]),
                    "metadata": {
                        "url": image_url,
                        "folder": folder,
                        "face_crop": v.get("face_crop", ""),
                        "det_score": float(v.get("det_score", 1.0)),
                        "face_width_px": int(v.get("face_width_px", 0)),
                    },
                })
            else:
                all_object_upserts.append({
                    "id": vector_id,
                    "values": to_list(v["vector"]),
                    "metadata": {"url": image_url, "folder": folder},
                })
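
    # Pinecone upserts are synchronous, so run them in worker threads; batches
    # of 200 vectors keep each request a manageable size.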
    db_tasks = []

    def batched_upsert(index, vectors):
        for batch in chunker(vectors, 200):
            index.upsert(vectors=batch)
    if all_face_upserts:
        db_tasks.append(asyncio.to_thread(batched_upsert, idx_face, all_face_upserts))
    if all_object_upserts:
        db_tasks.append(asyncio.to_thread(batched_upsert, idx_obj, all_object_upserts))

    if db_tasks:
        try:
            await asyncio.gather(*db_tasks)
        except Exception as e:
            raise HTTPException(500, f"Database insertion failed: {e}")
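
    # Structured completion log; the duration covers the whole request:
    # uploads, embedding, and database writes.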
    duration_ms = round((time.perf_counter() - start) * 1000)
    log("INFO", "upload.complete", user_id=user_id or "anonymous", ip=ip,
        files=len(files), folder=folder, duration_ms=duration_ms)
    return {
        "message": "Done!",
        "urls": uploaded_urls,
        "summary": {
            "files": len(files),
            "face_vectors": len(all_face_upserts),
            "object_vectors": len(all_object_upserts),
        },
    }