Spaces:
Running
Running
File size: 4,294 Bytes
37f86d1 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 |
import argparse
import os
import shutil
from pathlib import Path
import soundfile as sf
import torch
from tqdm import tqdm
from common.log import logger
from common.stdout_wrapper import SAFE_STDOUT
vad_model, utils = torch.hub.load(
repo_or_dir="snakers4/silero-vad",
model="silero_vad",
onnx=True,
trust_repo=True,
)
(get_speech_timestamps, _, read_audio, *_) = utils
def get_stamps(
audio_file, min_silence_dur_ms: int = 700, min_sec: float = 2, max_sec: float = 12
):
"""
min_silence_dur_ms: int (ミリ秒):
このミリ秒数以上を無音だと判断する。
逆に、この秒数以下の無音区間では区切られない。
小さくすると、音声がぶつ切りに小さくなりすぎ、
大きくすると音声一つ一つが長くなりすぎる。
データセットによってたぶん要調整。
min_sec: float (秒):
この秒数より小さい発話は無視する。
max_sec: float (秒):
この秒数より大きい発話は無視する。
"""
sampling_rate = 16000 # 16kHzか8kHzのみ対応
min_ms = int(min_sec * 1000)
wav = read_audio(audio_file, sampling_rate=sampling_rate)
speech_timestamps = get_speech_timestamps(
wav,
vad_model,
sampling_rate=sampling_rate,
min_silence_duration_ms=min_silence_dur_ms,
min_speech_duration_ms=min_ms,
max_speech_duration_s=max_sec,
)
return speech_timestamps
def split_wav(
audio_file,
target_dir="raw",
min_sec=2,
max_sec=12,
min_silence_dur_ms=700,
):
margin = 200 # ミリ秒単位で、音声の前後に余裕を持たせる
speech_timestamps = get_stamps(
audio_file,
min_silence_dur_ms=min_silence_dur_ms,
min_sec=min_sec,
max_sec=max_sec,
)
data, sr = sf.read(audio_file)
total_ms = len(data) / sr * 1000
file_name = os.path.basename(audio_file).split(".")[0]
os.makedirs(target_dir, exist_ok=True)
total_time_ms = 0
# タイムスタンプに従って分割し、ファイルに保存
for i, ts in enumerate(speech_timestamps):
start_ms = max(ts["start"] / 16 - margin, 0)
end_ms = min(ts["end"] / 16 + margin, total_ms)
start_sample = int(start_ms / 1000 * sr)
end_sample = int(end_ms / 1000 * sr)
segment = data[start_sample:end_sample]
sf.write(os.path.join(target_dir, f"{file_name}-{i}.wav"), segment, sr)
total_time_ms += end_ms - start_ms
return total_time_ms / 1000
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"--min_sec", "-m", type=float, default=2, help="Minimum seconds of a slice"
)
parser.add_argument(
"--max_sec", "-M", type=float, default=12, help="Maximum seconds of a slice"
)
parser.add_argument(
"--input_dir",
"-i",
type=str,
default="inputs",
help="Directory of input wav files",
)
parser.add_argument(
"--output_dir",
"-o",
type=str,
default="raw",
help="Directory of output wav files",
)
parser.add_argument(
"--min_silence_dur_ms",
"-s",
type=int,
default=700,
help="Silence above this duration (ms) is considered as a split point.",
)
args = parser.parse_args()
input_dir = args.input_dir
output_dir = args.output_dir
min_sec = args.min_sec
max_sec = args.max_sec
min_silence_dur_ms = args.min_silence_dur_ms
wav_files = Path(input_dir).glob("**/*.wav")
wav_files = list(wav_files)
logger.info(f"Found {len(wav_files)} wav files.")
if os.path.exists(output_dir):
logger.warning(f"Output directory {output_dir} already exists, deleting...")
shutil.rmtree(output_dir)
total_sec = 0
for wav_file in tqdm(wav_files, file=SAFE_STDOUT):
time_sec = split_wav(
audio_file=str(wav_file),
target_dir=output_dir,
min_sec=min_sec,
max_sec=max_sec,
min_silence_dur_ms=min_silence_dur_ms,
)
total_sec += time_sec
logger.info(f"Slice done! Total time: {total_sec / 60:.2f} min.")
|