Spaces:
Sleeping
Sleeping
| """ | |
| ChromaDB store — semantic vector search. | |
| """ | |
| import chromadb | |
| from openmark import config | |
| from openmark.embeddings.base import EmbeddingProvider | |
| COLLECTION_NAME = "openmark_bookmarks" | |
def get_client() -> chromadb.PersistentClient:
    """Open (or create) the persistent ChromaDB client at the configured path."""
    client = chromadb.PersistentClient(path=config.CHROMA_PATH)
    return client
def get_collection(client: chromadb.PersistentClient, embedder: EmbeddingProvider):
    """Return the bookmarks collection, creating it with cosine distance if absent.

    NOTE(review): `embedder` is not used here (Chroma receives pre-computed
    embeddings in this module); it is kept for signature symmetry with
    callers — confirm before removing.
    """
    collection = client.get_or_create_collection(
        name=COLLECTION_NAME,
        metadata={"hnsw:space": "cosine"},
    )
    return collection
def ingest(items: list[dict], embedder: EmbeddingProvider, batch_size: int = 100):
    """Embed all items and store them in ChromaDB, skipping already-ingested URLs.

    Each item dict must provide "url" (used as the document id), "doc_text",
    "title", "category", "source", "score", and "tags"; "folder" is optional.
    Progress is reported via print().
    """
    client = get_client()
    collection = get_collection(client, embedder)

    # IDs already in the collection — include=[] fetches ids without payloads.
    seen_urls = set(collection.get(include=[])["ids"])
    pending = [item for item in items if item["url"] not in seen_urls]
    print(f"ChromaDB: {len(seen_urls)} already ingested, {len(pending)} new")
    if not pending:
        return

    done = 0
    for offset in range(0, len(pending), batch_size):
        chunk = pending[offset:offset + batch_size]
        docs = [item["doc_text"] for item in chunk]
        urls = [item["url"] for item in chunk]
        metadatas = []
        for item in chunk:
            metadatas.append({
                "title": item["title"][:500],  # cap to keep metadata small
                "category": item["category"],
                "source": item["source"],
                "score": float(item["score"]),
                # Chroma metadata values are scalars, so tags are stored joined.
                "tags": ",".join(item["tags"]),
                "folder": item.get("folder", ""),
            })
        vectors = embedder.embed_documents(docs)
        collection.add(
            ids=urls,
            embeddings=vectors,
            documents=docs,
            metadatas=metadatas,
        )
        done += len(chunk)
        print(f" ChromaDB ingested {done}/{len(pending)}")
    print(f"ChromaDB total: {collection.count()} items")
def search(
    query: str,
    embedder: EmbeddingProvider,
    n: int = 10,
    category: str | None = None,
    source: str | None = None,
    min_score: float | None = None,
) -> list[dict]:
    """Semantic search with optional metadata filters.

    Args:
        query: Free-text query, embedded via ``embedder.embed_query``.
        embedder: Embedding provider for the query vector.
        n: Maximum number of results.
        category: Exact-match filter on the "category" metadata field.
        source: Exact-match filter on the "source" metadata field.
        min_score: Keep only items whose stored score is >= this value.

    Returns:
        Ranked result dicts with url, metadata fields, a tags list, and
        "similarity" = 1 - cosine distance, rounded to 4 places.
    """
    client = get_client()
    collection = get_collection(client, embedder)
    q_embedding = embedder.embed_query(query)

    # Build the Chroma `where` clause; Chroma requires multiple conditions
    # to be wrapped in an explicit $and.
    filters = []
    if category:
        filters.append({"category": {"$eq": category}})
    if source:
        filters.append({"source": {"$eq": source}})
    if min_score is not None:
        filters.append({"score": {"$gte": min_score}})
    where = None
    if len(filters) == 1:
        where = filters[0]
    elif len(filters) > 1:
        where = {"$and": filters}

    results = collection.query(
        query_embeddings=[q_embedding],
        n_results=n,
        where=where,
        include=["metadatas", "documents", "distances"],
    )

    output = []
    for i, (meta, doc, dist) in enumerate(zip(
        results["metadatas"][0],
        results["documents"][0],
        results["distances"][0],
    )):
        # BUG FIX: "".split(",") yields [""], not [] — ingest stores empty
        # tag lists as "", so filter out empty fragments to round-trip to [].
        tags = [t for t in meta.get("tags", "").split(",") if t]
        output.append({
            "rank": i + 1,
            "url": results["ids"][0][i],
            "title": meta.get("title", ""),
            "category": meta.get("category", ""),
            "source": meta.get("source", ""),
            "score": meta.get("score", 0),
            "tags": tags,
            "similarity": round(1 - dist, 4),
        })
    return output
def get_stats() -> dict:
    """Report how many items the bookmarks collection currently holds."""
    # get_collection does not use its embedder argument, so None is passed.
    collection = get_collection(get_client(), None)
    return {"total": collection.count()}