Spaces:
Running
Running
| import asyncio | |
| import os | |
| import shutil | |
| import uuid | |
| import re | |
| import time | |
| import json | |
| import base64 | |
| import traceback | |
| import inflect | |
| from datetime import datetime, timezone | |
| from urllib.parse import urlparse | |
| from typing import List | |
| from contextlib import asynccontextmanager | |
| from collections import OrderedDict | |
| from fastapi import FastAPI, UploadFile, File, Form, HTTPException, Request | |
| from fastapi.middleware.cors import CORSMiddleware | |
| import cloudinary | |
| import cloudinary.uploader | |
| import cloudinary.api | |
| from pinecone import Pinecone, ServerlessSpec | |
| # ── loguru for pretty local console logs (optional dep) ────────── | |
# Prefer loguru for colorized console output; fall back to the stdlib
# logging module when loguru is not installed. Either way, the rest of
# the file logs through the uniform `_log_fn(level_name, message)` callable.
try:
    from loguru import logger as _loguru
    _loguru.remove()  # drop loguru's default stderr sink before adding ours
    _loguru.add(
        lambda msg: print(msg, end=""),
        format="<green>{time:HH:mm:ss}</green> | <level>{level:<8}</level> | {message}",
        level="DEBUG", colorize=True,
    )
    _log_fn = _loguru.log
except ImportError:
    import logging as _logging
    _logging.basicConfig(level=_logging.INFO)
    _stdlib = _logging.getLogger("el")
    # Adapt the loguru-style (level_name, msg) call to stdlib logging;
    # unknown level names fall back to numeric 20 (INFO).
    def _log_fn(level, msg): _stdlib.log(getattr(_logging, level, 20), msg)
# ── Deferred imports ─────────────────────────────────────────────
ai = None  # AIModelManager instance; assigned in lifespan() after model load
p = inflect.engine()  # inflect engine for singularizing folder/category names
# Cap on concurrent model inferences; the semaphore itself is created in
# lifespan() once an event loop exists.
MAX_CONCURRENT_INFERENCES = int(os.getenv("MAX_CONCURRENT_INFERENCES", "6"))
_inference_sem: asyncio.Semaphore  # declared here, bound at startup
# Bounded LRU pool of Pinecone clients keyed by API key (see _get_pinecone).
_pinecone_pool = OrderedDict()
_POOL_MAX = 64  # max distinct Pinecone clients kept alive at once
IDX_FACES = "enterprise-faces"      # Pinecone index for 512-dim face vectors
IDX_OBJECTS = "enterprise-objects"  # Pinecone index for 1536-dim object vectors
# ════════════════════════════════════════════════════════════════
# GRAFANA LOKI — async, fire-and-forget, never crashes the API
# HF Space Secrets needed:
#   LOKI_URL      → https://logs-prod-006.grafana.net (no trailing slash)
#   LOKI_USERNAME → your Grafana Cloud numeric user ID
#   LOKI_PASSWORD → your Grafana Cloud API token (Logs:Write scope)
# ════════════════════════════════════════════════════════════════
# All three must be set for _loki_push to do anything (see its guard).
LOKI_URL = os.getenv("LOKI_URL", "")
LOKI_USERNAME = os.getenv("LOKI_USERNAME", "")
LOKI_PASSWORD = os.getenv("LOKI_PASSWORD", "")
async def _loki_push(level: str, event: str, data: dict):
    """Fire-and-forget push of one log line to Grafana Loki.

    Silent by design: any failure (missing creds, network error, non-2xx
    response) is swallowed so logging can never take down the API.
    """
    # All three secrets are required; otherwise Loki shipping is disabled.
    if not (LOKI_URL and LOKI_USERNAME and LOKI_PASSWORD):
        return
    try:
        import aiohttp  # imported lazily so the dep is optional at startup
        # Loki expects nanosecond-precision epoch timestamps as strings.
        ts_ns = str(int(time.time() * 1e9))
        # The log line itself is structured JSON; `default=str` stringifies
        # anything json can't encode natively.
        line = json.dumps({"timestamp": datetime.now(timezone.utc).isoformat(),
                           "level": level.upper(), "service": "enterprise-lens",
                           "event": event, **data}, default=str)
        # Loki push payload: one stream (label set) with one value pair.
        payload = {"streams": [{"stream": {"service": "enterprise-lens",
                                           "level": level.lower(),
                                           "event": event,
                                           "env": os.getenv("ENVIRONMENT", "production")},
                                "values": [[ts_ns, line]]}]}
        # Grafana Cloud uses HTTP Basic auth (user id : API token).
        creds = base64.b64encode(f"{LOKI_USERNAME}:{LOKI_PASSWORD}".encode()).decode()
        headers = {"Content-Type": "application/json", "Authorization": f"Basic {creds}"}
        async with aiohttp.ClientSession() as s:
            async with s.post(f"{LOKI_URL}/loki/api/v1/push",
                              json=payload, headers=headers,
                              timeout=aiohttp.ClientTimeout(total=5)) as r:
                # Loki normally answers 204 No Content on success.
                if r.status not in (200, 204):
                    _log_fn("WARNING", f"Loki returned {r.status}")
    except Exception as exc:
        # Deliberate best-effort: only whisper the failure at DEBUG level.
        _log_fn("DEBUG", f"Loki push skipped: {exc}")
def log(level: str, event: str, **data):
    """
    Log to console + Grafana Loki (background task).

    Usage: log("INFO", "upload.complete", user_id="x", files=3, duration_ms=340)

    The Loki push is scheduled as a fire-and-forget task only when an event
    loop is running; from synchronous/startup contexts it is skipped silently.
    """
    # `data` is already a fresh dict built from the **kwargs — no copy needed.
    _log_fn(level.upper(), f"[{event}] {json.dumps(data, default=str)}")
    try:
        # get_running_loop() raises RuntimeError outside a loop, which is the
        # exact condition we care about (get_event_loop() is deprecated there
        # and could implicitly create a loop that is never run).
        asyncio.get_running_loop()
        asyncio.create_task(_loki_push(level, event, data))
    except RuntimeError:
        # No running event loop — console-only logging is fine here.
        pass
| # ════════════════════════════════════════════════════════════════ | |
| # HELPERS | |
| # ════════════════════════════════════════════════════════════════ | |
def get_ip(request: Request) -> str:
    """Best-effort client IP: first X-Forwarded-For hop, else the socket peer."""
    forwarded = request.headers.get("X-Forwarded-For", "")
    if forwarded:
        # Behind a proxy chain the left-most entry is the original client.
        return forwarded.split(",")[0].strip()
    return getattr(request.client, "host", "unknown")
def is_guest(key: str) -> bool:
    """True when *key* matches the server's shared demo Pinecone key.

    Always False when no DEFAULT_PINECONE_KEY secret is configured.
    """
    default = os.getenv("DEFAULT_PINECONE_KEY", "")
    if not default:
        return False
    return key.strip() == default.strip()
def _get_pinecone(api_key: str) -> Pinecone:
    """Return a cached Pinecone client for *api_key* from a bounded LRU pool."""
    if api_key not in _pinecone_pool:
        # Pool full → evict the least-recently-used client first.
        if len(_pinecone_pool) >= _POOL_MAX:
            _pinecone_pool.popitem(last=False)
        _pinecone_pool[api_key] = Pinecone(api_key=api_key)
    # Mark this key as most-recently-used either way.
    _pinecone_pool.move_to_end(api_key)
    return _pinecone_pool[api_key]
def _cld_upload(tmp_path, folder, creds):
    # Blocking Cloudinary upload of a local file into `folder`.
    # Run via asyncio.to_thread from async code; returns the SDK's result dict.
    return cloudinary.uploader.upload(tmp_path, folder=folder,
        api_key=creds["api_key"], api_secret=creds["api_secret"], cloud_name=creds["cloud_name"])
def _cld_ping(creds):
    # Blocking credential check against the Cloudinary Admin API.
    return cloudinary.api.ping(
        api_key=creds["api_key"], api_secret=creds["api_secret"], cloud_name=creds["cloud_name"])
def _cld_root_folders(creds):
    # Blocking list of top-level Cloudinary folders (the UI's "categories").
    return cloudinary.api.root_folders(
        api_key=creds["api_key"], api_secret=creds["api_secret"], cloud_name=creds["cloud_name"])
def get_cloudinary_creds(env_url: str) -> dict:
    """Parse a CLOUDINARY_URL (cloudinary://key:secret@cloud) into a creds dict.

    Returns {} for an empty URL; callers check for a truthy "cloud_name".
    """
    if not env_url:
        return {}
    parts = urlparse(env_url)
    return {
        "api_key": parts.username,
        "api_secret": parts.password,
        "cloud_name": parts.hostname,
    }
def standardize_category_name(name: str) -> str:
    """Normalize a category/folder name: lowercase, underscores, singular form."""
    collapsed = re.sub(r'\s+', '_', name.strip().lower())
    collapsed = re.sub(r'[^\w]', '', collapsed)
    # inflect's singular_noun() returns False when the word is already singular.
    singular = p.singular_noun(collapsed)
    return singular or collapsed
def sanitize_filename(filename: str) -> str:
    """Make a filename safe for temp paths: whitespace → '_', strip odd chars."""
    underscored = re.sub(r'\s+', '_', filename)
    # Keep only word chars, dots and dashes (drops slashes, quotes, etc.).
    return re.sub(r'[^\w.\-]', '', underscored)
| DEFAULT_PC_KEY = os.getenv("DEFAULT_PINECONE_KEY", "") | |
| DEFAULT_CLD_URL = os.getenv("DEFAULT_CLOUDINARY_URL", "") | |
| def _is_default_key(key: str, default: str) -> bool: | |
| return bool(default) and key.strip() == default.strip() | |
| # ════════════════════════════════════════════════════════════════ | |
| # APP STARTUP / SHUTDOWN | |
| # ════════════════════════════════════════════════════════════════ | |
@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    FastAPI lifespan: load the AI models before serving, log on shutdown.

    FIX: the @asynccontextmanager decorator was missing. FastAPI's
    `lifespan=` parameter requires a callable that returns an async context
    manager; a bare async-generator function does not. The decorator is
    imported at the top of this file (contextlib) and is now applied.
    """
    global ai, _inference_sem
    # Deferred import: heavy model stack, only needed at startup.
    from src.models import AIModelManager
    log("INFO", "server.startup", message="Loading AI models...")
    loop = asyncio.get_running_loop()
    # Model construction is blocking (disk / accelerator init) — keep it
    # off the event loop so the server stays responsive while loading.
    ai = await loop.run_in_executor(None, AIModelManager)
    _inference_sem = asyncio.Semaphore(MAX_CONCURRENT_INFERENCES)
    log("INFO", "server.ready", message="All models loaded. API ready.")
    yield
    log("INFO", "server.shutdown", message="API shutting down.")
# App wired to the model-loading lifespan; CORS is wide open because the
# frontend is served from a different origin (HF Space / local dev).
app = FastAPI(lifespan=lifespan)
app.add_middleware(CORSMiddleware,
    allow_origins=["*"], allow_credentials=True,
    allow_methods=["*"], allow_headers=["*"])
# Scratch directory for uploaded files awaiting processing.
os.makedirs("temp_uploads", exist_ok=True)
| # ════════════════════════════════════════════════════════════════ | |
| # FRONTEND EVENT LOG — React calls this for client-side events | |
| # Logs: page visits, tab switches, mode toggles, search/upload | |
| # initiated, settings changes, errors caught in UI | |
| # ════════════════════════════════════════════════════════════════ | |
async def frontend_log(
    request: Request,
    event: str = Form(...),       # e.g. "page.visit", "search.initiated"
    user_id: str = Form(""),
    page: str = Form(""),
    metadata: str = Form("{}"),   # JSON string with extra context
):
    """
    Receive client-side UI events from the React frontend and forward them
    to the unified log() pipeline (console + Loki).

    Malformed metadata is tolerated (best-effort logging endpoint).
    """
    ip = get_ip(request)
    try:
        meta = json.loads(metadata) if metadata else {}
        # json.loads can yield non-dicts (list, str, number) — those would
        # blow up on **meta below, so normalize them away.
        if not isinstance(meta, dict):
            meta = {}
    except Exception:
        meta = {}
    # FIX: drop metadata keys that collide with the explicit keyword
    # arguments below — otherwise client-supplied JSON containing e.g.
    # "user_id" raised "got multiple values for keyword argument" → HTTP 500.
    reserved = {"user_id", "page", "ip", "ua"}
    meta = {k: v for k, v in meta.items() if k not in reserved}
    log("INFO", f"frontend.{event}",
        user_id = user_id or "anonymous",
        page = page,
        ip = ip,
        ua = request.headers.get("User-Agent", "")[:120],
        **meta,
    )
    return {"ok": True}
| # ════════════════════════════════════════════════════════════════ | |
| # 1. VERIFY KEYS & AUTO-BUILD INDEXES | |
| # ════════════════════════════════════════════════════════════════ | |
async def verify_keys(
    request: Request,
    pinecone_key: str = Form(""),
    cloudinary_url: str = Form(""),
    user_id: str = Form(""),
):
    """
    Validate the caller's Cloudinary URL and Pinecone key, auto-creating the
    two serverless Pinecone indexes (objects: 1536-dim, faces: 512-dim,
    cosine metric, aws/us-east-1) when they do not exist yet.

    Raises HTTP 400 with a user-presentable message on any credential failure.
    """
    ip = get_ip(request)
    mode = "guest" if is_guest(pinecone_key) else "personal"
    start = time.perf_counter()
    log("INFO", "settings.verify_keys.start",
        user_id=user_id or "anonymous", mode=mode, ip=ip)
    # ── Cloudinary: parse the URL and ping the account ───────────────
    if cloudinary_url:
        try:
            creds_v = get_cloudinary_creds(cloudinary_url)
            if not creds_v.get("cloud_name"): raise ValueError("bad url")
            await asyncio.to_thread(_cld_ping, creds_v)
        except HTTPException: raise
        except Exception as e:
            log("ERROR", "settings.verify_keys.cloudinary_fail",
                user_id=user_id or "anonymous", ip=ip, error=str(e),
                duration_ms=round((time.perf_counter()-start)*1000))
            raise HTTPException(400, "Invalid Cloudinary Environment URL.")
    # ── Pinecone: list indexes and create any that are missing ───────
    indexes_created = []
    if pinecone_key:
        try:
            pc = _get_pinecone(pinecone_key)
            existing = {idx.name for idx in await asyncio.to_thread(pc.list_indexes)}
            tasks = []
            if IDX_OBJECTS not in existing:
                tasks.append(asyncio.to_thread(pc.create_index, name=IDX_OBJECTS, dimension=1536,
                    metric="cosine", spec=ServerlessSpec(cloud="aws", region="us-east-1")))
                indexes_created.append(IDX_OBJECTS)
            if IDX_FACES not in existing:
                tasks.append(asyncio.to_thread(pc.create_index, name=IDX_FACES, dimension=512,
                    metric="cosine", spec=ServerlessSpec(cloud="aws", region="us-east-1")))
                indexes_created.append(IDX_FACES)
            # Both index creations run concurrently.
            if tasks: await asyncio.gather(*tasks)
        except Exception as e:
            err = str(e)
            # Translate raw SDK errors into something the settings UI can show.
            clean = ("Invalid Pinecone API Key. Please check your key and try again."
                     if "401" in err or "unauthorized" in err.lower()
                     else f"Pinecone Error: {err}")
            log("ERROR", "settings.verify_keys.pinecone_fail",
                user_id=user_id or "anonymous", ip=ip, error=clean,
                duration_ms=round((time.perf_counter()-start)*1000))
            raise HTTPException(400, clean)
    log("INFO", "settings.verify_keys.success",
        user_id=user_id or "anonymous", mode=mode, ip=ip,
        indexes_created=indexes_created,
        duration_ms=round((time.perf_counter()-start)*1000))
    return {"message": "Keys verified and indexes ready!"}
| # ════════════════════════════════════════════════════════════════ | |
| # 2. UPLOAD | |
| # ════════════════════════════════════════════════════════════════ | |
async def upload_new_images(
    request: Request,
    files: List[UploadFile] = File(...),
    folder_name: str = Form(...),
    detect_faces: bool = Form(True),
    user_pinecone_key: str = Form(""),
    user_cloudinary_url: str = Form(""),
    user_id: str = Form(""),
):
    """
    Upload images to Cloudinary and index their embeddings in Pinecone.

    Per file: save to a temp path → upload to Cloudinary → run AI inference
    (bounded by the global inference semaphore) → upsert face vectors into
    IDX_FACES and object vectors into IDX_OBJECTS concurrently.

    Raises HTTP 400 (missing keys / bad URL), 404 (index missing) or
    500 (processing failure). The first failing file aborts the batch.
    """
    ip = get_ip(request)
    start = time.perf_counter()
    # Fall back to the shared demo credentials when the user supplied none.
    actual_pc_key = user_pinecone_key or os.getenv("DEFAULT_PINECONE_KEY", "")
    actual_cld_url = user_cloudinary_url or os.getenv("DEFAULT_CLOUDINARY_URL", "")
    mode = "guest" if is_guest(actual_pc_key) else "personal"
    log("INFO", "upload.start",
        user_id=user_id or "anonymous", ip=ip, mode=mode,
        folder=folder_name, file_count=len(files),
        file_names=[f.filename for f in files][:10],  # cap log payload size
        detect_faces=detect_faces)
    if not actual_pc_key or not actual_cld_url:
        log("ERROR", "upload.missing_keys", user_id=user_id or "anonymous", ip=ip, mode=mode)
        raise HTTPException(400, "API Keys are missing. If you are a guest, the server is missing its DEFAULT_ secrets in Hugging Face.")
    folder = standardize_category_name(folder_name)
    creds = get_cloudinary_creds(actual_cld_url)
    if not creds.get("cloud_name"):
        log("ERROR", "upload.bad_cloudinary_url", user_id=user_id or "anonymous", ip=ip)
        raise HTTPException(400, "Invalid Cloudinary URL format.")
    pc = _get_pinecone(actual_pc_key)
    idx_obj = pc.Index(IDX_OBJECTS)
    idx_face = pc.Index(IDX_FACES)
    uploaded_urls = []
    face_vec_total = 0
    object_vec_total = 0
    for file in files:
        # Unique temp name avoids collisions between concurrent requests.
        tmp_path = f"temp_uploads/{uuid.uuid4().hex}_{sanitize_filename(file.filename)}"
        file_start = time.perf_counter()
        try:
            with open(tmp_path, "wb") as buf:
                shutil.copyfileobj(file.file, buf)
            res = await asyncio.to_thread(_cld_upload, tmp_path, folder, creds)
            image_url = res["secure_url"]
            uploaded_urls.append(image_url)
            # Bound simultaneous model inferences across all requests.
            async with _inference_sem:
                vectors = await ai.process_image_async(tmp_path, is_query=False, detect_faces=detect_faces)
            # Route each embedding to its index lane by the model's "type" tag.
            face_upserts, object_upserts = [], []
            for v in vectors:
                vec_list = v["vector"].tolist() if hasattr(v["vector"], "tolist") else v["vector"]
                record = {"id": str(uuid.uuid4()), "values": vec_list,
                          "metadata": {"url": image_url, "folder": folder}}
                (face_upserts if v["type"] == "face" else object_upserts).append(record)
            face_vec_total += len(face_upserts)
            object_vec_total += len(object_upserts)
            # Upsert both lanes concurrently.
            upsert_tasks = []
            if face_upserts: upsert_tasks.append(asyncio.to_thread(idx_face.upsert, vectors=face_upserts))
            if object_upserts: upsert_tasks.append(asyncio.to_thread(idx_obj.upsert, vectors=object_upserts))
            if upsert_tasks: await asyncio.gather(*upsert_tasks)
            log("INFO", "upload.file.success",
                user_id=user_id or "anonymous", ip=ip, mode=mode,
                filename=file.filename, folder=folder, image_url=image_url,
                face_vectors=len(face_upserts), obj_vectors=len(object_upserts),
                detect_faces=detect_faces,
                duration_ms=round((time.perf_counter()-file_start)*1000))
        except Exception as e:
            log("ERROR", "upload.file.error",
                user_id=user_id or "anonymous", ip=ip, mode=mode,
                filename=file.filename, folder=folder, error=str(e),
                traceback=traceback.format_exc()[-800:],  # keep Loki lines bounded
                duration_ms=round((time.perf_counter()-file_start)*1000))
            err = str(e)
            # A missing index means the user must re-run key verification.
            if "not found" in err.lower() or "404" in err:
                raise HTTPException(404, "Pinecone index not found. Please go to Settings and click 'Verify & Save' to recreate your indexes.")
            raise HTTPException(500, f"Upload processing failed: {err}")
        finally:
            # Always remove the temp file, success or failure.
            if os.path.exists(tmp_path): os.remove(tmp_path)
    log("INFO", "upload.complete",
        user_id=user_id or "anonymous", ip=ip, mode=mode,
        folder=folder, files_uploaded=len(uploaded_urls),
        face_vectors=face_vec_total, object_vectors=object_vec_total,
        detect_faces=detect_faces,
        duration_ms=round((time.perf_counter()-start)*1000))
    return {"message": "Done!", "urls": uploaded_urls}
| # ════════════════════════════════════════════════════════════════ | |
| # 3. SEARCH | |
| # ════════════════════════════════════════════════════════════════ | |
async def search_database(
    request: Request,
    file: UploadFile = File(...),
    detect_faces: bool = Form(True),
    user_pinecone_key: str = Form(""),
    user_cloudinary_url: str = Form(""),
    user_id: str = Form(""),
):
    """
    Visual similarity search: embed the query image, query the face and/or
    object Pinecone indexes in parallel, and return up to 10 deduplicated
    matches (best score per URL, sorted descending).

    Score post-filtering: face matches below 0.35 and object matches below
    0.45 cosine similarity are dropped; face scores are remapped onto a
    0.75–0.99 display range for the UI confidence bar.
    """
    ip = get_ip(request)
    start = time.perf_counter()
    actual_pc_key = user_pinecone_key or os.getenv("DEFAULT_PINECONE_KEY", "")
    mode = "guest" if is_guest(actual_pc_key) else "personal"
    log("INFO", "search.start",
        user_id=user_id or "anonymous", ip=ip, mode=mode,
        filename=file.filename, detect_faces=detect_faces)
    if not actual_pc_key:
        log("ERROR", "search.missing_keys", user_id=user_id or "anonymous", ip=ip, mode=mode)
        raise HTTPException(400, "Pinecone Key is missing.")
    tmp_path = f"temp_uploads/query_{uuid.uuid4().hex}_{sanitize_filename(file.filename)}"
    try:
        with open(tmp_path, "wb") as buf:
            shutil.copyfileobj(file.file, buf)
        # Bounded inference while embedding the query image.
        async with _inference_sem:
            vectors = await ai.process_image_async(tmp_path, is_query=True, detect_faces=detect_faces)
        inference_ms = round((time.perf_counter() - start) * 1000)
        lanes_used = list({v["type"] for v in vectors})  # e.g. ["face", "object"]
        log("INFO", "search.inference_done",
            user_id=user_id or "anonymous", ip=ip, mode=mode,
            vector_count=len(vectors), lanes=lanes_used, inference_ms=inference_ms)
        pc = _get_pinecone(actual_pc_key)
        idx_obj = pc.Index(IDX_OBJECTS)
        idx_face = pc.Index(IDX_FACES)
        async def _query_one(vec_dict: dict):
            # Query the index matching this vector's lane, then post-filter
            # and reshape matches for the UI.
            vec_list = vec_dict["vector"].tolist() if hasattr(vec_dict["vector"], "tolist") else vec_dict["vector"]
            target_idx = idx_face if vec_dict["type"] == "face" else idx_obj
            try:
                res = await asyncio.to_thread(target_idx.query, vector=vec_list, top_k=10, include_metadata=True)
            except Exception as e:
                if "404" in str(e):
                    raise HTTPException(404, "Pinecone Index not found. Please log in and click 'Verify Keys' in Settings.")
                raise e
            out = []
            for match in res.get("matches", []):
                score = match["score"]
                is_face = vec_dict["type"] == "face"
                if is_face:
                    if score < 0.35: continue  # below identity threshold
                    # Remap [0.35, 1.0] → [0.75, 0.99] for display confidence.
                    ui_score = min(0.99, 0.75 + ((score - 0.35) / 0.65) * 0.24)
                else:
                    if score < 0.45: continue  # below object-similarity threshold
                    ui_score = score
                # "image_url" is a legacy metadata key — fall back to it.
                out.append({"url": match["metadata"].get("url") or match["metadata"].get("image_url", ""),
                            "score": round(ui_score, 4),
                            "caption": "👤 Verified Identity" if is_face else match["metadata"].get("folder", "🎯 Object Match")})
            return out
        # Fan out one index query per embedding, then flatten.
        nested = await asyncio.gather(*[_query_one(v) for v in vectors])
        all_results = [r for sub in nested for r in sub]
        # Deduplicate by URL, keeping the best score per image.
        seen = {}
        for r in all_results:
            url = r["url"]
            if url not in seen or r["score"] > seen[url]["score"]:
                seen[url] = r
        final = sorted(seen.values(), key=lambda x: x["score"], reverse=True)[:10]
        log("INFO", "search.complete",
            user_id=user_id or "anonymous", ip=ip, mode=mode,
            lanes=lanes_used, detect_faces=detect_faces,
            results_count=len(final), top_score=final[0]["score"] if final else 0,
            duration_ms=round((time.perf_counter()-start)*1000))
        return {"results": final}
    except HTTPException: raise
    except Exception as e:
        log("ERROR", "search.error",
            user_id=user_id or "anonymous", ip=ip, mode=mode,
            error=str(e), traceback=traceback.format_exc()[-800:],
            duration_ms=round((time.perf_counter()-start)*1000))
        raise HTTPException(500, str(e))
    finally:
        # Always clean up the temp query file.
        if os.path.exists(tmp_path): os.remove(tmp_path)
| # ════════════════════════════════════════════════════════════════ | |
| # 4. CATEGORIES | |
| # ════════════════════════════════════════════════════════════════ | |
async def get_categories(
    request: Request,
    user_cloudinary_url: str = Form(""),
    user_id: str = Form(""),
):
    """
    List root Cloudinary folders (the UI's "categories").

    Best-effort: every failure path degrades to {"categories": []} instead
    of raising, so the UI can always render.
    """
    ip = get_ip(request)
    actual_url = user_cloudinary_url or os.getenv("DEFAULT_CLOUDINARY_URL", "")
    if not actual_url: return {"categories": []}
    try:
        creds = get_cloudinary_creds(actual_url)
        if not creds.get("cloud_name"): return {"categories": []}
        result = await asyncio.to_thread(_cld_root_folders, creds)
        categories = [f["name"] for f in result.get("folders", [])]
        log("INFO", "categories.fetched",
            user_id=user_id or "anonymous", ip=ip, category_count=len(categories))
        return {"categories": categories}
    except Exception as e:
        # Log and degrade — missing categories are not worth a 5xx.
        log("ERROR", "categories.error", user_id=user_id or "anonymous", ip=ip, error=str(e))
        return {"categories": []}
async def health():
    """Liveness probe: status flag plus the current UTC time in ISO 8601."""
    now = datetime.now(timezone.utc)
    return {"status": "ok", "timestamp": now.isoformat()}
| # ════════════════════════════════════════════════════════════════ | |
| # 5. LIST FOLDER IMAGES | |
| # ════════════════════════════════════════════════════════════════ | |
def _cld_list_folder_images(folder: str, creds: dict, next_cursor: str = None):
    # Blocking Cloudinary Admin API call: list up to 500 uploaded resources
    # whose public_id starts with "folder/". Pass the previous response's
    # next_cursor to fetch subsequent pages.
    kwargs = dict(type="upload", prefix=f"{folder}/", max_results=500,
        api_key=creds["api_key"], api_secret=creds["api_secret"], cloud_name=creds["cloud_name"])
    if next_cursor: kwargs["next_cursor"] = next_cursor
    return cloudinary.api.resources(**kwargs)
async def list_folder_images(
    request: Request,
    user_cloudinary_url: str = Form(""),
    folder_name: str = Form(...),
    user_id: str = Form(""),
):
    """Return every image (url + public_id) in a Cloudinary folder,
    paging through the Admin API until no cursor remains."""
    ip = get_ip(request)
    actual_url = user_cloudinary_url or os.getenv("DEFAULT_CLOUDINARY_URL", "")
    creds = get_cloudinary_creds(actual_url)
    if not creds.get("cloud_name"): raise HTTPException(400, "Invalid Cloudinary URL.")
    images, next_cursor = [], None
    while True:  # one iteration per 500-item page
        result = await asyncio.to_thread(_cld_list_folder_images, folder_name, creds, next_cursor)
        for r in result.get("resources", []):
            images.append({"url": r["secure_url"], "public_id": r["public_id"]})
        next_cursor = result.get("next_cursor")
        if not next_cursor: break
    log("INFO", "explorer.folder_opened",
        user_id=user_id or "anonymous", ip=ip,
        folder_name=folder_name, image_count=len(images))
    return {"images": images, "count": len(images)}
| # ════════════════════════════════════════════════════════════════ | |
| # 6. DELETE SINGLE IMAGE | |
| # ════════════════════════════════════════════════════════════════ | |
def url_to_public_id(image_url: str, cloud_name: str) -> str:
    """
    Derive a Cloudinary public_id from a delivery URL.

    Takes everything after the "/upload/" path segment, drops an optional
    version segment ("v12345"), and strips the final file extension.
    Returns "" when the URL does not contain an "upload" segment.
    (cloud_name is accepted for interface compatibility but is unused here.)
    """
    try:
        segments = urlparse(image_url).path.split("/")
        tail = segments[segments.index("upload") + 1:]
        if tail:
            head = tail[0]
            if head.startswith("v") and head[1:].isdigit():
                tail = tail[1:]  # skip the version segment
        return "/".join(tail).rsplit(".", 1)[0]
    except Exception:
        return ""
def _cld_delete_resource(public_id: str, creds: dict):
    # Blocking Cloudinary destroy of a single resource by public_id.
    return cloudinary.uploader.destroy(public_id,
        api_key=creds["api_key"], api_secret=creds["api_secret"], cloud_name=creds["cloud_name"])
async def delete_image(
    request: Request,
    user_pinecone_key: str = Form(""),
    user_cloudinary_url: str = Form(""),
    image_url: str = Form(""),
    public_id: str = Form(""),
    user_id: str = Form(""),
):
    """
    Delete one image from Cloudinary, then best-effort remove its vectors
    from both Pinecone indexes (matched by exact URL metadata filter).

    Accepts either an explicit public_id or derives one from image_url.
    Pinecone cleanup failures are logged as warnings, not raised.
    """
    ip = get_ip(request)
    actual_pc_key = user_pinecone_key or os.getenv("DEFAULT_PINECONE_KEY", "")
    actual_cld_url = user_cloudinary_url or os.getenv("DEFAULT_CLOUDINARY_URL", "")
    creds = get_cloudinary_creds(actual_cld_url)
    if not creds.get("cloud_name"): raise HTTPException(400, "Invalid Cloudinary URL.")
    pid = public_id or url_to_public_id(image_url, creds["cloud_name"])
    if not pid: raise HTTPException(400, "Could not determine public_id.")
    await asyncio.to_thread(_cld_delete_resource, pid, creds)
    # Vector cleanup requires the original URL (used as the metadata filter).
    if actual_pc_key and image_url:
        try:
            pc = _get_pinecone(actual_pc_key)
            for idx_name in [IDX_OBJECTS, IDX_FACES]:
                await asyncio.to_thread(pc.Index(idx_name).delete, filter={"url": {"$eq": image_url}})
        except Exception as e:
            # Orphaned vectors are tolerable; the image itself is gone.
            _log_fn("WARNING", f"Pinecone delete warning: {e}")
    log("INFO", "explorer.image_deleted",
        user_id=user_id or "anonymous", ip=ip,
        image_url=image_url, public_id=pid)
    return {"message": "Image deleted successfully."}
| # ════════════════════════════════════════════════════════════════ | |
| # 7. DELETE ENTIRE FOLDER | |
| # ════════════════════════════════════════════════════════════════ | |
def _cld_delete_folder(folder: str, creds: dict):
    # Blocking bulk delete of every resource whose public_id starts with "folder/".
    return cloudinary.api.delete_resources_by_prefix(f"{folder}/",
        api_key=creds["api_key"], api_secret=creds["api_secret"], cloud_name=creds["cloud_name"])
def _cld_remove_folder(folder: str, creds: dict):
    # Best-effort removal of the (now empty) folder entry itself. Cloudinary
    # raises when the folder is missing or still non-empty; that is
    # deliberately swallowed here — the function then returns None.
    try:
        return cloudinary.api.delete_folder(folder,
            api_key=creds["api_key"], api_secret=creds["api_secret"], cloud_name=creds["cloud_name"])
    except Exception: pass
async def delete_folder(
    request: Request,
    user_pinecone_key: str = Form(""),
    user_cloudinary_url: str = Form(""),
    folder_name: str = Form(...),
    user_id: str = Form(""),
):
    """
    Delete an entire Cloudinary folder (all resources + folder entry) and
    best-effort remove its vectors from both Pinecone indexes.

    Vector cleanup first tries a single folder-metadata filter delete; if
    that fails (e.g. filter deletes unsupported), it falls back to deleting
    per-image by URL using the resource list fetched before the wipe.
    """
    ip = get_ip(request)
    actual_pc_key = user_pinecone_key or os.getenv("DEFAULT_PINECONE_KEY", "")
    actual_cld_url = user_cloudinary_url or os.getenv("DEFAULT_CLOUDINARY_URL", "")
    creds = get_cloudinary_creds(actual_cld_url)
    if not creds.get("cloud_name"): raise HTTPException(400, "Invalid Cloudinary URL.")
    # Snapshot the folder contents BEFORE deleting, for the per-URL fallback.
    all_images, next_cursor = [], None
    while True:
        result = await asyncio.to_thread(_cld_list_folder_images, folder_name, creds, next_cursor)
        all_images.extend(result.get("resources", []))
        next_cursor = result.get("next_cursor")
        if not next_cursor: break
    await asyncio.to_thread(_cld_delete_folder, folder_name, creds)
    await asyncio.to_thread(_cld_remove_folder, folder_name, creds)
    if actual_pc_key:
        try:
            pc = _get_pinecone(actual_pc_key)
            for idx_name in [IDX_OBJECTS, IDX_FACES]:
                idx = pc.Index(idx_name)
                try:
                    # Fast path: one metadata-filter delete per index.
                    await asyncio.to_thread(idx.delete, filter={"folder": {"$eq": folder_name}})
                except Exception:
                    # Fallback: delete vectors image-by-image via URL filter.
                    for img in all_images:
                        try:
                            if img.get("secure_url"):
                                await asyncio.to_thread(idx.delete, filter={"url": {"$eq": img["secure_url"]}})
                        except Exception: pass
        except Exception as e:
            # Orphaned vectors are tolerable; the images themselves are gone.
            _log_fn("WARNING", f"Pinecone folder delete warning: {e}")
    log("INFO", "explorer.folder_deleted",
        user_id=user_id or "anonymous", ip=ip,
        folder_name=folder_name, deleted_count=len(all_images))
    return {"message": f"Folder '{folder_name}' and all its contents deleted.", "deleted_count": len(all_images)}
| # ════════════════════════════════════════════════════════════════ | |
| # 8. RESET DATABASE ⚠️ DESTRUCTIVE — triple-logged | |
| # ════════════════════════════════════════════════════════════════ | |
async def reset_database(
    request: Request,
    user_pinecone_key: str = Form(""),
    user_cloudinary_url: str = Form(""),
    user_id: str = Form(""),
):
    """
    DESTRUCTIVE: wipe all Cloudinary resources + folders, delete both
    Pinecone indexes, then recreate them empty.

    Refused (HTTP 403) when the caller is using the shared demo credentials.
    Cloudinary failures are warnings; Pinecone failures raise HTTP 500.
    """
    ip = get_ip(request)
    start = time.perf_counter()
    log("WARNING", "danger.reset_database.attempt",
        user_id=user_id or "anonymous", ip=ip)
    # Guard: never reset the shared demo database.
    if _is_default_key(user_pinecone_key, DEFAULT_PC_KEY) or _is_default_key(user_cloudinary_url, DEFAULT_CLD_URL):
        log("WARNING", "danger.reset_database.blocked_shared_db",
            user_id=user_id or "anonymous", ip=ip)
        raise HTTPException(403, "Reset is not allowed on the shared demo database.")
    creds = get_cloudinary_creds(user_cloudinary_url)
    if not creds.get("cloud_name"): raise HTTPException(400, "Invalid Cloudinary URL.")
    # ── Cloudinary wipe (best-effort) ────────────────────────────────
    try:
        await asyncio.to_thread(lambda: cloudinary.api.delete_all_resources(
            api_key=creds["api_key"], api_secret=creds["api_secret"], cloud_name=creds["cloud_name"]))
    except Exception as e:
        _log_fn("WARNING", f"Cloudinary wipe: {e}")
    # Delete Cloudinary folders too
    try:
        folders_res = await asyncio.to_thread(_cld_root_folders, creds)
        for folder in folders_res.get("folders", []):
            await asyncio.to_thread(_cld_remove_folder, folder["name"], creds)
    except Exception as e:
        _log_fn("WARNING", f"Cloudinary folder cleanup: {e}")
    # ── Pinecone: drop both indexes, then recreate them empty ────────
    try:
        pc = _get_pinecone(user_pinecone_key)
        existing = {idx.name for idx in await asyncio.to_thread(pc.list_indexes)}
        tasks = []
        if IDX_OBJECTS in existing: tasks.append(asyncio.to_thread(pc.delete_index, IDX_OBJECTS))
        if IDX_FACES in existing: tasks.append(asyncio.to_thread(pc.delete_index, IDX_FACES))
        if tasks: await asyncio.gather(*tasks)
        # Brief pause so the control plane registers the deletions before
        # the same names are recreated.
        await asyncio.sleep(2)
        await asyncio.gather(
            asyncio.to_thread(pc.create_index, name=IDX_OBJECTS, dimension=1536, metric="cosine",
                spec=ServerlessSpec(cloud="aws", region="us-east-1")),
            asyncio.to_thread(pc.create_index, name=IDX_FACES, dimension=512, metric="cosine",
                spec=ServerlessSpec(cloud="aws", region="us-east-1")),
        )
    except Exception as e:
        log("ERROR", "danger.reset_database.pinecone_error",
            user_id=user_id or "anonymous", ip=ip, error=str(e))
        raise HTTPException(500, f"Pinecone reset error: {e}")
    log("WARNING", "danger.reset_database.complete",
        user_id=user_id or "anonymous", ip=ip,
        duration_ms=round((time.perf_counter()-start)*1000))
    return {"message": "Database reset complete. All data wiped and indexes recreated."}
| # ════════════════════════════════════════════════════════════════ | |
| # 9. DELETE ACCOUNT ⚠️ DESTRUCTIVE — triple-logged | |
| # ════════════════════════════════════════════════════════════════ | |
async def delete_account(
    request: Request,
    user_pinecone_key: str = Form(""),
    user_cloudinary_url: str = Form(""),
    user_id: str = Form(""),
):
    """
    DESTRUCTIVE: wipe all Cloudinary resources + folders and delete both
    Pinecone indexes — without recreating them (unlike reset_database).

    Refused (HTTP 403) on the shared demo credentials. All cleanup steps
    are best-effort: failures are logged as warnings, never raised.
    """
    ip = get_ip(request)
    start = time.perf_counter()
    log("WARNING", "danger.delete_account.attempt",
        user_id=user_id or "anonymous", ip=ip)
    # Guard: never delete the shared demo database.
    if _is_default_key(user_pinecone_key, DEFAULT_PC_KEY) or _is_default_key(user_cloudinary_url, DEFAULT_CLD_URL):
        log("WARNING", "danger.delete_account.blocked_shared_db",
            user_id=user_id or "anonymous", ip=ip)
        raise HTTPException(403, "Account deletion is not allowed on the shared demo database.")
    # NOTE(review): unlike reset_database, creds are not validated for a
    # cloud_name here before use — confirm whether a 400 guard is wanted.
    creds = get_cloudinary_creds(user_cloudinary_url)
    try:
        await asyncio.to_thread(lambda: cloudinary.api.delete_all_resources(
            api_key=creds["api_key"], api_secret=creds["api_secret"], cloud_name=creds["cloud_name"]))
    except Exception as e:
        _log_fn("WARNING", f"Account delete Cloudinary: {e}")
    # Delete Cloudinary folders
    try:
        folders_res = await asyncio.to_thread(_cld_root_folders, creds)
        for folder in folders_res.get("folders", []):
            await asyncio.to_thread(_cld_remove_folder, folder["name"], creds)
    except Exception as e:
        _log_fn("WARNING", f"Account delete Cloudinary folders: {e}")
    # Drop both Pinecone indexes concurrently (no recreation).
    try:
        pc = _get_pinecone(user_pinecone_key)
        existing = {idx.name for idx in await asyncio.to_thread(pc.list_indexes)}
        tasks = []
        if IDX_OBJECTS in existing: tasks.append(asyncio.to_thread(pc.delete_index, IDX_OBJECTS))
        if IDX_FACES in existing: tasks.append(asyncio.to_thread(pc.delete_index, IDX_FACES))
        if tasks: await asyncio.gather(*tasks)
    except Exception as e:
        _log_fn("WARNING", f"Account delete Pinecone: {e}")
    log("WARNING", "danger.delete_account.complete",
        user_id=user_id or "anonymous", ip=ip,
        duration_ms=round((time.perf_counter()-start)*1000))
    return {"message": "Account data deleted. Sign out initiated."}