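"""Slice and transcribe a directory of audio files with SenseVoice.

Optionally separates vocals with UVR5 Roformer checkpoints, segments speech
with FSMN-VAD or Silero VAD, then exports per-segment audio clips, .lab
transcripts, and (optionally) .emo emotion labels.
"""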
import gc
import os
import re

from audio_separator.separator import Separator
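# Cache directories; set these before the corresponding libraries download
# or look up model weights.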
os.environ["MODELSCOPE_CACHE"] = "./.cache/funasr"
os.environ["UVR5_CACHE"] = "./.cache/uvr5-models"

import json
import subprocess
from pathlib import Path

import click
import torch
from loguru import logger
from pydub import AudioSegment
from silero_vad import get_speech_timestamps, load_silero_vad, read_audio
from tqdm import tqdm

from tools.file import AUDIO_EXTENSIONS, VIDEO_EXTENSIONS, list_files
from tools.sensevoice.auto_model import AutoModel


def uvr5_cli(
    audio_dir: Path,
    output_folder: Path,
    audio_files: list[Path] | None = None,
    output_format: str = "flac",
    model: str = "BS-Roformer-Viperx-1297.ckpt",
):
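    """Separate vocals from the given audio files with a Roformer checkpoint.

    Returns the paths produced by the separator together with the resolved
    checkpoint filename; the separator appends that name to output stems, so
    the caller needs it to rename files afterwards.
    """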
    sepr = Separator(
        model_file_dir=os.environ["UVR5_CACHE"],
        output_dir=output_folder,
        output_format=output_format,
    )
    dictmodel = {
        "BS-Roformer-Viperx-1297.ckpt": "model_bs_roformer_ep_317_sdr_12.9755.ckpt",
        "BS-Roformer-Viperx-1296.ckpt": "model_bs_roformer_ep_368_sdr_12.9628.ckpt",
        "BS-Roformer-Viperx-1053.ckpt": "model_bs_roformer_ep_937_sdr_10.5309.ckpt",
        "Mel-Roformer-Viperx-1143.ckpt": "model_mel_band_roformer_ep_3005_sdr_11.4360.ckpt",
    }
    roformer_model = dictmodel[model]
    sepr.load_model(roformer_model)

    if audio_files is None:
        audio_files = list_files(
            path=audio_dir, extensions=AUDIO_EXTENSIONS, recursive=True
        )
    total_files = len(audio_files)

    print(f"{total_files} audio files found")

    res = []
    for audio in tqdm(audio_files, desc="Denoising: "):
        file_path = str(audio_dir / audio)
        sep_out = sepr.separate(file_path)
        if isinstance(sep_out, str):
            res.append(sep_out)
        elif isinstance(sep_out, list):
            res.extend(sep_out)

    # Release the separator and free GPU memory before the ASR model loads.
    del sepr
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()

    return res, roformer_model


def get_sample_rate(media_path: Path):
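    """Return the sample rate of the first audio stream as a string.

    Falls back to "44100" when ffprobe reports no audio stream.
    """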
    result = subprocess.run(
        [
            "ffprobe",
            "-v",
            "quiet",
            "-print_format",
            "json",
            "-show_streams",
            str(media_path),
        ],
        capture_output=True,
        text=True,
        check=True,
    )
    media_info = json.loads(result.stdout)
    for stream in media_info.get("streams", []):
        if stream.get("codec_type") == "audio":
            return stream.get("sample_rate")
    return "44100"


def convert_to_mono(src_path: Path, out_path: Path, out_fmt: str = "wav"):
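    """Downmix src_path to mono at its original sample rate via ffmpeg.

    When source and destination are the same file, the output stem gets a
    "_{sample_rate}" suffix so ffmpeg does not overwrite its own input.
    """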
    sr = get_sample_rate(src_path)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    if src_path.resolve() == out_path.resolve():
        # ffmpeg cannot edit in place; write to a sibling file instead.
        output = str(out_path.with_stem(out_path.stem + f"_{sr}"))
    else:
        output = str(out_path)
    subprocess.run(
        [
            "ffmpeg",
            "-loglevel",
            "error",
            "-i",
            str(src_path),
            "-acodec",
            "pcm_s16le" if out_fmt == "wav" else "flac",
            "-ar",
            sr,
            "-ac",
            "1",
            "-y",
            output,
        ],
        check=True,
    )
    # Return the path that was actually written (it may carry the "_{sr}"
    # suffix), not the requested out_path.
    return Path(output)


def convert_video_to_audio(video_path: Path, audio_dir: Path):
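    """Extract a mono WAV for a video, reusing an existing "_(Vocals)" stem."""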
    cur_dir = audio_dir / video_path.relative_to(audio_dir).parent
    vocals = [
        p
        for p in cur_dir.glob(f"{video_path.stem}_(Vocals)*.*")
        if p.suffix in AUDIO_EXTENSIONS
    ]
    if len(vocals) > 0:
        return vocals[0]
    audio_path = cur_dir / f"{video_path.stem}.wav"
    convert_to_mono(video_path, audio_path)
    return audio_path


@click.command()
@click.option("--audio-dir", required=True, help="Directory containing audio files")
@click.option(
    "--save-dir", required=True, help="Directory to save processed audio files"
)
@click.option("--device", default="cuda", help="Device to use [cuda / cpu]")
@click.option("--language", default="auto", help="Language of the transcription")
@click.option(
    "--max_single_segment_time",
    default=20000,
    type=int,
    help="Maximum duration of a single output segment (ms)",
)
@click.option("--fsmn-vad/--silero-vad", default=False)
@click.option("--punc/--no-punc", default=False)
@click.option("--denoise/--no-denoise", default=False)
@click.option("--save_emo/--no_save_emo", default=False)
def main(
    audio_dir: str,
    save_dir: str,
    device: str,
    language: str,
    max_single_segment_time: int,
    fsmn_vad: bool,
    punc: bool,
    denoise: bool,
    save_emo: bool,
):
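    """Denoise, VAD-segment, and transcribe everything under --audio-dir.

    Example invocation (script path illustrative):

        python fun_asr.py --audio-dir raw/ --save-dir sliced/ --denoise --punc
    """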
    audios_path = Path(audio_dir)
    save_path = Path(save_dir)
    save_path.mkdir(parents=True, exist_ok=True)

    # Pull audio tracks out of any videos so they join the pipeline below.
    video_files = list_files(
        path=audio_dir, extensions=VIDEO_EXTENSIONS, recursive=True
    )
    v2a_files = [convert_video_to_audio(p, audio_dir) for p in video_files]

    if denoise:
        VOCAL = "_(Vocals)"
        original_files = [
            p
            for p in audios_path.glob("**/*")
            if p.suffix in AUDIO_EXTENSIONS and VOCAL not in p.stem
        ]

        _, cur_model = uvr5_cli(
            audio_dir=audio_dir, output_folder=audio_dir, audio_files=original_files
        )
        # Remove the instrumental stems and the raw inputs; only the vocal
        # stems survive this step.
        need_remove = list(audios_path.glob("**/*(Instrumental)*"))
        need_remove.extend(original_files)
        for p in need_remove:
            p.unlink()

        vocal_files = [
            p
            for p in audios_path.glob("**/*")
            if p.suffix in AUDIO_EXTENSIONS and VOCAL in p.stem
        ]
        for f in vocal_files:
            fn, ext = f.stem, f.suffix

            # Trim the checkpoint name the separator appends after "_(Vocals)".
            v_pos = fn.find(VOCAL + "_" + cur_model.split(".")[0])
            if v_pos != -1:
                new_fn = fn[: v_pos + len(VOCAL)]
                new_f = f.with_name(new_fn + ext)
                f = f.rename(new_f)
            # Re-encode as mono FLAC (written next to f with a "_{sr}" stem
            # suffix) and drop the stereo intermediate.
            convert_to_mono(f, f, "flac")
            f.unlink()

    # Re-scan the tree: denoising replaced the inputs with mono vocal FLACs.
    audio_files = list_files(
        path=audio_dir, extensions=AUDIO_EXTENSIONS, recursive=True
    )

    logger.info("Loading / Downloading Funasr model...")

    model_dir = "iic/SenseVoiceSmall"

    vad_model = "fsmn-vad" if fsmn_vad else None
    vad_kwargs = {"max_single_segment_time": max_single_segment_time}
    punc_model = "ct-punc" if punc else None

    manager = AutoModel(
        model=model_dir,
        trust_remote_code=False,
        vad_model=vad_model,
        vad_kwargs=vad_kwargs,
        punc_model=punc_model,
        device=device,
    )

    # With --silero-vad, load the local Silero model for segmentation.
    if not fsmn_vad:
        vad_model = load_silero_vad()

    logger.info("Model loaded.")

    # Matches already-sliced outputs such as "name_003.wav".
    pattern = re.compile(r"_\d{3}\.")

    for file_path in tqdm(audio_files, desc="Processing audio file"):
        if pattern.search(file_path.name):
            continue

        file_stem = file_path.stem
        file_suffix = file_path.suffix

        rel_path = Path(file_path).relative_to(audio_dir)
        (save_path / rel_path.parent).mkdir(parents=True, exist_ok=True)

        audio = AudioSegment.from_file(file_path)

        cfg = dict(
            cache={},
            language=language,
            use_itn=False,
            batch_size_s=60,
        )

        if fsmn_vad:
            elapsed, vad_res = manager.vad(input=str(file_path), **cfg)
        else:
            # Silero VAD returns timestamps in seconds; convert them to the
            # millisecond intervals inference_with_vadres expects.
            wav = read_audio(str(file_path))
            speech_timestamps = get_speech_timestamps(
                wav,
                vad_model,
                max_speech_duration_s=max_single_segment_time // 1000,
                return_seconds=True,
            )
            audio_val = [
                [int(timestamp["start"] * 1000), int(timestamp["end"] * 1000)]
                for timestamp in speech_timestamps
            ]
            vad_res = [dict(key=file_path.stem, value=audio_val)]

        res = manager.inference_with_vadres(
            input=str(file_path), vad_res=vad_res, **cfg
        )

        for i, info in enumerate(res):
            start_ms, end_ms = info["interval"]
            text = info["text"]
            emo = info["emo"]
            sliced_audio = audio[start_ms:end_ms]
            audio_save_path = (
                save_path / rel_path.parent / f"{file_stem}_{i:03d}{file_suffix}"
            )
            sliced_audio.export(audio_save_path, format=file_suffix[1:])
            print(f"Exported {audio_save_path}: {text}")

            transcript_save_path = (
                save_path / rel_path.parent / f"{file_stem}_{i:03d}.lab"
            )
            with open(transcript_save_path, "w", encoding="utf-8") as f:
                f.write(text)

            if save_emo:
                emo_save_path = (
                    save_path / rel_path.parent / f"{file_stem}_{i:03d}.emo"
                )
                with open(emo_save_path, "w", encoding="utf-8") as f:
                    f.write(emo)

        # When slicing in place, drop the original file after exporting.
        if audios_path.resolve() == save_path.resolve():
            file_path.unlink()


if __name__ == "__main__":
    main()
    exit(0)

    # Unreachable scratch snippet kept for reference: direct SenseVoiceSmall
    # inference without the CLI pipeline above. Import path assumed from
    # FunASR's sense_voice module.
    from funasr.models.sense_voice.model import SenseVoiceSmall
    from funasr.utils.postprocess_utils import rich_transcription_postprocess

    audio_path = Path(r"D:\PythonProject\ok\1_output_(Vocals).wav")
    model_dir = "iic/SenseVoiceSmall"
    m, kwargs = SenseVoiceSmall.from_pretrained(model=model_dir, device="cuda:0")
    m.eval()

    res = m.inference(
        data_in=f"{kwargs['model_path']}/example/zh.mp3",
        language="auto",
        use_itn=False,
        ban_emo_unk=False,
        **kwargs,
    )

    print(res)
    text = rich_transcription_postprocess(res[0][0]["text"])
    print(text)