"""
Vector Storage Module

Handles storing chunks and embeddings in Qdrant vector database.
"""

import shutil
from pathlib import Path
from typing import List, Union

import numpy as np
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct


class VectorStorage:
    """Handles vector storage operations with Qdrant.

    Every document is stored in its own on-disk Qdrant database located at
    ``<base_db_path>/<doc_id>_collection.db`` and holding a single collection
    named ``<doc_id>_collection``.
    """

    def __init__(self, base_db_path: Union[str, Path]):
        """
        Initialize the vector storage.

        Args:
            base_db_path: Base path for storing Qdrant databases. Strings are
                accepted and converted to ``Path``.
        """
        # Coerce so that the ``/`` joins below also work when a str is passed.
        self.base_db_path = Path(base_db_path)

    @staticmethod
    def _collection_name(doc_id: str) -> str:
        """Return the collection name used for ``doc_id``."""
        return f"{doc_id}_collection"

    def _db_path(self, doc_id: str) -> Path:
        """Return the on-disk database path used for ``doc_id``."""
        return self.base_db_path / f"{self._collection_name(doc_id)}.db"

    async def store_in_qdrant(self, chunks: List[str], embeddings: np.ndarray, doc_id: str):
        """
        Store chunks and embeddings in Qdrant.

        Any existing collection for ``doc_id`` is replaced.

        Args:
            chunks: List of text chunks
            embeddings: Corresponding embeddings array, one row per chunk
            doc_id: Document identifier

        Raises:
            ValueError: If ``embeddings`` is not 2-D or its row count does not
                match ``len(chunks)``.
        """
        embeddings = np.asarray(embeddings)

        # Validate before opening the client: the embedding dimension is read
        # from shape[1], which only exists for a 2-D array.
        if embeddings.ndim != 2:
            raise ValueError(
                f"Embeddings must be a 2-D array (n_chunks, dim), got shape {embeddings.shape}"
            )
        if len(chunks) != embeddings.shape[0]:
            raise ValueError(f"Chunk count ({len(chunks)}) doesn't match embedding count ({embeddings.shape[0]})")

        collection_name = self._collection_name(doc_id)
        db_path = self._db_path(doc_id)

        client = QdrantClient(path=str(db_path))
        print(f"💾 Storing {len(chunks)} vectors in collection: {collection_name}")

        try:
            # Create or recreate collection
            await self._setup_collection(client, collection_name, embeddings.shape[1])

            # Prepare and upload points
            await self._upload_points(client, collection_name, chunks, embeddings, doc_id)

            print("✅ Successfully stored all vectors in Qdrant")
        finally:
            # Always release the client, even when setup or upload fails.
            client.close()

    async def _setup_collection(self, client: QdrantClient, collection_name: str, embedding_dim: int):
        """
        Set up Qdrant collection, recreating if it exists.

        Args:
            client: Qdrant client
            collection_name: Name of the collection
            embedding_dim: Dimension of embeddings
        """
        # Only delete (and only report a deletion) when the collection is
        # really there, instead of swallowing every error from a blind delete.
        existing_names = {c.name for c in client.get_collections().collections}
        if collection_name in existing_names:
            client.delete_collection(collection_name)
            print(f"🗑️ Deleted existing collection: {collection_name}")

        # Create new collection
        client.create_collection(
            collection_name=collection_name,
            vectors_config=VectorParams(
                size=embedding_dim,
                distance=Distance.COSINE,
            ),
        )
        print(f"✅ Created new collection: {collection_name}")

    async def _upload_points(
        self,
        client: QdrantClient,
        collection_name: str,
        chunks: List[str],
        embeddings: np.ndarray,
        doc_id: str,
        batch_size: int = 100,
    ):
        """
        Upload points to Qdrant collection in batches.

        Point ids and the ``chunk_id`` payload field are the chunk's index in
        ``chunks``.

        Args:
            client: Qdrant client
            collection_name: Name of the collection
            chunks: Text chunks
            embeddings: Embedding vectors, indexed in step with ``chunks``
            doc_id: Document identifier
            batch_size: Maximum number of points sent per upsert call

        Raises:
            ValueError: If ``batch_size`` is not positive.
        """
        if batch_size <= 0:
            raise ValueError("batch_size must be a positive integer")

        total_points = len(chunks)
        total_batches = (total_points + batch_size - 1) // batch_size

        # Build each batch right before sending it so that only one batch of
        # point objects is held in memory at a time.
        for batch_num, start in enumerate(range(0, total_points, batch_size), start=1):
            batch = [
                PointStruct(
                    id=idx,
                    vector=embeddings[idx].tolist(),
                    payload={
                        "text": text,
                        "chunk_id": idx,
                        "doc_id": doc_id,
                        "char_count": len(text),
                        "word_count": len(text.split()),
                    },
                )
                for idx, text in enumerate(chunks[start:start + batch_size], start=start)
            ]

            print(f" Uploading batch {batch_num}/{total_batches} ({len(batch)} points)")
            client.upsert(collection_name=collection_name, points=batch)

        print(f"✅ Uploaded {total_points} points in {total_batches} batches")

    def collection_exists(self, doc_id: str) -> bool:
        """
        Check if a collection exists for the given document ID.

        Only the presence of the database path on disk is checked; the
        database itself is not opened.

        Args:
            doc_id: Document identifier

        Returns:
            bool: True if collection exists, False otherwise
        """
        return self._db_path(doc_id).exists()

    def get_collection_info(self, doc_id: str) -> dict:
        """
        Get information about a collection.

        Args:
            doc_id: Document identifier

        Returns:
            dict: Collection information. Always contains ``collection_name``,
            ``exists`` and ``path``. When the database can be read it also
            contains ``vectors_count``, ``points_count`` and ``status``;
            when reading fails it contains ``error`` instead.
        """
        collection_name = self._collection_name(doc_id)
        db_path = self._db_path(doc_id)

        info = {
            "collection_name": collection_name,
            "exists": db_path.exists(),
            "path": str(db_path),
        }
        if not info["exists"]:
            return info

        try:
            client = QdrantClient(path=str(db_path))
            try:
                collection_info = client.get_collection(collection_name)
            finally:
                client.close()
        except Exception as e:
            # Reported to the caller rather than raised: this is a status query.
            info["error"] = str(e)
            return info

        # NOTE(review): depending on the qdrant-client version, `vectors_count`
        # may be None or missing — confirm against the installed version.
        # Fall back to `points_count` so the key stays meaningful.
        points_count = getattr(collection_info, "points_count", None)
        vectors_count = getattr(collection_info, "vectors_count", None)
        if vectors_count is None:
            vectors_count = points_count

        info["vectors_count"] = vectors_count
        info["points_count"] = points_count
        info["status"] = collection_info.status
        return info

    def delete_collection(self, doc_id: str) -> bool:
        """
        Delete a collection and its database file.

        Args:
            doc_id: Document identifier

        Returns:
            bool: True if successfully deleted (or there was nothing to
            delete), False otherwise
        """
        collection_name = self._collection_name(doc_id)
        db_path = self._db_path(doc_id)

        try:
            if not db_path.exists():
                return True  # Nothing to delete

            # Try to delete the collection properly first. This is best-effort:
            # the files are removed below even if it fails.
            try:
                client = QdrantClient(path=str(db_path))
                try:
                    client.delete_collection(collection_name)
                finally:
                    # Close even when the delete fails, so the client is not
                    # left open while the files are removed.
                    client.close()
            except Exception as e:
                print(f"⚠️ Could not cleanly drop collection {collection_name}: {e}")

            # Remove the database from disk; errors are surfaced, not ignored,
            # so the return value reflects what actually happened.
            if db_path.is_dir():
                shutil.rmtree(db_path)
            else:
                db_path.unlink()
        except OSError as e:
            print(f"❌ Error deleting collection {collection_name}: {e}")
            return False

        print(f"🗑️ Deleted collection: {collection_name}")
        return True