import os
import shutil

from langchain.document_loaders import PyPDFLoader
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import Chroma

from .config import get_sources
from .embeddings import EMBEDDING_MODEL_NAME
from .vectorstore import PERSIST_DIRECTORY, get_vectorstore

# Chunks shorter than this are usually headers, footers, or other noise.
MIN_CHUNK_SIZE = 100


def load_data():
    """Parse the configured PDF sources and index them into the Chroma store."""
    print("Loading data...")
    docs = parse_data()
    print("Documents loaded")

    embedding_function = HuggingFaceEmbeddings(model_name=EMBEDDING_MODEL_NAME)

    print("Building index...")
    vectorstore = get_vectorstore(embedding_function)
    assert isinstance(vectorstore, Chroma)
    # `from_documents` is a classmethod that builds a new store, so calling it on
    # the instance would discard its result; add the documents to the store
    # returned by `get_vectorstore` instead.
    vectorstore.add_documents(docs)
    print("Index built")
    return vectorstore


def parse_data():
    """Load each PDF source, split it into chunks, and attach source metadata."""
    docs = []
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
    for source in get_sources():
        file_path = source["file_path"]
        loader = PyPDFLoader(file_path)
        pages = loader.load_and_split()

        # Split the pages into chunks, dropping fragments too short to be
        # useful for retrieval.
        doc_chunks = text_splitter.split_documents(pages)
        for chunk in doc_chunks:
            if len(chunk.page_content) < MIN_CHUNK_SIZE:
                continue
            chunk.metadata["name"] = source["name"]
            chunk.metadata["domain"] = source["domain"]
            url = source.get("url")
            if url:
                chunk.metadata["url"] = url
            chunk.metadata["page_number"] = chunk.metadata["page"]
            chunk.metadata["short_name"] = chunk.metadata["name"]
            docs.append(chunk)
    return docs


def clear_index():
    """Delete everything inside the persistence directory, keeping the directory itself."""
    directory_path = PERSIST_DIRECTORY
    if not os.path.isdir(directory_path):
        return
    for filename in os.listdir(directory_path):
        file_path = os.path.join(directory_path, filename)
        try:
            print(f"Deleting {file_path}")
            if os.path.isfile(file_path) or os.path.islink(file_path):
                os.unlink(file_path)
            elif os.path.isdir(file_path):
                shutil.rmtree(file_path)
        except Exception as e:
            print(f"Failed to delete {file_path}. Reason: {e}")


if __name__ == "__main__":
    clear_index()
    db = load_data()

    # Smoke-test the rebuilt index with a sample query.
    query = (
        "He who can bear the misfortune of a nation is called the ruler of the world."
    )
    docs = db.similarity_search(query)
    print(docs)