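"""Retrieval-augmented QA over the NVIDIA filings PDF.

The script chunks the document, embeds the chunks with cached OpenAI embeddings,
stores them in a FAISS index, answers a couple of test questions through a
LangChain retrieval chain, and (optionally, in the commented-out block at the
bottom) evaluates the chain with RAGAS.
"""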
import os

from dotenv import load_dotenv
from langchain.embeddings import CacheBackedEmbeddings
from langchain.storage import LocalFileStore
from langchain_community.document_loaders import PyMuPDFLoader
from langchain_community.vectorstores import FAISS
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough, RunnableParallel
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from datasets import Dataset
from ragas.testset.generator import TestsetGenerator
from ragas.testset.evolutions import simple, reasoning, multi_context
from ragas import evaluate
from ragas.metrics import (
    faithfulness,
    answer_relevancy,
    answer_correctness,
    context_recall,
    context_precision,
)

TEST_SIZE = 10
CACHE_STORE = "data/cache/"

# Load the environment variables to gain access to OpenAI, WandB, and other APIs.
load_dotenv()

# Enable LangSmith tracing for this project.
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_PROJECT"] = "midterm_chainlit"

# Set the embedding and completion models.
embedding_model = "text-embedding-3-small"
llm_model_name = "gpt-3.5-turbo"

def get_cached_embedder(cache_store_path: str, embedding_model: str):
    """Cache the embeddings and return the embedder."""
    local_file_store = LocalFileStore(cache_store_path)
    embeddings = OpenAIEmbeddings(
        model=embedding_model,
    )
    return CacheBackedEmbeddings.from_bytes_store(
        embeddings, local_file_store, namespace=embeddings.model
    )

def get_documents(test: bool = False):
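    """Return the source documents: a tiny in-memory string when test=True, otherwise the NVIDIA filings PDF."""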
    if test:
        return "harrison worked at Kensho"
    else:
        # Load the document.
        loader = PyMuPDFLoader(
            "data/nvidia_filings.pdf",
        )
        return loader.load()

def chunk_and_store(documents, cached_embedder):
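    """Split the documents into chunks, embed them, and persist a FAISS index to disk."""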
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=400,
        chunk_overlap=50,
    )
    if isinstance(documents, str):
        chunks = splitter.split_text(documents)
        vector_store = FAISS.from_texts(chunks, cached_embedder)
    else:
        # Split the document into chunks.
        chunks = splitter.split_documents(documents)
        # Store the embeddings.
        vector_store = FAISS.from_documents(chunks, cached_embedder)
    vector_store.save_local("data/vector_store.index")
    print("Vector store index saved on disk.")
    print(f"Number of chunks: {len(chunks)}")
    return vector_store

def get_store():
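    """Load the FAISS vector store from disk, building and saving it first if it does not exist yet."""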
    if not os.path.exists("data/vector_store.index"):
        # Build the index and return it directly so callers never receive None.
        return chunk_and_store(get_documents(), get_cached_embedder(CACHE_STORE, embedding_model))
    print("Loading the vector store from the disk.")
    return FAISS.load_local(
        "data/vector_store.index",
        get_cached_embedder(CACHE_STORE, embedding_model),
        allow_dangerous_deserialization=True,
    )

def get_chain(retriever):
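    """Build the retrieval chain: fetch context, fill the prompt, call the LLM, and parse the output to a string."""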
    return (
        RunnableParallel(context=retriever, question=RunnablePassthrough())
        | prompt
        | primary_qa_llm
        | StrOutputParser()
    )

def retrieve_and_answer(questions: list, retriever):
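    """Answer each question with the retrieval chain and return the answers in order."""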
    chain = get_chain(retriever)
    answers = []
    for question in questions:
        answers.append(chain.invoke(question))
    return answers

# Load the source documents (only needed here for the RAGAS evaluation block below).
documents = get_documents(test=False)

# Define the test questions.
question_1 = "Who is the E-VP, Operations - and how old are they?"
question_2 = "what is the gross carrying amount of Total Amortizable Intangible Assets for Jan 29, 2023?"
questions = [question_1, question_2]

# Define the retrieval prompt.
retrieval_prompt_template = """Answer the question based only on the following context. If you cannot answer the question with the context, please respond with 'I cannot answer the question with the context provided.':
Context: {context}
Question:
{question}
"""
prompt = ChatPromptTemplate.from_template(retrieval_prompt_template)

# Define the model.
primary_qa_llm = ChatOpenAI(
    model_name=llm_model_name,
    temperature=0.0,
    streaming=True,
)

store = get_store()
retriever = store.as_retriever()
chain = get_chain(retriever)

for answer in retrieve_and_answer(questions, retriever):
    print(answer)
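
# Optional: RAGAS synthetic test set generation and evaluation (left commented out).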
#
# eval_documents = documents
#
# text_splitter = RecursiveCharacterTextSplitter(
#     chunk_size=1500,
#     chunk_overlap=400
# )
#
# eval_documents = text_splitter.split_documents(eval_documents)
#
# generator = TestsetGenerator.with_openai()
#
# test_set = generator.generate_with_langchain_docs(
#     eval_documents,
#     test_size=TEST_SIZE,
#     distributions={simple: 1},
# )
#
# test_df = test_set.to_pandas()
# test_questions = test_df["question"].values.tolist()
# test_ground_truths = test_df["ground_truth"].values.tolist()
#
# ragas_answers = []
# ragas_contexts = []
#
# eval_chain = get_chain(retriever)
# for question in test_questions:
#     # The chain takes the raw question string and returns a plain string answer.
#     ragas_answers.append(eval_chain.invoke(question))
#     # Collect the retrieved contexts separately for the RAGAS metrics.
#     ragas_contexts.append([doc.page_content for doc in retriever.get_relevant_documents(question)])
#
# response_dataset = Dataset.from_dict({
#     "question": test_questions,
#     "answer": ragas_answers,
#     "contexts": ragas_contexts,
#     "ground_truth": test_ground_truths,
# })
#
# metrics = [
#     faithfulness,
#     answer_relevancy,
#     context_recall,
#     context_precision,
#     answer_correctness,
# ]
#
# results = evaluate(response_dataset, metrics)
#
# results_df = results.to_pandas()
#
# print(results_df)