# src/ingestion_orchestrator/orchestrator.py
"""End-to-end ingestion pipeline: load, process, and store documents."""

import logging

from config.settings import DOCS_FOLDER
from src.data_loader.loader import load_documents
from src.document_processor.processor import process_documents
from src.embedding_generator.embedder import EmbeddingGenerator
from src.vector_store_manager.chroma_manager import ChromaManager

logger = logging.getLogger(__name__)


class IngestionOrchestrator:
    """Orchestrates the end-to-end data ingestion pipeline.

    Owns an ``EmbeddingGenerator`` and a ``ChromaManager`` built on top of
    it, and drives the load -> process -> store sequence.
    """

    def __init__(self) -> None:
        """Build the embedding generator and the vector store manager.

        Raises:
            Exception: Any error raised while constructing either component
                is logged at CRITICAL level and re-raised unchanged.
        """
        try:
            self.embedding_generator = EmbeddingGenerator()
            self.vector_store_manager = ChromaManager(self.embedding_generator)
        except Exception as exc:
            logger.critical(
                "Failed to initialize ingestion orchestrator components: %s",
                exc,
            )
            # Bare ``raise`` re-raises the active exception as-is.
            raise
        logger.info("Initialized ingestion orchestrator components.")

    def run_ingestion_pipeline(self, docs_folder: str = DOCS_FOLDER) -> None:
        """Run the complete ingestion pipeline for one folder.

        Loads the raw documents, processes them into chunks, and hands the
        chunks to the vector store manager. The run stops early, with a
        warning, when either the load step or the processing step yields
        nothing.

        Args:
            docs_folder: The folder containing the source documents.

        Returns:
            None, on every path (early exit or success).

        Raises:
            Exception: Errors from the load, processing, or storage steps
                are not caught here and propagate to the caller.
        """
        logger.info("Starting ingestion pipeline from folder: %s", docs_folder)

        # 1. Load documents
        # --- Financial Ministry Adaptation ---
        # Implement logic to identify *new* or *modified* documents
        # instead of reloading everything each time for efficiency.
        # Handle potential large number of files efficiently.
        # ------------------------------------
        raw_documents = load_documents(docs_folder)
        if not raw_documents:
            logger.warning("No documents loaded. Ingestion pipeline finished.")
            return

        # 2. Process documents (split and extract metadata)
        processed_chunks = process_documents(raw_documents)
        if not processed_chunks:
            logger.warning(
                "No processed chunks generated. Ingestion pipeline finished."
            )
            return

        # 3. Add documents to the vector store
        # The add_documents method handles embedding internally
        # --- Financial Ministry Adaptation ---
        # Implement logic for updating or deleting documents if the source
        # data changed. This requires comparing current source data with
        # what's in ChromaDB (e.g., by source path and modification date or
        # version).
        # Use the vector_store_manager's update_documents and
        # delete_documents methods.
        # Implement batching for adding documents to avoid overwhelming
        # ChromaDB or the backend.
        # ------------------------------------
        self.vector_store_manager.add_documents(processed_chunks)
        logger.info("Ingestion pipeline finished successfully.")

    # --- Financial Ministry Adaptation ---
    # TODO: Add methods for handling updates and deletions specifically.
    # NOTE: the stub signatures below reference ``List``; import it from
    # ``typing`` (or use ``list[str]``) when enabling them.
    # def update_changed_documents(self, changed_files: List[str]): pass
    # def delete_removed_documents(self, removed_files: List[str]): pass
    # ------------------------------------