"""Semantic-chunking RAG pipeline backed by a Qdrant vector store.

The module exposes two lazily-initialised factories:

* ``SemanticStoreFactory.get_semantic_store()`` builds (or, in on-disk mode,
  re-opens) a Qdrant collection holding semantic chunks of the PDF found at
  ``META_10K_FILE_PATH``.
* ``SemanticRAGChainFactory.get_semantic_rag_chain()`` wires that store into a
  multi-query retriever and a prompt + LLM chain.

Both results are cached on their class, so the work is done once per process.
"""

import logging
import shutil
from operator import itemgetter
from pathlib import Path
from typing import Optional

# Kept ahead of the third-party imports to preserve the original
# initialisation order of the root logger.
logging.basicConfig(level=logging.INFO)
_logger = logging.getLogger("chunking")

from langchain.retrievers.multi_query import MultiQueryRetriever
from langchain.schema.output_parser import StrOutputParser
from langchain.schema.runnable import RunnablePassthrough
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import PyMuPDFLoader
from langchain_community.vectorstores import Qdrant
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables.base import RunnableSequence
from langchain_core.vectorstores import VectorStore
from langchain_experimental.text_splitter import SemanticChunker
from qdrant_client import QdrantClient
from qdrant_client.http.models import Distance, VectorParams

from globals import (
    embeddings,
    gpt35_model,
    gpt4_model,
    META_10K_FILE_PATH,
    META_SEMANTIC_COLLECTION,
    VECTOR_STORE_PATH,
)

# True  -> throw-away in-memory Qdrant instance.
# False -> collection persisted under VECTOR_STORE_PATH.
USE_MEMORY = True

# Module-level client. In on-disk mode this is the one client that owns the
# storage folder, so every on-disk operation below goes through it.
qclient: QdrantClient = (
    QdrantClient(":memory:") if USE_MEMORY else QdrantClient(path=VECTOR_STORE_PATH)
)

# Prompt text is a runtime string: its value (including the leading space,
# the single newline and the trailing space) is kept exactly as before.
RAG_PROMPT = (
    " Reply the user's query thoughtfully and clearly."
    " You should only respond to user's query if the context is related to the query."
    " If you are not sure how to answer, please reply \"I don't know\"."
    " Respond with structure in markdown. \n"
    "CONTEXT: {context} QUERY: {question} YOUR REPLY: "
)

rag_prompt = ChatPromptTemplate.from_template(RAG_PROMPT)


class SemanticStoreFactory:
    """Build, load and cache the semantic-chunk Qdrant vector store."""

    # Process-wide cache, filled by the first get_semantic_store() call.
    _semantic_vectorstore: Optional[VectorStore] = None

    @classmethod
    def __load_semantic_store(cls) -> VectorStore:
        """Return the persisted collection, creating it when it is missing.

        The decision is based on whether ``META_SEMANTIC_COLLECTION`` is
        registered on the shared client. A "storage directory is non-empty"
        test is not reliable here because opening the module-level on-disk
        client already writes files into that directory.
        """
        known_collections = {
            description.name
            for description in qclient.get_collections().collections
        }
        if META_SEMANTIC_COLLECTION in known_collections:
            _logger.info("\tQdrant loading ...")
            return Qdrant(
                client=qclient,
                embeddings=embeddings,
                collection_name=META_SEMANTIC_COLLECTION,
            )

        _logger.info("\tQdrant creating ...")
        return cls.__create_semantic_store()

    @classmethod
    def _populate_shared_client(cls, chunks: list) -> VectorStore:
        """(Re)create the collection on ``qclient`` and index ``chunks`` in it.

        Used for on-disk mode instead of ``Qdrant.from_documents(path=...)``,
        which would open a second local client on a folder that ``qclient``
        already holds; qdrant-client's local mode allows only one client per
        storage folder.
        """
        # One probe embedding is enough to learn the vector dimension.
        # Assumes `embeddings` follows the LangChain Embeddings interface.
        vector_size = len(embeddings.embed_query("dimension probe"))
        # Recreating mirrors the original force_recreate=True semantics.
        qclient.recreate_collection(
            collection_name=META_SEMANTIC_COLLECTION,
            vectors_config=VectorParams(size=vector_size, distance=Distance.COSINE),
        )
        store = Qdrant(
            client=qclient,
            embeddings=embeddings,
            collection_name=META_SEMANTIC_COLLECTION,
        )
        store.add_documents(chunks)
        return store

    @classmethod
    def __create_semantic_store(cls) -> VectorStore:
        """Chunk the PDF at ``META_10K_FILE_PATH`` and index it in Qdrant.

        In memory mode the store is built with ``Qdrant.from_documents`` on a
        fresh ``:memory:`` location; in on-disk mode it is built on the shared
        ``qclient``. Either way an existing collection of the same name is
        replaced.
        """
        if USE_MEMORY:
            _logger.info("creating semantic vector store: %s", USE_MEMORY)
        else:
            _logger.info("creating semantic vector store: %s", VECTOR_STORE_PATH)
            path = Path(VECTOR_STORE_PATH)
            if not path.exists():
                path.mkdir(parents=True, exist_ok=True)
                _logger.info("Directory '%s' created.", path)

        _logger.info("loading %s", META_10K_FILE_PATH)
        documents = PyMuPDFLoader(META_10K_FILE_PATH).load()

        semantic_chunker = SemanticChunker(
            embeddings=embeddings,
            breakpoint_threshold_type="percentile",
        )
        semantic_chunks = semantic_chunker.create_documents(
            [document.page_content for document in documents]
        )
        _logger.info("created semantic_chunks: %d", len(semantic_chunks))

        if USE_MEMORY:
            _logger.info("\t==> creating memory vectorstore ...")
            semantic_chunk_vectorstore = Qdrant.from_documents(
                semantic_chunks,
                embeddings,
                location=":memory:",
                collection_name=META_SEMANTIC_COLLECTION,
                force_recreate=True,
            )
            _logger.info("\t==> finished constructing vectorstore")
        else:
            semantic_chunk_vectorstore = cls._populate_shared_client(semantic_chunks)

        _logger.info("\t==> return vectorstore")
        return semantic_chunk_vectorstore

    @classmethod
    def get_semantic_store(cls) -> VectorStore:
        """Return the cached vector store, building or loading it on first use.

        Memory mode always builds the store. On-disk mode first tries to load
        it and falls back to a rebuild if loading raises.
        """
        _logger.info("get_semantic_store")
        if cls._semantic_vectorstore is None:
            if USE_MEMORY:
                cls._semantic_vectorstore = cls.__create_semantic_store()
                _logger.info("received semantic_vectorstore")
            else:
                _logger.info(
                    "Loading semantic vectorstore %s from: %s",
                    META_SEMANTIC_COLLECTION,
                    VECTOR_STORE_PATH,
                )
                try:
                    # First try to load the store.
                    cls._semantic_vectorstore = cls.__load_semantic_store()
                except Exception as e:
                    # Deliberate best-effort: any load failure triggers a rebuild.
                    _logger.warning("cannot load: %s", e)
                    cls._semantic_vectorstore = cls.__create_semantic_store()

        _logger.info("RETURNING get_semantic_store")
        return cls._semantic_vectorstore


class SemanticRAGChainFactory:
    """Build and cache the retrieval-augmented generation chain."""

    # Process-wide cache, filled by the first get_semantic_rag_chain() call.
    _chain: Optional[RunnableSequence] = None

    @classmethod
    def get_semantic_rag_chain(cls) -> Optional[RunnableSequence]:
        """Return the cached RAG chain, constructing it on first use.

        Invoke the chain with ``{"question": "<text>"}``. The result is a dict
        with ``"response"`` (the LLM output) and ``"context"`` (the retrieved
        documents). ``None`` is returned only if no vector store is available.
        """
        if cls._chain is not None:
            return cls._chain

        _logger.info("creating SemanticRAGChainFactory")
        semantic_store = SemanticStoreFactory.get_semantic_store()
        _logger.info("\treceived semantic_store")
        if semantic_store is None:
            return cls._chain

        semantic_chunk_retriever = semantic_store.as_retriever()
        semantic_mquery_retriever = MultiQueryRetriever.from_llm(
            retriever=semantic_chunk_retriever,
            llm=gpt4_model,
        )

        cls._chain = (
            # Step 1: "context" is the result of piping the incoming
            # "question" into the multi-query retriever; "question" is
            # forwarded unchanged.
            {
                "context": itemgetter("question") | semantic_mquery_retriever,
                "question": itemgetter("question"),
            }
            # Step 2: re-assigns "context" to itself, so the dict from step 1
            # passes through with the same keys.
            | RunnablePassthrough.assign(context=itemgetter("context"))
            # Step 3: "response" formats the prompt from "context" and
            # "question" and sends it to the LLM; "context" is carried to the
            # output alongside it.
            | {
                "response": rag_prompt | gpt4_model,
                "context": itemgetter("context"),
            }
        )
        _logger.info("\t_chain constructed")
        return cls._chain