# Hugging Face Space source (page header at capture time: "Spaces: Sleeping").
import base64
import hashlib
import io
import json
import logging
import os
import tempfile
import time
from typing import Optional, List

import gradio as gr
import numpy as np
import openai
from dotenv import load_dotenv
from gtts import gTTS
# Load environment variables (a local .env file is honoured when present).
load_dotenv()

# Fail fast at import time: nothing in this app works without an API key.
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
    raise ValueError("OPENAI_API_KEY environment variable is not set")

# Configure the module-level OpenAI client that the rest of the file calls
# through `openai.files`, `openai.beta.threads` and `openai.audio`.
openai.api_key = api_key
# Custom CSS for a beautiful, modern look.
# Passed to gr.Blocks(css=...); selectors such as .compact-box, .upload-btn
# and .gradio-chatbot match the elem_classes used by the UI definition.
custom_css = """
html, body, .gradio-container {
    height: 100vh !important;
    min-height: 100vh !important;
    max-width: 100vw !important;
    margin: 0 !important;
    padding: 0 !important;
    font-family: 'Inter', 'Segoe UI', Arial, sans-serif;
    background: #f4f7fb;
    color: #222;
}
.centered-main {
    display: flex;
    flex-direction: column;
    align-items: center;
    justify-content: flex-start;
    min-height: 100vh;
    width: 100vw;
    padding-top: 32px;
}
.compact-box {
    background: #fff;
    border-radius: 18px;
    box-shadow: 0 4px 24px rgba(0, 60, 180, 0.07), 0 1.5px 4px rgba(0,0,0,0.04);
    padding: 32px 32px 20px 32px;
    margin-bottom: 32px;
    width: 100%;
    max-width: 600px;
    margin-left: auto;
    margin-right: auto;
    border: 1.5px solid #e3e8f0;
}
.section-title {
    font-size: 1.25rem;
    font-weight: 700;
    margin-bottom: 18px;
    color: #1a237e;
    letter-spacing: 0.01em;
}
.upload-btn, .send-btn, .audio-btn, .reset-btn {
    background: linear-gradient(135deg, #1976D2 0%, #00bcd4 100%);
    color: white;
    border: none;
    padding: 12px 28px;
    border-radius: 24px;
    cursor: pointer;
    font-weight: 600;
    font-size: 16px;
    margin-top: 10px;
    margin-bottom: 10px;
    transition: all 0.2s;
    box-shadow: 0 2px 8px rgba(25, 118, 210, 0.08);
}
.upload-btn:hover, .send-btn:hover, .audio-btn:hover, .reset-btn:hover {
    background: linear-gradient(135deg, #00bcd4 0%, #1976D2 100%);
    box-shadow: 0 4px 16px rgba(0, 188, 212, 0.13);
}
.gradio-chatbot {
    border-radius: 14px !important;
    border: 1.5px solid #e3e8f0 !important;
    background: #f8fafc !important;
    padding: 12px !important;
    min-height: 350px !important;
    max-height: 400px !important;
    overflow-y: auto !important;
    margin-bottom: 10px;
}
.gradio-audio {
    margin-top: 12px;
    margin-bottom: 12px;
}
.textbox {
    border-radius: 12px !important;
    border: 1.5px solid #e3e8f0 !important;
    padding: 12px !important;
    font-size: 16px !important;
    margin-bottom: 10px;
    background: #f8fafc !important;
    color: #222 !important;
}
.textbox:focus {
    border-color: #1976D2 !important;
    box-shadow: 0 0 0 2px rgba(25, 118, 210, 0.13) !important;
}
.status-text {
    color: #1976D2;
    font-size: 15px;
    margin-top: 10px;
    font-weight: 500;
    background: #e3f2fd;
    border-radius: 8px;
    padding: 8px 12px;
}
/* File upload area */
input[type="file"]::-webkit-file-upload-button {
    background: #1976D2;
    color: #fff;
    border: none;
    border-radius: 8px;
    padding: 8px 18px;
    font-weight: 600;
    cursor: pointer;
}
input[type="file"]::-webkit-file-upload-button:hover {
    background: #00bcd4;
}
/* Only one main scroll */
body, .gradio-container, #root, #app {
    overflow: auto !important;
    height: 100vh !important;
}
#component-0, #component-1, #component-2, .chatbot, .chat-container {
    overflow: visible !important;
    height: auto !important;
    max-height: none !important;
}
"""
# Custom audio recorder component with improved styling
def create_audio_recorder():
    """Build an HTML widget with an in-browser microphone recorder.

    The markup contains a record/stop toggle button, a status line and a
    playback element. When a recording stops, the script plays it back
    locally and posts it to ``window.parent`` as a base64 data URL in a
    message of type ``audio_data``.

    Returns:
        gr.HTML: the component holding the recorder markup and script.
    """
    # NOTE(review): the Blob is labelled 'audio/wav' although the script never
    # asks MediaRecorder for a WAV encoding; the label is kept as-is because
    # the page-level message listener names the file "recording.wav".
    # The button icon is written as an HTML entity (U+1F3A4, microphone) so it
    # cannot be mangled by a wrong file encoding.
    return gr.HTML("""
    <div class="audio-recorder">
        <button id="recordButton" class="record-button">
            <span class="record-icon">&#127908;</span>
            <span class="record-text">Start Recording</span>
        </button>
        <div id="recordingStatus" class="status-text"></div>
        <audio id="audioPlayback" controls style="display: none; margin-top: 10px;"></audio>
    </div>
    <script>
    let mediaRecorder;
    let audioChunks = [];
    let isRecording = false;
    const recordButton = document.getElementById('recordButton');
    const recordingStatus = document.getElementById('recordingStatus');
    const audioPlayback = document.getElementById('audioPlayback');
    recordButton.addEventListener('click', async () => {
        if (!isRecording) {
            try {
                const stream = await navigator.mediaDevices.getUserMedia({ audio: true });
                mediaRecorder = new MediaRecorder(stream);
                audioChunks = [];
                mediaRecorder.ondataavailable = (event) => {
                    audioChunks.push(event.data);
                };
                mediaRecorder.onstop = () => {
                    // Release the microphone so the browser stops capturing.
                    stream.getTracks().forEach((track) => track.stop());
                    const audioBlob = new Blob(audioChunks, { type: 'audio/wav' });
                    const audioUrl = URL.createObjectURL(audioBlob);
                    audioPlayback.src = audioUrl;
                    audioPlayback.style.display = 'block';
                    const reader = new FileReader();
                    reader.readAsDataURL(audioBlob);
                    reader.onloadend = () => {
                        const base64Audio = reader.result;
                        window.parent.postMessage({
                            type: 'audio_data',
                            data: base64Audio
                        }, '*');
                    };
                };
                mediaRecorder.start();
                isRecording = true;
                recordButton.classList.add('recording');
                recordButton.querySelector('.record-text').textContent = 'Stop Recording';
                recordingStatus.textContent = 'Recording...';
            } catch (err) {
                console.error('Error accessing microphone:', err);
                recordingStatus.textContent = 'Error accessing microphone';
            }
        } else {
            mediaRecorder.stop();
            isRecording = false;
            recordButton.classList.remove('recording');
            recordButton.querySelector('.record-text').textContent = 'Start Recording';
            recordingStatus.textContent = 'Recording saved';
        }
    });
    </script>
    """)
class AdvancedRAG:
    """Document question answering on top of the OpenAI Assistants API.

    One document is active at a time: uploading a new one deletes the
    previous upload and starts a fresh thread. The assistant to run is taken
    from the ``ASSISTANT_ID`` environment variable.
    """

    # Run statuses from which a run never reaches "completed" unaided;
    # polling any of these further would only end in the timeout below.
    _DEAD_RUN_STATUSES = ("failed", "cancelled", "expired", "incomplete", "requires_action")
    _RUN_TIMEOUT_SECONDS = 60
    _POLL_INTERVAL_SECONDS = 0.2
    _NO_RESPONSE = "No response received from the assistant."

    def __init__(self):
        self.thread_id: Optional[str] = None
        self.file_ids: List[str] = []
        self.assistant_id: Optional[str] = os.getenv("ASSISTANT_ID")
        # Always defined, so later cleanup code can test it for truthiness.
        self.vector_store_id: Optional[str] = None

    def create_thread(self) -> str:
        """Create a new conversation thread, remember its id and return it."""
        thread = openai.beta.threads.create()
        self.thread_id = thread.id
        return self.thread_id

    @staticmethod
    def _guess_suffix(data) -> str:
        """Pick a file extension for raw upload bytes from their signature.

        The upload arrives as bytes without a file name, so the extension for
        the temporary file is derived from the content: PDF marker, ZIP
        container (.docx), OLE2 container (legacy .doc), otherwise plain text.
        """
        head = bytes(data[:1024])
        if b"%PDF-" in head:
            return ".pdf"
        if head.startswith(b"PK\x03\x04"):
            return ".docx"
        if head.startswith(b"\xd0\xcf\x11\xe0\xa1\xb1\x1a\xe1"):
            return ".doc"
        return ".txt"

    def _discard_previous_upload(self) -> None:
        """Best-effort removal of the previously uploaded file(s) and state."""
        if not self.file_ids:
            return
        log = logging.getLogger(__name__)
        for file_id in self.file_ids:
            try:
                openai.files.delete(file_id)
            except Exception as e:
                log.warning("Could not delete file %s: %s", file_id, e)
        self.thread_id = None
        self.file_ids = []
        if self.vector_store_id:
            try:
                openai.beta.vector_stores.delete(self.vector_store_id)
            except Exception as e:
                log.warning("Could not delete vector store: %s", e)
            self.vector_store_id = None
        # Wait a moment to ensure deletion is processed
        time.sleep(2)

    def upload_document(self, file) -> str:
        """Replace the active document with ``file`` and open a new thread.

        Args:
            file: raw bytes of the document.

        Returns:
            The id of the newly uploaded file.

        Raises:
            Exception: if ``file`` is empty or missing. This is checked before
                anything is deleted, so a bad upload keeps the current
                document usable.
        """
        if not file:
            raise Exception("No file uploaded.")
        self._discard_previous_upload()

        # Write and close the temp file before reopening it for the upload,
        # and always remove it afterwards.
        with tempfile.NamedTemporaryFile(delete=False, suffix=self._guess_suffix(file)) as tmp:
            tmp.write(file)
            tmp_path = tmp.name
        try:
            with open(tmp_path, "rb") as file_obj:
                uploaded = openai.files.create(file=file_obj, purpose="assistants")
        finally:
            os.remove(tmp_path)
        self.file_ids = [uploaded.id]

        # Create a new thread for the new document
        thread = openai.beta.threads.create()
        self.thread_id = thread.id
        # Send a message in the new thread with only the new file as an attachment
        openai.beta.threads.messages.create(
            thread_id=self.thread_id,
            role="user",
            content="I have uploaded a document. Please analyze it.",
            attachments=[{"file_id": self.file_ids[0], "tools": [{"type": "file_search"}]}],
        )
        return self.file_ids[0]

    def _wait_for_run(self, run_id: str) -> None:
        """Poll a run until it completes; raise if it cannot or takes too long."""
        deadline = time.monotonic() + self._RUN_TIMEOUT_SECONDS
        while True:
            status = openai.beta.threads.runs.retrieve(
                thread_id=self.thread_id,
                run_id=run_id,
            ).status
            if status == "completed":
                return
            if status in self._DEAD_RUN_STATUSES:
                raise Exception(f"Run {status}")
            if time.monotonic() > deadline:
                raise Exception("Run timed out after 60 seconds.")
            time.sleep(self._POLL_INTERVAL_SECONDS)

    @classmethod
    def _message_text(cls, message) -> str:
        """Return the first text part of an assistant message, or a fallback."""
        if getattr(message, "role", "assistant") != "assistant":
            return cls._NO_RESPONSE
        for part in message.content or []:
            text = getattr(part, "text", None)
            if text is not None:
                return text.value
        return cls._NO_RESPONSE

    def ask_question(self, question: str) -> str:
        """Send ``question`` to the assistant and return its reply.

        Never raises: any failure is returned as the string ``"[Error: ...]"``.
        """
        try:
            if not self.assistant_id:
                raise Exception("ASSISTANT_ID environment variable is not set")
            if not self.thread_id:
                self.create_thread()
            # Add the question to the thread
            openai.beta.threads.messages.create(
                thread_id=self.thread_id,
                role="user",
                content=question,
            )
            # Create a run and wait for it to complete
            run = openai.beta.threads.runs.create(
                thread_id=self.thread_id,
                assistant_id=self.assistant_id,
            )
            self._wait_for_run(run.id)
            # Get the latest message
            messages = openai.beta.threads.messages.list(
                thread_id=self.thread_id,
                order="desc",
                limit=1,
            )
            if not messages.data:
                return self._NO_RESPONSE
            return self._message_text(messages.data[0])
        except Exception as e:
            return f"[Error: {str(e)}]"

    def transcribe_audio(self, audio_file):
        """Transcribe a binary file-like object with the ``whisper-1`` model.

        Never raises: failures are returned as the string
        ``"[Error transcribing audio: ...]"``. The temporary copy of the audio
        is removed on success and on failure.
        """
        tmp_path = None
        try:
            with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
                tmp_path = tmp.name
                tmp.write(audio_file.read())
            with open(tmp_path, "rb") as audio:
                transcript = openai.audio.transcriptions.create(
                    model="whisper-1",
                    file=audio,
                    language="en",
                )
            return transcript.text
        except Exception as e:
            return f"[Error transcribing audio: {str(e)}]"
        finally:
            if tmp_path and os.path.exists(tmp_path):
                os.remove(tmp_path)


# Initialize RAG system (shared by all Gradio callbacks in this module)
rag = AdvancedRAG()
def process_file(file):
    """Upload ``file`` through the shared RAG instance and report the outcome.

    Args:
        file: the uploaded document contents, or ``None`` when nothing is
            selected.

    Returns:
        A status string for the "Upload Status" box; errors are reported in
        the string rather than raised.
    """
    if file is None:
        return "Please upload a file first."
    try:
        rag.upload_document(file)
    except Exception as e:
        return f"Error uploading file: {str(e)}"
    return "File uploaded successfully! You can now ask questions about the document."
def process_question(question, history):
    """Answer a typed question and append the exchange to the chat history.

    Args:
        question: text from the question box.
        history: chat history as a list of ``{"role", "content"}`` dicts
            (``None`` is treated as an empty history).

    Returns:
        A 4-tuple ``(message, history, "", None)``. ``message`` is empty
        except when no document has been uploaded yet.
    """
    history = [] if history is None else history
    # Prevent sending empty messages
    if not question or not question.strip():
        return "", history, "", None
    if not rag.thread_id:
        return "Please upload a document first.", history, "", None

    # Record the user's turn first so it is kept even if answering fails.
    history.append({"role": "user", "content": question})
    try:
        response = rag.ask_question(question)
    except Exception as e:
        response = f"Error: {str(e)}"
    history.append({"role": "assistant", "content": response})
    return "", history, "", None
def synthesize_text(text):
    """Convert ``text`` to speech with gTTS.

    Args:
        text: the text to speak.

    Returns:
        The synthesized audio as bytes, or ``None`` when ``text`` is empty or
        synthesis fails (the failure is logged instead of being dropped).
    """
    if not text or not str(text).strip():
        return None
    try:
        buffer = io.BytesIO()
        gTTS(text).write_to_fp(buffer)
        return buffer.getvalue()
    except Exception:
        logging.getLogger(__name__).exception("Text-to-speech synthesis failed")
        return None
def _transcribe_voice_input(audio_file):
    """Transcribe a voice note given as a path, a (rate, samples) tuple or a file object."""
    # A string is a file path: open it and hand over the file object.
    if isinstance(audio_file, str):
        with open(audio_file, "rb") as f:
            return rag.transcribe_audio(f)
    # A (sample_rate, np.ndarray) tuple is written to a temporary WAV first.
    if (
        isinstance(audio_file, tuple)
        and len(audio_file) == 2
        and isinstance(audio_file[1], np.ndarray)
    ):
        import soundfile as sf

        sample_rate, audio_data = audio_file
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
            tmp_path = tmp.name
        try:
            sf.write(tmp_path, audio_data, sample_rate)
            with open(tmp_path, "rb") as f:
                return rag.transcribe_audio(f)
        finally:
            # The temporary WAV is only needed for the transcription call.
            os.remove(tmp_path)
    return rag.transcribe_audio(audio_file)


def process_voice_note(audio_file, history):
    """Transcribe a voice note, ask it as a question and voice the answer.

    Args:
        audio_file: a file path, a ``(sample_rate, np.ndarray)`` tuple, a
            binary file-like object, or ``None``.
        history: chat history as a list of ``{"role", "content"}`` dicts
            (``None`` is treated as an empty history).

    Returns:
        A 5-tuple ``(status, history, "", None, tts_audio)`` where
        ``tts_audio`` is the spoken reply or ``None``.
    """
    history = [] if history is None else history
    if audio_file is None:
        return "Please record or upload an audio file.", history, "", None, None
    # Check for a document before paying for a transcription that cannot be used.
    if not rag.thread_id:
        return "Please upload a document first.", history, "", None, None

    mic = "\U0001F3A4"  # microphone emoji, escaped so file encoding cannot mangle it
    try:
        transcript = _transcribe_voice_input(audio_file)
        text = str(transcript).strip() if transcript else ""
        # The transcriber reports failures as a "[Error transcribing audio: ...]"
        # string; that must not be forwarded to the assistant as a question.
        if not text or text.startswith("[Error transcribing audio:"):
            history.append({"role": "user", "content": f"{mic} [No audio detected or transcription failed]"})
            history.append({"role": "assistant", "content": "Sorry, I couldn't understand the audio. Please try again."})
            return "", history, "", None, None
        response = rag.ask_question(transcript)
        history.append({"role": "user", "content": f"{mic} {transcript}"})
        history.append({"role": "assistant", "content": response})
        tts_audio = synthesize_text(response)
        return "", history, "", None, tts_audio
    except Exception as e:
        history.append({"role": "user", "content": f"{mic} [Error transcribing audio: {str(e)}]"})
        history.append({"role": "assistant", "content": "It seems there was an error while transcribing audio due to a technical issue. If there's anything specific from the document or any other questions you have regarding the content, please let me know, and I can assist you with that information."})
        return "", history, "", None, None
def reset_all():
    """Forget the current thread and upload and return blank UI values.

    Returns:
        A 5-tuple of cleared values: ``("", [], "", None, None)``.
    """
    rag.thread_id = None
    if hasattr(rag, 'file_ids'):
        rag.file_ids = []
    # Only touch vector_store_id when the instance already has one, so this
    # function never introduces the attribute itself.
    if hasattr(rag, 'vector_store_id'):
        rag.vector_store_id = None
    return "", [], "", None, None
# Create Gradio interface with improved layout
with gr.Blocks(css=custom_css, title="Document Q&A System") as demo:
    gr.Markdown(
        "# <span style='color:#1976D2;'>Document Q&A System</span>\n"
        "<div style='text-align:center; color:#1976D2; margin-bottom:18px;'>"
        "Upload a document, record your voice, and chat!</div>\n"
    )

    # These three components are created up front so event handlers can refer
    # to them, but are placed later with .render(). A bare `name` expression
    # does not move a component; without render=False they would appear here,
    # above the two columns.
    chatbot = gr.Chatbot(height=400, elem_classes="gradio-chatbot", label=None, type="messages", render=False)
    audio_input = gr.Audio(
        type="filepath",
        label="Record or Upload Audio",
        elem_classes="gradio-audio",
        elem_id="voice-note-input",  # lets the page script find this input specifically
        visible=False,
        render=False,
    )
    # Visible so the synthesized reply can actually be played.
    tts_output = gr.Audio(label="Assistant Voice Reply", interactive=False, visible=True, render=False)

    with gr.Row():
        # Left: Document Q&A controls
        with gr.Column(scale=1, min_width=350):
            with gr.Group(elem_classes="compact-box"):
                gr.Markdown("<div class='section-title'>Document Q&A Controls</div>")
                file_input = gr.File(
                    label="Upload Document",
                    file_types=[".pdf", ".txt", ".doc", ".docx"],
                    file_count="single",
                    type="binary",
                    elem_classes="upload-btn",
                )
                mic_btn = gr.Button("\U0001F3A4 Record Voice", elem_classes="audio-btn")
                audio_input.render()
                send_voice_btn = gr.Button("Send Voice Note", elem_classes="send-btn", visible=False)
                reset_btn = gr.Button("Reset Chat & Upload New Document", elem_classes="reset-btn")
                file_output = gr.Textbox(label="Upload Status", interactive=False, elem_classes="textbox")
                question = gr.Textbox(
                    label="Type your question and press Enter",
                    placeholder="Ask a question about your document...",
                    elem_classes="textbox",
                )
                tts_output.render()
        # Right: Chatbot screen
        with gr.Column(scale=2, min_width=500):
            with gr.Group(elem_classes="compact-box"):
                chatbot.render()

    def show_audio():
        """Reveal the audio input and its send button."""
        return {audio_input: gr.update(visible=True), send_voice_btn: gr.update(visible=True)}

    def hide_audio():
        """Hide the audio input and its send button again."""
        return {audio_input: gr.update(visible=False), send_voice_btn: gr.update(visible=False)}

    file_input.change(process_file, file_input, file_output)
    # The module-level reset_all is used; it is not redefined here.
    reset_btn.click(reset_all, None, [file_output, chatbot, question, audio_input, tts_output])
    mic_btn.click(show_audio, None, [audio_input, send_voice_btn])
    # Hide the recorder only after the voice note has been processed; two
    # independent click handlers would both write to audio_input in no fixed order.
    send_voice_btn.click(
        process_voice_note,
        [audio_input, chatbot],
        [file_output, chatbot, question, audio_input, tts_output],
    ).then(hide_audio, None, [audio_input, send_voice_btn])
    # The first returned value is a status message, so it goes to the status
    # box (as in the voice flow) instead of listing `question` twice.
    question.submit(process_question, [question, chatbot], [file_output, chatbot, question, audio_input])

    # Add JavaScript for audio handling
    demo.load(
        fn=None,
        inputs=None,
        outputs=None,
        js="""
        function() {
            window.addEventListener('message', function(event) {
                // Ignore messages from other origins and malformed payloads.
                if (event.origin !== window.location.origin) { return; }
                const payload = event.data;
                if (!payload || payload.type !== 'audio_data' || typeof payload.data !== 'string') { return; }
                const audioData = payload.data;
                const byteString = atob(audioData.split(',')[1]);
                const mimeString = audioData.split(',')[0].split(':')[1].split(';')[0];
                const ab = new ArrayBuffer(byteString.length);
                const ia = new Uint8Array(ab);
                for (let i = 0; i < byteString.length; i++) {
                    ia[i] = byteString.charCodeAt(i);
                }
                const blob = new Blob([ab], {type: mimeString});
                const file = new File([blob], "recording.wav", {type: mimeString});
                // Target the voice-note input, not the first file input on the
                // page (which is the document uploader).
                const audioInput = document.querySelector('#voice-note-input input[type="file"]');
                if (!audioInput) { return; }
                const dataTransfer = new DataTransfer();
                dataTransfer.items.add(file);
                audioInput.files = dataTransfer.files;
                audioInput.dispatchEvent(new Event('change', { bubbles: true }));
            });
        }
        """
    )
# Script entry point: serve the app on all interfaces, port 7860, with a
# public share link and errors shown in the UI.
if __name__ == "__main__":
    demo.launch(
        share=True,
        server_name="0.0.0.0",
        server_port=7860,
        show_error=True,
    )