aelitta's picture
Upload folder using huggingface_hub
4bdb245 verified
#!/usr/bin/env python3
import argparse
import logging
from pathlib import Path
from private_gpt.di import global_injector
from private_gpt.server.ingest.ingest_service import IngestService
from private_gpt.server.ingest.ingest_watcher import IngestWatcher
logger = logging.getLogger(__name__)
class LocalIngestWorker:
    """Feed files from the local filesystem into an ingest service.

    Two entry points are offered: ``ingest_folder`` scans a folder tree and
    hands every file found to the service's ``bulk_ingest`` in one call, and
    ``ingest_on_watch`` hands a single changed path to ``ingest_file``.
    """

    def __init__(self, ingest_service: "IngestService") -> None:
        """Store the service and start with an empty scan result."""
        self.ingest_service = ingest_service

        # Number of files found by the most recent folder scan.
        self.total_documents = 0
        # Initialised here but never updated inside this class.
        self.current_document_count = 0

        # Files found by the most recent folder scan.
        self._files_under_root_folder: list[Path] = []

    def _find_all_files_in_folder(self, root_path: Path, ignored: list[str]) -> None:
        """Search all files under the root folder recursively.

        Count them at the same time. The result of any previous scan is
        discarded first, so scanning twice with the same worker does not
        accumulate duplicates or inflate ``total_documents``.

        Args:
            root_path: Folder to scan.
            ignored: File or directory names (not full paths) to skip. An
                ignored directory is not descended into.
        """
        self.total_documents = 0
        # Rebind rather than clear() so a list handed out earlier is not mutated.
        self._files_under_root_folder = []
        # Build the lookup set once instead of scanning the list per entry.
        self._collect_files(root_path, frozenset(ignored))

    def _collect_files(self, folder: Path, ignored_names: frozenset[str]) -> None:
        """Append every non-ignored file under ``folder`` to the scan result."""
        for entry in folder.iterdir():
            if entry.name in ignored_names:
                continue
            if entry.is_file():
                self.total_documents += 1
                self._files_under_root_folder.append(entry)
            elif entry.is_dir():
                self._collect_files(entry, ignored_names)

    def ingest_folder(self, folder_path: Path, ignored: list[str]) -> None:
        """Scan ``folder_path`` and bulk-ingest every file that is not ignored.

        Args:
            folder_path: Root folder to scan recursively.
            ignored: File or directory names to skip during the scan.
        """
        # Count total documents before ingestion
        self._find_all_files_in_folder(folder_path, ignored)
        self._ingest_all(self._files_under_root_folder)

    def _ingest_all(self, files_to_ingest: list[Path]) -> None:
        """Pass ``(file name, path)`` pairs for all files to ``bulk_ingest``."""
        logger.info("Ingesting files=%s", [f.name for f in files_to_ingest])
        self.ingest_service.bulk_ingest([(p.name, p) for p in files_to_ingest])

    def ingest_on_watch(self, changed_path: Path) -> None:
        """Callback for a change notification: ingest the changed path."""
        logger.info("Detected change in at path=%s, ingesting", changed_path)
        self._do_ingest_one(changed_path)

    def _do_ingest_one(self, changed_path: Path) -> None:
        """Ingest a single path if it exists.

        A path that does not exist is skipped silently. Any exception raised
        while checking or ingesting is logged with its traceback and not
        propagated, so one failing file does not stop the caller.
        """
        try:
            if changed_path.exists():
                logger.info("Started ingesting file=%s", changed_path)
                self.ingest_service.ingest_file(changed_path.name, changed_path)
                logger.info("Completed ingesting file=%s", changed_path)
        except Exception:
            logger.exception(
                "Failed to ingest document: %s, find the exception attached",
                changed_path,
            )
# Command-line interface: one positional folder plus three optional flags.
parser = argparse.ArgumentParser(prog="ingest_folder.py")

# Positional: the folder whose contents get ingested.
parser.add_argument("folder", help="Folder to ingest")

# --watch / --no-watch toggle; off unless requested.
parser.add_argument(
    "--watch",
    action=argparse.BooleanOptionalAction,
    default=False,
    help="Watch for changes",
)

# Zero or more names to leave out of the scan.
parser.add_argument(
    "--ignored",
    nargs="*",
    default=[],
    help="List of files/directories to ignore",
)

# Optional destination for a log file.
parser.add_argument(
    "--log-file",
    type=str,
    default=None,
    help="Optional path to a log file. If provided, logs will be written to this file.",
)

# Parsed at module level; the statements below read `args`.
args = parser.parse_args()
# When --log-file is given, also append this module's log records to that file.
if args.log_file:
    log_formatter = logging.Formatter(
        "[%(asctime)s.%(msecs)03d] [%(levelname)s] %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )
    # mode="a" appends, so an existing log file is kept.
    file_handler = logging.FileHandler(args.log_file, mode="a")
    file_handler.setFormatter(log_formatter)
    logger.addHandler(file_handler)
if __name__ == "__main__":
    # Script entry point: validate the target folder, bulk-ingest everything
    # under it, then optionally keep watching it for changes.
    root_path = Path(args.folder)
    if not root_path.exists():
        raise ValueError(f"Path {args.folder} does not exist")
    # Fail early with a clear message: the folder scan calls iterdir(), which
    # would otherwise raise NotADirectoryError when given a regular file.
    if not root_path.is_dir():
        raise ValueError(f"Path {args.folder} is not a directory")

    ingest_service = global_injector.get(IngestService)
    worker = LocalIngestWorker(ingest_service)

    # Announce the ignore list before the scan that applies it, so the log
    # reads in the order things happen.
    if args.ignored:
        logger.info("Skipping following files and directories: %s", args.ignored)
    worker.ingest_folder(root_path, args.ignored)

    if args.watch:
        logger.info("Watching %s for changes, press Ctrl+C to stop...", args.folder)
        # NOTE(review): only the folder and the callback are passed here; the
        # ignore list is not, so whether ignored entries are skipped in watch
        # mode depends on IngestWatcher — confirm.
        watcher = IngestWatcher(args.folder, worker.ingest_on_watch)
        watcher.start()