Spaces:
Build error
Build error
| import os | |
| import time | |
| import numpy as np | |
| import torch | |
| import gradio as gr | |
| from transformers import pipeline, AutoModelForCausalLM, AutoTokenizer, AutoProcessor, AutoModelForSpeechSeq2Seq | |
| from datasets import load_dataset | |
| import soundfile as sf | |
| # Global variables to track latency | |
| latency_ASR = 0.0 | |
| latency_LLM = 0.0 | |
| latency_TTS = 0.0 | |
| # Global variables to store conversation state | |
| conversation_history = [] | |
| audio_output = None | |
| # ASR Models | |
| ASR_OPTIONS = { | |
| "Whisper Small": "openai/whisper-small", | |
| "Wav2Vec2": "facebook/wav2vec2-base-960h" | |
| } | |
| # LLM Models | |
| LLM_OPTIONS = { | |
| "Llama-2 7B Chat": "meta-llama/Llama-2-7b-chat-hf", | |
| "Flan-T5 Small": "google/flan-t5-small" | |
| } | |
| # TTS Models | |
| TTS_OPTIONS = { | |
| "VITS": "espnet/kan-bayashi_ljspeech_vits", | |
| "FastSpeech2": "espnet/kan-bayashi_ljspeech_fastspeech2" | |
| } | |
| # Load models | |
| asr_models = {} | |
| llm_models = {} | |
| tts_models = {} | |
| def load_asr_model(model_name): | |
| """Load ASR model from Hugging Face""" | |
| global asr_models | |
| if model_name not in asr_models: | |
| print(f"Loading ASR model: {model_name}") | |
| model_id = ASR_OPTIONS[model_name] | |
| if "whisper" in model_id: | |
| asr_models[model_name] = pipeline("automatic-speech-recognition", model=model_id) | |
| else: # wav2vec2 | |
| processor = AutoProcessor.from_pretrained(model_id) | |
| model = AutoModelForSpeechSeq2Seq.from_pretrained(model_id) | |
| asr_models[model_name] = {"processor": processor, "model": model} | |
| return asr_models[model_name] | |
| def load_llm_model(model_name): | |
| """Load LLM model from Hugging Face""" | |
| global llm_models | |
| if model_name not in llm_models: | |
| print(f"Loading LLM model: {model_name}") | |
| model_id = LLM_OPTIONS[model_name] | |
| tokenizer = AutoTokenizer.from_pretrained(model_id) | |
| model = AutoModelForCausalLM.from_pretrained( | |
| model_id, | |
| torch_dtype=torch.float16, | |
| device_map="auto" | |
| ) | |
| llm_models[model_name] = { | |
| "model": model, | |
| "tokenizer": tokenizer | |
| } | |
| return llm_models[model_name] | |
| def load_tts_model(model_name): | |
| """Load TTS model using ESPnet""" | |
| global tts_models | |
| if model_name not in tts_models: | |
| print(f"Loading TTS model: {model_name}") | |
| try: | |
| # Import ESPnet TTS modules | |
| from espnet2.bin.tts_inference import Text2Speech | |
| model_id = TTS_OPTIONS[model_name] | |
| tts = Text2Speech.from_pretrained(model_id) | |
| tts_models[model_name] = tts | |
| except ImportError: | |
| print("ESPnet not installed. Using mock TTS for demonstration.") | |
| tts_models[model_name] = "mock_tts" | |
| return tts_models[model_name] | |
| def transcribe_audio(audio_data, sr, asr_model_name): | |
| """Transcribe audio using selected ASR model""" | |
| global latency_ASR | |
| start_time = time.time() | |
| model = load_asr_model(asr_model_name) | |
| if "whisper" in ASR_OPTIONS[asr_model_name]: | |
| result = model({"array": audio_data, "sampling_rate": sr}) | |
| transcript = result["text"] | |
| else: # wav2vec2 | |
| inputs = model["processor"](audio_data, sampling_rate=sr, return_tensors="pt") | |
| with torch.no_grad(): | |
| outputs = model["model"].generate(**inputs) | |
| transcript = model["processor"].batch_decode(outputs, skip_special_tokens=True)[0] | |
| latency_ASR = time.time() - start_time | |
| return transcript | |
| def generate_response(transcript, llm_model_name, system_prompt): | |
| """Generate response using selected LLM model""" | |
| global latency_LLM, conversation_history | |
| start_time = time.time() | |
| model_info = load_llm_model(llm_model_name) | |
| model = model_info["model"] | |
| tokenizer = model_info["tokenizer"] | |
| # Format the prompt based on the model | |
| if "llama" in LLM_OPTIONS[llm_model_name].lower(): | |
| # Format for Llama models | |
| if not conversation_history: | |
| conversation_history.append({"role": "system", "content": system_prompt}) | |
| conversation_history.append({"role": "user", "content": transcript}) | |
| prompt = tokenizer.apply_chat_template( | |
| conversation_history, | |
| tokenize=False, | |
| add_generation_prompt=True | |
| ) | |
| else: | |
| # Format for T5 models | |
| prompt = f"{system_prompt}\nUser: {transcript}\nAssistant:" | |
| # Generate text | |
| input_ids = tokenizer(prompt, return_tensors="pt").input_ids.to(model.device) | |
| with torch.no_grad(): | |
| outputs = model.generate( | |
| input_ids, | |
| max_new_tokens=100, | |
| temperature=0.7, | |
| top_p=0.9, | |
| ) | |
| # Decode the response | |
| if "llama" in LLM_OPTIONS[llm_model_name].lower(): | |
| response = tokenizer.decode(outputs[0], skip_special_tokens=True) | |
| # Extract just the assistant's response | |
| response = response.split("Assistant: ")[-1].strip() | |
| else: | |
| response = tokenizer.decode(outputs[0], skip_special_tokens=True) | |
| # Add to conversation history | |
| conversation_history.append({"role": "assistant", "content": response}) | |
| latency_LLM = time.time() - start_time | |
| return response | |
| def synthesize_speech(text, tts_model_name): | |
| """Synthesize speech using selected TTS model""" | |
| global latency_TTS | |
| start_time = time.time() | |
| tts = load_tts_model(tts_model_name) | |
| if tts == "mock_tts": | |
| # Mock TTS response for demonstration | |
| # In a real implementation, this would use the ESPnet model | |
| # Load a sample audio file for demonstration | |
| try: | |
| sample_rate = 16000 | |
| # Generate a simple sine wave as demo audio | |
| duration = 2 # seconds | |
| t = np.linspace(0, duration, int(sample_rate * duration), endpoint=False) | |
| audio_data = 0.5 * np.sin(2 * np.pi * 220 * t) # 220 Hz sine wave | |
| except Exception as e: | |
| print(f"Error generating mock audio: {e}") | |
| audio_data = np.zeros(16000) # 1 second of silence | |
| sample_rate = 16000 | |
| else: | |
| # Use actual ESPnet TTS model | |
| with torch.no_grad(): | |
| wav = tts(text)["wav"] | |
| audio_data = wav.numpy() | |
| sample_rate = tts.fs | |
| latency_TTS = time.time() - start_time | |
| return (sample_rate, audio_data) | |
| def process_speech( | |
| audio_input, | |
| asr_option, | |
| llm_option, | |
| tts_option, | |
| system_prompt | |
| ): | |
| """Process speech: ASR -> LLM -> TTS pipeline""" | |
| global audio_output | |
| # Check if audio input is available | |
| if audio_input is None: | |
| return None, "", "", None | |
| # Get audio data | |
| sr, audio_data = audio_input | |
| # ASR: Speech to text | |
| transcript = transcribe_audio(audio_data, sr, asr_option) | |
| # LLM: Generate response | |
| response = generate_response(transcript, llm_option, system_prompt) | |
| # TTS: Text to speech | |
| audio_output = synthesize_speech(response, tts_option) | |
| # Return results | |
| return audio_input, transcript, response, audio_output | |
| def display_latency(): | |
| """Display latency information""" | |
| return f""" | |
| ASR Latency: {latency_ASR:.2f} seconds | |
| LLM Latency: {latency_LLM:.2f} seconds | |
| TTS Latency: {latency_TTS:.2f} seconds | |
| Total Latency: {latency_ASR + latency_LLM + latency_TTS:.2f} seconds | |
| """ | |
| def reset_conversation(): | |
| """Reset the conversation history""" | |
| global conversation_history, audio_output | |
| conversation_history = [] | |
| audio_output = None | |
| return None, "", "", None, "" | |
| # Create Gradio interface | |
| with gr.Blocks(title="Conversational Speech System") as demo: | |
| gr.Markdown( | |
| """ | |
| # Conversational Speech System with ASR, LLM, and TTS | |
| This demo showcases a complete speech-to-speech conversation system using: | |
| - **ASR** (Automatic Speech Recognition) to convert your speech to text | |
| - **LLM** (Large Language Model) to generate responses | |
| - **TTS** (Text-to-Speech) to convert the responses to speech | |
| Speak into your microphone and the system will respond with synthesized speech. | |
| """ | |
| ) | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| # Input components | |
| audio_input = gr.Audio( | |
| sources=["microphone"], | |
| type="numpy", | |
| label="Speak here", | |
| ) | |
| system_prompt = gr.Textbox( | |
| label="System Prompt (instructions for the LLM)", | |
| value="You are a helpful and friendly AI assistant. Keep your responses concise and under 3 sentences." | |
| ) | |
| asr_dropdown = gr.Dropdown( | |
| choices=list(ASR_OPTIONS.keys()), | |
| value=list(ASR_OPTIONS.keys())[0], | |
| label="Select ASR Model" | |
| ) | |
| llm_dropdown = gr.Dropdown( | |
| choices=list(LLM_OPTIONS.keys()), | |
| value=list(LLM_OPTIONS.keys())[0], | |
| label="Select LLM Model" | |
| ) | |
| tts_dropdown = gr.Dropdown( | |
| choices=list(TTS_OPTIONS.keys()), | |
| value=list(TTS_OPTIONS.keys())[0], | |
| label="Select TTS Model" | |
| ) | |
| reset_btn = gr.Button("Reset Conversation") | |
| with gr.Column(scale=1): | |
| # Output components | |
| user_transcript = gr.Textbox(label="Your Speech (ASR Output)") | |
| system_response = gr.Textbox(label="AI Response (LLM Output)") | |
| audio_output_component = gr.Audio(label="AI Voice Response", autoplay=True) | |
| latency_info = gr.Textbox(label="Performance Metrics") | |
| # Set up event handlers | |
| audio_input.change( | |
| process_speech, | |
| inputs=[audio_input, asr_dropdown, llm_dropdown, tts_dropdown, system_prompt], | |
| outputs=[audio_input, user_transcript, system_response, audio_output_component] | |
| ).then( | |
| display_latency, | |
| inputs=[], | |
| outputs=[latency_info] | |
| ) | |
| reset_btn.click( | |
| reset_conversation, | |
| inputs=[], | |
| outputs=[audio_input, user_transcript, system_response, audio_output_component, latency_info] | |
| ) | |
| # Launch the app | |
| if __name__ == "__main__": | |
| demo.launch() |