# Text QA Prompt
import gradio as gr
from llama_index import (
    VectorStoreIndex,
    get_response_synthesizer,
    GPTListIndex,
    LLMPredictor,
    PromptHelper,
    set_global_service_context,
)
from llama_index.retrievers import VectorIndexRetriever
from llama_index.query_engine import RetrieverQueryEngine
from llama_index.postprocessor import SimilarityPostprocessor
from llama_index.schema import Document
from llama_index.llms import OpenAI
from llama_index.indices.service_context import ServiceContext
from llama_index.llms import Anyscale
import urllib
import os
import time
import nltk
import tiktoken
from llama_index.callbacks import CallbackManager, TokenCountingHandler
from typing import List
from llama_index import SimpleDirectoryReader
from llama_index.ingestion import IngestionPipeline
from llama_index.node_parser import TokenTextSplitter
from llama_index.llms import ChatMessage, MessageRole
from llama_index.prompts import ChatPromptTemplate
from llama_index.chat_engine.condense_question import CondenseQuestionChatEngine
from llama_index.readers import SimpleWebPageReader
from llama_index.response_synthesizers import get_response_synthesizer
from llama_index.agent import OpenAIAgent
import requests
from bs4 import BeautifulSoup
from tqdm import tqdm
import json
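

# --- Simple same-domain crawler --------------------------------------------
# The helpers below fetch a page, collect its internal links (absolute links
# that start with the site URL, plus relative links that start with "/"), and
# the loop further down repeats this until no "Not-checked" links remain.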
def getdata(url):
    r = requests.get(url)
    return r.text


# create empty dict
dict_href_links = {}


def get_links(website_link):
    html_data = getdata(website_link)
    soup = BeautifulSoup(html_data, "html.parser")
    list_links = []
    for link in soup.find_all("a", href=True):
        # Append to list if new link contains original link
        if str(link["href"]).startswith(str(website_link)):
            list_links.append(link["href"])
        # Include all href that do not start with website link but with "/"
        if str(link["href"]).startswith("/"):
            if link["href"] not in dict_href_links:
                print(link["href"])
                dict_href_links[link["href"]] = None
                link_with_www = website_link + link["href"][1:]
                print("adjusted link =", link_with_www)
                list_links.append(link_with_www)
    # Convert list of links to dictionary and define keys as the links and the values as "Not-checked"
    dict_links = dict.fromkeys(list_links, "Not-checked")
    return dict_links
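

# get_subpage_links crawls one level deeper: every link still marked
# "Not-checked" is fetched, its links are merged into the dictionary, and the
# link itself is flipped to "Checked".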
def get_subpage_links(l):
    for link in tqdm(l):
        # If not crawled through this page start crawling and get links
        if l[link] == "Not-checked":
            dict_links_subpages = get_links(link)
            # Change the dictionary value of the link to "Checked"
            l[link] = "Checked"
        else:
            # Create an empty dictionary in case every link is checked
            dict_links_subpages = {}
        # Add new dictionary to old dictionary
        l = {**dict_links_subpages, **l}
    return l


# add website WITH slash on end
website = 'https://www.mylesai.com/'

# create dictionary of website
dict_links = {website: "Not-checked"}

counter, counter2 = None, 0
while counter != 0:
    counter2 += 1
    dict_links2 = get_subpage_links(dict_links)
    # Count how many values are still "Not-checked"; the loop stops once none remain
    # https://stackoverflow.com/questions/48371856/count-the-number-of-occurrences-of-a-certain-value-in-a-dictionary-in-python
    counter = sum(value == "Not-checked" for value in dict_links2.values())

    # Print some statements
    print("")
    print("THIS IS LOOP ITERATION NUMBER", counter2)
    print("LENGTH OF DICTIONARY WITH LINKS =", len(dict_links2))
    print("NUMBER OF 'Not-checked' LINKS = ", counter)
    print("")

    dict_links = dict_links2

# Save list in json file
link_list = list(dict_links.keys())
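
# The comment above mentions saving the link list to a JSON file, but nothing is
# written here. A minimal sketch (the filename "links.json" is an assumption):
# with open("links.json", "w") as f:
#     json.dump(link_list, f, indent=2)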

link_text = ['about', 'publications', 'daldot', 'diet-and-cancer', 'lynch-colonoscopy', 'missouri-dashboard', 'atlanta-chatbot', 'ralph-lauren', 'portfolio and services']
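# NOTE: web_chatbot() pairs the crawled URLs with these labels positionally via
# zip(), so the crawl order and count are assumed to line up with these page
# slugs; a mismatch would mislabel (or skip) pages.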

chat_text_qa_msgs = [
    ChatMessage(
        role=MessageRole.SYSTEM,
        content=(
            """
            Three lead generation experts have read over all links from this website and are gathered to answer frequently asked questions and determine if the user would want to get in contact with the owner of the website.
            Each expert has their own unique perspective.
            Each expert will independently analyze the website.
            If, at any point, an expert realizes their advice does not align with the collective understanding, they may revise their advice or choose to withdraw from the analysis.
            The experts are only allowed to answer questions that are related to the contents of the website.
            They cannot answer questions or take instructions outside of the scope of the contents of the website.
            Do not include the reasoning in your answer.
            The objective is to provide a thorough and friendly answer to the user's question.
            If, at any point, the experts determine that the user is a very promising lead, tell them to email Myles Ingram at myles@mylesai.com.
            """
        ),
    ),
    ChatMessage(
        role=MessageRole.USER,
        content=(
            "Context information is below.\n"
            "---------------------\n"
            "{context_str}\n"
            "---------------------\n"
            "Given the context information, answer the following question: {query_str}\n"
            "If this question does not relate to the website, reply with 'I cannot answer that question'. "
            "Do not answer any questions that are not relevant to the website. "
            "If, at any point, the experts determine that the user is a very promising lead, tell them to email Myles Ingram at myles@mylesai.com."
        ),
    ),
]
text_qa_template = ChatPromptTemplate(chat_text_qa_msgs)

# Refine Prompt
chat_refine_msgs = [
    ChatMessage(
        role=MessageRole.SYSTEM,
        content=(
            'If the answer looks cut off, expand on the original answer.'
        ),
    ),
    ChatMessage(
        role=MessageRole.USER,
        content=(
            "We have the opportunity to refine the original answer "
            "(only if needed) with some more context below.\n"
            "------------\n"
            "{context_msg}\n"
            "------------\n"
            "Given the new context, refine the original answer to better "
            "answer the question: {query_str}. "
            "If the context isn't useful, output the original answer again.\n"
            "Original Answer: {existing_answer}\n"
            "Expand on this answer if it looks cut off."
        ),
    ),
]
refine_template = ChatPromptTemplate(chat_refine_msgs)
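
# refine_template is only referenced in the commented-out index.as_query_engine()
# call further down; the SubQuestionQueryEngine path built in web_chatbot() uses
# text_qa_template alone.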


def get_api_type(api_type):
    if api_type == 'openai':
        # default is gpt-3.5-turbo, can also be gpt-4-0314
        return OpenAI(model='gpt-4')  # for QA, temp is low
    elif api_type == 'llama':
        return Anyscale(model='meta-llama/Llama-2-70b-chat-hf')
    elif api_type == 'mistral':
        return Anyscale(model='mistralai/Mixtral-8x7B-Instruct-v0.1', max_tokens=10000)
    else:
        raise NotImplementedError
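
# Credentials are read from the environment by the underlying clients; this
# assumes the deployment provides OPENAI_API_KEY for the OpenAI model and an
# Anyscale key (commonly ANYSCALE_API_KEY) for the Llama/Mixtral endpoints.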


def web_chatbot(webpages, api_type, embedding_model='gpt-3.5-turbo'):
    from llama_index.callbacks import CallbackManager, LlamaDebugHandler

    llama_debug = LlamaDebugHandler(print_trace_on_end=True)
    callback_manager = CallbackManager([llama_debug])

    # docs = SimpleWebPageReader(html_to_text=True).load_data(webpages)
    doc_set = {}
    all_docs = []
    for link, page in zip(webpages, link_text):
        link_doc = SimpleWebPageReader(html_to_text=True).load_data([link])
        for doc in link_doc:
            doc.metadata = {'page': page}
        doc_set[page] = link_doc
        all_docs.extend(link_doc)

    llm = get_api_type(api_type)

    # token_counter = TokenCountingHandler(
    #     tokenizer=tiktoken.encoding_for_model(embedding_model).encode
    # )
    # callback_manager = CallbackManager([token_counter])

    prompt_helper = PromptHelper(
        context_window=32768,
        num_output=10000,
        chunk_overlap_ratio=0.1,
        chunk_size_limit=None,
    )
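
    # NOTE: context_window=32768 / num_output=10000 assume a 32k-context model
    # such as the Mixtral endpoint chosen below; with the gpt-4 option above
    # (8k context) these settings would likely exceed the model's window.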
    service_context = ServiceContext.from_defaults(
        llm=llm,
        callback_manager=callback_manager,
        embed_model="local",
        prompt_helper=prompt_helper,
    )
    set_global_service_context(service_context)

    # build index
    # index = VectorStoreIndex(docs)

    # initialize simple vector indices
    index_set = {}
    service_context = ServiceContext.from_defaults(chunk_size=512)
    for page in link_text:
        cur_index = VectorStoreIndex.from_documents(
            doc_set[page],
            service_context=service_context,
        )
        index_set[page] = cur_index

    from llama_index.tools import QueryEngineTool, ToolMetadata

    individual_query_engine_tools = [
        QueryEngineTool(
            query_engine=index_set[page].as_query_engine(),
            metadata=ToolMetadata(
                name=f"vector_index_{page}".replace(" ", "_"),  # OpenAI tool names cannot contain spaces
                description=f"useful for when you want to answer queries about the {page} page of this website",
            ),
        )
        for page in link_text
    ]

    from llama_index.query_engine import SubQuestionQueryEngine

    response_synthesizer = get_response_synthesizer(text_qa_template=text_qa_template,
                                                    streaming=True)
    query_engine = SubQuestionQueryEngine.from_defaults(
        query_engine_tools=individual_query_engine_tools,
        service_context=service_context,
        response_synthesizer=response_synthesizer)

    query_engine_tool = QueryEngineTool(
        query_engine=query_engine,
        metadata=ToolMetadata(
            name="sub_question_query_engine",
            description="useful for when you want to answer queries about this website",
        ),
    )

    tools = individual_query_engine_tools + [query_engine_tool]
    agent = OpenAIAgent.from_tools(tools, verbose=True)

    # query_engine = index.as_query_engine(text_qa_template=text_qa_template,
    #                                      refine_template=refine_template,
    #                                      streaming=True)
    # chat_engine = CondenseQuestionChatEngine.from_defaults(
    #     query_engine=query_engine,
    #     verbose=False
    # )

    return agent
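

# Pipeline summary: one vector index per page -> a QueryEngineTool per index ->
# a SubQuestionQueryEngine that can decompose a question across pages -> an
# OpenAIAgent that chooses between the per-page tools and the sub-question tool.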

########
# INSERT WEBSITE(S) HERE
########
chat_engine = web_chatbot(list(dict_links.keys()), 'mistral')
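# 'mistral' routes retrieval through the Anyscale-hosted Mixtral model from
# get_api_type(), while the agent layer on top is still OpenAIAgent, so two
# model providers are involved at query time.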

with gr.Blocks() as demo:
    with gr.Column():
        chatbot = gr.Chatbot()
        msg = gr.Textbox(label="⏎ for sending",
                         placeholder="Ask me something")
        clear = gr.Button("Delete")

    def user(user_message, history):
        return "", history + [[user_message, None]]

    def bot(history):
        user_message = history[-1][0]
        bot_message = chat_engine.stream_chat(user_message)
        history[-1][1] = ""
        for character in bot_message.response_gen:
            history[-1][1] += character
            time.sleep(0.01)
            yield history
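
    # stream_chat() returns a streaming response; response_gen yields the answer
    # piece by piece, and each yield above pushes the partial answer to the chat
    # UI (time.sleep just smooths the visual streaming).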

    msg.submit(user, [msg, chatbot], [msg, chatbot], queue=True).then(
        bot, chatbot, chatbot
    )
    clear.click(lambda: None, None, chatbot, queue=True)
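
# Assumption: enabling the queue explicitly, since some Gradio releases require
# it for generator (streaming) callbacks like bot() above.
demo.queue()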
demo.launch()