|
import os |
|
import gc |
|
import torchaudio |
|
import pandas |
|
from faster_whisper import WhisperModel |
|
from glob import glob |
|
|
|
from tqdm import tqdm |
|
|
|
from TTS.tts.layers.xtts.tokenizer import multilingual_cleaners |
|
|
|
|
|
|
|
import torch |
|
import torchaudio |
|
|
|
|
|
|
|
torch.set_num_threads(16) |
|
import os |
|
|
|
audio_types = (".wav", ".mp3", ".flac") |
|
|
|
def find_latest_best_model(folder_path): |
|
search_path = os.path.join(folder_path, '**', 'best_model.pth') |
|
files = glob(search_path, recursive=True) |
|
latest_file = max(files, key=os.path.getctime, default=None) |
|
return latest_file |
|
|
|
|
|
def list_audios(basePath, contains=None): |
|
|
|
return list_files(basePath, validExts=audio_types, contains=contains) |
|
|
|
def list_files(basePath, validExts=None, contains=None): |
|
|
|
for (rootDir, dirNames, filenames) in os.walk(basePath): |
|
|
|
for filename in filenames: |
|
|
|
|
|
if contains is not None and filename.find(contains) == -1: |
|
continue |
|
|
|
|
|
ext = filename[filename.rfind("."):].lower() |
|
|
|
|
|
if validExts is None or ext.endswith(validExts): |
|
|
|
audioPath = os.path.join(rootDir, filename) |
|
yield audioPath |
|
|
|
def format_audio_list(audio_files, asr_model, target_language="en", out_path=None, buffer=0.2, eval_percentage=0.15, speaker_name="coqui", gradio_progress=None): |
|
audio_total_size = 0 |
|
os.makedirs(out_path, exist_ok=True) |
|
|
|
lang_file_path = os.path.join(out_path, "lang.txt") |
|
current_language = None |
|
if os.path.exists(lang_file_path): |
|
with open(lang_file_path, 'r', encoding='utf-8') as existing_lang_file: |
|
current_language = existing_lang_file.read().strip() |
|
|
|
if current_language != target_language: |
|
with open(lang_file_path, 'w', encoding='utf-8') as lang_file: |
|
lang_file.write(target_language + '\n') |
|
print("Warning, existing language does not match target language. Updated lang.txt with target language.") |
|
else: |
|
print("Existing language matches target language") |
|
|
|
metadata = {"audio_file": [], "text": [], "speaker_name": []} |
|
train_metadata_path = os.path.join(out_path, "metadata_train.csv") |
|
eval_metadata_path = os.path.join(out_path, "metadata_eval.csv") |
|
|
|
existing_metadata = {'train': None, 'eval': None} |
|
if os.path.exists(train_metadata_path): |
|
existing_metadata['train'] = pandas.read_csv(train_metadata_path, sep="|") |
|
print("Existing training metadata found and loaded.") |
|
|
|
if os.path.exists(eval_metadata_path): |
|
existing_metadata['eval'] = pandas.read_csv(eval_metadata_path, sep="|") |
|
print("Existing evaluation metadata found and loaded.") |
|
|
|
if gradio_progress is not None: |
|
tqdm_object = gradio_progress.tqdm(audio_files, desc="Formatting...") |
|
else: |
|
tqdm_object = tqdm(audio_files) |
|
|
|
for audio_path in tqdm_object: |
|
audio_file_name_without_ext, _= os.path.splitext(os.path.basename(audio_path)) |
|
prefix_check = f"wavs/{audio_file_name_without_ext}_" |
|
|
|
skip_processing = False |
|
for key in ['train', 'eval']: |
|
if existing_metadata[key] is not None: |
|
mask = existing_metadata[key]['audio_file'].str.startswith(prefix_check) |
|
if mask.any(): |
|
print(f"Segments from {audio_file_name_without_ext} have been previously processed; skipping...") |
|
skip_processing = True |
|
break |
|
|
|
if skip_processing: |
|
continue |
|
|
|
wav, sr = torchaudio.load(audio_path) |
|
if wav.size(0) != 1: |
|
wav = torch.mean(wav, dim=0, keepdim=True) |
|
|
|
wav = wav.squeeze() |
|
audio_total_size += (wav.size(-1) / sr) |
|
|
|
segments, _= asr_model.transcribe(audio_path, vad_filter=True, word_timestamps=True, language=target_language) |
|
segments = list(segments) |
|
i = 0 |
|
sentence = "" |
|
sentence_start = None |
|
first_word = True |
|
words_list = [] |
|
for _, segment in enumerate(segments): |
|
words = list(segment.words) |
|
words_list.extend(words) |
|
|
|
for word_idx, word in enumerate(words_list): |
|
if first_word: |
|
sentence_start = word.start |
|
if word_idx == 0: |
|
sentence_start = max(sentence_start - buffer, 0) |
|
else: |
|
previous_word_end = words_list[word_idx - 1].end |
|
sentence_start = max(sentence_start - buffer, (previous_word_end + sentence_start) / 2) |
|
|
|
sentence = word.word |
|
first_word = False |
|
else: |
|
sentence += word.word |
|
|
|
if word.word[-1] in ["!", "。", ".", "?"]: |
|
sentence = sentence[1:] |
|
sentence = multilingual_cleaners(sentence, target_language) |
|
audio_file_name, _= os.path.splitext(os.path.basename(audio_path)) |
|
audio_file = f"wavs/{audio_file_name}_{str(i).zfill(8)}.wav" |
|
|
|
if word_idx + 1 < len(words_list): |
|
next_word_start = words_list[word_idx + 1].start |
|
else: |
|
next_word_start = (wav.shape[0] - 1) / sr |
|
|
|
word_end = min((word.end + next_word_start) / 2, word.end + buffer) |
|
|
|
absolute_path = os.path.join(out_path, audio_file) |
|
os.makedirs(os.path.dirname(absolute_path), exist_ok=True) |
|
i += 1 |
|
first_word = True |
|
|
|
audio = wav[int(sr*sentence_start):int(sr *word_end)].unsqueeze(0) |
|
if audio.size(-1) >= sr / 3: |
|
torchaudio.save(absolute_path, audio, sr) |
|
else: |
|
continue |
|
|
|
metadata["audio_file"].append(audio_file) |
|
metadata["text"].append(sentence) |
|
metadata["speaker_name"].append(speaker_name) |
|
|
|
df = pandas.DataFrame(metadata) |
|
|
|
mode = 'w' if not os.path.exists(train_metadata_path) else 'a' |
|
header = not os.path.exists(train_metadata_path) |
|
df.to_csv(train_metadata_path, sep="|", index=False, mode=mode, header=header) |
|
|
|
mode = 'w' if not os.path.exists(eval_metadata_path) else 'a' |
|
header = not os.path.exists(eval_metadata_path) |
|
df.to_csv(eval_metadata_path, sep="|", index=False, mode=mode, header=header) |
|
|
|
metadata = {"audio_file": [], "text": [], "speaker_name": []} |
|
|
|
if os.path.exists(train_metadata_path) and os.path.exists(eval_metadata_path): |
|
existing_train_df = existing_metadata['train'] |
|
existing_eval_df = existing_metadata['eval'] |
|
else: |
|
existing_train_df = pandas.DataFrame(columns=["audio_file", "text", "speaker_name"]) |
|
existing_eval_df = pandas.DataFrame(columns=["audio_file", "text", "speaker_name"]) |
|
|
|
new_data_df = pandas.read_csv(train_metadata_path, sep="|") |
|
|
|
combined_train_df = pandas.concat([existing_train_df, new_data_df], ignore_index=True).drop_duplicates().reset_index(drop=True) |
|
combined_eval_df = pandas.concat([existing_eval_df, new_data_df], ignore_index=True).drop_duplicates().reset_index(drop=True) |
|
|
|
combined_train_df_shuffled = combined_train_df.sample(frac=1) |
|
num_val_samples = int(len(combined_train_df_shuffled)* eval_percentage) |
|
|
|
final_eval_set = combined_train_df_shuffled[:num_val_samples] |
|
final_training_set = combined_train_df_shuffled[num_val_samples:] |
|
|
|
final_training_set.sort_values('audio_file').to_csv(train_metadata_path, sep='|', index=False) |
|
final_eval_set.sort_values('audio_file').to_csv(eval_metadata_path, sep='|', index=False) |
|
|
|
return train_metadata_path, eval_metadata_path, audio_total_size |
|
|