"""Gradio front-end for a document Q&A (RAG) assistant.

Built on the OpenAI Assistants API (file_search over an uploaded document),
with optional voice input (Whisper transcription) and spoken replies (gTTS).
"""

import base64  # kept: may be used by other chunks of this project
import hashlib  # kept: may be used by other chunks of this project
import io
import json  # kept: may be used by other chunks of this project
import os
import tempfile
import time
from typing import List, Optional

import gradio as gr
import numpy as np
import openai
from dotenv import load_dotenv
from gtts import gTTS

# Load environment variables from a local .env (OPENAI_API_KEY, ASSISTANT_ID).
load_dotenv()

# Fail fast with a clear message if the API key is missing.
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
    raise ValueError("OPENAI_API_KEY environment variable is not set")

# Module-level key configuration (legacy-style; the openai module proxies
# calls like openai.beta.threads.* through a default client).
openai.api_key = api_key

# Custom CSS for a beautiful, modern look
custom_css = """
html, body, .gradio-container {
    height: 100vh !important;
    min-height: 100vh !important;
    max-width: 100vw !important;
    margin: 0 !important;
    padding: 0 !important;
    font-family: 'Inter', 'Segoe UI', Arial, sans-serif;
    background: #f4f7fb;
    color: #222;
}
.centered-main {
    display: flex;
    flex-direction: column;
    align-items: center;
    justify-content: flex-start;
    min-height: 100vh;
    width: 100vw;
    padding-top: 32px;
}
.compact-box {
    background: #fff;
    border-radius: 18px;
    box-shadow: 0 4px 24px rgba(0, 60, 180, 0.07), 0 1.5px 4px rgba(0,0,0,0.04);
    padding: 32px 32px 20px 32px;
    margin-bottom: 32px;
    width: 100%;
    max-width: 600px;
    margin-left: auto;
    margin-right: auto;
    border: 1.5px solid #e3e8f0;
}
.section-title {
    font-size: 1.25rem;
    font-weight: 700;
    margin-bottom: 18px;
    color: #1a237e;
    letter-spacing: 0.01em;
}
.upload-btn, .send-btn, .audio-btn, .reset-btn {
    background: linear-gradient(135deg, #1976D2 0%, #00bcd4 100%);
    color: white;
    border: none;
    padding: 12px 28px;
    border-radius: 24px;
    cursor: pointer;
    font-weight: 600;
    font-size: 16px;
    margin-top: 10px;
    margin-bottom: 10px;
    transition: all 0.2s;
    box-shadow: 0 2px 8px rgba(25, 118, 210, 0.08);
}
.upload-btn:hover, .send-btn:hover, .audio-btn:hover, .reset-btn:hover {
    background: linear-gradient(135deg, #00bcd4 0%, #1976D2 100%);
    box-shadow: 0 4px 16px rgba(0, 188, 212, 0.13);
}
.gradio-chatbot {
    border-radius: 14px !important;
    border: 1.5px solid #e3e8f0 !important;
    background: #f8fafc !important;
    padding: 12px !important;
    min-height: 350px !important;
    max-height: 400px !important;
    overflow-y: auto !important;
    margin-bottom: 10px;
}
.gradio-audio {
    margin-top: 12px;
    margin-bottom: 12px;
}
.textbox {
    border-radius: 12px !important;
    border: 1.5px solid #e3e8f0 !important;
    padding: 12px !important;
    font-size: 16px !important;
    margin-bottom: 10px;
    background: #f8fafc !important;
    color: #222 !important;
}
.textbox:focus {
    border-color: #1976D2 !important;
    box-shadow: 0 0 0 2px rgba(25, 118, 210, 0.13) !important;
}
.status-text {
    color: #1976D2;
    font-size: 15px;
    margin-top: 10px;
    font-weight: 500;
    background: #e3f2fd;
    border-radius: 8px;
    padding: 8px 12px;
}
/* File upload area */
input[type="file"]::-webkit-file-upload-button {
    background: #1976D2;
    color: #fff;
    border: none;
    border-radius: 8px;
    padding: 8px 18px;
    font-weight: 600;
    cursor: pointer;
}
input[type="file"]::-webkit-file-upload-button:hover {
    background: #00bcd4;
}
/* Only one main scroll */
body, .gradio-container, #root, #app {
    overflow: auto !important;
    height: 100vh !important;
}
#component-0, #component-1, #component-2, .chatbot, .chat-container {
    overflow: visible !important;
    height: auto !important;
    max-height: none !important;
}
"""


def create_audio_recorder():
    """Return a (currently empty) custom HTML audio-recorder component.

    NOTE(review): the HTML payload is empty and this function is never
    called by the UI below; kept for backward compatibility.
    """
    return gr.HTML("""
""")


class AdvancedRAG:
    """Small stateful wrapper around the OpenAI Assistants API.

    Tracks one conversation thread, the currently uploaded file id(s), and
    the assistant id (from the ASSISTANT_ID environment variable).
    """

    def __init__(self):
        self.thread_id: Optional[str] = None
        self.file_ids: List[str] = []
        self.assistant_id: Optional[str] = os.getenv("ASSISTANT_ID")
        # BUGFIX: the original wrapped this in `if hasattr(self, 'vector_store_id')`,
        # which is always False inside __init__, so the attribute was never created.
        self.vector_store_id: Optional[str] = None

    def create_thread(self) -> str:
        """Create a fresh Assistants thread and remember its id."""
        thread = openai.beta.threads.create()
        self.thread_id = thread.id
        return self.thread_id

    def upload_document(self, file) -> str:
        """Upload raw document bytes and start a new thread attached to it.

        `file` is the bytes payload from gr.File(type="binary").
        Returns the new OpenAI file id. Raises if no file was provided.
        """
        # Delete any previously uploaded files so the assistant only ever
        # sees the most recent document (best-effort; failures are logged).
        for file_id in self.file_ids:
            try:
                openai.files.delete(file_id)
            except Exception as e:
                print(f"Warning: Could not delete file {file_id}: {e}")
        self.thread_id = None
        self.file_ids = []
        if self.vector_store_id:
            try:
                openai.beta.vector_stores.delete(self.vector_store_id)
            except Exception as e:
                print(f"Warning: Could not delete vector store: {e}")
            self.vector_store_id = None
        # Wait a moment to ensure deletion is processed server-side.
        time.sleep(2)

        if not file:
            raise Exception("No file uploaded.")

        # gr.File(type="binary") yields bytes with no original filename, so a
        # generic .pdf name is used — TODO confirm this is acceptable for the
        # .txt/.doc/.docx types the UI also accepts.
        filename = 'uploaded_file.pdf'
        tmp_path = None
        try:
            with tempfile.NamedTemporaryFile(delete=False, suffix=os.path.splitext(filename)[1]) as tmp:
                tmp.write(file)
                tmp.flush()
                tmp_path = tmp.name
            with open(tmp_path, "rb") as fh:
                uploaded = openai.files.create(file=fh, purpose="assistants")
        finally:
            # BUGFIX: the original never removed the delete=False temp file.
            if tmp_path and os.path.exists(tmp_path):
                os.remove(tmp_path)
        self.file_ids = [uploaded.id]

        # Start a new thread seeded with the new file as a file_search attachment.
        thread = openai.beta.threads.create()
        self.thread_id = thread.id
        openai.beta.threads.messages.create(
            thread_id=self.thread_id,
            role="user",
            content="I have uploaded a document. Please analyze it.",
            attachments=[{"file_id": self.file_ids[0], "tools": [{"type": "file_search"}]}],
        )
        return self.file_ids[0]

    def ask_question(self, question: str) -> str:
        """Post `question` to the thread, run the assistant, return its reply.

        Polls the run every 0.2 s with a 60 s timeout. On any failure a
        "[Error: ...]" string is returned instead of raising.
        """
        try:
            if not self.thread_id:
                self.create_thread()
            # Add the question to the thread.
            openai.beta.threads.messages.create(
                thread_id=self.thread_id,
                role="user",
                content=question,
            )
            # Create a run against the configured assistant.
            run = openai.beta.threads.runs.create(
                thread_id=self.thread_id,
                assistant_id=self.assistant_id,
            )
            # Poll until the run finishes, fails, or times out.
            waited = 0
            while True:
                run_status = openai.beta.threads.runs.retrieve(
                    thread_id=self.thread_id,
                    run_id=run.id,
                )
                if run_status.status == 'completed':
                    break
                elif run_status.status == 'failed':
                    raise Exception("Run failed")
                time.sleep(0.2)
                waited += 0.2
                if waited > 60:
                    raise Exception("Run timed out after 60 seconds.")
            # Fetch only the newest message (the assistant's answer).
            messages = openai.beta.threads.messages.list(
                thread_id=self.thread_id,
                order='desc',
                limit=1,
            )
            if not messages.data:
                return "No response received from the assistant."
            return messages.data[0].content[0].text.value
        except Exception as e:
            return f"[Error: {str(e)}]"

    def transcribe_audio(self, audio_file):
        """Transcribe a binary file-like audio object with Whisper.

        Returns the transcript text, or an "[Error transcribing audio: ...]"
        string on failure.
        """
        tmp_path = None
        try:
            with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
                tmp.write(audio_file.read())
                tmp.flush()
                tmp_path = tmp.name
            with open(tmp_path, "rb") as audio:
                transcript = openai.audio.transcriptions.create(
                    model="whisper-1",
                    file=audio,
                    language="en",
                )
            return transcript.text
        except Exception as e:
            return f"[Error transcribing audio: {str(e)}]"
        finally:
            # BUGFIX: the original only removed the temp file on the success
            # path; clean up on error paths as well.
            if tmp_path and os.path.exists(tmp_path):
                os.remove(tmp_path)


# Single shared RAG session for the app.
rag = AdvancedRAG()


def process_file(file):
    """Gradio handler: upload the chosen document and report status text."""
    if file is None:
        return "Please upload a file first."
    try:
        rag.upload_document(file)
        return "File uploaded successfully! You can now ask questions about the document."
    except Exception as e:
        return f"Error uploading file: {str(e)}"


def process_question(question, history):
    """Gradio handler for typed questions.

    Returns (status, history, question_box, audio_box) matching the outputs
    wired below: status text, updated chat history, cleared question box,
    cleared audio input.
    """
    # Prevent sending empty messages.
    if not question or not question.strip():
        return "", history, "", None
    if not rag.thread_id:
        return "Please upload a document first.", history, "", None
    try:
        response = rag.ask_question(question)
        history.append({"role": "user", "content": question})
        history.append({"role": "assistant", "content": response})
        return "", history, "", None
    except Exception as e:
        history.append({"role": "assistant", "content": f"Error: {str(e)}"})
        return "", history, "", None


def synthesize_text(text):
    """Render `text` to speech with gTTS; return a playable MP3 filepath.

    BUGFIX: the original returned raw MP3 bytes, which gr.Audio cannot
    postprocess for playback; a filepath works across Gradio versions.
    Returns None on any TTS failure (best-effort feature).
    """
    try:
        tts = gTTS(text)
        with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as tmp:
            tts.write_to_fp(tmp)
            return tmp.name
    except Exception:
        return None


def process_voice_note(audio_file, history):
    """Gradio handler for voice notes: transcribe, ask, and speak the reply.

    Accepts a filepath (gr.Audio type="filepath"), a (sample_rate, ndarray)
    tuple, or a file-like object. Returns (status, history, question_box,
    audio_box, tts_audio).
    """
    if audio_file is None:
        return "Please record or upload an audio file.", history, "", None, None
    try:
        transcript = None
        # If audio_file is a string (filepath), open it as a file.
        if isinstance(audio_file, str):
            with open(audio_file, "rb") as f:
                transcript = rag.transcribe_audio(f)
        # If audio_file is a (sample_rate, np.ndarray) tuple, write a temp WAV.
        elif isinstance(audio_file, tuple) and isinstance(audio_file[1], np.ndarray):
            import soundfile as sf
            sample_rate, audio_data = audio_file
            with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
                sf.write(tmp.name, audio_data, sample_rate)
                tmp.flush()
                with open(tmp.name, "rb") as f:
                    transcript = rag.transcribe_audio(f)
        else:
            # Assume an already-open file-like object.
            transcript = rag.transcribe_audio(audio_file)

        if not transcript or not str(transcript).strip():
            history.append({"role": "user", "content": "🎤 [No audio detected or transcription failed]"})
            history.append({"role": "assistant", "content": "Sorry, I couldn't understand the audio. Please try again."})
            return "", history, "", None, None
        if not rag.thread_id:
            return "Please upload a document first.", history, "", None, None

        response = rag.ask_question(transcript)
        history.append({"role": "user", "content": f"🎤 {transcript}"})
        history.append({"role": "assistant", "content": response})
        tts_audio = synthesize_text(response)
        return "", history, "", None, tts_audio
    except Exception as e:
        history.append({"role": "user", "content": f"🎤 [Error transcribing audio: {str(e)}]"})
        history.append({"role": "assistant", "content": "It seems there was an error while transcribing audio due to a technical issue. If there's anything specific from the document or any other questions you have regarding the content, please let me know, and I can assist you with that information."})
        return "", history, "", None, None


def reset_all():
    """Clear session state and all UI components (status, chat, inputs)."""
    rag.thread_id = None
    if hasattr(rag, 'file_ids'):
        rag.file_ids = []
    if hasattr(rag, 'vector_store_id'):
        rag.vector_store_id = None
    return "", [], "", None, None


# Create Gradio interface with improved layout.
with gr.Blocks(css=custom_css, title="Document Q&A System") as demo:
    gr.Markdown("""
# Document Q&A System
Upload a document, record your voice, and chat!
""")
    chatbot = gr.Chatbot(height=400, elem_classes="gradio-chatbot", label=None, type="messages")
    audio_input = gr.Audio(type="filepath", label="Record or Upload Audio", elem_classes="gradio-audio", visible=False)
    tts_output = gr.Audio(label="Assistant Voice Reply", interactive=False, visible=False)

    with gr.Row():
        # Left: Document Q&A controls.
        with gr.Column(scale=1, min_width=350):
            with gr.Group(elem_classes="compact-box"):
                gr.Markdown("Document Q&A Controls")
                file_input = gr.File(
                    label="Upload Document",
                    file_types=[".pdf", ".txt", ".doc", ".docx"],
                    file_count="single",
                    type="binary",
                    elem_classes="upload-btn",
                )
                mic_btn = gr.Button("🎤 Record Voice", elem_classes="audio-btn")
                # NOTE(review): bare component references are no-ops — the
                # components were already rendered at the top of the Blocks.
                audio_input
                send_voice_btn = gr.Button("Send Voice Note", elem_classes="send-btn", visible=False)
                reset_btn = gr.Button("Reset Chat & Upload New Document", elem_classes="reset-btn")
                file_output = gr.Textbox(label="Upload Status", interactive=False, elem_classes="textbox")
                question = gr.Textbox(
                    label="Type your question and press Enter",
                    placeholder="Ask a question about your document...",
                    elem_classes="textbox",
                )

                file_input.change(process_file, file_input, file_output)

                # (The original re-defined reset_all here; the module-level
                # definition above is identical and is used instead.)
                reset_btn.click(reset_all, None, [file_output, chatbot, question, audio_input, tts_output])

                def show_audio():
                    """Reveal the audio recorder and its send button."""
                    return {audio_input: gr.update(visible=True), send_voice_btn: gr.update(visible=True)}

                mic_btn.click(show_audio, None, [audio_input, send_voice_btn])

                def hide_audio():
                    """Hide the audio recorder and its send button again."""
                    return {audio_input: gr.update(visible=False), send_voice_btn: gr.update(visible=False)}

                send_voice_btn.click(
                    process_voice_note,
                    [audio_input, chatbot],
                    [file_output, chatbot, question, audio_input, tts_output],
                )
                send_voice_btn.click(hide_audio, None, [audio_input, send_voice_btn])

                # BUGFIX: the original listed `question` twice in the outputs,
                # so the "Please upload a document first." status string was
                # written into the question textbox instead of the status box.
                question.submit(
                    process_question,
                    [question, chatbot],
                    [file_output, chatbot, question, audio_input],
                )
                # NOTE(review): no-op bare reference, preserved from original.
                tts_output

        # Right: Chatbot screen.
        with gr.Column(scale=2, min_width=500):
            with gr.Group(elem_classes="compact-box"):
                # NOTE(review): no-op bare reference, preserved from original.
                chatbot

    # JavaScript bridge: accept base64 audio posted via window.postMessage and
    # inject it into the file input so Gradio picks it up.
    demo.load(
        fn=None,
        inputs=None,
        outputs=None,
        js="""
        function() {
            window.addEventListener('message', function(event) {
                if (event.data.type === 'audio_data') {
                    const audioData = event.data.data;
                    const byteString = atob(audioData.split(',')[1]);
                    const mimeString = audioData.split(',')[0].split(':')[1].split(';')[0];
                    const ab = new ArrayBuffer(byteString.length);
                    const ia = new Uint8Array(ab);
                    for (let i = 0; i < byteString.length; i++) {
                        ia[i] = byteString.charCodeAt(i);
                    }
                    const blob = new Blob([ab], {type: mimeString});
                    const file = new File([blob], "recording.wav", {type: mimeString});
                    const audioInput = document.querySelector('input[type="file"]');
                    const dataTransfer = new DataTransfer();
                    dataTransfer.items.add(file);
                    audioInput.files = dataTransfer.files;
                    audioInput.dispatchEvent(new Event('change', { bubbles: true }));
                }
            });
        }
        """,
    )


if __name__ == "__main__":
    demo.launch(
        share=True,
        server_name="0.0.0.0",
        server_port=7860,
        show_error=True,
    )