visual-search-api2 / src /api /search.py
AdarshDRC's picture
Update src/api/search.py
0928262 verified
import asyncio
import time
import traceback
from fastapi import APIRouter, File, Form, HTTPException, Request, UploadFile, Depends
from src.core.config import DEFAULT_PINECONE_KEY, IDX_FACES, IDX_OBJECTS
from src.core.security import get_verified_keys
from src.services.db_client import (
merge_face_results, merge_object_results,
pinecone_pool, search_faces, search_objects,
)
from src.core.logging import log
from src.common.utils import face_ui_score, get_ip, is_default_key, to_list
router = APIRouter()
@router.post("/api/search")
async def search_database(
request: Request,
file: UploadFile = File(...),
detect_faces: bool = Form(True),
user_id: str = Form(""),
keys: dict = Depends(get_verified_keys)
):
ip = get_ip(request)
start = time.perf_counter()
mode = "guest" if is_default_key(keys["pinecone_key"], DEFAULT_PINECONE_KEY) else "personal"
log("INFO", "search.start", user_id=user_id or "anonymous", ip=ip, mode=mode,
filename=file.filename, detect_faces=detect_faces)
try:
file_bytes = await file.read()
ai_manager = request.app.state.ai
sem = request.app.state.ai_semaphore
async with sem:
vectors = await ai_manager.process_image_bytes_async(file_bytes, detect_faces=detect_faces)
inference_ms = round((time.perf_counter() - start) * 1000)
face_vectors = [v for v in vectors if v["type"] == "face"]
object_vectors = [v for v in vectors if v["type"] == "object"]
lanes_used = list({v["type"] for v in vectors})
log("INFO", "search.inference_done", user_id=user_id or "anonymous", ip=ip, mode=mode,
face_vecs=len(face_vectors), obj_vecs=len(object_vectors), inference_ms=inference_ms)
pc = pinecone_pool.get(keys["pinecone_key"])
idx_obj = pc.Index(IDX_OBJECTS)
idx_face = pc.Index(IDX_FACES)
if detect_faces and face_vectors:
return await _run_face_search(face_vectors, object_vectors, idx_face, idx_obj, start, user_id, ip, mode, lanes_used)
else:
return await _run_object_search(object_vectors, idx_obj, start, user_id, ip, mode, lanes_used)
except HTTPException:
raise
except Exception as e:
log("ERROR", "search.error", user_id=user_id or "anonymous", ip=ip, mode=mode,
error=str(e), traceback=traceback.format_exc()[-800:])
raise HTTPException(500, str(e))
async def _run_face_search(face_vectors, object_vectors, idx_face, idx_obj, start, user_id, ip, mode, lanes_used) -> dict:
async def _query_face(fv: dict) -> dict:
vec = to_list(fv["vector"])
det_score = fv.get("det_score", 1.0)
try:
image_map = await asyncio.to_thread(search_faces, idx_face, vec, det_score)
except Exception as e:
if "404" in str(e):
raise HTTPException(404, "Pinecone index not found. Go to Settings → Verify & Save.")
raise
return {
"query_face_idx": fv.get("face_idx", 0),
"query_face_crop": fv.get("face_crop", ""),
"query_bbox": fv.get("bbox", []),
"det_score": det_score,
"face_width_px": fv.get("face_width_px", 0),
"matches": sorted(
[
{
"url": url,
"score": face_ui_score(d["raw_score"]),
"raw_score": round(d["raw_score"], 4),
"face_crop": d["face_crop"],
"folder": d["folder"],
"caption": "👤 Verified Identity",
}
for url, d in image_map.items()
],
key=lambda x: x["score"], reverse=True,
)[:50],
}
async def _query_obj_single(ov: dict) -> list:
vec = to_list(ov["vector"])
try:
return await asyncio.to_thread(search_objects, idx_obj, vec)
except Exception as e:
if "404" in str(e):
raise HTTPException(404, "Pinecone index not found.")
raise
face_tasks = [_query_face(fv) for fv in face_vectors]
obj_tasks = [_query_obj_single(ov) for ov in object_vectors]
all_results = await asyncio.gather(*face_tasks, *obj_tasks)
raw_groups = list(all_results[:len(face_tasks)])
obj_nested = list(all_results[len(face_tasks):])
merged_face = merge_face_results(raw_groups)
merged_objects = merge_object_results(obj_nested)
face_groups = [g for g in raw_groups if g.get("matches")]
duration_ms = round((time.perf_counter() - start) * 1000)
log("INFO", "search.complete", user_id=user_id or "anonymous", ip=ip, mode=mode,
lanes=["face", "object"], face_groups=len(face_groups), face_results=len(merged_face),
object_results=len(merged_objects), duration_ms=duration_ms)
return {
"mode": "face",
"face_groups": face_groups,
"results": merged_face,
"object_results": merged_objects,
}
async def _run_object_search(object_vectors, idx_obj, start, user_id, ip, mode, lanes_used) -> dict:
if not object_vectors:
return {"mode": "object", "results": [], "face_groups": []}
async def _query_obj(ov: dict) -> list:
vec = to_list(ov["vector"])
try:
return await asyncio.to_thread(search_objects, idx_obj, vec)
except Exception as e:
if "404" in str(e):
raise HTTPException(404, "Pinecone index not found.")
raise
nested = await asyncio.gather(*[_query_obj(ov) for ov in object_vectors])
final = merge_object_results(nested)
duration_ms = round((time.perf_counter() - start) * 1000)
log("INFO", "search.complete", user_id=user_id or "anonymous", ip=ip, mode=mode,
lanes=lanes_used, results=len(final), duration_ms=duration_ms)
return {"mode": "object", "results": final, "face_groups": []}