Spaces:
Running
Running
| import asyncio | |
| import hashlib | |
| import time | |
| import traceback | |
| from typing import Optional | |
| from fastapi import APIRouter, File, Form, HTTPException, Request, UploadFile, Depends | |
| from src.core.config import ( | |
| DEFAULT_PINECONE_KEY, IDX_FACES, IDX_OBJECTS, | |
| IDX_FACES_ARCFACE, IDX_FACES_ADAFACE, | |
| USE_SPLIT_FACE_INDEXES, USE_CLUSTER_AWARE_SEARCH, | |
| ) | |
| from src.core.security import get_verified_keys | |
| from src.services.db_client import ( | |
| merge_face_results, merge_object_results, | |
| pinecone_pool, search_faces, search_faces_split, search_objects, | |
| ensure_indexes, | |
| ) | |
| from src.core.logging import log | |
| from src.common.utils import face_ui_score, get_ip, is_default_key, to_list | |
# Module-level FastAPI router. No route decorators are visible on the handlers
# in this view, so their registration presumably happens elsewhere — TODO confirm.
router = APIRouter()
async def search_database(
    request: Request,
    file: UploadFile = File(...),
    detect_faces: bool = Form(True),
    user_id: str = Form(""),
    keys: dict = Depends(get_verified_keys),
):
    """Search the caller's Pinecone indexes using one uploaded image.

    The upload is read, run through the app's AI manager (guarded by the
    app-wide AI semaphore), and the resulting vectors are split by their
    ``"type"`` field into face and object vectors. Missing indexes are
    created on a best-effort basis, then the request is handed to
    ``_run_face_search`` (when ``detect_faces`` is set and at least one face
    vector was produced) or to ``_run_object_search`` otherwise.

    Args:
        request: Incoming request; supplies ``app.state.ai``,
            ``app.state.ai_semaphore`` and the client IP.
        file: Query image upload.
        detect_faces: Forwarded to inference and used to pick the search lane.
        user_id: Caller-supplied id used only for logging ("anonymous" if empty).
        keys: Verified keys; ``keys["pinecone_key"]`` selects the Pinecone client.

    Returns:
        The dict produced by ``_run_face_search`` or ``_run_object_search``.

    Raises:
        HTTPException: Re-raised untouched when raised further down; any other
            exception is logged and converted to a 500 whose detail is the
            exception text.
    """
    ip = get_ip(request)
    start = time.perf_counter()
    who = user_id or "anonymous"
    pinecone_key = keys["pinecone_key"]

    if is_default_key(pinecone_key, DEFAULT_PINECONE_KEY):
        mode = "guest"
    else:
        mode = "personal"

    log(
        "INFO", "search.start",
        user_id=who, ip=ip, mode=mode,
        filename=file.filename, detect_faces=detect_faces,
    )

    try:
        payload = await file.read()
        ai_manager = request.app.state.ai
        inference_gate = request.app.state.ai_semaphore

        # Query inference runs under the shared semaphore.
        async with inference_gate:
            vectors = await ai_manager.process_image_bytes_async(
                payload, detect_faces=detect_faces
            )
        inference_ms = round((time.perf_counter() - start) * 1000)

        def _of_kind(kind):
            # One full pass over `vectors` per kind.
            return [v for v in vectors if v["type"] == kind]

        face_vectors = _of_kind("face")
        object_vectors = _of_kind("object")

        log(
            "INFO", "search.inference_done",
            user_id=who, ip=ip, mode=mode,
            face_vecs=len(face_vectors), obj_vecs=len(object_vectors),
            inference_ms=inference_ms,
        )

        pc = pinecone_pool.get(pinecone_key)

        # Opaque, stable identity derived from the Pinecone key. Per the
        # original note this matches what clustering.py writes to Supabase,
        # so cluster lookups resolve to the same user.
        cluster_uid = hashlib.sha256(pinecone_key.encode()).hexdigest()[:16]

        # Best-effort self-heal: create any missing indexes (covers callers
        # who have not gone through verify-keys yet). Failures are logged and
        # the search proceeds regardless.
        try:
            created = await asyncio.to_thread(ensure_indexes, pc)
            if created:
                log(
                    "INFO", "search.indexes_auto_created",
                    user_id=who, ip=ip, created=created,
                )
                # Pause after creation — presumably so the new indexes become
                # queryable; TODO confirm the 8 s figure.
                await asyncio.sleep(8)
        except Exception as e:
            log(
                "ERROR", "search.ensure_indexes_failed",
                user_id=who, ip=ip, error=str(e),
            )

        idx_obj = pc.Index(IDX_OBJECTS)

        # Only the handles for the active face backend are opened.
        idx_arcface = idx_adaface = idx_face_legacy = None
        if USE_SPLIT_FACE_INDEXES:
            idx_arcface = pc.Index(IDX_FACES_ARCFACE)
            idx_adaface = pc.Index(IDX_FACES_ADAFACE)
        else:
            idx_face_legacy = pc.Index(IDX_FACES)

        wants_face_lane = detect_faces and face_vectors
        if not wants_face_lane:
            return await _run_object_search(
                object_vectors, idx_obj, start, user_id, ip, mode
            )

        return await _run_face_search(
            face_vectors, object_vectors,
            idx_arcface, idx_adaface, idx_face_legacy, idx_obj,
            start, user_id, ip, mode,
            pc=pc, cluster_uid=cluster_uid,
        )
    except HTTPException:
        raise
    except Exception as e:
        log(
            "ERROR", "search.error",
            user_id=who, ip=ip, mode=mode,
            error=str(e), traceback=traceback.format_exc()[-800:],
        )
        raise HTTPException(500, str(e))
async def _query_face_split(fv, idx_arcface, idx_adaface, pc=None, cluster_uid=None):
    """Query the ArcFace/AdaFace indexes for one face and format the hits.

    ``search_faces_split`` is run in a worker thread with the face's ArcFace
    vector and, when ``fv["has_adaface"]`` is truthy, its AdaFace vector
    (otherwise ``None``). When ``USE_CLUSTER_AWARE_SEARCH`` is on and a
    Pinecone client plus ``cluster_uid`` are supplied, the hits scoring at or
    above the expansion threshold are passed to ``search_cluster_aware`` and
    its return value replaces the result map.

    Args:
        fv: Face vector dict (reads ``arcface_vector``, ``has_adaface``,
            ``adaface_vector``).
        idx_arcface: ArcFace index handle.
        idx_adaface: AdaFace index handle.
        pc: Pinecone client, needed only for cluster expansion.
        cluster_uid: User identity passed to cluster expansion.

    Returns:
        The face group built by ``_format_face_group`` with fused scoring.

    Raises:
        HTTPException: 404 when the search error text contains "404".
    """
    arcface_vec = to_list(fv["arcface_vector"])
    if fv.get("has_adaface"):
        adaface_vec = to_list(fv.get("adaface_vector"))
    else:
        adaface_vec = None

    try:
        image_map = await asyncio.to_thread(
            search_faces_split,
            idx_arcface, idx_adaface,
            arcface_vec, adaface_vec,
        )
    except Exception as e:
        # Anything that does not look like a missing index propagates as-is.
        if "404" not in str(e):
            raise
        raise HTTPException(
            404,
            "Face indexes not found. Go to Settings → Verify & Save to create them."
        )

    # Only matches with fused_score >= 0.35 seed cluster expansion. Per the
    # original note the threshold was lowered from 0.50 to pick up borderline
    # same-person matches while still rejecting imposters.
    expand_floor = 0.35
    confident = {}
    for url, hit in image_map.items():
        if hit.get("fused_score", 0.0) >= expand_floor:
            confident[url] = hit

    if USE_CLUSTER_AWARE_SEARCH and confident and pc is not None and cluster_uid:
        from src.services.clustering import search_cluster_aware
        image_map = await search_cluster_aware(pc, confident, cluster_uid)

    return _format_face_group(fv, image_map, scoring="fused")
async def _query_face_legacy(fv, idx_face):
    """Query the single legacy face index for one face and format the hits.

    Args:
        fv: Face vector dict (reads ``vector`` and, optionally, ``det_score``,
            which defaults to 1.0).
        idx_face: Legacy face index handle.

    Returns:
        The face group built by ``_format_face_group`` with legacy scoring.

    Raises:
        HTTPException: 404 when the search error text contains "404".
    """
    query_vec = to_list(fv["vector"])
    detection_conf = fv.get("det_score", 1.0)

    try:
        image_map = await asyncio.to_thread(
            search_faces, idx_face, query_vec, detection_conf
        )
    except Exception as e:
        # Anything that does not look like a missing index propagates as-is.
        if "404" not in str(e):
            raise
        raise HTTPException(404, "Pinecone index not found.")

    return _format_face_group(fv, image_map, scoring="legacy")
def _format_face_group(fv, image_map, scoring: str):
    """Build the per-face response group, independent of the scoring backend.

    Args:
        fv: Query face dict; its optional ``face_idx``, ``face_crop``, ``bbox``,
            ``det_score`` and ``face_width_px`` entries are echoed back.
        image_map: Mapping of image URL to a hit dict. Each hit must carry
            ``face_crop``, ``folder`` and the score field for the chosen
            backend (``fused_score`` for "fused", ``raw_score`` otherwise).
        scoring: "fused" selects fused scoring; any other value is treated
            as legacy.

    Returns:
        Dict describing the query face plus ``matches`` sorted by display
        score, highest first.
    """
    if scoring == "fused":
        score_field, ui_mode = "fused_score", "fused"
    else:
        score_field, ui_mode = "raw_score", "legacy"

    def _as_match(url, hit):
        # Display score first, then the rounded raw value, as separate lookups.
        shown = face_ui_score(hit[score_field], mode=ui_mode)
        rounded = round(hit[score_field], 4)
        return {
            "url": url,
            "score": shown,
            "raw_score": rounded,
            "arcface_score": round(hit.get("arcface_score", 0), 4),
            "adaface_score": round(hit.get("adaface_score", 0), 4),
            "face_crop": hit["face_crop"],
            "folder": hit["folder"],
            "caption": "👤 Verified Identity",
        }

    ranked = sorted(
        [_as_match(url, hit) for url, hit in image_map.items()],
        key=lambda m: m["score"],
        reverse=True,
    )

    return {
        "query_face_idx": fv.get("face_idx", 0),
        "query_face_crop": fv.get("face_crop", ""),
        "query_bbox": fv.get("bbox", []),
        "det_score": fv.get("det_score", 1.0),
        "face_width_px": fv.get("face_width_px", 0),
        "matches": ranked,
    }
async def _run_face_search(
    face_vectors, object_vectors,
    idx_arcface, idx_adaface, idx_face_legacy, idx_obj,
    start, user_id, ip, mode,
    pc=None, cluster_uid=None,
) -> dict:
    """Run all face and object queries concurrently and assemble the response.

    One face query is issued per face vector (split or legacy backend,
    depending on ``USE_SPLIT_FACE_INDEXES``) and one object query per object
    vector; everything is awaited through a single ``asyncio.gather``.

    Args:
        face_vectors: Face vector dicts to query.
        object_vectors: Object vector dicts to query (reads ``vector``).
        idx_arcface, idx_adaface: Split-backend index handles.
        idx_face_legacy: Legacy face index handle.
        idx_obj: Object index handle.
        start: ``time.perf_counter()`` reading taken at request start.
        user_id, ip, mode: Logging context.
        pc, cluster_uid: Forwarded to the split face query.

    Returns:
        Dict with ``mode`` "face", the non-empty ``face_groups``, merged face
        ``results`` and merged ``object_results``.

    Raises:
        HTTPException: 404 when an object search error text contains "404";
            face query errors propagate from the face query helpers.
    """
    # Face query coroutines, one per detected face.
    if USE_SPLIT_FACE_INDEXES:
        face_jobs = []
        for fv in face_vectors:
            face_jobs.append(
                _query_face_split(
                    fv, idx_arcface, idx_adaface, pc=pc, cluster_uid=cluster_uid
                )
            )
    else:
        face_jobs = []
        for fv in face_vectors:
            face_jobs.append(_query_face_legacy(fv, idx_face_legacy))

    async def _lookup_object(ov):
        # Object lookups share the gather below with the face lookups.
        query_vec = to_list(ov["vector"])
        try:
            return await asyncio.to_thread(search_objects, idx_obj, query_vec)
        except Exception as e:
            if "404" not in str(e):
                raise
            raise HTTPException(404, "Pinecone index not found.")

    object_jobs = [_lookup_object(ov) for ov in object_vectors]

    gathered = await asyncio.gather(*(face_jobs + object_jobs))

    # gather() keeps submission order: face results first, object results after.
    split_at = len(face_jobs)
    raw_groups = list(gathered[:split_at])
    obj_nested = list(gathered[split_at:])

    merged_face = merge_face_results(raw_groups)
    merged_objects = merge_object_results(obj_nested)

    # Groups without any match are dropped from the response.
    face_groups = []
    for group in raw_groups:
        if group.get("matches"):
            face_groups.append(group)

    duration_ms = round((time.perf_counter() - start) * 1000)
    if USE_SPLIT_FACE_INDEXES:
        index_mode = "split"
    else:
        index_mode = "legacy"

    log(
        "INFO", "search.complete",
        user_id=user_id or "anonymous", ip=ip, mode=mode,
        lanes=["face", "object"],
        face_groups=len(face_groups),
        face_results=len(merged_face),
        object_results=len(merged_objects),
        duration_ms=duration_ms,
        index_mode=index_mode,
    )

    return {
        "mode": "face",
        "face_groups": face_groups,
        "results": merged_face,
        "object_results": merged_objects,
    }
async def _run_object_search(object_vectors, idx_obj, start, user_id, ip, mode) -> dict:
    """Query the object index once per object vector and merge the hits.

    Args:
        object_vectors: Object vector dicts (reads ``vector``); may be empty.
        idx_obj: Object index handle.
        start: ``time.perf_counter()`` reading taken at request start.
        user_id, ip, mode: Logging context.

    Returns:
        Dict with ``mode`` "object", merged ``results`` and an empty
        ``face_groups`` list. With no object vectors the empty response is
        returned immediately and nothing is logged.

    Raises:
        HTTPException: 404 when a search error text contains "404".
    """
    if not object_vectors:
        return {"mode": "object", "results": [], "face_groups": []}

    async def _lookup(ov):
        query_vec = to_list(ov["vector"])
        try:
            return await asyncio.to_thread(search_objects, idx_obj, query_vec)
        except Exception as e:
            # Anything that does not look like a missing index propagates as-is.
            if "404" not in str(e):
                raise
            raise HTTPException(404, "Pinecone index not found.")

    pending = [_lookup(ov) for ov in object_vectors]
    nested = await asyncio.gather(*pending)
    merged = merge_object_results(nested)

    elapsed_ms = round((time.perf_counter() - start) * 1000)
    log(
        "INFO", "search.complete",
        user_id=user_id or "anonymous", ip=ip, mode=mode,
        lanes=["object"], results=len(merged), duration_ms=elapsed_ms,
    )

    return {"mode": "object", "results": merged, "face_groups": []}
async def search_by_face(
    request: Request,
    front: UploadFile = File(...),
    left: Optional[UploadFile] = File(None),
    right: Optional[UploadFile] = File(None),
    user_id: str = Form(""),
    keys: dict = Depends(get_verified_keys),
):
    """
    Multi-angle face search: accepts 1-3 face images, fuses their embeddings
    server-side, and performs a single face query with the fused embedding.

    For each provided angle the first detected face is used. Embeddings are
    combined as a weighted sum (front 0.5, left 0.25, right 0.25) and scaled
    to unit length. The embedding field that is fused is the one the active
    backend reads: ``arcface_vector`` (plus ``adaface_vector`` when available)
    for split indexes, ``vector`` for the legacy index.

    Args:
        request: Incoming request; supplies ``app.state.ai``,
            ``app.state.ai_semaphore`` and the client IP.
        front: Front-facing image (required).
        left: Optional left-angle image.
        right: Optional right-angle image.
        user_id: Caller-supplied id used only for logging ("anonymous" if empty).
        keys: Verified keys; ``keys["pinecone_key"]`` selects the Pinecone client.

    Returns:
        Dict with ``mode`` "face", ``face_groups`` holding the single fused
        group (or empty when it has no matches), and empty ``results`` /
        ``object_results`` lists.

    Raises:
        HTTPException: 400 when no image, no face, or no usable embedding is
            available; 404 from the query helpers when an index is missing;
            500 (detail = exception text) for any other failure.
    """
    import numpy as np

    ip = get_ip(request)
    start = time.perf_counter()
    who = user_id or "anonymous"
    mode = "guest" if is_default_key(keys["pinecone_key"], DEFAULT_PINECONE_KEY) else "personal"

    log("INFO", "search.search_by_face.start",
        user_id=who, ip=ip, mode=mode)

    def _fuse(parts):
        """Sum the weighted embeddings and scale the result to unit length."""
        total = np.sum(parts, axis=0)
        # Small epsilon guards against division by zero for a null sum.
        return total / (np.linalg.norm(total) + 1e-7)

    try:
        ai_manager = request.app.state.ai
        sem = request.app.state.ai_semaphore

        log("DEBUG", "search.search_by_face.received_files",
            user_id=who, ip=ip,
            front=bool(front), left=bool(left), right=bool(right))

        # Read the bytes of every provided angle (one upload at a time).
        images = {}
        for name, file in [("front", front), ("left", left), ("right", right)]:
            if file:
                file_bytes = await file.read()
                images[name] = file_bytes
                log("DEBUG", "search.search_by_face.file_read",
                    user_id=who, ip=ip,
                    angle=name, size_bytes=len(file_bytes))

        if not images:
            log("ERROR", "search.search_by_face.no_images",
                user_id=who, ip=ip)
            raise HTTPException(400, "At least front image required")

        # Run inference for all angles concurrently; the shared semaphore
        # still bounds how many run at once.
        async def process_img(name, data):
            async with sem:
                return name, await ai_manager.process_image_bytes_async(
                    data, detect_faces=True
                )

        results = await asyncio.gather(
            *[process_img(name, data) for name, data in images.items()],
            return_exceptions=True
        )

        # Keep the first detected face of every angle that processed cleanly.
        face_vectors_by_angle = {}
        for result in results:
            # gather(return_exceptions=True) can hand back any BaseException
            # (e.g. CancelledError), not only Exception subclasses.
            if isinstance(result, BaseException):
                # No exception is active here, so the traceback has to be
                # formatted from the returned exception object itself.
                tb_text = "".join(
                    traceback.format_exception(
                        type(result), result, result.__traceback__
                    )
                )
                log("WARNING", "search.search_by_face.process_error",
                    user_id=who, ip=ip,
                    error=str(result), traceback=tb_text[-500:])
                continue

            name, vectors = result
            face_vecs = [v for v in vectors if v["type"] == "face"]
            if face_vecs:
                face_vectors_by_angle[name] = face_vecs[0]
                log("DEBUG", "search.search_by_face.face_detected",
                    user_id=who, ip=ip,
                    angle=name, det_score=face_vecs[0].get("det_score", 0))
            else:
                log("WARNING", "search.search_by_face.no_face_in_angle",
                    user_id=who, ip=ip,
                    angle=name, vectors_count=len(vectors) if vectors else 0)

        if not face_vectors_by_angle:
            log("ERROR", "search.search_by_face.no_faces_detected",
                user_id=who, ip=ip)
            raise HTTPException(400, "No face detected in provided images")

        # Crop shown in the UI: the front one if present, else any angle's.
        front_face_crop = (
            face_vectors_by_angle.get("front", {}).get("face_crop", "") or
            next((v.get("face_crop", "") for v in face_vectors_by_angle.values() if v.get("face_crop")), "")
        )

        # The split backend queries with "arcface_vector", while the legacy
        # backend (_query_face_legacy) reads "vector" — fuse whichever field
        # the active backend will actually use.
        primary_key = "arcface_vector" if USE_SPLIT_FACE_INDEXES else "vector"

        # Fuse embeddings: front weighted higher.
        weights = {"front": 0.5, "left": 0.25, "right": 0.25}
        primary_parts = []
        adaface_parts = []
        det_scores = []

        for angle, vec in face_vectors_by_angle.items():
            w = weights.get(angle, 0)
            if w <= 0 or vec.get(primary_key) is None:
                continue
            primary_parts.append(np.array(to_list(vec[primary_key])) * w)
            det_scores.append(vec.get("det_score", 1.0))
            if vec.get("has_adaface") and vec.get("adaface_vector") is not None:
                adaface_parts.append(np.array(to_list(vec["adaface_vector"])) * w)

        if not primary_parts:
            raise HTTPException(400, "Could not fuse face embeddings")

        fused_primary = _fuse(primary_parts)
        has_adaface = bool(adaface_parts)
        fused_adaface = _fuse(adaface_parts) if has_adaface else None

        # Synthetic face dict for the query helpers (carries the crop for UI
        # display). bbox / face_width_px are placeholders: a fused embedding
        # has no single source region.
        fv = {
            "face_idx": 0,
            "det_score": float(np.mean(det_scores)),
            primary_key: fused_primary.tolist(),
            "has_adaface": has_adaface,
            "adaface_vector": fused_adaface.tolist() if has_adaface else None,
            "bbox": [0, 0, 0, 0],
            "face_width_px": 0,
            "face_crop": front_face_crop,
        }

        inference_ms = round((time.perf_counter() - start) * 1000)
        log("INFO", "search.search_by_face.fused",
            user_id=who, ip=ip,
            angles=list(face_vectors_by_angle.keys()),
            inference_ms=inference_ms)

        pc = pinecone_pool.get(keys["pinecone_key"])
        cluster_uid = hashlib.sha256(keys["pinecone_key"].encode()).hexdigest()[:16]

        # Best-effort: create missing indexes; failures are logged and the
        # query is still attempted.
        try:
            created = await asyncio.to_thread(ensure_indexes, pc)
            if created:
                log("INFO", "search.indexes_auto_created",
                    user_id=who, ip=ip, created=created)
                # Pause after creation — presumably so the new indexes become
                # queryable; TODO confirm the 8 s figure.
                await asyncio.sleep(8)
        except Exception as e:
            log("ERROR", "search.ensure_indexes_failed",
                user_id=who, ip=ip, error=str(e))

        # Open only the handles the active backend needs, then query once
        # with the fused embedding.
        if USE_SPLIT_FACE_INDEXES:
            idx_arcface = pc.Index(IDX_FACES_ARCFACE)
            idx_adaface = pc.Index(IDX_FACES_ADAFACE)
            face_group = await _query_face_split(
                fv, idx_arcface, idx_adaface, pc=pc, cluster_uid=cluster_uid
            )
        else:
            idx_face_legacy = pc.Index(IDX_FACES)
            face_group = await _query_face_legacy(fv, idx_face_legacy)

        duration_ms = round((time.perf_counter() - start) * 1000)
        log("INFO", "search.search_by_face.complete",
            user_id=who, ip=ip,
            results=len(face_group.get("matches", [])),
            duration_ms=duration_ms)

        return {
            "mode": "face",
            "face_groups": [face_group] if face_group.get("matches") else [],
            "results": [],
            "object_results": [],
        }

    except HTTPException:
        raise
    except Exception as e:
        log("ERROR", "search.search_by_face.error",
            user_id=who, ip=ip, mode=mode,
            error=str(e), traceback=traceback.format_exc()[-800:])
        raise HTTPException(500, str(e)) from e