#!/usr/bin/env python3 import argparse import logging from pathlib import Path from private_gpt.di import global_injector from private_gpt.server.ingest.ingest_service import IngestService from private_gpt.server.ingest.ingest_watcher import IngestWatcher logger = logging.getLogger(__name__) class LocalIngestWorker: def __init__(self, ingest_service: IngestService) -> None: self.ingest_service = ingest_service self.total_documents = 0 self.current_document_count = 0 self._files_under_root_folder: list[Path] = [] def _find_all_files_in_folder(self, root_path: Path, ignored: list[str]) -> None: """Search all files under the root folder recursively. Count them at the same time """ for file_path in root_path.iterdir(): if file_path.is_file() and file_path.name not in ignored: self.total_documents += 1 self._files_under_root_folder.append(file_path) elif file_path.is_dir() and file_path.name not in ignored: self._find_all_files_in_folder(file_path, ignored) def ingest_folder(self, folder_path: Path, ignored: list[str]) -> None: # Count total documents before ingestion self._find_all_files_in_folder(folder_path, ignored) self._ingest_all(self._files_under_root_folder) def _ingest_all(self, files_to_ingest: list[Path]) -> None: logger.info("Ingesting files=%s", [f.name for f in files_to_ingest]) self.ingest_service.bulk_ingest([(str(p.name), p) for p in files_to_ingest]) def ingest_on_watch(self, changed_path: Path) -> None: logger.info("Detected change in at path=%s, ingesting", changed_path) self._do_ingest_one(changed_path) def _do_ingest_one(self, changed_path: Path) -> None: try: if changed_path.exists(): logger.info(f"Started ingesting file={changed_path}") self.ingest_service.ingest_file(changed_path.name, changed_path) logger.info(f"Completed ingesting file={changed_path}") except Exception: logger.exception( f"Failed to ingest document: {changed_path}, find the exception attached" ) parser = argparse.ArgumentParser(prog="ingest_folder.py") parser.add_argument("folder", help="Folder to ingest") parser.add_argument( "--watch", help="Watch for changes", action=argparse.BooleanOptionalAction, default=False, ) parser.add_argument( "--ignored", nargs="*", help="List of files/directories to ignore", default=[], ) parser.add_argument( "--log-file", help="Optional path to a log file. If provided, logs will be written to this file.", type=str, default=None, ) args = parser.parse_args() # Set up logging to a file if a path is provided if args.log_file: file_handler = logging.FileHandler(args.log_file, mode="a") file_handler.setFormatter( logging.Formatter( "[%(asctime)s.%(msecs)03d] [%(levelname)s] %(message)s", datefmt="%Y-%m-%d %H:%M:%S", ) ) logger.addHandler(file_handler) if __name__ == "__main__": root_path = Path(args.folder) if not root_path.exists(): raise ValueError(f"Path {args.folder} does not exist") ingest_service = global_injector.get(IngestService) worker = LocalIngestWorker(ingest_service) worker.ingest_folder(root_path, args.ignored) if args.ignored: logger.info(f"Skipping following files and directories: {args.ignored}") if args.watch: logger.info(f"Watching {args.folder} for changes, press Ctrl+C to stop...") directories_to_watch = [ dir for dir in root_path.iterdir() if dir.is_dir() and dir.name not in args.ignored ] watcher = IngestWatcher(args.folder, worker.ingest_on_watch) watcher.start()