# AfrocourierAI — Gradio chat app (Hugging Face Space): OpenAI Assistants backend with Bubble.io ingredient sync.
import os
import re
import time

import requests
from dotenv import load_dotenv
from openai import OpenAI
import gradio as gr

# Load configuration from a local .env file (if present) into the environment.
load_dotenv()

OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
ASSISTANT_ID = os.getenv("ASSISTANT_ID")
BUBBLE_API_TOKEN = os.getenv("BUBBLE_API_TOKEN")
BUBBLE_API_URL = os.getenv("BUBBLE_API_URL")

# Pass the key explicitly so the .env-provided value is honored; previously
# OPENAI_API_KEY was read but never used and OpenAI() relied on the ambient
# environment variable alone.
client = OpenAI(api_key=OPENAI_API_KEY)

# Module-level thread ID: a single Assistants thread is shared by all chats
# for the lifetime of this process.
thread_id = None
def get_or_create_thread():
    """Return the shared thread ID, creating the OpenAI thread on first use."""
    global thread_id
    if thread_id is not None:
        return thread_id
    thread_id = client.beta.threads.create().id
    return thread_id
def stream_response(response_text):
    """
    Yield the response text progressively, one word at a time.

    Sentence-ending punctuation (., !, ?) followed by whitespace is rewritten
    as a blank line so the streamed output gains paragraph breaks; each yield
    is the full text accumulated so far, with a short pause between chunks.
    """
    paragraphed = re.sub(r'([.!?])\s+', r'\1\n\n', response_text)
    accumulated = ""
    for token in paragraphed.split(" "):
        accumulated += token + " "
        yield accumulated.strip()
        time.sleep(0.05)  # small pause so the stream feels natural
    # One final yield guarantees the complete text is always emitted.
    yield accumulated.strip()
def extract_ingredients(response_text):
    """
    Pull list items out of the response text.

    A line counts as an ingredient when, after optional leading whitespace,
    it begins with a numbered marker ("1." or "1)"), a dash, or an asterisk
    bullet; the text following the marker is returned stripped.
    """
    bullet = re.compile(r'^\s*(?:\d+[\.\)]|-|\*)\s*(.+)$')
    return [
        m.group(1).strip()
        for line in response_text.splitlines()
        if (m := bullet.match(line))
    ]
def send_to_bubble_database(ingredients, subject):
    """
    POST the extracted ingredients and the user's message (subject) to the
    Bubble application's API endpoint.

    Best-effort: all failures are logged to stdout and never raised, so the
    chat flow is not interrupted by Bubble outages.
    """
    # Guard against a missing endpoint; requests.post(None, ...) would raise.
    if not BUBBLE_API_URL:
        print("Failed to send data to Bubble: BUBBLE_API_URL is not configured.")
        return
    payload = {
        "ingredients": ingredients,
        "subject": subject,
        "api_token": BUBBLE_API_TOKEN,
    }
    try:
        # Timeout prevents the chat generator from hanging forever if Bubble stalls.
        response = requests.post(BUBBLE_API_URL, json=payload, timeout=10)
        if response.status_code == 200:
            print("Ingredients and subject successfully sent to Bubble database.")
        else:
            print(f"Failed to send data to Bubble. Status: {response.status_code}, Response: {response.text}")
    except Exception as e:
        print("Error sending data to Bubble:", str(e))
def chat(message, history):
    """
    Gradio chat handler (generator).

    Sends the user's message to the shared assistant thread, waits for the
    run to finish, streams the reformatted reply chunk by chunk, then posts
    any detected ingredient list to Bubble and appends an info line.  Only
    the current exchange is yielded; previous history is deliberately
    dropped.
    """
    history = []  # previous conversation is intentionally not re-emitted
    get_or_create_thread()

    # Snapshot existing message IDs so the assistant's new reply can be found.
    old_ids = {msg.id for msg in client.beta.threads.messages.list(thread_id=thread_id).data}

    # Send the user's message, then start a run so the assistant processes it.
    client.beta.threads.messages.create(
        thread_id=thread_id,
        role="user",
        content=message
    )
    run = client.beta.threads.runs.create(
        thread_id=thread_id,
        assistant_id=ASSISTANT_ID
    )

    # Poll until the run reaches a terminal state.  Bailing out on
    # failed/cancelled/expired — not just completed — fixes an infinite
    # loop when a run never completes.
    while True:
        run_status = client.beta.threads.runs.retrieve(thread_id=thread_id, run_id=run.id)
        if run_status.status == "completed":
            break
        if run_status.status in ("failed", "cancelled", "expired"):
            yield [
                {"role": "user", "content": message},
                {"role": "assistant", "content": f"[ERROR] The assistant run ended with status '{run_status.status}'. Please try again."}
            ]
            return
        time.sleep(1)

    new_messages = client.beta.threads.messages.list(thread_id=thread_id).data

    # First assistant message that was not present before the run.
    # NOTE(review): assumes messages.list returns newest-first (the API
    # default ordering) — confirm if a custom order is ever passed.
    assistant_response = ""
    for msg in new_messages:
        if msg.role == "assistant" and msg.id not in old_ids:
            assistant_response = msg.content[0].text.value
            break
    # Fallback: latest assistant message of any age.  Iterating forward
    # (newest-first) fixes the old reversed() scan, which picked the
    # OLDEST assistant message instead of the latest.
    if not assistant_response:
        for msg in new_messages:
            if msg.role == "assistant":
                assistant_response = msg.content[0].text.value
                break

    # Stream the growing reply to the UI.
    current_assistant_message = ""
    for chunk in stream_response(assistant_response):
        current_assistant_message = chunk
        yield [
            {"role": "user", "content": message},
            {"role": "assistant", "content": current_assistant_message}
        ]

    # Once streaming is complete, push any detected ingredient list to Bubble.
    ingredients = extract_ingredients(assistant_response)
    if ingredients:
        send_to_bubble_database(ingredients, subject=message)
        info_message = "[INFO] Ingredients list has been sent to your email."
    else:
        info_message = "[INFO] No ingredients list found in the assistant response."

    yield [
        {"role": "user", "content": message},
        {"role": "assistant", "content": current_assistant_message},
        {"role": "assistant", "content": info_message}
    ]
# Custom CSS: green page background with a white chat panel.
# NOTE(review): "#component-404" targets an auto-generated Gradio component
# id — fragile across Gradio versions; confirm it still matches.
custom_css = """
.gradio-container {
    background-color: #186A27 !important;
}
#component-404 {
    background-color: white !important;
}
"""

# Build the chat UI around the `chat` generator and start the server.
chat_interface = gr.ChatInterface(
    fn=chat,
    title="AfrocourierAI",
    description="Let’s talk African food! From jollof rice to suya, I’ve got you covered—what’s on your mind?",
    type="messages",
    css=custom_css,
)
chat_interface.launch(show_api=False)