Spaces:
Runtime error
Runtime error
| """ | |
| Module for ingesting data to be used by the RAG tool. | |
| """ | |
import glob
import os
from multiprocessing import Pool
from typing import List, Optional

from tqdm import tqdm
from langchain_community.document_loaders import (
    CSVLoader,
    PyMuPDFLoader,
    TextLoader,
    UnstructuredWordDocumentLoader,
    UnstructuredPowerPointLoader,
    UnstructuredMarkdownLoader,
    UnstructuredEPubLoader,
)
from langchain_community.vectorstores.chroma import Chroma
from langchain_openai.embeddings import OpenAIEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.documents import Document
import chromadb
from dotenv import (
    load_dotenv,
    find_dotenv,
)
from fastapi import APIRouter

from constants import CHROMA_SETTINGS
# Router object exposed by this module.
ingestion_router = APIRouter()

# Populate the process environment from a `.env` file; warn (but keep going)
# when none could be loaded.
if not load_dotenv(find_dotenv()):
    print(
        "Could not load `.env` file or it is empty. Please check that it exists "
        "and is readable by the current user"
    )

# Settings read from the environment (after the `.env` file has been applied).
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")

# Module-level embeddings client, constructed at import time.
# NOTE(review): presumably picks its API key up from the environment - confirm.
embeddings_model = OpenAIEmbeddings()

# Where the vector store is persisted and where source documents are read from.
persist_directory = os.getenv("PERSIST_DIRECTORY", "chroma_vectorstore")
source_directory = os.getenv("SOURCE_DIRECTORY", "data")

# Splitter settings used when documents are cut into chunks.
CHUNK_SIZE = 1000
CHUNK_OVERLAP = 200

# Lower-case file extension -> (loader class, keyword arguments for the loader).
LOADER_MAPPING = {
    ".csv": (CSVLoader, {}),
    ".doc": (UnstructuredWordDocumentLoader, {}),
    ".docx": (UnstructuredWordDocumentLoader, {}),
    ".epub": (UnstructuredEPubLoader, {}),
    ".md": (UnstructuredMarkdownLoader, {}),
    ".pdf": (PyMuPDFLoader, {}),
    ".ppt": (UnstructuredPowerPointLoader, {}),
    ".pptx": (UnstructuredPowerPointLoader, {}),
    ".txt": (TextLoader, {"encoding": "utf8"}),
    # ".json": (JSONLoader, {"jq_schema": ".", "text_content": False})
}
def load_single_document(file_path: str) -> List[Document]:
    """
    Load one file with the loader registered for its extension.

    Args:
        file_path: Path of the file to load.

    Returns:
        The documents produced by the matching loader's `load()` call.

    Raises:
        ValueError: If the file's extension has no entry in LOADER_MAPPING.
    """
    # splitext only looks at the final path component, so a dot inside a
    # directory name (e.g. "data.v1/README") is not mistaken for an extension.
    ext = os.path.splitext(file_path)[1].lower()
    print(file_path)
    if ext not in LOADER_MAPPING:
        raise ValueError(f"Unsupported file extension '{ext}'")
    loader_class, loader_args = LOADER_MAPPING[ext]
    loader = loader_class(file_path, **loader_args)
    return loader.load()
def load_documents(
    source_dir: str,
    ignored_files: Optional[List[str]] = None
) -> List[Document]:
    """
    Loads all documents from the source documents directory, ignoring specified files

    Args:
        source_dir: Directory searched recursively for files whose extension
            (all lower-case or all upper-case) is a key of LOADER_MAPPING.
        ignored_files: Paths to skip; compared against the paths returned by
            glob. Defaults to skipping nothing.

    Returns:
        The documents of every loaded file, in completion order (loading is
        done by a process pool, so the order is not deterministic).
    """
    # A set gives O(1) membership tests; the caller may pass one entry per
    # stored chunk, so the list can be long and full of repeats.
    ignored = set(ignored_files or ())

    candidates = []
    for ext in LOADER_MAPPING:
        for suffix in (ext.lower(), ext.upper()):
            candidates.extend(
                glob.glob(os.path.join(source_dir, f"**/*{suffix}"), recursive=True)
            )

    # Where glob matching is case-insensitive, both patterns hit the same
    # file; dict.fromkeys drops the repeats while keeping discovery order.
    filtered_files = [
        file_path for file_path in dict.fromkeys(candidates) if file_path not in ignored
    ]
    if not filtered_files:
        # Nothing to load: avoid spinning up worker processes for no work.
        return []

    results = []
    with Pool(processes=os.cpu_count()) as pool:
        with tqdm(total=len(filtered_files), desc='Loading new documents', ncols=80) as pbar:
            for docs in pool.imap_unordered(load_single_document, filtered_files):
                results.extend(docs)
                pbar.update()
    return results
def process_documents(ignored_files: Optional[List[str]] = None) -> Optional[List[Document]]:
    """
    Load documents and split in chunks

    Args:
        ignored_files: Paths that must not be loaded again. Defaults to
            loading everything found in the source directory.

    Returns:
        The chunked documents, or None when there was nothing to load.
    """
    print(f"Loading documents from {source_directory}")
    # Always hand a real list down so the loader never sees None.
    documents = load_documents(source_directory, list(ignored_files or []))
    if not documents:
        print("No new documents to load")
        return None
    print(f"Loaded {len(documents)} new documents from {source_directory}")

    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=CHUNK_SIZE,
        chunk_overlap=CHUNK_OVERLAP
    )
    texts = text_splitter.split_documents(documents)
    # No length function is passed to the splitter, so CHUNK_SIZE is measured
    # in characters rather than tokens (assuming the library default of len).
    print(f"Split into {len(texts)} chunks of text (max. {CHUNK_SIZE} characters each)")
    return texts
def does_vectorstore_exist(
    persist_dir: str,
    embeddings: OpenAIEmbeddings
) -> bool:
    """
    Checks if vectorstore exists

    Args:
        persist_dir: Directory the Chroma store is persisted in.
        embeddings: Embedding function handed to the Chroma wrapper.

    Returns:
        True when the store at `persist_dir` holds at least one document,
        False otherwise.
    """
    db = Chroma(
        persist_directory=persist_dir,
        embedding_function=embeddings,
        client_settings=CHROMA_SETTINGS,
    )
    # A single record is enough to prove the store is populated; fetching the
    # whole collection just to test for emptiness is wasteful on large stores.
    return bool(db.get(limit=1)['documents'])
def main():
    """
    Ingest the source documents into the persistent Chroma vector store.

    If the store already holds documents, only files whose path is not yet
    recorded as a `source` in the stored metadata are loaded and appended;
    otherwise a new store is created from everything that can be loaded.

    Returns:
        * the string "No new document to load" when nothing had to be ingested,
        * a dict with 'Status' and "responseCode" 200 on success,
        * a dict with "Status" and "responseCode" 201 when any exception occurs.

    NOTE(review): the mixed str/dict return and the 201 code on failure are
    kept as-is because callers may depend on them - confirm before changing.
    """
    try:
        # Create embeddings
        embeddings = OpenAIEmbeddings(api_key=OPENAI_API_KEY)

        # Chroma client
        chroma_client = chromadb.PersistentClient(
            settings=CHROMA_SETTINGS,
            path=persist_directory
        )

        if does_vectorstore_exist(persist_directory, embeddings):
            # Update and store locally vectorstore
            print(f"Appending to existing vectorstore at {persist_directory}")
            db = Chroma(
                persist_directory=persist_directory,
                embedding_function=embeddings,
                client_settings=CHROMA_SETTINGS,
                client=chroma_client
            )
            # Only the metadata is needed to find out which files are already
            # ingested, so the stored document text is not fetched.
            collection = db.get(include=["metadatas"])
            # One metadata entry exists per stored chunk: collapse them to the
            # distinct source paths and skip entries without a 'source' key.
            known_sources = list(dict.fromkeys(
                metadata['source']
                for metadata in collection['metadatas']
                if metadata and 'source' in metadata
            ))
            texts = process_documents(known_sources)
            if not texts:
                return "No new document to load"
            print("Creating embeddings. May take some minutes...")
            db.add_documents(texts)
        else:
            # Create and store locally vectorstore
            print("Creating new vectorstore")
            texts = process_documents()
            if not texts:
                return "No new document to load"
            print("Creating embeddings. May take some minutes...")
            db = Chroma.from_documents(
                texts,
                embeddings,
                persist_directory=persist_directory,
                client_settings=CHROMA_SETTINGS,
                client=chroma_client
            )

        db.persist()
        print("Ingestion complete!")
        return {
            'Status': 'Ingestion complete!',
            "responseCode": 200
        }
    # Top-level boundary: report the problem and answer with an error payload
    # instead of propagating the exception to the caller.
    except Exception as e:
        print(e)
        return {
            "Status": "An error occurred",
            "responseCode": 201
        }