Spaces:
Runtime error
Runtime error
#!/usr/bin/env python3 | |
import os | |
import glob | |
from typing import List | |
from multiprocessing import Pool | |
from tqdm import tqdm | |
from langchain.document_loaders import ( | |
CSVLoader, | |
EverNoteLoader, | |
PyMuPDFLoader, | |
TextLoader, | |
UnstructuredEPubLoader, | |
UnstructuredHTMLLoader, | |
UnstructuredMarkdownLoader, | |
UnstructuredODTLoader, | |
UnstructuredPowerPointLoader, | |
UnstructuredWordDocumentLoader, | |
) | |
from langchain.text_splitter import RecursiveCharacterTextSplitter | |
from langchain.vectorstores import Chroma | |
from langchain.embeddings.openai import OpenAIEmbeddings | |
from langchain.docstore.document import Document | |
from config import ( | |
CHROMA_SETTINGS, | |
DOCUMENTS_PATH, | |
PERSIST_DIRECTORY, | |
CHUNK_SIZE, | |
CHUNK_OVERLAP, | |
) | |
# Map file extensions to document loaders and their arguments | |
LOADER_MAPPING = { | |
".csv": (CSVLoader, {}), | |
".doc": (UnstructuredWordDocumentLoader, {}), | |
".docx": (UnstructuredWordDocumentLoader, {}), | |
".enex": (EverNoteLoader, {}), | |
".epub": (UnstructuredEPubLoader, {}), | |
".html": (UnstructuredHTMLLoader, {}), | |
".md": (UnstructuredMarkdownLoader, {}), | |
".odt": (UnstructuredODTLoader, {}), | |
".pdf": (PyMuPDFLoader, {}), | |
".ppt": (UnstructuredPowerPointLoader, {}), | |
".pptx": (UnstructuredPowerPointLoader, {}), | |
".txt": (TextLoader, {"encoding": "utf8"}), | |
} | |
def load_single_document(file_path: str) -> List[Document]: | |
print(file_path) | |
ext = "." + file_path.rsplit(".", 1)[-1] | |
if ext in LOADER_MAPPING: | |
loader_class, loader_args = LOADER_MAPPING[ext] | |
loader = loader_class(file_path, **loader_args) | |
return loader.load() | |
raise ValueError(f"Unsupported file extension '{ext}'") | |
def load_documents(source_dir: str, ignored_files: List[str] = []) -> List[Document]: | |
""" | |
Loads all documents from the source documents directory, ignoring specified files | |
""" | |
all_files = [] | |
for ext in LOADER_MAPPING: | |
all_files.extend( | |
glob.glob(os.path.join(source_dir, f"**/*{ext}"), recursive=True) | |
) | |
filtered_files = [ | |
file_path for file_path in all_files if file_path not in ignored_files | |
] | |
with Pool(processes=os.cpu_count()) as pool: | |
results = [] | |
with tqdm( | |
total=len(filtered_files), desc="Loading new documents", ncols=80 | |
) as pbar: | |
for i, docs in enumerate( | |
pool.imap_unordered(load_single_document, filtered_files) | |
): | |
results.extend(docs) | |
pbar.update() | |
return results | |
def process_documents(ignored_files: List[str] = []) -> List[Document]: | |
""" | |
Load documents and split in chunks | |
""" | |
print(f"Loading documents from {DOCUMENTS_PATH}") | |
documents = load_documents(DOCUMENTS_PATH, ignored_files) | |
if not documents: | |
print("No new documents to load") | |
return [] | |
print(f"Loaded {len(documents)} new documents from {DOCUMENTS_PATH}") | |
text_splitter = RecursiveCharacterTextSplitter( | |
chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP | |
) | |
texts = text_splitter.split_documents(documents) | |
print(f"Split into {len(texts)} chunks of text (max. {CHUNK_SIZE} tokens each)") | |
return texts | |
def does_vectorstore_exist(persist_directory: str) -> bool: | |
""" | |
Checks if vectorstore exists | |
""" | |
if os.path.exists(os.path.join(persist_directory, "index")): | |
if os.path.exists( | |
os.path.join(persist_directory, "chroma-collections.parquet") | |
) and os.path.exists( | |
os.path.join(persist_directory, "chroma-embeddings.parquet") | |
): | |
list_index_files = glob.glob(os.path.join(persist_directory, "index/*.bin")) | |
list_index_files += glob.glob( | |
os.path.join(persist_directory, "index/*.pkl") | |
) | |
# At least 3 documents are needed in a working vectorstore | |
if len(list_index_files) > 3: | |
return True | |
return False | |
def create_vectorstore(): | |
# Create embeddings | |
embeddings = OpenAIEmbeddings() | |
if does_vectorstore_exist(PERSIST_DIRECTORY): | |
# Update and store locally vectorstore | |
print(f"Appending to existing vectorstore at {PERSIST_DIRECTORY}") | |
db = Chroma( | |
persist_directory=PERSIST_DIRECTORY, | |
embedding_function=embeddings, | |
client_settings=CHROMA_SETTINGS, | |
) | |
collection = db.get() | |
texts = process_documents( | |
[metadata["source"] for metadata in collection["metadatas"]] | |
) | |
if not texts: | |
return | |
print(f"Creating embeddings. May take some minutes...") | |
db.add_documents(texts) | |
else: | |
# Create and store locally vectorstore | |
print("Creating new vectorstore") | |
texts = process_documents() | |
if not texts: | |
return | |
print(f"Creating embeddings. May take some minutes...") | |
db = Chroma.from_documents( | |
texts, | |
embeddings, | |
persist_directory=PERSIST_DIRECTORY, | |
client_settings=CHROMA_SETTINGS, | |
) | |
db.persist() | |
db = None | |
print(f"Ingestion complete!") | |