import os import gc import torch import tempfile import traceback import numpy as np import librosa import gradio as gr from pydub import AudioSegment from pydub.effects import normalize from concurrent.futures import ThreadPoolExecutor from functools import partial from huggingface_hub import snapshot_download from tts.infer_cli import MegaTTS3DiTInfer, convert_to_wav, cut_wav # Cấu hình tối ưu CPU os.environ["OMP_NUM_THREADS"] = str(os.cpu_count() or 4) os.environ["MKL_NUM_THREADS"] = str(os.cpu_count() or 4) os.environ["HF_HUB_ENABLE_HF_TRANSFER"] = "1" torch.set_num_threads(os.cpu_count() or 4) # Bộ nhớ đệm AUDIO_CACHE = {} MODEL_CACHE = None class TTSEngine: def __init__(self): self.model = None self.weights_dir = "checkpoints" self.initialize_model() def download_weights(self): """Tải trọng số model nếu chưa có""" repo_id = "mrfakename/MegaTTS3-VoiceCloning" if not os.path.exists(self.weights_dir): print("Đang tải trọng số model từ HuggingFace...") snapshot_download( repo_id=repo_id, local_dir=self.weights_dir, local_dir_use_symlinks=False, resume_download=True ) print("Đã tải xong trọng số model!") else: print("Trọng số model đã tồn tại.") def initialize_model(self): """Khởi tạo model TTS""" self.download_weights() print("Đang khởi tạo model MegaTTS3...") self.model = MegaTTS3DiTInfer(device="cpu") print(f"Model đã được tải thành công trên CPU với {os.cpu_count()} luồng!") def reset_model(self): """Khởi tạo lại model""" try: print("Đang khởi tạo lại model...") self.model = MegaTTS3DiTInfer(device="cpu") print("Đã khởi tạo lại model thành công!") return True except Exception as e: print(f"Không thể khởi tạo lại model: {e}") return False def preprocess_audio(self, audio_path, target_sr=22050, max_duration=30): """Tiền xử lý audio đầu vào""" cache_key = f"preprocessed_{hash(audio_path)}" if cache_key in AUDIO_CACHE: return AUDIO_CACHE[cache_key] try: audio = AudioSegment.from_file(audio_path) audio = audio.set_channels(1).set_frame_rate(target_sr) if len(audio) > max_duration * 1000: audio = audio[:max_duration * 1000] audio = normalize(audio) temp_path = f"temp_{os.path.basename(audio_path)}" audio.export( temp_path, format="wav", parameters=["-acodec", "pcm_s16le", "-ac", "1", "-ar", str(target_sr)] ) # Xác thực chất lượng audio wav, sr = librosa.load(temp_path, sr=target_sr, mono=True) if np.any(np.isnan(wav)) or np.any(np.isinf(wav)): raise ValueError("Audio chứa giá trị không hợp lệ") if np.max(np.abs(wav)) < 1e-6: raise ValueError("Tín hiệu audio quá yếu") import soundfile as sf sf.write(temp_path, wav, sr) AUDIO_CACHE[cache_key] = temp_path return temp_path except Exception as e: print(f"Lỗi tiền xử lý audio: {e}") raise ValueError(f"Lỗi khi xử lý audio: {str(e)}") def process_sentence(self, audio_context, sentence, params): """Xử lý một câu đơn lẻ""" try: with torch.no_grad(): wav_bytes = self.model.forward( audio_context, sentence, time_step=params['infer_timestep'], p_w=params['p_w'], t_w=params['t_w'] ) if params['speed_factor'] != 1.0: wav_bytes = self.adjust_speed(wav_bytes, params['speed_factor']) return wav_bytes except Exception as e: print(f"Lỗi khi xử lý câu: {sentence[:50]}... - {str(e)}") return None def adjust_speed(self, wav_bytes, speed_factor): """Điều chỉnh tốc độ âm thanh""" try: with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_input: temp_input.write(wav_bytes) temp_input_path = temp_input.name audio = AudioSegment.from_file(temp_input_path) if speed_factor != 1.0: new_frame_rate = int(audio.frame_rate * speed_factor) audio = audio._spawn(audio.raw_data, overrides={ "frame_rate": new_frame_rate }).set_frame_rate(audio.frame_rate) with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_output: audio.export(temp_output.name, format="wav") with open(temp_output.name, "rb") as f: result = f.read() os.unlink(temp_input_path) os.unlink(temp_output.name) return result except Exception as e: print(f"Lỗi điều chỉnh tốc độ: {e}") return wav_bytes def generate_speech(self, inp_audio, inp_text, params): """Tạo giọng nói từ văn bản""" if not inp_audio or not inp_text: gr.Warning("Vui lòng cung cấp cả audio tham chiếu và văn bản cần chuyển đổi.") return None try: print(f"Đang tạo giọng nói cho văn bản dài {len(inp_text)} ký tự...") # Xử lý audio đầu vào với bộ nhớ đệm cache_key = f"audio_{hash(inp_audio)}" if cache_key not in AUDIO_CACHE: processed_audio_path = self.preprocess_audio(inp_audio) cut_wav(processed_audio_path, max_len=28) with open(processed_audio_path, 'rb') as file: file_content = file.read() audio_context = self.model.preprocess(file_content) AUDIO_CACHE[cache_key] = audio_context else: audio_context = AUDIO_CACHE[cache_key] print("Đã sử dụng audio từ bộ nhớ đệm") # Chia văn bản thành các câu sentences = [s.strip() for s in inp_text.split('.') if s.strip()] if not sentences: gr.Warning("Không tìm thấy câu nào trong văn bản") return None # Xử lý song song các câu with ThreadPoolExecutor(max_workers=min(4, len(sentences))) as executor: process_fn = partial(self.process_sentence, audio_context, params=params) results = list(executor.map(process_fn, sentences)) # Ghép các đoạn âm thanh lại combined_audio = None for result in results: if result is None: continue with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_file: temp_file.write(result) temp_path = temp_file.name segment = AudioSegment.from_file(temp_path) os.unlink(temp_path) if combined_audio is None: combined_audio = segment else: combined_audio += AudioSegment.silent(duration=200) # Thêm khoảng nghỉ 200ms giữa các câu combined_audio += segment if combined_audio is None: gr.Warning("Không thể tạo bất kỳ đoạn âm thanh nào") return None # Xuất file kết quả with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as output_file: combined_audio.export(output_file.name, format="wav") with open(output_file.name, "rb") as f: final_result = f.read() os.unlink(output_file.name) self.cleanup_memory() return final_result except Exception as e: traceback.print_exc() gr.Warning(f"Lỗi khi tạo giọng nói: {str(e)}") self.cleanup_memory() return None def cleanup_memory(self): """Dọn dẹp bộ nhớ""" gc.collect() if torch.cuda.is_available(): torch.cuda.empty_cache() AUDIO_CACHE.clear() # Khởi tạo engine TTS tts_engine = TTSEngine() # Giao diện Gradio def create_gradio_interface(): with gr.Blocks(title="MegaTTS3 - Chuyển văn bản thành giọng nói") as demo: with gr.Row(): with gr.Column(): reference_audio = gr.Audio( label="Audio tham chiếu", type="filepath", sources=["upload", "microphone"] ) text_input = gr.Textbox( label="Văn bản cần chuyển đổi", placeholder="Nhập văn bản bạn muốn chuyển thành giọng nói...", lines=5 ) with gr.Accordion("Tùy chọn nâng cao", open=False): infer_timestep = gr.Slider( label="Số bước suy luận", value=32, minimum=1, maximum=100, step=1 ) p_w = gr.Slider( label="Trọng số rõ ràng", value=1.4, minimum=0.1, maximum=5.0, step=0.1 ) t_w = gr.Slider( label="Trọng số tương đồng", value=3.0, minimum=0.1, maximum=10.0, step=0.1 ) speed_factor = gr.Slider( label="Tốc độ phát", value=1.0, minimum=0.5, maximum=2.0, step=0.1, info="1.0 = bình thường, <1.0 = chậm hơn, >1.0 = nhanh hơn" ) generate_btn = gr.Button("Tạo giọng nói", variant="primary") with gr.Column(): output_audio = gr.Audio(label="Kết quả âm thanh") status = gr.Textbox(label="Trạng thái") generate_btn.click( fn=generate_speech_wrapper, inputs=[reference_audio, text_input, infer_timestep, p_w, t_w, speed_factor], outputs=[output_audio, status] ) return demo def generate_speech_wrapper(audio, text, timestep, p_w, t_w, speed): params = { 'infer_timestep': timestep, 'p_w': p_w, 't_w': t_w, 'speed_factor': speed } result = tts_engine.generate_speech(audio, text, params) status = "Hoàn thành!" if result else "Đã xảy ra lỗi!" return result, status if __name__ == '__main__': demo = create_gradio_interface() demo.launch( server_name='0.0.0.0', server_port=7860, share=False, show_error=True )