Spaces:
Sleeping
Sleeping
import datetime | |
import glob | |
import os | |
import random | |
import re | |
import numpy as np | |
from scipy import signal | |
from TTS.encoder.models.lstm import LSTMSpeakerEncoder | |
from TTS.encoder.models.resnet import ResNetSpeakerEncoder | |
from TTS.utils.io import save_fsspec | |
class AugmentWAV(object): | |
def __init__(self, ap, augmentation_config): | |
self.ap = ap | |
self.use_additive_noise = False | |
if "additive" in augmentation_config.keys(): | |
self.additive_noise_config = augmentation_config["additive"] | |
additive_path = self.additive_noise_config["sounds_path"] | |
if additive_path: | |
self.use_additive_noise = True | |
# get noise types | |
self.additive_noise_types = [] | |
for key in self.additive_noise_config.keys(): | |
if isinstance(self.additive_noise_config[key], dict): | |
self.additive_noise_types.append(key) | |
additive_files = glob.glob(os.path.join(additive_path, "**/*.wav"), recursive=True) | |
self.noise_list = {} | |
for wav_file in additive_files: | |
noise_dir = wav_file.replace(additive_path, "").split(os.sep)[0] | |
# ignore not listed directories | |
if noise_dir not in self.additive_noise_types: | |
continue | |
if not noise_dir in self.noise_list: | |
self.noise_list[noise_dir] = [] | |
self.noise_list[noise_dir].append(wav_file) | |
print( | |
f" | > Using Additive Noise Augmentation: with {len(additive_files)} audios instances from {self.additive_noise_types}" | |
) | |
self.use_rir = False | |
if "rir" in augmentation_config.keys(): | |
self.rir_config = augmentation_config["rir"] | |
if self.rir_config["rir_path"]: | |
self.rir_files = glob.glob(os.path.join(self.rir_config["rir_path"], "**/*.wav"), recursive=True) | |
self.use_rir = True | |
print(f" | > Using RIR Noise Augmentation: with {len(self.rir_files)} audios instances") | |
self.create_augmentation_global_list() | |
def create_augmentation_global_list(self): | |
if self.use_additive_noise: | |
self.global_noise_list = self.additive_noise_types | |
else: | |
self.global_noise_list = [] | |
if self.use_rir: | |
self.global_noise_list.append("RIR_AUG") | |
def additive_noise(self, noise_type, audio): | |
clean_db = 10 * np.log10(np.mean(audio**2) + 1e-4) | |
noise_list = random.sample( | |
self.noise_list[noise_type], | |
random.randint( | |
self.additive_noise_config[noise_type]["min_num_noises"], | |
self.additive_noise_config[noise_type]["max_num_noises"], | |
), | |
) | |
audio_len = audio.shape[0] | |
noises_wav = None | |
for noise in noise_list: | |
noiseaudio = self.ap.load_wav(noise, sr=self.ap.sample_rate)[:audio_len] | |
if noiseaudio.shape[0] < audio_len: | |
continue | |
noise_snr = random.uniform( | |
self.additive_noise_config[noise_type]["min_snr_in_db"], | |
self.additive_noise_config[noise_type]["max_num_noises"], | |
) | |
noise_db = 10 * np.log10(np.mean(noiseaudio**2) + 1e-4) | |
noise_wav = np.sqrt(10 ** ((clean_db - noise_db - noise_snr) / 10)) * noiseaudio | |
if noises_wav is None: | |
noises_wav = noise_wav | |
else: | |
noises_wav += noise_wav | |
# if all possible files is less than audio, choose other files | |
if noises_wav is None: | |
return self.additive_noise(noise_type, audio) | |
return audio + noises_wav | |
def reverberate(self, audio): | |
audio_len = audio.shape[0] | |
rir_file = random.choice(self.rir_files) | |
rir = self.ap.load_wav(rir_file, sr=self.ap.sample_rate) | |
rir = rir / np.sqrt(np.sum(rir**2)) | |
return signal.convolve(audio, rir, mode=self.rir_config["conv_mode"])[:audio_len] | |
def apply_one(self, audio): | |
noise_type = random.choice(self.global_noise_list) | |
if noise_type == "RIR_AUG": | |
return self.reverberate(audio) | |
return self.additive_noise(noise_type, audio) | |
def to_camel(text): | |
text = text.capitalize() | |
return re.sub(r"(?!^)_([a-zA-Z])", lambda m: m.group(1).upper(), text) | |
def setup_encoder_model(config: "Coqpit"): | |
if config.model_params["model_name"].lower() == "lstm": | |
model = LSTMSpeakerEncoder( | |
config.model_params["input_dim"], | |
config.model_params["proj_dim"], | |
config.model_params["lstm_dim"], | |
config.model_params["num_lstm_layers"], | |
use_torch_spec=config.model_params.get("use_torch_spec", False), | |
audio_config=config.audio, | |
) | |
elif config.model_params["model_name"].lower() == "resnet": | |
model = ResNetSpeakerEncoder( | |
input_dim=config.model_params["input_dim"], | |
proj_dim=config.model_params["proj_dim"], | |
log_input=config.model_params.get("log_input", False), | |
use_torch_spec=config.model_params.get("use_torch_spec", False), | |
audio_config=config.audio, | |
) | |
return model | |
def save_checkpoint(model, optimizer, criterion, model_loss, out_path, current_step, epoch): | |
checkpoint_path = "checkpoint_{}.pth".format(current_step) | |
checkpoint_path = os.path.join(out_path, checkpoint_path) | |
print(" | | > Checkpoint saving : {}".format(checkpoint_path)) | |
new_state_dict = model.state_dict() | |
state = { | |
"model": new_state_dict, | |
"optimizer": optimizer.state_dict() if optimizer is not None else None, | |
"criterion": criterion.state_dict(), | |
"step": current_step, | |
"epoch": epoch, | |
"loss": model_loss, | |
"date": datetime.date.today().strftime("%B %d, %Y"), | |
} | |
save_fsspec(state, checkpoint_path) | |
def save_best_model(model, optimizer, criterion, model_loss, best_loss, out_path, current_step, epoch): | |
if model_loss < best_loss: | |
new_state_dict = model.state_dict() | |
state = { | |
"model": new_state_dict, | |
"optimizer": optimizer.state_dict(), | |
"criterion": criterion.state_dict(), | |
"step": current_step, | |
"epoch": epoch, | |
"loss": model_loss, | |
"date": datetime.date.today().strftime("%B %d, %Y"), | |
} | |
best_loss = model_loss | |
bestmodel_path = "best_model.pth" | |
bestmodel_path = os.path.join(out_path, bestmodel_path) | |
print("\n > BEST MODEL ({0:.5f}) : {1:}".format(model_loss, bestmodel_path)) | |
save_fsspec(state, bestmodel_path) | |
return best_loss | |