# Hosting-page residue (not part of the program): author "Devakumar868",
# commit title "Update app.py",
# commit e4a1156 (verified).
import gradio as gr
import torch
import numpy as np
import librosa
import soundfile as sf
from transformers import pipeline, AutoTokenizer, AutoModelForCausalLM
import warnings
import json
import time
from datetime import datetime
import os
import sys
import gc
# Dia TTS is an optional dependency: remember whether the import worked so the
# rest of the module can degrade to text-only output instead of failing here.
try:
    from dia.model import Dia
except ImportError as e:
    print(f"⚠️ Dia TTS not available: {e}")
    DIA_AVAILABLE = False
else:
    DIA_AVAILABLE = True
    print("βœ… Dia TTS library imported successfully")

# Suppress every Python warning for the whole process.
warnings.filterwarnings("ignore")

# Module-level model handles; each one is None until load_models() fills it in.
asr_pipe = qwen_model = qwen_tokenizer = tts_model = tts_type = None
class ConversationManager:
    """Bounded in-memory log of user/AI exchanges for a single call.

    ``history`` is a plain list (oldest first) of dicts with the keys
    ``timestamp``, ``user``, ``ai`` and ``emotion``; it never holds more than
    ``max_exchanges`` entries after ``add_exchange`` returns.
    """

    def __init__(self, max_exchanges: int = 5) -> None:
        """Create an empty log that retains at most ``max_exchanges`` entries."""
        self.history = []
        self.max_exchanges = max_exchanges
        # Only reset here and in clear(); add_exchange() does not touch it.
        self.current_emotion = "neutral"

    def add_exchange(self, user_input, ai_response, emotion="neutral") -> None:
        """Append one exchange (stamped with the current local time) and trim.

        Args:
            user_input: What the user said.
            ai_response: What the assistant answered.
            emotion: Emotion label stored alongside the exchange.
        """
        self.history.append({
            "timestamp": datetime.now().isoformat(),
            "user": user_input,
            "ai": ai_response,
            "emotion": emotion,
        })
        # Trim by counting the surplus instead of slicing with a negative
        # index: history[-0:] is the WHOLE list, so a limit of 0 used to keep
        # everything instead of nothing.
        overflow = len(self.history) - self.max_exchanges
        if overflow > 0:
            self.history = self.history[overflow:]

    def get_context(self, max_turns: int = 3) -> str:
        """Return the most recent exchanges as ``User: ...\\nAI: ...\\n`` text.

        Args:
            max_turns: How many of the latest exchanges to include (default 3).
                Values <= 0 yield an empty string.

        Returns:
            The concatenated transcript, or "" when there is nothing to show.
        """
        if max_turns <= 0:
            return ""
        return "".join(
            f"User: {exchange['user']}\nAI: {exchange['ai']}\n"
            for exchange in self.history[-max_turns:]
        )

    def clear(self) -> None:
        """Drop all stored exchanges and reset the emotion to "neutral"."""
        self.history = []
        self.current_emotion = "neutral"
def optimize_gpu_memory():
    """Free unreferenced Python objects, then release cached CUDA memory.

    The garbage collector runs first: tensors that are only kept alive by
    reference cycles still occupy their CUDA blocks until they are collected,
    and ``torch.cuda.empty_cache()`` can only hand back blocks that are no
    longer in use. Running the collection afterwards (as before) left that
    memory cached until the next call.
    """
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
        torch.cuda.synchronize()
def check_system_info():
    """Print the Python and PyTorch versions plus CUDA device details, if any."""
    print("πŸ” System Information:")
    print(f"Python: {sys.version}")
    print(f"PyTorch: {torch.__version__}")

    # CPU-only hosts get a single notice and nothing else.
    if not torch.cuda.is_available():
        print("⚠️ CUDA not available, using CPU")
        return

    print(f"βœ… CUDA: {torch.cuda.get_device_name()}")
    total_gb = torch.cuda.get_device_properties(0).total_memory / 1e9
    print(f"πŸ’Ύ GPU Memory: {total_gb:.1f} GB")
    print(f"πŸ”₯ CUDA Version: {torch.version.cuda}")

    # Snapshot of what this process currently holds on the GPU.
    allocated = torch.cuda.memory_allocated() / 1e9
    cached = torch.cuda.memory_reserved() / 1e9
    print(f"πŸ“Š Current GPU Usage: {allocated:.1f}GB allocated, {cached:.1f}GB cached")
def load_models():
    """Load Whisper (ASR), Qwen (chat) and, when possible, Dia (TTS).

    The loaded objects are stored in the module globals ``asr_pipe``,
    ``qwen_model``, ``qwen_tokenizer``, ``tts_model`` and ``tts_type``.

    Returns:
        bool: False when Whisper or Qwen fails to load. True otherwise, even
        when Dia is missing or fails to load; in that case ``tts_model`` is
        None and ``tts_type`` is "none" (text-only mode).
    """
    global asr_pipe, qwen_model, qwen_tokenizer, tts_model, tts_type
    print("πŸš€ Loading Maya AI models...")
    optimize_gpu_memory()

    # Decide device/precision once so all three models agree.
    use_cuda = torch.cuda.is_available()
    model_dtype = torch.float16 if use_cuda else torch.float32

    # Load ASR model (Whisper)
    print("🎀 Loading Whisper for ASR...")
    try:
        asr_pipe = pipeline(
            "automatic-speech-recognition",
            model="openai/whisper-base",
            torch_dtype=model_dtype,
            device=0 if use_cuda else -1,
        )
        print("βœ… Whisper ASR loaded successfully!")
        optimize_gpu_memory()
    except Exception as e:
        print(f"❌ Error loading Whisper: {e}")
        return False

    # Load Qwen model
    print("🧠 Loading Qwen2.5-1.5B for conversation...")
    try:
        model_name = "Qwen/Qwen2.5-1.5B-Instruct"
        qwen_tokenizer = AutoTokenizer.from_pretrained(
            model_name,
            trust_remote_code=True,
        )
        qwen_model = AutoModelForCausalLM.from_pretrained(
            model_name,
            torch_dtype=model_dtype,
            device_map="auto" if use_cuda else None,
            trust_remote_code=True,
        )
        print("βœ… Qwen loaded successfully!")
        optimize_gpu_memory()
    except Exception as e:
        print(f"❌ Error loading Qwen: {e}")
        return False

    # Dia TTS is optional: any failure below falls through to text-only mode.
    if DIA_AVAILABLE:
        try:
            print("Attempting to load Dia TTS with FIXED parameters...")
            # Clear memory before loading Dia
            optimize_gpu_memory()
            # low_cpu_mem_usage is intentionally not passed (not supported by Dia).
            tts_model = Dia.from_pretrained(
                "nari-labs/Dia-1.6B",
                compute_dtype="float16" if use_cuda else "float32",
            )
            # NOTE(review): the Dia wrapper appears to choose its own device in
            # from_pretrained and may not expose .cuda(). An unconditional call
            # would raise AttributeError, be swallowed by the except below and
            # silently force text-only mode on every GPU host, so only call it
            # when it exists. Confirm against the installed Dia version.
            if use_cuda and hasattr(tts_model, "cuda"):
                tts_model = tts_model.cuda()
            tts_type = "dia"
            print("βœ… Dia TTS loaded successfully!")
            optimize_gpu_memory()
            return True
        except Exception as e:
            print(f"⚠️ Dia TTS failed to load: {e}")
            tts_model = None

    print("⚠️ No TTS available, running in text-only mode")
    tts_type = "none"
    return True
# Keyword table for detect_emotion_from_text; insertion order doubles as the
# tie-break order (earlier emotions win when scores are equal).
_EMOTION_KEYWORDS = {
    'happy': ['happy', 'great', 'awesome', 'wonderful', 'excited', 'laugh', 'amazing',
              'fantastic', 'excellent', 'brilliant', 'perfect', 'love', 'joy', 'cheerful'],
    'sad': ['sad', 'upset', 'disappointed', 'cry', 'terrible', 'awful', 'depressed',
            'miserable', 'heartbroken', 'devastated', 'gloomy', 'melancholy'],
    'angry': ['angry', 'mad', 'furious', 'annoyed', 'frustrated', 'hate', 'rage',
              'irritated', 'outraged', 'livid', 'enraged'],
    'surprised': ['wow', 'incredible', 'surprised', 'unbelievable', 'shocking',
                  'astonishing', 'remarkable', 'extraordinary', 'mind-blowing'],
    'neutral': [],
}


def detect_emotion_from_text(text):
    """Guess an emotion label from keywords found in ``text``.

    Each emotion scores one point per distinct keyword that occurs at the
    start of a word (case-insensitive), so inflected forms such as "crying"
    or "loved" still count, while keywords buried inside unrelated words
    ("hate" in "whatever", "rage" in "average") no longer do.

    Args:
        text: The sentence to classify. Empty or None input is neutral.

    Returns:
        One of 'happy', 'sad', 'angry', 'surprised' or 'neutral'. Ties go to
        the emotion listed first in the keyword table.
    """
    import re  # local import: the module's import block does not provide re

    if not text:
        return 'neutral'

    text_lower = text.lower()
    emotion_scores = {}
    for emotion, keywords in _EMOTION_KEYWORDS.items():
        score = sum(
            1 for keyword in keywords
            # \b anchors the keyword to a word start; no trailing anchor so
            # suffixes ("laughing", "excitedly") are still accepted.
            if re.search(r"\b" + re.escape(keyword), text_lower)
        )
        if score > 0:
            emotion_scores[emotion] = score

    if emotion_scores:
        return max(emotion_scores, key=emotion_scores.get)
    return 'neutral'
def speech_to_text_with_emotion(audio_input):
    """Transcribe recorded audio with Whisper and label its emotion.

    Args:
        audio_input: Either a ``(sample_rate, samples)`` tuple or a bare
            sample array, which is then treated as 16 kHz. None means
            "nothing recorded".

    Returns:
        tuple[str, str]: ``(text, emotion)``. On success ``text`` is the
        transcription. For None input it is "". For rejected or failed audio
        it is a user-facing message and the emotion is "neutral".
    """
    try:
        if audio_input is None:
            return "", "neutral"
        print("🎀 Processing audio input...")

        if isinstance(audio_input, tuple):
            sample_rate, audio_data = audio_input
            print(f"Audio input: sample_rate={sample_rate}, shape={audio_data.shape}")
            # Bring integer PCM into the [-1, 1) float range.
            if audio_data.dtype == np.int16:
                audio_data = audio_data.astype(np.float32) / 32768.0
            elif audio_data.dtype == np.int32:
                audio_data = audio_data.astype(np.float32) / 2147483648.0
            elif audio_data.dtype != np.float32:
                audio_data = audio_data.astype(np.float32)
            # Down-mix multi-channel audio by averaging over axis 1.
            # NOTE(review): assumes layout (samples, channels) - confirm.
            if len(audio_data.shape) > 1:
                audio_data = audio_data.mean(axis=1)
        else:
            audio_data = audio_input
            sample_rate = 16000

        # Reject clips shorter than 0.1 s. Measured against the real sample
        # rate: the old fixed 1600-sample floor was 0.1 s only at 16 kHz and
        # far less at the higher rates microphones commonly deliver.
        if len(audio_data) < 0.1 * sample_rate:
            return "Audio too short, please speak for at least 1 second", "neutral"

        max_amplitude = np.max(np.abs(audio_data))
        if max_amplitude < 0.01:
            return "Audio too quiet, please speak louder", "neutral"

        # Peak-normalise to 0.95 (max_amplitude is >= 0.01 here, so no zero division).
        audio_data = audio_data / max_amplitude * 0.95

        # Resample to 16kHz if needed
        if sample_rate != 16000:
            print(f"Resampling from {sample_rate}Hz to 16000Hz...")
            audio_data = librosa.resample(audio_data, orig_sr=sample_rate, target_sr=16000)

        print("πŸ”„ Running Whisper ASR...")
        # The pipeline has no `language` call argument; passing one raises
        # TypeError and sent every request into the except branch below.
        # Decoder options must travel through generate_kwargs.
        result = asr_pipe(audio_data, generate_kwargs={"language": "en"})
        transcription = result['text'].strip()
        print(f"Transcription: '{transcription}'")

        if not transcription or len(transcription) < 2:
            return "No clear speech detected, please try speaking more clearly", "neutral"

        emotion = detect_emotion_from_text(transcription)
        print(f"Detected emotion: {emotion}")
        return transcription, emotion
    except Exception as e:
        # Boundary handler: the UI gets a friendly message, the log gets the cause.
        print(f"❌ Error in STT: {e}")
        return "Sorry, I couldn't understand that. Please try again.", "neutral"
def generate_contextual_response(user_input, emotion, conversation_manager):
    """Ask the Qwen chat model for a short reply to ``user_input``.

    The system prompt combines a tone instruction chosen by ``emotion``
    (unknown labels use the neutral one) with the recent transcript from
    ``conversation_manager.get_context()``.

    Returns:
        str: The decoded reply with a leading "Maya:" removed, or a fixed
        apology string when anything in the pipeline raises.
    """
    try:
        optimize_gpu_memory()
        context = conversation_manager.get_context()

        emotional_prompts = {
            "happy": "Respond with genuine enthusiasm and joy. Use positive language and show excitement.",
            "sad": "Respond with empathy and comfort. Be gentle and understanding.",
            "angry": "Respond calmly and try to help. Be patient and de-escalate.",
            "surprised": "Share in their surprise and show curiosity. Be engaging.",
            "neutral": "Respond naturally and conversationally. Be helpful and friendly."
        }
        tone_instruction = emotional_prompts.get(emotion, emotional_prompts['neutral'])

        system_prompt = f"""You are Maya, a friendly AI assistant with emotional intelligence.
{tone_instruction}
Previous context: {context}
User emotion: {emotion}
Guidelines:
- Keep responses very concise (1 sentence maximum)
- Be natural and conversational
- Show empathy and understanding
- Provide helpful responses
"""
        chat = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_input}
        ]
        prompt_text = qwen_tokenizer.apply_chat_template(
            chat, tokenize=False, add_generation_prompt=True
        )

        model_inputs = qwen_tokenizer([prompt_text], return_tensors="pt")
        if torch.cuda.is_available():
            model_inputs = model_inputs.to(qwen_model.device)

        with torch.no_grad():
            generated_ids = qwen_model.generate(
                model_inputs.input_ids,
                max_new_tokens=50,
                do_sample=True,
                temperature=0.7,
                top_p=0.9,
                repetition_penalty=1.1,
                pad_token_id=qwen_tokenizer.eos_token_id,
                attention_mask=model_inputs.attention_mask
            )

        # generate() echoes the prompt tokens; keep only what follows them.
        new_token_rows = []
        for prompt_ids, full_ids in zip(model_inputs.input_ids, generated_ids):
            new_token_rows.append(full_ids[len(prompt_ids):])

        decoded = qwen_tokenizer.batch_decode(new_token_rows, skip_special_tokens=True)
        response = decoded[0].strip()

        # Drop a self-attribution prefix if the model produced one.
        if response.startswith("Maya:"):
            response = response[5:].strip()

        optimize_gpu_memory()
        return response
    except Exception as e:
        print(f"Error in response generation: {e}")
        return "I'm sorry, I'm having trouble processing that right now."
def text_to_speech_emotional(text, emotion="neutral"):
    """Synthesize ``text`` with the loaded Dia model.

    Args:
        text: Sentence to speak.
        emotion: Only used in the console fallback line; it does not change
            the synthesized audio.

    Returns:
        ``(44100, samples)`` with ``samples`` a float32 array peak-normalised
        to 0.8, or None when no TTS model is loaded, the TTS type is not
        "dia", the model produced no usable audio, or generation failed.
    """
    try:
        if tts_model is None:
            print(f"πŸ”Š Maya says ({emotion}): {text}")
            return None

        optimize_gpu_memory()

        if tts_type != "dia":
            print(f"πŸ”Š Maya says ({emotion}): {text}")
            return None

        # Dia expects a speaker tag in front of the text.
        enhanced_text = f"[S1] {text}"
        # Limit text length to prevent memory issues
        if len(enhanced_text) > 200:
            enhanced_text = enhanced_text[:200] + "..."
        print(f"Generating Dia TTS for: {enhanced_text}")

        try:
            with torch.no_grad():
                audio_output = tts_model.generate(
                    enhanced_text,
                    use_torch_compile=False,
                    verbose=False
                )

            # Handle "nothing generated" explicitly instead of relying on a
            # later attribute/reduction error to reach the except branch.
            if audio_output is None:
                print("Dia TTS returned no audio")
                optimize_gpu_memory()
                return None

            if isinstance(audio_output, torch.Tensor):
                # detach + float: numpy() rejects tensors that require grad
                # and dtypes such as bfloat16.
                audio_output = audio_output.detach().cpu().float().numpy()

            # Work in float32 from here on so the mean/peak arithmetic is not
            # carried out in a half-precision model dtype.
            audio_output = np.asarray(audio_output, dtype=np.float32)
            if audio_output.ndim > 1:
                audio_output = audio_output.squeeze()
            audio_output = np.atleast_1d(audio_output)

            if audio_output.size == 0:
                print("Dia TTS returned empty audio")
                optimize_gpu_memory()
                return None

            # Remove DC offset
            audio_output = audio_output - np.mean(audio_output)
            # Gentle normalization to a 0.8 peak (skipped for pure silence).
            max_val = np.max(np.abs(audio_output))
            if max_val > 0:
                audio_output = audio_output / max_val * 0.8
            audio_output = audio_output.astype(np.float32)

            # Validate audio output
            if np.any(np.isnan(audio_output)) or np.any(np.isinf(audio_output)):
                print("❌ Audio contains NaN or Inf values")
                return None

            print(f"βœ… Generated audio: shape={audio_output.shape}, dtype={audio_output.dtype}, range=[{audio_output.min():.3f}, {audio_output.max():.3f}]")
            optimize_gpu_memory()
            # NOTE(review): 44100 is reported as a fixed rate; presumably Dia's
            # output rate - confirm against the Dia documentation.
            return (44100, audio_output)
        except Exception as e:
            print(f"❌ Error in Dia generation: {e}")
            optimize_gpu_memory()
            return None
    except Exception as e:
        print(f"❌ Error in TTS: {e}")
        optimize_gpu_memory()
        print(f"πŸ”Š Maya says ({emotion}): {text}")
        return None
# Single shared conversation log used by every UI callback in this module.
conv_manager = ConversationManager(max_exchanges=5)
def start_call():
    """Reset the conversation and produce Maya's greeting.

    Returns:
        tuple: ``(greeting_audio, greeting_text, status_message)`` where
        ``greeting_audio`` is whatever text_to_speech_emotional returns
        (None in text-only mode).
    """
    conv_manager.clear()
    optimize_gpu_memory()
    greeting_text = "Hello! I'm Maya. How can I help you today?"
    greeting_audio = text_to_speech_emotional(greeting_text, "happy")
    # tts_type is None until load_models() has run; treat that like "none"
    # instead of crashing on None.upper().
    if tts_type and tts_type != "none":
        tts_status = f"Using {tts_type.upper()} TTS"
    else:
        tts_status = "Text-only mode"
    return greeting_audio, greeting_text, f"πŸ“ž Call started! Maya is ready. {tts_status}"
# Exact messages speech_to_text_with_emotion returns instead of a transcript
# when it rejects or fails on the audio.
_STT_ERROR_MESSAGES = frozenset({
    "Audio too short, please speak for at least 1 second",
    "Audio too quiet, please speak louder",
    "No clear speech detected, please try speaking more clearly",
    "Sorry, I couldn't understand that. Please try again.",
})


def process_conversation(audio_input):
    """Run one turn: speech-to-text, reply generation, text-to-speech.

    Args:
        audio_input: The recorded audio handed over by the UI, or None.

    Returns:
        tuple: ``(response_audio, ai_text, user_text, status)`` in the order
        the UI outputs are wired. On any failure ``response_audio`` is None,
        ``user_text`` is "" and ``status`` describes the problem.
    """
    if audio_input is None:
        return None, "Please record some audio first.", "", "❌ No audio input received."
    try:
        print("πŸ”„ Processing conversation...")
        optimize_gpu_memory()

        # STT + Emotion Detection
        user_text, emotion = speech_to_text_with_emotion(audio_input)

        # Compare against the exact sentinel messages. The previous substring
        # test rejected genuine speech that merely contained "sorry" or
        # "couldn't understand".
        if user_text in _STT_ERROR_MESSAGES:
            return None, user_text, "", f"❌ STT Issue: {user_text}"
        if not user_text or user_text.strip() == "":
            return None, "I didn't catch that. Please speak louder and closer to the microphone.", "", "❌ No speech detected."

        # Generate response
        ai_response = generate_contextual_response(user_text, emotion, conv_manager)
        # Convert to speech
        response_audio = text_to_speech_emotional(ai_response, emotion)
        # Update history
        conv_manager.add_exchange(user_text, ai_response, emotion)

        # Report the real history limit rather than a hard-coded 5.
        status = (
            f"βœ… Success! | Emotion: {emotion} | "
            f"Exchange: {len(conv_manager.history)}/{conv_manager.max_exchanges}"
        )
        if torch.cuda.is_available():
            allocated = torch.cuda.memory_allocated() / 1e9
            status += f" | GPU: {allocated:.1f}GB"
        return response_audio, ai_response, user_text, status
    except Exception as e:
        error_msg = f"❌ Error: {str(e)}"
        print(error_msg)
        optimize_gpu_memory()
        return None, "I'm sorry, I encountered an error. Please try again.", "", error_msg
def get_conversation_history():
    """Render the stored exchanges as Markdown text.

    Returns:
        str: A placeholder sentence when nothing is stored, otherwise a
        heading followed by one three-line entry per exchange.
    """
    if not conv_manager.history:
        return "No conversation history yet. Start a call to begin!"

    pieces = ["πŸ“‹ **Conversation History:**\n\n"]
    for number, entry in enumerate(conv_manager.history, 1):
        # Keep "YYYY-MM-DDTHH:MM:SS" and swap the ISO "T" for a space.
        when = entry['timestamp'][:19].replace('T', ' ')
        pieces.append(f"**Exchange {number}** ({when}) - Emotion: {entry['emotion']}\n")
        pieces.append(f"πŸ‘€ **You:** {entry['user']}\n")
        pieces.append(f"πŸ€– **Maya:** {entry['ai']}\n\n")
    return "".join(pieces)
def end_call():
    """Speak a farewell, then wipe the conversation and free memory.

    Returns:
        tuple: ``(farewell_audio, farewell_text, status_message)``.
    """
    goodbye = "Thank you for talking with me! Have a wonderful day!"
    goodbye_audio = text_to_speech_emotional(goodbye, "happy")

    # Cleanup happens only after the farewell has been synthesized.
    conv_manager.clear()
    optimize_gpu_memory()

    status = "πŸ“žβŒ Call ended. Thank you!"
    return goodbye_audio, goodbye, status
def create_interface():
    """Build the Gradio Blocks UI and wire its buttons to the call handlers.

    Returns:
        The ``gr.Blocks`` app; the caller is responsible for launching it.
    """
    # Static HTML fragments, kept apart from the layout code below.
    banner_html = """
<div style="text-align: center; padding: 25px; background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); border-radius: 15px; margin-bottom: 25px;">
<h1 style="color: white; margin: 0; font-size: 2.8em;">πŸŽ™οΈ Maya AI</h1>
<p style="color: white; margin: 15px 0; font-size: 1.3em;">Advanced Speech-to-Speech Conversational AI</p>
<p style="color: #E8E8E8; margin: 0;">Natural β€’ Emotional β€’ Contextual β€’ Intelligent</p>
</div>
"""
    instructions_html = """
<div style="margin-top: 30px; padding: 25px; background: #f8f9fa; border-radius: 15px;">
<h3>πŸ’‘ How to Use Maya AI:</h3>
<ol>
<li><strong>Start Call:</strong> Click "πŸ“ž Start Call" - Maya will greet you</li>
<li><strong>Record:</strong> Speak clearly for at least 2 seconds</li>
<li><strong>Process:</strong> Click "🎯 Process Message"</li>
<li><strong>Listen:</strong> Maya will respond with natural speech</li>
<li><strong>Continue:</strong> Keep chatting (up to 5 exchanges)</li>
<li><strong>End:</strong> Click "πŸ“žβŒ End Call" when done</li>
</ol>
<div style="margin-top: 20px; padding: 15px; background: #d1ecf1; border-radius: 8px;">
<p><strong>πŸ”§ Fixed Issues:</strong></p>
<ul>
<li>βœ… Pydantic version pinned to 2.10.6 (fixes Gradio crash)</li>
<li>βœ… Dia TTS loading parameters corrected</li>
<li>βœ… Memory optimization for T4 GPU</li>
<li>βœ… Audio processing enhanced</li>
</ul>
</div>
</div>
"""

    # NOTE(review): the nesting of rows/columns below was reconstructed from a
    # copy of this file that had lost its indentation - confirm the layout.
    with gr.Blocks(
        title="Maya AI - Speech-to-Speech Assistant",
        theme=gr.themes.Soft()
    ) as demo:
        gr.HTML(banner_html)

        with gr.Row():
            # Left column: call controls, microphone input and status line.
            with gr.Column(scale=1):
                gr.HTML("<h3>πŸ“ž Call Controls</h3>")
                call_start_button = gr.Button("πŸ“ž Start Call", variant="primary", size="lg")
                call_end_button = gr.Button("πŸ“žβŒ End Call", variant="secondary", size="lg")
                gr.HTML("<h3>🎀 Voice Input</h3>")
                mic_input = gr.Audio(
                    label="Record Your Message (Speak clearly for 2+ seconds)",
                    sources=["microphone"],
                    type="numpy"
                )
                send_button = gr.Button("🎯 Process Message", variant="primary", size="lg")
                status_box = gr.Textbox(
                    label="πŸ“Š System Status",
                    interactive=False,
                    lines=3,
                    value="πŸš€ Ready! Click 'Start Call' to begin."
                )

            # Right column: Maya's audio reply plus both transcripts.
            with gr.Column(scale=2):
                gr.HTML("<h3>πŸ”Š Maya's Response</h3>")
                reply_audio = gr.Audio(
                    label="Maya's Voice Response",
                    type="numpy",
                    interactive=False,
                    autoplay=True,
                    show_download_button=True
                )
                with gr.Row():
                    with gr.Column():
                        heard_text_box = gr.Textbox(
                            label="πŸ‘€ What You Said",
                            interactive=False,
                            lines=4
                        )
                    with gr.Column():
                        reply_text_box = gr.Textbox(
                            label="πŸ€– Maya's Response",
                            interactive=False,
                            lines=4
                        )

        with gr.Row():
            with gr.Column():
                gr.HTML("<h3>πŸ“‹ Conversation History</h3>")
                show_history_button = gr.Button("πŸ“‹ Show History", variant="secondary")
                history_view = gr.Markdown("No conversation history yet.")

        # Event handlers: output lists follow each handler's return order.
        call_start_button.click(
            fn=start_call,
            outputs=[reply_audio, reply_text_box, status_box]
        )
        send_button.click(
            fn=process_conversation,
            inputs=[mic_input],
            outputs=[reply_audio, reply_text_box, heard_text_box, status_box]
        )
        call_end_button.click(
            fn=end_call,
            outputs=[reply_audio, reply_text_box, status_box]
        )
        show_history_button.click(
            fn=get_conversation_history,
            outputs=[history_view]
        )

        # Instructions
        gr.HTML(instructions_html)

    return demo
if __name__ == "__main__":
    # Script entry point: report the environment, load the models, then serve
    # the UI on all interfaces at port 7860 with a public share link.
    print("πŸš€ Initializing Maya AI System...")
    check_system_info()

    if not load_models():
        print("❌ Failed to load models.")
        # Exit non-zero so whatever started the process can see the failure;
        # previously the script ended with status 0 after this message.
        sys.exit(1)

    print("βœ… All models loaded successfully!")
    print(f"πŸŽ™οΈ TTS Mode: {tts_type.upper()}")
    print("🌟 Launching Maya AI Interface...")
    demo = create_interface()
    demo.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=True,
        show_error=True
    )