import gradio as gr
import torch
import numpy as np
import librosa
import soundfile as sf
from transformers import pipeline, AutoTokenizer, AutoModelForCausalLM
import warnings
import json
import time
from datetime import datetime
import os
import sys
import gc

try:
    from dia.model import Dia
    DIA_AVAILABLE = True
    print("✅ Dia TTS library imported successfully")
except ImportError as e:
    print(f"⚠️ Dia TTS not available: {e}")
    DIA_AVAILABLE = False

warnings.filterwarnings("ignore")

# Global model handles, populated once by load_models()
asr_pipe = None
qwen_model = None
qwen_tokenizer = None
tts_model = None
tts_type = None


class ConversationManager:
    """Keeps a rolling window of recent exchanges for conversational context."""

    def __init__(self, max_exchanges=5):
        self.history = []
        self.max_exchanges = max_exchanges
        self.current_emotion = "neutral"

    def add_exchange(self, user_input, ai_response, emotion="neutral"):
        self.history.append({
            "timestamp": datetime.now().isoformat(),
            "user": user_input,
            "ai": ai_response,
            "emotion": emotion
        })
        # Drop the oldest exchanges once the window is full
        if len(self.history) > self.max_exchanges:
            self.history = self.history[-self.max_exchanges:]

    def get_context(self):
        # Only the last three exchanges are replayed into the prompt
        context = ""
        for exchange in self.history[-3:]:
            context += f"User: {exchange['user']}\nAI: {exchange['ai']}\n"
        return context

    def clear(self):
        self.history = []
        self.current_emotion = "neutral"
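
# Illustrative usage of the context window (hypothetical values; the app
# creates one shared instance, `conv_manager`, further below):
#
#     cm = ConversationManager(max_exchanges=5)
#     cm.add_exchange("Hi!", "Hello there!", emotion="happy")
#     cm.get_context()   # -> "User: Hi!\nAI: Hello there!\n"
#
# The short window keeps the prompt small enough for a 1.5B instruct model.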


def optimize_gpu_memory():
    """Free cached GPU memory between pipeline stages.

    Note: empty_cache() only returns unused cached blocks to the driver;
    it does not free tensors that are still referenced.
    """
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
        torch.cuda.synchronize()
        gc.collect()


def check_system_info():
    """Check system capabilities"""
    print("🔍 System Information:")
    print(f"Python: {sys.version}")
    print(f"PyTorch: {torch.__version__}")

    if torch.cuda.is_available():
        print(f"✅ CUDA: {torch.cuda.get_device_name()}")
        print(f"💾 GPU Memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.1f} GB")
        print(f"🔥 CUDA Version: {torch.version.cuda}")

        allocated = torch.cuda.memory_allocated() / 1e9
        cached = torch.cuda.memory_reserved() / 1e9
        print(f"📊 Current GPU Usage: {allocated:.1f}GB allocated, {cached:.1f}GB cached")
    else:
        print("⚠️ CUDA not available, using CPU")


def load_models():
    """Load ASR, LLM, and TTS models; Dia TTS is optional."""
    global asr_pipe, qwen_model, qwen_tokenizer, tts_model, tts_type

    print("🚀 Loading Maya AI models...")
    optimize_gpu_memory()

    # --- Whisper ASR ---
    print("🎤 Loading Whisper for ASR...")
    try:
        asr_pipe = pipeline(
            "automatic-speech-recognition",
            model="openai/whisper-base",
            torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
            device=0 if torch.cuda.is_available() else -1
        )
        print("✅ Whisper ASR loaded successfully!")
        optimize_gpu_memory()
    except Exception as e:
        print(f"❌ Error loading Whisper: {e}")
        return False
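
    # Note: "openai/whisper-base" is the multilingual checkpoint. Since this app
    # transcribes English only, the English-only "openai/whisper-base.en"
    # checkpoint should work as a drop-in swap (sketch, not wired up here):
    #
    #     asr_pipe = pipeline(
    #         "automatic-speech-recognition",
    #         model="openai/whisper-base.en",
    #         device=0 if torch.cuda.is_available() else -1,
    #     )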

    # --- Qwen conversational LLM ---
    print("🧠 Loading Qwen2.5-1.5B for conversation...")
    try:
        model_name = "Qwen/Qwen2.5-1.5B-Instruct"
        qwen_tokenizer = AutoTokenizer.from_pretrained(
            model_name,
            trust_remote_code=True
        )
        qwen_model = AutoModelForCausalLM.from_pretrained(
            model_name,
            torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
            device_map="auto" if torch.cuda.is_available() else None,
            trust_remote_code=True
        )
        print("✅ Qwen loaded successfully!")
        optimize_gpu_memory()
    except Exception as e:
        print(f"❌ Error loading Qwen: {e}")
        return False
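
    # Note: device_map="auto" requires the `accelerate` package; if it is not
    # installed, from_pretrained() raises. The CPU path already passes
    # device_map=None, which is equivalent to a plain single-device load:
    #
    #     qwen_model = AutoModelForCausalLM.from_pretrained(
    #         model_name, torch_dtype=torch.float32
    #     )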

    # --- Dia TTS (optional) ---
    if DIA_AVAILABLE:
        try:
            print("Attempting to load Dia TTS...")
            optimize_gpu_memory()

            tts_model = Dia.from_pretrained(
                "nari-labs/Dia-1.6B",
                compute_dtype="float16" if torch.cuda.is_available() else "float32"
            )

            if torch.cuda.is_available():
                tts_model = tts_model.cuda()

            tts_type = "dia"
            print("✅ Dia TTS loaded successfully!")
            optimize_gpu_memory()
            return True
        except Exception as e:
            print(f"⚠️ Dia TTS failed to load: {e}")
            tts_model = None

    print("⚠️ No TTS available, running in text-only mode")
    tts_type = "none"
    return True


def detect_emotion_from_text(text):
    """Keyword-based emotion detection over the transcribed text."""
    text_lower = text.lower()

    emotions = {
        'happy': ['happy', 'great', 'awesome', 'wonderful', 'excited', 'laugh', 'amazing',
                  'fantastic', 'excellent', 'brilliant', 'perfect', 'love', 'joy', 'cheerful'],
        'sad': ['sad', 'upset', 'disappointed', 'cry', 'terrible', 'awful', 'depressed',
                'miserable', 'heartbroken', 'devastated', 'gloomy', 'melancholy'],
        'angry': ['angry', 'mad', 'furious', 'annoyed', 'frustrated', 'hate', 'rage',
                  'irritated', 'outraged', 'livid', 'enraged'],
        'surprised': ['wow', 'incredible', 'surprised', 'unbelievable', 'shocking',
                      'astonishing', 'remarkable', 'extraordinary', 'mind-blowing'],
        'neutral': []
    }

    # Score each emotion by the number of matching keywords
    emotion_scores = {}
    for emotion, keywords in emotions.items():
        score = sum(1 for keyword in keywords if keyword in text_lower)
        if score > 0:
            emotion_scores[emotion] = score

    if emotion_scores:
        return max(emotion_scores, key=emotion_scores.get)
    return 'neutral'
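
# Illustrative behaviour of the keyword scorer (hypothetical inputs):
#
#     detect_emotion_from_text("Wow, that's incredible!")   # -> 'surprised'
#     detect_emotion_from_text("I love it, it's perfect")   # -> 'happy' (2 matches)
#     detect_emotion_from_text("The weather is mild")       # -> 'neutral'
#
# Ties are broken by dict insertion order via max(); substring matching means
# e.g. "madness" also counts towards 'angry'.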


def speech_to_text_with_emotion(audio_input):
    """Transcribe Gradio microphone audio and tag it with a keyword emotion."""
    try:
        if audio_input is None:
            return "", "neutral"

        print("🎤 Processing audio input...")

        if isinstance(audio_input, tuple):
            sample_rate, audio_data = audio_input
            print(f"Audio input: sample_rate={sample_rate}, shape={audio_data.shape}")

            # Normalize integer PCM to float32 in [-1, 1]
            if audio_data.dtype == np.int16:
                audio_data = audio_data.astype(np.float32) / 32768.0
            elif audio_data.dtype == np.int32:
                audio_data = audio_data.astype(np.float32) / 2147483648.0
            elif audio_data.dtype != np.float32:
                audio_data = audio_data.astype(np.float32)

            # Downmix stereo to mono
            if len(audio_data.shape) > 1:
                audio_data = audio_data.mean(axis=1)
        else:
            audio_data = audio_input
            sample_rate = 16000

        # Require roughly one second of audio, matching the error message
        if len(audio_data) < sample_rate:
            return "Audio too short, please speak for at least 1 second", "neutral"

        max_amplitude = np.max(np.abs(audio_data))
        if max_amplitude < 0.01:
            return "Audio too quiet, please speak louder", "neutral"

        # Peak-normalize, leaving a little headroom
        if max_amplitude > 0:
            audio_data = audio_data / max_amplitude * 0.95

        # Whisper expects 16 kHz input
        if sample_rate != 16000:
            print(f"Resampling from {sample_rate}Hz to 16000Hz...")
            audio_data = librosa.resample(audio_data, orig_sr=sample_rate, target_sr=16000)

        print("🔍 Running Whisper ASR...")
        # Decoding options go through generate_kwargs; the pipeline does not
        # accept a direct `language=` keyword
        result = asr_pipe(audio_data, generate_kwargs={"language": "english"})

        transcription = result['text'].strip()
        print(f"Transcription: '{transcription}'")

        if not transcription or len(transcription) < 2:
            return "No clear speech detected, please try speaking more clearly", "neutral"

        emotion = detect_emotion_from_text(transcription)
        print(f"Detected emotion: {emotion}")

        return transcription, emotion

    except Exception as e:
        print(f"❌ Error in STT: {e}")
        return "Sorry, I couldn't understand that. Please try again.", "neutral"
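
# The ASR pipeline can alternatively take a dict with the native sample rate
# and resample internally, e.g.:
#
#     result = asr_pipe({"raw": audio_data, "sampling_rate": sample_rate})
#
# The explicit librosa resample above is kept so all preprocessing happens at
# a known rate before Whisper sees the audio.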


def generate_contextual_response(user_input, emotion, conversation_manager):
    """Generate an emotion-aware reply with Qwen, conditioned on recent context."""
    try:
        optimize_gpu_memory()

        context = conversation_manager.get_context()

        emotional_prompts = {
            "happy": "Respond with genuine enthusiasm and joy. Use positive language and show excitement.",
            "sad": "Respond with empathy and comfort. Be gentle and understanding.",
            "angry": "Respond calmly and try to help. Be patient and de-escalate.",
            "surprised": "Share in their surprise and show curiosity. Be engaging.",
            "neutral": "Respond naturally and conversationally. Be helpful and friendly."
        }

        system_prompt = f"""You are Maya, a friendly AI assistant with emotional intelligence.

{emotional_prompts.get(emotion, emotional_prompts['neutral'])}

Previous context: {context}
User emotion: {emotion}

Guidelines:
- Keep responses very concise (1 sentence maximum)
- Be natural and conversational
- Show empathy and understanding
- Provide helpful responses
"""

        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_input}
        ]

        text = qwen_tokenizer.apply_chat_template(
            messages, tokenize=False, add_generation_prompt=True
        )

        model_inputs = qwen_tokenizer([text], return_tensors="pt")
        if torch.cuda.is_available():
            model_inputs = model_inputs.to(qwen_model.device)

        with torch.no_grad():
            generated_ids = qwen_model.generate(
                model_inputs.input_ids,
                max_new_tokens=50,
                do_sample=True,
                temperature=0.7,
                top_p=0.9,
                repetition_penalty=1.1,
                pad_token_id=qwen_tokenizer.eos_token_id,
                attention_mask=model_inputs.attention_mask
            )

        # Strip the prompt tokens, keeping only the newly generated reply
        generated_ids = [
            output_ids[len(input_ids):] for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids)
        ]

        response = qwen_tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0]
        response = response.strip()

        # Remove a leading speaker tag if the model echoes one
        if response.startswith("Maya:"):
            response = response[5:].strip()

        optimize_gpu_memory()

        return response

    except Exception as e:
        print(f"Error in response generation: {e}")
        return "I'm sorry, I'm having trouble processing that right now."
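
# For reference: Qwen's chat template renders messages in ChatML form, so the
# prompt string passed to the tokenizer looks roughly like (abridged):
#
#     <|im_start|>system
#     You are Maya, ...<|im_end|>
#     <|im_start|>user
#     <user text><|im_end|>
#     <|im_start|>assistant
#
# add_generation_prompt=True appends that final assistant header, so the model
# continues the text as Maya's reply.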


def text_to_speech_emotional(text, emotion="neutral"):
    """Synthesize speech with Dia, falling back to text-only output."""
    try:
        if tts_model is None:
            print(f"🔊 Maya says ({emotion}): {text}")
            return None

        optimize_gpu_memory()

        if tts_type == "dia":
            # Dia expects speaker tags in its input text
            enhanced_text = f"[S1] {text}"

            # Keep generation fast by truncating long responses
            if len(enhanced_text) > 200:
                enhanced_text = enhanced_text[:200] + "..."

            print(f"Generating Dia TTS for: {enhanced_text}")

            try:
                with torch.no_grad():
                    audio_output = tts_model.generate(
                        enhanced_text,
                        use_torch_compile=False,
                        verbose=False
                    )

                if isinstance(audio_output, torch.Tensor):
                    audio_output = audio_output.cpu().numpy()

                if len(audio_output.shape) > 1:
                    audio_output = audio_output.squeeze()

                if len(audio_output) > 0:
                    # Remove DC offset
                    audio_output = audio_output - np.mean(audio_output)

                    # Peak-normalize with headroom
                    max_val = np.max(np.abs(audio_output))
                    if max_val > 0:
                        audio_output = audio_output / max_val * 0.8

                    audio_output = audio_output.astype(np.float32)

                    # Guard against invalid samples
                    if np.any(np.isnan(audio_output)) or np.any(np.isinf(audio_output)):
                        print("❌ Audio contains NaN or Inf values")
                        return None

                    print(f"✅ Generated audio: shape={audio_output.shape}, dtype={audio_output.dtype}, range=[{audio_output.min():.3f}, {audio_output.max():.3f}]")

                    optimize_gpu_memory()

                    return (44100, audio_output)

            except Exception as e:
                print(f"❌ Error in Dia generation: {e}")
                optimize_gpu_memory()
                return None

        else:
            print(f"🔊 Maya says ({emotion}): {text}")
            return None

    except Exception as e:
        print(f"❌ Error in TTS: {e}")
        optimize_gpu_memory()
        print(f"🔊 Maya says ({emotion}): {text}")
        return None
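
# Dia is a dialogue TTS model: input text is tagged with [S1]/[S2] speaker
# markers (e.g. "[S1] Hello. [S2] Hi there!"). Only [S1] is used above since
# Maya speaks with a single voice, and the (44100, audio) tuple matches Dia's
# native 44.1 kHz output in the (sample_rate, samples) format gr.Audio expects.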


# Single shared conversation state for the app
conv_manager = ConversationManager()


def start_call():
    """Initialize a call: reset state and return Maya's greeting."""
    conv_manager.clear()
    optimize_gpu_memory()

    greeting_text = "Hello! I'm Maya. How can I help you today?"
    greeting_audio = text_to_speech_emotional(greeting_text, "happy")

    tts_status = f"Using {tts_type.upper()} TTS" if tts_type != "none" else "Text-only mode"
    return greeting_audio, greeting_text, f"📞 Call started! Maya is ready. {tts_status}"


def process_conversation(audio_input):
    """Main conversation pipeline: STT -> LLM response -> TTS."""
    if audio_input is None:
        return None, "Please record some audio first.", "", "❌ No audio input received."

    try:
        print("🔄 Processing conversation...")
        optimize_gpu_memory()

        # 1. Transcribe and detect emotion
        user_text, emotion = speech_to_text_with_emotion(audio_input)

        # Surface STT problems directly in the status box
        error_phrases = ["audio too short", "audio too quiet", "no clear speech", "sorry", "couldn't understand"]
        if any(phrase in user_text.lower() for phrase in error_phrases):
            return None, user_text, "", f"❌ STT Issue: {user_text}"

        if not user_text or user_text.strip() == "":
            return None, "I didn't catch that. Please speak louder and closer to the microphone.", "", "❌ No speech detected."

        # 2. Generate the reply
        ai_response = generate_contextual_response(user_text, emotion, conv_manager)

        # 3. Synthesize speech
        response_audio = text_to_speech_emotional(ai_response, emotion)

        # 4. Record the exchange
        conv_manager.add_exchange(user_text, ai_response, emotion)

        if torch.cuda.is_available():
            allocated = torch.cuda.memory_allocated() / 1e9
            status = f"✅ Success! | Emotion: {emotion} | Exchange: {len(conv_manager.history)}/5 | GPU: {allocated:.1f}GB"
        else:
            status = f"✅ Success! | Emotion: {emotion} | Exchange: {len(conv_manager.history)}/5"

        return response_audio, ai_response, user_text, status

    except Exception as e:
        error_msg = f"❌ Error: {str(e)}"
        print(error_msg)
        optimize_gpu_memory()
        return None, "I'm sorry, I encountered an error. Please try again.", "", error_msg


def get_conversation_history():
    """Format the conversation history as Markdown."""
    if not conv_manager.history:
        return "No conversation history yet. Start a call to begin!"

    history_text = "📋 **Conversation History:**\n\n"
    for i, exchange in enumerate(conv_manager.history, 1):
        timestamp = exchange['timestamp'][:19].replace('T', ' ')
        history_text += f"**Exchange {i}** ({timestamp}) - Emotion: {exchange['emotion']}\n"
        history_text += f"👤 **You:** {exchange['user']}\n"
        history_text += f"🤖 **Maya:** {exchange['ai']}\n\n"

    return history_text


def end_call():
    """End the call and clean up conversation state and GPU memory."""
    farewell_text = "Thank you for talking with me! Have a wonderful day!"
    farewell_audio = text_to_speech_emotional(farewell_text, "happy")
    conv_manager.clear()
    optimize_gpu_memory()

    return farewell_audio, farewell_text, "📞❌ Call ended. Thank you!"


def create_interface():
    """Create Gradio interface"""
    with gr.Blocks(
        title="Maya AI - Speech-to-Speech Assistant",
        theme=gr.themes.Soft()
    ) as demo:

        gr.HTML("""
        <div style="text-align: center; padding: 25px; background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); border-radius: 15px; margin-bottom: 25px;">
            <h1 style="color: white; margin: 0; font-size: 2.8em;">🎙️ Maya AI</h1>
            <p style="color: white; margin: 15px 0; font-size: 1.3em;">Advanced Speech-to-Speech Conversational AI</p>
            <p style="color: #E8E8E8; margin: 0;">Natural • Emotional • Contextual • Intelligent</p>
        </div>
        """)

        with gr.Row():
            with gr.Column(scale=1):
                gr.HTML("<h3>📞 Call Controls</h3>")
                start_btn = gr.Button("📞 Start Call", variant="primary", size="lg")
                end_btn = gr.Button("📞❌ End Call", variant="secondary", size="lg")

                gr.HTML("<h3>🎤 Voice Input</h3>")
                audio_input = gr.Audio(
                    label="Record Your Message (Speak clearly for 2+ seconds)",
                    sources=["microphone"],
                    type="numpy"
                )

                process_btn = gr.Button("🎯 Process Message", variant="primary", size="lg")

                status_display = gr.Textbox(
                    label="📊 System Status",
                    interactive=False,
                    lines=3,
                    value="🚀 Ready! Click 'Start Call' to begin."
                )

            with gr.Column(scale=2):
                gr.HTML("<h3>🔊 Maya's Response</h3>")
                response_audio = gr.Audio(
                    label="Maya's Voice Response",
                    type="numpy",
                    interactive=False,
                    autoplay=True,
                    show_download_button=True
                )

                with gr.Row():
                    with gr.Column():
                        user_text_display = gr.Textbox(
                            label="🎤 What You Said",
                            interactive=False,
                            lines=4
                        )

                    with gr.Column():
                        ai_text_display = gr.Textbox(
                            label="🤖 Maya's Response",
                            interactive=False,
                            lines=4
                        )

        with gr.Row():
            with gr.Column():
                gr.HTML("<h3>📋 Conversation History</h3>")
                history_btn = gr.Button("📋 Show History", variant="secondary")
                history_display = gr.Markdown("No conversation history yet.")
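
        # Wire up event handlers. Each handler's return tuple maps positionally
        # onto its `outputs` list, e.g. process_conversation returns
        # (audio, ai_text, user_text, status) in exactly this order.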
        start_btn.click(
            fn=start_call,
            outputs=[response_audio, ai_text_display, status_display]
        )

        process_btn.click(
            fn=process_conversation,
            inputs=[audio_input],
            outputs=[response_audio, ai_text_display, user_text_display, status_display]
        )

        end_btn.click(
            fn=end_call,
            outputs=[response_audio, ai_text_display, status_display]
        )

        history_btn.click(
            fn=get_conversation_history,
            outputs=[history_display]
        )
gr.HTML(""" |
|
<div style="margin-top: 30px; padding: 25px; background: #f8f9fa; border-radius: 15px;"> |
|
<h3>π‘ How to Use Maya AI:</h3> |
|
<ol> |
|
<li><strong>Start Call:</strong> Click "π Start Call" - Maya will greet you</li> |
|
<li><strong>Record:</strong> Speak clearly for at least 2 seconds</li> |
|
<li><strong>Process:</strong> Click "π― Process Message"</li> |
|
<li><strong>Listen:</strong> Maya will respond with natural speech</li> |
|
<li><strong>Continue:</strong> Keep chatting (up to 5 exchanges)</li> |
|
<li><strong>End:</strong> Click "πβ End Call" when done</li> |
|
</ol> |
|
|
|
<div style="margin-top: 20px; padding: 15px; background: #d1ecf1; border-radius: 8px;"> |
|
<p><strong>π§ Fixed Issues:</strong></p> |
|
<ul> |
|
<li>β
Pydantic version pinned to 2.10.6 (fixes Gradio crash)</li> |
|
<li>β
Dia TTS loading parameters corrected</li> |
|
<li>β
Memory optimization for T4 GPU</li> |
|
<li>β
Audio processing enhanced</li> |
|
</ul> |
|
</div> |
|
</div> |
|
""") |
|
|
|
return demo |
|
|
|


if __name__ == "__main__":
    print("🚀 Initializing Maya AI System...")

    check_system_info()

    if load_models():
        print("✅ All models loaded successfully!")
        print(f"🎙️ TTS Mode: {tts_type.upper()}")
        print("🚀 Launching Maya AI Interface...")

        demo = create_interface()
        demo.launch(
            server_name="0.0.0.0",
            server_port=7860,
            share=True,
            show_error=True
        )
    else:
        print("❌ Failed to load models.")