import os
import tempfile
import gradio as gr
import openai
from typing import Optional, List
import hashlib
import base64
import json
import time
from dotenv import load_dotenv
from gtts import gTTS
import io
import numpy as np
# Load variables from a .env file into the process environment.
load_dotenv()
# The rest of this module calls the API through the `openai` module itself,
# so the key is installed module-wide. A missing key stops the program at
# import time with a ValueError.
api_key = os.getenv("OPENAI_API_KEY")
if api_key:
    openai.api_key = api_key
else:
    raise ValueError("OPENAI_API_KEY environment variable is not set")
# Stylesheet passed to gr.Blocks(css=...) when the interface is built.
# In order, it covers: page-level sizing and typography, the centered main
# column, the white "compact-box" cards, section titles, the gradient action
# buttons (upload / send / audio / reset) and their hover state, the chatbot
# pane, the audio widget, textboxes and their focus ring, the status text,
# the native file-upload button, and finally the overflow rules that aim for
# a single page-level scrollbar.
custom_css = """
html, body, .gradio-container {
height: 100vh !important;
min-height: 100vh !important;
max-width: 100vw !important;
margin: 0 !important;
padding: 0 !important;
font-family: 'Inter', 'Segoe UI', Arial, sans-serif;
background: #f4f7fb;
color: #222;
}
.centered-main {
display: flex;
flex-direction: column;
align-items: center;
justify-content: flex-start;
min-height: 100vh;
width: 100vw;
padding-top: 32px;
}
.compact-box {
background: #fff;
border-radius: 18px;
box-shadow: 0 4px 24px rgba(0, 60, 180, 0.07), 0 1.5px 4px rgba(0,0,0,0.04);
padding: 32px 32px 20px 32px;
margin-bottom: 32px;
width: 100%;
max-width: 600px;
margin-left: auto;
margin-right: auto;
border: 1.5px solid #e3e8f0;
}
.section-title {
font-size: 1.25rem;
font-weight: 700;
margin-bottom: 18px;
color: #1a237e;
letter-spacing: 0.01em;
}
.upload-btn, .send-btn, .audio-btn, .reset-btn {
background: linear-gradient(135deg, #1976D2 0%, #00bcd4 100%);
color: white;
border: none;
padding: 12px 28px;
border-radius: 24px;
cursor: pointer;
font-weight: 600;
font-size: 16px;
margin-top: 10px;
margin-bottom: 10px;
transition: all 0.2s;
box-shadow: 0 2px 8px rgba(25, 118, 210, 0.08);
}
.upload-btn:hover, .send-btn:hover, .audio-btn:hover, .reset-btn:hover {
background: linear-gradient(135deg, #00bcd4 0%, #1976D2 100%);
box-shadow: 0 4px 16px rgba(0, 188, 212, 0.13);
}
.gradio-chatbot {
border-radius: 14px !important;
border: 1.5px solid #e3e8f0 !important;
background: #f8fafc !important;
padding: 12px !important;
min-height: 350px !important;
max-height: 400px !important;
overflow-y: auto !important;
margin-bottom: 10px;
}
.gradio-audio {
margin-top: 12px;
margin-bottom: 12px;
}
.textbox {
border-radius: 12px !important;
border: 1.5px solid #e3e8f0 !important;
padding: 12px !important;
font-size: 16px !important;
margin-bottom: 10px;
background: #f8fafc !important;
color: #222 !important;
}
.textbox:focus {
border-color: #1976D2 !important;
box-shadow: 0 0 0 2px rgba(25, 118, 210, 0.13) !important;
}
.status-text {
color: #1976D2;
font-size: 15px;
margin-top: 10px;
font-weight: 500;
background: #e3f2fd;
border-radius: 8px;
padding: 8px 12px;
}
/* File upload area */
input[type="file"]::-webkit-file-upload-button {
background: #1976D2;
color: #fff;
border: none;
border-radius: 8px;
padding: 8px 18px;
font-weight: 600;
cursor: pointer;
}
input[type="file"]::-webkit-file-upload-button:hover {
background: #00bcd4;
}
/* Only one main scroll */
body, .gradio-container, #root, #app {
overflow: auto !important;
height: 100vh !important;
}
#component-0, #component-1, #component-2, .chatbot, .chat-container {
overflow: visible !important;
height: auto !important;
max-height: none !important;
}
"""
# Factory for a raw-HTML Gradio component. The markup it wraps is currently
# just a newline, so the component carries no markup of its own.
def create_audio_recorder():
    """Return a ``gr.HTML`` component built from a newline-only string."""
    placeholder_markup = """
"""
    return gr.HTML(placeholder_markup)
class AdvancedRAG:
    """Stateful helper for document Q&A over the OpenAI Assistants API.

    An instance tracks a single conversation: the active thread, the IDs of
    the files uploaded for it, and the assistant that answers. The assistant
    is not created here; its ID is read from the ``ASSISTANT_ID`` environment
    variable when the instance is constructed.
    """

    # Seconds to sleep between status polls, and the total wall-clock budget
    # for one run, in ask_question().
    RUN_POLL_INTERVAL = 0.2
    RUN_TIMEOUT = 60
    # Run states other than 'failed' that will not progress to 'completed'
    # by themselves. 'requires_action' is listed because this class never
    # submits tool outputs, so such a run could only sit there until timeout.
    _STALLED_RUN_STATES = ("cancelled", "expired", "incomplete", "requires_action")

    def __init__(self):
        self.thread_id: Optional[str] = None
        self.file_ids: List[str] = []
        self.assistant_id: Optional[str] = os.getenv("ASSISTANT_ID")
        # Defined unconditionally so cleanup code can simply test it for
        # truthiness instead of probing with hasattr().
        self.vector_store_id: Optional[str] = None

    def create_thread(self) -> str:
        """Create a new conversation thread, remember its ID and return it."""
        thread = openai.beta.threads.create()
        self.thread_id = thread.id
        return self.thread_id

    def upload_document(self, file, filename: str = "uploaded_file.pdf") -> str:
        """Replace the current document with *file* and start a fresh thread.

        Args:
            file: Raw content of the document (written to a temporary file
                with ``tmp.write(file)``, so a bytes-like object is expected).
            filename: Name whose extension is used as the temporary file's
                suffix. Defaults to the previously hard-coded
                ``uploaded_file.pdf``.

        Returns:
            The ID of the newly uploaded file.

        Raises:
            ValueError: If *file* is empty or None. The previous document is
                left untouched in that case.
            Exception: Whatever the OpenAI client raises while uploading the
                file or creating the thread/message.
        """
        # Validate before tearing anything down, so a bad upload does not
        # destroy the document the user is currently working with.
        if not file:
            raise ValueError("No file uploaded.")

        self._discard_previous_document()

        suffix = os.path.splitext(filename)[1]
        tmp_path = None
        try:
            with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
                tmp_path = tmp.name
                tmp.write(file)
            # Re-open after the writer is closed so all bytes are on disk.
            with open(tmp_path, "rb") as handle:
                uploaded = openai.files.create(file=handle, purpose="assistants")
        finally:
            # delete=False means nobody else will remove the file for us.
            if tmp_path is not None:
                self._remove_quietly(tmp_path)

        self.file_ids = [uploaded.id]
        # A new thread per document keeps earlier attachments out of scope.
        self.create_thread()
        openai.beta.threads.messages.create(
            thread_id=self.thread_id,
            role="user",
            content="I have uploaded a document. Please analyze it.",
            attachments=[{"file_id": uploaded.id, "tools": [{"type": "file_search"}]}],
        )
        return uploaded.id

    def ask_question(self, question: str) -> str:
        """Post *question* to the thread, run the assistant and return its reply.

        This method never raises: any failure is returned as a string of the
        form ``"[Error: <message>]"``.
        """
        try:
            # Checked first so a misconfigured assistant does not leave a
            # stray thread or unanswered message behind.
            if not self.assistant_id:
                raise ValueError("ASSISTANT_ID environment variable is not set")
            if not self.thread_id:
                self.create_thread()
            openai.beta.threads.messages.create(
                thread_id=self.thread_id,
                role="user",
                content=question,
            )
            run = openai.beta.threads.runs.create(
                thread_id=self.thread_id,
                assistant_id=self.assistant_id,
            )
            self._wait_for_run(run.id)
            # Newest message first; only the latest one is needed.
            messages = openai.beta.threads.messages.list(
                thread_id=self.thread_id,
                order='desc',
                limit=1,
            )
            if not messages.data:
                return "No response received from the assistant."
            return messages.data[0].content[0].text.value
        except Exception as e:
            return f"[Error: {str(e)}]"

    def transcribe_audio(self, audio_file):
        """Transcribe *audio_file* (an object with ``read()``) with ``whisper-1``.

        The audio is copied to a temporary ``.wav`` file, sent for English
        transcription, and the temporary file is removed afterwards whether
        or not the request succeeded.

        Returns:
            The transcript text, or ``"[Error transcribing audio: <message>]"``
            if anything goes wrong. This method never raises.
        """
        tmp_path = None
        try:
            with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
                tmp_path = tmp.name
                tmp.write(audio_file.read())
            with open(tmp_path, "rb") as audio:
                transcript = openai.audio.transcriptions.create(
                    model="whisper-1",
                    file=audio,
                    language="en",
                )
            return transcript.text
        except Exception as e:
            return f"[Error transcribing audio: {str(e)}]"
        finally:
            if tmp_path is not None:
                self._remove_quietly(tmp_path)

    def _discard_previous_document(self) -> None:
        """Best-effort removal of the previously uploaded files and vector store."""
        if not self.file_ids and not self.vector_store_id:
            return
        for file_id in self.file_ids:
            try:
                openai.files.delete(file_id)
            except Exception as e:
                print(f"Warning: Could not delete file {file_id}: {e}")
        if self.vector_store_id:
            try:
                openai.beta.vector_stores.delete(self.vector_store_id)
            except Exception as e:
                print(f"Warning: Could not delete vector store: {e}")
        self.thread_id = None
        self.file_ids = []
        self.vector_store_id = None
        # Wait a moment to ensure deletion is processed
        time.sleep(2)

    def _wait_for_run(self, run_id: str) -> None:
        """Poll the run until it completes; raise if it fails, stalls or times out."""
        # A monotonic deadline bounds real elapsed time, including the time
        # spent inside each retrieve() call, not just the time spent sleeping.
        deadline = time.monotonic() + self.RUN_TIMEOUT
        while True:
            status = openai.beta.threads.runs.retrieve(
                thread_id=self.thread_id,
                run_id=run_id,
            ).status
            if status == 'completed':
                return
            if status == 'failed':
                raise RuntimeError("Run failed")
            if status in self._STALLED_RUN_STATES:
                raise RuntimeError(f"Run ended with status '{status}'")
            if time.monotonic() >= deadline:
                raise TimeoutError(f"Run timed out after {self.RUN_TIMEOUT} seconds.")
            time.sleep(self.RUN_POLL_INTERVAL)

    @staticmethod
    def _remove_quietly(path: str) -> None:
        """Delete *path*, ignoring filesystem errors (temp-file cleanup only)."""
        try:
            os.remove(path)
        except OSError:
            # Failing to remove a temp file must not mask the real outcome.
            pass
# Module-level AdvancedRAG instance. The handler functions in this file read
# and mutate it directly, so its thread/file state is shared by everything
# running in this process.
rag = AdvancedRAG()
def process_file(file):
    """Upload *file* through the shared ``rag`` instance and report the outcome.

    Returns a status string: a prompt to upload when *file* is None, a success
    message when the upload went through, or ``"Error uploading file: ..."``
    carrying the exception text when it did not.
    """
    if file is None:
        return "Please upload a file first."
    try:
        rag.upload_document(file)
    except Exception as exc:
        return f"Error uploading file: {exc}"
    return "File uploaded successfully! You can now ask questions about the document."
def process_question(question, history):
    """Answer a typed question and extend the chat history with the exchange.

    Args:
        question: Text from the question box; empty or whitespace-only input
            is ignored.
        history: Chat history as a list of ``{"role", "content"}`` dicts.
            ``None`` is treated as an empty history.

    Returns:
        A 4-tuple ``(message, history, "", None)``. ``message`` is
        ``"Please upload a document first."`` when no thread exists yet and
        ``""`` otherwise. The submit handler maps the tuple positionally onto
        its output components.
    """
    # A missing history would otherwise crash on .append() below.
    history = [] if history is None else history
    # Prevent sending empty messages
    if not question or not question.strip():
        return "", history, "", None
    if not rag.thread_id:
        return "Please upload a document first.", history, "", None
    # Record the question before calling out, so the transcript still shows
    # what was asked even if the call below raises.
    history.append({"role": "user", "content": question})
    try:
        response = rag.ask_question(question)
    except Exception as e:
        response = f"Error: {str(e)}"
    history.append({"role": "assistant", "content": response})
    return "", history, "", None
def synthesize_text(text):
    """Turn *text* into speech with gTTS and return the resulting audio bytes.

    Returns:
        The bytes gTTS wrote for the utterance, or ``None`` when *text* is
        empty/whitespace-only or synthesis fails for any reason.
    """
    # Nothing to speak: skip the synthesis call entirely.
    if not text or not str(text).strip():
        return None
    try:
        buffer = io.BytesIO()
        gTTS(text).write_to_fp(buffer)
        return buffer.getvalue()
    except Exception as e:
        # Speech is an optional extra on top of the text reply, so a failure
        # is reported on the console rather than propagated to the caller.
        print(f"Warning: Could not synthesize speech: {e}")
        return None
def process_voice_note(audio_file, history):
    """Transcribe a voice note, ask it as a question and speak the answer.

    Args:
        audio_file: ``None``, a path to an audio file, a
            ``(sample_rate, numpy array)`` tuple, or an already opened
            file-like object.
        history: Chat history as a list of ``{"role", "content"}`` dicts.
            ``None`` is treated as an empty history.

    Returns:
        A 5-tuple ``(message, history, "", None, tts_audio)``. ``message`` is a
        prompt string when audio or a document is missing and ``""``
        otherwise; ``tts_audio`` is whatever ``synthesize_text`` returned for
        the answer, or ``None`` when no answer was produced. The click handler
        maps the tuple positionally onto its output components.
    """
    # A missing history would otherwise crash on .append() below.
    history = [] if history is None else history
    if audio_file is None:
        return "Please record or upload an audio file.", history, "", None, None
    # Checked before transcribing so no transcription request is made for a
    # question that could not be asked anyway.
    if not rag.thread_id:
        return "Please upload a document first.", history, "", None, None
    try:
        transcript = _transcribe_voice_input(audio_file)
        spoken = str(transcript).strip() if transcript else ""
        # rag.transcribe_audio reports failures as a bracketed error string
        # instead of raising; that text must not be sent to the assistant as
        # if the user had said it.
        if not spoken or spoken.startswith("[Error transcribing audio:"):
            history.append({"role": "user", "content": "🎤 [No audio detected or transcription failed]"})
            history.append({"role": "assistant", "content": "Sorry, I couldn't understand the audio. Please try again."})
            return "", history, "", None, None
        response = rag.ask_question(transcript)
        history.append({"role": "user", "content": f"🎤 {transcript}"})
        history.append({"role": "assistant", "content": response})
        tts_audio = synthesize_text(response)
        return "", history, "", None, tts_audio
    except Exception as e:
        history.append({"role": "user", "content": f"🎤 [Error transcribing audio: {str(e)}]"})
        history.append({"role": "assistant", "content": "It seems there was an error while transcribing audio due to a technical issue. If there's anything specific from the document or any other questions you have regarding the content, please let me know, and I can assist you with that information."})
        return "", history, "", None, None


def _transcribe_voice_input(audio_file):
    """Hand *audio_file*, in whichever form it arrived, to ``rag.transcribe_audio``."""
    # A string is treated as a path on disk.
    if isinstance(audio_file, str):
        with open(audio_file, "rb") as f:
            return rag.transcribe_audio(f)
    # (sample_rate, samples): write the samples to a temporary WAV first.
    if isinstance(audio_file, tuple) and isinstance(audio_file[1], np.ndarray):
        import soundfile as sf
        sample_rate, audio_data = audio_file
        # Only reserve a name here; the handle is closed before soundfile
        # writes to the path.
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
            wav_path = tmp.name
        try:
            sf.write(wav_path, audio_data, sample_rate)
            with open(wav_path, "rb") as f:
                return rag.transcribe_audio(f)
        finally:
            # delete=False leaves cleanup to us; never let it mask the result.
            try:
                os.remove(wav_path)
            except OSError:
                pass
    # Anything else is assumed to be a readable file-like object.
    return rag.transcribe_audio(audio_file)
def reset_all():
    """Forget the current document and conversation and blank the UI.

    Uploaded files are deleted from OpenAI on a best-effort basis before
    their IDs are dropped; a failed deletion is printed as a warning and does
    not stop the reset.

    Returns:
        ``("", [], "", None, None)``, which the reset button's click handler
        maps positionally onto its output components.
    """
    # Delete the uploads while their IDs are still known: upload_document can
    # only clean up files listed in rag.file_ids, so clearing the list first
    # would leave them behind with nothing referring to them.
    for file_id in list(getattr(rag, 'file_ids', None) or []):
        try:
            openai.files.delete(file_id)
        except Exception as e:
            print(f"Warning: Could not delete file {file_id}: {e}")
    rag.thread_id = None
    rag.file_ids = []
    # Only reset the attribute if the instance already has it; creating it
    # here would change what hasattr() checks elsewhere observe.
    if hasattr(rag, 'vector_store_id'):
        rag.vector_store_id = None
    return "", [], "", None, None
# Browser-side hook installed on page load. It listens for window "message"
# events of type 'audio_data', decodes the data URL they carry into a File
# named "recording.wav", and assigns it to the first <input type="file"> it
# finds, then fires a 'change' event on that input.
# NOTE(review): nothing visible in this file posts such a message, and the
# first file input on the page may be the document upload rather than the
# audio widget - confirm this hook is still needed.
_AUDIO_MESSAGE_JS = """
function() {
window.addEventListener('message', function(event) {
if (event.data.type === 'audio_data') {
const audioData = event.data.data;
const byteString = atob(audioData.split(',')[1]);
const mimeString = audioData.split(',')[0].split(':')[1].split(';')[0];
const ab = new ArrayBuffer(byteString.length);
const ia = new Uint8Array(ab);
for (let i = 0; i < byteString.length; i++) {
ia[i] = byteString.charCodeAt(i);
}
const blob = new Blob([ab], {type: mimeString});
const file = new File([blob], "recording.wav", {type: mimeString});
const audioInput = document.querySelector('input[type="file"]');
const dataTransfer = new DataTransfer();
dataTransfer.items.add(file);
audioInput.files = dataTransfer.files;
audioInput.dispatchEvent(new Event('change', { bubbles: true }));
}
});
}
"""
# Create Gradio interface with improved layout
with gr.Blocks(css=custom_css, title="Document Q&A System") as demo:
    gr.Markdown("\n# Document Q&A System\nUpload a document, record your voice, and chat!\n")
    # Each component is constructed inside the container it belongs to.
    # Merely naming an existing component inside a `with` block is a no-op
    # expression and does not move it there.
    with gr.Row():
        # Left: Document Q&A controls
        with gr.Column(scale=1, min_width=350):
            with gr.Group(elem_classes="compact-box"):
                gr.Markdown("Document Q&A Controls\n")
                file_input = gr.File(label="Upload Document", file_types=[".pdf", ".txt", ".doc", ".docx"], file_count="single", type="binary", elem_classes="upload-btn")
                mic_btn = gr.Button("🎤 Record Voice", elem_classes="audio-btn")
                audio_input = gr.Audio(type="filepath", label="Record or Upload Audio", elem_classes="gradio-audio", visible=False)
                send_voice_btn = gr.Button("Send Voice Note", elem_classes="send-btn", visible=False)
                reset_btn = gr.Button("Reset Chat & Upload New Document", elem_classes="reset-btn")
                file_output = gr.Textbox(label="Upload Status", interactive=False, elem_classes="textbox")
                question = gr.Textbox(label="Type your question and press Enter", placeholder="Ask a question about your document...", elem_classes="textbox")
                # NOTE(review): created hidden and no handler in this file
                # makes it visible - confirm how the spoken reply is meant
                # to be played.
                tts_output = gr.Audio(label="Assistant Voice Reply", interactive=False, visible=False)
        # Right: Chatbot screen
        with gr.Column(scale=2, min_width=500):
            with gr.Group(elem_classes="compact-box"):
                chatbot = gr.Chatbot(height=400, elem_classes="gradio-chatbot", label=None, type="messages")

    # --- Event wiring (after layout, so every component already exists) ---
    file_input.change(process_file, file_input, file_output)
    # Uses the module-level reset_all; an identical copy used to be
    # re-declared here.
    reset_btn.click(reset_all, None, [file_output, chatbot, question, audio_input, tts_output])

    def show_audio():
        """Reveal the recorder and its send button."""
        return {audio_input: gr.update(visible=True), send_voice_btn: gr.update(visible=True)}

    def hide_audio():
        """Hide the recorder and its send button again."""
        return {audio_input: gr.update(visible=False), send_voice_btn: gr.update(visible=False)}

    mic_btn.click(show_audio, None, [audio_input, send_voice_btn])
    send_voice_btn.click(process_voice_note, [audio_input, chatbot], [file_output, chatbot, question, audio_input, tts_output])
    send_voice_btn.click(hide_audio, None, [audio_input, send_voice_btn])
    # The first value process_question returns is a status message. It goes
    # to the status box, as in the voice handler above; routing it to the
    # question box as well as the third value meant the empty third value
    # overwrote it and the message was never shown.
    question.submit(process_question, [question, chatbot], [file_output, chatbot, question, audio_input])

    # Add JavaScript for audio handling
    demo.load(
        fn=None,
        inputs=None,
        outputs=None,
        js=_AUDIO_MESSAGE_JS,
    )
if __name__ == "__main__":
    # Serve on every network interface (0.0.0.0), port 7860, with a public
    # Gradio share link requested and show_error switched on.
    launch_options = {
        "share": True,
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "show_error": True,
    }
    demo.launch(**launch_options)