|
|
import os |
|
|
import sys |
|
|
import importlib |
|
|
import json |
|
|
import asyncio |
|
|
import tempfile |
|
|
from datetime import datetime |
|
|
|
|
|
import torch |
|
|
import gradio as gr |
|
|
import pydub |
|
|
import edge_tts |
|
|
import pysrt |
|
|
from pydub import AudioSegment |
|
|
|
|
|
|
|
|
# Put the sibling ``src`` directory at the front of the import path
# (presumably where the ``chatterbox`` package imported below lives — verify).
_this_file = os.path.abspath(__file__)
script_dir = os.path.dirname(_this_file)
src_path = os.path.join(script_dir, "src")
if sys.path.count(src_path) == 0:
    sys.path.insert(0, src_path)
|
|
|
|
|
import chatterbox.vc |
|
|
importlib.reload(chatterbox.vc) |
|
|
from chatterbox.vc import ChatterboxVC |
|
|
|
|
|
|
|
|
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    DEVICE = "cuda"
else:
    DEVICE = "cpu"

# Process-wide cache for the voice-conversion model; filled on first use.
_vc_model = None


def get_vc_model():
    """Return the shared ChatterboxVC model, loading it on the first call.

    The model is created once with ``ChatterboxVC.from_pretrained(DEVICE)``
    and cached in the module-level ``_vc_model`` for all later calls.
    """
    global _vc_model

    # Fast path: the model has already been loaded.
    if _vc_model is not None:
        return _vc_model

    print(f"[VC] Đang tải model trên {DEVICE}…")
    _vc_model = ChatterboxVC.from_pretrained(DEVICE)
    print("[VC] Model sẵn sàng.")
    return _vc_model
|
|
|
|
|
|
|
|
# Accumulated log lines shown in the UI log box (module-wide state).
global_log_messages_vc = []


def yield_vc_updates(log_msg=None, audio_data=None, file_list=None, log_append=True):
    """Yield a single ``(log, audio, files)`` tuple of ``gr.update`` objects.

    Args:
        log_msg: Message to record with an ``[HH:MM:SS]`` prefix; ``None``
            leaves the log untouched.
        audio_data: Value for the audio output; the component is made
            visible only when this is not ``None``.
        file_list: Value for the files output; the component is made visible
            only when this is not ``None`` (an empty list is sent otherwise).
        log_append: When true the message is appended to the existing log;
            when false the log is reset to just this message.

    Yields:
        One tuple: log update, audio update, files update.
    """
    global global_log_messages_vc

    if log_msg is not None:
        stamp = datetime.now().strftime("[%H:%M:%S]")
        entry = f"{stamp} {log_msg}"
        if not log_append:
            # Start a fresh log containing only this entry.
            global_log_messages_vc = [entry]
        else:
            global_log_messages_vc.append(entry)

    has_audio = audio_data is not None
    has_files = file_list is not None

    log_update = gr.update(value="\n".join(global_log_messages_vc))
    audio_update = gr.update(visible=has_audio, value=audio_data if has_audio else None)
    files_update = gr.update(visible=has_files, value=file_list if has_files else [])

    yield log_update, audio_update, files_update
|
|
|
|
|
|
|
|
def load_edge_tts_voices(json_path="voices.json"):
    """Load the Edge TTS voice catalogue from a JSON file.

    The file is read as ``{language: {gender: [voice, ...]}}`` where each
    voice is a mapping with ``display_name`` and ``voice_code`` keys.

    Args:
        json_path: Path of the UTF-8 encoded JSON catalogue.

    Returns:
        A ``(display_list, code_map)`` pair: the human-readable labels in
        file order, and a dict mapping each label to its voice code.
    """
    with open(json_path, "r", encoding="utf-8") as handle:
        catalogue = json.load(handle)

    # Flatten the nested language -> gender -> voices structure into
    # (label, voice_code) pairs, keeping the order found in the file.
    labelled = [
        (
            f"{language} - {gender} - {entry['display_name']} ({entry['voice_code']})",
            entry["voice_code"],
        )
        for language, by_gender in catalogue.items()
        for gender, entries in by_gender.items()
        for entry in entries
    ]

    display_list = [label for label, _ in labelled]
    code_map = dict(labelled)
    return display_list, code_map
|
|
|
|
|
# Load the voice catalogue once at import time.  A ``voices.json`` in the
# current working directory is still preferred (original behaviour); when it
# is absent, fall back to the copy next to this script so the app can be
# started from any directory.
_voices_json = "voices.json"
if not os.path.isfile(_voices_json):
    _voices_json = os.path.join(script_dir, "voices.json")
edge_choices, edge_code_map = load_edge_tts_voices(_voices_json)
|
|
|
|
|
|
|
|
async def _edge_tts_async(text, disp, rate_pct, vol_pct):
    """Synthesize ``text`` with Edge TTS and return the path of the audio file.

    Args:
        text: Text to speak.
        disp: Display label of the voice; looked up in ``edge_code_map``.
        rate_pct: Speaking-rate offset in percent (e.g. ``-10`` or ``25``).
        vol_pct: Volume offset in percent.

    Returns:
        Path of a newly created audio file holding the synthesized speech.

    Raises:
        gr.Error: If ``disp`` is not a known voice label.
    """
    code = edge_code_map.get(disp)
    if code is None:
        # Fail early with a readable message instead of passing voice=None on.
        raise gr.Error("Vui lòng chọn giọng Edge TTS hợp lệ.")

    # Sliders may deliver floats; the "+d" format only accepts integers.
    rate_str = f"{int(round(rate_pct)):+d}%"
    vol_str = f"{int(round(vol_pct)):+d}%"

    # Use a unique file per request so concurrent users cannot overwrite each
    # other's audio (the previous fixed name in the CWD was shared by all).
    # NOTE(review): edge-tts is understood to write MP3 data; the ".wav"
    # suffix is kept only so downstream consumers see the same extension as
    # before — confirm.
    fd, out = tempfile.mkstemp(prefix="edge_tts_", suffix=".wav")
    os.close(fd)
    try:
        await edge_tts.Communicate(text, voice=code, rate=rate_str, volume=vol_str).save(out)
    except BaseException:
        # Do not leave an empty placeholder behind when synthesis fails.
        if os.path.exists(out):
            os.remove(out)
        raise
    return out
|
|
|
|
|
def run_edge_tts(text, disp, rate_pct, vol_pct):
    """Synchronous wrapper around ``_edge_tts_async``.

    Returns:
        The generated audio path twice, so one call can feed two Gradio
        outputs (the Edge preview player and the source-audio input).
    """
    synthesis = _edge_tts_async(text, disp, rate_pct, vol_pct)
    audio_path = asyncio.run(synthesis)
    return audio_path, audio_path
|
|
|
|
|
|
|
|
async def _tts_save_segment(text: str, voice_code: str, rate_pct: int, vol_pct: int, path: str) -> bool: |
|
|
""" |
|
|
Save một đoạn text thành file audio bằng Edge TTS. |
|
|
Trả về True nếu có audio, False nếu bị NoAudioReceived. |
|
|
""" |
|
|
rate_str = f"{rate_pct:+d}%" |
|
|
vol_str = f"{vol_pct:+d}%" |
|
|
try: |
|
|
await edge_tts.Communicate(text, voice=voice_code, rate=rate_str, volume=vol_str).save(path) |
|
|
return True |
|
|
except edge_tts.exceptions.NoAudioReceived: |
|
|
|
|
|
return False |
|
|
|
|
|
async def _generate_audio_from_srt(
    srt_path: str,
    tmp_dir: str,
    out_path: str,
    voice_code: str,
    rate_pct: int,
    vol_pct: int,
    max_chars: int = 200
):
    """Synthesize every cue of an .srt file and export the result as WAV.

    Each cue's text is flattened to one line and, when longer than
    ``max_chars``, split into pieces before being sent to Edge TTS.  The
    audio of all cues is concatenated back to back (cue timestamps are not
    used) and exported to ``out_path``.  Nothing is written when the file
    contains no cues.

    Args:
        srt_path: Path of the UTF-8 encoded subtitle file.
        tmp_dir: Existing directory for the intermediate per-piece files.
        out_path: Destination path of the combined WAV file.
        voice_code: Edge TTS voice identifier.
        rate_pct: Speaking-rate offset in percent.
        vol_pct: Volume offset in percent.
        max_chars: Maximum number of characters sent to Edge TTS per request.
    """
    import textwrap

    subs = pysrt.open(srt_path, encoding='utf-8')
    segments = []

    for i, sub in enumerate(subs):
        text = sub.text.replace('\n', ' ')

        if len(text) > max_chars:
            # Split on whitespace so words are not cut in half; a single
            # over-long word (or text without spaces) is still hard-split at
            # ``max_chars``, as the previous fixed-width slicing did.
            parts = textwrap.wrap(text, width=max_chars)
        else:
            parts = [text]

        # Start from zero-length audio so a cue with no usable audio still
        # contributes a (silent) segment.
        seg = AudioSegment.silent(duration=0)
        for j, part in enumerate(parts):
            seg_path = os.path.join(tmp_dir, f"seg_{i}_{j}.wav")
            ok = await _tts_save_segment(part, voice_code, rate_pct, vol_pct, seg_path)
            if ok:
                seg += AudioSegment.from_file(seg_path)
        segments.append(seg)

    if segments:
        combined = segments[0]
        for seg in segments[1:]:
            combined += seg
        combined.export(out_path, format="wav")
|
|
|
|
|
def synthesize_srt_audio(
    srt_path: str,
    disp_voice: str,
    work_dir: str,
    rate_pct: int,
    vol_pct: int
) -> str:
    """Synchronously render an .srt file to WAV with Edge TTS.

    Args:
        srt_path: Path of the subtitle file.
        disp_voice: Display label of the voice; looked up in ``edge_code_map``.
        work_dir: Directory that receives ``srt_source.wav``.
        rate_pct: Speaking-rate offset in percent.
        vol_pct: Volume offset in percent.

    Returns:
        Path of the generated WAV file, ready for the voice-cloning pipeline.

    Raises:
        gr.Error: If the voice label is unknown or no audio was produced.
    """
    voice_code = edge_code_map.get(disp_voice)
    if voice_code is None:
        raise gr.Error("Vui lòng chọn giọng Edge TTS cho SRT.")

    out_path = os.path.join(work_dir, "srt_source.wav")
    # Drop any result of an earlier run so a stale file can never be returned
    # when this run produces nothing.
    if os.path.exists(out_path):
        os.remove(out_path)

    # The per-piece files are only needed while combining; the context manager
    # removes the directory afterwards.  asyncio.run creates and closes its
    # own event loop, so no loop is left open.
    with tempfile.TemporaryDirectory() as tmp_dir:
        asyncio.run(
            _generate_audio_from_srt(
                srt_path, tmp_dir, out_path,
                voice_code, rate_pct, vol_pct
            )
        )

    if not os.path.exists(out_path):
        raise gr.Error("File SRT không có nội dung để tạo audio.")
    return out_path
|
|
|
|
|
|
|
|
def generate_vc(
    source_audio_path,
    target_voice_path,
    cfg_rate: float,
    sigma_min: float,
    batch_mode: bool,
    batch_parameter: str,
    batch_values: str
):
    """Convert the source audio to the target voice, streaming UI updates.

    This is a generator: every item it yields comes from
    ``yield_vc_updates`` and is a ``(log, audio, files)`` tuple of updates.

    Modes:
        * Batch sweep (``batch_mode`` true): one conversion per value in
          ``batch_values``, varying the CFG rate when ``batch_parameter`` is
          ``"Inference CFG Rate"`` and ``sigma_min`` otherwise.
        * Long audio (more than 40 s): the source is split into 40 s chunks
          that are converted one by one and joined into ``combined.wav``.
        * Otherwise a single conversion is performed.

    Args:
        source_audio_path: Path of the audio to convert.
        target_voice_path: Path of the reference voice, passed to the model.
        cfg_rate: Inference CFG rate passed to the model.
        sigma_min: ``sigma_min`` value passed to the model.
        batch_mode: Whether to run a parameter sweep.
        batch_parameter: Name of the swept parameter.
        batch_values: Comma-separated numbers to sweep over.

    Raises:
        gr.Error: If no source audio is given or the batch values are
            missing or not numeric.  Any other failure is logged and
            re-raised unchanged.
    """
    model = get_vc_model()
    yield from yield_vc_updates("Khởi tạo chuyển giọng…", log_append=False)

    # Results are grouped per day under outputs/vc/YYYYMMDD.
    date_folder = datetime.now().strftime("%Y%m%d")
    work_dir = os.path.join("outputs/vc", date_folder)
    os.makedirs(work_dir, exist_ok=True)

    chunk_ms = 40_000  # pydub lengths and slices are in milliseconds

    def run_once(src, tgt, rate, sigma):
        return model.generate(src, target_voice_path=tgt, inference_cfg_rate=rate, sigma_min=sigma)

    outputs = []
    try:
        if not source_audio_path:
            raise gr.Error("Chưa có audio nguồn để chuyển giọng.")

        if batch_mode:
            try:
                # ``batch_values`` may be None when the textbox was never
                # touched; treat that like an empty string.
                vals = [float(v.strip()) for v in (batch_values or "").split(",") if v.strip()]
            except ValueError as exc:
                raise gr.Error("Batch values phải là số, phân cách bởi dấu phẩy.") from exc
            if not vals:
                # An empty sweep would otherwise finish silently with no output.
                raise gr.Error("Batch values phải là số, phân cách bởi dấu phẩy.")

            yield from yield_vc_updates(f"Chạy batch '{batch_parameter}': {vals}")
            for idx, v in enumerate(vals, 1):
                r, s = cfg_rate, sigma_min
                if batch_parameter == "Inference CFG Rate":
                    r, tag = v, f"cfg_{v}"
                else:
                    s, tag = v, f"sigma_{v}"
                yield from yield_vc_updates(f" • Mục {idx}/{len(vals)}: {batch_parameter}={v}")
                wav = run_once(source_audio_path, target_voice_path, r, s)
                path = os.path.join(work_dir, f"{tag}_{idx}.wav")
                model.save_wav(wav, path)
                outputs.append(path)
                yield from yield_vc_updates(f"Đã lưu: {path}")
        else:
            audio = AudioSegment.from_file(source_audio_path)
            if len(audio) > chunk_ms:
                yield from yield_vc_updates("Audio dài >40s: tách thành đoạn 40s…")
                chunks = [audio[i:i + chunk_ms] for i in range(0, len(audio), chunk_ms)]
                part_paths = []
                for i, chunk in enumerate(chunks):
                    tmp = f"{source_audio_path}_chunk{i}.wav"
                    try:
                        chunk.export(tmp, format="wav")
                        wav = run_once(tmp, target_voice_path, cfg_rate, sigma_min)
                    finally:
                        # Remove the chunk file even when conversion fails.
                        if os.path.exists(tmp):
                            os.remove(tmp)
                    outp = os.path.join(work_dir, f"part{i}.wav")
                    model.save_wav(wav, outp)
                    part_paths.append(outp)
                    yield from yield_vc_updates(f"Xử lý đoạn {i+1}/{len(chunks)}")

                # Join the converted parts back into a single file.
                combined = AudioSegment.empty()
                for p in part_paths:
                    combined += AudioSegment.from_file(p)
                final = os.path.join(work_dir, "combined.wav")
                combined.export(final, format="wav")
                outputs.append(final)
                yield from yield_vc_updates("Chuyển xong.")
            else:
                yield from yield_vc_updates("Đang chuyển giọng…")
                wav = run_once(source_audio_path, target_voice_path, cfg_rate, sigma_min)
                outp = os.path.join(work_dir, f"LyTranTTS_{datetime.now().strftime('%H%M%S')}.wav")
                model.save_wav(wav, outp)
                outputs.append(outp)
                yield from yield_vc_updates("Hoàn thành.")
    except Exception as e:
        # Surface the failure in the log box, then let Gradio report it.
        yield from yield_vc_updates(f"Lỗi: {e}")
        raise

    # Final update: show the first result in the player and list all files.
    first = outputs[0] if outputs else None
    yield from yield_vc_updates(log_msg=None, audio_data=first, file_list=outputs)
|
|
|
|
|
|
|
|
def run_vc_from_srt_or_file(
    use_srt: bool,
    srt_file, srt_voice, srt_rate, srt_vol,
    edge_text, edge_voice, edge_rate, edge_vol,
    src_audio, tgt_audio,
    cfg_rate, sigma_min,
    batch_mode, batch_parameter, batch_values
):
    """Pick the source audio, then stream the voice conversion.

    Source selection, in priority order:
        1. ``use_srt`` true: synthesize the uploaded .srt with Edge TTS.
        2. ``edge_text`` and ``edge_voice`` both set: synthesize that text.
        3. Otherwise use ``src_audio`` as given.

    The remaining arguments are forwarded unchanged to ``generate_vc``, whose
    updates this generator re-yields.

    Raises:
        gr.Error: If SRT mode is selected but no subtitle file was uploaded.
    """
    yield from yield_vc_updates("Bắt đầu…", log_append=False)

    date_folder = datetime.now().strftime("%Y%m%d")
    work_dir = os.path.join("outputs/vc", date_folder)
    os.makedirs(work_dir, exist_ok=True)

    if use_srt:
        if srt_file is None:
            # Previously this crashed with AttributeError on ``None.name``.
            raise gr.Error("Vui lòng tải lên file .srt.")
        # The upload may arrive as an object exposing ``.name`` or as a plain
        # path string; accept both.
        srt_path = getattr(srt_file, "name", srt_file)
        yield from yield_vc_updates("Sinh audio từ SRT…")
        source = synthesize_srt_audio(
            srt_path, srt_voice, work_dir,
            rate_pct=srt_rate, vol_pct=srt_vol
        )
    elif edge_text and edge_voice:
        yield from yield_vc_updates("Sinh audio từ Edge TTS…")
        source, _ = run_edge_tts(edge_text, edge_voice, edge_rate, edge_vol)
    else:
        source = src_audio

    yield from generate_vc(
        source, tgt_audio,
        cfg_rate, sigma_min,
        batch_mode, batch_parameter, batch_values
    )
|
|
|
|
|
|
|
|
# Gradio UI: inputs and parameters in the left column, log and results in the
# right column, followed by the event wiring.
with gr.Blocks(title="Chuyển Giọng Nói AI") as demo:
    gr.Markdown("## 📣 Chuyển Giọng Nói AI")
    gr.Markdown("> Tác giả: **Lý Trần**")

    with gr.Row():
        with gr.Column():
            # Source option 1: synthesize the source from an .srt file.
            use_srt = gr.Checkbox(label="Sử dụng file SRT làm nguồn?", value=False)
            srt_file = gr.File(file_types=[".srt"], label="Tải lên file .srt", visible=False)
            srt_voice = gr.Dropdown(choices=edge_choices, label="Giọng Edge TTS (SRT)", visible=False)
            srt_rate = gr.Slider(-100, 100, value=0, step=1, label="Tốc độ SRT (% chuẩn)", visible=False)
            srt_vol = gr.Slider(-100, 100, value=0, step=1, label="Âm lượng SRT (% chuẩn)", visible=False)

            # Source option 2: synthesize the source from free text.
            use_edge = gr.Checkbox(label="Tạo nguồn qua Edge TTS?", value=False)
            edge_text = gr.Textbox(label="Văn bản cho Edge TTS", visible=False)
            edge_voice = gr.Dropdown(choices=edge_choices, label="Giọng Edge TTS", visible=False)
            edge_rate = gr.Slider(-100, 100, value=0, step=1, label="Tốc độ Edge (% chuẩn)", visible=False)
            edge_vol = gr.Slider(-100, 100, value=0, step=1, label="Âm lượng Edge (% chuẩn)", visible=False)
            gen_edge_btn = gr.Button("🗣️ Tạo Edge TTS", visible=False)
            edge_audio = gr.Audio(label="Nguồn Edge TTS", type="filepath", visible=False)

            # Source option 3 (default): uploaded or recorded audio.
            src_audio = gr.Audio(sources=["upload", "microphone"], type="filepath",
                                 label="Tải lên / Ghi âm nguồn")

            gr.Markdown("### Giọng tham chiếu (mục tiêu)")
            tgt_audio = gr.Audio(sources=["upload", "microphone"], type="filepath",
                                 label="Tải lên / Ghi âm giọng mục tiêu")

            gr.Markdown("### Tham số chuyển giọng")
            cfg_slider = gr.Slider(0.0, 30.0, value=0.5, step=0.1, label="CFG Rate")
            sigma_input = gr.Number(1e-6, label="Sigma Min",
                                    minimum=1e-7, maximum=1e-5, step=1e-7)

            with gr.Accordion("Tùy chọn Batch Sweep", open=False):
                batch_chk = gr.Checkbox(label="Kích hoạt Batch Sweep", value=False)
                # Preselect a parameter: with no selection the handler would
                # receive None and silently sweep sigma.
                batch_param = gr.Dropdown(choices=["Inference CFG Rate", "Sigma Min"],
                                          value="Inference CFG Rate",
                                          label="Tham số thay đổi")
                batch_vals = gr.Textbox(placeholder="ví dụ: 0.5,1.0,2.0",
                                        label="Giá trị phân cách dấu phẩy")

            run_btn = gr.Button("🚀 Chuyển giọng")

        with gr.Column():
            gr.Markdown("### Nhật ký")
            log_box = gr.Textbox(interactive=False, lines=12)
            gr.Markdown("### Kết quả")
            out_audio = gr.Audio(label="Âm thanh kết quả", type="filepath", visible=False)
            out_files = gr.Files(label="Tải xuống file đầu ra", visible=False)

    def toggle_srt(v, edge_on=False):
        """Return visibility updates for the 12 source widgets.

        With SRT mode on, only the four SRT widgets are shown.  With SRT mode
        off, the Edge widgets follow the "use Edge" checkbox instead of all
        being revealed unconditionally, and the plain source input is shown
        only when Edge mode is off.
        """
        show_edge = (not v) and bool(edge_on)
        show_src = (not v) and not edge_on
        return (
            gr.update(visible=v),          # srt_file
            gr.update(visible=v),          # srt_voice
            gr.update(visible=v),          # srt_rate
            gr.update(visible=v),          # srt_vol
            gr.update(visible=not v),      # use_edge
            gr.update(visible=show_edge),  # edge_text
            gr.update(visible=show_edge),  # edge_voice
            gr.update(visible=show_edge),  # edge_rate
            gr.update(visible=show_edge),  # edge_vol
            gr.update(visible=show_edge),  # gen_edge_btn
            gr.update(visible=show_edge),  # edge_audio
            gr.update(visible=show_src),   # src_audio
        )

    use_srt.change(
        fn=toggle_srt,
        inputs=[use_srt, use_edge],
        outputs=[
            srt_file, srt_voice, srt_rate, srt_vol,
            use_edge, edge_text, edge_voice, edge_rate, edge_vol,
            gen_edge_btn, edge_audio, src_audio
        ]
    )

    def toggle_edge(v):
        """Show the six Edge widgets and hide the source input, or vice versa."""
        return (
            gr.update(visible=v),      # edge_text
            gr.update(visible=v),      # edge_voice
            gr.update(visible=v),      # edge_rate
            gr.update(visible=v),      # edge_vol
            gr.update(visible=v),      # gen_edge_btn
            gr.update(visible=v),      # edge_audio
            gr.update(visible=not v),  # src_audio
        )

    use_edge.change(
        fn=toggle_edge,
        inputs=[use_edge],
        outputs=[edge_text, edge_voice, edge_rate, edge_vol, gen_edge_btn, edge_audio, src_audio]
    )

    # Preview button: the generated file also becomes the source audio.
    gen_edge_btn.click(
        fn=run_edge_tts,
        inputs=[edge_text, edge_voice, edge_rate, edge_vol],
        outputs=[edge_audio, src_audio]
    )

    def _run_with_selected_source(edge_on, srt_on, srt_f, srt_v, srt_r, srt_vl, e_text, *rest):
        """Run the conversion, ignoring leftover Edge text when Edge is off.

        The handler picks the Edge branch whenever text and voice are set, so
        text typed earlier would otherwise override the uploaded source even
        after the "use Edge" box has been unchecked.
        """
        if not edge_on:
            e_text = ""
        yield from run_vc_from_srt_or_file(
            srt_on, srt_f, srt_v, srt_r, srt_vl, e_text, *rest
        )

    run_btn.click(
        fn=_run_with_selected_source,
        inputs=[
            use_edge,
            use_srt, srt_file, srt_voice, srt_rate, srt_vol,
            edge_text, edge_voice, edge_rate, edge_vol,
            src_audio, tgt_audio,
            cfg_slider, sigma_input,
            batch_chk, batch_param, batch_vals
        ],
        outputs=[log_box, out_audio, out_files],
        show_progress="minimal"
    )

# Script entry point; ``share=True`` asks Gradio for a public share link.
if __name__ == "__main__":
    demo.launch(share=True)
|
|
|