File size: 8,710 Bytes
10b392a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
# src/vector_store_manager/chroma_manager.py
import logging
from typing import Any, Dict, List, Optional

from langchain_chroma import Chroma # cite: embed_pipeline.py, query_pipeline.py
from langchain.schema import Document # cite: embed_pipeline.py

from config.settings import PERSIST_DIR, CHROMADB_COLLECTION_NAME # cite: embed_pipeline.py, query_pipeline.py
from src.embedding_generator.embedder import EmbeddingGenerator

logger = logging.getLogger(__name__)

class ChromaManager:
    """
    Manages interactions with the ChromaDB vector store.

    Wraps a Langchain ``Chroma`` collection configured with the embedding
    function supplied by an ``EmbeddingGenerator`` and provides
    add/update/delete/get helpers plus a retriever factory.
    """

    def __init__(self, embedding_generator: EmbeddingGenerator):
        """
        Initializes the persistent Chroma collection.

        Args:
            embedding_generator: Provides the Langchain embedder instance
                (``.embedder``) used by Chroma for documents and queries.

        Raises:
            Exception: Re-raised if the ChromaDB client fails to initialize.
        """
        self.embedding_generator = embedding_generator
        # --- Financial Ministry Adaptation ---
        # TODO: Configure Chroma client to use a scalable backend (e.g., ClickHouse)
        # instead of or in addition to persistent_directory for production.
        # This might involve using chromadb.HttpClient or specific backend configurations.
        # Handle connection errors and retries to the database backend.
        # Implement authentication/authorization for ChromaDB access.
        # ------------------------------------
        try:
            # For production, persist_directory may be replaced with client
            # settings pointing at a remote backend.
            self.vectordb = Chroma(
                persist_directory=PERSIST_DIR, # cite: embed_pipeline.py, query_pipeline.py
                collection_name=CHROMADB_COLLECTION_NAME, # cite: embed_pipeline.py, query_pipeline.py
                embedding_function=self.embedding_generator.embedder,  # Langchain embedder instance
            )
            # Lazy %-args: message is only built if the record is emitted.
            logger.info(
                "Initialized ChromaDB collection: '%s' at '%s'",
                CHROMADB_COLLECTION_NAME, PERSIST_DIR,
            )
            # TODO: check that the collection exists and is healthy.
        except Exception as e:
            logger.critical("Failed to initialize ChromaDB: %s", e)
            raise  # bare raise preserves the original traceback

    def add_documents(self, chunks: List[Document]) -> None:
        """
        Adds document chunks to the ChromaDB collection.

        Args:
            chunks: A list of Langchain Document chunks with metadata.

        Raises:
            Exception: Re-raised if the underlying add fails, so callers are
                not misled into believing the write succeeded.
        """
        # --- Financial Ministry Adaptation ---
        # Implement retry logic for batch additions; consider transactional
        # behavior for large batches. Manage document IDs consistently
        # (e.g., source + chunk index, or a stable content hash) and pass
        # them via `ids=` for later update/delete control.
        # ------------------------------------
        if not chunks:
            logger.warning("No chunks to add to ChromaDB.")
            return

        try:
            # Langchain embeds internally using the embedding_function given
            # at construction; Chroma generates IDs when none are supplied.
            self.vectordb.add_documents(chunks)
            logger.info("Added %d chunks to ChromaDB.", len(chunks))
        except Exception as e:
            # Previously this error was swallowed after logging, hiding
            # failed writes from callers. Re-raise for consistency with the
            # other mutating methods.
            logger.error("Failed to add documents to ChromaDB: %s", e)
            raise

    def update_documents(self, ids: List[str], documents: List[str],
                         metadatas: List[Dict[str, Any]]) -> None:
        """
        Updates documents in the ChromaDB collection by ID.

        Args:
            ids: List of document IDs to update.
            documents: List of new document content corresponding to IDs.
            metadatas: List of new metadata dictionaries corresponding to IDs.

        Raises:
            Exception: Re-raised if the update fails.
        """
        # --- Financial Ministry Adaptation ---
        # Implement retry logic and validate that IDs exist before updating.
        # ------------------------------------
        try:
            # The Langchain wrapper exposes no update(); use the underlying
            # chromadb collection directly.
            self.vectordb._collection.update(
                ids=ids,
                documents=documents,
                metadatas=metadatas,
            )
            logger.info("Updated documents with IDs: %s", ids)
        except Exception as e:
            logger.error("Failed to update documents with IDs %s: %s", ids, e)
            raise

    def delete_documents(self, ids: Optional[List[str]] = None,
                         where: Optional[Dict[str, Any]] = None) -> None:
        """
        Deletes documents from the ChromaDB collection by ID or metadata filter.

        Args:
            ids: List of document IDs to delete. Takes precedence over `where`.
            where: A dictionary for metadata filtering
                (e.g., ``{"source": "old_file.txt"}``).

        Raises:
            Exception: Re-raised if the delete fails.
        """
        # --- Financial Ministry Adaptation ---
        # Implement retry logic; log which documents were deleted and why.
        # ------------------------------------
        try:
            if ids:
                # Underlying chromadb collection: the wrapper has no delete-by-id.
                self.vectordb._collection.delete(ids=ids)
                logger.info("Deleted documents with IDs: %s", ids)
            elif where:
                self.vectordb._collection.delete(where=where)
                logger.info("Deleted documents matching metadata filter: %s", where)
            else:
                logger.warning("Delete called without specifying ids or where filter.")
        except Exception as e:
            logger.error("Failed to delete documents (ids: %s, where: %s): %s",
                         ids, where, e)
            raise

    def get_documents(self, ids: Optional[List[str]] = None,
                      where: Optional[Dict[str, Any]] = None,
                      where_document: Optional[Dict[str, Any]] = None,
                      limit: Optional[int] = None,
                      offset: Optional[int] = None,
                      include: Optional[List[str]] = None) -> Dict[str, List[Any]]:
        """
        Retrieves documents and their details from the ChromaDB collection.

        Args:
            ids: List of document IDs to retrieve.
            where: Metadata filter.
            where_document: Document content filter.
            limit: Maximum number of results.
            offset: Offset for pagination.
            include: Fields to include (e.g., ``['metadatas', 'documents']``).
                IDs are always included. Defaults to metadatas + documents.

        Returns:
            A dictionary containing the retrieved data (ids, documents,
            metadatas, etc.), as returned by the chromadb ``get`` API.

        Raises:
            Exception: Re-raised if retrieval fails.
        """
        # --- Financial Ministry Adaptation ---
        # Implement retry logic; handle sensitive metadata appropriately.
        # ------------------------------------
        try:
            if include is None:
                include = ['metadatas', 'documents']  # Chroma's documented default

            results = self.vectordb._collection.get(  # underlying chromadb collection
                ids=ids,
                where=where,
                where_document=where_document,
                limit=limit,
                offset=offset,
                include=include,
            )
            logger.debug("Retrieved %d documents from ChromaDB.",
                         len(results.get('ids', [])))
            return results
        except Exception as e:
            logger.error("Failed to retrieve documents from ChromaDB: %s", e)
            raise

    def as_retriever(self, search_kwargs: Optional[Dict[str, Any]] = None):
        """
        Returns a Langchain Retriever instance for the Chroma collection.

        Args:
            search_kwargs: Arguments for the retriever (e.g., ``{"k": 5}``).
                Defaults to an empty dict.

        Returns:
            A Langchain Retriever bound to this collection; it automatically
            uses the embedding function supplied at Chroma initialization.
        """
        if search_kwargs is None:
            search_kwargs = {}
        return self.vectordb.as_retriever(search_kwargs=search_kwargs) # cite: query_pipeline.py