import logging
import traceback
from typing import Dict, List, Optional

import chromadb
from chromadb.config import Settings as ChromaSettings

from app.core.config import settings

logger = logging.getLogger(__name__)


class VectorStore:
    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super(VectorStore, cls).__new__(cls)
            cls._instance._initialized = False
        return cls._instance

    def __init__(self):
        if not self._initialized:
            self.client = chromadb.PersistentClient(
                path=settings.CHROMA_PERSIST_DIRECTORY,
                settings=ChromaSettings(
                    anonymized_telemetry=False
                )
            )
            self.collection_name = "pdf_documents"
            self.collection = self._get_or_create_collection()
            self._initialized = True

    def _get_or_create_collection(self):
        """Get the existing collection or create a new one."""
        try:
            collection = self.client.get_collection(name=self.collection_name)
            logger.info(f"Using existing collection: {self.collection_name}")
        except Exception:
            collection = self.client.create_collection(
                name=self.collection_name,
                metadata={"description": "PDF document embeddings for Q&A chatbot"}
            )
            logger.info(f"Created new collection: {self.collection_name}")
        return collection

    def add_document(self, document_id: str, content: str, metadata: Optional[Dict] = None) -> bool:
        """Add document content to the vector store."""
        try:
            logger.info(f"Starting to add document {document_id} to vector store")
            logger.info(f"Content length: {len(content)} characters")

            # Split content into chunks for better retrieval
            chunks = self._split_text(content, chunk_size=1000, overlap=200)
            logger.info(f"Split content into {len(chunks)} chunks")

            # Prepare data for ChromaDB
            ids = [f"{document_id}_chunk_{i}" for i in range(len(chunks))]
            documents = chunks
            metadatas = [{
                "document_id": document_id,
                "chunk_index": i,
                **(metadata or {})
            } for i in range(len(chunks))]

            logger.info(f"Prepared {len(ids)} chunks with IDs: {ids[:3]}...")  # Log first 3 IDs

            # Add to collection
            logger.info(f"Adding chunks to ChromaDB collection: {self.collection_name}")
            self.collection.add(
                ids=ids,
                documents=documents,
                metadatas=metadatas
            )

            logger.info(f"Successfully added document {document_id} with {len(chunks)} chunks to vector store")
            return True
        except Exception as e:
            logger.error(f"Error adding document {document_id} to vector store: {e}")
            logger.error(f"Exception type: {type(e).__name__}")
            logger.error(f"Full traceback: {traceback.format_exc()}")
            return False

    def search_similar(self, query: str, n_results: int = 5, document_id: Optional[str] = None) -> List[Dict]:
        """Search for similar chunks, optionally filtering by document_id."""
        try:
            results = self.collection.query(
                query_texts=[query],
                n_results=n_results,
                include=["documents", "metadatas", "distances"]
            )

            # Format results
            formatted_results = []
            if results['documents'] and results['documents'][0]:
                for i, (doc, metadata, distance) in enumerate(zip(
                    results['documents'][0],
                    results['metadatas'][0],
                    results['distances'][0]
                )):
                    if document_id is not None and str(metadata.get('document_id')) != str(document_id):
                        continue
                    formatted_results.append({
                        'content': doc,
                        'metadata': metadata,
                        'similarity_score': 1 - distance,  # Convert distance to similarity
                        'rank': i + 1
                    })
            return formatted_results
        except Exception as e:
            logger.error(f"Error searching vector store: {e}")
            return []

    def delete_document(self, document_id: str) -> bool:
        """Delete all chunks for a specific document."""
        try:
            # Get all chunks for this document
            results = self.collection.get(
                where={"document_id": document_id}
            )
            if results['ids']:
                self.collection.delete(ids=results['ids'])
                logger.info(f"Deleted {len(results['ids'])} chunks for document {document_id}")
            return True
        except Exception as e:
            logger.error(f"Error deleting document {document_id} from vector store: {e}")
            return False

    def get_collection_stats(self) -> Dict:
        """Get statistics about the vector store collection."""
        try:
            logger.info(f"Getting stats for collection: {self.collection_name}")
            count = self.collection.count()
            logger.info(f"Collection count: {count}")
            return {
                "total_documents": count,
                "collection_name": self.collection_name
            }
        except Exception as e:
            logger.error(f"Error getting collection stats: {e}")
            logger.error(f"Exception type: {type(e).__name__}")
            logger.error(f"Full traceback: {traceback.format_exc()}")
            return {"total_documents": 0, "collection_name": self.collection_name}

    def _split_text(self, text: str, chunk_size: int = 1000, overlap: int = 200) -> List[str]:
        """Split text into overlapping chunks."""
        if len(text) <= chunk_size:
            return [text]

        chunks = []
        start = 0
        while start < len(text):
            end = start + chunk_size

            # If this isn't the last chunk, try to break at a sentence boundary
            if end < len(text):
                # Look for sentence endings
                for i in range(end, max(start + chunk_size - 100, start), -1):
                    if text[i] in '.!?':
                        end = i + 1
                        break

            chunk = text[start:end].strip()
            if chunk:
                chunks.append(chunk)

            # Move start position with overlap
            start = end - overlap
            if start >= len(text):
                break

        return chunks

    def clear_all(self) -> bool:
        """Clear all documents from the vector store."""
        try:
            self.client.delete_collection(name=self.collection_name)
            self.collection = self._get_or_create_collection()
            logger.info("Cleared all documents from vector store")
            return True
        except Exception as e:
            logger.error(f"Error clearing vector store: {e}")
            return False

    @classmethod
    def reset_instance(cls):
        """Reset the singleton instance - useful after clearing collections."""
        cls._instance = None
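

# Minimal usage sketch, illustrative only and not part of the module's API surface.
# It assumes settings.CHROMA_PERSIST_DIRECTORY points at a writable directory and that
# ChromaDB's default embedding function is acceptable; "doc-1" and "example.pdf" are
# placeholder names. Running this will create/modify the persistent collection on disk.
if __name__ == "__main__":
    store = VectorStore()  # Singleton: repeated constructions return the same instance
    store.add_document(
        "doc-1",
        "Example PDF text. " * 200,
        metadata={"filename": "example.pdf"},
    )
    hits = store.search_similar("example", n_results=3, document_id="doc-1")
    for hit in hits:
        print(hit["rank"], round(hit["similarity_score"], 3), hit["metadata"]["chunk_index"])
    store.delete_document("doc-1")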