AdarshDRC's picture
Update main.py
6fc43b2 verified
import asyncio
import os
import shutil
import uuid
import re
import time
import json
import base64
import traceback
import inflect
from datetime import datetime, timezone
from urllib.parse import urlparse
from typing import List
from contextlib import asynccontextmanager
from collections import OrderedDict
from fastapi import FastAPI, UploadFile, File, Form, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
import cloudinary
import cloudinary.uploader
import cloudinary.api
from pinecone import Pinecone, ServerlessSpec
# ── Console logging: loguru when installed, stdlib logging otherwise ──
try:
    from loguru import logger as _loguru

    def _console_sink(msg):
        """Print an already-formatted loguru message without adding a newline."""
        print(msg, end="")

    # Remove previously registered handlers so only the sink below is used.
    _loguru.remove()
    _loguru.add(
        _console_sink,
        format="<green>{time:HH:mm:ss}</green> | <level>{level:<8}</level> | {message}",
        level="DEBUG",
        colorize=True,
    )
    # Same call shape as the fallback below: _log_fn(level_name, message).
    _log_fn = _loguru.log
except ImportError:
    import logging as _logging

    _logging.basicConfig(level=_logging.INFO)
    _stdlib = _logging.getLogger("el")

    def _log_fn(level, msg):
        """Log msg via the stdlib logger; unknown level names fall back to 20 (INFO)."""
        numeric_level = getattr(_logging, level, 20)
        _stdlib.log(numeric_level, msg)
# ── Deferred imports ─────────────────────────────────────────────
# Placeholder for the AI model manager; assigned in lifespan() at startup.
ai = None
# inflect engine used by standardize_category_name() to singularise names.
p = inflect.engine()
# Size of the inference semaphore (env override, default 6).
MAX_CONCURRENT_INFERENCES = int(os.getenv("MAX_CONCURRENT_INFERENCES", "6"))
# Declared only; the semaphore itself is created in lifespan().
_inference_sem: asyncio.Semaphore
# Pinecone clients keyed by API key, kept in least-recently-used order
# by _get_pinecone().
_pinecone_pool = OrderedDict()
# Maximum number of clients kept in _pinecone_pool before eviction.
_POOL_MAX = 64
# Names of the two Pinecone indexes (face vectors / object vectors).
IDX_FACES = "enterprise-faces"
IDX_OBJECTS = "enterprise-objects"
# ════════════════════════════════════════════════════════════════
# GRAFANA LOKI — async, fire-and-forget, never crashes the API
# HF Space Secrets needed:
# LOKI_URL → https://logs-prod-006.grafana.net (no trailing slash)
# LOKI_USERNAME → your Grafana Cloud numeric user ID
# LOKI_PASSWORD → your Grafana Cloud API token (Logs:Write scope)
# ════════════════════════════════════════════════════════════════
# Loki connection settings, read once at import time. _loki_push() is a
# no-op unless all three are non-empty.
LOKI_URL = os.getenv("LOKI_URL", "")
LOKI_USERNAME = os.getenv("LOKI_USERNAME", "")
LOKI_PASSWORD = os.getenv("LOKI_PASSWORD", "")
async def _loki_push(level: str, event: str, data: dict):
    """Push a single log entry to Grafana Loki; never raises.

    Does nothing unless LOKI_URL, LOKI_USERNAME and LOKI_PASSWORD are all
    set. Any failure (aiohttp missing, network error, timeout) is reported
    on the console at DEBUG level and otherwise ignored.

    Args:
        level: Severity name; upper-cased inside the log line and
            lower-cased in the stream label.
        event: Event name, used as a stream label and inside the log line.
        data: Extra fields merged into the JSON log line; values that are
            not JSON-serialisable are converted with ``str``.
    """
    if not LOKI_URL or not LOKI_USERNAME or not LOKI_PASSWORD:
        return
    try:
        # Imported lazily; an ImportError is handled like any other failure.
        import aiohttp

        ts_ns = str(int(time.time() * 1e9))
        record = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "level": level.upper(),
            "service": "enterprise-lens",
            "event": event,
            **data,
        }
        line = json.dumps(record, default=str)
        labels = {
            "service": "enterprise-lens",
            "level": level.lower(),
            "event": event,
            "env": os.getenv("ENVIRONMENT", "production"),
        }
        payload = {"streams": [{"stream": labels, "values": [[ts_ns, line]]}]}
        token = base64.b64encode(f"{LOKI_USERNAME}:{LOKI_PASSWORD}".encode()).decode()
        headers = {"Content-Type": "application/json", "Authorization": f"Basic {token}"}
        push_url = f"{LOKI_URL}/loki/api/v1/push"
        timeout = aiohttp.ClientTimeout(total=5)
        async with aiohttp.ClientSession() as session:
            async with session.post(push_url, json=payload, headers=headers,
                                    timeout=timeout) as resp:
                if resp.status not in (200, 204):
                    _log_fn("WARNING", f"Loki returned {resp.status}")
    except Exception as exc:
        _log_fn("DEBUG", f"Loki push skipped: {exc}")
# Strong references to in-flight Loki pushes. The event loop only keeps weak
# references to tasks, so an unreferenced task may be garbage-collected
# before it finishes.
_loki_tasks: set = set()


def log(level: str, event: str, **data):
    """Log to the console and, when an event loop is running, to Grafana Loki.

    The Loki push is scheduled as a background task and never awaited, so
    this function returns immediately. When called from a thread with no
    running loop (e.g. at import time or inside ``asyncio.to_thread``), only
    the console line is written.

    Usage: log("INFO", "upload.complete", user_id="x", files=3, duration_ms=340)

    Args:
        level: Severity name (case-insensitive).
        event: Dotted event name.
        **data: Extra fields; serialised with ``json.dumps(default=str)``.
    """
    _log_fn(level.upper(), f"[{event}] {json.dumps(data, default=str)}")
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # No running event loop in this thread: console logging only.
        return
    try:
        task = loop.create_task(_loki_push(level, event, data))
    except Exception:
        # Logging must never break the caller.
        return
    _loki_tasks.add(task)
    task.add_done_callback(_loki_tasks.discard)
# ════════════════════════════════════════════════════════════════
# HELPERS
# ════════════════════════════════════════════════════════════════
def get_ip(request: Request) -> str:
    """Return the caller's IP address as best it can be determined.

    Uses the first entry of the ``X-Forwarded-For`` header when the header is
    present and non-empty; otherwise falls back to ``request.client.host``,
    or ``"unknown"`` when that attribute is unavailable.
    """
    forwarded = request.headers.get("X-Forwarded-For", "")
    if forwarded:
        first_hop, _, _ = forwarded.partition(",")
        return first_hop.strip()
    return getattr(request.client, "host", "unknown")
def is_guest(key: str) -> bool:
    """Return True when *key* is the server's shared DEFAULT_PINECONE_KEY.

    The environment variable is read on every call. Both values are compared
    after stripping surrounding whitespace. Returns False when the variable
    is unset/empty or when *key* is None.
    """
    import hmac  # local import: constant-time comparison for a secret value

    default = os.getenv("DEFAULT_PINECONE_KEY", "")
    if not default:
        return False
    candidate = (key or "").strip().encode("utf-8")
    # Bytes are compared so that non-ASCII input cannot make compare_digest raise.
    return hmac.compare_digest(candidate, default.strip().encode("utf-8"))
def _get_pinecone(api_key: str) -> Pinecone:
    """Return a Pinecone client for *api_key*, reusing a pooled one if present.

    The pool is an LRU cache of at most _POOL_MAX clients: a hit moves the
    entry to the most-recently-used end, a miss evicts the oldest entry
    first when the pool is full.
    """
    if api_key in _pinecone_pool:
        _pinecone_pool.move_to_end(api_key)
        return _pinecone_pool[api_key]
    if len(_pinecone_pool) >= _POOL_MAX:
        # Evict the least recently used client.
        _pinecone_pool.popitem(last=False)
    client = Pinecone(api_key=api_key)
    _pinecone_pool[api_key] = client
    return client
def _cld_upload(tmp_path, folder, creds):
    """Upload the file at *tmp_path* into Cloudinary *folder* using *creds*.

    Blocking call; callers run it via ``asyncio.to_thread``. Returns the
    Cloudinary upload response.
    """
    auth = {field: creds[field] for field in ("api_key", "api_secret", "cloud_name")}
    return cloudinary.uploader.upload(tmp_path, folder=folder, **auth)
def _cld_ping(creds):
    """Call the Cloudinary ping endpoint with *creds* and return its response.

    Blocking call; used by verify_keys to check that the credentials work.
    """
    auth = {field: creds[field] for field in ("api_key", "api_secret", "cloud_name")}
    return cloudinary.api.ping(**auth)
def _cld_root_folders(creds):
    """Return Cloudinary's root-folder listing for the account in *creds*.

    Blocking call; callers read the ``"folders"`` list from the response.
    """
    auth = {field: creds[field] for field in ("api_key", "api_secret", "cloud_name")}
    return cloudinary.api.root_folders(**auth)
def get_cloudinary_creds(env_url: str) -> dict:
    """Split a Cloudinary environment URL into credential fields.

    Accepts ``cloudinary://<api_key>:<api_secret>@<cloud_name>``, optionally
    surrounded by whitespace and/or prefixed with ``CLOUDINARY_URL=``.

    Args:
        env_url: The environment URL; may be empty or None.

    Returns:
        ``{"api_key", "api_secret", "cloud_name"}`` (values may be None when
        the URL lacks that part), or ``{}`` when the input is empty or cannot
        be parsed. Callers treat a missing ``cloud_name`` as "invalid URL".
    """
    url = (env_url or "").strip()
    if url.upper().startswith("CLOUDINARY_URL="):
        url = url.split("=", 1)[1].strip()
    if not url:
        return {}
    try:
        parsed = urlparse(url)
        return {
            "api_key": parsed.username,
            "api_secret": parsed.password,
            "cloud_name": parsed.hostname,
        }
    except ValueError:
        # urlparse rejects some malformed netlocs (e.g. an unbalanced "[");
        # report them as "no credentials" instead of raising.
        return {}
def standardize_category_name(name: str) -> str:
    """Normalise a user-supplied folder/category name.

    Steps: strip and lower-case, collapse whitespace runs to ``_``, drop
    every character that is not a word character, then singularise with
    inflect (keeping the cleaned name when inflect has no singular form).

    Returns:
        The normalised name, or ``""`` when nothing usable remains.
    """
    clean = re.sub(r'\s+', '_', name.strip().lower())
    clean = re.sub(r'[^\w]', '', clean)
    if not clean:
        # Nothing to singularise; some inflect versions reject empty words,
        # so return before calling it.
        return ""
    return p.singular_noun(clean) or clean
def sanitize_filename(filename: str) -> str:
    """Return *filename* reduced to word characters, dots and hyphens.

    Whitespace runs become ``_``; every other character outside
    ``[\\w.-]`` (including path separators) is removed. A missing filename
    (None or empty) yields ``""`` instead of raising.
    """
    if not filename:
        return ""
    underscored = re.sub(r'\s+', '_', filename)
    return re.sub(r'[^\w.\-]', '', underscored)
# Shared (guest) credentials, read once at import time. Compared against
# user-supplied values by _is_default_key() to block destructive operations.
DEFAULT_PC_KEY = os.getenv("DEFAULT_PINECONE_KEY", "")
DEFAULT_CLD_URL = os.getenv("DEFAULT_CLOUDINARY_URL", "")
def _is_default_key(key: str, default: str) -> bool:
    """Return True when *key* equals the non-empty server default *default*.

    Both values are compared after stripping surrounding whitespace. An
    empty *default* never matches, and a None *key* is treated as empty.
    """
    import hmac  # local import: constant-time comparison for a secret value

    if not default:
        return False
    candidate = (key or "").strip().encode("utf-8")
    # Bytes are compared so that non-ASCII input cannot make compare_digest raise.
    return hmac.compare_digest(candidate, default.strip().encode("utf-8"))
# ════════════════════════════════════════════════════════════════
# APP STARTUP / SHUTDOWN
# ════════════════════════════════════════════════════════════════
@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan hook: load the AI models, then serve until shutdown.

    On startup it constructs ``AIModelManager`` in the default executor (so
    the event loop is not blocked), stores it in the module-level ``ai`` and
    creates the inference semaphore. On shutdown it only logs.
    """
    global ai, _inference_sem
    from src.models import AIModelManager

    log("INFO", "server.startup", message="Loading AI models...")
    running_loop = asyncio.get_running_loop()
    ai = await running_loop.run_in_executor(None, AIModelManager)
    _inference_sem = asyncio.Semaphore(MAX_CONCURRENT_INFERENCES)
    log("INFO", "server.ready", message="All models loaded. API ready.")
    yield
    log("INFO", "server.shutdown", message="API shutting down.")
# Application object; model loading happens in lifespan().
app = FastAPI(lifespan=lifespan)
# CORS: every origin, method and header is allowed, with credentials enabled.
# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive — confirm this is intended, or restrict allow_origins to the
# known frontend origin(s).
app.add_middleware(CORSMiddleware,
allow_origins=["*"], allow_credentials=True,
allow_methods=["*"], allow_headers=["*"])
# Scratch directory for uploaded files; endpoints delete their own temp files.
os.makedirs("temp_uploads", exist_ok=True)
# ════════════════════════════════════════════════════════════════
# FRONTEND EVENT LOG — React calls this for client-side events
# Logs: page visits, tab switches, mode toggles, search/upload
# initiated, settings changes, errors caught in UI
# ════════════════════════════════════════════════════════════════
@app.post("/api/log")
async def frontend_log(
    request: Request,
    event: str = Form(...),       # e.g. "page.visit", "search.initiated"
    user_id: str = Form(""),
    page: str = Form(""),
    metadata: str = Form("{}"),   # JSON string with extra context
):
    """Record a client-side event sent by the frontend.

    The event is logged as ``frontend.<event>`` together with the user id,
    page, caller IP, a truncated User-Agent and the fields of *metadata*.

    *metadata* is untrusted: anything that is not a JSON object is ignored,
    and keys that would clash with fields set here or by the logger are
    renamed with a ``meta_`` prefix rather than crashing the request.

    Returns:
        ``{"ok": True}`` always.
    """
    ip = get_ip(request)
    try:
        parsed = json.loads(metadata) if metadata else {}
    except (ValueError, RecursionError):
        # Malformed or absurdly nested JSON: log the event without extras.
        parsed = {}
    if not isinstance(parsed, dict):
        # Valid JSON that is not an object (list, number, ...) cannot be
        # expanded into keyword arguments.
        parsed = {}
    # Names already used as log() parameters, as fields passed below, or as
    # fixed fields of the Loki log line.
    reserved = {"level", "event", "user_id", "page", "ip", "ua", "timestamp", "service"}
    meta = {
        (f"meta_{key}" if key in reserved else key): value
        for key, value in parsed.items()
    }
    log(
        "INFO", f"frontend.{event}",
        user_id=user_id or "anonymous",
        page=page,
        ip=ip,
        ua=request.headers.get("User-Agent", "")[:120],
        **meta,
    )
    return {"ok": True}
# ════════════════════════════════════════════════════════════════
# 1. VERIFY KEYS & AUTO-BUILD INDEXES
# ════════════════════════════════════════════════════════════════
@app.post("/api/verify-keys")
async def verify_keys(
    request: Request,
    pinecone_key: str = Form(""),
    cloudinary_url: str = Form(""),
    user_id: str = Form(""),
):
    """Validate the supplied credentials and create any missing Pinecone index.

    * Cloudinary (when a URL is given): the URL must contain a cloud name and
      a ping with the credentials must succeed, otherwise HTTP 400.
    * Pinecone (when a key is given): lists the indexes and creates the
      objects (1536-d) and faces (512-d) indexes if absent; any failure is
      reported as HTTP 400.

    Returns:
        A confirmation message.
    """
    ip = get_ip(request)
    mode = "guest" if is_guest(pinecone_key) else "personal"
    start = time.perf_counter()
    who = user_id or "anonymous"

    def elapsed_ms():
        return round((time.perf_counter() - start) * 1000)

    log("INFO", "settings.verify_keys.start", user_id=who, mode=mode, ip=ip)

    if cloudinary_url:
        try:
            creds_v = get_cloudinary_creds(cloudinary_url)
            if not creds_v.get("cloud_name"):
                raise ValueError("bad url")
            await asyncio.to_thread(_cld_ping, creds_v)
        except HTTPException:
            raise
        except Exception as e:
            log("ERROR", "settings.verify_keys.cloudinary_fail",
                user_id=who, ip=ip, error=str(e), duration_ms=elapsed_ms())
            raise HTTPException(400, "Invalid Cloudinary Environment URL.")

    indexes_created = []
    if pinecone_key:
        try:
            pc = _get_pinecone(pinecone_key)
            existing = {idx.name for idx in await asyncio.to_thread(pc.list_indexes)}
            pending = []
            # (index name, vector dimension) for each index the app needs.
            for index_name, dimension in ((IDX_OBJECTS, 1536), (IDX_FACES, 512)):
                if index_name in existing:
                    continue
                pending.append(asyncio.to_thread(
                    pc.create_index, name=index_name, dimension=dimension,
                    metric="cosine", spec=ServerlessSpec(cloud="aws", region="us-east-1")))
                indexes_created.append(index_name)
            if pending:
                await asyncio.gather(*pending)
        except Exception as e:
            err = str(e)
            if "401" in err or "unauthorized" in err.lower():
                clean = "Invalid Pinecone API Key. Please check your key and try again."
            else:
                clean = f"Pinecone Error: {err}"
            log("ERROR", "settings.verify_keys.pinecone_fail",
                user_id=who, ip=ip, error=clean, duration_ms=elapsed_ms())
            raise HTTPException(400, clean)

    log("INFO", "settings.verify_keys.success",
        user_id=who, mode=mode, ip=ip,
        indexes_created=indexes_created, duration_ms=elapsed_ms())
    return {"message": "Keys verified and indexes ready!"}
# ════════════════════════════════════════════════════════════════
# 2. UPLOAD
# ════════════════════════════════════════════════════════════════
@app.post("/api/upload")
async def upload_new_images(
    request: Request,
    files: List[UploadFile] = File(...),
    folder_name: str = Form(...),
    detect_faces: bool = Form(True),
    user_pinecone_key: str = Form(""),
    user_cloudinary_url: str = Form(""),
    user_id: str = Form(""),
):
    """Upload images to Cloudinary and index their vectors in Pinecone.

    For each file: save it to a temp path, upload it to the standardised
    Cloudinary folder, run the AI model (bounded by the inference semaphore)
    and upsert the resulting face/object vectors into their indexes. User
    credentials are used when supplied, otherwise the server defaults.

    Raises:
        HTTPException 400: credentials missing, Cloudinary URL malformed, or
            the folder name contains no usable characters.
        HTTPException 404: a Pinecone index was not found.
        HTTPException 500: any other per-file failure; processing stops at
            the first failing file.

    Returns:
        ``{"message": "Done!", "urls": [...]}`` with the Cloudinary URLs.
    """
    ip = get_ip(request)
    start = time.perf_counter()
    who = user_id or "anonymous"
    actual_pc_key = user_pinecone_key or os.getenv("DEFAULT_PINECONE_KEY", "")
    actual_cld_url = user_cloudinary_url or os.getenv("DEFAULT_CLOUDINARY_URL", "")
    mode = "guest" if is_guest(actual_pc_key) else "personal"
    log("INFO", "upload.start",
        user_id=who, ip=ip, mode=mode,
        folder=folder_name, file_count=len(files),
        file_names=[f.filename for f in files][:10],
        detect_faces=detect_faces)

    if not actual_pc_key or not actual_cld_url:
        log("ERROR", "upload.missing_keys", user_id=who, ip=ip, mode=mode)
        raise HTTPException(400, "API Keys are missing. If you are a guest, the server is missing its DEFAULT_ secrets in Hugging Face.")

    try:
        folder = standardize_category_name(folder_name)
    except Exception:
        # Broad on purpose: the error type raised for unusable names depends
        # on the installed inflect version.
        folder = ""
    if not folder:
        # An empty folder would place the images in the Cloudinary root.
        log("ERROR", "upload.bad_folder_name", user_id=who, ip=ip, folder=folder_name)
        raise HTTPException(400, "Invalid folder name.")

    creds = get_cloudinary_creds(actual_cld_url)
    if not creds.get("cloud_name"):
        log("ERROR", "upload.bad_cloudinary_url", user_id=who, ip=ip)
        raise HTTPException(400, "Invalid Cloudinary URL format.")

    pc = _get_pinecone(actual_pc_key)
    idx_obj = pc.Index(IDX_OBJECTS)
    idx_face = pc.Index(IDX_FACES)
    uploaded_urls = []
    face_vec_total = 0
    object_vec_total = 0

    for file in files:
        # UploadFile.filename may be None; fall back to a fixed name.
        safe_name = sanitize_filename(file.filename or "upload")
        tmp_path = f"temp_uploads/{uuid.uuid4().hex}_{safe_name}"
        file_start = time.perf_counter()
        try:
            with open(tmp_path, "wb") as buf:
                # Copy in a worker thread so a large upload does not stall
                # the event loop.
                await asyncio.to_thread(shutil.copyfileobj, file.file, buf)
            res = await asyncio.to_thread(_cld_upload, tmp_path, folder, creds)
            image_url = res["secure_url"]
            uploaded_urls.append(image_url)

            async with _inference_sem:
                vectors = await ai.process_image_async(tmp_path, is_query=False, detect_faces=detect_faces)

            face_upserts, object_upserts = [], []
            for v in vectors:
                vec_list = v["vector"].tolist() if hasattr(v["vector"], "tolist") else v["vector"]
                record = {"id": str(uuid.uuid4()), "values": vec_list,
                          "metadata": {"url": image_url, "folder": folder}}
                (face_upserts if v["type"] == "face" else object_upserts).append(record)
            face_vec_total += len(face_upserts)
            object_vec_total += len(object_upserts)

            upsert_tasks = []
            if face_upserts:
                upsert_tasks.append(asyncio.to_thread(idx_face.upsert, vectors=face_upserts))
            if object_upserts:
                upsert_tasks.append(asyncio.to_thread(idx_obj.upsert, vectors=object_upserts))
            if upsert_tasks:
                await asyncio.gather(*upsert_tasks)

            log("INFO", "upload.file.success",
                user_id=who, ip=ip, mode=mode,
                filename=file.filename, folder=folder, image_url=image_url,
                face_vectors=len(face_upserts), obj_vectors=len(object_upserts),
                detect_faces=detect_faces,
                duration_ms=round((time.perf_counter() - file_start) * 1000))
        except Exception as e:
            log("ERROR", "upload.file.error",
                user_id=who, ip=ip, mode=mode,
                filename=file.filename, folder=folder, error=str(e),
                traceback=traceback.format_exc()[-800:],
                duration_ms=round((time.perf_counter() - file_start) * 1000))
            err = str(e)
            if "not found" in err.lower() or "404" in err:
                raise HTTPException(404, "Pinecone index not found. Please go to Settings and click 'Verify & Save' to recreate your indexes.") from e
            raise HTTPException(500, f"Upload processing failed: {err}") from e
        finally:
            if os.path.exists(tmp_path):
                os.remove(tmp_path)

    log("INFO", "upload.complete",
        user_id=who, ip=ip, mode=mode,
        folder=folder, files_uploaded=len(uploaded_urls),
        face_vectors=face_vec_total, object_vectors=object_vec_total,
        detect_faces=detect_faces,
        duration_ms=round((time.perf_counter() - start) * 1000))
    return {"message": "Done!", "urls": uploaded_urls}
# ════════════════════════════════════════════════════════════════
# 3. SEARCH
# ════════════════════════════════════════════════════════════════
@app.post("/api/search")
async def search_database(
    request: Request,
    file: UploadFile = File(...),
    detect_faces: bool = Form(True),
    user_pinecone_key: str = Form(""),
    user_cloudinary_url: str = Form(""),
    user_id: str = Form(""),
):
    """Search the Pinecone indexes for images similar to the uploaded one.

    The query image is vectorised (bounded by the inference semaphore); each
    vector is queried against the faces or objects index according to its
    ``type``. Matches below the per-lane threshold (0.35 faces, 0.45
    objects) are dropped, results are de-duplicated by URL keeping the best
    score, and the top 10 are returned.

    Raises:
        HTTPException 400: no Pinecone key available.
        HTTPException 404: a Pinecone index was not found.
        HTTPException 500: any other failure.

    Returns:
        ``{"results": [{"url", "score", "caption"}, ...]}``.
    """
    ip = get_ip(request)
    start = time.perf_counter()
    who = user_id or "anonymous"
    actual_pc_key = user_pinecone_key or os.getenv("DEFAULT_PINECONE_KEY", "")
    mode = "guest" if is_guest(actual_pc_key) else "personal"
    log("INFO", "search.start",
        user_id=who, ip=ip, mode=mode,
        filename=file.filename, detect_faces=detect_faces)

    if not actual_pc_key:
        log("ERROR", "search.missing_keys", user_id=who, ip=ip, mode=mode)
        raise HTTPException(400, "Pinecone Key is missing.")

    # UploadFile.filename may be None; fall back to a fixed name.
    safe_name = sanitize_filename(file.filename or "upload")
    tmp_path = f"temp_uploads/query_{uuid.uuid4().hex}_{safe_name}"
    try:
        with open(tmp_path, "wb") as buf:
            # Copy in a worker thread so a large upload does not stall the
            # event loop.
            await asyncio.to_thread(shutil.copyfileobj, file.file, buf)

        async with _inference_sem:
            vectors = await ai.process_image_async(tmp_path, is_query=True, detect_faces=detect_faces)

        # Measured from request start, so it includes the file copy above.
        inference_ms = round((time.perf_counter() - start) * 1000)
        lanes_used = list({v["type"] for v in vectors})
        log("INFO", "search.inference_done",
            user_id=who, ip=ip, mode=mode,
            vector_count=len(vectors), lanes=lanes_used, inference_ms=inference_ms)

        pc = _get_pinecone(actual_pc_key)
        idx_obj = pc.Index(IDX_OBJECTS)
        idx_face = pc.Index(IDX_FACES)

        async def _query_one(vec_dict: dict):
            """Query the index matching this vector's type; return UI-ready hits."""
            vec_list = vec_dict["vector"].tolist() if hasattr(vec_dict["vector"], "tolist") else vec_dict["vector"]
            is_face = vec_dict["type"] == "face"
            target_idx = idx_face if is_face else idx_obj
            try:
                res = await asyncio.to_thread(target_idx.query, vector=vec_list, top_k=10, include_metadata=True)
            except Exception as e:
                if "404" in str(e):
                    raise HTTPException(404, "Pinecone Index not found. Please log in and click 'Verify Keys' in Settings.") from e
                raise
            out = []
            for match in res.get("matches", []):
                score = match["score"]
                if is_face:
                    if score < 0.35:
                        continue
                    # Rescale raw face scores from [0.35, 1.0] to
                    # [0.75, 0.99] for display.
                    ui_score = min(0.99, 0.75 + ((score - 0.35) / 0.65) * 0.24)
                else:
                    if score < 0.45:
                        continue
                    ui_score = score
                out.append({"url": match["metadata"].get("url") or match["metadata"].get("image_url", ""),
                            "score": round(ui_score, 4),
                            "caption": "👤 Verified Identity" if is_face else match["metadata"].get("folder", "🎯 Object Match")})
            return out

        nested = await asyncio.gather(*[_query_one(v) for v in vectors])
        all_results = [r for sub in nested for r in sub]

        # Keep only the best-scoring hit per image URL.
        seen = {}
        for r in all_results:
            url = r["url"]
            if url not in seen or r["score"] > seen[url]["score"]:
                seen[url] = r
        final = sorted(seen.values(), key=lambda x: x["score"], reverse=True)[:10]

        log("INFO", "search.complete",
            user_id=who, ip=ip, mode=mode,
            lanes=lanes_used, detect_faces=detect_faces,
            results_count=len(final), top_score=final[0]["score"] if final else 0,
            duration_ms=round((time.perf_counter() - start) * 1000))
        return {"results": final}
    except HTTPException:
        raise
    except Exception as e:
        log("ERROR", "search.error",
            user_id=who, ip=ip, mode=mode,
            error=str(e), traceback=traceback.format_exc()[-800:],
            duration_ms=round((time.perf_counter() - start) * 1000))
        raise HTTPException(500, str(e)) from e
    finally:
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
# ════════════════════════════════════════════════════════════════
# 4. CATEGORIES
# ════════════════════════════════════════════════════════════════
@app.post("/api/categories")
async def get_categories(
    request: Request,
    user_cloudinary_url: str = Form(""),
    user_id: str = Form(""),
):
    """List the Cloudinary root folders, which the app treats as categories.

    Uses the caller's Cloudinary URL, or the server default when none is
    given. Never raises: a missing/invalid URL or any Cloudinary error
    yields an empty list (errors are logged).

    Returns:
        ``{"categories": [<folder name>, ...]}``.
    """
    ip = get_ip(request)
    actual_url = user_cloudinary_url or os.getenv("DEFAULT_CLOUDINARY_URL", "")
    if not actual_url:
        return {"categories": []}
    who = user_id or "anonymous"
    try:
        creds = get_cloudinary_creds(actual_url)
        if creds.get("cloud_name"):
            listing = await asyncio.to_thread(_cld_root_folders, creds)
            categories = [entry["name"] for entry in listing.get("folders", [])]
            log("INFO", "categories.fetched",
                user_id=who, ip=ip, category_count=len(categories))
            return {"categories": categories}
        return {"categories": []}
    except Exception as e:
        log("ERROR", "categories.error", user_id=who, ip=ip, error=str(e))
        return {"categories": []}
@app.get("/api/health")
async def health():
    """Liveness probe: report ``"ok"`` and the current UTC time (ISO 8601)."""
    now_utc = datetime.now(timezone.utc)
    return {"status": "ok", "timestamp": now_utc.isoformat()}
# ════════════════════════════════════════════════════════════════
# 5. LIST FOLDER IMAGES
# ════════════════════════════════════════════════════════════════
def _cld_list_folder_images(folder: str, creds: dict, next_cursor: str = None):
    """Fetch one page (up to 500) of uploaded resources under ``<folder>/``.

    Blocking call. Pass the ``next_cursor`` value from the previous response
    to fetch the following page; callers loop until the response has none.
    """
    params = {
        "type": "upload",
        "prefix": f"{folder}/",
        "max_results": 500,
        "api_key": creds["api_key"],
        "api_secret": creds["api_secret"],
        "cloud_name": creds["cloud_name"],
    }
    if next_cursor:
        params["next_cursor"] = next_cursor
    return cloudinary.api.resources(**params)
@app.post("/api/cloudinary/folder-images")
async def list_folder_images(
    request: Request,
    user_cloudinary_url: str = Form(""),
    folder_name: str = Form(...),
    user_id: str = Form(""),
):
    """Return every uploaded image under a Cloudinary folder.

    Pages through the Cloudinary listing until no ``next_cursor`` is
    returned. Uses the caller's Cloudinary URL, or the server default.

    Raises:
        HTTPException 400: the Cloudinary URL has no cloud name.
        HTTPException 500: the Cloudinary listing failed (logged first).

    Returns:
        ``{"images": [{"url", "public_id"}, ...], "count": <int>}``.
    """
    ip = get_ip(request)
    who = user_id or "anonymous"
    actual_url = user_cloudinary_url or os.getenv("DEFAULT_CLOUDINARY_URL", "")
    creds = get_cloudinary_creds(actual_url)
    if not creds.get("cloud_name"):
        raise HTTPException(400, "Invalid Cloudinary URL.")

    images, next_cursor = [], None
    try:
        while True:
            result = await asyncio.to_thread(_cld_list_folder_images, folder_name, creds, next_cursor)
            images.extend(
                {"url": r["secure_url"], "public_id": r["public_id"]}
                for r in result.get("resources", [])
            )
            next_cursor = result.get("next_cursor")
            if not next_cursor:
                break
    except Exception as e:
        # Log with request context before failing, like the other endpoints.
        log("ERROR", "explorer.folder_open_error",
            user_id=who, ip=ip, folder_name=folder_name, error=str(e))
        raise HTTPException(500, f"Could not list folder images: {e}") from e

    log("INFO", "explorer.folder_opened",
        user_id=who, ip=ip,
        folder_name=folder_name, image_count=len(images))
    return {"images": images, "count": len(images)}
# ════════════════════════════════════════════════════════════════
# 6. DELETE SINGLE IMAGE
# ════════════════════════════════════════════════════════════════
def url_to_public_id(image_url: str, cloud_name: str) -> str:
    """Derive a Cloudinary public_id from a delivery URL.

    Takes the path segments after ``/upload/``, drops a leading version
    segment (``v`` followed by digits) and removes the file extension from
    the final segment only, so dots in folder names are preserved.

    Args:
        image_url: Cloudinary delivery URL.
        cloud_name: Unused; kept for interface compatibility.

    Returns:
        The public_id, or ``""`` when it cannot be determined (empty input,
        unparsable URL, or no ``upload`` segment in the path).
    """
    if not image_url:
        return ""
    try:
        segments = urlparse(image_url).path.split("/")
        after = segments[segments.index("upload") + 1:]
    except (ValueError, TypeError, AttributeError):
        # ValueError: malformed URL or no "upload" segment; the others cover
        # non-string input.
        return ""
    if after and after[0].startswith("v") and after[0][1:].isdigit():
        after = after[1:]
    if not after:
        return ""
    *folders, leaf = after
    # rsplit leaves an extension-less leaf unchanged.
    return "/".join([*folders, leaf.rsplit(".", 1)[0]])
def _cld_delete_resource(public_id: str, creds: dict):
    """Destroy the Cloudinary asset *public_id* using *creds*.

    Blocking call; returns the Cloudinary response.
    """
    auth = {field: creds[field] for field in ("api_key", "api_secret", "cloud_name")}
    return cloudinary.uploader.destroy(public_id, **auth)
@app.post("/api/delete-image")
async def delete_image(
    request: Request,
    user_pinecone_key: str = Form(""),
    user_cloudinary_url: str = Form(""),
    image_url: str = Form(""),
    public_id: str = Form(""),
    user_id: str = Form(""),
):
    """Delete one image from Cloudinary and its vectors from Pinecone.

    The Cloudinary public_id is taken from *public_id* or derived from
    *image_url*. Pinecone clean-up is best-effort and only runs when both a
    Pinecone key and *image_url* are available; each index is handled
    independently so a failure on one does not skip the other.

    Raises:
        HTTPException 400: invalid Cloudinary URL or no public_id derivable.

    Returns:
        A confirmation message.
    """
    ip = get_ip(request)
    actual_pc_key = user_pinecone_key or os.getenv("DEFAULT_PINECONE_KEY", "")
    actual_cld_url = user_cloudinary_url or os.getenv("DEFAULT_CLOUDINARY_URL", "")
    creds = get_cloudinary_creds(actual_cld_url)
    if not creds.get("cloud_name"):
        raise HTTPException(400, "Invalid Cloudinary URL.")
    pid = public_id or url_to_public_id(image_url, creds["cloud_name"])
    if not pid:
        raise HTTPException(400, "Could not determine public_id.")

    await asyncio.to_thread(_cld_delete_resource, pid, creds)

    if actual_pc_key and image_url:
        try:
            pc = _get_pinecone(actual_pc_key)
        except Exception as e:
            _log_fn("WARNING", f"Pinecone delete warning: {e}")
        else:
            for idx_name in (IDX_OBJECTS, IDX_FACES):
                try:
                    await asyncio.to_thread(
                        pc.Index(idx_name).delete, filter={"url": {"$eq": image_url}})
                except Exception as e:
                    _log_fn("WARNING", f"Pinecone delete warning ({idx_name}): {e}")

    log("INFO", "explorer.image_deleted",
        user_id=user_id or "anonymous", ip=ip,
        image_url=image_url, public_id=pid)
    return {"message": "Image deleted successfully."}
# ════════════════════════════════════════════════════════════════
# 7. DELETE ENTIRE FOLDER
# ════════════════════════════════════════════════════════════════
def _cld_delete_folder(folder: str, creds: dict):
    """Delete every uploaded resource whose public_id starts with ``<folder>/``.

    Blocking call. Cloudinary processes bulk deletions in batches and
    returns a ``next_cursor`` while more resources remain, so the call is
    repeated until no cursor is returned.

    Returns:
        The response of the last deletion call.
    """
    prefix = f"{folder}/"
    auth = {field: creds[field] for field in ("api_key", "api_secret", "cloud_name")}
    response = cloudinary.api.delete_resources_by_prefix(prefix, **auth)
    while response.get("next_cursor"):
        response = cloudinary.api.delete_resources_by_prefix(
            prefix, next_cursor=response["next_cursor"], **auth)
    return response
def _cld_remove_folder(folder: str, creds: dict):
    """Best-effort removal of a Cloudinary folder.

    Blocking call. Failures are deliberately not raised, so callers can
    continue their clean-up, but they are logged at DEBUG level.

    Returns:
        The Cloudinary response, or None when the removal failed.
    """
    try:
        return cloudinary.api.delete_folder(
            folder,
            api_key=creds["api_key"],
            api_secret=creds["api_secret"],
            cloud_name=creds["cloud_name"],
        )
    except Exception as exc:
        _log_fn("DEBUG", f"Cloudinary folder removal skipped for '{folder}': {exc}")
        return None
@app.post("/api/delete-folder")
async def delete_folder(
    request: Request,
    user_pinecone_key: str = Form(""),
    user_cloudinary_url: str = Form(""),
    folder_name: str = Form(...),
    user_id: str = Form(""),
):
    """Delete a Cloudinary folder with its images, and the matching vectors.

    The folder's images are listed first (for the returned count and for
    the Pinecone fallback), then deleted from Cloudinary together with the
    folder. Pinecone clean-up is best-effort: each index is first purged
    with a ``folder`` metadata filter; if that fails, vectors are deleted
    per image URL. Failures are logged, not raised.

    Raises:
        HTTPException 400: invalid Cloudinary URL.

    Returns:
        A confirmation message and ``deleted_count`` (images listed).
    """
    ip = get_ip(request)
    actual_pc_key = user_pinecone_key or os.getenv("DEFAULT_PINECONE_KEY", "")
    actual_cld_url = user_cloudinary_url or os.getenv("DEFAULT_CLOUDINARY_URL", "")
    creds = get_cloudinary_creds(actual_cld_url)
    if not creds.get("cloud_name"):
        raise HTTPException(400, "Invalid Cloudinary URL.")

    all_images, next_cursor = [], None
    while True:
        result = await asyncio.to_thread(_cld_list_folder_images, folder_name, creds, next_cursor)
        all_images.extend(result.get("resources", []))
        next_cursor = result.get("next_cursor")
        if not next_cursor:
            break

    await asyncio.to_thread(_cld_delete_folder, folder_name, creds)
    await asyncio.to_thread(_cld_remove_folder, folder_name, creds)

    if actual_pc_key:
        try:
            pc = _get_pinecone(actual_pc_key)
            for idx_name in (IDX_OBJECTS, IDX_FACES):
                idx = pc.Index(idx_name)
                try:
                    await asyncio.to_thread(idx.delete, filter={"folder": {"$eq": folder_name}})
                except Exception as filter_err:
                    _log_fn("WARNING",
                            f"Pinecone folder-filter delete failed on {idx_name}: {filter_err}; "
                            "falling back to per-image delete")
                    failed = 0
                    for img in all_images:
                        url = img.get("secure_url")
                        if not url:
                            continue
                        try:
                            await asyncio.to_thread(idx.delete, filter={"url": {"$eq": url}})
                        except Exception:
                            # Keep going; report the total afterwards.
                            failed += 1
                    if failed:
                        _log_fn("WARNING",
                                f"Pinecone per-image delete failed for {failed} image(s) on {idx_name}")
        except Exception as e:
            _log_fn("WARNING", f"Pinecone folder delete warning: {e}")

    log("INFO", "explorer.folder_deleted",
        user_id=user_id or "anonymous", ip=ip,
        folder_name=folder_name, deleted_count=len(all_images))
    return {"message": f"Folder '{folder_name}' and all its contents deleted.", "deleted_count": len(all_images)}
# ════════════════════════════════════════════════════════════════
# 8. RESET DATABASE ⚠️ DESTRUCTIVE — triple-logged
# ════════════════════════════════════════════════════════════════
@app.post("/api/reset-database")
async def reset_database(
    request: Request,
    user_pinecone_key: str = Form(""),
    user_cloudinary_url: str = Form(""),
    user_id: str = Form(""),
):
    """Wipe the caller's Cloudinary assets and recreate both Pinecone indexes.

    Refused (403) when either credential equals the server's shared default.
    All inputs are validated before anything is deleted. Cloudinary clean-up
    is best-effort (failures are logged); a Pinecone failure aborts with 500.

    Raises:
        HTTPException 403: shared demo credentials supplied.
        HTTPException 400: invalid Cloudinary URL or missing Pinecone key.
        HTTPException 500: Pinecone delete/create failed.

    Returns:
        A confirmation message.
    """
    ip = get_ip(request)
    start = time.perf_counter()
    who = user_id or "anonymous"
    log("WARNING", "danger.reset_database.attempt", user_id=who, ip=ip)

    if _is_default_key(user_pinecone_key, DEFAULT_PC_KEY) or _is_default_key(user_cloudinary_url, DEFAULT_CLD_URL):
        log("WARNING", "danger.reset_database.blocked_shared_db", user_id=who, ip=ip)
        raise HTTPException(403, "Reset is not allowed on the shared demo database.")

    creds = get_cloudinary_creds(user_cloudinary_url)
    if not creds.get("cloud_name"):
        raise HTTPException(400, "Invalid Cloudinary URL.")
    if not user_pinecone_key.strip():
        # Fail before the Cloudinary wipe instead of after it.
        raise HTTPException(400, "Pinecone Key is missing.")

    def _wipe_all_assets():
        """Delete all uploaded resources, following next_cursor until done."""
        auth = {field: creds[field] for field in ("api_key", "api_secret", "cloud_name")}
        response = cloudinary.api.delete_all_resources(**auth)
        while response.get("next_cursor"):
            response = cloudinary.api.delete_all_resources(
                next_cursor=response["next_cursor"], **auth)

    try:
        await asyncio.to_thread(_wipe_all_assets)
    except Exception as e:
        _log_fn("WARNING", f"Cloudinary wipe: {e}")

    # Delete Cloudinary folders too
    try:
        folders_res = await asyncio.to_thread(_cld_root_folders, creds)
        for folder in folders_res.get("folders", []):
            await asyncio.to_thread(_cld_remove_folder, folder["name"], creds)
    except Exception as e:
        _log_fn("WARNING", f"Cloudinary folder cleanup: {e}")

    try:
        pc = _get_pinecone(user_pinecone_key)
        existing = {idx.name for idx in await asyncio.to_thread(pc.list_indexes)}
        deletions = [
            asyncio.to_thread(pc.delete_index, name)
            for name in (IDX_OBJECTS, IDX_FACES)
            if name in existing
        ]
        if deletions:
            await asyncio.gather(*deletions)
            # Wait until the old indexes are no longer listed before
            # recreating them under the same names (bounded at ~60 s).
            for _ in range(30):
                await asyncio.sleep(2)
                remaining = {idx.name for idx in await asyncio.to_thread(pc.list_indexes)}
                if IDX_OBJECTS not in remaining and IDX_FACES not in remaining:
                    break
        await asyncio.gather(
            asyncio.to_thread(pc.create_index, name=IDX_OBJECTS, dimension=1536, metric="cosine",
                              spec=ServerlessSpec(cloud="aws", region="us-east-1")),
            asyncio.to_thread(pc.create_index, name=IDX_FACES, dimension=512, metric="cosine",
                              spec=ServerlessSpec(cloud="aws", region="us-east-1")),
        )
    except Exception as e:
        log("ERROR", "danger.reset_database.pinecone_error",
            user_id=who, ip=ip, error=str(e))
        raise HTTPException(500, f"Pinecone reset error: {e}") from e

    log("WARNING", "danger.reset_database.complete",
        user_id=who, ip=ip,
        duration_ms=round((time.perf_counter() - start) * 1000))
    return {"message": "Database reset complete. All data wiped and indexes recreated."}
# ════════════════════════════════════════════════════════════════
# 9. DELETE ACCOUNT ⚠️ DESTRUCTIVE — triple-logged
# ════════════════════════════════════════════════════════════════
@app.post("/api/delete-account")
async def delete_account(
    request: Request,
    user_pinecone_key: str = Form(""),
    user_cloudinary_url: str = Form(""),
    user_id: str = Form(""),
):
    """Delete the caller's Cloudinary assets and Pinecone indexes.

    Refused (403) when either credential equals the server's shared default.
    Every clean-up step is best-effort: failures are logged as warnings and
    the endpoint still reports completion.

    Raises:
        HTTPException 403: shared demo credentials supplied.

    Returns:
        A confirmation message.
    """
    ip = get_ip(request)
    start = time.perf_counter()
    who = user_id or "anonymous"
    log("WARNING", "danger.delete_account.attempt", user_id=who, ip=ip)

    if _is_default_key(user_pinecone_key, DEFAULT_PC_KEY) or _is_default_key(user_cloudinary_url, DEFAULT_CLD_URL):
        log("WARNING", "danger.delete_account.blocked_shared_db", user_id=who, ip=ip)
        raise HTTPException(403, "Account deletion is not allowed on the shared demo database.")

    creds = get_cloudinary_creds(user_cloudinary_url)
    if creds.get("cloud_name"):
        def _wipe_all_assets():
            """Delete all uploaded resources, following next_cursor until done."""
            auth = {field: creds[field] for field in ("api_key", "api_secret", "cloud_name")}
            response = cloudinary.api.delete_all_resources(**auth)
            while response.get("next_cursor"):
                response = cloudinary.api.delete_all_resources(
                    next_cursor=response["next_cursor"], **auth)

        try:
            await asyncio.to_thread(_wipe_all_assets)
        except Exception as e:
            _log_fn("WARNING", f"Account delete Cloudinary: {e}")

        # Delete Cloudinary folders
        try:
            folders_res = await asyncio.to_thread(_cld_root_folders, creds)
            for folder in folders_res.get("folders", []):
                await asyncio.to_thread(_cld_remove_folder, folder["name"], creds)
        except Exception as e:
            _log_fn("WARNING", f"Account delete Cloudinary folders: {e}")
    else:
        # No usable credentials: skip explicitly rather than relying on a
        # KeyError inside the Cloudinary calls.
        _log_fn("WARNING", "Account delete Cloudinary: no usable Cloudinary URL supplied; skipping")

    try:
        pc = _get_pinecone(user_pinecone_key)
        existing = {idx.name for idx in await asyncio.to_thread(pc.list_indexes)}
        deletions = [
            asyncio.to_thread(pc.delete_index, name)
            for name in (IDX_OBJECTS, IDX_FACES)
            if name in existing
        ]
        if deletions:
            await asyncio.gather(*deletions)
    except Exception as e:
        _log_fn("WARNING", f"Account delete Pinecone: {e}")

    log("WARNING", "danger.delete_account.complete",
        user_id=who, ip=ip,
        duration_ms=round((time.perf_counter() - start) * 1000))
    return {"message": "Account data deleted. Sign out initiated."}