import io

import librosa.util as librosa_util
import numpy as np
import torch
import torch.nn.functional as F
import torchaudio
from librosa.filters import mel as librosa_mel_fn
from librosa.util import pad_center, tiny
from scipy.signal import get_window


# spectrogram to mel
class STFT(torch.nn.Module):
    """Short-time Fourier transform implemented with 1-D convolutions.

    adapted from Prem Seetharaman's https://github.com/pseeth/pytorch-stft

    The forward and inverse Fourier bases are built once in ``__init__`` and
    registered as buffers, so they follow the module across ``.to(...)`` calls.
    """

    def __init__(self, filter_length, hop_length, win_length, window="hann"):
        """Build the (optionally windowed) forward and inverse bases.

        PARAMS
        ------
        filter_length: FFT size; also the convolution kernel length
        hop_length: stride between consecutive frames
        win_length: window length, must not exceed ``filter_length``
        window: window specification passed to ``scipy.signal.get_window``,
            or ``None`` to skip windowing
        """
        super().__init__()
        self.filter_length = filter_length
        self.hop_length = hop_length
        self.win_length = win_length
        self.window = window
        self.forward_transform = None

        scale = self.filter_length / self.hop_length
        fourier_basis = np.fft.fft(np.eye(self.filter_length))

        # Keep only the non-redundant half of the spectrum, then stack the
        # real rows on top of the imaginary rows.
        cutoff = int(self.filter_length / 2 + 1)
        fourier_basis = np.vstack(
            [np.real(fourier_basis[:cutoff, :]), np.imag(fourier_basis[:cutoff, :])]
        )

        forward_basis = torch.FloatTensor(fourier_basis[:, None, :])
        inverse_basis = torch.FloatTensor(
            np.linalg.pinv(scale * fourier_basis).T[:, None, :]
        )

        if window is not None:
            assert filter_length >= win_length
            # get window and zero center pad it to filter_length
            fft_window = get_window(window, win_length, fftbins=True)
            # `size` is keyword-only in librosa >= 0.10; the keyword form is
            # also accepted by older releases.
            fft_window = pad_center(fft_window, size=filter_length)
            fft_window = torch.from_numpy(fft_window).float()

            # window the bases
            forward_basis *= fft_window
            inverse_basis *= fft_window

        self.register_buffer("forward_basis", forward_basis.float())
        self.register_buffer("inverse_basis", inverse_basis.float())

    def transform(self, input_data):
        """Compute magnitude and phase of a batch of signals.

        PARAMS
        ------
        input_data: tensor indexed as (batch, samples)

        RETURNS
        -------
        magnitude, phase: tensors of shape (batch, filter_length // 2 + 1, frames).
            NOTE: the convolution output is moved to the CPU, so both results
            are CPU tensors regardless of the input device.
        """
        num_batches = input_data.size(0)
        num_samples = input_data.size(1)

        self.num_samples = num_samples

        # similar to librosa, reflect-pad the input
        half_filter = int(self.filter_length / 2)
        input_data = input_data.view(num_batches, 1, num_samples)
        input_data = F.pad(
            input_data.unsqueeze(1),
            (half_filter, half_filter, 0, 0),
            mode="reflect",
        )
        input_data = input_data.squeeze(1)

        forward_transform = F.conv1d(
            input_data,
            self.forward_basis,
            stride=self.hop_length,
            padding=0,
        ).cpu()

        # First `cutoff` channels are the real parts, the rest the imaginary.
        cutoff = int((self.filter_length / 2) + 1)
        real_part = forward_transform[:, :cutoff, :]
        imag_part = forward_transform[:, cutoff:, :]

        magnitude = torch.sqrt(real_part**2 + imag_part**2)
        phase = torch.atan2(imag_part.detach(), real_part.detach())

        return magnitude, phase

    def inverse(self, magnitude, phase):
        """Reconstruct a signal from magnitude and phase.

        PARAMS
        ------
        magnitude, phase: tensors shaped like the outputs of ``transform``

        RETURNS
        -------
        inverse_transform: tensor of shape (batch, 1, samples) with the
            ``filter_length // 2`` padding removed from both ends
        """
        recombine_magnitude_phase = torch.cat(
            [magnitude * torch.cos(phase), magnitude * torch.sin(phase)], dim=1
        )

        inverse_transform = F.conv_transpose1d(
            recombine_magnitude_phase,
            self.inverse_basis,
            stride=self.hop_length,
            padding=0,
        )

        if self.window is not None:
            window_sum = window_sumsquare(
                self.window,
                magnitude.size(-1),
                hop_length=self.hop_length,
                win_length=self.win_length,
                n_fft=self.filter_length,
                dtype=np.float32,
            )
            # remove modulation effects
            device = inverse_transform.device
            approx_nonzero_indices = torch.from_numpy(
                np.where(window_sum > tiny(window_sum))[0]
            ).to(device)
            # Keep the envelope on the same device as the tensor it divides.
            window_sum = torch.from_numpy(window_sum).to(device)
            inverse_transform[:, :, approx_nonzero_indices] /= window_sum[
                approx_nonzero_indices
            ]

            # scale by hop ratio
            inverse_transform *= float(self.filter_length) / self.hop_length

        # Drop the reflect padding that `transform` added at both ends.
        half_filter = int(self.filter_length / 2)
        inverse_transform = inverse_transform[:, :, half_filter:]
        inverse_transform = inverse_transform[:, :, :-half_filter]

        return inverse_transform

    def forward(self, input_data):
        """Run ``transform`` followed by ``inverse`` and return the result.

        The intermediate magnitude and phase are stored on the instance as
        ``self.magnitude`` and ``self.phase``.
        """
        self.magnitude, self.phase = self.transform(input_data)
        reconstruction = self.inverse(self.magnitude, self.phase)
        return reconstruction


def window_sumsquare(
    window,
    n_frames,
    hop_length,
    win_length,
    n_fft,
    dtype=np.float32,
    norm=None,
):
    """
    # from librosa 0.6
    Compute the sum-square envelope of a window function at a given hop length.

    This is used to estimate modulation effects induced by windowing
    observations in short-time fourier transforms.

    Parameters
    ----------
    window : string, tuple, number, callable, or list-like
        Window specification, as in `get_window`

    n_frames : int > 0
        The number of analysis frames

    hop_length : int > 0
        The number of samples to advance between frames

    win_length : [optional]
        The length of the window function. When `None`, this matches `n_fft`.

    n_fft : int > 0
        The length of each analysis frame.

    dtype : np.dtype
        The data type of the output

    norm : [optional]
        Passed through to `librosa.util.normalize` as its `norm` argument.

    Returns
    -------
    wss : np.ndarray, shape=`(n_fft + hop_length * (n_frames - 1))`
        The sum-squared envelope of the window function
    """
    if win_length is None:
        win_length = n_fft

    n = n_fft + hop_length * (n_frames - 1)
    x = np.zeros(n, dtype=dtype)

    # Compute the squared window at the desired length
    win_sq = get_window(window, win_length, fftbins=True)
    win_sq = librosa_util.normalize(win_sq, norm=norm) ** 2
    # `size` is keyword-only in librosa >= 0.10.
    win_sq = librosa_util.pad_center(win_sq, size=n_fft)

    # Fill the envelope
    for i in range(n_frames):
        sample = i * hop_length
        x[sample : min(n, sample + n_fft)] += win_sq[: max(0, min(n_fft, n - sample))]
    return x


def griffin_lim(magnitudes, stft_fn, n_iters=30):
    """Estimate a signal from magnitudes by iterating transform/inverse.

    PARAMS
    ------
    magnitudes: spectrogram magnitudes
    stft_fn: STFT class with transform (STFT) and inverse (ISTFT) methods
    n_iters: number of phase re-estimation iterations

    RETURNS
    -------
    signal: output of ``stft_fn.inverse`` with dimension 1 squeezed out
    """
    # Start from uniformly random phases.
    angles = np.angle(np.exp(2j * np.pi * np.random.rand(*magnitudes.size())))
    angles = torch.from_numpy(angles.astype(np.float32))
    signal = stft_fn.inverse(magnitudes, angles).squeeze(1)

    for _ in range(n_iters):
        _, angles = stft_fn.transform(signal)
        signal = stft_fn.inverse(magnitudes, angles).squeeze(1)
    return signal


def dynamic_range_compression(x, normalize_fun=torch.log, C=1, clip_val=1e-5):
    """Clamp ``x`` from below, scale by ``C`` and apply ``normalize_fun``.

    PARAMS
    ------
    x: input tensor
    normalize_fun: function applied after clamping and scaling
    C: compression factor
    clip_val: lower bound applied to ``x`` before scaling
    """
    return normalize_fun(torch.clamp(x, min=clip_val) * C)


def dynamic_range_decompression(x, C=1):
    """Return ``exp(x) / C``.

    PARAMS
    ------
    C: compression factor used to compress
    """
    return torch.exp(x) / C


class TacotronSTFT(torch.nn.Module):
    """STFT front-end that also projects magnitudes onto a mel filterbank."""

    def __init__(
        self,
        filter_length,
        hop_length,
        win_length,
        n_mel_channels,
        sampling_rate,
        mel_fmin,
        mel_fmax,
    ):
        """Create the underlying ``STFT`` and register the mel basis buffer.

        PARAMS
        ------
        filter_length, hop_length, win_length: forwarded to ``STFT``
        n_mel_channels: number of mel bands
        sampling_rate: sample rate used to build the mel filterbank
        mel_fmin, mel_fmax: frequency range of the mel filterbank
        """
        super().__init__()
        self.n_mel_channels = n_mel_channels
        self.sampling_rate = sampling_rate
        self.stft_fn = STFT(filter_length, hop_length, win_length)
        # Keyword arguments are mandatory in librosa >= 0.10 and accepted by
        # earlier releases as well.
        mel_basis = librosa_mel_fn(
            sr=sampling_rate,
            n_fft=filter_length,
            n_mels=n_mel_channels,
            fmin=mel_fmin,
            fmax=mel_fmax,
        )
        mel_basis = torch.from_numpy(mel_basis).float()
        self.register_buffer("mel_basis", mel_basis)

    def spectral_normalize(self, magnitudes, normalize_fun):
        """Apply ``dynamic_range_compression`` with ``normalize_fun``."""
        return dynamic_range_compression(magnitudes, normalize_fun)

    def spectral_de_normalize(self, magnitudes):
        """Apply ``dynamic_range_decompression`` to ``magnitudes``."""
        return dynamic_range_decompression(magnitudes)

    def mel_spectrogram(self, y, normalize_fun=torch.log):
        """Computes mel-spectrograms from a batch of waves

        PARAMS
        ------
        y: torch.FloatTensor with shape (B, T) in range [-1, 1]
        normalize_fun: compression function applied to both spectrograms

        RETURNS
        -------
        mel_output: compressed mel spectrogram, (B, n_mel_channels, frames)
        log_magnitudes: compressed linear magnitudes, (B, n_freq, frames)
        energy: norm of the linear magnitudes over dimension 1, (B, frames)
        """
        assert torch.min(y.data) >= -1, torch.min(y.data)
        assert torch.max(y.data) <= 1, torch.max(y.data)

        magnitudes, _phases = self.stft_fn.transform(y)
        magnitudes = magnitudes.detach()
        mel_output = torch.matmul(self.mel_basis, magnitudes)
        mel_output = self.spectral_normalize(mel_output, normalize_fun)
        energy = torch.norm(magnitudes, dim=1)

        log_magnitudes = self.spectral_normalize(magnitudes, normalize_fun)

        return mel_output, log_magnitudes, energy


def pad_wav(waveform, segment_length):
    """Truncate or zero-pad ``waveform`` along its last axis.

    ``waveform`` is returned unchanged when ``segment_length`` is ``None`` or
    already equals its length. Longer input is cut with
    ``waveform[:, :segment_length]``; shorter input is copied into a
    ``(1, segment_length)`` array of zeros.

    Raises ``AssertionError`` when the waveform has 100 samples or fewer.
    """
    waveform_length = waveform.shape[-1]
    assert waveform_length > 100, "Waveform is too short, %s" % waveform_length
    if segment_length is None or waveform_length == segment_length:
        return waveform
    if waveform_length > segment_length:
        return waveform[:, :segment_length]
    temp_wav = np.zeros((1, segment_length))
    temp_wav[:, :waveform_length] = waveform
    return temp_wav


def normalize_wav(waveform):
    """Remove the mean, divide by the peak (plus 1e-8), and scale by 0.5."""
    waveform = waveform - np.mean(waveform)
    waveform = waveform / (np.max(np.abs(waveform)) + 1e-8)
    return waveform * 0.5


def _pad_spec(fbank, target_length=1024):
    """Pad or cut ``fbank`` to ``target_length`` rows; make the last dim even.

    Rows are zero-padded at the end when too few and truncated when too many.
    If the size of the last dimension is odd, its final entry is dropped.
    """
    n_frames = fbank.shape[0]
    p = target_length - n_frames
    # cut and pad
    if p > 0:
        m = torch.nn.ZeroPad2d((0, 0, 0, p))
        fbank = m(fbank)
    elif p < 0:
        fbank = fbank[0:target_length, :]

    if fbank.size(-1) % 2 != 0:
        fbank = fbank[..., :-1]

    return fbank


def get_mel_from_wav(audio, _stft):
    """Run ``_stft.mel_spectrogram`` on one waveform and return numpy arrays.

    ``audio`` is converted to a float tensor, given a leading batch dimension
    and clipped to [-1, 1]. Each of the three outputs has that batch dimension
    squeezed out and is returned as a float32 numpy array.
    """
    audio = torch.clip(torch.FloatTensor(audio).unsqueeze(0), -1, 1)
    melspec, log_magnitudes_stft, energy = _stft.mel_spectrogram(audio)
    melspec = torch.squeeze(melspec, 0).numpy().astype(np.float32)
    log_magnitudes_stft = (
        torch.squeeze(log_magnitudes_stft, 0).numpy().astype(np.float32)
    )
    energy = torch.squeeze(energy, 0).numpy().astype(np.float32)
    return melspec, log_magnitudes_stft, energy


def read_wav_file_io(bytes):
    """Load audio via ``load_audio`` and resample it to 16 kHz.

    No amplitude normalisation is applied. ``bytes`` is whatever
    ``torchaudio.load`` accepts as its source (presumably a file-like object;
    confirm against callers).
    """
    return load_audio(bytes, sample_rate=16000)


def load_audio(bytes, sample_rate=16000):
    """Load audio with ``torchaudio.load(..., format='mp4')`` and resample.

    Returns the waveform tensor resampled to ``sample_rate``.
    """
    waveform, sr = torchaudio.load(bytes, format='mp4')
    waveform = torchaudio.functional.resample(waveform, orig_freq=sr, new_freq=sample_rate)
    return waveform


def read_wav_file(filename, segment_length=None):
    """Load a file, resample to 16 kHz, and normalise the first channel.

    PARAMS
    ------
    filename: source passed to ``torchaudio.load``
    segment_length: when given, the waveform is truncated or zero-padded to
        this many samples with ``pad_wav``; ``None`` keeps the full length

    RETURNS
    -------
    waveform: numpy array of shape (1, samples)
    """
    # waveform, sr = librosa.load(filename, sr=None, mono=True) # 4 times slower
    waveform, sr = torchaudio.load(filename)  # Faster!!!
    waveform = torchaudio.functional.resample(waveform, orig_freq=sr, new_freq=16000)
    waveform = waveform.numpy()[0, ...]
    waveform = normalize_wav(waveform)
    waveform = waveform[None, ...]
    if segment_length is not None:
        waveform = pad_wav(waveform, segment_length)

    # The epsilon keeps silent (all-zero) audio from producing NaNs.
    waveform = waveform / (np.max(np.abs(waveform)) + 1e-8)
    waveform = 0.5 * waveform

    return waveform


def norm_wav_tensor(waveform: torch.FloatTensor):
    """Normalise the first channel of a waveform tensor.

    Returns a numpy array of shape (1, samples).
    """
    waveform = waveform.numpy()[0, ...]
    waveform = normalize_wav(waveform)
    waveform = waveform[None, ...]
    waveform = waveform / (np.max(np.abs(waveform)) + 1e-8)
    waveform = 0.5 * waveform
    return waveform


def _default_stft(n_mel_channels):
    """Build the ``TacotronSTFT`` used when the caller supplies none."""
    return TacotronSTFT(
        1024,  # filter_length
        160,  # hop_length
        1024,  # win_length
        n_mel_channels,  # n_mel
        16000,  # sample_rate
        0,  # fmin
        8000,  # fmax
    )


def wav_to_fbank(filename, target_length=1024, fn_STFT=None):
    """Read a file and return its mel and log-magnitude spectrograms.

    PARAMS
    ------
    filename: source passed to ``read_wav_file``
    target_length: number of frames both spectrograms are padded or cut to
    fn_STFT: object providing ``mel_spectrogram``; defaults to a 64-band
        ``TacotronSTFT``

    RETURNS
    -------
    fbank, log_magnitudes_stft: float tensors indexed as (frames, bins)
    waveform: 1-D float tensor holding the loaded audio
    """
    if fn_STFT is None:
        fn_STFT = _default_stft(64)

    # mixup
    waveform = read_wav_file(filename, target_length * 160)  # hop size is 160

    waveform = waveform[0, ...]
    waveform = torch.FloatTensor(waveform)

    fbank, log_magnitudes_stft, _energy = get_mel_from_wav(waveform, fn_STFT)

    fbank = torch.FloatTensor(fbank.T)
    log_magnitudes_stft = torch.FloatTensor(log_magnitudes_stft.T)

    fbank = _pad_spec(fbank, target_length)
    log_magnitudes_stft = _pad_spec(log_magnitudes_stft, target_length)

    return fbank, log_magnitudes_stft, waveform


def wav_tensor_to_fbank(waveform, target_length=512, fn_STFT=None):
    """Return the padded mel spectrogram of an already-loaded waveform.

    PARAMS
    ------
    waveform: audio accepted by ``get_mel_from_wav``
    target_length: number of frames the spectrogram is padded or cut to
    fn_STFT: object providing ``mel_spectrogram``; defaults to a 256-band
        ``TacotronSTFT``

    RETURNS
    -------
    fbank: float tensor indexed as (frames, bins)
    """
    if fn_STFT is None:
        fn_STFT = _default_stft(256)  # In practice used

    fbank, _log_magnitudes_stft, _energy = get_mel_from_wav(waveform, fn_STFT)
    fbank = torch.FloatTensor(fbank.T)
    return _pad_spec(fbank, target_length)