# chatbot / app.py
# Last change: "Update app.py (#2)" by long1104, commit 1d48263 (verified); file size 19.5 kB.
from setup_code import * # This imports everything from setup_code.py
class Query_Agent:
    """Embed user queries, search the Pinecone indexes and build a response.

    Two topics are supported:

    * ``'ml'``     -- the query is embedded with the OpenAI embeddings API and
      searched in ``pinecone_index``.
    * ``'python'`` -- the query is embedded with CodeBERT and searched in
      ``pinecone_index_python``.
    """

    # Topics that query_vector_store knows how to embed and search.
    _SUPPORTED_TOPICS = ("ml", "python")

    def __init__(self, pinecone_index, pinecone_index_python, openai_client) -> None:
        """Store the index/client handles and load the CodeBERT model."""
        self.pinecone_index = pinecone_index
        self.pinecone_index_python = pinecone_index_python
        self.openai_client = openai_client
        # Embedding of the last query passed to query_vector_store.
        self.query_embedding = None
        self.codbert_tokenizer = AutoTokenizer.from_pretrained("microsoft/codebert-base")
        self.codebert_model = AutoModel.from_pretrained("microsoft/codebert-base")

    def get_codebert_embedding(self, code: str):
        """Return a CodeBERT embedding of *code* as a flat Python list.

        The input is truncated to 512 tokens and the token vectors of the
        last hidden state are mean-pooled into a single vector.
        """
        inputs = self.codbert_tokenizer(code, return_tensors="pt", max_length=512, truncation=True)
        outputs = self.codebert_model(**inputs)
        pooled = outputs.last_hidden_state.mean(dim=1)  # simple mean pooling
        # tolist() yields one row per input; only one input was given.
        return pooled.detach().numpy().tolist()[0]

    def get_openai_embedding(self, text, model="text-embedding-ada-002"):
        """Return the OpenAI embedding of *text* (newlines replaced by spaces)."""
        text = text.replace("\n", " ")
        return self.openai_client.embeddings.create(input=[text], model=model).data[0].embedding

    @staticmethod
    def _first_namespace(index):
        """Return the first namespace name reported by the index statistics."""
        namespaces = list(index.describe_index_stats()["namespaces"].keys())
        if not namespaces:
            raise IndexError("Pinecone index reports no namespaces to query")
        return namespaces[0]

    def query_vector_store(self, query, query_topic: str, index=None, k=5) -> str:
        """Embed *query*, search the matching index and return the extracted matches.

        Args:
            query: Text to embed and search for.
            query_topic: ``'ml'`` or ``'python'``.
            index: Index used for ``'ml'`` queries; defaults to
                ``self.pinecone_index``. ``'python'`` queries always use
                ``self.pinecone_index_python``.
            k: Number of matches requested from the index.

        Returns:
            Whatever ``get_top_k_text`` (ml) or ``get_top_filename`` (python)
            produces from the query result.

        Raises:
            ValueError: If *query_topic* is not a supported topic.
            IndexError: If the selected index reports no namespaces.

        Side effects:
            Stores the computed embedding in ``self.query_embedding``.
        """
        # Fail before any network call; previously an unknown topic surfaced
        # later as an UnboundLocalError on the result variable.
        if query_topic not in self._SUPPORTED_TOPICS:
            raise ValueError(
                f"Unsupported query_topic {query_topic!r}; expected one of {self._SUPPORTED_TOPICS}"
            )

        if query_topic == "ml":
            if index is None:
                index = self.pinecone_index
            self.query_embedding = self.get_openai_embedding(query)
            extract_matches = get_top_k_text
        else:
            index = self.pinecone_index_python
            self.query_embedding = self.get_codebert_embedding(query)
            extract_matches = get_top_filename

        query_result = index.query(
            namespace=self._first_namespace(index),
            top_k=k,
            vector=self.query_embedding,
            include_values=True,
            include_metadata=True,
        )
        return extract_matches(query_result)

    def process_query_response(self, head_agent, user_query, query_topic):
        """Answer *user_query* using retrieved documents when they are relevant.

        The recent conversation for *query_topic* is appended to the query
        before retrieval. If the retrieved matches are judged relevant (or
        contain a Python filename) the answering agent summarises them;
        otherwise a plain completion is requested and the response is
        prefixed with ``"[EXTERNAL] "``.

        Args:
            head_agent: The Head_Agent providing indexes, sub-agents, the
                OpenAI client and the selected education level.
            user_query: The raw text typed by the user.
            query_topic: ``'ml'`` or ``'python'``.

        Returns:
            The response text.
        """
        # Retrieve the history related to the query_topic.
        conversation = []
        index = head_agent.pinecone_index
        if query_topic == "ml":
            conversation = Head_Agent.get_history_about("ml")
        elif query_topic == "python":
            conversation = Head_Agent.get_history_about("python")
            index = head_agent.pinecone_index_python

        user_query_plus_conversation = f"The current query is: {user_query}"
        if conversation:
            conversation_text = "\n".join(conversation)
            # Leading space keeps the query and the conversation from fusing.
            user_query_plus_conversation += f" The current conversation is: {conversation_text}"

        # self.query_embedding is set as a side effect of this call.
        matches_text = self.query_vector_store(user_query_plus_conversation, query_topic, index)

        use_documents = (
            head_agent.relevant_documents_agent.is_relevant(matches_text, user_query_plus_conversation)
            or contains_py_filename(matches_text)
        )
        if use_documents:
            if query_topic == "python":
                user_query = user_query + " Please use Python Code in your response."
            return head_agent.answering_agent.generate_response(
                user_query, matches_text, conversation, head_agent.selected_mode
            )

        prompt_for_gpt = f"Return a response to this query: {user_query} in the context of this conversation: {conversation}. Please use language appropriate for a {head_agent.selected_mode}."
        response = get_completion(head_agent.openai_client, prompt_for_gpt)
        return "[EXTERNAL] " + response
class Answering_Agent:
    """Build prompts for the completion model and for DALL-E image generation."""

    def __init__(self, openai_client) -> None:
        """Keep the OpenAI client used for every completion request."""
        self.client = openai_client

    def generate_response(self, query, docs, conv_history, selected_mode):
        """Summarise *docs* into an answer to *query*.

        Args:
            query: The user's question.
            docs: Retrieved text to base the answer on.
            conv_history: Recent conversation, interpolated into the prompt.
            selected_mode: Audience description (e.g. an education level).

        Returns:
            The completion returned by ``get_completion``.
        """
        prompt_for_gpt = f"Based on this text in angle brackets: <{docs}>, please summarize a response to this query: {query} in the context of this conversation: {conv_history}. Please use language appropriate for a {selected_mode}. Do not use the phrase `in the context of conversation` in your response"
        return get_completion(self.client, prompt_for_gpt)

    def generate_response_topic(self, topic_desc, topic_text, conv_history, selected_mode):
        """Summarise *topic_text* as a response about *topic_desc*.

        Returns:
            The completion returned by ``get_completion``.
        """
        prompt_for_gpt = f"Please return a summary response on this topic: {topic_desc} using this text as best as possible {topic_text} in the context of this {conv_history}. Please use language appropriate for a {selected_mode}."
        return get_completion(self.client, prompt_for_gpt)

    def generate_image(self, text):
        """Create an instructional image and a display caption for *text*.

        When the module-level ``DEBUG`` flag is truthy nothing is generated.

        Returns:
            ``(image, caption)``; ``(None, "")`` when ``DEBUG`` is truthy.

        Side effects:
            Appends the DALL-E prompt to ``dall_e_prompts.txt``.
        """
        if DEBUG:
            return None, ""

        dall_e_prompt_from_gpt = f"Based on this text, repeated here in double square brackets for your reference: [[{text}]], please generate a simple caption that I can use with dall-e to generate an instructional image."
        dall_e_text = get_completion(self.client, dall_e_prompt_from_gpt)

        # Log the prompt. The encoding is explicit because model output may
        # contain characters the platform default encoding cannot represent.
        with open("dall_e_prompts.txt", "a", encoding="utf-8") as f:
            f.write(f"{dall_e_text}\n\n")

        # Get the image from DALL-E.
        image = Head_Agent.text_to_image(self.client, dall_e_text)

        # Ask for a short caption to show alongside the image.
        image_caption_prompt = f"This text in double square brackets is used to prompt dall-e: [[{dall_e_text}]]. Please generate a simple caption that I can use to display with the image dall-e will create. Only return that caption."
        image_caption = get_completion(self.client, image_caption_prompt)
        return (image, image_caption)
class Concepts_Agent:
    """Track which concept topics were covered and map queries to concepts.

    Concept data is read from ``concepts_final.csv``; the coverage matrix
    itself lives in ``st.session_state.topic_matrix``.
    """

    def __init__(self):
        """Load the concepts table from ``concepts_final.csv``."""
        self._df = pd.read_csv("concepts_final.csv")

    def increase_cell(self, i, j):
        """Increment the coverage count for concept row *i*, topic column *j*."""
        st.session_state.topic_matrix[i][j] += 1

    def _matrix_labels(self):
        """Return ``(row_labels, column_headers)`` for the 12 x 5 topic matrix."""
        headers = [f"Topic {i}" for i in range(1, 6)]
        row_indices = [f"{self._df['concept'][i]}" for i in range(12)]
        return row_indices, headers

    @staticmethod
    def _write_total_covered():
        """Write the sum of all cells of the topic matrix to the page."""
        st.write(f"Total Topics covered: {sum(sum(row) for row in st.session_state.topic_matrix)}")

    def display_topic_matrix(self):
        """Render the topic matrix as a table followed by the covered total."""
        row_indices, headers = self._matrix_labels()
        topic_df = pd.DataFrame(st.session_state.topic_matrix, row_indices, headers)
        st.table(topic_df)
        self._write_total_covered()

    def display_topic_matrix_star(self):
        """Render the topic matrix with every cell equal to 1 shown as a star."""
        row_indices, headers = self._matrix_labels()
        # Replace 1 with the Unicode black-star symbol (U+2605).
        topic_matrix_star = [
            [chr(9733) if val == 1 else val for val in row]
            for row in st.session_state.topic_matrix
        ]
        topic_df = pd.DataFrame(topic_matrix_star, row_indices, headers)
        st.table(topic_df)
        self._write_total_covered()

    def display_topic_matrix_as_image(self):
        """Draw the matrix's HTML text onto a PNG, show it and return its bytes.

        Returns:
            An ``io.BytesIO`` positioned at the start of the PNG data.
        """
        row_indices, headers = self._matrix_labels()
        topic_df = pd.DataFrame(st.session_state.topic_matrix, row_indices, headers)
        df_html = topic_df.to_html(index=False)

        # The HTML source is drawn as plain text on a white canvas.
        image = Image.new("RGB", (800, 600), color="white")
        draw = ImageDraw.Draw(image)
        draw.text((10, 10), df_html, fill="black")

        # Save the image to a byte stream usable by st.image.
        image_byte_array = io.BytesIO()
        image.save(image_byte_array, format="PNG")
        image_byte_array.seek(0)

        st.image(image_byte_array, caption="DataFrame as Image")
        return image_byte_array

    def find_top_concept_index(self, query_embedding):
        """Return the index of the concept most similar to *query_embedding*.

        Each row's ``embedding`` column (a list literal stored as text) is
        compared to the query by cosine similarity. Index 0 is returned when
        no row has a similarity greater than 0.
        """
        # The query vector does not change per row, so build it once.
        qe_array = np.array(query_embedding).reshape(1, -1)
        top_sim = 0
        top_concept_index = 0
        for index, row in self._df.iterrows():
            float_array = np.array(ast.literal_eval(row['embedding'])).reshape(1, -1)
            sim = cosine_similarity(float_array, qe_array)[0][0]
            if sim > top_sim:
                top_sim = sim
                top_concept_index = index
        return top_concept_index

    def get_top_k_text_list(self, matches, k):
        """Return the ``metadata['text']`` of up to the first *k* matches.

        Fewer than *k* items are returned when the result holds fewer matches.
        """
        found = matches.get('matches') or []
        return [match['metadata']['text'] for match in found[:k]]

    def write_to_file(self, filename):
        """Write the concepts table to *filename* as CSV without row indices."""
        self._df.to_csv(filename, index=False)
class Head_Agent:
    """Top-level controller: owns the API clients, the sub-agents and the UI loop."""

    # Number of follow-up topic columns (topic_0 .. topic_4) per concept.
    _TOPICS_PER_CONCEPT = 5

    def __init__(self, openai_key, pinecone_key) -> None:
        """Create the OpenAI/Pinecone clients, open both indexes and build the sub-agents."""
        self.openai_key = openai_key
        self.pinecone_key = pinecone_key
        self.selected_mode = ""
        self.openai_client = OpenAI(api_key=self.openai_key)
        self.pc = Pinecone(api_key=self.pinecone_key)
        self.pinecone_index = self.pc.Index("index-600")
        self.pinecone_index_python = self.pc.Index("index-python-files")
        self.query_embedding_local = None
        self.setup_sub_agents()

    def setup_sub_agents(self):
        """Instantiate the classify, query, answering, relevance and concepts agents."""
        self.classify_agent = Classify_Agent(self.openai_client)
        self.query_agent = Query_Agent(self.pinecone_index, self.pinecone_index_python, self.openai_client)
        self.answering_agent = Answering_Agent(self.openai_client)
        self.relevant_documents_agent = Relevant_Documents_Agent(self.openai_client)
        self.ca = Concepts_Agent()

    @staticmethod
    def get_conversation():
        """Return the last two user messages, regardless of topic."""
        return Head_Agent.get_history_about()

    @staticmethod
    def get_history_about(topic=None):
        """Return the last two stored message contents matching *topic*.

        With ``topic=None`` only messages whose role is ``"user"`` are
        considered; otherwise every message tagged with *topic* is considered,
        whatever its role. Each entry is the content followed by one space.
        """
        history = []
        for message in st.session_state.messages:
            content = message["content"]
            if topic is None:
                if message["role"] == "user":
                    history.append(f"{content} ")
            elif message.get("topic") == topic:
                history.append(f"{content} ")
        return history[-2:]

    @staticmethod
    def text_to_image(openai_client, text):
        """Generate one 1024x1024 DALL-E 3 image for *text* and return it as a PIL image."""
        with st.spinner("Generating ..."):
            response = openai_client.images.generate(
                model="dall-e-3",
                prompt=text,
                n=1,
                size="1024x1024",
            )
            image_url = response.data[0].url
            # Download the generated picture; the bytes are buffered in memory
            # so the image stays usable after the connection closes.
            with urllib.request.urlopen(image_url) as remote:
                img = Image.open(BytesIO(remote.read()))
        return img

    def get_default_value(self, variable):
        """Return a fresh default for the session-state key *variable*.

        Unknown keys write an error message to the page and yield ``None``.
        """
        # Factories, not values, so mutable defaults are never shared.
        factories = {
            "openai_model": lambda: "gpt-3.5-turbo",
            "messages": list,
            "stage": lambda: 0,
            "query_embedding": lambda: None,
            "topic_matrix": lambda: [[0] * 5 for _ in range(12)],
        }
        factory = factories.get(variable)
        if factory is None:
            st.write(f"Error: get_default_value, variable not defined: {variable}")
            return None
        return factory()

    def initialize_session_state(self):
        """Populate any session-state key that is missing with its default."""
        session_state_variables = ["openai_model", "messages", "stage", "query_embedding", "topic_matrix"]
        for variable in session_state_variables:
            if variable not in st.session_state:
                st.session_state[variable] = self.get_default_value(variable)

    def display_selection_options(self):
        """Show the education-level selector and remember the choice."""
        modes = ['college student', 'middle school student', '1st grade student', 'high school student', 'grad student']
        self.selected_mode = st.selectbox("Select your education level:", modes)

    def display_chat_messages(self):
        """Replay the stored chat history, including assistant images."""
        for message in st.session_state.messages:
            if message["role"] == "assistant":
                with st.chat_message("assistant"):
                    st.write(message["content"])
                    if message.get("image") is not None:
                        st.image(message["image"])
            else:
                with st.chat_message("user"):
                    st.write(message["content"])

    def main_loop(self):
        """Render the page, answer a newly submitted query and offer follow-up topics."""
        st.title("Machine Learning Text Guide Chatbot")
        self.initialize_session_state()
        self.display_selection_options()
        self.display_chat_messages()

        if user_query := st.chat_input("What would you like to chat about?"):
            self._handle_user_query(user_query)

        # Checked on every run, not only right after a query, so the topic
        # buttons are still handled on the rerun triggered by a button click.
        if st.session_state.stage == 1:
            self._offer_follow_up_topics()

    def _handle_user_query(self, user_query):
        """Classify *user_query*, produce and display the reply, and store both messages."""
        with st.chat_message("user"):
            st.write(user_query)

        with st.chat_message("assistant"):
            response = ""
            topic = None
            image = None
            caption = ""
            st.session_state.stage = 0

            # Classify the query together with the recent conversation.
            conversation = self.get_conversation()
            user_query_plus_conversation = f"The current query is: {user_query}. The current conversation is: {conversation}"
            classify_query = self.classify_agent.classify_query(user_query_plus_conversation)

            if classify_query == general_greeting_num:
                response = "How can I assist you today?"
            elif classify_query == general_question_num:
                response = "Please ask a question about Machine Learning or Python Code."
            elif classify_query == obnoxious_num:
                response = "Please dont be obnoxious."
            elif classify_query == progress_num:
                self.ca.display_topic_matrix_star()
            elif classify_query == default_num:
                response = "I'm not sure how to respond to that."
            elif classify_query == machine_learning_num:
                response = self.query_agent.process_query_response(self, user_query, 'ml')
                # Embedding of the bare query, used later to pick the concept.
                st.session_state.query_embedding = self.query_agent.get_openai_embedding(user_query)
                image, caption = self.answering_agent.generate_image(response)
                topic = "ml"
                st.session_state.stage = 1
            elif classify_query == python_code_num:
                response = self.query_agent.process_query_response(self, user_query, 'python')
                # No image is shown for Python answers, so none is generated;
                # the image used to be generated and then thrown away.
                image = None
                topic = "python"
                st.session_state.stage = 0
            else:
                response = "I'm not sure how to respond to that."

            st.write(response)
            if image and caption != "":
                st.image(image, caption)

        st.session_state.messages.append({"role": "user", "content": user_query, "topic": topic, "image": None})
        st.session_state.messages.append({"role": "assistant", "content": response, "topic": topic, "image": image})

    def _offer_follow_up_topics(self):
        """Show one button per not-yet-covered topic of the concept closest to the last query."""
        # The embedding is read from session state because locals do not
        # survive the rerun that a button click causes.
        top_concept_index = self.ca.find_top_concept_index(st.session_state.query_embedding)
        concept_name = self.ca._df['concept'][top_concept_index]
        st.write(f"Your question is associated to the Fundamental Concept in Machine Learning: {concept_name}.\n\n")
        st.write(f"Here are some topics you can explore to help you learn about {concept_name}, pick one.")

        matrix_row = st.session_state.topic_matrix[top_concept_index]
        for button_index in range(self._TOPICS_PER_CONCEPT):
            topic_desc = self.ca._df[f"topic_{button_index}_desc"][top_concept_index]
            # stage is re-read each time: a handled click resets it to 0,
            # which hides the remaining buttons for this run.
            if matrix_row[button_index] == 0 and st.session_state.stage:
                if st.button(topic_desc):
                    process_button_click(self, button_index, topic_desc, top_concept_index)
def process_button_click(head, button_index, topic_desc, top_concept_index):
    """Handle a click on one of the follow-up topic buttons.

    Echoes the chosen topic as a user message, generates a summary and an
    image for it, marks the topic as covered in the topic matrix, stores both
    messages in the session history and resets the stage to 0.

    Args:
        head: The Head_Agent whose sub-agents and concepts table are used.
        button_index: Which topic column (``topic_<n>``) was chosen.
        topic_desc: The button label / topic description.
        top_concept_index: Row of the concept in the concepts table and
            in the topic matrix.
    """
    session = st.session_state

    with st.chat_message("user"):
        st.write(topic_desc)

    # The chosen topic becomes the query that the next concept lookup uses.
    session.query_embedding = head.query_agent.get_openai_embedding(topic_desc)

    column_name = 'topic_' + str(button_index)
    topic_text = head.ca._df[column_name][top_concept_index]

    answer = head.answering_agent.generate_response_topic(
        topic_desc, topic_text, head.get_conversation(), head.selected_mode
    )
    picture, caption = head.answering_agent.generate_image(topic_text)

    session.topic_matrix[top_concept_index][button_index] += 1

    st.write(answer)
    if picture and caption != "":
        st.image(picture, caption)

    # NOTE(review): the assistant entry is tagged with the topic description
    # while the user entry is tagged "ml" -- confirm this is intended.
    user_entry = {"role": "user", "content": topic_desc, "topic": "ml", "image": None}
    assistant_entry = {"role": "assistant", "content": answer, "topic": topic_desc, "image": picture}
    session.messages.append(user_entry)
    session.messages.append(assistant_entry)

    session.stage = 0
if __name__ == "__main__":
    # Module-level flag read by Answering_Agent.generate_image; a truthy
    # value makes it return (None, "") without generating anything.
    DEBUG = False

    head_agent = Head_Agent(OPENAI_KEY, pc_apikey)
    head_agent.main_loop()