import chromadb
from chromadb.config import Settings as ChromaSettings
from typing import List, Dict, Optional
import logging
from app.core.config import settings

logger = logging.getLogger(__name__)


class VectorStore:
    """Singleton wrapper around a persistent ChromaDB collection of PDF document chunks."""

    _instance = None
    
    def __new__(cls):
        if cls._instance is None:
            cls._instance = super(VectorStore, cls).__new__(cls)
            cls._instance._initialized = False
        return cls._instance
    
    def __init__(self):
        if not self._initialized:
            self.client = chromadb.PersistentClient(
                path=settings.CHROMA_PERSIST_DIRECTORY,
                settings=ChromaSettings(
                    anonymized_telemetry=False
                )
            )
            self.collection_name = "pdf_documents"
            self.collection = self._get_or_create_collection()
            self._initialized = True
    
    def _get_or_create_collection(self):
        """Get existing collection or create new one"""
        try:
            collection = self.client.get_collection(name=self.collection_name)
            logger.info(f"Using existing collection: {self.collection_name}")
        except Exception:
            collection = self.client.create_collection(
                name=self.collection_name,
                metadata={"description": "PDF document embeddings for Q&A chatbot"}
            )
            logger.info(f"Created new collection: {self.collection_name}")
        
        return collection
    
    def add_document(self, document_id: str, content: str, metadata: Optional[Dict] = None) -> bool:
        """Add document content to vector store"""
        try:
            logger.info(f"Starting to add document {document_id} to vector store")
            logger.info(f"Content length: {len(content)} characters")
            
            # Split content into chunks for better retrieval
            chunks = self._split_text(content, chunk_size=1000, overlap=200)
            logger.info(f"Split content into {len(chunks)} chunks")
            
            # Prepare data for ChromaDB
            ids = [f"{document_id}_chunk_{i}" for i in range(len(chunks))]
            documents = chunks
            metadatas = [{
                "document_id": document_id,
                "chunk_index": i,
                **(metadata or {})
            } for i in range(len(chunks))]
            
            logger.info(f"Prepared {len(ids)} chunks with IDs: {ids[:3]}...")  # Log first 3 IDs
            
            # Add to collection
            logger.info(f"Adding chunks to ChromaDB collection: {self.collection_name}")
            self.collection.add(
                ids=ids,
                documents=documents,
                metadatas=metadatas
            )
            
            logger.info(f"Successfully added document {document_id} with {len(chunks)} chunks to vector store")
            return True
            
        except Exception as e:
            logger.error(f"Error adding document {document_id} to vector store: {e}")
            logger.error(f"Exception type: {type(e).__name__}")
            import traceback
            logger.error(f"Full traceback: {traceback.format_exc()}")
            return False
    
    def search_similar(self, query: str, n_results: int = 5, document_id: Optional[str] = None) -> List[Dict]:
        """Search for similar documents based on query, optionally filtering by document_id"""
        try:
            results = self.collection.query(
                query_texts=[query],
                n_results=n_results,
                include=["documents", "metadatas", "distances"]
            )
            
            # Format results
            formatted_results = []
            if results['documents'] and results['documents'][0]:
                for i, (doc, metadata, distance) in enumerate(zip(
                    results['documents'][0],
                    results['metadatas'][0],
                    results['distances'][0]
                )):
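                    # Note: the document_id filter is applied after retrieval, so fewer
                    # than n_results items may be returned when it is provided.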
                    if document_id is not None and str(metadata.get('document_id')) != str(document_id):
                        continue
                    formatted_results.append({
                        'content': doc,
                        'metadata': metadata,
                        'similarity_score': 1 - distance,  # Convert distance to similarity (assumes distances normalized to [0, 1])
                        'rank': i + 1
                    })
            return formatted_results
        except Exception as e:
            logger.error(f"Error searching vector store: {e}")
            return []
    
    def delete_document(self, document_id: str) -> bool:
        """Delete all chunks for a specific document"""
        try:
            # Get all chunks for this document
            results = self.collection.get(
                where={"document_id": document_id}
            )
            
            if results['ids']:
                self.collection.delete(ids=results['ids'])
                logger.info(f"Deleted {len(results['ids'])} chunks for document {document_id}")
            
            return True
            
        except Exception as e:
            logger.error(f"Error deleting document {document_id} from vector store: {e}")
            return False
    
    def get_collection_stats(self) -> Dict:
        """Get statistics about the vector store collection"""
        try:
            logger.info(f"Getting stats for collection: {self.collection_name}")
            count = self.collection.count()
            logger.info(f"Collection count: {count}")
            return {
                "total_documents": count,
                "collection_name": self.collection_name
            }
        except Exception as e:
            logger.error(f"Error getting collection stats: {e}")
            logger.error(f"Exception type: {type(e).__name__}")
            import traceback
            logger.error(f"Full traceback: {traceback.format_exc()}")
            return {"total_documents": 0, "collection_name": self.collection_name}
    
    def _split_text(self, text: str, chunk_size: int = 1000, overlap: int = 200) -> List[str]:
        """Split text into overlapping chunks"""
        if len(text) <= chunk_size:
            return [text]
        
        chunks = []
        start = 0
        
        while start < len(text):
            end = start + chunk_size
            
            # If this isn't the last chunk, try to break at a sentence boundary
            if end < len(text):
                # Look for sentence endings
                for i in range(end, max(start + chunk_size - 100, start), -1):
                    if text[i] in '.!?':
                        end = i + 1
                        break
            
            chunk = text[start:end].strip()
            if chunk:
                chunks.append(chunk)
            
            # Move start position with overlap
            start = end - overlap
            if start >= len(text):
                break
        
        return chunks
    
    def clear_all(self) -> bool:
        """Clear all documents from the vector store"""
        try:
            self.client.delete_collection(name=self.collection_name)
            self.collection = self._get_or_create_collection()
            logger.info("Cleared all documents from vector store")
            return True
        except Exception as e:
            logger.error(f"Error clearing vector store: {e}")
            return False
    
    @classmethod
    def reset_instance(cls):
        """Reset the singleton instance - useful after clearing collections"""
        cls._instance = None
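

# --- Usage sketch (illustrative, not part of the original module) ---
# A minimal example of how this singleton might be exercised; the document id,
# text, and metadata below are hypothetical placeholders.
if __name__ == "__main__":
    store = VectorStore()

    # Index a document; chunking happens inside add_document.
    store.add_document(
        document_id="doc-1",
        content="Example text extracted from a PDF. " * 60,
        metadata={"filename": "example.pdf"},
    )

    # Query the collection, optionally scoped to a single document.
    for hit in store.search_similar("example query", n_results=3, document_id="doc-1"):
        print(f"{hit['similarity_score']:.3f}  {hit['content'][:80]}")

    print(store.get_collection_stats())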