|
|
""" |
|
|
Vector Store Interface and ChromaDB Implementation |
|
|
|
|
|
Provides: |
|
|
- Abstract VectorStore interface |
|
|
- ChromaDB implementation with local persistence |
|
|
- Chunk storage with metadata |
|
|
""" |
|
|
|
|
|
from abc import ABC, abstractmethod |
|
|
from typing import List, Optional, Dict, Any, Tuple |
|
|
from pathlib import Path |
|
|
from pydantic import BaseModel, Field |
|
|
from loguru import logger |
|
|
import hashlib |
|
|
import json |
|
|
|
|
|
try: |
|
|
import chromadb |
|
|
from chromadb.config import Settings |
|
|
CHROMADB_AVAILABLE = True |
|
|
except ImportError: |
|
|
CHROMADB_AVAILABLE = False |
|
|
logger.warning("ChromaDB not available. Install with: pip install chromadb") |
|
|
|
|
|
|
|
|
class VectorStoreConfig(BaseModel):
    """Configuration for vector store."""

    persist_directory: str = Field(
        default="./data/vectorstore",
        description="Directory for persistent storage"
    )
    collection_name: str = Field(
        default="sparknet_documents",
        description="Name of the collection"
    )

    default_top_k: int = Field(default=5, ge=1, description="Default number of results")
    similarity_threshold: float = Field(
        default=0.7,
        ge=0.0,
        le=1.0,
        description="Minimum similarity score"
    )

    anonymized_telemetry: bool = Field(default=False)
|
|
class VectorSearchResult(BaseModel):
    """Result from vector search."""

    chunk_id: str
    document_id: str
    text: str
    metadata: Dict[str, Any]
    similarity: float

    # Optional provenance fields for tracing a result back to its source
    page: Optional[int] = None
    bbox: Optional[Dict[str, float]] = None
    chunk_type: Optional[str] = None
|
|
class VectorStore(ABC):
    """Abstract interface for vector stores."""

    @abstractmethod
    def add_chunks(
        self,
        chunks: List[Dict[str, Any]],
        embeddings: List[List[float]],
    ) -> List[str]:
        """
        Add chunks with embeddings to the store.

        Args:
            chunks: List of chunk dictionaries with text and metadata
            embeddings: Corresponding embeddings, one per chunk

        Returns:
            List of stored chunk IDs
        """

    @abstractmethod
    def search(
        self,
        query_embedding: List[float],
        top_k: int = 5,
        filters: Optional[Dict[str, Any]] = None,
    ) -> List[VectorSearchResult]:
        """
        Search for similar chunks.

        Args:
            query_embedding: Query vector
            top_k: Number of results
            filters: Optional metadata filters (ChromaVectorStore supports the
                keys document_id, chunk_type, page, and confidence_min)

        Returns:
            List of search results
        """

    @abstractmethod
    def delete_document(self, document_id: str) -> int:
        """
        Delete all chunks for a document.

        Args:
            document_id: Document ID to delete

        Returns:
            Number of chunks deleted
        """

    @abstractmethod
    def get_chunk(self, chunk_id: str) -> Optional[Dict[str, Any]]:
        """Get a specific chunk by ID."""

    @abstractmethod
    def count(self, document_id: Optional[str] = None) -> int:
        """Count chunks in store, optionally filtered by document."""
|
|
class ChromaVectorStore(VectorStore):
    """
    ChromaDB implementation of vector store.

    Features:
    - Local persistent storage
    - Metadata filtering
    - Similarity search with cosine distance
    """

    def __init__(self, config: Optional[VectorStoreConfig] = None):
        """Initialize ChromaDB store."""
        if not CHROMADB_AVAILABLE:
            raise ImportError("ChromaDB is required. Install with: pip install chromadb")

        self.config = config or VectorStoreConfig()

        # Ensure the persistence directory exists before opening the client
        persist_path = Path(self.config.persist_directory)
        persist_path.mkdir(parents=True, exist_ok=True)

        self._client = chromadb.PersistentClient(
            path=str(persist_path),
            settings=Settings(
                anonymized_telemetry=self.config.anonymized_telemetry,
            )
        )

        # Cosine distance, so search() can convert: similarity = 1 - distance
        self._collection = self._client.get_or_create_collection(
            name=self.config.collection_name,
            metadata={"hnsw:space": "cosine"}
        )

        logger.info(
            f"ChromaDB initialized: {self.config.collection_name} "
            f"({self._collection.count()} chunks)"
        )
|
    def add_chunks(
        self,
        chunks: List[Dict[str, Any]],
        embeddings: List[List[float]],
    ) -> List[str]:
        """Add chunks with embeddings."""
        if not chunks:
            return []

        if len(chunks) != len(embeddings):
            raise ValueError(
                f"Chunks ({len(chunks)}) and embeddings ({len(embeddings)}) "
                "must have same length"
            )

        ids = []
        documents = []
        metadatas = []

        for chunk in chunks:
            # Derive a stable ID when the chunk doesn't provide one; include
            # the sequence index so chunks that share a text prefix within
            # the same document don't collide
            chunk_id = chunk.get("chunk_id")
            if not chunk_id:
                content = (
                    f"{chunk.get('document_id', '')}-"
                    f"{chunk.get('sequence_index', 0)}-"
                    f"{chunk.get('text', '')[:100]}"
                )
                chunk_id = hashlib.md5(content.encode()).hexdigest()[:16]

            ids.append(chunk_id)
            documents.append(chunk.get("text", ""))

            # ChromaDB metadata values must be scalars
            metadata = {
                "document_id": chunk.get("document_id", ""),
                "source_path": chunk.get("source_path", ""),
                "chunk_type": chunk.get("chunk_type", "text"),
                "page": chunk.get("page", 0),
                "sequence_index": chunk.get("sequence_index", 0),
                "confidence": chunk.get("confidence", 1.0),
            }

            # Bounding boxes are nested, so serialize them to a JSON string
            if "bbox" in chunk and chunk["bbox"]:
                bbox = chunk["bbox"]
                if hasattr(bbox, "model_dump"):
                    metadata["bbox_json"] = json.dumps(bbox.model_dump())
                elif isinstance(bbox, dict):
                    metadata["bbox_json"] = json.dumps(bbox)

            metadatas.append(metadata)

        self._collection.add(
            ids=ids,
            embeddings=embeddings,
            documents=documents,
            metadatas=metadatas,
        )

        logger.debug(f"Added {len(ids)} chunks to vector store")
        return ids
|
    def search(
        self,
        query_embedding: List[float],
        top_k: int = 5,
        filters: Optional[Dict[str, Any]] = None,
    ) -> List[VectorSearchResult]:
        """Search for similar chunks."""
        where = self._build_where_clause(filters) if filters else None

        results = self._collection.query(
            query_embeddings=[query_embedding],
            n_results=top_k,
            where=where,
            include=["documents", "metadatas", "distances"],
        )

        search_results = []

        if results["ids"] and results["ids"][0]:
            for i, chunk_id in enumerate(results["ids"][0]):
                # The collection uses cosine distance: similarity = 1 - distance
                distance = results["distances"][0][i] if results["distances"] else 0.0
                similarity = 1.0 - distance

                # Drop results below the configured similarity threshold
                if similarity < self.config.similarity_threshold:
                    continue

                metadata = results["metadatas"][0][i] if results["metadatas"] else {}

                # Restore the bbox serialized by add_chunks, if present
                bbox = None
                if "bbox_json" in metadata:
                    try:
                        bbox = json.loads(metadata["bbox_json"])
                    except (json.JSONDecodeError, TypeError):
                        bbox = None

                result = VectorSearchResult(
                    chunk_id=chunk_id,
                    document_id=metadata.get("document_id", ""),
                    text=results["documents"][0][i] if results["documents"] else "",
                    metadata=metadata,
                    similarity=similarity,
                    page=metadata.get("page"),
                    bbox=bbox,
                    chunk_type=metadata.get("chunk_type"),
                )
                search_results.append(result)

        return search_results
|
    def _build_where_clause(self, filters: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Build a ChromaDB where clause from filters, or None if no filters apply."""
        conditions = []

        for key, value in filters.items():
            if key == "document_id":
                conditions.append({"document_id": {"$eq": value}})
            elif key == "chunk_type":
                if isinstance(value, list):
                    conditions.append({"chunk_type": {"$in": value}})
                else:
                    conditions.append({"chunk_type": {"$eq": value}})
            elif key == "page":
                if isinstance(value, dict):
                    # Range filter: {"min": ..., "max": ...}
                    if "min" in value:
                        conditions.append({"page": {"$gte": value["min"]}})
                    if "max" in value:
                        conditions.append({"page": {"$lte": value["max"]}})
                else:
                    conditions.append({"page": {"$eq": value}})
            elif key == "confidence_min":
                conditions.append({"confidence": {"$gte": value}})

        # ChromaDB requires $and only when combining multiple conditions
        if len(conditions) == 0:
            return None
        elif len(conditions) == 1:
            return conditions[0]
        else:
            return {"$and": conditions}
|
|
|
|
|
    def delete_document(self, document_id: str) -> int:
        """Delete all chunks for a document."""
        # Fetch only the IDs of this document's chunks (include=[] skips payloads)
        results = self._collection.get(
            where={"document_id": {"$eq": document_id}},
            include=[],
        )

        if not results["ids"]:
            return 0

        count = len(results["ids"])

        self._collection.delete(ids=results["ids"])

        logger.info(f"Deleted {count} chunks for document {document_id}")
        return count
|
    def get_chunk(self, chunk_id: str) -> Optional[Dict[str, Any]]:
        """Get a specific chunk by ID."""
        results = self._collection.get(
            ids=[chunk_id],
            include=["documents", "metadatas"],
        )

        if not results["ids"]:
            return None

        metadata = results["metadatas"][0] if results["metadatas"] else {}

        return {
            "chunk_id": chunk_id,
            "text": results["documents"][0] if results["documents"] else "",
            **metadata,
        }
|
    def count(self, document_id: Optional[str] = None) -> int:
        """Count chunks in store."""
        if document_id:
            results = self._collection.get(
                where={"document_id": {"$eq": document_id}},
                include=[],
            )
            return len(results["ids"]) if results["ids"] else 0
        return self._collection.count()
|
    def list_documents(self) -> List[str]:
        """List all unique document IDs in the store."""
        results = self._collection.get(include=["metadatas"])

        if not results["metadatas"]:
            return []

        doc_ids = set()
        for meta in results["metadatas"]:
            if meta and "document_id" in meta:
                doc_ids.add(meta["document_id"])

        return list(doc_ids)
|
|
# Module-level singleton, managed by get_vector_store()/reset_vector_store()
_vector_store: Optional[VectorStore] = None
|
|
def get_vector_store(
    config: Optional[VectorStoreConfig] = None,
    store_type: str = "chromadb",
) -> VectorStore:
    """
    Get or create the singleton vector store.

    Args:
        config: Store configuration (used only when the singleton is first created)
        store_type: Type of store ("chromadb")

    Returns:
        VectorStore instance
    """
    global _vector_store

    if _vector_store is None:
        if store_type == "chromadb":
            _vector_store = ChromaVectorStore(config)
        else:
            raise ValueError(f"Unknown store type: {store_type}")

    return _vector_store
|
|
def reset_vector_store():
    """Reset the global vector store instance."""
    global _vector_store
    _vector_store = None
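

# --- Usage sketch -------------------------------------------------------------
# Hedged example, not part of the module API: a minimal end-to-end flow
# (add -> search -> count -> delete) assuming chromadb is installed. The toy
# 4-dimensional embeddings and the demo persist directory are illustrative
# stand-ins for real model output and a real storage location.
if __name__ == "__main__":
    store = get_vector_store(
        VectorStoreConfig(persist_directory="./data/vectorstore_demo")
    )

    chunks = [
        {"chunk_id": "c1", "document_id": "doc-1", "text": "Alpha section", "page": 1},
        {"chunk_id": "c2", "document_id": "doc-1", "text": "Beta section", "page": 2},
    ]
    embeddings = [[1.0, 0.0, 0.0, 0.0], [0.0, 1.0, 0.0, 0.0]]
    store.add_chunks(chunks, embeddings)

    # The query vector matches c1 exactly (cosine similarity 1.0); c2 is
    # orthogonal and falls below the default 0.7 similarity threshold.
    for hit in store.search([1.0, 0.0, 0.0, 0.0], top_k=2):
        print(hit.chunk_id, round(hit.similarity, 3))

    print("doc-1 chunks:", store.count("doc-1"))
    store.delete_document("doc-1")
    reset_vector_store()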
|
|
|