# Page header captured together with this file (not part of the program):
#   robertselvam's picture
#   Update app.py
#   150e933
from langchain.text_splitter import CharacterTextSplitter
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import FAISS
from langchain.chat_models import ChatOpenAI
from langchain.memory import ConversationBufferMemory
from langchain.chains import ConversationChain
from langchain.chains import ConversationalRetrievalChain
from langchain.document_loaders import UnstructuredFileLoader
from typing import List, Dict, Tuple
import gradio as gr
import validators
import requests
import mimetypes
import tempfile
import os
from langchain.chains.question_answering import load_qa_chain
from langchain.llms import OpenAI
from langchain.prompts import PromptTemplate
from langchain.prompts.prompt import PromptTemplate
import pandas as pd
from langchain.agents import create_pandas_dataframe_agent
from langchain.agents import ZeroShotAgent, Tool, AgentExecutor
from langchain import OpenAI, LLMChain
from langchain.agents.agent_types import AgentType
from langchain.agents import create_csv_agent
class ChatDocumentQA:
    """Gradio chatbot that answers questions about uploaded files or web pages.

    Uploaded documents and fetched URLs are loaded with ``UnstructuredFileLoader``,
    split into chunks and indexed in a FAISS vector store; CSV uploads are
    handed to a LangChain CSV agent instead. The resulting object is kept in
    the Gradio session state under the ``"knowledge_base"`` key.
    """

    def __init__(self) -> None:
        pass

    def _get_empty_state(self) -> Dict[str, None]:
        """Create an empty knowledge base."""
        return {"knowledge_base": None}

    def _extract_text_from_pdfs(self, file_paths: List[str]) -> List:
        """Load the given files with ``UnstructuredFileLoader``.

        Despite the name, nothing here is PDF specific: every path is passed
        to the loader unchanged.

        Args:
            file_paths (List[str]): List of file paths.

        Returns:
            List: Everything returned by each loader's ``load()``, concatenated
            in input order (LangChain document objects, not plain strings).
        """
        docs = []
        for file_path in file_paths:
            docs.extend(UnstructuredFileLoader(file_path, strategy="fast").load())
        return docs

    @staticmethod
    def _guess_extension(content_type) -> str:
        """Map a Content-Type header value to a file suffix ('' when unknown)."""
        if not content_type:
            return ""
        # Drop parameters such as "; charset=utf-8": mimetypes only knows bare types.
        mime_type = content_type.split(";", 1)[0].strip().lower()
        return mimetypes.guess_extension(mime_type) or ""

    def _get_content_from_url(self, urls: str) -> List:
        """Fetch content from given URLs.

        Each valid URL is downloaded to a temporary file (so the loader can
        work from a path) and the temporary files are removed once loaded.
        Blank entries and entries rejected by ``validators.url`` are skipped.

        Args:
            urls (str): Comma-separated URLs.

        Returns:
            List: Documents loaded from the downloaded content.

        Raises:
            ValueError: If a URL answers with a status code other than 200.
        """
        headers = {
            'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
                          '(KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36',
        }
        file_paths = []
        try:
            for url in (part.strip() for part in urls.split(',')):
                if not url or not validators.url(url):
                    continue
                # A timeout keeps an unresponsive server from hanging the UI.
                response = requests.get(url, headers=headers, timeout=30)
                if response.status_code != 200:
                    raise ValueError(
                        "Check the url of your file; returned status code %s"
                        % response.status_code
                    )
                suffix = self._guess_extension(response.headers.get("content-type"))
                # Close the file before loading so the content is flushed to disk.
                with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as temp_file:
                    temp_file.write(response.content)
                file_paths.append(temp_file.name)
            return self._extract_text_from_pdfs(file_paths)
        finally:
            # delete=False means nobody else cleans these up.
            for path in file_paths:
                try:
                    os.remove(path)
                except OSError:
                    pass

    def _split_text_into_chunks(self, text: List) -> List:
        """Split loaded documents into smaller chunks.

        Args:
            text (List): Documents as returned by ``_extract_text_from_pdfs``.

        Returns:
            List: Chunked documents (500 characters, 100 overlap, split on newlines).
        """
        text_splitter = CharacterTextSplitter(
            separator="\n", chunk_size=500, chunk_overlap=100, length_function=len
        )
        return text_splitter.split_documents(text)

    def _create_vector_store_from_text_chunks(self, text_chunks: List) -> "FAISS":
        """Create a vector store from text chunks.

        Args:
            text_chunks (List): Chunked documents.

        Returns:
            FAISS: Vector store built from the chunks with OpenAI embeddings.
        """
        embeddings = OpenAIEmbeddings()
        return FAISS.from_documents(documents=text_chunks, embedding=embeddings)

    def _create_conversation_chain(self, vectorstore):
        """Build a conversational retrieval chain on top of ``vectorstore``.

        No memory object is attached: ``_get_response`` builds a new chain for
        every message, so a chain-owned memory would always start empty. The
        caller supplies the chat history with each call instead.

        Args:
            vectorstore: Object exposing ``as_retriever()`` (the FAISS store).

        Returns:
            The chain; call it with ``{"question": ..., "chat_history": ...}``.
        """
        condense_template = (
            "Given the following conversation and a follow up question, rephrase the "
            "follow up question to be a standalone question, in its original language.\n"
            "Chat History: {chat_history}\n"
            "Follow Up Input: {question}\n"
            "Standalone question:"
        )
        condense_question_prompt = PromptTemplate.from_template(condense_template)
        llm = OpenAI(temperature=0)
        return ConversationalRetrievalChain.from_llm(
            llm=llm,
            retriever=vectorstore.as_retriever(),
            condense_question_prompt=condense_question_prompt,
        )

    def _get_documents_knowledge_base(self, file_paths: List) -> Tuple[str, Dict[str, object]]:
        """Build knowledge base from uploaded files.

        The first file's extension decides the mode: ``.csv`` creates a CSV
        agent for that file; anything else indexes all uploaded files in a
        vector store.

        Args:
            file_paths (List): Uploaded file objects exposing a ``name`` path.

        Returns:
            Tuple[str, Dict]: Tuple containing a status message and the knowledge base.
        """
        if not file_paths:
            return "No file uploaded", self._get_empty_state()

        first_path = file_paths[0].name
        extension = os.path.splitext(first_path)[1].lower()

        if extension == '.csv':
            agent_chain = create_csv_agent(
                OpenAI(temperature=0),
                first_path,
                verbose=True,
                agent_type=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
            )
            return "file uploaded", {"knowledge_base": agent_chain}

        # Every other accepted type (.pdf, .txt, .doc, .docx) goes through the loader.
        documents = self._extract_text_from_pdfs([uploaded.name for uploaded in file_paths])
        text_chunks = self._split_text_into_chunks(documents)
        if not text_chunks:
            return "No text could be extracted from the uploaded file(s)", self._get_empty_state()
        vectorstore = self._create_vector_store_from_text_chunks(text_chunks)
        return "file uploaded", {"knowledge_base": vectorstore}

    def _get_urls_knowledge_base(self, urls: str) -> Tuple[str, Dict[str, object]]:
        """Build knowledge base from URLs.

        Args:
            urls (str): Comma-separated URLs.

        Returns:
            Tuple[str, Dict]: Tuple containing a status message and the knowledge base.
        """
        documents = self._get_content_from_url(urls)
        text_chunks = self._split_text_into_chunks(documents) if documents else []
        if not text_chunks:
            # Nothing valid was given or nothing could be read; keep the state empty.
            return "No content could be loaded from the given URL(s)", self._get_empty_state()
        vectorstore = self._create_vector_store_from_text_chunks(text_chunks)
        return "file uploaded", {"knowledge_base": vectorstore}

    # ************************
    # csv qa
    # ************************
    # NOTE: the three helpers below are not called anywhere in this file; the CSV
    # path currently uses create_csv_agent directly.

    def get_agent_tools(self, agent):
        """Wrap ``agent.run`` in a single-element list of LangChain tools."""
        tools = [
            Tool(
                name="dataframe qa",
                func=agent.run,
                description="useful for when you need to answer questions about table data and dataframe data",
            )
        ]
        return tools

    def create_memory_for_csv_qa(self, tools):
        """Create the zero-shot prompt and a conversation memory for CSV QA.

        Returns:
            tuple: ``(memory, tools, prompt)``.
        """
        prefix = (
            "Have a conversation with a human, answering the following questions about "
            "table data and dataframe data as best you can. You have access to the "
            "following tools:"
        )
        suffix = (
            'Begin!"\n'
            "{chat_history}\n"
            "Question: {input}\n"
            "{agent_scratchpad}"
        )
        prompt = ZeroShotAgent.create_prompt(
            tools,
            prefix=prefix,
            suffix=suffix,
            input_variables=["input", "chat_history", "agent_scratchpad"],
        )
        memory = ConversationBufferMemory(memory_key="chat_history", return_messages=True)
        return memory, tools, prompt

    def create_agent_chain_for_csv_qa(self, memory, tools, prompt):
        """Assemble an ``AgentExecutor`` from the prompt, tools and memory."""
        llm_chain = LLMChain(llm=OpenAI(temperature=0), prompt=prompt)
        agent = ZeroShotAgent(llm_chain=llm_chain, tools=tools, verbose=True)
        return AgentExecutor.from_agent_and_tools(
            agent=agent, tools=tools, verbose=True, memory=memory
        )

    def _get_response(
        self,
        message: str,
        chat_history: List[Tuple[str, str]],
        state: Dict[str, object],
        file_paths=None,
    ) -> Tuple[str, List[Tuple[str, str]]]:
        """Get a response from the chatbot.

        The answering strategy is chosen from the knowledge base stored in
        ``state`` (retriever-backed store vs. agent), so knowledge bases built
        from URLs work as well as uploaded files.

        Args:
            message (str): User's message/question.
            chat_history (List[Tuple[str, str]]): List of chat history as tuples of
                (user_message, bot_response).
            state (dict): State containing the knowledge base.
            file_paths: Current value of the upload widget. Unused; kept so the
                existing Gradio wiring keeps working.

        Returns:
            Tuple[str, List[Tuple[str, str]]]: Empty string (clears the question box)
            and the updated chat history.
        """
        knowledge_base = state.get("knowledge_base") if isinstance(state, dict) else None
        if knowledge_base is None:
            chat_history.append((message, "Please Upload Document or URL"))
            return "", chat_history

        try:
            if hasattr(knowledge_base, "as_retriever"):
                chain = self._create_conversation_chain(knowledge_base)
                # Normalise each turn to a (question, answer) tuple for the chain.
                past_turns = [(user, bot or "") for user, bot in chat_history]
                result = chain({"question": message, "chat_history": past_turns})
                answer = result["answer"]
            else:
                # CSV agent (or any other object exposing run()).
                answer = knowledge_base.run(input=message)
        except Exception:
            # Keep the UI alive, but do not hide the failure from the operator.
            import logging

            logging.getLogger(__name__).exception("Failed to answer question")
            answer = "Sorry, something went wrong while answering your question."

        chat_history.append((message, answer))
        return "", chat_history

    def gradio_interface(self) -> None:
        """Create a Gradio interface for the chatbot."""
        header_html = (
            "<center class=\"darkblue\" style='background-color:rgb(0,1,36); "
            "text-align:center;padding:25px;'>\n"
            "<center>\n"
            '<h1 class ="center">\n'
            '<img src="file=logo.png" height="110px" width="280px">\n'
            "</h1>\n"
            "</center>\n"
            "<br>\n"
            '<h1 style="color:#fff">\n'
            "Virtual Assistant Chatbot\n"
            "</h1>\n"
            "</center>"
        )
        with gr.Blocks(css="style.css", theme='karthikeyan-adople/hudsonhayes-gray') as demo:
            gr.HTML(header_html)
            state = gr.State(self._get_empty_state())
            chatbot = gr.Chatbot()
            with gr.Row():
                with gr.Column(scale=0.85):
                    msg = gr.Textbox(label="Question")
                with gr.Column(scale=0.15):
                    file_output = gr.Textbox(label="File Status")
            with gr.Row():
                with gr.Column(scale=0.85):
                    gr.ClearButton([msg, chatbot])
                with gr.Column(scale=0.15):
                    upload_button = gr.UploadButton(
                        "Browse File",
                        file_types=[".txt", ".pdf", ".doc", ".docx", ".csv"],
                        file_count="multiple", variant="primary"
                    )
            with gr.Row():
                with gr.Column(scale=1):
                    input_url = gr.Textbox(label="urls")

            # Event wiring has to happen while the Blocks context is still open.
            input_url.submit(self._get_urls_knowledge_base, input_url, [file_output, state])
            upload_button.upload(
                self._get_documents_knowledge_base, upload_button, [file_output, state]
            )
            msg.submit(
                self._get_response, [msg, chatbot, state, upload_button], [msg, chatbot]
            )
        demo.launch()
if __name__ == "__main__":
    # Build and launch the Gradio UI only when executed as a script, not on import.
    chatdocumentqa = ChatDocumentQA()
    chatdocumentqa.gradio_interface()