"""
src/services/clustering.py — Phase 3: HDBSCAN face clustering (People View)
Clusters all face vectors in the faces-arcface Pinecone index using HDBSCAN,
then stores cluster assignments in Supabase (face_clusters table).
Algorithm choice:
- HDBSCAN on ArcFace 512-d vectors (euclidean after L2 normalisation)
- min_cluster_size=3, min_samples=3, cluster_selection_epsilon=0.35
- Noise points (label=-1) are left unclustered — not forced into clusters
- Representative face = the vector closest to the cluster centroid
Pinecone fetch strategy:
- Pinecone free tier has no "list all vectors" endpoint
- We use a dummy query with random vectors + large top_k to page through
vectors. This is imperfect but works within free-tier constraints.
- Production alternative: store vector_ids in Supabase on upload (Phase 4)
Entry points:
run_clustering(pc, user_id) — full re-cluster, called by API endpoint
get_people(user_id) — read cluster list from Supabase
get_person_images(cluster_id, user_id) — images for one cluster
rename_cluster(cluster_id, name, user_id) — label "Mom", "John", etc.
search_cluster_aware(pc, image_map, user_id) — cluster-aware search expansion
"""
import asyncio
import uuid
from datetime import datetime, timezone
import aiohttp
import numpy as np
from src.core.config import (
IDX_FACES_ARCFACE,
SUPABASE_URL, SUPABASE_SERVICE_KEY,
CLUSTER_MIN_SAMPLES, CLUSTER_MIN_CLUSTER_SIZE, CLUSTER_EPSILON,
FACE_SEARCH_TOP_K, CLUSTERING_BLUR_THRESHOLD,
)
from src.common.utils import cld_face_thumb_url
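# NOTE: this module reads and writes two Supabase tables. The column lists
# below are inferred from the row dicts built in run_clustering(), not from
# the authoritative DDL, so treat them as an assumed sketch of the schema:
#   face_clusters(cluster_id, user_id, name, face_count,
#                 representative_face_crop, representative_vector_id,
#                 created_at, updated_at)
#   face_vector_clusters(vector_id, cluster_id, user_id, image_url,
#                        folder, face_crop, updated_at)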
# ──────────────────────────────────────────────────────────────
# Supabase helpers
# ──────────────────────────────────────────────────────────────
def _hdr() -> dict:
return {
"apikey": SUPABASE_SERVICE_KEY,
"Authorization": f"Bearer {SUPABASE_SERVICE_KEY}",
"Content-Type": "application/json",
"Prefer": "return=representation",
}
async def _supa_upsert(table: str, rows: list[dict]) -> None:
if not SUPABASE_URL or not rows:
return
url = f"{SUPABASE_URL}/rest/v1/{table}"
headers = {**_hdr(), "Prefer": "resolution=merge-duplicates,return=minimal"}
async with aiohttp.ClientSession() as s:
await s.post(url, headers=headers, json=rows)
async def _supa_select(table: str, filters: str = "") -> list[dict]:
if not SUPABASE_URL:
return []
url = f"{SUPABASE_URL}/rest/v1/{table}?{filters}"
async with aiohttp.ClientSession() as s:
async with s.get(url, headers=_hdr()) as r:
if r.status == 200:
return await r.json()
return []
async def _supa_patch(table: str, filters: str, patch: dict) -> None:
if not SUPABASE_URL:
return
url = f"{SUPABASE_URL}/rest/v1/{table}?{filters}"
async with aiohttp.ClientSession() as s:
await s.patch(url, headers=_hdr(), json=patch)
async def _supa_delete(table: str, filters: str) -> None:
if not SUPABASE_URL:
return
url = f"{SUPABASE_URL}/rest/v1/{table}?{filters}"
async with aiohttp.ClientSession() as s:
await s.delete(url, headers=_hdr())
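# The `filters` strings passed to the helpers above use PostgREST operator
# syntax (Supabase's REST layer), e.g. "user_id=eq.<uuid>" or
# 'vector_id=in.("a","b")'. Upserts rely on the PostgREST
# "Prefer: resolution=merge-duplicates" header set in _supa_upsert.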
# ──────────────────────────────────────────────────────────────
# Pinecone vector fetch helpers
# ──────────────────────────────────────────────────────────────
def _fetch_all_vectors(idx, dim: int = 512, max_vectors: int = 10000) -> list[dict]:
"""
Fetches as many vectors as possible from a Pinecone index using
random-probe queries. Free-tier Pinecone has no scan endpoint, so
we use diverse random probes to discover vectors.
Returns list of dicts: {id, values, metadata}
"""
seen_ids: set = set()
collected: list[dict] = []
rng = np.random.default_rng(seed=42)
# 20 random probes — covers most of the index for typical gallery sizes
for _ in range(20):
probe = rng.standard_normal(dim).astype(np.float32)
probe /= np.linalg.norm(probe)
res = idx.query(
vector=probe.tolist(),
top_k=min(FACE_SEARCH_TOP_K, 1000),
include_metadata=True,
include_values=True,
)
for match in res.get("matches", []):
vid = match["id"]
if vid in seen_ids:
continue
seen_ids.add(vid)
values = match.get("values")
if values:
collected.append({
"id": vid,
"values": values,
"metadata": match.get("metadata", {}),
})
if len(collected) >= max_vectors:
break
if len(collected) >= max_vectors:
break
return collected
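# Hedged sketch of the Phase 4 alternative noted in the module docstring: if
# vector_ids are persisted in Supabase at upload time, clustering can fetch
# exactly those vectors instead of relying on random probes. The table and
# column names here (face_vectors.vector_id) are assumptions, and the shape
# of the Pinecone fetch response may differ between client versions; this is
# a sketch, not the production path.
async def _fetch_by_stored_ids(idx, user_id: str, batch_size: int = 100) -> list[dict]:
    rows = await _supa_select("face_vectors", f"user_id=eq.{user_id}&select=vector_id")
    ids = [r["vector_id"] for r in rows if r.get("vector_id")]
    collected: list[dict] = []
    for i in range(0, len(ids), batch_size):
        # idx.fetch is blocking, so run it off the event loop like idx.query above
        res = await asyncio.to_thread(idx.fetch, ids[i:i + batch_size])
        for vid, vec in res.get("vectors", {}).items():
            collected.append({
                "id": vid,
                "values": vec.get("values"),
                "metadata": vec.get("metadata", {}),
            })
    return collected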
# ──────────────────────────────────────────────────────────────
# Core clustering logic
# ──────────────────────────────────────────────────────────────
def _run_hdbscan(vectors: np.ndarray) -> np.ndarray:
"""
Runs HDBSCAN on the provided L2-normalised 512-d face vectors.
Returns integer label array (−1 = noise / unclustered).
"""
try:
import hdbscan
except ImportError:
raise RuntimeError(
"hdbscan not installed. Add hdbscan>=0.8.33 to requirements.txt"
)
clusterer = hdbscan.HDBSCAN(
min_cluster_size=CLUSTER_MIN_CLUSTER_SIZE,
min_samples=CLUSTER_MIN_SAMPLES,
cluster_selection_epsilon=CLUSTER_EPSILON,
metric="euclidean",
core_dist_n_jobs=1, # HF CPU — avoid multiprocessing overhead
)
clusterer.fit(vectors)
return clusterer.labels_
def _pick_representative(cluster_vecs: np.ndarray, cluster_meta: list[dict]) -> int:
    """
    Picks the face closest to the cluster centroid as the representative,
    preferring faces that pass the blur threshold. Returns the index of the
    chosen face within the cluster (so callers can look up both its metadata
    and its vector id).
    """
    centroid = cluster_vecs.mean(axis=0)
    centroid /= np.linalg.norm(centroid) + 1e-8
    sims = cluster_vecs @ centroid
    # Walk candidates from highest to lowest centroid similarity and return
    # the first one that is sharp enough.
    sorted_indices = np.argsort(sims)[::-1]
    for idx in sorted_indices:
        blur_score = cluster_meta[int(idx)].get("blur_score", 100.0)
        if blur_score >= CLUSTERING_BLUR_THRESHOLD:
            return int(idx)
    # Fallback: if every face is blurry, pick the sharpest one
    return max(range(len(cluster_meta)),
               key=lambda i: cluster_meta[i].get("blur_score", 0))
# ──────────────────────────────────────────────────────────────
# Public entry points
# ──────────────────────────────────────────────────────────────
async def run_clustering(pc, user_id: str) -> dict:
"""
Full re-cluster pipeline:
1. Fetch all ArcFace vectors from Pinecone
2. Run HDBSCAN
3. Write cluster assignments to Supabase face_clusters table
4. Write per-vector assignments to face_vector_clusters table
Returns a summary dict.
"""
idx = pc.Index(IDX_FACES_ARCFACE)
# 1. Fetch vectors (blocking — run in thread pool)
raw = await asyncio.to_thread(_fetch_all_vectors, idx)
if len(raw) < CLUSTER_MIN_CLUSTER_SIZE:
return {"status": "skipped", "reason": "not enough vectors", "vectors": len(raw)}
ids = [r["id"] for r in raw]
metas = [r["metadata"] for r in raw]
    # Filter out blurry faces before clustering
    valid_indices = [
        i for i, meta in enumerate(metas)
        if meta.get("blur_score", 100.0) >= CLUSTERING_BLUR_THRESHOLD
    ]
    if len(valid_indices) < CLUSTER_MIN_CLUSTER_SIZE:
        return {
            "status": "skipped",
            "reason": f"only {len(valid_indices)} non-blurry vectors after blur filtering",
            "vectors": len(raw),
            "valid_vectors": len(valid_indices),
        }
ids = [ids[i] for i in valid_indices]
metas = [metas[i] for i in valid_indices]
raw_values = [r["values"] for r in raw]
matrix = np.array([raw_values[i] for i in valid_indices], dtype=np.float32)
# L2-normalise before euclidean HDBSCAN (equivalent to angular distance)
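    # (for unit vectors, ||a - b||^2 = 2 - 2*cos(a, b), so euclidean distance
    # ordering matches cosine similarity ordering)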
norms = np.linalg.norm(matrix, axis=1, keepdims=True)
matrix = matrix / (norms + 1e-8)
# 2. Cluster (blocking)
labels = await asyncio.to_thread(_run_hdbscan, matrix)
unique_labels = set(labels) - {-1}
now_iso = datetime.now(timezone.utc).isoformat()
# 3. Delete existing clusters for this user (full re-cluster)
await _supa_delete("face_clusters", f"user_id=eq.{user_id}")
await _supa_delete("face_vector_clusters", f"user_id=eq.{user_id}")
cluster_rows = []
vector_rows = []
for label in sorted(unique_labels):
cluster_id = str(uuid.uuid4())
mask = labels == label
c_indices = np.where(mask)[0]
c_vecs = matrix[c_indices]
c_meta = [metas[i] for i in c_indices]
c_ids = [ids[i] for i in c_indices]
        rep_idx = _pick_representative(c_vecs, c_meta)
        rep_meta = c_meta[rep_idx]
cluster_rows.append({
"cluster_id": cluster_id,
"user_id": user_id,
"representative_face_crop": rep_meta.get("face_crop", ""),
"representative_vector_id": c_ids[0],
"face_count": int(len(c_indices)),
"name": None,
"created_at": now_iso,
"updated_at": now_iso,
})
for vid, meta in zip(c_ids, c_meta):
vector_rows.append({
"vector_id": vid,
"cluster_id": cluster_id,
"user_id": user_id,
"image_url": meta.get("url", ""),
"folder": meta.get("folder", ""),
"face_crop": meta.get("face_crop", ""),
"updated_at": now_iso,
})
# 4. Batch write to Supabase (200 rows per request)
for i in range(0, len(cluster_rows), 200):
await _supa_upsert("face_clusters", cluster_rows[i:i + 200])
for i in range(0, len(vector_rows), 200):
await _supa_upsert("face_vector_clusters", vector_rows[i:i + 200])
return {
"status": "ok",
"total_vectors": len(ids),
"clusters_found": len(unique_labels),
"noise_vectors": int(np.sum(labels == -1)),
}
async def get_people(user_id: str) -> list[dict]:
"""Returns all identity clusters for a user, ordered by face_count desc."""
rows = await _supa_select(
"face_clusters",
f"user_id=eq.{user_id}&order=face_count.desc",
)
return [
{
"cluster_id": r["cluster_id"],
"name": r.get("name"),
"face_count": r.get("face_count", 0),
"representative_face_crop": r.get("representative_face_crop", ""),
}
for r in rows
]
async def get_person_images(cluster_id: str, user_id: str) -> list[dict]:
"""Returns all images belonging to a cluster."""
rows = await _supa_select(
"face_vector_clusters",
f"cluster_id=eq.{cluster_id}&user_id=eq.{user_id}",
)
# Dedupe by image_url (multiple face vectors can come from the same image)
seen: set = set()
out = []
for r in rows:
url = r.get("image_url", "")
if url and url not in seen:
seen.add(url)
out.append({
"url": url,
"thumb_url": cld_face_thumb_url(url),
"folder": r.get("folder", ""),
"face_crop": r.get("face_crop", ""),
})
return out
async def rename_cluster(cluster_id: str, name: str, user_id: str) -> bool:
"""Assigns a human-readable name to a cluster ('Mom', 'John', etc.)."""
await _supa_patch(
"face_clusters",
f"cluster_id=eq.{cluster_id}&user_id=eq.{user_id}",
{"name": name, "updated_at": datetime.now(timezone.utc).isoformat()},
)
return True
async def search_cluster_aware(
pc, image_map: dict, user_id: str
) -> dict:
"""
Cluster-aware search expansion (Phase 3 recall win).
Given an initial image_map from search_faces_split, look up which
clusters the matched faces belong to, then return ALL images in those
clusters. This achieves near-100% recall for well-indexed people.
Returns an expanded image_map in the same format as search_faces_split.
"""
if not image_map:
return image_map
# Find which vector_ids were returned in the initial search
matched_vids = {v.get("vector_id") for v in image_map.values() if v.get("vector_id")}
if not matched_vids:
return image_map
# Look up cluster membership for those vector_ids
vid_list = ",".join(f'"{v}"' for v in matched_vids)
rows = await _supa_select(
"face_vector_clusters",
f"vector_id=in.({vid_list})&user_id=eq.{user_id}",
)
if not rows:
return image_map
# Collect all cluster_ids matched
cluster_ids = {r["cluster_id"] for r in rows}
# Fetch all images in those clusters
expanded = dict(image_map)
for cluster_id in cluster_ids:
cluster_images = await get_person_images(cluster_id, user_id)
for img in cluster_images:
url = img["url"]
if url not in expanded:
# Add with a slightly lower score than the worst match
# so cluster-expanded results sort after direct hits
min_score = min(
(v["fused_score"] for v in image_map.values()), default=0.3
)
expanded[url] = {
"fused_score": max(min_score - 0.01, 0.01),
"arcface_score": 0.0,
"adaface_score": 0.0,
"raw_score": 0.0,
"face_crop": img.get("face_crop", ""),
"folder": img.get("folder", "uncategorized"),
"vector_id": None,
"cluster_expanded": True,
}
return expanded
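# Hedged manual smoke test; not used by the API. The client import below
# assumes pinecone-client >= 3.x (any object exposing .Index() works), the
# PINECONE_API_KEY environment variable, and a throwaway "demo-user" id.
if __name__ == "__main__":
    import os
    from pinecone import Pinecone
    _pc = Pinecone(api_key=os.environ["PINECONE_API_KEY"])
    print(asyncio.run(run_clustering(_pc, user_id="demo-user")))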