Spaces:
Running
Running
# Copyright (c) 2023 Amphion.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
"""This code is modified from https://montreal-forced-aligner.readthedocs.io/en/latest/user_guide/performance.html"""
import os | |
import subprocess | |
from multiprocessing import Pool | |
from tqdm import tqdm | |
import torchaudio | |
from pathlib import Path | |
def remove_empty_dirs(path):
    """Remove every empty directory found below ``path``.

    The tree is walked bottom-up, so a directory that contained nothing but
    empty directories is removed as well once its children are gone.
    ``path`` itself is never removed.

    Args:
        path: Root directory to clean up. If it is not a directory, a
            message is printed and nothing else happens.
    """
    if not os.path.isdir(path):
        print(f"{path} is not a directory")
        return

    # topdown=False yields children before their parents, so a parent is
    # inspected only after its empty children have already been deleted.
    for root, dir_names, _ in os.walk(path, topdown=False):
        for dir_name in dir_names:
            dir_path = os.path.join(root, dir_name)
            # os.walk also reports symlinks that point at directories, but
            # os.rmdir cannot remove a symlink and os.listdir fails on a
            # dangling one. Leave links alone instead of aborting the walk.
            if os.path.islink(dir_path):
                continue
            if not os.listdir(dir_path):
                os.rmdir(dir_path)
def process_single_wav_file(task):
    """Convert one wav file to 16 kHz, 16-bit PCM and delete the source.

    Args:
        task: ``(wav_file, output_dir)`` pair. The last three components of
            ``wav_file`` are read as speaker id, book name and file name.

    The converted file is written to
    ``<output_dir>/<speaker_id>/<speaker_id>_<book_name>_<filename>``.

    Raises:
        subprocess.CalledProcessError: if ffmpeg exits with a non-zero
            status; the source file is left in place in that case.
    """
    src_path, target_root = task
    speaker, book, basename = Path(src_path).parts[-3:]

    # Outputs are grouped per speaker; the book name only survives as part
    # of the new file name.
    speaker_dir = Path(target_root, speaker)
    speaker_dir.mkdir(parents=True, exist_ok=True)
    dst_path = Path(speaker_dir, f"{speaker}_{book}_{basename}")

    quiet_flags = ["-nostdin", "-hide_banner", "-loglevel", "error", "-nostats"]
    encode_flags = ["-acodec", "pcm_s16le", "-ar", "16000"]
    ffmpeg_cmd = ["ffmpeg", *quiet_flags, "-i", src_path, *encode_flags, dst_path]

    # Resample to 16 kHz / 16-bit PCM; only drop the original once the
    # conversion has succeeded.
    subprocess.check_call(ffmpeg_cmd)
    os.remove(src_path)
def process_wav_files(wav_files, output_dir, n_process):
    """Convert all given wav files using a pool of worker processes.

    Args:
        wav_files: Iterable of wav file paths to convert.
        output_dir: Directory that receives the converted files.
        n_process: Number of worker processes in the pool.

    After every file has been handled, empty directories under
    ``output_dir`` are removed.
    """
    tasks = []
    for wav_path in wav_files:
        tasks.append((wav_path, output_dir))
    task_count = len(tasks)
    print(f"Processing {task_count} files")

    with Pool(processes=n_process) as pool:
        finished = pool.imap_unordered(process_single_wav_file, tasks)
        progress = tqdm(finished, total=task_count)
        # Results carry no information; draining the iterator is what
        # drives the work and the progress bar.
        for _ in progress:
            pass

    print("Removing empty directories...")
    remove_empty_dirs(output_dir)
    print("Done!")
def get_wav_files(dataset_path):
    """Collect the wav files stored as ``<dataset>/<speaker>/<book>/<file>``.

    Only entries exactly three levels below ``dataset_path`` whose name ends
    in ``.wav`` are returned; non-directory entries at the speaker and book
    levels are skipped.

    Args:
        dataset_path: Root directory of the dataset.

    Returns:
        List of paths, each built by joining onto ``dataset_path``.
    """

    def _subdirs(parent):
        """Yield the paths of the directories directly inside ``parent``."""
        for entry in os.listdir(parent):
            candidate = os.path.join(parent, entry)
            if os.path.isdir(candidate):
                yield candidate

    found = [
        os.path.join(book_dir, name)
        for speaker_dir in _subdirs(dataset_path)
        for book_dir in _subdirs(speaker_dir)
        for name in os.listdir(book_dir)
        if name.endswith(".wav")
    ]
    print("Found {} wav files".format(len(found)))
    return found
def filter_wav_files_by_length(wav_files, max_len_sec=15):
    """Keep wav files no longer than ``max_len_sec`` and delete the rest.

    Args:
        wav_files: Sequence of wav file paths to inspect.
        max_len_sec: Maximum allowed duration in seconds (inclusive).

    Returns:
        List of the paths whose duration is within the limit, in input
        order. Files over the limit are removed from disk.
    """
    print("original wav files: {}".format(len(wav_files)))

    kept = []
    for path in wav_files:
        info = torchaudio.info(str(path))
        # Duration in seconds, derived from the header metadata.
        duration_sec = info.num_frames / info.sample_rate
        if not (duration_sec <= max_len_sec):
            # Over-long clips are discarded permanently.
            os.remove(path)
            continue
        kept.append(path)

    print("filtered wav files: {}".format(len(kept)))
    return kept
if __name__ == "__main__":
    # Pipeline settings: the same directory is both scanned for input and
    # used as the output root for the converted files.
    max_len_sec = 15
    n_process = 16
    dataset_path = "/path/to/output/directory"

    # 1) discover the wav files, 2) drop clips longer than the limit,
    # 3) convert the survivors in parallel.
    wav_files = get_wav_files(dataset_path)
    filtered_wav_files = filter_wav_files_by_length(wav_files, max_len_sec)
    process_wav_files(filtered_wav_files, dataset_path, n_process)