import json
import os
import random
import re
import sys
from glob import glob
from pathlib import Path
from typing import Dict, List, Tuple

import numpy as np
import torch
from torch.nn import functional as F
from torch.utils.data.sampler import WeightedRandomSampler

# import cupy as cp
try:
    from numba import jit, prange
except ImportError:  # numba is only needed by `maximum_path_numba`
    prange = range

    def jit(*args, **kwargs):
        """No-op stand-in for ``numba.jit`` used when numba is not installed."""
        if len(args) == 1 and callable(args[0]) and not kwargs:
            return args[0]
        return lambda func: func


CYTHON = False


def maximum_path(value, mask, max_neg_val=None):
    """Monotonic alignment search, NumPy implementation.

    The original author notes it is about 4 times faster than the torch version.

    Args:
        value (torch.Tensor): Scores to align, shape ``[b, t_x, t_y]``.
        mask (torch.Tensor): Validity mask, shape ``[b, t_x, t_y]``.
        max_neg_val (float, optional): Score used for unreachable cells.
            Defaults to ``-np.inf``.

    Returns:
        torch.Tensor: Path tensor of shape ``[b, t_x, t_y]`` holding ones on the
        selected path, on the same device and with the same dtype as ``value``.
    """
    if max_neg_val is None:
        max_neg_val = -np.inf  # Patch for Sphinx complaint
    value = value * mask

    device = value.device
    dtype = value.dtype
    value = value.cpu().detach().numpy()
    # `np.bool` was removed in NumPy 1.24; `np.bool_` exists in every version.
    mask = mask.cpu().detach().numpy().astype(np.bool_)

    b, t_x, t_y = value.shape
    direction = np.zeros(value.shape, dtype=np.int64)
    v = np.zeros((b, t_x), dtype=np.float32)
    x_range = np.arange(t_x, dtype=np.float32).reshape(1, -1)

    # Forward pass: `v` carries the best cumulative score per x index.
    for j in range(t_y):
        # v0 is `v` shifted by one x step (coming from x - 1), v1 stays on x.
        v0 = np.pad(v, [[0, 0], [1, 0]], mode="constant", constant_values=max_neg_val)[:, :-1]
        v1 = v
        max_mask = v1 >= v0
        v_max = np.where(max_mask, v1, v0)
        direction[:, :, j] = max_mask

        # x indices larger than j are not reachable yet.
        index_mask = x_range <= j
        v = np.where(index_mask, v_max + value[:, :, j], max_neg_val)
    direction = np.where(mask, direction, 1)

    # Backtracking: start from the last unmasked x index of every sample.
    path = np.zeros(value.shape, dtype=np.float32)
    index = mask[:, :, 0].sum(1).astype(np.int64) - 1
    index_range = np.arange(b)
    for j in reversed(range(t_y)):
        path[index_range, index, j] = 1
        index = index + direction[index_range, index, j] - 1
    path = path * mask.astype(np.float32)
    path = torch.from_numpy(path).to(device=device, dtype=dtype)
    return path


@jit(parallel=True)
def maximum_path_numba(value, mask, max_neg_val=None):
    """Monotonic alignment search on arrays, decorated with ``numba.jit``.

    Unlike `maximum_path` this variant performs no torch conversion: it calls
    array methods on its inputs directly and returns the path array as is.

    Args:
        value: Scores, shape ``[b, t_x, t_y]``.
        mask: Validity mask, shape ``[b, t_x, t_y]``.
        max_neg_val (float, optional): Score used for unreachable cells.
            Defaults to ``-np.inf``.

    Returns:
        np.ndarray: float32 path array of shape ``[b, t_x, t_y]``.
    """
    if max_neg_val is None:
        max_neg_val = -np.inf  # Patch for Sphinx complaint
    value = value * mask

    b, t_x, t_y = value.shape
    direction = np.zeros(value.shape, dtype=np.int64)
    v = np.zeros((b, t_x), dtype=np.float32)
    x_range = np.arange(t_x, dtype=np.float32).reshape(1, -1)

    # Both loops carry state from one iteration to the next (`v`, `index`), so
    # they must run sequentially: `range`, not `prange`.
    for j in range(t_y):
        v0 = np.pad(v, [[0, 0], [1, 0]], mode="constant", constant_values=max_neg_val)[:, :-1]
        v1 = v
        max_mask = v1 >= v0
        v_max = np.where(max_mask, v1, v0)
        direction[:, :, j] = max_mask

        index_mask = x_range <= j
        v = np.where(index_mask, v_max + value[:, :, j], max_neg_val)
    direction = np.where(mask, direction, 1)

    path = np.zeros(value.shape, dtype=np.float32)
    index = mask[:, :, 0].sum(1).astype(np.int64) - 1
    index_range = np.arange(b)
    for j in range(t_y - 1, -1, -1):
        path[index_range, index, j] = 1
        index = index + direction[index_range, index, j] - 1
    path = path * mask.astype(np.float32)
    return path


# import pytorch_pfn_extras as ppe
# ppe.cuda.use_torch_mempool_in_cupy()
# print("torch.cuda.memory_allocated()", torch.cuda.memory_allocated())


def maximum_path_cupy(value, mask, max_neg_val=None):
    """Monotonic alignment search using the ``cp`` (CuPy) array namespace.

    NOTE(review): the module-level ``import cupy as cp`` is commented out, so
    ``cp`` must be brought into scope before this function can be called;
    otherwise it raises ``NameError``.

    Args:
        value (torch.Tensor): Scores, shape ``[b, t_x, t_y]``.
        mask (torch.Tensor): Validity mask, shape ``[b, t_x, t_y]``.
        max_neg_val (float, optional): Score used for unreachable cells.
            Defaults to ``-cp.inf``.

    Returns:
        torch.Tensor: Path tensor of shape ``[b, t_x, t_y]`` on ``value``'s device.
    """
    if max_neg_val is None:
        max_neg_val = -cp.inf  # Patch for Sphinx complaint
    value = value * mask

    device = value.device
    value = cp.array(value)
    mask = cp.array(mask).astype(cp.bool_)

    b, t_x, t_y = value.shape
    direction = cp.zeros(value.shape, dtype=cp.int64)
    v = cp.zeros((b, t_x), dtype=cp.float32)
    x_range = cp.arange(t_x, dtype=cp.float32).reshape(1, -1)
    for j in range(t_y):
        v0 = cp.pad(v, [[0, 0], [1, 0]], mode="constant", constant_values=max_neg_val)[:, :-1]
        v1 = v
        max_mask = v1 >= v0
        v_max = cp.where(max_mask, v1, v0)
        direction[:, :, j] = max_mask

        index_mask = x_range <= j
        v = cp.where(index_mask, v_max + value[:, :, j], max_neg_val)
    direction = cp.where(mask, direction, 1)

    path = cp.zeros(value.shape, dtype=cp.float32)
    index = mask[:, :, 0].sum(1).astype(cp.int64) - 1
    index_range = cp.arange(b)
    for j in reversed(range(t_y)):
        path[index_range, index, j] = 1
        index = index + direction[index_range, index, j] - 1
    path = path * mask.astype(cp.float32)
    path = torch.as_tensor(path, device=device)
    return path


def rand_segments(x: torch.tensor, x_lengths: torch.tensor = None, segment_size=4):
    """Create random segments based on the input lengths.

    Args:
        x (torch.tensor): Input tensor.
        x_lengths (torch.tensor): Input lengths. When ``None`` every sample is
            treated as having the full length ``T``.
        segment_size (int): Expected output segment size.

    Returns:
        Tuple[torch.tensor, torch.tensor]: The segments ``[B, C, segment_size]``
        and the start index chosen for each sample ``[B]``.

    Shapes:
        - x: :math:`[B, C, T]`
        - x_lengths: :math:`[B]`
    """
    B, _, T = x.size()
    if x_lengths is None:
        # A plain int here would break the `all(...)` check below.
        x_lengths = torch.full((B,), T, dtype=torch.long, device=x.device)
    max_idxs = x_lengths - segment_size + 1
    assert all(max_idxs > 0), " [!] At least one sample is shorter than the segment size."
    segment_indices = (torch.rand([B]).type_as(x) * max_idxs).long()
    ret = segment(x, segment_indices, segment_size)
    return ret, segment_indices


def segment(x: torch.tensor, segment_indices: torch.tensor, segment_size=4):
    """Segment each sample in a batch based on the provided segment indices.

    Args:
        x (torch.tensor): Input tensor, indexed as ``x[i, :, start:end]``.
        segment_indices (torch.tensor): Start index for each sample.
        segment_size (int): Expected output segment size.

    Returns:
        torch.tensor: Tensor shaped like ``x[:, :, :segment_size]`` holding one
        slice per sample.
    """
    segments = torch.zeros_like(x[:, :, :segment_size])
    for i in range(x.size(0)):
        index_start = segment_indices[i]
        index_end = index_start + segment_size
        segments[i] = x[i, :, index_start:index_end]
    return segments


# from https://gist.github.com/jihunchoi/f1434a77df9db1bb337417854b398df1
def sequence_mask(sequence_length, max_len=None):
    """Create a sequence mask for filtering padding in a sequence tensor.

    Args:
        sequence_length (torch.tensor): Sequence lengths.
        max_len (int, Optional): Maximum sequence length. Defaults to the
            largest value in ``sequence_length``.

    Returns:
        torch.tensor: Boolean mask that is True where the position index is
        smaller than the sequence length.

    Shapes:
        - mask: :math:`[B, T_max]`
    """
    if max_len is None:
        max_len = sequence_length.data.max()
    seq_range = torch.arange(max_len, dtype=sequence_length.dtype, device=sequence_length.device)
    # B x T_max
    mask = seq_range.unsqueeze(0) < sequence_length.unsqueeze(1)
    return mask


DEFAULT_MIN_BIN_WIDTH = 1e-3
DEFAULT_MIN_BIN_HEIGHT = 1e-3
DEFAULT_MIN_DERIVATIVE = 1e-3


def piecewise_rational_quadratic_transform(
    inputs,
    unnormalized_widths,
    unnormalized_heights,
    unnormalized_derivatives,
    inverse=False,
    tails=None,
    tail_bound=1.0,
    min_bin_width=DEFAULT_MIN_BIN_WIDTH,
    min_bin_height=DEFAULT_MIN_BIN_HEIGHT,
    min_derivative=DEFAULT_MIN_DERIVATIVE,
):
    """Apply a rational-quadratic spline transform.

    Dispatches to `rational_quadratic_spline` when ``tails`` is ``None`` and to
    `unconstrained_rational_quadratic_spline` (with ``tails`` and
    ``tail_bound``) otherwise.

    Returns:
        Tuple: ``(outputs, logabsdet)`` as returned by the selected spline.
    """
    if tails is None:
        spline_fn = rational_quadratic_spline
        spline_kwargs = {}
    else:
        spline_fn = unconstrained_rational_quadratic_spline
        spline_kwargs = {"tails": tails, "tail_bound": tail_bound}

    outputs, logabsdet = spline_fn(
        inputs=inputs,
        unnormalized_widths=unnormalized_widths,
        unnormalized_heights=unnormalized_heights,
        unnormalized_derivatives=unnormalized_derivatives,
        inverse=inverse,
        min_bin_width=min_bin_width,
        min_bin_height=min_bin_height,
        min_derivative=min_derivative,
        **spline_kwargs,
    )
    return outputs, logabsdet


def searchsorted(bin_locations, inputs, eps=1e-6):
    """Return, for every input, the index of the bin it falls into.

    The last bin edge is widened by ``eps`` so an input equal to it still maps
    to the last bin. The widening is done on a copy; ``bin_locations`` itself
    is left untouched.
    """
    bin_locations = bin_locations.clone()
    bin_locations[..., -1] += eps
    return torch.sum(inputs[..., None] >= bin_locations, dim=-1) - 1


def unconstrained_rational_quadratic_spline(
    inputs,
    unnormalized_widths,
    unnormalized_heights,
    unnormalized_derivatives,
    inverse=False,
    tails="linear",
    tail_bound=1.0,
    min_bin_width=DEFAULT_MIN_BIN_WIDTH,
    min_bin_height=DEFAULT_MIN_BIN_HEIGHT,
    min_derivative=DEFAULT_MIN_DERIVATIVE,
):
    """Rational-quadratic spline on ``[-tail_bound, tail_bound]`` with linear tails.

    Inputs outside the interval are passed through unchanged with a zero
    log-determinant; inputs inside are transformed by
    `rational_quadratic_spline`.

    Raises:
        RuntimeError: If ``tails`` is anything other than ``"linear"``.

    Returns:
        Tuple[torch.Tensor, torch.Tensor]: ``(outputs, logabsdet)``, both shaped
        like ``inputs``.
    """
    inside_interval_mask = (inputs >= -tail_bound) & (inputs <= tail_bound)
    outside_interval_mask = ~inside_interval_mask

    outputs = torch.zeros_like(inputs)
    logabsdet = torch.zeros_like(inputs)

    if tails == "linear":
        # Add one boundary derivative on each side and pin it to `constant`.
        unnormalized_derivatives = F.pad(unnormalized_derivatives, pad=(1, 1))
        constant = np.log(np.exp(1 - min_derivative) - 1)
        unnormalized_derivatives[..., 0] = constant
        unnormalized_derivatives[..., -1] = constant

        outputs[outside_interval_mask] = inputs[outside_interval_mask]
        logabsdet[outside_interval_mask] = 0
    else:
        raise RuntimeError("{} tails are not implemented.".format(tails))

    # With no input inside the interval there is nothing left to transform.
    if inside_interval_mask.any():
        outputs[inside_interval_mask], logabsdet[inside_interval_mask] = rational_quadratic_spline(
            inputs=inputs[inside_interval_mask],
            unnormalized_widths=unnormalized_widths[inside_interval_mask, :],
            unnormalized_heights=unnormalized_heights[inside_interval_mask, :],
            unnormalized_derivatives=unnormalized_derivatives[inside_interval_mask, :],
            inverse=inverse,
            left=-tail_bound,
            right=tail_bound,
            bottom=-tail_bound,
            top=tail_bound,
            min_bin_width=min_bin_width,
            min_bin_height=min_bin_height,
            min_derivative=min_derivative,
        )

    return outputs, logabsdet


def _spline_knots(unnormalized_sizes, min_size, num_bins, lower, upper):
    """Turn unnormalized bin sizes into knot positions and bin sizes on [lower, upper]."""
    sizes = F.softmax(unnormalized_sizes, dim=-1)
    sizes = min_size + (1 - min_size * num_bins) * sizes
    knots = torch.cumsum(sizes, dim=-1)
    knots = F.pad(knots, pad=(1, 0), mode="constant", value=0.0)
    knots = (upper - lower) * knots + lower
    # Pin both ends exactly to the interval bounds.
    knots[..., 0] = lower
    knots[..., -1] = upper
    sizes = knots[..., 1:] - knots[..., :-1]
    return knots, sizes


def rational_quadratic_spline(
    inputs,
    unnormalized_widths,
    unnormalized_heights,
    unnormalized_derivatives,
    inverse=False,
    left=0.0,
    right=1.0,
    bottom=0.0,
    top=1.0,
    min_bin_width=DEFAULT_MIN_BIN_WIDTH,
    min_bin_height=DEFAULT_MIN_BIN_HEIGHT,
    min_derivative=DEFAULT_MIN_DERIVATIVE,
):
    """Rational-quadratic spline mapping ``[left, right]`` onto ``[bottom, top]``.

    Args:
        inputs (torch.Tensor): Values to transform; must lie in ``[left, right]``.
        unnormalized_widths (torch.Tensor): Bin widths before softmax; the last
            dimension is the number of bins.
        unnormalized_heights (torch.Tensor): Bin heights before softmax.
        unnormalized_derivatives (torch.Tensor): Knot derivatives before
            softplus; indexed with one more entry than there are bins.
        inverse (bool): Apply the inverse transform.

    Raises:
        ValueError: If an input is outside ``[left, right]`` or the minimal bin
            width/height is too large for the number of bins.

    Returns:
        Tuple[torch.Tensor, torch.Tensor]: ``(outputs, logabsdet)``; for the
        inverse transform the log-determinant is negated.
    """
    # `torch.min`/`torch.max` raise on empty tensors, so skip the check then.
    if inputs.numel() > 0 and (torch.min(inputs) < left or torch.max(inputs) > right):
        raise ValueError("Input to a transform is not within its domain")

    num_bins = unnormalized_widths.shape[-1]

    if min_bin_width * num_bins > 1.0:
        raise ValueError("Minimal bin width too large for the number of bins")
    if min_bin_height * num_bins > 1.0:
        raise ValueError("Minimal bin height too large for the number of bins")

    cumwidths, widths = _spline_knots(unnormalized_widths, min_bin_width, num_bins, left, right)
    derivatives = min_derivative + F.softplus(unnormalized_derivatives)
    cumheights, heights = _spline_knots(unnormalized_heights, min_bin_height, num_bins, bottom, top)

    # The inverse transform looks the bin up on the output (height) axis.
    if inverse:
        bin_idx = searchsorted(cumheights, inputs)[..., None]
    else:
        bin_idx = searchsorted(cumwidths, inputs)[..., None]

    input_cumwidths = cumwidths.gather(-1, bin_idx)[..., 0]
    input_bin_widths = widths.gather(-1, bin_idx)[..., 0]

    input_cumheights = cumheights.gather(-1, bin_idx)[..., 0]
    delta = heights / widths
    input_delta = delta.gather(-1, bin_idx)[..., 0]

    input_derivatives = derivatives.gather(-1, bin_idx)[..., 0]
    input_derivatives_plus_one = derivatives[..., 1:].gather(-1, bin_idx)[..., 0]

    input_heights = heights.gather(-1, bin_idx)[..., 0]

    # Term shared by the quadratic coefficients and the denominator.
    derivative_term = input_derivatives + input_derivatives_plus_one - 2 * input_delta

    if inverse:
        offset = inputs - input_cumheights
        a = offset * derivative_term + input_heights * (input_delta - input_derivatives)
        b = input_heights * input_derivatives - offset * derivative_term
        c = -input_delta * offset

        discriminant = b.pow(2) - 4 * a * c
        assert (discriminant >= 0).all()

        root = (2 * c) / (-b - torch.sqrt(discriminant))
        outputs = root * input_bin_widths + input_cumwidths

        theta_one_minus_theta = root * (1 - root)
        denominator = input_delta + derivative_term * theta_one_minus_theta
        derivative_numerator = input_delta.pow(2) * (
            input_derivatives_plus_one * root.pow(2)
            + 2 * input_delta * theta_one_minus_theta
            + input_derivatives * (1 - root).pow(2)
        )
        logabsdet = torch.log(derivative_numerator) - 2 * torch.log(denominator)

        return outputs, -logabsdet

    theta = (inputs - input_cumwidths) / input_bin_widths
    theta_one_minus_theta = theta * (1 - theta)

    numerator = input_heights * (input_delta * theta.pow(2) + input_derivatives * theta_one_minus_theta)
    denominator = input_delta + derivative_term * theta_one_minus_theta
    outputs = input_cumheights + numerator / denominator

    derivative_numerator = input_delta.pow(2) * (
        input_derivatives_plus_one * theta.pow(2)
        + 2 * input_delta * theta_one_minus_theta
        + input_derivatives * (1 - theta).pow(2)
    )
    logabsdet = torch.log(derivative_numerator) - 2 * torch.log(denominator)

    return outputs, logabsdet


def get_language_weighted_sampler(items: list):
    """Build a sampler that draws every language with equal total probability.

    Args:
        items (list): Samples; the language name is read from ``item[3]``.

    Returns:
        WeightedRandomSampler: Sampler over ``len(items)`` draws in which each
        sample is weighted by ``1 / (number of samples in its language)``.
    """
    language_names = np.array([item[3] for item in items])
    # One pass yields, per sample, the id of its language and, per language,
    # its sample count.
    _, language_ids, language_count = np.unique(language_names, return_inverse=True, return_counts=True)
    weight_language = 1.0 / language_count
    dataset_samples_weight = torch.from_numpy(weight_language[np.asarray(language_ids).reshape(-1)]).double()
    return WeightedRandomSampler(dataset_samples_weight, len(dataset_samples_weight))


# def vctk(root_path, meta_files=None, wavs_path="wav48", ignored_speakers=None):
#     items = []
#     with open(f'{root_path}/metadata.csv') as f:
#         lines = f.read().split("\n")
#         for line in lines:
#             fname = line.split("|")[0]
#             text = line.split("|")[1]
#             speaker_id = fname.split("_")[0]
#             # if isinstance(ignored_speakers, list):
#             #     if speaker_id in ignored_speakers:
#             #         continue
#             # wav_file = os.path.join(root_path, "wavs", speaker_id, fname)
#             wav_file = os.path.join(root_path, "wavs", fname)
#             items.append([text, wav_file, "VCTK_" + speaker_id])
#             # items.append([text, wav_file, "VCTK_" + speaker_id, "en"])
#             # items.append([text, wav_file, "VCTK_" + speaker_id])
#     return items

# def xvaspeech(root_path, meta_files=None):
#     num_speakers = 0
#     lang = root_path.split("/")[-1]
#     root_path = "/".join(root_path.split("/")[:-1])
#     csv_files = glob(root_path + f'/{lang}_**/metadata.csv', recursive=True)
#     # print(f'csv_files, {csv_files}')
#     items = []
#     for csv_file in csv_files:
#         # ======== DEBUG
#         # if "it_f4_danse" not in csv_file and "it_f4_nate" not in csv_file and "it_sk_malenordcommander" not in csv_file and "it_sk_femalenord" not in csv_file and "it_sk_femalecommander" not in csv_file:
#         # if "it_f4_danse" not in csv_file and "it_f4_nate" not in csv_file and "it_sk_malenordcommander":
#         # if "it_f4_nate" not in csv_file and "it_sk_malenordcommander":
#         # if "de_f4_nate" not in csv_file:
#         #     pass
#         # else:
#         #     continue
#         # if "it_" in csv_file and "it_f4_nate" not in csv_file or "en_" in csv_file:
#         #     continue
#         # ========
#         csv_file = csv_file.replace("\\", "/")
#         if os.path.isfile(csv_file):
#             txt_file = csv_file
#         else:
#             txt_file = os.path.join(root_path, csv_file)
#         folder = os.path.dirname(txt_file)
#         # speaker_name_match = (txt_file.split("/female/")[1] if "/female/" in txt_file else txt_file.split("/male/")[1]).split("/")[0]
#         # if speaker_name_match is None:
#         #     continue
#         # speaker_name = speaker_name_match.group("speaker_name")
#         speaker_name = root_path.split("/")[-1]
#         # ignore speakers
#         # if isinstance(ignored_speakers, list):
#         #     if speaker_name in ignored_speakers:
#         #         continue
#         print(" | > {}".format(csv_file))
#         has_registered_at_least_one = False
#         with open(txt_file, "r", encoding="utf-8") as ttf:
#             for line in ttf:
#                 cols = line.split("|")
#                 wav_file = os.path.join(folder, "wavs", (cols[0] + ".wav") if ".wav" not in cols[0] else cols[0])
#                 # if not meta_files:
#                 #     # wav_file = os.path.join(folder, "wavs", cols[0] + ".wav")
#                 #     wav_file = os.path.join(folder, "wavs", (cols[0] + ".wav") if ".wav" not in cols[0] else cols[0])
#                 # else:
#                 #     # wav_file = os.path.join(root_path, folder.replace("metadata.csv", ""), "wavs", cols[0] + ".wav")
#                 #     wav_file = os.path.join(root_path, folder.replace("metadata.csv", ""), "wavs", (cols[0] + ".wav") if ".wav" not in cols[0] else cols[0])
#                 # if os.path.isfile(wav_file):
#                 if os.path.exists(wav_file):
#                     text = cols[1].strip()
#                     items.append([text, wav_file, speaker_name])
#                     has_registered_at_least_one = True
#                 else:
#                     # M-AI-Labs have some missing samples, so just print the warning
#                     # print("> File %s does not exist!" % (wav_file))
#                     pass
#         if has_registered_at_least_one:
#             num_speakers += 1
#     # print(f'mailabs formatter items, {len(items)}')
#     return items, num_speakers

# def mailabs(root_path, meta_files=None, ignored_speakers=None):
#     # print("=====================", "mailabs")
#     """Normalizes M-AI-Labs meta data files to TTS format
#     Args:
#         root_path (str): root folder of the MAILAB language folder.
#         meta_files (str): list of meta files to be used in the training. If None, finds all the csv files
#             recursively. Defaults to None
#     """
#     speaker_regex = re.compile("by_book/(male|female)/(?P<speaker_name>[^/]+)/")
#     if not meta_files:
#         csv_files = glob(root_path + "/**/metadata.csv", recursive=True)
#     else:
#         csv_files = meta_files
#     # meta_files = [f.strip() for f in meta_files.split(",")]
#     items = []
#     for csv_file in csv_files:
#         csv_file = csv_file.replace("\\", "/")
#         if "/mix/" in csv_file:
#             continue
#         if os.path.isfile(csv_file):
#             txt_file = csv_file
#         else:
#             txt_file = os.path.join(root_path, csv_file)
#         folder = os.path.dirname(txt_file)
#         # print(f'txt_file, {txt_file}')
#         # print(f'folder, {folder}')
#         # print(f'speaker_regex, {speaker_regex}')
#         # determine speaker based on folder structure...
#         # speaker_name_match = speaker_regex.search(txt_file)
#         # print(f'speaker_name_match, {speaker_name_match}')
#         speaker_name_match = (txt_file.split("/female/")[1] if "/female/" in txt_file else txt_file.split("/male/")[1]).split("/")[0]
#         if speaker_name_match is None:
#             continue
#         # speaker_name = speaker_name_match.group("speaker_name")
#         speaker_name = speaker_name_match
#         # ignore speakers
#         if isinstance(ignored_speakers, list):
#             if speaker_name in ignored_speakers:
#                 continue
#         print(" | > {}".format(csv_file))
#         with open(txt_file, "r", encoding="utf-8") as ttf:
#             for line in ttf:
#                 cols = line.split("|")
#                 if not meta_files:
#                     # wav_file = os.path.join(folder, "wavs", cols[0] + ".wav")
#                     wav_file = os.path.join(folder, "wavs", (cols[0] + ".wav") if ".wav" not in cols[0] else cols[0])
#                 else:
#                     # wav_file = os.path.join(root_path, folder.replace("metadata.csv", ""), "wavs", cols[0] + ".wav")
#                     wav_file = os.path.join(root_path, folder.replace("metadata.csv", ""), "wavs", (cols[0] + ".wav") if ".wav" not in cols[0] else cols[0])
#                 if os.path.isfile(wav_file):
#                     text = cols[1].strip()
#                     items.append([text, wav_file, speaker_name])
#                 else:
#                     # M-AI-Labs have some missing samples, so just print the warning
#                     # print("> File %s does not exist!"
#                     #     % (wav_file))
#                     pass
#     # print(f'mailabs formatter items, {len(items)}')
#     return items

from collections import Counter
from math import exp

from scipy.signal import get_window
from torch.autograd import Variable

try:
    import librosa.util as librosa_util
except ImportError:  # librosa is only needed by `window_sumsquare`
    librosa_util = None


def split_dataset(items):
    """Split a dataset into train and eval. Consider speaker distribution in multi-speaker training.

    The eval split holds 1% of the samples, capped at 500. ``items`` is
    shuffled in place with a fixed seed, and in the multi-speaker case the
    eval samples are also removed from it.

    Args:
        items (List[List]): A list of samples. Each sample is a list of
            `[audio_path, text, speaker_id]`; the speaker is read from the
            last element.

    Returns:
        Tuple[List, List]: ``(items_eval, items_train)``.
    """
    speakers = [item[-1] for item in items]
    is_multi_speaker = len(set(speakers)) > 1
    eval_split_size = min(500, int(len(items) * 0.01))
    # assert eval_split_size > 0, " [!] You do not have enough samples to train. You need at least 100 samples."
    np.random.seed(0)
    np.random.shuffle(items)
    if is_multi_speaker:
        items_eval = []
        speaker_counter = Counter(speakers)
        # A sample is only moved while its speaker keeps at least one training
        # sample, so at most sum(count - 1) samples can ever be moved. Capping
        # the target keeps the loop below from spinning forever.
        removable = sum(count - 1 for count in speaker_counter.values())
        eval_split_size = min(eval_split_size, removable)
        while len(items_eval) < eval_split_size:
            item_idx = np.random.randint(0, len(items))
            speaker_to_be_removed = items[item_idx][-1]
            if speaker_counter[speaker_to_be_removed] > 1:
                items_eval.append(items[item_idx])
                speaker_counter[speaker_to_be_removed] -= 1
                del items[item_idx]
        return items_eval, items
    return items[:eval_split_size], items[eval_split_size:]


def gaussian(window_size, sigma):
    """Return a 1D Gaussian of length ``window_size``, normalized to sum to 1."""
    gauss = torch.Tensor([exp(-((x - window_size // 2) ** 2) / float(2 * sigma ** 2)) for x in range(window_size)])
    return gauss / gauss.sum()


def create_window(window_size, channel):
    """Build a ``[channel, 1, window_size, window_size]`` Gaussian window (sigma 1.5)."""
    _1D_window = gaussian(window_size, 1.5).unsqueeze(1)
    _2D_window = _1D_window.mm(_1D_window.t()).float().unsqueeze(0).unsqueeze(0)
    # `Variable` is a deprecated no-op wrapper around tensors; return the tensor itself.
    window = _2D_window.expand(channel, 1, window_size, window_size).contiguous()
    return window


def _ssim(img1, img2, window, window_size, channel, size_average=True):
    """Compute SSIM between two image batches with a precomputed window.

    Returns the mean over all elements when ``size_average`` is True,
    otherwise one value per batch item.
    """
    pad = window_size // 2
    mu1 = F.conv2d(img1, window, padding=pad, groups=channel)
    mu2 = F.conv2d(img2, window, padding=pad, groups=channel)

    # TODO: check if you need AMP disabled
    # with torch.cuda.amp.autocast(enabled=False):
    mu1_sq = mu1.float().pow(2)
    mu2_sq = mu2.float().pow(2)
    mu1_mu2 = mu1 * mu2

    sigma1_sq = F.conv2d(img1 * img1, window, padding=pad, groups=channel) - mu1_sq
    sigma2_sq = F.conv2d(img2 * img2, window, padding=pad, groups=channel) - mu2_sq
    sigma12 = F.conv2d(img1 * img2, window, padding=pad, groups=channel) - mu1_mu2

    C1 = 0.01 ** 2
    C2 = 0.03 ** 2

    ssim_map = ((2 * mu1_mu2 + C1) * (2 * sigma12 + C2)) / ((mu1_sq + mu2_sq + C1) * (sigma1_sq + sigma2_sq + C2))

    if size_average:
        return ssim_map.mean()
    return ssim_map.mean(1).mean(1).mean(1)


def ssim(img1, img2, window_size=11, size_average=True):
    """Compute SSIM between ``img1`` and ``img2`` (4D tensors, channel in dim 1)."""
    (_, channel, _, _) = img1.size()
    window = create_window(window_size, channel).type_as(img1)
    return _ssim(img1, img2, window, window_size, channel, size_average)


def make_symbols(
    characters,
    phonemes=None,
    punctuations="!'(),-.:;? ",
    pad="_",
    eos="~",
    bos="^",
    unique=True,
):  # pylint: disable=redefined-outer-name
    """Function to create symbols and phonemes

    Returns:
        Tuple[List[str], Optional[List[str]]]: ``(_symbols, _phonemes)``;
        ``_phonemes`` is ``None`` when ``phonemes`` is ``None``.

    TODO: create phonemes_to_id and symbols_to_id dicts here."""
    _symbols = list(characters)
    # NOTE(review): the special symbols are only prepended when `bos` is
    # non-empty; `eos` and `pad` are gated on `bos` too. That looks like a
    # copy-paste slip, but changing it would shift the symbol ids that existing
    # checkpoints rely on, so it is kept. `bos=None` used to raise TypeError
    # and now simply disables the prefix.
    has_bos = bos is not None and len(bos) > 0
    _symbols = [bos] + _symbols if has_bos else _symbols
    _symbols = [eos] + _symbols if has_bos and eos is not None else _symbols
    _symbols = [pad] + _symbols if has_bos and pad is not None else _symbols
    _phonemes = None
    if phonemes is not None:
        _phonemes_sorted = (
            sorted(list(set(phonemes))) if unique else sorted(list(phonemes))
        )  # this is to keep previous models compatible.
        # Prepend "@" to ARPAbet symbols to ensure uniqueness (some are the same as uppercase letters):
        # _arpabet = ["@" + s for s in _phonemes_sorted]
        # Export all symbols:
        _phonemes = [pad, eos, bos] + list(_phonemes_sorted) + list(punctuations)
        # _symbols += _arpabet
    return _symbols, _phonemes


# Regular expression matching text enclosed in curly braces:
_CURLY_RE = re.compile(r"(.*?)\{(.+?)\}(.*)")
_whitespace_re = re.compile(r"\s+")


def _should_keep_symbol(s):
    """Return True for known symbols other than the eos/bos/pad markers."""
    return s in _symbol_to_id and s not in ["~", "^", "_"]


def lowercase(text):
    """Lowercase ``text``."""
    return text.lower()


def replace_symbols(text, lang="en"):
    """Replace ``;``/``:`` with commas, ``-`` with a space and ``&`` with the word for ``lang``."""
    text = text.replace(";", ",")
    text = text.replace("-", " ")
    text = text.replace(":", ",")
    if lang == "en":
        text = text.replace("&", " and ")
    elif lang == "fr":
        text = text.replace("&", " et ")
    elif lang == "pt":
        text = text.replace("&", " e ")
    return text


def remove_aux_symbols(text):
    """Strip angle brackets, parentheses, square brackets and double quotes."""
    text = re.sub(r"[\<\>\(\)\[\]\"]+", "", text)
    return text


def collapse_whitespace(text):
    """Collapse whitespace runs into single spaces and trim both ends."""
    return re.sub(_whitespace_re, " ", text).strip()


def multilingual_cleaners(text):
    """Pipeline for multilingual text"""
    text = lowercase(text)
    text = replace_symbols(text, lang=None)
    text = remove_aux_symbols(text)
    text = collapse_whitespace(text)
    return text


def _clean_text(text, cleaner_names):
    """Run `multilingual_cleaners` on ``text`` once per entry in ``cleaner_names``.

    The names themselves are currently ignored: every entry maps to
    `multilingual_cleaners`.
    """
    for _ in cleaner_names:
        # cleaner = getattr(cleaners, name)
        text = multilingual_cleaners(text)
    return text


def _symbols_to_sequence(syms):
    """Map symbols to ids, dropping those `_should_keep_symbol` rejects."""
    return [_symbol_to_id[s] for s in syms if _should_keep_symbol(s)]


def _arpabet_to_sequence(text):
    """Map whitespace-separated ARPAbet tokens (looked up with an ``@`` prefix) to ids."""
    return _symbols_to_sequence(["@" + s for s in text.split()])


def intersperse(sequence, token):
    """Return ``sequence`` with ``token`` inserted before, between and after its items."""
    result = [token] * (len(sequence) * 2 + 1)
    result[1::2] = sequence
    return result


def text_to_sequence(
    text: str, cleaner_names: List[str], custom_symbols: List[str] = None, tp: Dict = None, add_blank: bool = False
) -> List[int]:
    """Converts a string of text to a sequence of IDs corresponding to the symbols in the text.
    If `custom_symbols` is provided, it will override the default symbols.

    NOTE(review): when neither `custom_symbols` nor `tp` is given, the module
    global `_symbols` is used as is; no default for it is visible in this part
    of the file, so it must have been set before the call.

    Args:
        text (str): string to convert to a sequence
        cleaner_names (List[str]): names of the cleaner functions to run the text through
        custom_symbols (List[str]): symbol list used to build the id table.
        tp (Dict): dictionary of character parameters to use a custom character set.
        add_blank (bool): option to add a blank token between each token.

    Returns:
        List[int]: List of integers corresponding to the symbols in the text
    """
    # pylint: disable=global-statement
    global _symbol_to_id, _symbols

    if custom_symbols is not None:
        _symbols = custom_symbols
    elif tp:
        _symbols, _ = make_symbols(**tp)
    _symbol_to_id = {s: i for i, s in enumerate(_symbols)}

    sequence = []

    # Check for curly braces and treat their contents as ARPAbet:
    while text:
        m = _CURLY_RE.match(text)
        if not m:
            sequence += _symbols_to_sequence(_clean_text(text, cleaner_names))
            break
        sequence += _symbols_to_sequence(_clean_text(m.group(1), cleaner_names))
        sequence += _arpabet_to_sequence(m.group(2))
        text = m.group(3)

    if add_blank:
        sequence = intersperse(sequence, len(_symbols))  # add a blank token (new), whose id number is len(_symbols)
    return sequence


def window_sumsquare(window, n_frames, hop_length=200, win_length=800, n_fft=800, dtype=np.float32, norm=None):
    """
    # from librosa 0.6
    Compute the sum-square envelope of a window function at a given hop length.
    This is used to estimate modulation effects induced by windowing
    observations in short-time fourier transforms.

    Parameters
    ----------
    window : string, tuple, number, callable, or list-like
        Window specification, as in `get_window`
    n_frames : int > 0
        The number of analysis frames
    hop_length : int > 0
        The number of samples to advance between frames
    win_length : [optional]
        The length of the window function. When `None`, this matches `n_fft`.
    n_fft : int > 0
        The length of each analysis frame.
    dtype : np.dtype
        The data type of the output
    norm : [optional]
        Passed to `librosa.util.normalize`.

    Returns
    -------
    wss : np.ndarray, shape=`(n_fft + hop_length * (n_frames - 1))`
        The sum-squared envelope of the window function

    Raises
    ------
    ImportError
        If librosa is not installed.
    """
    if librosa_util is None:
        raise ImportError("window_sumsquare requires librosa to be installed.")
    if win_length is None:
        win_length = n_fft

    n = n_fft + hop_length * (n_frames - 1)
    x = np.zeros(n, dtype=dtype)

    # Compute the squared window at the desired length
    win_sq = get_window(window, win_length, fftbins=True)
    win_sq = librosa_util.normalize(win_sq, norm=norm) ** 2
    # `size` is keyword-only in recent librosa releases.
    win_sq = librosa_util.pad_center(win_sq, size=n_fft)

    # Fill the envelope
    for i in range(n_frames):
        sample = i * hop_length
        x[sample : min(n, sample + n_fft)] += win_sq[: max(0, min(n_fft, n - sample))]
    return x


def _pad_data(x, length):
    """Right-pad a 1D array with zeros up to ``length``."""
    _pad = 0
    assert x.ndim == 1
    return np.pad(x, (0, length - x.shape[0]), mode="constant", constant_values=_pad)


def _pad_stop_target(x: np.ndarray, length: int, pad_val=1) -> np.ndarray:
    """Pad stop target array.

    Args:
        x (np.ndarray): Stop target array.
        length (int): Length after padding.
        pad_val (int, optional): Padding value. Defaults to 1.

    Returns:
        np.ndarray: Padded stop target array.
    """
    assert x.ndim == 1
    return np.pad(x, (0, length - x.shape[0]), mode="constant", constant_values=pad_val)


def _pad_tensor(x, length):
    """Right-pad the second axis of a 2D array with zeros up to ``length``."""
    _pad = 0.0
    assert x.ndim == 2
    x = np.pad(x, [[0, 0], [0, length - x.shape[1]]], mode="constant", constant_values=_pad)
    return x


def prepare_tensor(inputs, out_steps):
    """Stack 2D arrays after zero-padding axis 1 to a common multiple of ``out_steps``."""
    max_len = max((x.shape[1] for x in inputs))
    remainder = max_len % out_steps
    pad_len = max_len + (out_steps - remainder) if remainder > 0 else max_len
    return np.stack([_pad_tensor(x, pad_len) for x in inputs])


def prepare_data(inputs):
    """Stack 1D arrays after zero-padding them to the longest length."""
    max_len = max((len(x) for x in inputs))
    return np.stack([_pad_data(x, max_len) for x in inputs])


def prepare_stop_target(inputs, out_steps):
    """Pad row vectors with 1."""
    max_len = max((x.shape[0] for x in inputs))
    remainder = max_len % out_steps
    pad_len = max_len + (out_steps - remainder) if remainder > 0 else max_len
    return np.stack([_pad_stop_target(x, pad_len) for x in inputs])


def convert_pad_shape(pad_shape):
    """Reverse a list of per-dimension pad pairs and flatten it into one list."""
    reversed_pairs = pad_shape[::-1]
    pad_shape = [item for sublist in reversed_pairs for item in sublist]
    return pad_shape


def generate_path(duration, mask):
    """Expand durations into a path over the output axis.

    Shapes:
        - duration: :math:`[b, t_x]`
        - mask: :math:`[b, t_x, t_y]`

    Returns:
        torch.Tensor: ``[b, t_x, t_y]`` tensor with ``mask``'s dtype.
    """
    b, t_x, t_y = mask.shape
    cum_duration = torch.cumsum(duration, 1)

    cum_duration_flat = cum_duration.view(b * t_x)
    path = sequence_mask(cum_duration_flat, t_y).to(mask.dtype)
    path = path.view(b, t_x, t_y)
    # Subtracting the mask of the previous x row leaves only that row's own span.
    path = path - F.pad(path, convert_pad_shape([[0, 0], [1, 0], [0, 0]]))[:, :-1]
    path = path * mask
    return path


def format_time(seconds):
    """Format a duration in seconds as e.g. ``"1d 2h 3m 4s "``.

    Units that are zero are omitted; every emitted unit is followed by a
    space. Returns an empty string for non-positive input.
    """
    time_str = ""
    # `>=` so that exact multiples roll over (3600 -> "1h ", not "60m ").
    if seconds >= 60 * 60 * 24:
        days = int(seconds / (60 * 60 * 24))
        time_str += f'{days}d '
        seconds -= days * (60 * 60 * 24)
    if seconds >= 60 * 60:
        hours = int(seconds / (60 * 60))
        time_str += f'{hours}h '
        seconds -= hours * (60 * 60)
    if seconds >= 60:
        minutes = int(seconds / (60))
        time_str += f'{minutes}m '
        seconds -= minutes * (60)
    if seconds > 0:
        time_str += f'{int(seconds)}s '
    return time_str