#!/usr/bin/env python3
"""
MiloMusic - Hugging Face Spaces Version
AI-powered music generation platform optimized for cloud deployment with a high-performance configuration.
"""
import multiprocessing
import os
import sys
import subprocess
import tempfile
import gradio as gr
import soundfile as sf
from dataclasses import dataclass, field
from typing import Any
import xxhash
import numpy as np
import spaces
import groq

# Environment setup for Spaces
def setup_spaces_environment():
    """Set up environment variables and paths for Hugging Face Spaces."""
    # Set the HuggingFace cache directories
    os.environ["HF_HOME"] = "/tmp/hf_cache"
    os.environ["TRANSFORMERS_CACHE"] = "/tmp/transformers_cache"
    os.environ["HF_HUB_CACHE"] = "/tmp/hf_hub_cache"
    # PyTorch CUDA memory optimization: expandable memory segments improve GPU
    # memory utilization and reduce fragmentation.
    os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"
    # Set the temp directory for audio files
    os.environ["TMPDIR"] = "/tmp"
    print("🚀 Environment setup complete for Spaces")

# Install flash-attn if not already installed
def install_flash_attn() -> bool:
    """Install flash-attn from source with proper compilation flags."""
    try:
        import flash_attn
        print("✅ flash-attn already installed")
        return True
    except ImportError:
        print("📦 Installing flash-attn from source...")
        try:
            # Install with optimized settings for Spaces
            cmd = [
                sys.executable, "-m", "pip", "install",
                "--no-build-isolation",
                "--no-cache-dir",
                "flash-attn",
                "--verbose"
            ]
            # Use more parallel jobs for faster compilation in Spaces
            env = os.environ.copy()
            max_jobs = min(4, multiprocessing.cpu_count())  # Utilize more CPU cores
            env["MAX_JOBS"] = str(max_jobs)
            env["NVCC_PREPEND_FLAGS"] = "-ccbin /usr/bin/gcc"
            result = subprocess.run(cmd, env=env, capture_output=True, text=True, timeout=1800)  # 30-minute timeout
            if result.returncode == 0:
                print("✅ flash-attn installed successfully")
                return True
            else:
                print(f"❌ flash-attn installation failed: {result.stderr}")
                return False
        except subprocess.TimeoutExpired:
            print("⏰ flash-attn installation timed out")
            return False
        except Exception as e:
            print(f"❌ Error installing flash-attn: {e}")
            return False

# Set up the environment first
setup_spaces_environment()

# Download required models for YuE inference
def download_required_models():
    """Download required model files at startup."""
    try:
        from download_models import ensure_model_availability
        print("🚀 Checking and downloading required models...")
        success = ensure_model_availability()
        if success:
            print("✅ Model setup completed successfully")
        else:
            print("⚠️ Some models may be missing - continuing with available resources")
        return success
    except ImportError as e:
        print(f"⚠️ Model download script not found: {e}")
        return False
    except Exception as e:
        print(f"❌ Error during model download: {e}")
        return False

# Download models before the rest of the setup
models_ready = download_required_models()

# Install flash-attn if needed
flash_attn_available = install_flash_attn()

# Now import the remaining dependencies.
# Add the project root to the Python path for imports.
project_root = os.path.dirname(os.path.abspath(__file__))
if project_root not in sys.path:
    sys.path.insert(0, project_root)

from tools.groq_client import client as groq_client
from openai import OpenAI
from tools.generate_lyrics import generate_structured_lyrics, format_lyrics

# Import torch and report CUDA info after the flash-attn setup
import torch

if torch.cuda.is_available():
    print(f"🎮 GPU: {torch.cuda.get_device_name(0)}")
    print(f"💾 VRAM: {torch.cuda.get_device_properties(0).total_memory / 1e9:.1f}GB")
else:
    print("⚠️ No CUDA GPU detected")

@dataclass
class AppState:
    """
    Maintains the application state throughout user interactions.
    """
    conversation: list = field(default_factory=list)
    stopped: bool = False
    model_outs: Any = None
    lyrics: str = ""
    genre: str = "pop"
    mood: str = "upbeat"
    theme: str = "love"
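
# Note: gr.State(AppState()) below hands each browser session its own copy of
# this dataclass, so one user's conversation and lyrics stay isolated from
# another's (a property of Gradio session state, noted here as a reminder).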

def validate_api_keys():
    """Validate required API keys for Spaces deployment."""
    required_keys = ["GROQ_API_KEY", "GEMINI_API_KEY"]
    missing_keys = []
    for key in required_keys:
        if not os.getenv(key):
            missing_keys.append(key)
    if missing_keys:
        print(f"⚠️ Missing API keys: {missing_keys}")
        return False
    print("✅ All API keys validated")
    return True

def validate_file_structure():
    """Validate that required files and directories exist."""
    required_paths = [
        "YuE/inference/infer.py",
        "YuE/inference/codecmanipulator.py",
        "YuE/inference/mmtokenizer.py",
        "tools/generate_lyrics.py",
        "tools/groq_client.py",
        "schemas/lyrics.py",  # Required for lyrics structure models
    ]
    missing_files = []
    for path in required_paths:
        if not os.path.exists(path):
            missing_files.append(path)
    if missing_files:
        print(f"⚠️ Missing required files: {missing_files}")
        return False
    print("✅ All required files found")
    return True

# ZeroGPU provides 25 minutes of free H200 inference per day; these settings
# also remain compatible with L40S hardware.
def generate_music_spaces(lyrics: str, genre: str, mood: str, theme: str, progress=gr.Progress()) -> str:
    """
    Generate music using the YuE model with a high-performance Spaces configuration.
    """
    if not lyrics.strip():
        return "Please provide lyrics to generate music."

    try:
        progress(0.1, desc="Preparing lyrics...")

        # Use the lyrics directly (already formatted from the chat interface)
        formatted_lyrics = lyrics

        # Create temporary files
        with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as genre_file:
            # Add genre-specific instruments for better instrumental track generation
            if genre == "pop":
                instruments = "piano electric guitar bass drums synthesizer"
            elif genre == "rock":
                instruments = "electric guitar bass drums guitar"
            elif genre == "jazz":
                instruments = "piano bass drums saxophone"
            elif genre == "classical":
                instruments = "piano violin orchestra"
            elif genre == "electronic":
                instruments = "synthesizer drum machine bass synth"
            elif genre == "folk":
                instruments = "acoustic guitar piano"
            elif genre == "r&b":
                instruments = "piano bass drums electric guitar"
            elif genre == "country":
                instruments = "acoustic guitar bass drums"
            elif genre == "hip-hop":
                instruments = "drum machine bass synthesizer"
            else:
                instruments = "piano electric guitar bass drums"  # default
            genre_file.write(f"{genre} {instruments} {mood} {theme} female airy vocal bright")
            genre_file_path = genre_file.name

        # Convert the lyrics format for YuE compatibility:
        # YuE expects [verse]/[chorus] tags, but our AI generates **VERSE**/**CHORUS** headers.
        import re

        # Extract only the actual lyrics content, removing AI commentary
        formatted_lyrics_for_yue = formatted_lyrics

        # Convert **VERSE 1** to [verse], **CHORUS** to [chorus], etc. (lowercase, as YuE expects)
        formatted_lyrics_for_yue = re.sub(r'\*\*(VERSE\s*\d*)\*\*', r'[verse]', formatted_lyrics_for_yue)
        formatted_lyrics_for_yue = re.sub(r'\*\*(CHORUS)\*\*', r'[chorus]', formatted_lyrics_for_yue)
        formatted_lyrics_for_yue = re.sub(r'\*\*(BRIDGE)\*\*', r'[bridge]', formatted_lyrics_for_yue)
        formatted_lyrics_for_yue = re.sub(r'\*\*(OUTRO)\*\*', r'[outro]', formatted_lyrics_for_yue)
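        # For example, a hypothetical AI header would be rewritten as:
        #   "**VERSE 1**" -> "[verse]"
        #   "**CHORUS**"  -> "[chorus]"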

        # Remove AI commentary (lines that don't contain actual lyrics)
        lines = formatted_lyrics_for_yue.split('\n')
        clean_lines = []
        in_song = False
        for line in lines:
            line = line.strip()
            # Start collecting from the first section marker
            if re.match(r'\[(verse|chorus|bridge|outro)', line, re.IGNORECASE):
                in_song = True
            # Stop at trailing AI commentary
            if in_song and line and not line.startswith('[') and any(phrase in line.lower() for phrase in ['how do you like', 'would you like', 'let me know', 'take a look']):
                break
            if in_song:
                clean_lines.append(line)

        # Join, then add double newlines between sections as required by the YuE format
        formatted_lyrics_for_yue = '\n'.join(clean_lines).strip()
        formatted_lyrics_for_yue = re.sub(r'\n(\[(?:verse|chorus|bridge|outro)\])', r'\n\n\1', formatted_lyrics_for_yue)
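        # Illustrative final layout handed to YuE (hypothetical lyric lines):
        #   [verse]
        #   City lights below us
        #
        #   [chorus]
        #   We are shining tonight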
| print(f"🐛 DEBUG - Original lyrics length: {len(formatted_lyrics)}") | |
| print(f"🐛 DEBUG - Converted lyrics for YuE: '{formatted_lyrics_for_yue}'") | |
| print(f"🐛 DEBUG - Converted lyrics length: {len(formatted_lyrics_for_yue)}") | |
| with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as lyrics_file: | |
| lyrics_file.write(formatted_lyrics_for_yue) | |
| lyrics_file_path = lyrics_file.name | |

        progress(0.2, desc="Setting up generation...")

        # Generate music with the high-performance Spaces configuration
        output_dir = tempfile.mkdtemp()

        # High-performance command based on Spaces GPU resources.
        # In Spaces, the working directory is /app.
        infer_script_path = os.path.join(os.getcwd(), "YuE", "inference", "infer.py")
        cmd = [
            sys.executable,
            infer_script_path,
            "--cuda_idx", "0",
            "--stage1_model", "m-a-p/YuE-s1-7B-anneal-en-cot",
            "--stage2_model", "m-a-p/YuE-s2-1B-general",
            "--genre_txt", genre_file_path,
            "--lyrics_txt", lyrics_file_path,
            "--run_n_segments", "2",      # Full segments for better quality
            "--stage2_batch_size", "4",   # Higher batch size for speed
            "--output_dir", output_dir,
            "--max_new_tokens", "3000",   # Full token count
            "--prompt_start_time", "0",
            "--prompt_end_time", "30",    # Full 30-second clips
        ]
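
        # Rough sketch of the two-stage pipeline this command drives: the 7B
        # stage-1 model generates the coarse music/vocal token sequence from the
        # genre and lyrics prompts, the 1B stage-2 model refines those tokens,
        # and the xcodec decoders render the final audio.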

        # Use flash attention if available; otherwise fall back to SDPA
        if not flash_attn_available:
            cmd.append("--sdpa")

        # More detailed progress updates
        progress(0.1, desc="🚀 Initializing models...")
        progress(0.15, desc="📝 Processing lyrics...")
        progress(0.2, desc="🎵 Starting Stage 1 (7B model generation)...")

        # Extract parameters from cmd for logging
        run_n_segments = cmd[cmd.index("--run_n_segments") + 1] if "--run_n_segments" in cmd else "2"
        max_new_tokens = cmd[cmd.index("--max_new_tokens") + 1] if "--max_new_tokens" in cmd else "3000"

        print("🎵 Starting high-quality music generation...")
        print(f"📊 Generation settings: {run_n_segments} segments, {max_new_tokens} tokens, 30s audio")
        print("⏱️ Estimated time: 10-13 minutes for high-quality generation")
        print(f"Working directory: {os.getcwd()}")
        print(f"Command: {' '.join(cmd)}")

        # Debug: check decoder files before inference
        decoder_base = os.path.join(os.getcwd(), "YuE", "inference", "xcodec_mini_infer", "decoders")
        print(f"🔍 Checking decoder files at: {decoder_base}")
        if os.path.exists(decoder_base):
            decoder_files = os.listdir(decoder_base)
            print(f"🔍 Available decoder files: {decoder_files}")
            # Check the specific decoder files
            vocal_decoder = os.path.join(decoder_base, "decoder_131000.pth")
            inst_decoder = os.path.join(decoder_base, "decoder_151000.pth")
            if os.path.exists(vocal_decoder):
                size_mb = os.path.getsize(vocal_decoder) // (1024 * 1024)
                print(f"✅ Vocal decoder: {size_mb}MB")
            else:
                print("❌ Vocal decoder missing!")
            if os.path.exists(inst_decoder):
                size_mb = os.path.getsize(inst_decoder) // (1024 * 1024)
                print(f"✅ Instrumental decoder: {size_mb}MB")
            else:
                print("❌ Instrumental decoder missing!")
        else:
            print("❌ Decoder directory not found!")

        # Change to the YuE/inference directory for execution
        original_cwd = os.getcwd()
        inference_dir = os.path.join(os.getcwd(), "YuE", "inference")
        try:
            os.chdir(inference_dir)
            print(f"Changed to inference directory: {inference_dir}")
            # The script path is now relative to the inference directory
            cmd[1] = "infer.py"
            progress(0.25, desc="🔥 Stage 1: Running 7B parameter model...")

            # Start the subprocess
            import threading
            import time

            def parse_output_and_update_progress(process):
                """Parse subprocess output in real time and update progress accordingly."""
                stage1_messages = [
                    "🧠 Stage 1: Generating musical concepts...",
                    "🎼 Stage 1: Creating melody patterns...",
                    "🎹 Stage 1: Composing harmony structure..."
                ]
                stage2_messages = [
                    "⚡ Starting Stage 2: Refining with 1B model...",
                    "🎵 Stage 2: Adding musical details...",
                    "🎶 Stage 2: Finalizing composition..."
                ]
                stage1_progress = [0.3, 0.45, 0.6]
                stage2_progress = [0.7, 0.8, 0.85]
                current_stage = 1
                stage1_step = 0
                stage2_step = 0
                output_lines = []
                try:
                    while True:
                        line = process.stdout.readline()
                        if not line:
                            break
                        line = line.strip()
                        output_lines.append(line)
                        print(line)  # Still print for debugging
                        # Check for stage transitions based on the actual output
                        if "Stage 2 inference..." in line:
                            current_stage = 2
                            stage2_step = 0
                            progress(0.7, desc=stage2_messages[0])
                            print(f"⏳ {stage2_messages[0]}")
                        elif "Stage 2 DONE" in line:
                            progress(0.9, desc="🔊 Decoding to audio format...")
                            print("⏳ 🔊 Decoding to audio format...")
                        # Advance Stage 1 progress on successive output lines
                        elif current_stage == 1 and stage1_step < len(stage1_messages):
                            if stage1_step < len(stage1_progress):
                                progress(stage1_progress[stage1_step], desc=stage1_messages[stage1_step])
                                print(f"⏳ {stage1_messages[stage1_step]}")
                                stage1_step += 1
                        # Advance Stage 2 progress on successive output lines
                        elif current_stage == 2 and stage2_step < len(stage2_messages) - 1:
                            stage2_step += 1
                            if stage2_step < len(stage2_progress):
                                progress(stage2_progress[stage2_step], desc=stage2_messages[stage2_step])
                                print(f"⏳ {stage2_messages[stage2_step]}")
                except Exception as e:
                    print(f"Progress parsing error: {e}")
                return '\n'.join(output_lines)
| print(f"🚀 Executing command: {' '.join(cmd)}") | |
| # Use Popen for real-time output processing | |
| process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, | |
| text=True, bufsize=1, universal_newlines=True) | |
| # Parse output in real-time | |
| stdout_output = parse_output_and_update_progress(process) | |
| # Wait for process to complete and get return code | |
| return_code = process.wait() | |
| # Create result object similar to subprocess.run | |
| class Result: | |
| def __init__(self, returncode, stdout, stderr=""): | |
| self.returncode = returncode | |
| self.stdout = stdout | |
| self.stderr = stderr | |
| result = Result(return_code, stdout_output) | |
| # Print stdout and stderr for debugging | |
| if result.stdout: | |
| print(f"✅ Command output:\n{result.stdout}") | |
| if result.stderr: | |
| print(f"⚠️ Command stderr:\n{result.stderr}") | |
| print(f"📊 Return code: {result.returncode}") | |
| finally: | |
| os.chdir(original_cwd) | |
| progress(0.95, desc="🎉 Processing completed, finalizing output...") | |
| # Clean up input files | |
| os.unlink(genre_file_path) | |
| os.unlink(lyrics_file_path) | |

        if result.returncode == 0:
            # Find the generated audio file, prioritizing mixed audio
            import glob

            # Check for mixed audio at the output root first
            final_files = glob.glob(os.path.join(output_dir, "*_mixed.mp3"))
            if final_files:
                progress(1.0, desc="Music generation complete!")
                print(f"✅ Found audio file at root: {final_files[0]}")
                return final_files[0]

            # Then look for the final mixed audio in vocoder/mix
            mixed_files = glob.glob(os.path.join(output_dir, "vocoder/mix/*_mixed.mp3"))
            if mixed_files:
                progress(1.0, desc="Music generation complete!")
                print(f"✅ Found mixed audio file: {mixed_files[0]}")
                return mixed_files[0]

            # Fall back to any MP3 file
            audio_files = glob.glob(os.path.join(output_dir, "**/*.mp3"), recursive=True)
            if audio_files:
                progress(1.0, desc="Music generation complete!")
                print(f"✅ Found audio file: {audio_files[0]}")
                return audio_files[0]  # Return the path to the generated audio
            else:
                print(f"❌ No audio files found in {output_dir}")
                print(f"Directory contents: {os.listdir(output_dir) if os.path.exists(output_dir) else 'Directory not found'}")
                return "Music generation completed but no audio file found."
        else:
            error_msg = f"Return code: {result.returncode}\n"
            if result.stderr:
                error_msg += f"Error: {result.stderr[-1000:]}\n"
            if result.stdout:
                error_msg += f"Output: {result.stdout[-1000:]}"
            return f"Music generation failed:\n{error_msg}"
    except subprocess.TimeoutExpired:
        return "Music generation timed out after 20 minutes. Please try again."
    except Exception as e:
        return f"Error during music generation: {str(e)}"

def respond(message, state):
    """Enhanced response function for conversational lyrics generation."""
    try:
        # Add the user message to the conversation
        state.conversation.append({"role": "user", "content": message})
        # Use the same conversational generation logic as voice input
        response = generate_chat_completion(groq_client, state.conversation, state.genre, state.mood, state.theme)
        # Add the assistant response
        state.conversation.append({"role": "assistant", "content": response})
        # Update the lyrics with improved format recognition - keep only the lyric segments
        if any(marker in response.lower() for marker in ["[verse", "[chorus", "[bridge", "**verse", "**chorus", "sectiontype.verse", "verse:"]):
            state.lyrics = extract_lyrics_segments_only(response)
        # Format the conversation for display
        return "", [{"role": msg["role"], "content": msg["content"]} for msg in state.conversation], state
    except Exception as e:
        error_response = f"Sorry, I encountered an error: {str(e)}"
        state.conversation.append({"role": "assistant", "content": error_response})
        return "", [{"role": msg["role"], "content": msg["content"]} for msg in state.conversation], state

def build_interface():
    """Build the Gradio interface optimized for Spaces with high performance."""
    with gr.Blocks(
        title="MiloMusic - AI Music Generation",
        theme=gr.themes.Soft(),
        css="""
        .container { max-width: 1400px; margin: auto; }
        .performance-notice { background-color: #d4edda; padding: 15px; border-radius: 5px; margin: 10px 0; }
        .generation-status { background-color: #f8f9fa; padding: 10px; border-radius: 5px; }
        """
    ) as demo:
        # Header
        gr.Markdown("""
        # 🎵 MiloMusic - AI Music Generation
        ### Professional AI-powered music creation from natural language
        """)

        # Performance notice for Spaces
        gr.Markdown("""
        <div class="performance-notice">
        🚀 <strong>High-Performance Mode:</strong> Running on Spaces GPU with optimized settings for best quality.
        Generation time: ~10-13 minutes for professional-grade music with vocals and instruments.
        </div>
        """)

        state = gr.State(AppState())

        with gr.Row():
            with gr.Column(scale=2):
                # Input controls
                with gr.Group():
                    gr.Markdown("### 🎛️ Music Settings")
                    with gr.Row():
                        genre = gr.Dropdown(
                            choices=["pop", "rock", "jazz", "classical", "folk", "r&b", "country", "hip-hop"],
                            value="pop", label="Genre"
                        )
                        mood = gr.Dropdown(
                            choices=["upbeat", "melancholic", "energetic", "calm", "romantic", "dark", "mysterious", "joyful"],
                            value="upbeat", label="Mood"
                        )
                        theme = gr.Dropdown(
                            choices=["love", "friendship", "adventure", "nostalgia", "freedom", "hope", "dreams", "nature"],
                            value="love", label="Theme"
                        )

                # Voice input
                with gr.Group():
                    gr.Markdown("### 🎤 Voice Input")
                    input_audio = gr.Audio(
                        label="Speak Your Musical Ideas",
                        sources=["microphone"],
                        type="numpy",
                        streaming=False,
                        waveform_options=gr.WaveformOptions(waveform_color="#B83A4B"),
                    )

                # Chat interface
                with gr.Group():
                    gr.Markdown("### 💬 Lyrics Creation Chat")
                    chatbot = gr.Chatbot(height=400, label="AI Lyrics Assistant", show_copy_button=True, type="messages")
                    with gr.Row():
                        text_input = gr.Textbox(
                            placeholder="Or type your song idea here...",
                            show_label=False,
                            scale=4,
                            lines=2
                        )
                        send_btn = gr.Button("Send", scale=1, variant="primary")

            with gr.Column(scale=1):
                # Output controls
                with gr.Group():
                    gr.Markdown("### 🎵 Music Generation")
                    lyrics_display = gr.Textbox(
                        label="Current Lyrics",
                        lines=12,
                        interactive=True,
                        placeholder="Your generated lyrics will appear here..."
                    )
                    generate_btn = gr.Button("🎼 Generate High-Quality Music", variant="primary", size="lg")
                    with gr.Column():
                        music_output = gr.Audio(label="Generated Music", type="filepath", show_download_button=True)
                    gr.Markdown("""
                    <div class="generation-status">
                    <strong>Generation Features:</strong><br>
                    • Full 30-second clips<br>
                    • Professional vocals<br>
                    • Rich instrumentation<br>
                    • High-fidelity audio
                    </div>
                    """)

                # Controls
                with gr.Group():
                    gr.Markdown("### 🔧 Controls")
                    new_song_btn = gr.Button("🆕 Start New Song")
                    clear_btn = gr.Button("🧹 Clear Chat")

        # Event handlers
        def update_state_settings(genre_val, mood_val, theme_val, state):
            state.genre = genre_val
            state.mood = mood_val
            state.theme = theme_val
            return state

        # Update state whenever the settings change
        for component in [genre, mood, theme]:
            component.change(
                fn=update_state_settings,
                inputs=[genre, mood, theme, state],
                outputs=[state]
            )

        # Voice recording functionality (from app.py)
        stream = input_audio.start_recording(
            process_audio,
            [input_audio, state],
            [input_audio, state],
        )
        respond_audio = input_audio.stop_recording(
            response_audio, [state, input_audio, genre, mood, theme], [state, chatbot, lyrics_display]
        )
        restart = respond_audio.then(start_recording_user, [state], [input_audio]).then(
            lambda state: state, state, state, js=js_reset
        )

        # Text chat functionality with a lyrics update
        def respond_with_lyrics_update(message, state):
            text_output, chat_output, updated_state = respond(message, state)
            return text_output, chat_output, updated_state, updated_state.lyrics

        send_btn.click(
            fn=respond_with_lyrics_update,
            inputs=[text_input, state],
            outputs=[text_input, chatbot, state, lyrics_display],
            queue=True
        )
        text_input.submit(
            fn=respond_with_lyrics_update,
            inputs=[text_input, state],
            outputs=[text_input, chatbot, state, lyrics_display],
            queue=True
        )

        # Music generation with progress
        generate_btn.click(
            fn=generate_music_spaces,
            inputs=[lyrics_display, genre, mood, theme],
            outputs=[music_output],
            queue=True,
            show_progress=True
        )

        # Control buttons
        new_song_btn.click(
            fn=lambda: (AppState(), [], "", None, gr.Audio(recording=False)),
            outputs=[state, chatbot, lyrics_display, music_output, input_audio],
            cancels=[respond_audio, restart]
        )
        clear_btn.click(
            fn=lambda: [],
            outputs=[chatbot]
        )

        # Auto-update the lyrics display when the state changes
        state.change(
            fn=lambda s: s.lyrics,
            inputs=[state],
            outputs=[lyrics_display]
        )

        # Instructions
        gr.Markdown("""
        ### 📖 How to create your music:
        1. **Set your preferences**: Choose genre, mood, and theme
        2. **Voice or chat**: Either speak your ideas or type them in the chat
        3. **Refine the lyrics**: Ask for changes, different verses, or style adjustments
        4. **Generate music**: Click the generate button for professional-quality output
        5. **Download & enjoy**: Your high-fidelity music with vocals and instruments

        **Tips**: Be specific about your vision - mention instruments, vocal style, or song structure!
        """)

        # Footer
        gr.Markdown("""
        ---
        <center>
        Made with ❤️ by the MiloMusic Team | Powered by the YuE (乐) Model | 🤗 Hugging Face Spaces
        </center>
        """)

    return demo

# Audio transcription functions (from app.py)
def process_whisper_response(completion):
    """
    Process a Whisper transcription response and filter out silence.
    """
    if completion.segments and len(completion.segments) > 0:
        no_speech_prob = completion.segments[0].get('no_speech_prob', 0)
        print("No speech prob:", no_speech_prob)
        if no_speech_prob > 0.7:
            print("No speech detected")
            return None
        return completion.text.strip()
    return None

def transcribe_audio(client, file_name):
    """
    Transcribe an audio file using the Whisper model via the Groq API.
    """
    if file_name is None:
        return None
    try:
        with open(file_name, "rb") as audio_file:
            # Keep a local copy named audio.wav for the API upload
            with open("audio.wav", "wb") as f:
                f.write(audio_file.read())
            # Rewind the handle: read() above left it at EOF, which would upload empty content
            audio_file.seek(0)
            response = client.audio.transcriptions.create(
                model="whisper-large-v3-turbo",
                file=("audio.wav", audio_file),
                response_format="text",
                language="en",
            )
        # Process the response to filter out silence.
        # For the text response format, check whether the response is meaningful.
        if response and len(response.strip()) > 0:
            return response.strip()
        else:
            return None
    except Exception as e:
        print(f"Transcription error: {e}")
        return f"Error in audio transcription: {str(e)}"

def start_recording_user(state: AppState):
    """
    Reset the audio recording component for a new user input.
    """
    return None

def process_audio(audio: tuple, state: AppState):
    """
    Process recorded audio in real time during recording.
    """
    return audio, state

def response_audio(state: AppState, audio: tuple, genre_value, mood_value, theme_value):
    """
    Process recorded audio and generate a response based on the transcription.
    """
    if not audio:
        # Match the three outputs expected by the stop_recording handler
        return state, [], state.lyrics
    # Update state with the current dropdown values
    state.genre, state.mood, state.theme = genre_value, mood_value, theme_value
    temp_dir = tempfile.gettempdir()
    file_name = os.path.join(temp_dir, f"{xxhash.xxh32(bytes(audio[1])).hexdigest()}.wav")
    sf.write(file_name, audio[1], audio[0], format="wav")
    api_key = os.environ.get("GROQ_API_KEY")
    if not api_key:
        raise ValueError("Please set the GROQ_API_KEY environment variable.")
    client = groq.Client(api_key=api_key)
    # Transcribe the audio file
    transcription = transcribe_audio(client, file_name)
    if transcription:
        if isinstance(transcription, str) and transcription.startswith("Error"):
            transcription = "Error in audio transcription."
        state.conversation.append({"role": "user", "content": transcription})
        assistant_message = generate_chat_completion(client, state.conversation, state.genre, state.mood, state.theme)
        state.conversation.append({"role": "assistant", "content": assistant_message})
        # Update lyrics with the same logic as text input for consistency - keep only the lyric segments
        if any(marker in assistant_message.lower() for marker in ["[verse", "[chorus", "[bridge", "**verse", "**chorus", "sectiontype.verse", "verse:"]):
            state.lyrics = extract_lyrics_segments_only(assistant_message)
    os.remove(file_name)
    # Format the conversation for display in messages format
    conversation_display = []
    for msg in state.conversation:
        conversation_display.append({"role": msg["role"], "content": msg["content"]})
    return state, conversation_display, state.lyrics

def extract_lyrics_segments_only(content):
    """
    Extract only the lyrics segments (VERSE, CHORUS, etc.) from an AI response,
    removing any AI commentary or explanation text.
    """
    import re
    if not content:
        return ""
    lines = content.split('\n')
    lyrics_lines = []
    in_lyrics_section = False
    for line in lines:
        line = line.strip()
        # Check whether this line is a section header (VERSE, CHORUS, etc.);
        # match case-insensitively since headers may arrive in either case
        if re.match(r'^\*\*(VERSE|CHORUS|BRIDGE|OUTRO).*\*\*$', line, re.IGNORECASE) or re.match(r'^\[(VERSE|CHORUS|BRIDGE|OUTRO).*\]$', line, re.IGNORECASE):
            in_lyrics_section = True
            lyrics_lines.append(line)
            continue
        # If we're inside a lyrics section
        if in_lyrics_section:
            # Stop if we hit AI commentary
            if line and any(phrase in line.lower() for phrase in [
                'how do you like', 'would you like', 'let me know',
                'what do you think', 'any changes', 'take a look',
                'here are the lyrics', 'i\'ve created', 'feel free to'
            ]):
                break
            # Add the lyrics line (including empty lines, to preserve formatting)
            lyrics_lines.append(line)
    return '\n'.join(lyrics_lines).strip()
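
# Illustrative behavior on a hypothetical assistant response:
#   extract_lyrics_segments_only("Here you go!\n**VERSE 1**\nCity lights\nLet me know what you think!")
#   returns "**VERSE 1**\nCity lights"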

def extract_lyrics_from_conversation(conversation):
    """
    Extract lyrics from the conversation history with cross-platform compatibility.
    """
    lyrics = ""
    for message in reversed(conversation):
        if message["role"] == "assistant":
            content_lower = message["content"].lower()
            # Try strict matching first (preserves the original logic)
            if "verse" in content_lower and "chorus" in content_lower:
                lyrics = extract_lyrics_segments_only(message["content"])
                break
            # If that fails, fall back to looser matching for compatibility
            elif any(marker in content_lower for marker in ["[verse", "[chorus", "**verse", "**chorus"]):
                lyrics = extract_lyrics_segments_only(message["content"])
                break
    return lyrics

def generate_chat_completion(client, history, genre, mood, theme):
    """
    Generate an AI assistant response based on the conversation history and song parameters.
    """
    messages = []
    system_prompt = f"""You are a creative AI music generator assistant. Help users create song lyrics in the {genre} genre with a {mood} mood about {theme}.
When generating lyrics, create a chorus and at least one verse. Format lyrics clearly with VERSE and CHORUS labels.
Ask if they like the lyrics or want changes. Be conversational, friendly, and creative.
Keep the lyrics appropriate for the selected genre, mood, and theme unless the user specifically requests changes."""
    messages.append({
        "role": "system",
        "content": system_prompt,
    })
    for message in history:
        messages.append(message)
    try:
        completion = client.chat.completions.create(
            model="meta-llama/llama-4-scout-17b-16e-instruct",
            messages=messages,
        )
        return completion.choices[0].message.content
    except Exception as e:
        return f"Error in generating chat completion: {str(e)}"

# JavaScript for frontend enhancements
js_reset = """
() => {
    var record = document.querySelector('.record-button');
    if (record) {
        record.textContent = "Just Start Talking!";
        record.style = "width: fit-content; padding-right: 0.5vw;";
    }
}
"""

# Build the interface
demo = build_interface()

if __name__ == "__main__":
    # Spaces entry point - optimized for high-performance deployment
    print("🚀 Starting MiloMusic High-Performance Mode on Hugging Face Spaces...")
    print(f"📁 Working directory: {os.getcwd()}")
    print(f"📂 Directory contents: {os.listdir('.')}")

    # Validate the file structure
    if not validate_file_structure():
        print("❌ Required files missing - please check your upload")
        sys.exit(1)

    # Validate the environment
    if not validate_api_keys():
        print("⚠️ Some API keys missing - functionality may be limited")

    # Launch with optimized settings for Spaces
    demo.queue(
        default_concurrency_limit=5,  # Allow more concurrent users
        max_size=20
    ).launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=False,  # Spaces handles sharing
        show_error=True,
        quiet=False,
        favicon_path=None,
        ssl_verify=False
    )
