#!/usr/bin/env python3
"""Ingest local documents into a persistent Chroma vectorstore.

Files under ``DOCUMENTS_PATH`` whose extension appears in ``LOADER_MAPPING``
are loaded, split into chunks, embedded with ``OpenAIEmbeddings`` and stored
in (or appended to) the Chroma store at ``PERSIST_DIRECTORY``.
"""
import glob
import os
from multiprocessing import Pool
from typing import List, Optional

from tqdm import tqdm

from langchain.document_loaders import (
    CSVLoader,
    EverNoteLoader,
    PyMuPDFLoader,
    TextLoader,
    UnstructuredEPubLoader,
    UnstructuredHTMLLoader,
    UnstructuredMarkdownLoader,
    UnstructuredODTLoader,
    UnstructuredPowerPointLoader,
    UnstructuredWordDocumentLoader,
)
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import Chroma
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.docstore.document import Document

from config import (
    CHROMA_SETTINGS,
    DOCUMENTS_PATH,
    PERSIST_DIRECTORY,
    CHUNK_SIZE,
    CHUNK_OVERLAP,
)


# Map file extensions to document loaders and their arguments.
# Keys are lowercase and include the leading dot.
LOADER_MAPPING = {
    ".csv": (CSVLoader, {}),
    ".doc": (UnstructuredWordDocumentLoader, {}),
    ".docx": (UnstructuredWordDocumentLoader, {}),
    ".enex": (EverNoteLoader, {}),
    ".epub": (UnstructuredEPubLoader, {}),
    ".html": (UnstructuredHTMLLoader, {}),
    ".md": (UnstructuredMarkdownLoader, {}),
    ".odt": (UnstructuredODTLoader, {}),
    ".pdf": (PyMuPDFLoader, {}),
    ".ppt": (UnstructuredPowerPointLoader, {}),
    ".pptx": (UnstructuredPowerPointLoader, {}),
    ".txt": (TextLoader, {"encoding": "utf8"}),
}


def load_single_document(file_path: str) -> List[Document]:
    """Load one file with the loader registered for its extension.

    Args:
        file_path: Path of the file to load.

    Returns:
        The documents produced by the loader's ``load()`` call.

    Raises:
        ValueError: If the file's extension has no entry in
            ``LOADER_MAPPING``.
    """
    print(file_path)
    # splitext only inspects the final path component, so dots in directory
    # names are ignored and extension-less files yield "". Lowercasing lets
    # "REPORT.PDF" resolve to the ".pdf" loader.
    ext = os.path.splitext(file_path)[1].lower()
    if ext in LOADER_MAPPING:
        loader_class, loader_args = LOADER_MAPPING[ext]
        loader = loader_class(file_path, **loader_args)
        return loader.load()

    raise ValueError(f"Unsupported file extension '{ext}'")


def load_documents(
    source_dir: str, ignored_files: Optional[List[str]] = None
) -> List[Document]:
    """Load every supported document found under ``source_dir``.

    The directory is searched recursively for each extension in
    ``LOADER_MAPPING``; files are loaded in parallel worker processes.

    Args:
        source_dir: Root directory to search.
        ignored_files: Paths to skip (compared as exact strings against the
            paths returned by ``glob``). ``None`` means skip nothing.

    Returns:
        All loaded documents, in no particular order (results are collected
        as workers finish).
    """
    # A set makes the per-file membership test O(1).
    ignored = set(ignored_files) if ignored_files else set()

    all_files = []
    for ext in LOADER_MAPPING:
        all_files.extend(
            glob.glob(os.path.join(source_dir, f"**/*{ext}"), recursive=True)
        )
    filtered_files = [
        file_path for file_path in all_files if file_path not in ignored
    ]

    # Nothing to do: avoid spawning a pool of worker processes for no work.
    if not filtered_files:
        return []

    results = []
    with Pool(processes=os.cpu_count()) as pool:
        with tqdm(
            total=len(filtered_files), desc="Loading new documents", ncols=80
        ) as pbar:
            for docs in pool.imap_unordered(load_single_document, filtered_files):
                results.extend(docs)
                pbar.update()

    return results


def process_documents(ignored_files: Optional[List[str]] = None) -> List[Document]:
    """Load documents from ``DOCUMENTS_PATH`` and split them into chunks.

    Args:
        ignored_files: Paths to skip while loading. ``None`` means skip
            nothing.

    Returns:
        The chunked documents, or an empty list when nothing was loaded.
    """
    print(f"Loading documents from {DOCUMENTS_PATH}")
    documents = load_documents(DOCUMENTS_PATH, ignored_files)
    if not documents:
        print("No new documents to load")
        return []

    print(f"Loaded {len(documents)} new documents from {DOCUMENTS_PATH}")
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP
    )
    texts = text_splitter.split_documents(documents)
    # NOTE(review): the message says "tokens", but no token-based length
    # function is passed to the splitter here -- confirm the unit.
    print(f"Split into {len(texts)} chunks of text (max. {CHUNK_SIZE} tokens each)")
    return texts


def does_vectorstore_exist(persist_directory: str) -> bool:
    """Check whether ``persist_directory`` looks like a usable vectorstore.

    The store is considered present when the ``index`` sub-directory and both
    ``chroma-collections.parquet`` and ``chroma-embeddings.parquet`` exist,
    and the index directory holds more than three ``.bin``/``.pkl`` files.

    Args:
        persist_directory: Directory the vectorstore is persisted in.

    Returns:
        ``True`` if all of the above hold, ``False`` otherwise.
    """
    index_dir = os.path.join(persist_directory, "index")
    if not os.path.exists(index_dir):
        return False

    parquet_files = ("chroma-collections.parquet", "chroma-embeddings.parquet")
    if not all(
        os.path.exists(os.path.join(persist_directory, name))
        for name in parquet_files
    ):
        return False

    list_index_files = glob.glob(os.path.join(persist_directory, "index/*.bin"))
    list_index_files += glob.glob(os.path.join(persist_directory, "index/*.pkl"))
    # More than 3 index files are required for the store to count as working
    # (presumably one .bin plus several .pkl files -- verify against the
    # Chroma version in use).
    return len(list_index_files) > 3


def create_vectorstore() -> None:
    """Create the vectorstore, or append new documents to an existing one.

    When a store already exists at ``PERSIST_DIRECTORY``, the ``source``
    metadata of its stored entries is used as the ignore list so that only
    new files are loaded. Returns early, without persisting, when there are
    no new chunks to embed.
    """
    # Create embeddings
    embeddings = OpenAIEmbeddings()

    if does_vectorstore_exist(PERSIST_DIRECTORY):
        # Update and store locally vectorstore
        print(f"Appending to existing vectorstore at {PERSIST_DIRECTORY}")
        db = Chroma(
            persist_directory=PERSIST_DIRECTORY,
            embedding_function=embeddings,
            client_settings=CHROMA_SETTINGS,
        )
        collection = db.get()
        texts = process_documents(
            [metadata["source"] for metadata in collection["metadatas"]]
        )
        if not texts:
            return
        print("Creating embeddings. May take some minutes...")
        db.add_documents(texts)
    else:
        # Create and store locally vectorstore
        print("Creating new vectorstore")
        texts = process_documents()
        if not texts:
            return
        print("Creating embeddings. May take some minutes...")
        db = Chroma.from_documents(
            texts,
            embeddings,
            persist_directory=PERSIST_DIRECTORY,
            client_settings=CHROMA_SETTINGS,
        )

    db.persist()
    # Drop the local reference to the store once it has been persisted.
    db = None
    print("Ingestion complete!")