# DemoRag / app03-chatRagLcelMem.py  (commit 83233f5)
# Author: Jorge Londono
# RAG chatbot with conversation memory, built with LangChain LCEL
# Run with reload mode:
# gradio app03-chatRagLcelMem.py
import os
import gradio as gr
from operator import itemgetter
# Langchain
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables import RunnableParallel,RunnablePassthrough,RunnableLambda
from langchain_core.output_parsers import StrOutputParser
from langchain_core.messages import AIMessage, HumanMessage, get_buffer_string
from langchain.prompts.prompt import PromptTemplate
from langchain.schema import format_document
from langchain.memory import ConversationBufferMemory
# HuggingFace
from langchain_community.embeddings import HuggingFaceEmbeddings
# GeminiPro
from langchain_google_genai import ChatGoogleGenerativeAI
# Groq
from langchain_groq import ChatGroq
# Pinecone vector database
from pinecone import Pinecone, ServerlessSpec
from langchain_pinecone import PineconeVectorStore
from dotenv import load_dotenv
load_dotenv()
# print('EMBEDDINGS_MODEL', os.getenv("EMBEDDINGS_MODEL"))
setid = "global"
def pipeLog(x):
    print("***", x)
    return x
embeddings = HuggingFaceEmbeddings(model_name=os.getenv("EMBEDDINGS_MODEL"))
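# EMBEDDINGS_MODEL should name a sentence-transformers model whose vector size matches
# the Pinecone index dimension (e.g. "sentence-transformers/all-MiniLM-L6-v2", 384 dims;
# the concrete model name here is only an illustrative assumption).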
# OpenAI
# model = ChatOpenAI(temperature=0.0)
# Gemini
# model = ChatGoogleGenerativeAI(
# model="gemini-pro", temperature=0.1, convert_system_message_to_human=True
# )
# Groq
# llama2-70b-4096 (4k), mixtral-8x7b-32768 (32k)
model = ChatGroq(model_name='mixtral-8x7b-32768')
pc = Pinecone(api_key=os.getenv("PINECONE_API_KEY"))
index = pc.Index(setid)
vectorstore = PineconeVectorStore(index=index, embedding=embeddings, text_key="text")
retriever = vectorstore.as_retriever(search_kwargs={"k": 5})  # find the top-5 documents
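# Optional sanity check for the retriever (commented sketch; the query is illustrative and
# assumes the index already contains documents with a "source" metadata field):
# for doc in retriever.invoke("What is a Blockchain?"):
#     print(doc.metadata.get("source"), doc.page_content[:80])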
template_no_history = """Answer the question based only on the following context:
{context}
Question: {question}
"""
ANSWER_PROMPT = ChatPromptTemplate.from_template(template_no_history)
template_with_history = """Given the following conversation and a follow up question, rephrase the follow up question to be a standalone question, in its original language.
Chat History:
{chat_history}
Follow Up Input: {question}
Standalone question:"""
CONDENSE_QUESTION_PROMPT = ChatPromptTemplate.from_template(template_with_history)
DEFAULT_DOCUMENT_PROMPT = PromptTemplate.from_template(template="{page_content}")
def _combine_documents(docs, document_prompt=DEFAULT_DOCUMENT_PROMPT, document_separator="\n\n"):
    doc_strings = [format_document(doc, document_prompt) for doc in docs]
    return document_separator.join(doc_strings)
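# _combine_documents renders each retrieved Document with DEFAULT_DOCUMENT_PROMPT
# (i.e. just its page_content) and joins them with blank lines, producing the
# {context} block consumed by ANSWER_PROMPT.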
# setup_and_retrieval = RunnableParallel(
#     {"context": retriever, "question": RunnablePassthrough()}
# )
# def format_docs(docs):
#     return "\n\n".join(doc.page_content for doc in docs)
# rag_chain_from_docs = (
#     RunnablePassthrough.assign(context=(lambda x: format_docs(x["context"])))
#     | PROMPT_NH
#     | model
#     | StrOutputParser()
# )
# rag_chain_with_source = RunnableParallel(
#     {"context": retriever, "question": RunnablePassthrough()}
# ).assign(answer=rag_chain_from_docs)
# def rag_query(question: str, history: list[list[str]]):
#     if len(history) == 0:
#         # chain = setup_and_retrieval | PROMPT_NH | model
#         # response = chain.invoke(question)
#         response = rag_chain_with_source.invoke(question)
#         sources = [ doc.metadata['source'] for doc in response['context'] ]
#         print(response, '\n', sources)
#         return response['answer']  # FAILS!!!
#     else:
#         chat_history = ""
#         for l in history:
#             chat_history += " : ".join(l)
#             chat_history += "\n"
#         chain = (
#             { "chat_history": itemgetter('chat_history'), "question": itemgetter('question') }
#             | PROMPT_WH
#             | pipeLog
#             | model
#         )
#         response = chain.invoke({ "chat_history": chat_history, "question": question })
#         return response.content
# ----------------------------------------
# Prepare the chain to run the queries
# Store chat history
memory = ConversationBufferMemory(return_messages=True, output_key="answer", input_key="question")
# Load chat history into 'memory' key
loaded_memory = RunnablePassthrough.assign(
    chat_history=RunnableLambda(memory.load_memory_variables) | itemgetter("history"),
)
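# With return_messages=True, memory.load_memory_variables({}) returns
# {"history": [HumanMessage(...), AIMessage(...), ...]}, so the message list is
# exposed to the rest of the chain under the "chat_history" key.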
# Generate a standalone question
standalone_question = {
    "standalone_question": {
        "question": lambda x: x["question"],
        "chat_history": lambda x: get_buffer_string(x["chat_history"]),
    }
    | CONDENSE_QUESTION_PROMPT
    | model
    | StrOutputParser(),
}
# Retrieve related documents
retrieved_documents = {
    "docs": itemgetter("standalone_question") | retriever,
    "question": lambda x: x["standalone_question"],
}
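# retrieved_documents receives {"standalone_question": "<rewritten question>"} from the
# previous step, so the vector search uses the history-aware question, not the raw input.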
# Construct the inputs for the final prompt
final_inputs = {
    "context": lambda x: _combine_documents(x["docs"]),
    "question": itemgetter("question"),
}
# Finally, produce the answer and pass the retrieved documents through
answer = {
    "answer": final_inputs | ANSWER_PROMPT | model,
    "docs": itemgetter("docs"),
}
# The complete chain
final_chain = loaded_memory | standalone_question | retrieved_documents | answer
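# The chain can be exercised directly (commented sketch; the question is illustrative):
# out = final_chain.invoke({"question": "What is a Blockchain?"})
# print(out["answer"].content)                              # the model's AIMessage
# print([d.metadata.get("source") for d in out["docs"]])    # retrieved source documents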
def pipeLog(s: str, x):  # labelled variant; shadows the earlier pipeLog
    print(s, x)
    return x
pipe_a = RunnableLambda(lambda x: pipeLog("a:",x))
pipe_b = RunnableLambda(lambda x: pipeLog("b:",x))
def rag_query(question: str, history: list[list[str]]) -> str:
    """Run a RAG query using the chain's own memory, not the Gradio history."""
    inputs = {"question": question}
    response = final_chain.invoke(inputs)
    # print(response)
    memory.save_context(inputs, {"answer": response["answer"].content})
    # sources = [ doc.metadata['source'] for doc in response['docs'] ]
    # print(response, '\n', sources)
    return response["answer"].content
def test_query(question):
    print('QUESTION:', question)
    answer = rag_query(question, None)
    print('ANSWER: ', answer, '\n')
# test_query("What is the capital of France?")
# test_query("What is a Blockchain?")
# test_query("What is it useful for?")
gr.ChatInterface(
    rag_query,
    title="RAG Chatbot demo",
    description="A chatbot doing Retrieval Augmented Generation, backed by a Pinecone vector database",
).launch()