# app.py — RyBOT: retrieval-augmented chatbot (FAISS + SentenceTransformers + OpenAI + Gradio)
# --- Environment setup -------------------------------------------------------
# NOTE(review): reinstalling gradio at runtime is a deployment hack for the
# hosting environment; a requirements.txt pin would be preferable.
import os
os.system("pip uninstall -y gradio")
os.system("pip install gradio==3.31.0")

import base64
import gzip
import pickle
from pathlib import Path

import numpy as np
import pandas as pd
import faiss
import nltk
from nltk.tokenize import sent_tokenize, word_tokenize
import openai
import gradio as gr
from sentence_transformers import SentenceTransformer, models

# SECURITY FIX: a live API key was previously hard-coded here (and therefore
# leaked in source control). Supply it via the OPENAI_API_KEY env var instead.
openai.api_key = os.environ.get("OPENAI_API_KEY", "")

# One-time download of the sentence tokenizer data used by sent_tokenize below.
nltk.download('punkt')
# Sentence-embedding model used both to build the index and to embed queries.
model = SentenceTransformer('multi-qa-MiniLM-L6-cos-v1')

# Directory containing the pre-cleaned .txt source documents.
directory = "cleaned_files"
# On-disk FAISS index file.
index_filename = "faiss.index"
# On-disk pickle mapping index rows -> (chunk text, source filename).
mapping_filename = "mapping.pkl1"

# NOTE(review): removed a dead module-level gr.Textbox that was created here
# outside any gr.Blocks context — it was shadowed by the textbox constructed
# inside the Blocks UI and was never rendered or referenced.
def apply_html(text, color):
    """Prepare a chat message for display in the Gradio chat widget.

    If *text* contains an HTML table, inline border/padding styles are
    injected so the table renders legibly; otherwise *text* is returned
    unchanged.  *color* is currently unused but retained for interface
    compatibility with existing callers.

    (A commented-out earlier implementation that wrapped all text in a
    colored <b> tag was removed as dead code.)
    """
    if "<table>" in text and "</table>" in text:
        # Locate the first table in the text (only the first is styled).
        table_start = text.index("<table>")
        table_end = text.index("</table>") + len("</table>")
        table_content = text[table_start:table_end]
        # Inject inline styles so the table shows borders inside the chat UI.
        modified_table = table_content.replace("<table>", "<table style='border-collapse: collapse;'>")
        modified_table = modified_table.replace("<th>", "<th style='border: 1px solid #ddd; padding: 8px; background-color: #f2f2f2;'>")
        modified_table = modified_table.replace("<td>", "<td style='border: 1px solid #ddd; padding: 8px;'>")
        # Splice the styled table back into the surrounding text.
        return text[:table_start] + modified_table + text[table_end:]
    # Plain text passes through untouched.
    return text
def apply_filelist_html(text, color):
    """Render *text* as small bold HTML in *color* for the references pane."""
    style = f"color:{color}; font-size: 12px; !important"
    return f'<b style="{style}">{text}</b>'
# ---------------------------------------------------------------------------
# Build or load the FAISS retrieval index.
# If a previously-saved index and chunk/filename mapping exist on disk, load
# them; otherwise embed every document chunk and persist both artifacts.
if os.path.exists(index_filename) and os.path.exists(mapping_filename):
    # Reuse the previously built index.
    index = faiss.read_index(index_filename)
    with open(mapping_filename, 'rb') as f:
        # chunks: list of sentence-group strings; filenames: parallel list of
        # the source file each chunk came from (same ordering as index rows).
        chunks, filenames = pickle.load(f)
else:
    # Parallel lists: one entry per chunk, aligned by position.
    filenames = []
    embeddings = []
    chunks = []
    # Each chunk is `chunk_size` consecutive sentences; successive chunks
    # share `overlap` sentences so answers spanning a boundary stay findable.
    chunk_size = 5  # Size of each chunk
    overlap = 2  # Size of overlap between chunks
    # Iterate over the cleaned text files to create the index.
    for filename in os.listdir(directory):
        if filename.endswith(".txt"):
            with open(os.path.join(directory, filename), 'r', encoding='utf-8') as file:
                text = file.read()
            # Split text into sentences (NLTK punkt tokenizer).
            sentences = sent_tokenize(text)
            # Slide over the sentences with stride chunk_size - overlap.
            for i in range(0, len(sentences), chunk_size-overlap):
                chunk = ' '.join(sentences[i:i+chunk_size])
                chunks.append(chunk)
                # Embed the chunk with the same model used at query time.
                embeddings.append(model.encode(chunk))
                filenames.append(filename)
    # Stack the per-chunk vectors into one (n_chunks, dim) array.
    embeddings = np.array(embeddings)
    # Dimension of the embedding space.
    d = embeddings.shape[1]
    # Exact (brute-force) L2-distance index.
    index = faiss.IndexFlatL2(d)
    index.add(embeddings)
    # Persist both artifacts so subsequent startups skip re-embedding.
    faiss.write_index(index, index_filename)
    with open(mapping_filename, 'wb') as f:
        pickle.dump((chunks, filenames), f)
def add_text(history, text):
    """Append the user's message to the chat history, bot reply pending.

    Returns the updated history together with the raw text so the bound
    textbox keeps its value for the follow-up bot() call.
    """
    if history is not None:
        # Bot side of the pair stays None until bot() fills it in.
        entry = [apply_html(text, "blue"), None]
        history.append(entry)
    return history, text
def bot(query, history, fileListHistory, k=5):
    """Answer *query* via retrieval-augmented generation.

    Embeds the query, retrieves the top-*k* most similar text chunks from
    the FAISS index, builds a prompt from them, and asks the OpenAI chat
    API for an answer.  Updates *history* (chat pane) and *fileListHistory*
    (reference-links pane) in place and returns both.

    Fixes in this revision:
      * `time` was used for retry back-off but never imported (NameError).
      * On total timeout, the code dereferenced the None response and
        crashed; it now posts the timeout message to the chat instead.
      * A large commented-out legacy Completion-API block was removed.
    """
    import time  # local import: only needed for the retry back-off below

    print("QUERY : " + query)
    # Embed the query with the same model used to build the index;
    # faiss works with single precision.
    query_embedding = model.encode(query).astype('float32')
    # D: squared L2 distances, I: row ids of the k nearest chunks.
    D, I = index.search(np.array([query_embedding]), k)
    # Retrieve and join the top-k chunks as the prompt context.
    top_chunks = [chunks[I[0, i]] for i in range(I.shape[1])]
    context = '\n'.join(top_chunks)
    # Corresponding source files, deduplicated.
    top_filenames = list(set(filenames[I[0, i]] for i in range(I.shape[1])))
    print("Corresponding filenames: ", top_filenames)
    # Clickable links to the PDF originals of the retrieved files.
    file_links = [f'<a href="https://huggingface.co/spaces/happiestminds/rybot/resolve/main/raw/{filename.replace(".txt", ".pdf")}" target="_blank">{filename.replace(".txt", ".pdf")}</a>' for filename in top_filenames]
    file_links_str = ', '.join(file_links)
    # Record the query and its references in the file-list pane.
    fileListHistory.append([apply_filelist_html(f"QUERY: {query} | REFERENCES: {file_links_str}", "green"), None])

    prompt = f'''The following is a query from a user who is a mechanic. Use the context provided to respond to the user.
QUERY: {query}
CONTEXT: {context}
Respond to the point. Do not include terms like - (according to the context provided) in your response.'''
    messages = [{"role": "user", "content": prompt}]
    print(messages)

    response = None
    # Up to 3 attempts; only timeouts are retried.
    for _attempt in range(3):
        try:
            response = openai.ChatCompletion.create(
                model="gpt-3.5-turbo",
                messages=messages,
                max_tokens=1000,
                stop=None,
                temperature=0,
                top_p=1,
                frequency_penalty=0,
                presence_penalty=0,
            )
            break
        except openai.OpenAIError as e:
            if str(e) == "Request timed out":
                time.sleep(1)  # brief back-off, then retry
            else:
                break  # non-timeout errors are not retried

    if response is None:
        # BUGFIX: previously this branch dereferenced the None response
        # (`response.text`) and crashed; show the timeout message instead.
        timeout_msg = "Unfortunately, the connection to ChatGPT timed out. Please try after some time."
        print(timeout_msg)
        if history is not None and len(history) > 0:
            history[-1][1] = apply_html(timeout_msg, "black")
    else:
        answer = response['choices'][0]['message']['content'].strip()
        print("\nGPT RESPONSE:\n")
        print(answer)
        if history is not None and len(history) > 0:
            # Fill in the bot side of the most recent chat pair.
            history[-1][1] = apply_html(answer, "black")
    return history, fileListHistory
# Embed the logo directly in the page as base64 so no static file hosting
# is needed on the Space.
with open(Path("rybot_small.png"), "rb") as img_file:
    img_str = base64.b64encode(img_file.read()).decode()

# Header HTML: centered logo + "RyBOT" title and a tagline.
# (Braces are doubled because this is an f-string embedding img_str.)
html_code = f'''
<!DOCTYPE html>
<html>
<head>
<style>
.center {{
display: flex;
justify-content: center;
align-items: center;
margin-top: -40px; /* adjust this value as per your requirement */
margin-bottom: 5px;
}}
.large-text {{
font-size: 40px;
font-family: Arial, Helvetica, sans-serif;
font-weight: 900 !important;
margin-left: 5px;
color: #5b5b5b !important;
}}
.image-container {{
display: inline-block;
vertical-align: middle;
height: 50px; /* Twice the font-size */
margin-bottom: 5px;
}}
</style>
</head>
<body>
<div class="center">
<img src="data:image/jpg;base64,{img_str}" alt="RyBOT image" class="image-container" />
<strong class="large-text">RyBOT</strong>
</div>
<br>
<div class="center">
<h3> [ "I'm smart but the humans have me running on a hamster wheel. Please forgive the slow responses." ] </h3>
</div>
</body>
</html>
'''

# Page-level CSS overrides passed to gr.Blocks below.
css = """
.feedback textarea {background-color: #e9f0f7}
.gradio-container {background-color: #eeeeee}
"""
def clear_textbox():
    """Return None so the bound textbox is emptied after each round-trip."""
    print("Calling CLEAR")
# ---------------------------------------------------------------------------
# Gradio UI (gradio 3.x API — note the .style() calls on components).
with gr.Blocks(theme=gr.themes.Soft(), css=css, title="RyBOT") as demo:
    # Header banner (logo + title) rendered from the prebuilt HTML.
    gr.HTML(html_code)
    # Main conversation pane.
    chatbot = gr.Chatbot([], elem_id="chatbot", label="Chat", color_map=["blue","grey"]).style(height=450)
    # Secondary pane listing source-document reference links per query.
    fileListBot = gr.Chatbot([], elem_id="fileListBot", label="References", color_map=["blue","grey"]).style(height=150)
    txt = gr.Textbox(
        label="Type your query here:",
        placeholder="What would you like to find today?"
    ).style(container=True)
    # Enter key pipeline: echo the user message, run the bot, clear the box.
    txt.submit(
        add_text,
        [chatbot, txt],
        [chatbot, txt]
    ).then(
        bot,
        [txt, chatbot, fileListBot],
        [chatbot, fileListBot]
    ).then(
        clear_textbox,
        inputs=None,
        outputs=[txt]
    )
    # "Send" button mirrors the Enter-key pipeline.
    btn = gr.Button(value="Send")
    btn.click(
        add_text,
        [chatbot, txt],
        [chatbot, txt],
    ).then(
        bot,
        [txt, chatbot, fileListBot],
        [chatbot, fileListBot]
    ).then(
        clear_textbox,
        inputs=None,
        outputs=[txt]
    )
demo.launch()