import os

from dotenv import load_dotenv
from langchain.embeddings import CacheBackedEmbeddings
from langchain.storage import LocalFileStore
from langchain_community.document_loaders import PyMuPDFLoader
from langchain_community.vectorstores import FAISS
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough, RunnableParallel
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from datasets import Dataset
from ragas.testset.generator import TestsetGenerator
from ragas.testset.evolutions import simple, reasoning, multi_context
from ragas import evaluate
from ragas.metrics import (
    faithfulness,
    answer_relevancy,
    answer_correctness,
    context_recall,
    context_precision,
)

TEST_SIZE = 10
CACHE_STORE = "data/cache/"

# Load the environment variables to gain access to OpenAI, WandB, and the other APIs.
load_dotenv()

# Enable LangSmith tracing for monitoring.
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_PROJECT"] = "midterm_chainlit"

# Set the embedding and completion models.
embedding_model = "text-embedding-3-small"
llm_model_name = "gpt-3.5-turbo"


def get_cached_embedder(cache_store_path: str, embedding_model: str):
    """Return an OpenAI embedder whose results are cached in a local file store."""
    local_file_store = LocalFileStore(cache_store_path)
    embeddings = OpenAIEmbeddings(model=embedding_model)
    return CacheBackedEmbeddings.from_bytes_store(
        embeddings, local_file_store, namespace=embeddings.model
    )


def get_documents(test: bool = False):
    """Load the NVIDIA filings PDF, or return a toy string when test=True."""
    if test:
        return "harrison worked at Kensho"
    # Load the document.
    loader = PyMuPDFLoader("data/nvidia_filings.pdf")
    return loader.load()


def chunk_and_store(documents, cached_embedder):
    """Split the documents into chunks, embed them, and persist a FAISS index."""
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=400,
        chunk_overlap=50,
    )
    if isinstance(documents, str):
        chunks = splitter.split_text(documents)
        vector_store = FAISS.from_texts(chunks, cached_embedder)
    else:
        # Split the documents into chunks and store their embeddings.
        chunks = splitter.split_documents(documents)
        vector_store = FAISS.from_documents(chunks, cached_embedder)
    vector_store.save_local("data/vector_store.index")
    print("Vector store index saved on disk.")
    print(f"Number of chunks: {len(chunks)}")
    return vector_store


def get_store():
    """Build the FAISS index if it does not exist yet, then load it from disk."""
    if not os.path.exists("data/vector_store.index"):
        chunk_and_store(get_documents(), get_cached_embedder(CACHE_STORE, embedding_model))
    else:
        print("Loading the vector store from the disk.")
    return FAISS.load_local(
        "data/vector_store.index",
        get_cached_embedder(CACHE_STORE, embedding_model),
        allow_dangerous_deserialization=True,
    )


def get_chain(retriever):
    """Build the RAG chain: retrieve context, fill the prompt, call the model, parse to text."""
    return (
        RunnableParallel(context=retriever, question=RunnablePassthrough())
        | prompt
        | primary_qa_llm
        | StrOutputParser()
    )


def retrieve_and_answer(questions: list, retriever):
    """Answer each question with the RAG chain and return the answers."""
    chain = get_chain(retriever)
    return [chain.invoke(question) for question in questions]


documents = get_documents(test=False)

# Define the test questions.
question_1 = "Who is the E-VP, Operations - and how old are they?"
question_2 = "what is the gross carrying amount of Total Amortizable Intangible Assets for Jan 29, 2023?"
questions = [question_1, question_2]

# Define the retrieval prompt.
retrieval_prompt_template = """Answer the question based only on the following context.
If you cannot answer the question with the context, please respond with 'I cannot answer the question with the context provided.':

Context:
{context}

Question:
{question}
"""

prompt = ChatPromptTemplate.from_template(retrieval_prompt_template)

# Define the model.
primary_qa_llm = ChatOpenAI(
    model_name=llm_model_name,
    temperature=0.0,
    streaming=True,
)

store = get_store()
retriever = store.as_retriever()
chain = get_chain(retriever)

for answer in retrieve_and_answer(questions, retriever):
    print(answer)

# The block below generates a synthetic test set with RAGAS and evaluates the chain.
# It is kept commented out so the script can run without the evaluation step.
#
# eval_documents = documents
#
# text_splitter = RecursiveCharacterTextSplitter(
#     chunk_size=1500,
#     chunk_overlap=400,
# )
#
# eval_documents = text_splitter.split_documents(eval_documents)
#
# # Generate a synthetic test set from the evaluation documents.
# generator = TestsetGenerator.with_openai()
#
# test_set = generator.generate_with_langchain_docs(
#     eval_documents,
#     test_size=TEST_SIZE,
#     distributions={simple: 1},
# )
#
# test_df = test_set.to_pandas()
# test_questions = test_df["question"].values.tolist()
# test_ground_truths = test_df["ground_truth"].values.tolist()
#
# # Answer each generated question and collect the retrieved contexts.
# # The chain returns a plain string, so the contexts are fetched from the retriever directly.
# ragas_answers = []
# ragas_contexts = []
#
# for question in test_questions:
#     ragas_answers.append(chain.invoke(question))
#     ragas_contexts.append([doc.page_content for doc in retriever.invoke(question)])
#
# response_dataset = Dataset.from_dict({
#     "question": test_questions,
#     "answer": ragas_answers,
#     "contexts": ragas_contexts,
#     "ground_truth": test_ground_truths,
# })
#
# metrics = [
#     faithfulness,
#     answer_relevancy,
#     context_recall,
#     context_precision,
#     answer_correctness,
# ]
#
# results = evaluate(response_dataset, metrics)
# results_df = results.to_pandas()
# print(results_df)
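
# Note: the evaluation loop above retrieves the context twice per question, once inside the
# chain and once directly from the retriever. A minimal single-pass alternative is sketched
# below; the helper name `get_chain_with_sources` is an assumption added for illustration
# and is not part of the original script.
def get_chain_with_sources(retriever):
    """Return a chain whose output dict holds both the retrieved context and the answer."""
    return (
        RunnableParallel(context=retriever, question=RunnablePassthrough())
        | RunnablePassthrough.assign(answer=prompt | primary_qa_llm | StrOutputParser())
    )

# Example usage:
#   result = get_chain_with_sources(retriever).invoke(question_1)
#   result["answer"] is the model's reply; result["context"] is the list of source Documents.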