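"""ChromaDB-backed vector store for PDF document chunks used by the Q&A chatbot."""
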
import logging
import traceback
from typing import Dict, List, Optional

import chromadb
from chromadb.config import Settings as ChromaSettings

from app.core.config import settings

logger = logging.getLogger(__name__)


class VectorStore:
    """Singleton wrapper around a persistent ChromaDB collection of PDF chunks."""

    _instance = None

    def __new__(cls):
        # Lazily create a single shared instance so every caller reuses the
        # same ChromaDB client and collection.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._initialized = False
        return cls._instance

    def __init__(self):
        if not self._initialized:
            self.client = chromadb.PersistentClient(
                path=settings.CHROMA_PERSIST_DIRECTORY,
                settings=ChromaSettings(
                    anonymized_telemetry=False
                )
            )
            self.collection_name = "pdf_documents"
            self.collection = self._get_or_create_collection()
            self._initialized = True

    def _get_or_create_collection(self):
        """Get the existing collection or create a new one."""
        try:
            collection = self.client.get_collection(name=self.collection_name)
            logger.info(f"Using existing collection: {self.collection_name}")
        except Exception:
            collection = self.client.create_collection(
                name=self.collection_name,
                metadata={"description": "PDF document embeddings for Q&A chatbot"}
            )
            logger.info(f"Created new collection: {self.collection_name}")
        return collection

    def add_document(self, document_id: str, content: str, metadata: Optional[Dict] = None) -> bool:
        """Add document content to the vector store as overlapping chunks."""
        try:
            logger.info(f"Starting to add document {document_id} to vector store")
            logger.info(f"Content length: {len(content)} characters")

            # Split content into chunks for better retrieval
            chunks = self._split_text(content, chunk_size=1000, overlap=200)
            logger.info(f"Split content into {len(chunks)} chunks")

            # Prepare data for ChromaDB
            ids = [f"{document_id}_chunk_{i}" for i in range(len(chunks))]
            documents = chunks
            metadatas = [{
                "document_id": document_id,
                "chunk_index": i,
                **(metadata or {})
            } for i in range(len(chunks))]
            logger.info(f"Prepared {len(ids)} chunks with IDs: {ids[:3]}...")  # Log first 3 IDs

            # Add to collection
            logger.info(f"Adding chunks to ChromaDB collection: {self.collection_name}")
            self.collection.add(
                ids=ids,
                documents=documents,
                metadatas=metadatas
            )

            logger.info(f"Successfully added document {document_id} with {len(chunks)} chunks to vector store")
            return True
        except Exception as e:
            logger.error(f"Error adding document {document_id} to vector store: {e}")
            logger.error(f"Exception type: {type(e).__name__}")
            logger.error(f"Full traceback: {traceback.format_exc()}")
            return False

    def search_similar(self, query: str, n_results: int = 5, document_id: Optional[str] = None) -> List[Dict]:
        """Search for chunks similar to the query, optionally filtered by document_id."""
        try:
            results = self.collection.query(
                query_texts=[query],
                n_results=n_results,
                include=["documents", "metadatas", "distances"]
            )

            # Format results
            formatted_results = []
            if results['documents'] and results['documents'][0]:
                for i, (doc, metadata, distance) in enumerate(zip(
                    results['documents'][0],
                    results['metadatas'][0],
                    results['distances'][0]
                )):
                    # Filter by document_id after retrieval, so fewer than
                    # n_results matches may be returned for a single document.
                    if document_id is not None and str(metadata.get('document_id')) != str(document_id):
                        continue
                    formatted_results.append({
                        'content': doc,
                        'metadata': metadata,
                        'similarity_score': 1 - distance,  # Convert distance to similarity
                        'rank': i + 1
                    })
            return formatted_results
        except Exception as e:
            logger.error(f"Error searching vector store: {e}")
            return []

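    # NOTE: search_similar filters by document_id in Python after the ANN query.
    # If per-document queries become common, passing a metadata filter such as
    # where={"document_id": document_id} to collection.query would restrict the
    # search itself (assuming the installed chromadb version supports metadata
    # filters on query, as current releases do).
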
    def delete_document(self, document_id: str) -> bool:
        """Delete all chunks for a specific document."""
        try:
            # Get all chunk IDs for this document
            results = self.collection.get(
                where={"document_id": document_id}
            )
            if results['ids']:
                self.collection.delete(ids=results['ids'])
                logger.info(f"Deleted {len(results['ids'])} chunks for document {document_id}")
            return True
        except Exception as e:
            logger.error(f"Error deleting document {document_id} from vector store: {e}")
            return False

    def get_collection_stats(self) -> Dict:
        """Get statistics about the vector store collection."""
        try:
            logger.info(f"Getting stats for collection: {self.collection_name}")
            count = self.collection.count()
            logger.info(f"Collection count: {count}")
            return {
                "total_documents": count,  # count() returns the number of stored chunks
                "collection_name": self.collection_name
            }
        except Exception as e:
            logger.error(f"Error getting collection stats: {e}")
            logger.error(f"Exception type: {type(e).__name__}")
            logger.error(f"Full traceback: {traceback.format_exc()}")
            return {"total_documents": 0, "collection_name": self.collection_name}

    def _split_text(self, text: str, chunk_size: int = 1000, overlap: int = 200) -> List[str]:
        """Split text into overlapping chunks, preferring sentence boundaries."""
        if len(text) <= chunk_size:
            return [text]

        chunks = []
        start = 0
        while start < len(text):
            end = start + chunk_size
            # If this isn't the last chunk, try to break at a sentence boundary
            # within the final 100 characters of the window.
            if end < len(text):
                for i in range(end, max(start + chunk_size - 100, start), -1):
                    if text[i] in '.!?':
                        end = i + 1
                        break
            chunk = text[start:end].strip()
            if chunk:
                chunks.append(chunk)
            # Stop once the window has reached the end of the text; otherwise a
            # short duplicate tail chunk would be emitted.
            if end >= len(text):
                break
            # Move the window forward, keeping `overlap` characters of context.
            start = end - overlap
        return chunks

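    # Example (illustrative, assuming no sentence boundary is found): for a
    # 2,500-character text with chunk_size=1000 and overlap=200, _split_text
    # produces windows [0, 1000), [800, 1800), and [1600, 2500).
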
    def clear_all(self) -> bool:
        """Clear all documents from the vector store."""
        try:
            self.client.delete_collection(name=self.collection_name)
            self.collection = self._get_or_create_collection()
            logger.info("Cleared all documents from vector store")
            return True
        except Exception as e:
            logger.error(f"Error clearing vector store: {e}")
            return False

    @classmethod
    def reset_instance(cls):
        """Reset the singleton instance - useful after clearing collections."""
        cls._instance = None
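

# Minimal usage sketch (an illustration, not part of the application): it
# assumes settings.CHROMA_PERSIST_DIRECTORY points to a writable directory and
# that app.core.config is importable. The document id, text, and metadata below
# are hypothetical.
if __name__ == "__main__":
    store = VectorStore()  # Singleton: repeated calls return the same instance
    store.add_document(
        document_id="demo-doc",
        content="ChromaDB stores embeddings. This sketch indexes a short text.",
        metadata={"filename": "demo.pdf"},
    )
    print(store.get_collection_stats())
    for hit in store.search_similar("What does ChromaDB store?", n_results=2):
        print(hit["similarity_score"], hit["content"][:60])
    store.delete_document("demo-doc")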