Spaces:
Runtime error
Runtime error
| from pydantic import NoneStr | |
| import os | |
| from langchain.chains.question_answering import load_qa_chain | |
| from langchain.document_loaders import UnstructuredFileLoader | |
| from langchain.embeddings.openai import OpenAIEmbeddings | |
| from langchain.llms import OpenAI | |
| from langchain.text_splitter import CharacterTextSplitter | |
| from langchain.vectorstores import FAISS | |
| from langchain.vectorstores import Chroma | |
| from langchain.chains import ConversationalRetrievalChain | |
| import gradio as gr | |
| import openai | |
| from langchain import PromptTemplate, OpenAI, LLMChain | |
| import validators | |
| import requests | |
| import mimetypes | |
| import tempfile | |
class Chatbot:
    """Gradio question-answering assistant over user-supplied documents.

    Documents come either from files uploaded through the UI or from files
    downloaded from URLs. They are split into chunks, embedded with
    ``OpenAIEmbeddings`` and stored in a Chroma vector store (the "knowledge
    base"), which is kept in the per-session Gradio state under the
    ``"knowledge_base"`` key and queried through a
    ``ConversationalRetrievalChain``.
    """

    def __init__(self):
        """Configure the OpenAI client from the ``OPENAI_API_KEY`` environment variable."""
        openai.api_key = os.getenv("OPENAI_API_KEY")

    def get_empty_state(self):
        """Return a fresh session state that holds no knowledge base."""
        return {"knowledge_base": None}

    def create_knowledge_base(self, docs):
        """Create a knowledge base from the given documents.

        Args:
            docs: Loaded documents, as returned by a loader's ``load()``.

        Returns:
            Chroma: Vector store built from the chunked documents.
        """
        # Split on newlines into chunks of at most 1000 characters, with a
        # 200-character overlap between consecutive chunks.
        text_splitter = CharacterTextSplitter(
            separator="\n", chunk_size=1000, chunk_overlap=200, length_function=len
        )
        chunks = text_splitter.split_documents(docs)
        embeddings = OpenAIEmbeddings()
        return Chroma.from_documents(chunks, embeddings)

    def _load_documents(self, file_paths):
        """Load every path in ``file_paths`` into one flat list of documents."""
        docs = []
        for path in file_paths:
            docs.extend(UnstructuredFileLoader(path, strategy="fast").load())
        return docs

    def upload_file(self, file_paths):
        """Build a knowledge base from files uploaded through the UI.

        Args:
            file_paths: Uploaded file objects; each must expose a ``name``
                attribute holding its path on disk.

        Returns:
            tuple: ``(paths, state)`` where ``paths`` is the list of file
            paths and ``state`` is ``{"knowledge_base": <Chroma store>}``.
        """
        file_paths = [uploaded.name for uploaded in file_paths]
        print(file_paths)
        knowledge_base = self.create_knowledge_base(self._load_documents(file_paths))
        return file_paths, {"knowledge_base": knowledge_base}

    def add_text(self, history, text):
        """Append the user's message to the chat and lock the input box.

        Args:
            history: Current chat history (sequence of ``(user, bot)`` pairs).
            text: The message the user just submitted.

        Returns:
            tuple: The extended history (the new turn has no answer yet) and
            a Gradio update that empties and disables the textbox.
        """
        history = history + [(text, None)]
        print("History for Add text : ", history)
        return history, gr.update(value="", interactive=False)

    def upload_multiple_urls(self, urls):
        """Download files from a comma-separated URL list and index them.

        Entries that are not valid URLs are skipped.

        Args:
            urls (str): Comma-separated URLs.

        Returns:
            tuple: ``(paths, state)`` where ``paths`` lists the temporary
            files the downloads were saved to and ``state`` is
            ``{"knowledge_base": <Chroma store>}``.

        Raises:
            ValueError: If a URL responds with a status code other than 200.
        """
        headers = {'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36',}
        file_paths = []
        for url in (part.strip() for part in urls.split(',')):
            if not validators.url(url):
                continue
            # A timeout keeps an unresponsive server from hanging the request forever.
            r = requests.get(url, headers=headers, timeout=30)
            if r.status_code != 200:
                raise ValueError("Check the url of your file; returned status code %s" % r.status_code)
            # The header may be absent or carry parameters ("text/html; charset=utf-8");
            # only the bare MIME type can be mapped to a file extension.
            content_type = r.headers.get("content-type") or ""
            mime_type = content_type.split(";", 1)[0].strip()
            file_extension = mimetypes.guess_extension(mime_type) if mime_type else None
            # Close the file before it is read back so the content is fully
            # flushed to disk; delete=False keeps it available for the loader.
            with tempfile.NamedTemporaryFile(suffix=file_extension, delete=False) as temp_file:
                temp_file.write(r.content)
            file_paths.append(temp_file.name)
        knowledge_base = self.create_knowledge_base(self._load_documents(file_paths))
        return file_paths, {"knowledge_base": knowledge_base}

    def answer_question(self, question, history, state):
        """Answer the latest question in ``history`` from the knowledge base.

        Args:
            question: Unused; the question is taken from the last history
                entry (the textbox has already been cleared by ``add_text``).
            history: Chat history whose last entry is the pending
                ``(question, None)`` turn.
            state (dict): Session state holding the knowledge base under
                ``"knowledge_base"``.

        Returns:
            The history with the answer filled into its last entry. When no
            knowledge base is loaded, the answer is a prompt to upload a file.
        """
        pending_question = history[-1][0]
        # The state may be empty or hold None when nothing has been uploaded yet.
        knowledge_base = state.get("knowledge_base") if state else None
        if knowledge_base is None:
            history[-1] = [
                pending_question,
                "Please upload a file or provide a file URL before asking a question.",
            ]
            return history

        qa = ConversationalRetrievalChain.from_llm(
            llm=OpenAI(temperature=0.1),
            retriever=knowledge_base.as_retriever(),
            return_source_documents=False,
        )
        # Pass the completed earlier turns as (human, ai) tuples so follow-up
        # questions are answered in context; turns without an answer are dropped.
        chat_history = [(human, ai) for human, ai in history[:-1] if ai is not None]
        result = qa({"question": pending_question, "chat_history": chat_history})
        # Rebuild the last turn instead of assigning into it, so this also
        # works when the entry is an immutable tuple.
        history[-1] = [pending_question, result["answer"]]
        print("History for QA : ", history)
        return history

    def clear_function(self, state):
        """Reset the session state in place to an empty knowledge base.

        The ``"knowledge_base"`` key is kept (set to ``None``) so later
        lookups on the state do not fail.
        """
        state.clear()
        state.update(self.get_empty_state())

    def gradio_interface(self):
        """Build and launch the Gradio interface for the Procurement Digital Assistant."""
        with gr.Blocks(css="style.css", theme='karthikeyan-adople/hudsonhayes-gray') as demo:
            gr.HTML("""<div style='background-color:rgb(0,1,36); text-align:center;padding:40px; padding-top:0'>
            <img class="leftimage" align="left" src="file=logo.png" alt="Image" width="210" height="210">
            <img class="rightimage" align="right" src="file=bedfordshire.png" alt="Image" width="90" height="90">
            <h2 style="text-align:center; color:white; text-weight:bold;padding-top:120px; padding-botton:0;">Procurement Digital Assistant</h2>
            </div>""")
            state = gr.State(self.get_empty_state())
            with gr.Column(elem_id="col-container"):
                with gr.Accordion("Upload Files", open=False):
                    with gr.Row(elem_id="row-flex"):
                        with gr.Row(elem_id="row-flex"):
                            with gr.Column(scale=1,):
                                file_url = gr.Textbox(label='file url :', show_label=True, placeholder="")
                        with gr.Row(elem_id="row-flex"):
                            with gr.Column(scale=1):
                                file_output = gr.File()
                            with gr.Column(scale=1):
                                upload_button = gr.UploadButton("Browse File", file_types=[".txt", ".pdf", ".doc", ".docx"], file_count="multiple")
                with gr.Row():
                    chatbot = gr.Chatbot([], elem_id="chatbot")
                with gr.Row():
                    txt = gr.Textbox(label="Question", show_label=True, placeholder="Enter text and press Enter")
                with gr.Row():
                    clear_btn = gr.Button(value="Clear")

            # Submitting a question: record it and lock the textbox, answer it,
            # then make the textbox editable again.
            txt_msg = txt.submit(self.add_text, [chatbot, txt], [chatbot, txt], queue=False).then(self.answer_question, [txt, chatbot, state], chatbot)
            txt_msg.then(lambda: gr.update(interactive=True), None, [txt], queue=False)
            file_url.submit(self.upload_multiple_urls, file_url, [file_output, state])
            # "Clear" resets both the knowledge base and the visible conversation.
            clear_btn.click(self.clear_function, [state], [])
            clear_btn.click(lambda: None, None, chatbot, queue=False)
            upload_button.upload(self.upload_file, upload_button, [file_output, state])
        demo.queue().launch(debug=True)
if __name__ == "__main__":
    # Script entry point: create the assistant and start its Gradio interface.
    Chatbot().gradio_interface()