# text_to_speech_Vietnamese/proccess_wav.py
import os
from typing import List, Tuple

import numpy as np
from pydub import AudioSegment

from chunkformer import ChunkFormerModel
from clearvoice import ClearVoice
# ======================= ASR + CLEARVOICE + AUDIO PROCESSING =======================
ASR_MODEL = None
CLEARVOICE_MODEL = None
REF_AUDIO_CACHE = {}  # cache: input path -> path of the processed output


def get_asr_model() -> ChunkFormerModel:
    """Lazy-load ChunkFormer (ASR, runs on CPU)."""
    global ASR_MODEL
    if ASR_MODEL is None:
        ASR_MODEL = ChunkFormerModel.from_pretrained("khanhld/chunkformer-ctc-large-vie")
    return ASR_MODEL


def get_clearvoice_model() -> ClearVoice:
    """Lazy-load ClearVoice for denoising the reference audio."""
    global CLEARVOICE_MODEL
    if CLEARVOICE_MODEL is None:
        CLEARVOICE_MODEL = ClearVoice(
            task="speech_enhancement",
            model_names=["MossFormer2_SE_48K"],
        )
    return CLEARVOICE_MODEL
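
# Usage sketch (not from the original file): both getters are lazy singletons,
# so the first call pays the model-loading cost and later calls are free.
#
#   asr = get_asr_model()          # loads khanhld/chunkformer-ctc-large-vie once
#   assert asr is get_asr_model()  # second call returns the cached instance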


def find_silent_regions(
    audio: AudioSegment,
    silence_thresh: float = 0.05,  # amplitude threshold after normalising to [-1, 1]
    chunk_ms: int = 10,
    min_silence_len: int = 200,
) -> List[Tuple[int, int]]:
    """
    Find silent spans (start_ms, end_ms) in an AudioSegment based on amplitude.
    """
    samples = np.array(audio.get_array_of_samples(), dtype=np.float32)
    if audio.channels > 1:
        # Downmix to mono by averaging the channels.
        samples = samples.reshape((-1, audio.channels)).mean(axis=1)
    # Normalise to [-1, 1] using the sample width (e.g. 16-bit -> divide by 2**15).
    norm = samples / (2 ** (audio.sample_width * 8 - 1))
    sr = audio.frame_rate
    chunk_size = max(1, int(sr * chunk_ms / 1000))
    total_chunks = len(norm) // chunk_size  # any trailing partial chunk is ignored
    silent_regions: List[Tuple[int, int]] = []
    start = None
    for i in range(total_chunks):
        chunk = norm[i * chunk_size: (i + 1) * chunk_size]
        if chunk.size == 0:
            continue
        if np.all(np.abs(chunk) < silence_thresh):
            # Below threshold: open a silent run if none is active.
            if start is None:
                start = i
        else:
            # Signal present: close the current silent run if it is long enough.
            if start is not None:
                dur = (i - start) * chunk_ms
                if dur >= min_silence_len:
                    silent_regions.append((start * chunk_ms, i * chunk_ms))
                start = None
    # A silent run may extend to the end of the audio.
    if start is not None:
        dur = (total_chunks - start) * chunk_ms
        if dur >= min_silence_len:
            silent_regions.append((start * chunk_ms, total_chunks * chunk_ms))
    return silent_regions
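
# Sketch of calling find_silent_regions on its own ("ref.wav" is a placeholder
# path, not a file shipped with this repo):
#
#   clip = AudioSegment.from_file("ref.wav")
#   for s_start, s_end in find_silent_regions(clip, min_silence_len=300):
#       print(f"pause: {s_start}-{s_end} ms ({s_end - s_start} ms long)")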


def trim_leading_trailing_silence(
    audio: AudioSegment,
    silence_thresh: float = 0.05,
    chunk_ms: int = 10,
    min_silence_len: int = 200,
) -> AudioSegment:
    """
    Strip silence from the start and end of the file.
    """
    duration = len(audio)
    silent_regions = find_silent_regions(
        audio,
        silence_thresh=silence_thresh,
        chunk_ms=chunk_ms,
        min_silence_len=min_silence_len,
    )
    if not silent_regions:
        return audio
    start_trim = 0
    end_trim = duration
    # Leading silence: a region starting at 0 pushes the start forward.
    first_start, first_end = silent_regions[0]
    if first_start <= 0:
        start_trim = max(start_trim, first_end)
    # Trailing silence: a region reaching the end pulls the end backward.
    last_start, last_end = silent_regions[-1]
    if last_end >= duration:
        end_trim = min(end_trim, last_start)
    return audio[start_trim:end_trim]


def compress_internal_silence(
    audio: AudioSegment,
    max_silence_ms: int = 300,
    silence_thresh: float = 0.05,
    chunk_ms: int = 10,
    min_silence_len: int = 50,
) -> AudioSegment:
    """
    Shorten silences in the middle of the file:
    - silences <= max_silence_ms are kept as-is
    - silences > max_silence_ms are cut down to max_silence_ms
    """
    duration = len(audio)
    silent_regions = find_silent_regions(
        audio,
        silence_thresh=silence_thresh,
        chunk_ms=chunk_ms,
        min_silence_len=min_silence_len,
    )
    if not silent_regions:
        return audio
    new_audio = AudioSegment.silent(duration=0, frame_rate=audio.frame_rate)
    cursor = 0
    for s_start, s_end in silent_regions:
        # Speech before this silent region.
        if s_start > cursor:
            new_audio += audio[cursor:s_start]
        silence_len = s_end - s_start
        if silence_len <= max_silence_ms:
            new_audio += audio[s_start:s_end]
        else:
            new_audio += audio[s_start: s_start + max_silence_ms]
        cursor = s_end
    # Remaining audio after the last silent region.
    if cursor < duration:
        new_audio += audio[cursor:]
    return new_audio
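
# Effect of compress_internal_silence with assumed numbers: a 700 ms pause is
# cut to 300 ms, while a 250 ms pause is kept untouched, so pacing stays
# natural but dead air is bounded ("ref.wav" is a placeholder path):
#
#   clip = AudioSegment.from_file("ref.wav")
#   tight = compress_internal_silence(clip, max_silence_ms=300)
#   print(len(clip), "->", len(tight), "ms")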


def select_subsegment_by_silence(
    audio: AudioSegment,
    min_len_ms: int = 5000,
    max_len_ms: int = 10000,
    silence_thresh: float = 0.05,
    chunk_ms: int = 10,
    min_silence_len: int = 200,
) -> AudioSegment:
    """
    If the audio is longer than max_len_ms, pick one span in [min_len_ms, max_len_ms],
    cutting inside silent regions so that speech is not clipped mid-word.
    """
    duration = len(audio)
    if duration <= max_len_ms:
        return audio
    silent_regions = find_silent_regions(
        audio,
        silence_thresh=silence_thresh,
        chunk_ms=chunk_ms,
        min_silence_len=min_silence_len,
    )
    if not silent_regions:
        # No silence found -> take the middle of the clip.
        target_len = min(max_len_ms, duration)
        start = max(0, (duration - target_len) // 2)
        end = start + target_len
        return audio[start:end]
    # Use the midpoint of each silent region as a cut boundary
    # (guaranteed to lie inside silence).
    boundaries = [0]
    for s_start, s_end in silent_regions:
        mid = (s_start + s_end) // 2
        if 0 < mid < duration:
            boundaries.append(mid)
    boundaries.append(duration)
    boundaries = sorted(set(boundaries))
    # Prefer the first span that satisfies the 5-10 s window.
    for i in range(len(boundaries)):
        for j in range(i + 1, len(boundaries)):
            seg_len = boundaries[j] - boundaries[i]
            if min_len_ms <= seg_len <= max_len_ms:
                return audio[boundaries[i]:boundaries[j]]
    # If no span fits inside [min, max], pick the one closest to max_len_ms.
    best_i, best_j, best_diff = 0, None, None
    for i in range(len(boundaries)):
        for j in range(i + 1, len(boundaries)):
            seg_len = boundaries[j] - boundaries[i]
            if seg_len >= min_len_ms:
                diff = abs(seg_len - max_len_ms)
                if best_diff is None or diff < best_diff:
                    best_diff = diff
                    best_i, best_j = i, j
    if best_j is not None:
        return audio[boundaries[best_i]:boundaries[best_j]]
    # Last-resort fallback: the middle of the clip.
    target_len = min(max_len_ms, duration)
    start = max(0, (duration - target_len) // 2)
    end = start + target_len
    return audio[start:end]
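
# Sketch: for a long clip, select_subsegment_by_silence returns one window
# whose edges sit in the middle of detected pauses ("ref.wav" is a placeholder):
#
#   clip = AudioSegment.from_file("ref.wav")   # e.g. a 30 s recording
#   ref = select_subsegment_by_silence(clip)   # roughly 5-10 s of it
#   print(len(clip), "->", len(ref), "ms")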


def enhance_ref_audio(input_path: str) -> str:
    """
    WAV processing pipeline for TTS reference audio:
    - denoise with ClearVoice
    - strip leading/trailing silence
    - shorten internal silences longer than 0.3 s down to 0.3 s
    - if the audio is longer than 10 s, pick a 5-10 s span cut at silence
    Returns the path of the processed wav file.
    """
    if not input_path:
        raise ValueError("No input audio path for enhancement.")
    # Cache so the same file is not processed more than once.
    if input_path in REF_AUDIO_CACHE:
        return REF_AUDIO_CACHE[input_path]
    cv = get_clearvoice_model()
    # Derive output names up front so the fallback path below can still
    # build enhanced_path (previously `name` was only set inside the try).
    base = os.path.basename(input_path)
    name, ext = os.path.splitext(base)
    if not ext:
        ext = ".wav"
    # 1) denoise
    try:
        cv_out = cv(input_path=input_path, online_write=False)
        denoised_path = os.path.join(os.path.dirname(input_path), f"{name}_denoised{ext}")
        cv.write(cv_out, output_path=denoised_path)
    except Exception as e:
        print(f"[ClearVoice] Error during denoising, fallback to original: {e}")
        denoised_path = input_path
    # 2) silence/length handling with pydub
    audio = AudioSegment.from_file(denoised_path)
    # strip leading/trailing silence
    audio = trim_leading_trailing_silence(audio)
    # shorten internal silences
    audio = compress_internal_silence(audio, max_silence_ms=300)
    # if longer than 10 s, pick a 5-10 s span
    audio = select_subsegment_by_silence(audio, min_len_ms=5000, max_len_ms=10000)
    # 3) write the result to a new file
    enhanced_path = os.path.join(os.path.dirname(denoised_path), f"{name}_enhanced.wav")
    audio.export(enhanced_path, format="wav")
    REF_AUDIO_CACHE[input_path] = enhanced_path
    return enhanced_path
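
# End-to-end sketch for the enhancement step ("speaker.wav" is a placeholder).
# The first call runs ClearVoice plus the pydub passes; the second call is a
# REF_AUDIO_CACHE hit and returns the same path immediately.
#
#   ref_path = enhance_ref_audio("speaker.wav")
#   assert enhance_ref_audio("speaker.wav") == ref_path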


def split_audio_by_silence(
    audio: AudioSegment,
    silence_thresh: float = 0.05,
    chunk_ms: int = 10,
    min_silence_len: int = 200,
    min_segment_len: int = 200,
) -> List[Tuple[int, int]]:
    """
    Return the (non-silent) speech spans of an AudioSegment,
    using the silent regions as separators.
    """
    duration = len(audio)
    silent_regions = find_silent_regions(
        audio,
        silence_thresh=silence_thresh,
        chunk_ms=chunk_ms,
        min_silence_len=min_silence_len,
    )
    segments: List[Tuple[int, int]] = []
    cur_start = 0
    for s_start, s_end in silent_regions:
        if cur_start < s_start:
            if s_start - cur_start >= min_segment_len:
                segments.append((cur_start, s_start))
        cur_start = s_end
    if cur_start < duration and duration - cur_start >= min_segment_len:
        segments.append((cur_start, duration))
    # If no segment was found, use the whole file.
    if not segments:
        segments.append((0, duration))
    return segments
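
# Note: split_audio_by_silence returns the complement of find_silent_regions.
# With assumed pauses at 1000-1400 ms and 3000-3300 ms in a 5000 ms clip, it
# would return roughly [(0, 1000), (1400, 3000), (3300, 5000)].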


def transcribe_ref_audio(audio_path: str) -> str:
    """
    ASR as required by the pipeline:
    - split the audio at silences
    - run ASR on each segment
    - join the texts with commas
    """
    if not audio_path:
        raise ValueError("No audio path for ASR.")
    model = get_asr_model()
    audio = AudioSegment.from_file(audio_path)
    segments = split_audio_by_silence(audio)
    texts = []
    base, _ = os.path.splitext(audio_path)
    for idx, (start_ms, end_ms) in enumerate(segments):
        # Export each segment next to the source file so ASR can read it from disk.
        seg_audio = audio[start_ms:end_ms]
        seg_path = f"{base}_seg_{idx}.wav"
        seg_audio.export(seg_path, format="wav")
        try:
            transcription = model.endless_decode(
                audio_path=seg_path,
                chunk_size=32,
                left_context_size=0,
                right_context_size=0,
                total_batch_duration=400,
                return_timestamps=False,
            )
        except TypeError:
            # Retry without return_timestamps for versions of endless_decode
            # that do not accept that keyword.
            transcription = model.endless_decode(
                audio_path=seg_path,
                chunk_size=32,
                left_context_size=0,
                right_context_size=0,
                total_batch_duration=400,
            )
        text = transcription if isinstance(transcription, str) else str(transcription)
        text = text.strip()
        if text:
            texts.append(text)
    return ", ".join(texts)