"""Makerlab Bot: a retrieval-augmented conversational agent.

The module scrapes the Makerlab website and a PDF book, embeds the text into
a FAISS index (cached on disk with pickle), and exposes helpers that build
LangChain agents which answer questions from that index.
"""
import os
import pickle
import re
import time
from typing import List, Union
from urllib.parse import urlparse, urljoin

import faiss
import requests
from PyPDF2 import PdfReader
from bs4 import BeautifulSoup
from langchain import OpenAI, LLMChain
from langchain.agents import ConversationalAgent
from langchain.agents import Tool, AgentExecutor, LLMSingleActionAgent, AgentOutputParser
from langchain.prompts import BaseChatPromptTemplate
from langchain.chains import ConversationalRetrievalChain
from langchain.docstore.document import Document
from langchain.embeddings import OpenAIEmbeddings
from langchain.memory import ConversationBufferWindowMemory
from langchain.schema import AgentAction, AgentFinish, HumanMessage
from langchain.text_splitter import CharacterTextSplitter
from langchain.vectorstores.faiss import FAISS

# Data sources that are embedded into the search index.
book_url = 'https://g.co/kgs/2VFC7u'
book_file = "Book.pdf"
url = 'https://makerlab.illinois.edu/'

# On-disk cache of the search index.
pickle_file = "open_ai.pkl"
index_file = "open_ai.index"

# Seconds to wait for a web server before giving up; without a timeout
# `requests.get` can block forever on an unresponsive host.
request_timeout = 30

gpt_3_5 = OpenAI(model_name='gpt-3.5-turbo', temperature=0)
embeddings = OpenAIEmbeddings()
chat_history = []
memory = ConversationBufferWindowMemory(memory_key="chat_history")
gpt_3_5_index = None


class CustomOutputParser(AgentOutputParser):
    """Turn raw LLM text into an `AgentAction` or an `AgentFinish`."""

    def parse(self, llm_output: str) -> Union[AgentAction, AgentFinish]:
        """Parse one LLM completion.

        Returns an `AgentFinish` when the output contains ``AI:`` or
        ``Final Answer:`` (the text after the last marker becomes the
        ``output`` value); otherwise returns the `AgentAction` described by
        the ``Action:`` / ``Action Input:`` pair.

        Raises:
            ValueError: if none of the expected markers can be found.
        """
        # Check if agent replied without using tools
        if "AI:" in llm_output:
            return AgentFinish(
                return_values={"output": llm_output.split("AI:")[-1].strip()},
                log=llm_output,
            )
        # Check if agent should finish
        if "Final Answer:" in llm_output:
            return AgentFinish(
                # Return values is generally always a dictionary with a single `output` key
                # It is not recommended to try anything else at the moment :)
                return_values={"output": llm_output.split("Final Answer:")[-1].strip()},
                log=llm_output,
            )
        # Parse out the action and action input
        regex = r"Action: (.*?)[\n]*Action Input:[\s]*(.*)"
        match = re.search(regex, llm_output, re.DOTALL)
        if not match:
            raise ValueError(f"Could not parse LLM output: `{llm_output}`")
        action = match.group(1).strip()
        action_input = match.group(2)
        # Return the action and action input
        return AgentAction(
            tool=action,
            tool_input=action_input.strip(" ").strip('"'),
            log=llm_output,
        )


# Set up a prompt template
class CustomPromptTemplate(BaseChatPromptTemplate):
    """Chat prompt that fills in the tool list and the agent scratchpad."""

    # The template to use
    template: str
    # The list of tools available
    tools: List[Tool]

    def format_messages(self, **kwargs) -> List[HumanMessage]:
        """Render the template into a single human message.

        ``intermediate_steps`` (a sequence of ``(AgentAction, observation)``
        tuples) is removed from ``kwargs`` and replaced by the derived
        ``agent_scratchpad``, ``tools`` and ``tool_names`` variables before
        the template is formatted.
        """
        # Get the intermediate steps (AgentAction, Observation tuples)
        # and format them in a particular way
        intermediate_steps = kwargs.pop("intermediate_steps")
        thoughts = "".join(
            f"{action.log}\nObservation: {observation}\nThought: "
            for action, observation in intermediate_steps
        )
        # Set the agent_scratchpad variable to that value
        kwargs["agent_scratchpad"] = thoughts
        # Create a tools variable from the list of tools provided
        kwargs["tools"] = "\n".join(f"{tool.name}: {tool.description}" for tool in self.tools)
        # Create a list of tool names for the tools provided
        kwargs["tool_names"] = ", ".join(tool.name for tool in self.tools)
        formatted = self.template.format(**kwargs)
        return [HumanMessage(content=formatted)]


def get_search_index():
    """Populate the module-level ``gpt_3_5_index``.

    The index is loaded from ``pickle_file`` when both cache files exist and
    the pickle is non-empty; otherwise it is rebuilt with `create_index`.
    """
    global gpt_3_5_index
    cache_is_usable = (
        os.path.isfile(pickle_file)
        and os.path.isfile(index_file)
        and os.path.getsize(pickle_file) > 0
    )
    if cache_is_usable:
        # SECURITY: pickle.load executes arbitrary code from the file. This is
        # only acceptable because the file is a local cache written by
        # create_index(); never point pickle_file at untrusted data.
        # NOTE(review): index_file is checked for existence but never read back.
        with open(pickle_file, "rb") as f:
            search_index = pickle.load(f)
    else:
        search_index = create_index()
    gpt_3_5_index = search_index


def create_index():
    """Build the search index from scratch, cache it on disk and return it."""
    source_chunks = create_chunk_documents()
    search_index = search_index_from_docs(source_chunks)
    faiss.write_index(search_index.index, index_file)
    # Save index to pickle file
    with open(pickle_file, "wb") as f:
        pickle.dump(search_index, f)
    return search_index


def create_chunk_documents():
    """Fetch all source documents and split them into chunks for embedding.

    Empty chunks are dropped. A chunk of 1000 characters or more is re-split
    once and replaced by its pieces.

    The result is built as a new list instead of mutating the list being
    iterated: removing during iteration skipped elements, and extending it
    with re-split pieces duplicated content and could loop forever on a
    chunk that cannot be split any further.
    """
    sources = fetch_data_for_embeddings(url, book_file, book_url)
    splitter = CharacterTextSplitter(separator=" ", chunk_size=800, chunk_overlap=0)

    source_chunks = []
    for chunk in splitter.split_documents(sources):
        content = chunk.page_content
        if not content:
            print("removing chunk: " + (content or ""))
            continue
        print("Size of chunk: " + str(len(content) + len(chunk.metadata)))
        if len(content) >= 1000:
            print("splitting document")
            pieces = [piece for piece in splitter.split_documents([chunk]) if piece.page_content]
            # Keep the original chunk if the splitter produced nothing usable.
            source_chunks.extend(pieces or [chunk])
        else:
            source_chunks.append(chunk)
    return source_chunks


def fetch_data_for_embeddings(url, book_file, book_url):
    """Return the website documents followed by the book documents."""
    sources = get_website_data(url)
    sources.extend(get_document_data(book_file, book_url))
    return sources


def get_website_data(index_url):
    """Scrape every page linked from ``index_url`` into `Document` objects."""
    # Get all page paths from index
    paths = get_paths(index_url)
    # Filter out invalid links and join them with the base URL
    links = get_links(index_url, paths)
    return get_content_from_links(links, index_url)


def get_content_from_links(links, index_url):
    """Download each distinct link under ``index_url`` and extract its text.

    Links that do not start with ``index_url`` are ignored. Each returned
    `Document` stores its URL under the ``source`` metadata key.

    Raises:
        requests.exceptions.RequestException: on network errors, including a
            server that does not answer within ``request_timeout`` seconds.
    """
    content_list = []
    for link in set(links):
        if not link.startswith(index_url):
            continue
        page_data = requests.get(link, timeout=request_timeout).content
        soup = BeautifulSoup(page_data, "html.parser")
        # Get page content
        content = soup.get_text(separator="\n")
        # Get page metadata
        metadata = {"source": link}
        content_list.append(Document(page_content=content, metadata=metadata))
        # Pause between requests so the site is not hammered.
        time.sleep(1)
    return content_list


def get_paths(index_url):
    """Return the set of ``href`` values of all anchors on the index page.

    Raises:
        requests.exceptions.RequestException: on network errors, including a
            server that does not answer within ``request_timeout`` seconds.
    """
    index_data = requests.get(index_url, timeout=request_timeout).content
    soup = BeautifulSoup(index_data, "html.parser")
    return {anchor.get('href') for anchor in soup.find_all('a', href=True)}


def get_links(index_url, paths):
    """Resolve ``paths`` against ``index_url`` and keep the crawlable ones.

    A link is kept when its scheme is http or https and its host does not
    contain "squarespace".
    """
    links = []
    for path in paths:
        # Named `absolute_url` so the module-level `url` is not shadowed.
        absolute_url = urljoin(index_url, path)
        parsed_url = urlparse(absolute_url)
        if parsed_url.scheme in ["http", "https"] and "squarespace" not in parsed_url.netloc:
            links.append(absolute_url)
    return links


def get_document_data(book_file, book_url):
    """Return one `Document` per page of the PDF at ``book_file``.

    Every document carries ``book_url`` under the ``source`` metadata key.
    """
    document_list = []
    with open(book_file, 'rb') as f:
        pdf_reader = PdfReader(f)
        for page in pdf_reader.pages:
            page_text = page.extract_text()
            # A fresh dict per page so documents never share metadata objects.
            metadata = {"source": book_url}
            document_list.append(Document(page_content=page_text, metadata=metadata))
    return document_list


def search_index_from_docs(source_chunks):
    """Create a FAISS index from the text and metadata of ``source_chunks``."""
    texts = [doc.page_content for doc in source_chunks]
    metadatas = [doc.metadata for doc in source_chunks]
    return FAISS.from_texts(texts, embeddings, metadatas=metadatas)


def get_qa_chain(gpt_3_5_index):
    """Build a conversational retrieval chain over ``gpt_3_5_index``."""
    print("index: " + str(gpt_3_5_index))
    return ConversationalRetrievalChain.from_llm(
        gpt_3_5,
        chain_type="stuff",
        get_chat_history=get_chat_history,
        retriever=gpt_3_5_index.as_retriever(),
        return_source_documents=True,
        verbose=True,
    )


def get_chat_history(inputs) -> str:
    """Render ``(human, ai)`` pairs as alternating ``Human:``/``AI:`` lines."""
    return "\n".join(f"Human:{human}\nAI:{ai}" for human, ai in inputs)


def generate_answer(question) -> str:
    """Answer ``question`` from the search index and append its sources.

    Returns the chain's answer followed by ``\\nSOURCES: `` and the distinct
    source URLs of the retrieved documents, in first-seen order.
    """
    global chat_history
    gpt_3_5_chain = get_qa_chain(gpt_3_5_index)
    result = gpt_3_5_chain(
        {
            "question": question,
            "chat_history": chat_history,
            "vectordbkwargs": {"search_distance": 0.8},
        }
    )
    print("REsult: " + str(result))
    # NOTE(review): the history is replaced, not appended to, so only the most
    # recent exchange is passed to the next call. Confirm this is intended.
    chat_history = [(question, result["answer"])]

    sources = [document.metadata['source'] for document in result['source_documents']]
    # dict.fromkeys de-duplicates while keeping a stable, first-seen order
    # (a set would make the order of the listed sources arbitrary).
    source = ',\n'.join(dict.fromkeys(sources))
    return result['answer'] + '\nSOURCES: ' + source


def get_agent_chain(prompt, tools):
    """Build an `AgentExecutor` around a `ConversationalAgent`."""
    llm_chain = LLMChain(llm=gpt_3_5, prompt=prompt)
    agent = ConversationalAgent(llm_chain=llm_chain, tools=tools, verbose=True)
    # NOTE(review): `intermediate_steps=True` is passed through unchanged;
    # verify the executor accepts it (the documented flag appears to be
    # `return_intermediate_steps`).
    agent_chain = AgentExecutor.from_agent_and_tools(
        agent=agent,
        tools=tools,
        verbose=True,
        memory=memory,
        intermediate_steps=True,
    )
    return agent_chain


def get_prompt_and_tools():
    """Return ``(prompt, tools)`` for the `ConversationalAgent` variant."""
    tools = get_tools()

    # NOTE(review): the wording of the two strings below is unchanged; their
    # internal line breaks were reconstructed and should be confirmed.
    prefix = """Have a conversation with a human, answering the following questions as best you can.
Always try to use Vectorstore first.
Your name is Makerlab Bot because you are a personal assistant of Makerlab. You have access to the following tools:"""
    suffix = """Begin! If you use any tool, ALWAYS return a "SOURCES" part in your answer"

{chat_history}
Question: {input}
{agent_scratchpad}
SOURCES:"""

    prompt = ConversationalAgent.create_prompt(
        tools,
        prefix=prefix,
        suffix=suffix,
        input_variables=["input", "chat_history", "agent_scratchpad"],
    )
    return prompt, tools


def get_tools():
    """Return the tools available to the agents (only the vector store)."""
    tools = [
        Tool(
            name="Vectorstore",
            func=generate_answer,
            description="useful for when you need to answer questions about the Makerlab or 3D Printing.",
            return_direct=True,
        )
    ]
    return tools


def get_custom_agent(prompt, tools):
    """Build an `AgentExecutor` around an `LLMSingleActionAgent`.

    The agent uses `CustomOutputParser` and stops generating at
    ``\\nObservation:``.
    """
    llm_chain = LLMChain(llm=gpt_3_5, prompt=prompt)
    output_parser = CustomOutputParser()
    tool_names = [tool.name for tool in tools]
    agent = LLMSingleActionAgent(
        llm_chain=llm_chain,
        output_parser=output_parser,
        stop=["\nObservation:"],
        allowed_tools=tool_names,
    )
    # NOTE(review): `intermediate_steps=True` is passed through unchanged;
    # verify the executor accepts it (the documented flag appears to be
    # `return_intermediate_steps`).
    agent_executor = AgentExecutor.from_agent_and_tools(
        agent=agent,
        tools=tools,
        verbose=True,
        memory=memory,
        intermediate_steps=True,
    )
    return agent_executor


def get_prompt_and_tools_for_custom_agent():
    """Return ``(prompt, tools)`` for the custom single-action agent."""
    # NOTE(review): the wording of the template is unchanged; its internal
    # line breaks were reconstructed and should be confirmed.
    template = """
Have a conversation with a human, answering the following questions as best you can.
Always try to use Vectorstore first.
Your name is Makerlab Bot because you are a personal assistant of Makerlab. You have access to the following tools:

{tools}

To answer for the new input, use the following format:

New Input: the input question you must answer
Thought: Do I need to use a tool? Yes
Action: the action to take, should be one of [{tool_names}]
Action Input: the input to the action
Observation: the result of the action
... (this Thought/Action/Action Input/Observation can repeat N times)
Thought: I now know the final answer
Final Answer: the final answer to the original input question.
SOURCES: the sources referred to find the final answer

When you have a response to say to the Human and DO NOT need to use a tool:
1. DO NOT return "SOURCES" if you did not use any tool.
2. You MUST use this format:
```
Thought: Do I need to use a tool? No
AI: [your response here]
```

Begin! Remember to speak as a personal assistant when giving your final answer.
ALWAYS return a "SOURCES" part in your answer, if you used any tool.

Previous conversation history:
{chat_history}
New input: {input}
{agent_scratchpad}
SOURCES:"""

    tools = get_tools()
    prompt = CustomPromptTemplate(
        template=template,
        tools=tools,
        # This omits the `agent_scratchpad`, `tools`, and `tool_names` variables because those are generated dynamically
        # This includes the `intermediate_steps` variable because that is needed
        input_variables=["input", "intermediate_steps", "chat_history"],
    )
    return prompt, tools