# Run with reload mode:
#   gradio app03-chatRagLcelMem.py
import os
import gradio as gr
from operator import itemgetter

# Langchain
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables import RunnableParallel, RunnablePassthrough, RunnableLambda
from langchain_core.output_parsers import StrOutputParser
from langchain_core.messages import AIMessage, HumanMessage, get_buffer_string
from langchain.prompts.prompt import PromptTemplate
from langchain.schema import format_document
from langchain.memory import ConversationBufferMemory

# HuggingFace
from langchain_community.embeddings import HuggingFaceEmbeddings

# GeminiPro
from langchain_google_genai import ChatGoogleGenerativeAI

# Groq
from langchain_groq import ChatGroq

# Pinecone vector database
from pinecone import Pinecone, ServerlessSpec
from langchain_pinecone import PineconeVectorStore

from dotenv import load_dotenv

load_dotenv()
# print('EMBEDDINGS_MODEL', os.getenv("EMBEDDINGS_MODEL"))

setid = "global"


def pipeLog(x):
    print("***", x)
    return x


embeddings = HuggingFaceEmbeddings(model_name=os.getenv("EMBEDDINGS_MODEL"))

# OpenAI
# model = ChatOpenAI(temperature=0.0)

# Gemini
# model = ChatGoogleGenerativeAI(
#     model="gemini-pro", temperature=0.1, convert_system_message_to_human=True
# )

# Groq
# llama2-70b-4096 (4k), mixtral-8x7b-32768 (32k)
model = ChatGroq(model_name='mixtral-8x7b-32768')

pc = Pinecone(api_key=os.getenv("PINECONE_API_KEY"))
index = pc.Index(setid)
vectorstore = PineconeVectorStore(index, embeddings, "text")
retriever = vectorstore.as_retriever(search_kwargs={"k": 5})  # Find top-5 documents

template_no_history = """Answer the question based only on the following context:
{context}

Question: {question}
"""
ANSWER_PROMPT = ChatPromptTemplate.from_template(template_no_history)

template_with_history = """Given the following conversation and a follow up question, rephrase the follow up question to be a standalone question, in its original language.

Chat History:
{chat_history}
Follow Up Input: {question}
Standalone question:"""
CONDENSE_QUESTION_PROMPT = ChatPromptTemplate.from_template(template_with_history)

DEFAULT_DOCUMENT_PROMPT = PromptTemplate.from_template(template="{page_content}")


def _combine_documents(docs, document_prompt=DEFAULT_DOCUMENT_PROMPT, document_separator="\n\n"):
    doc_strings = [format_document(doc, document_prompt) for doc in docs]
    return document_separator.join(doc_strings)


# setup_and_retrieval = RunnableParallel(
#     {"context": retriever, "question": RunnablePassthrough()}
# )

# def format_docs(docs):
#     return "\n\n".join(doc.page_content for doc in docs)

# rag_chain_from_docs = (
#     RunnablePassthrough.assign(context=(lambda x: format_docs(x["context"])))
#     | PROMPT_NH
#     | model
#     | StrOutputParser()
# )

# rag_chain_with_source = RunnableParallel(
#     {"context": retriever, "question": RunnablePassthrough()}
# ).assign(answer=rag_chain_from_docs)

# def rag_query(question: str, history: list[list[str]]):
#     if len(history) == 0:
#         # chain = setup_and_retrieval | PROMPT_NH | model
#         # response = chain.invoke(question)
#         response = rag_chain_with_source.invoke(question)
#         sources = [doc.metadata['source'] for doc in response['context']]
#         print(response, '\n', sources)
#         return response['answer']  # FAILS!!!
#     else:
#         chat_history = ""
#         for l in history:
#             chat_history += " : ".join(l)
#             chat_history += "\n"
#         chain = (
#             {"chat_history": itemgetter('chat_history'), "question": itemgetter('question')}
#             | PROMPT_WH
#             | pipeLog
#             | model
#         )
#         response = chain.invoke({"chat_history": chat_history, "question": question})
#         return response.content

# ----------------------------------------
# Prepare the chain to run the queries

# Store chat history
memory = ConversationBufferMemory(return_messages=True, output_key="answer", input_key="question")

# Load chat history into the 'chat_history' key
loaded_memory = RunnablePassthrough.assign(
    chat_history=RunnableLambda(memory.load_memory_variables) | itemgetter("history"),
)

# Generate a standalone question from the follow-up question and the chat history
standalone_question = {
    "standalone_question": {
        "question": lambda x: x["question"],
        "chat_history": lambda x: get_buffer_string(x["chat_history"]),
    }
    | CONDENSE_QUESTION_PROMPT
    | model
    | StrOutputParser(),
}

# Retrieve documents related to the standalone question
retrieved_documents = {
    "docs": itemgetter("standalone_question") | retriever,
    "question": lambda x: x["standalone_question"],
}

# Construct the inputs for the final prompt
final_inputs = {
    "context": lambda x: _combine_documents(x["docs"]),
    "question": itemgetter("question"),
}

# Finally, generate the answer and return it together with the retrieved documents
answer = {
    "answer": final_inputs | ANSWER_PROMPT | model,
    "docs": itemgetter("docs"),
}

# The complete chain
final_chain = loaded_memory | standalone_question | retrieved_documents | answer


def pipeLog(s: str, x):
    print(s, x)
    return x


pipe_a = RunnableLambda(lambda x: pipeLog("a:", x))
pipe_b = RunnableLambda(lambda x: pipeLog("b:", x))


def rag_query(question: str, history: list[list[str]]) -> str:
    """Run a RAG query using the chain's own memory, not the Gradio history"""
    inputs = {'question': question}
    response = final_chain.invoke(inputs)
    # print(response)
    memory.save_context(inputs, {"answer": response["answer"].content})
    # sources = [doc.metadata['source'] for doc in response['docs']]
    # print(response, '\n', sources)
    return response['answer'].content


def test_query(question):
    print('QUESTION:', question)
    answer = rag_query(question, None)
    print('ANSWER: ', answer, '\n')


# test_query("What is the capital of France?")
# test_query("What is a Blockchain?")
# test_query("What is it useful for?")

gr.ChatInterface(
    rag_query,
    title="RAG Chatbot demo",
    description="A chatbot doing Retrieval Augmented Generation, backed by a Pinecone vector database",
).launch()