import os
import sys
import tempfile
import zipfile
import shutil
import subprocess
import importlib
import traceback
# from typing import Optional, Dict, Any


# Force flush stdout and stderr for better logging in Hugging Face Spaces
def log_print(*args, **kwargs):
    print(*args, **kwargs)
    sys.stdout.flush()


def log_error(*args, **kwargs):
    print(*args, file=sys.stderr, **kwargs)
    sys.stderr.flush()


def check_gpu_memory():
    """Check and log GPU memory usage"""
    if torch.cuda.is_available():
        allocated = torch.cuda.memory_allocated() / 1024**3
        cached = torch.cuda.memory_reserved() / 1024**3
        log_print(f"GPU Memory: {allocated:.2f}GB allocated, {cached:.2f}GB cached")
    else:
        log_print("CUDA not available, using CPU")


try:
    import gradio as gr
    import spaces
    import torch
    import torchaudio
    import librosa
    import yaml
    import numpy as np
    import nltk
    import requests
    from pydub import AudioSegment
    import soundfile as sf
except ImportError as e:
    log_error(f"Import error: {e}")
    log_error("Please install required packages using: pip install -r requirements.txt")
    sys.exit(1)

# Ensure module import works regardless of working directory location
ROOT_DIR = os.path.dirname(os.path.abspath(__file__))
OPENVOICE_DIR = os.path.join(ROOT_DIR, "OpenVoice")
if os.path.isdir(OPENVOICE_DIR) and OPENVOICE_DIR not in sys.path:
    sys.path.insert(0, OPENVOICE_DIR)

# Also work inside the OpenVoice project subdir so relative ckpt paths resolve
PROJECT_SUBDIR = "OpenVoice"
if os.path.isdir(os.path.join(ROOT_DIR, PROJECT_SUBDIR)):
    os.chdir(os.path.join(ROOT_DIR, PROJECT_SUBDIR))

# Import OpenVoice modules
from openvoice import se_extractor
from openvoice.api import ToneColorConverter

TTS = None  # will import lazily after ensuring MeCab/Unidic

# Import Seed-VC modules
from modules.commons import build_model, load_checkpoint, recursive_munch
from hf_utils import load_custom_model_from_hf

# OpenVoice configuration
CKPT_CONVERTER_DIR = 'checkpoints_v2/converter'
BASE_SPEAKER_SE_DIR = 'checkpoints_v2/base_speakers/ses'
OUTPUT_DIR = 'outputs_v2'
os.makedirs(OUTPUT_DIR, exist_ok=True)

DEVICE = "cuda:0" if torch.cuda.is_available() else "cpu"

# Lazy singletons
_tone_color_converter = None
_melo_models = {}

# Seed-VC models (will be initialized at startup)
_seed_vc_models = None


def get_tone_color_converter():
    global _tone_color_converter
    if _tone_color_converter is None:
        ensure_checkpoints()
        converter = ToneColorConverter(f'{CKPT_CONVERTER_DIR}/config.json', device=DEVICE)
        converter.load_ckpt(f'{CKPT_CONVERTER_DIR}/checkpoint.pth')
        _tone_color_converter = converter
    return _tone_color_converter


def ensure_unidic_available() -> None:
    try:
        import unidic  # type: ignore
        dicdir = getattr(unidic, 'DICDIR', None)
        if not dicdir or not os.path.exists(os.path.join(dicdir, 'mecabrc')):
            subprocess.run([sys.executable, '-m', 'unidic', 'download'], check=False)
            # Reload to get DICDIR
            unidic = importlib.reload(unidic)
            dicdir = getattr(unidic, 'DICDIR', None)
        if dicdir and os.path.exists(os.path.join(dicdir, 'mecabrc')):
            os.environ['MECABRC'] = os.path.join(dicdir, 'mecabrc')
    except Exception:
        # Best-effort; MeloTTS may still work for non-Japanese
        pass


def ensure_nltk_resources() -> None:
    try:
        nltk.data.find('taggers/averaged_perceptron_tagger_eng')
    except LookupError:
        try:
            nltk.download('averaged_perceptron_tagger_eng', quiet=True)
        except Exception:
            pass
    try:
        nltk.data.find('corpora/cmudict')
    except LookupError:
        try:
            nltk.download('cmudict', quiet=True)
        except Exception:
            pass


def get_melo_model(language: str):
    # Normalize a couple of aliases from demo_part3
    if language.lower() in {"en_us", "en-newest", "en_newest"}:
        language = "EN_NEWEST"
    language = language.upper()
    if language not in _melo_models:
        global TTS
        if TTS is None:
            ensure_unidic_available()
            ensure_nltk_resources()
            from melo.api import TTS as _TTS  # type: ignore
            TTS = _TTS
        _melo_models[language] = TTS(language=language, device=DEVICE)
    return _melo_models[language]


def list_supported_styles():
    # Map speaker .pth names we have to user choices
    style_list = []
    if not os.path.isdir(BASE_SPEAKER_SE_DIR):
        return style_list
    for name in sorted(os.listdir(BASE_SPEAKER_SE_DIR)):
        if name.endswith('.pth'):
            style_list.append(os.path.splitext(name)[0])
    return style_list


def ensure_checkpoints():
    # Download and place checkpoints at exact expected paths.
    if os.path.exists(CKPT_CONVERTER_DIR) and os.path.isdir(BASE_SPEAKER_SE_DIR):
        return
    url = 'https://myshell-public-repo-host.s3.amazonaws.com/openvoice/checkpoints_v2_0417.zip'
    tmp_zip = os.path.join(tempfile.gettempdir(), 'checkpoints_v2_0417.zip')
    tmp_extract_root = tempfile.mkdtemp(prefix='ov_ckpt_')
    try:
        with requests.get(url, stream=True, timeout=300) as r:
            r.raise_for_status()
            with open(tmp_zip, 'wb') as f:
                for chunk in r.iter_content(chunk_size=1 << 20):
                    if chunk:
                        f.write(chunk)
        with zipfile.ZipFile(tmp_zip, 'r') as zf:
            zf.extractall(tmp_extract_root)
        # Find the folder that contains 'converter/config.json' directly under it
        candidate = None
        for root, _, _ in os.walk(tmp_extract_root):
            cfg = os.path.join(root, 'converter', 'config.json')
            ses_dir = os.path.join(root, 'base_speakers', 'ses')
            if os.path.isfile(cfg) and os.path.isdir(ses_dir):
                candidate = root
                break
        if candidate is None:
            raise RuntimeError('Could not locate converter/config.json inside the downloaded archive.')
        # Place contents into 'checkpoints_v2'
        target_root = 'checkpoints_v2'
        if os.path.exists(target_root):
            shutil.rmtree(target_root, ignore_errors=True)
        os.makedirs(target_root, exist_ok=True)
        # Copy only required subfolders
        for name in ['converter', 'base_speakers']:
            src_path = os.path.join(candidate, name)
            dst_path = os.path.join(target_root, name)
            if os.path.exists(dst_path):
                shutil.rmtree(dst_path, ignore_errors=True)
            shutil.copytree(src_path, dst_path)
    except Exception as e:
        raise gr.Error(f"Failed to prepare checkpoints: {e}")
    finally:
        try:
            if os.path.isdir(tmp_extract_root):
                shutil.rmtree(tmp_extract_root, ignore_errors=True)
        except Exception:
            pass


def initialize_seed_vc_models():
    """Initialize Seed-VC models with memory optimization"""
    global _seed_vc_models
    if _seed_vc_models is not None:
        return _seed_vc_models

    log_print("Loading Seed-VC models...")

    # Clear GPU cache before loading models
    if torch.cuda.is_available():
        torch.cuda.empty_cache()

    # Load DiT model
    dit_checkpoint_path, dit_config_path = load_custom_model_from_hf(
        "Plachta/Seed-VC",
        "DiT_seed_v2_uvit_whisper_small_wavenet_bigvgan_pruned.pth",
        "config_dit_mel_seed_uvit_whisper_small_wavenet.yml")
    with open(dit_config_path, 'r', encoding='utf-8') as f:
        config = yaml.safe_load(f)
    model_params = recursive_munch(config['model_params'])
    model = build_model(model_params, stage='DiT')
    hop_length = config['preprocess_params']['spect_params']['hop_length']
    sr = config['preprocess_params']['sr']

    # Load checkpoints with memory optimization
    model, _, _, _ = load_checkpoint(model, None, dit_checkpoint_path,
                                     load_only_params=True, ignore_modules=[], is_distributed=False)
    for key in model:
        model[key].eval()
        model[key].to(DEVICE)
    model.cfm.estimator.setup_caches(max_batch_size=1, max_seq_length=1024)  # Optimized for ZeroGPU
    # Clear GPU cache after DiT model loading
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
    check_gpu_memory()

    # Load CAMPPlus
    from modules.campplus.DTDNN import CAMPPlus
    campplus_ckpt_path = load_custom_model_from_hf("funasr/campplus", "campplus_cn_common.bin",
                                                   config_filename=None)
    campplus_model = CAMPPlus(feat_dim=80, embedding_size=192)
    campplus_model.load_state_dict(torch.load(campplus_ckpt_path, map_location="cpu"))
    campplus_model.eval()
    campplus_model.to(DEVICE)

    # Clear GPU cache after CAMPPlus loading
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
    check_gpu_memory()

    # Load BigVGAN - fail fast if it cannot be loaded (original intent preserved)
    try:
        from modules.bigvgan import bigvgan
        bigvgan_model = bigvgan.BigVGAN.from_pretrained('nvidia/bigvgan_v2_22khz_80band_256x',
                                                        use_cuda_kernel=False)
        bigvgan_model.remove_weight_norm()
        bigvgan_model = bigvgan_model.eval().to(DEVICE)
        log_print("✓ BigVGAN loaded successfully")

        # Clear GPU cache after BigVGAN loading
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
        check_gpu_memory()
    except Exception as e:
        log_error(f"CRITICAL ERROR: Failed to load BigVGAN: {e}")
        log_error(f"BigVGAN error traceback: {traceback.format_exc()}")
        raise gr.Error(f"Failed to load the BigVGAN model: {e}. The app cannot start.")

    # Load FAcodec - fail fast if it cannot be loaded (original intent preserved)
    try:
        ckpt_path, config_path = load_custom_model_from_hf("Plachta/FAcodec", 'pytorch_model.bin', 'config.yml')
        with open(config_path, 'r', encoding='utf-8') as f:
            codec_config = yaml.safe_load(f)
        codec_model_params = recursive_munch(codec_config['model_params'])

        # Remove problematic parameters
        if hasattr(codec_model_params, 'dac_params'):
            dac_params = codec_model_params.dac_params
            problematic_params = ['causal', 'causal_conv', 'causal_attention', 'lstm']
            for param in problematic_params:
                if hasattr(dac_params, param):
                    delattr(dac_params, param)
                    log_print(f"Removed '{param}' parameter from DAC config")

        codec_encoder = build_model(codec_model_params, stage="codec")
        log_print("✓ FAcodec loaded successfully")

        # Clear GPU cache after FAcodec loading
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
        check_gpu_memory()
    except Exception as e:
        log_error(f"CRITICAL ERROR: Failed to load FAcodec: {e}")
        log_error(f"FAcodec error traceback: {traceback.format_exc()}")
        raise gr.Error(f"Failed to load the FAcodec model: {e}. The app cannot start.")

    # Load codec checkpoint - fail fast if it cannot be loaded (original intent preserved)
    try:
        ckpt_params = torch.load(ckpt_path, map_location="cpu")
        if 'codec' in ckpt_params:
            codec_encoder.codec.load_state_dict(ckpt_params['codec'], strict=False)
        elif 'model' in ckpt_params:
            codec_encoder.codec.load_state_dict(ckpt_params['model'], strict=False)
        else:
            codec_encoder.codec.load_state_dict(ckpt_params, strict=False)
        log_print("✓ Codec checkpoint loaded successfully")

        # Clear GPU cache after codec checkpoint loading
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
        check_gpu_memory()
    except Exception as e:
        log_error(f"CRITICAL ERROR: Failed to load codec checkpoint: {e}")
        log_error(f"Codec checkpoint error traceback: {traceback.format_exc()}")
        raise gr.Error(f"Failed to load the codec checkpoint: {e}. The app cannot start.")
    _ = [codec_encoder[key].eval() for key in codec_encoder]
    _ = [codec_encoder[key].to(DEVICE) for key in codec_encoder]

    # Load Whisper
    from transformers import AutoFeatureExtractor, WhisperModel
    whisper_name = (model_params.speech_tokenizer.whisper_name
                    if hasattr(model_params.speech_tokenizer, 'whisper_name')
                    else "openai/whisper-small")
    whisper_model = WhisperModel.from_pretrained(whisper_name, torch_dtype=torch.float16).to(DEVICE)
    del whisper_model.decoder
    whisper_feature_extractor = AutoFeatureExtractor.from_pretrained(whisper_name)

    # Clear GPU cache after Whisper loading
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
    check_gpu_memory()

    # Mel spectrogram function
    mel_fn_args = {
        "n_fft": config['preprocess_params']['spect_params']['n_fft'],
        "win_size": config['preprocess_params']['spect_params']['win_length'],
        "hop_size": config['preprocess_params']['spect_params']['hop_length'],
        "num_mels": config['preprocess_params']['spect_params']['n_mels'],
        "sampling_rate": sr,
        "fmin": 0,
        "fmax": None,
        "center": False
    }
    from modules.audio import mel_spectrogram
    to_mel = lambda x: mel_spectrogram(x, **mel_fn_args)

    # Load F0 conditioned model
    dit_checkpoint_path_f0, dit_config_path_f0 = load_custom_model_from_hf(
        "Plachta/Seed-VC",
        "DiT_seed_v2_uvit_whisper_base_f0_44k_bigvgan_pruned_ft_ema.pth",
        "config_dit_mel_seed_uvit_whisper_base_f0_44k.yml")
    with open(dit_config_path_f0, 'r', encoding='utf-8') as f:
        config_f0 = yaml.safe_load(f)
    model_params_f0 = recursive_munch(config_f0['model_params'])
    model_f0 = build_model(model_params_f0, stage='DiT')
    hop_length_f0 = config_f0['preprocess_params']['spect_params']['hop_length']
    sr_f0 = config_f0['preprocess_params']['sr']

    # Load checkpoints
    model_f0, _, _, _ = load_checkpoint(model_f0, None, dit_checkpoint_path_f0,
                                        load_only_params=True, ignore_modules=[], is_distributed=False)
    for key in model_f0:
        model_f0[key].eval()
        model_f0[key].to(DEVICE)
    model_f0.cfm.estimator.setup_caches(max_batch_size=1, max_seq_length=1024)  # Optimized for ZeroGPU

    # Clear GPU cache after F0 model loading
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
    check_gpu_memory()

    # Load RMVPE
    from modules.rmvpe import RMVPE
    model_path = load_custom_model_from_hf("lj1995/VoiceConversionWebUI", "rmvpe.pt", None)
    rmvpe = RMVPE(model_path, is_half=False, device=DEVICE)

    mel_fn_args_f0 = {
        "n_fft": config_f0['preprocess_params']['spect_params']['n_fft'],
        "win_size": config_f0['preprocess_params']['spect_params']['win_length'],
        "hop_size": config_f0['preprocess_params']['spect_params']['hop_length'],
        "num_mels": config_f0['preprocess_params']['spect_params']['n_mels'],
        "sampling_rate": sr_f0,
        "fmin": 0,
        "fmax": None,
        "center": False
    }
    to_mel_f0 = lambda x: mel_spectrogram(x, **mel_fn_args_f0)

    # Load BigVGAN 44k - fail fast if it cannot be loaded (original intent preserved)
    try:
        bigvgan_44k_model = bigvgan.BigVGAN.from_pretrained('nvidia/bigvgan_v2_44khz_128band_512x',
                                                            use_cuda_kernel=False)
        bigvgan_44k_model.remove_weight_norm()
        bigvgan_44k_model = bigvgan_44k_model.eval().to(DEVICE)
        log_print("✓ BigVGAN 44k loaded successfully")

        # Clear GPU cache after BigVGAN 44k loading
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
        check_gpu_memory()
    except Exception as e:
        log_error(f"CRITICAL ERROR: Failed to load BigVGAN 44k: {e}")
        log_error(f"BigVGAN 44k error traceback: {traceback.format_exc()}")
        raise gr.Error(f"Failed to load the BigVGAN 44k model: {e}. The app cannot start.")
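    # Bundle every loaded model, feature extractor, and preprocessing setting into one dict
    # so the inference path can select the plain or F0-conditioned variants by key.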
    _seed_vc_models = {
        'model': model,
        'model_f0': model_f0,
        'campplus_model': campplus_model,
        'bigvgan_model': bigvgan_model,
        'bigvgan_44k_model': bigvgan_44k_model,
        'codec_encoder': codec_encoder,
        'whisper_model': whisper_model,
        'whisper_feature_extractor': whisper_feature_extractor,
        'to_mel': to_mel,
        'to_mel_f0': to_mel_f0,
        'rmvpe': rmvpe,
        'config': config,
        'config_f0': config_f0,
        'hop_length': hop_length,
        'sr': sr,
        'hop_length_f0': hop_length_f0,
        'sr_f0': sr_f0
    }

    log_print("✓ All Seed-VC models loaded successfully!")
    return _seed_vc_models


def adjust_f0_semitones(f0_sequence, n_semitones):
    # A shift of +12 semitones doubles F0; -12 halves it.
    factor = 2 ** (n_semitones / 12)
    return f0_sequence * factor


def crossfade(chunk1, chunk2, overlap):
    # Squared-cosine fades sum to 1, giving a constant-gain crossfade over the overlap region.
    fade_out = np.cos(np.linspace(0, np.pi / 2, overlap)) ** 2
    fade_in = np.cos(np.linspace(np.pi / 2, 0, overlap)) ** 2
    chunk2[:overlap] = chunk2[:overlap] * fade_in + chunk1[-overlap:] * fade_out
    return chunk2


# Step 1: OpenVoice TTS + Voice Cloning
def run_openvoice_inference(text: str, style_key: str, speed: float, reference_audio_path: str) -> str:
    if not text or not reference_audio_path:
        raise gr.Error("Please provide text and a reference audio.")

    # Re-evaluate device at call time for ZeroGPU
    global DEVICE
    DEVICE = "cuda:0" if torch.cuda.is_available() else DEVICE

    converter = get_tone_color_converter()

    # Extract target speaker embedding from uploaded reference audio
    target_se, _ = se_extractor.get_se(reference_audio_path, converter, vad=True)

    # Prepare base speech with Melo
    language_from_style = "EN_NEWEST" if style_key.startswith("en-") else None
    if style_key.startswith("es"):
        language_from_style = "ES"
    elif style_key.startswith("fr"):
        language_from_style = "FR"
    elif style_key.startswith("zh"):
        language_from_style = "ZH"
    elif style_key.startswith("jp"):
        language_from_style = "JP"
    elif style_key.startswith("kr"):
        language_from_style = "KR"

    melo = get_melo_model(language_from_style or "EN_NEWEST")
    speaker_ids = melo.hps.data.spk2id
    # Pick first available speaker id for that language
    speaker_id = next(iter(speaker_ids.values()))

    # Disable MPS quirk similar to demo_part3
    if torch.backends.mps.is_available() and DEVICE == 'cpu':
        torch.backends.mps.is_available = lambda: False

    tmp_wav = os.path.join(OUTPUT_DIR, 'tmp.wav')
    melo.tts_to_file(text, speaker_id, tmp_wav, speed=speed)

    # Source speaker embedding from selected base style
    source_se_path = os.path.join(BASE_SPEAKER_SE_DIR, f'{style_key}.pth')
    if not os.path.exists(source_se_path):
        raise gr.Error(f"Missing base speaker embedding: {source_se_path}")
    source_se = torch.load(source_se_path, map_location=DEVICE)

    out_path = os.path.join(OUTPUT_DIR, f'openvoice_output_{style_key}.wav')

    # Convert tone color
    get_tone_color_converter().convert(
        audio_src_path=tmp_wav,
        src_se=source_se,
        tgt_se=target_se,
        output_path=out_path,
        message='@MyShell',
    )
    return out_path


# Step 2: Seed-VC Voice Conversion
@torch.no_grad()
@torch.inference_mode()
def run_seed_vc_inference(source_audio_path: str, target_audio_path: str,
                          vc_diffusion_steps: int, vc_length_adjust: float,
                          vc_inference_cfg_rate: float, vc_f0_condition: bool,
                          vc_auto_f0_adjust: bool, vc_pitch_shift: int):
    log_print("Initializing Seed-VC models...")
    models = initialize_seed_vc_models()
    log_print("✓ Seed-VC models ready")

    # Clear GPU cache before inference
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
    check_gpu_memory()

    inference_module = models['model_f0'] if vc_f0_condition else models['model']
    mel_fn = models['to_mel_f0'] if vc_f0_condition else models['to_mel']
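    # The F0-conditioned branch selects the 44 kHz BigVGAN vocoder together with its matching
    # sample rate and hop length; the default branch uses the 22 kHz vocoder settings.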
    bigvgan_fn = models['bigvgan_44k_model'] if vc_f0_condition else models['bigvgan_model']
    sr = models['sr_f0'] if vc_f0_condition else models['sr']
    hop_length = models['hop_length_f0'] if vc_f0_condition else models['hop_length']

    max_context_window = sr // hop_length * 30
    overlap_frame_len = 16
    overlap_wave_len = overlap_frame_len * hop_length
    bitrate = "320k"

    # Load audio
    source_audio = librosa.load(source_audio_path, sr=sr)[0]
    ref_audio = librosa.load(target_audio_path, sr=sr)[0]

    # Process audio
    source_audio = torch.tensor(source_audio).unsqueeze(0).float().to(DEVICE)
    ref_audio = torch.tensor(ref_audio[:sr * 25]).unsqueeze(0).float().to(DEVICE)

    # Resample
    ref_waves_16k = torchaudio.functional.resample(ref_audio, sr, 16000)
    converted_waves_16k = torchaudio.functional.resample(source_audio, sr, 16000)

    # Whisper processing
    if converted_waves_16k.size(-1) <= 16000 * 30:
        alt_inputs = models['whisper_feature_extractor']([converted_waves_16k.squeeze(0).cpu().numpy()],
                                                         return_tensors="pt",
                                                         return_attention_mask=True,
                                                         sampling_rate=16000)
        alt_input_features = models['whisper_model']._mask_input_features(
            alt_inputs.input_features, attention_mask=alt_inputs.attention_mask).to(DEVICE)
        alt_outputs = models['whisper_model'].encoder(
            alt_input_features.to(models['whisper_model'].encoder.dtype),
            head_mask=None,
            output_attentions=False,
            output_hidden_states=False,
            return_dict=True,
        )
        S_alt = alt_outputs.last_hidden_state.to(torch.float32)
        S_alt = S_alt[:, :converted_waves_16k.size(-1) // 320 + 1]
    else:
        overlapping_time = 5  # 5 seconds
        S_alt_list = []
        buffer = None
        traversed_time = 0
        while traversed_time < converted_waves_16k.size(-1):
            if buffer is None:  # first chunk
                chunk = converted_waves_16k[:, traversed_time:traversed_time + 16000 * 30]
            else:
                chunk = torch.cat([buffer,
                                   converted_waves_16k[:, traversed_time:traversed_time + 16000 * (30 - overlapping_time)]],
                                  dim=-1)
            alt_inputs = models['whisper_feature_extractor']([chunk.squeeze(0).cpu().numpy()],
                                                             return_tensors="pt",
                                                             return_attention_mask=True,
                                                             sampling_rate=16000)
            alt_input_features = models['whisper_model']._mask_input_features(
                alt_inputs.input_features, attention_mask=alt_inputs.attention_mask).to(DEVICE)
            alt_outputs = models['whisper_model'].encoder(
                alt_input_features.to(models['whisper_model'].encoder.dtype),
                head_mask=None,
                output_attentions=False,
                output_hidden_states=False,
                return_dict=True,
            )
            S_alt = alt_outputs.last_hidden_state.to(torch.float32)
            S_alt = S_alt[:, :chunk.size(-1) // 320 + 1]
            if traversed_time == 0:
                S_alt_list.append(S_alt)
            else:
                S_alt_list.append(S_alt[:, 50 * overlapping_time:])
            buffer = chunk[:, -16000 * overlapping_time:]
            traversed_time += 30 * 16000 if traversed_time == 0 else chunk.size(-1) - 16000 * overlapping_time
        S_alt = torch.cat(S_alt_list, dim=1)

    ori_waves_16k = torchaudio.functional.resample(ref_audio, sr, 16000)
    ori_inputs = models['whisper_feature_extractor']([ori_waves_16k.squeeze(0).cpu().numpy()],
                                                     return_tensors="pt",
                                                     return_attention_mask=True)
    ori_input_features = models['whisper_model']._mask_input_features(
        ori_inputs.input_features, attention_mask=ori_inputs.attention_mask).to(DEVICE)
    with torch.no_grad():
        ori_outputs = models['whisper_model'].encoder(
            ori_input_features.to(models['whisper_model'].encoder.dtype),
            head_mask=None,
            output_attentions=False,
            output_hidden_states=False,
            return_dict=True,
        )
    S_ori = ori_outputs.last_hidden_state.to(torch.float32)
    S_ori = S_ori[:, :ori_waves_16k.size(-1) // 320 + 1]

    mel = mel_fn(source_audio.to(DEVICE).float())
    mel2 = mel_fn(ref_audio.to(DEVICE).float())

    target_lengths = torch.LongTensor([int(mel.size(2) * vc_length_adjust)]).to(mel.device)
    target2_lengths = torch.LongTensor([mel2.size(2)]).to(mel2.device)

    feat2 = torchaudio.compliance.kaldi.fbank(ref_waves_16k,
                                              num_mel_bins=80,
                                              dither=0,
                                              sample_frequency=16000)
    feat2 = feat2 - feat2.mean(dim=0, keepdim=True)
    style2 = models['campplus_model'](feat2.unsqueeze(0))

    if vc_f0_condition:
        F0_ori = models['rmvpe'].infer_from_audio(ref_waves_16k[0], thred=0.5)
        F0_alt = models['rmvpe'].infer_from_audio(converted_waves_16k[0], thred=0.5)
        F0_ori = torch.from_numpy(F0_ori).to(DEVICE)[None]
        F0_alt = torch.from_numpy(F0_alt).to(DEVICE)[None]

        voiced_F0_ori = F0_ori[F0_ori > 1]
        voiced_F0_alt = F0_alt[F0_alt > 1]

        log_f0_alt = torch.log(F0_alt + 1e-5)
        voiced_log_f0_ori = torch.log(voiced_F0_ori + 1e-5)
        voiced_log_f0_alt = torch.log(voiced_F0_alt + 1e-5)
        median_log_f0_ori = torch.median(voiced_log_f0_ori)
        median_log_f0_alt = torch.median(voiced_log_f0_alt)

        # shift alt log f0 level to ori log f0 level
        shifted_log_f0_alt = log_f0_alt.clone()
        if vc_auto_f0_adjust:
            shifted_log_f0_alt[F0_alt > 1] = log_f0_alt[F0_alt > 1] - median_log_f0_alt + median_log_f0_ori
        shifted_f0_alt = torch.exp(shifted_log_f0_alt)
        if vc_pitch_shift != 0:
            shifted_f0_alt[F0_alt > 1] = adjust_f0_semitones(shifted_f0_alt[F0_alt > 1], vc_pitch_shift)
    else:
        F0_ori = None
        F0_alt = None
        shifted_f0_alt = None

    # Length regulation
    cond, _, _, _, _ = inference_module.length_regulator(S_alt, ylens=target_lengths,
                                                         n_quantizers=3, f0=shifted_f0_alt)
    prompt_condition, _, _, _, _ = inference_module.length_regulator(S_ori, ylens=target2_lengths,
                                                                     n_quantizers=3, f0=F0_ori)

    max_source_window = max_context_window - mel2.size(2)
    # split source condition (cond) into chunks
    processed_frames = 0
    generated_wave_chunks = []

    # generate chunk by chunk and stream the output
    while processed_frames < cond.size(1):
        chunk_cond = cond[:, processed_frames:processed_frames + max_source_window]
        is_last_chunk = processed_frames + max_source_window >= cond.size(1)
        cat_condition = torch.cat([prompt_condition, chunk_cond], dim=1)
        with torch.autocast(device_type='cuda', dtype=torch.float16):
            # Voice Conversion
            vc_target = inference_module.cfm.inference(cat_condition,
                                                       torch.LongTensor([cat_condition.size(1)]).to(mel2.device),
                                                       mel2, style2, None, vc_diffusion_steps,
                                                       inference_cfg_rate=vc_inference_cfg_rate)
            vc_target = vc_target[:, :, mel2.size(-1):]
        vc_wave = bigvgan_fn(vc_target.float())[0]
        if processed_frames == 0:
            if is_last_chunk:
                output_wave = vc_wave[0].cpu().numpy()
                generated_wave_chunks.append(output_wave)
                output_wave = (output_wave * 32768.0).astype(np.int16)
                mp3_bytes = AudioSegment(
                    output_wave.tobytes(), frame_rate=sr,
                    sample_width=output_wave.dtype.itemsize, channels=1
                ).export(format="mp3", bitrate=bitrate).read()
                yield mp3_bytes, (sr, np.concatenate(generated_wave_chunks))
                break
            output_wave = vc_wave[0, :-overlap_wave_len].cpu().numpy()
            generated_wave_chunks.append(output_wave)
            previous_chunk = vc_wave[0, -overlap_wave_len:]
            processed_frames += vc_target.size(2) - overlap_frame_len
            output_wave = (output_wave * 32768.0).astype(np.int16)
            mp3_bytes = AudioSegment(
                output_wave.tobytes(), frame_rate=sr,
                sample_width=output_wave.dtype.itemsize, channels=1
            ).export(format="mp3", bitrate=bitrate).read()
            yield mp3_bytes, None
        elif is_last_chunk:
            output_wave = crossfade(previous_chunk.cpu().numpy(), vc_wave[0].cpu().numpy(), overlap_wave_len)
            generated_wave_chunks.append(output_wave)
            processed_frames += vc_target.size(2) - overlap_frame_len
            output_wave = (output_wave * 32768.0).astype(np.int16)
            mp3_bytes = AudioSegment(
                output_wave.tobytes(), frame_rate=sr,
                sample_width=output_wave.dtype.itemsize, channels=1
            ).export(format="mp3", bitrate=bitrate).read()
            yield mp3_bytes, (sr, np.concatenate(generated_wave_chunks))
            break
        else:
            output_wave = crossfade(previous_chunk.cpu().numpy(),
                                    vc_wave[0, :-overlap_wave_len].cpu().numpy(),
                                    overlap_wave_len)
            generated_wave_chunks.append(output_wave)
            previous_chunk = vc_wave[0, -overlap_wave_len:]
            processed_frames += vc_target.size(2) - overlap_frame_len
            output_wave = (output_wave * 32768.0).astype(np.int16)
            mp3_bytes = AudioSegment(
                output_wave.tobytes(), frame_rate=sr,
                sample_width=output_wave.dtype.itemsize, channels=1
            ).export(format="mp3", bitrate=bitrate).read()
            yield mp3_bytes, None


# Main integrated function
@spaces.GPU
def process_integrated_tts_vc(text, style, speed, reference_audio,
                              vc_diffusion_steps, vc_length_adjust, vc_inference_cfg_rate,
                              vc_f0_condition, vc_auto_f0_adjust, vc_pitch_shift):
    """Integrated TTS + Voice Conversion pipeline"""
    log_print("=" * 50)
    log_print("STARTING PROCESSING...")
    log_print(f"Text: {text[:50]}...")
    log_print(f"Style: {style}, Speed: {speed}")
    log_print(f"VC params: steps={vc_diffusion_steps}, length={vc_length_adjust}")
    log_print("=" * 50)

    # Handle Gradio audio input format
    ref_path = None
    if isinstance(reference_audio, tuple):
        sr, data = reference_audio
        with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as f:
            sf.write(f.name, data, sr)
            ref_path = f.name
    elif isinstance(reference_audio, str):
        ref_path = reference_audio

    if not ref_path:
        log_error("ERROR: No reference audio provided")
        raise gr.Error("Please provide a reference audio.")

    try:
        # Clear GPU cache before processing
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
        check_gpu_memory()

        # Step 1: OpenVoice TTS + Voice Cloning
        log_print("Step 1: Running OpenVoice TTS...")
        intermediate_audio = run_openvoice_inference(text, style, speed, ref_path)
        log_print(f"✓ OpenVoice completed. Intermediate audio: {intermediate_audio}")

        # Clear GPU cache after OpenVoice
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
        check_gpu_memory()

        # Step 2: Seed-VC Voice Conversion
        log_print("Step 2: Running Seed-VC Voice Conversion...")
        # Call the actual voice conversion function and collect all results
        results = list(run_seed_vc_inference(intermediate_audio, ref_path,
                                             vc_diffusion_steps, vc_length_adjust,
                                             vc_inference_cfg_rate, vc_f0_condition,
                                             vc_auto_f0_adjust, vc_pitch_shift))
        log_print(f"✓ Seed-VC completed. Results count: {len(results)}")
        # Clear GPU cache after Seed-VC
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
        check_gpu_memory()
    except Exception as e:
        log_error(f"CRITICAL ERROR in processing: {str(e)}")
        log_error(f"Error type: {type(e).__name__}")
        log_error("Full traceback:")
        log_error(traceback.format_exc())

        # Clear GPU cache on error
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
        check_gpu_memory()

        # Re-raise the error to see it in Gradio
        raise

    # Find the final result (the one with the complete audio data)
    final_result = None
    for result in results:
        if isinstance(result, tuple) and len(result) == 2 and result[1] is not None:
            # This is the final result with complete audio data
            final_result = result[1]
            break

    if final_result is not None:
        # Save the final audio to a temporary file
        sr, audio_data = final_result
        with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as f:
            sf.write(f.name, audio_data, sr)
            return f.name
    return None


# Get supported styles
styles = list_supported_styles() or [
    'en-newest', 'en-default', 'en-us', 'en-br', 'en-au', 'en-india',
    'es', 'fr', 'zh', 'jp', 'kr'
]

# Initialize models at app startup (original intent preserved)
log_print("=" * 50)
log_print("INITIALIZING MODELS...")
log_print("=" * 50)
try:
    # Preload all models so the full feature set is guaranteed to work
    initialize_seed_vc_models()
    log_print("✓ All models initialized successfully!")
except Exception as e:
    log_error(f"CRITICAL ERROR during model initialization: {e}")
    log_error(f"Error type: {type(e).__name__}")
    log_error("Full traceback:")
    log_error(traceback.format_exc())
    log_error("App will not start due to model initialization failure")
    # Abort app startup
    sys.exit(1)

# Create Gradio interface
with gr.Blocks(title="Integrated TTS + Voice Conversion", analytics_enabled=False) as demo:
    gr.Markdown("""
    # **Integrated TTS + Voice Conversion** — Convert text to speech and then apply voice conversion

    Enter text and upload a reference audio to first convert text to speech, then apply voice conversion to match the reference style.

    **How to use:**
    1. Enter the text you want to convert
    2. Upload a reference audio (3-10 seconds recommended)
    3. Select the base voice style and speed
    4. Adjust voice conversion parameters
    5. Click the "Convert" button
    """)

    with gr.Row():
        with gr.Column(scale=6):
            # TTS Parameters
            gr.Markdown("### 🎤 Text-to-Speech Settings")
            text_input = gr.Textbox(
                label="Text to Convert",
                value="Hello! This is an integrated TTS and voice conversion demo.",
                lines=3
            )
            style_input = gr.Dropdown(
                label="Base Voice Style",
                choices=styles,
                value=styles[0]
            )
            speed_input = gr.Slider(
                0.6, 1.4, value=1.0, step=0.05,
                label="Speech Speed (×)"
            )
            reference_audio_input = gr.Audio(
                label="Reference Audio",
                sources=["upload", "microphone"],
                type="filepath"
            )

            # Voice Conversion Parameters
            gr.Markdown("### 🔄 Voice Conversion Settings")
            with gr.Row():
                vc_diffusion_steps = gr.Slider(
                    minimum=1, maximum=200, value=25, step=1,
                    label="Diffusion Steps",
                    info="25 default, 50~100 for best quality"
                )
                vc_length_adjust = gr.Slider(
                    minimum=0.5, maximum=2.0, step=0.1, value=1.0,
                    label="Length Adjustment",
                    info="<1.0 faster, >1.0 slower"
                )
            with gr.Row():
                vc_inference_cfg_rate = gr.Slider(
                    minimum=0.0, maximum=1.0, step=0.1, value=0.7,
                    label="CFG Rate",
                    info="Subtle influence"
                )
                vc_pitch_shift = gr.Slider(
                    minimum=-24, maximum=24, step=1, value=0,
                    label="Pitch Shift",
                    info="In semitones"
                )
            with gr.Row():
                vc_f0_condition = gr.Checkbox(
                    label="Use F0 Conditioned Model",
                    value=False,
                    info="Required for singing voice conversion"
                )
                vc_auto_f0_adjust = gr.Checkbox(
                    label="Auto F0 Adjustment",
                    value=True,
                    info="Adjust F0 to match target voice"
                )

            convert_btn = gr.Button("Convert", variant="primary", size="lg")

        with gr.Column(scale=6):
            output_audio = gr.Audio(
                label="Final Converted Audio",
                autoplay=True,
                format="wav"
            )
            gr.Markdown("""
            ### 📋 Processing Steps:
            1. **Text → Speech**: Input text is converted to speech with the reference voice tone
            2. **Voice Conversion**: Generated speech is converted to match the reference voice style

            ### 💡 Tips:
            - Use clean reference audio of 3-10 seconds length
            - Check "Use F0 Conditioned Model" for singing voice conversion
            - Set diffusion steps to 50-100 for higher quality
            """)

    # Connect the button click to the processing function
    convert_btn.click(
        fn=process_integrated_tts_vc,
        inputs=[
            text_input, style_input, speed_input, reference_audio_input,
            vc_diffusion_steps, vc_length_adjust, vc_inference_cfg_rate,
            vc_f0_condition, vc_auto_f0_adjust, vc_pitch_shift
        ],
        outputs=[output_audio],
        concurrency_limit=1
    )

demo.queue()
demo.launch()