# NOTE: The three lines below are Hugging Face file-page residue, not Python
# source; kept here as a comment so the module parses:
#   OpenSound's picture / Upload 33 files / 71de706 verified
import typing
import julius
import numpy as np
import torch
from . import util
class DSPMixin:
    """Mixin of DSP operations (windowing, overlap-add, filtering,
    spectrogram masking, phase manipulation, pre-emphasis) that operate
    in place on the host signal object and return ``self``."""

    # Batch size of the signal before windowing reshaped it; restored by
    # overlap_and_add.
    _original_batch_size = None
    # Channel count before windowing; restored by overlap_and_add.
    _original_num_channels = None
    # Signal length (samples) after zero-padding in
    # _preprocess_signal_for_windowing; target output size for the fold.
    _padded_signal_length = None
def _preprocess_signal_for_windowing(self, window_duration, hop_duration):
self._original_batch_size = self.batch_size
self._original_num_channels = self.num_channels
window_length = int(window_duration * self.sample_rate)
hop_length = int(hop_duration * self.sample_rate)
if window_length % hop_length != 0:
factor = window_length // hop_length
window_length = factor * hop_length
self.zero_pad(hop_length, hop_length)
self._padded_signal_length = self.signal_length
return window_length, hop_length
def windows(
self, window_duration: float, hop_duration: float, preprocess: bool = True
):
"""Generator which yields windows of specified duration from signal with a specified
hop length.
Parameters
----------
window_duration : float
Duration of every window in seconds.
hop_duration : float
Hop between windows in seconds.
preprocess : bool, optional
Whether to preprocess the signal, so that the first sample is in
the middle of the first window, by default True
Yields
------
AudioSignal
Each window is returned as an AudioSignal.
"""
if preprocess:
window_length, hop_length = self._preprocess_signal_for_windowing(
window_duration, hop_duration
)
self.audio_data = self.audio_data.reshape(-1, 1, self.signal_length)
for b in range(self.batch_size):
i = 0
start_idx = i * hop_length
while True:
start_idx = i * hop_length
i += 1
end_idx = start_idx + window_length
if end_idx > self.signal_length:
break
yield self[b, ..., start_idx:end_idx]
def collect_windows(
self, window_duration: float, hop_duration: float, preprocess: bool = True
):
"""Reshapes signal into windows of specified duration from signal with a specified
hop length. Window are placed along the batch dimension. Use with
:py:func:`audiotools.core.dsp.DSPMixin.overlap_and_add` to reconstruct the
original signal.
Parameters
----------
window_duration : float
Duration of every window in seconds.
hop_duration : float
Hop between windows in seconds.
preprocess : bool, optional
Whether to preprocess the signal, so that the first sample is in
the middle of the first window, by default True
Returns
-------
AudioSignal
AudioSignal unfolded with shape ``(nb * nch * num_windows, 1, window_length)``
"""
if preprocess:
window_length, hop_length = self._preprocess_signal_for_windowing(
window_duration, hop_duration
)
# self.audio_data: (nb, nch, nt).
unfolded = torch.nn.functional.unfold(
self.audio_data.reshape(-1, 1, 1, self.signal_length),
kernel_size=(1, window_length),
stride=(1, hop_length),
)
# unfolded: (nb * nch, window_length, num_windows).
# -> (nb * nch * num_windows, 1, window_length)
unfolded = unfolded.permute(0, 2, 1).reshape(-1, 1, window_length)
self.audio_data = unfolded
return self
def overlap_and_add(self, hop_duration: float):
"""Function which takes a list of windows and overlap adds them into a
signal the same length as ``audio_signal``.
Parameters
----------
hop_duration : float
How much to shift for each window
(overlap is window_duration - hop_duration) in seconds.
Returns
-------
AudioSignal
overlap-and-added signal.
"""
hop_length = int(hop_duration * self.sample_rate)
window_length = self.signal_length
nb, nch = self._original_batch_size, self._original_num_channels
unfolded = self.audio_data.reshape(nb * nch, -1, window_length).permute(0, 2, 1)
folded = torch.nn.functional.fold(
unfolded,
output_size=(1, self._padded_signal_length),
kernel_size=(1, window_length),
stride=(1, hop_length),
)
norm = torch.ones_like(unfolded, device=unfolded.device)
norm = torch.nn.functional.fold(
norm,
output_size=(1, self._padded_signal_length),
kernel_size=(1, window_length),
stride=(1, hop_length),
)
folded = folded / norm
folded = folded.reshape(nb, nch, -1)
self.audio_data = folded
self.trim(hop_length, hop_length)
return self
def low_pass(
self, cutoffs: typing.Union[torch.Tensor, np.ndarray, float], zeros: int = 51
):
"""Low-passes the signal in-place. Each item in the batch
can have a different low-pass cutoff, if the input
to this signal is an array or tensor. If a float, all
items are given the same low-pass filter.
Parameters
----------
cutoffs : typing.Union[torch.Tensor, np.ndarray, float]
Cutoff in Hz of low-pass filter.
zeros : int, optional
Number of taps to use in low-pass filter, by default 51
Returns
-------
AudioSignal
Low-passed AudioSignal.
"""
cutoffs = util.ensure_tensor(cutoffs, 2, self.batch_size)
cutoffs = cutoffs / self.sample_rate
filtered = torch.empty_like(self.audio_data)
for i, cutoff in enumerate(cutoffs):
lp_filter = julius.LowPassFilter(cutoff.cpu(), zeros=zeros).to(self.device)
filtered[i] = lp_filter(self.audio_data[i])
self.audio_data = filtered
self.stft_data = None
return self
def high_pass(
self, cutoffs: typing.Union[torch.Tensor, np.ndarray, float], zeros: int = 51
):
"""High-passes the signal in-place. Each item in the batch
can have a different high-pass cutoff, if the input
to this signal is an array or tensor. If a float, all
items are given the same high-pass filter.
Parameters
----------
cutoffs : typing.Union[torch.Tensor, np.ndarray, float]
Cutoff in Hz of high-pass filter.
zeros : int, optional
Number of taps to use in high-pass filter, by default 51
Returns
-------
AudioSignal
High-passed AudioSignal.
"""
cutoffs = util.ensure_tensor(cutoffs, 2, self.batch_size)
cutoffs = cutoffs / self.sample_rate
filtered = torch.empty_like(self.audio_data)
for i, cutoff in enumerate(cutoffs):
hp_filter = julius.HighPassFilter(cutoff.cpu(), zeros=zeros).to(self.device)
filtered[i] = hp_filter(self.audio_data[i])
self.audio_data = filtered
self.stft_data = None
return self
def mask_frequencies(
self,
fmin_hz: typing.Union[torch.Tensor, np.ndarray, float],
fmax_hz: typing.Union[torch.Tensor, np.ndarray, float],
val: float = 0.0,
):
"""Masks frequencies between ``fmin_hz`` and ``fmax_hz``, and fills them
with the value specified by ``val``. Useful for implementing SpecAug.
The min and max can be different for every item in the batch.
Parameters
----------
fmin_hz : typing.Union[torch.Tensor, np.ndarray, float]
Lower end of band to mask out.
fmax_hz : typing.Union[torch.Tensor, np.ndarray, float]
Upper end of band to mask out.
val : float, optional
Value to fill in, by default 0.0
Returns
-------
AudioSignal
Signal with ``stft_data`` manipulated. Apply ``.istft()`` to get the
masked audio data.
"""
# SpecAug
mag, phase = self.magnitude, self.phase
fmin_hz = util.ensure_tensor(fmin_hz, ndim=mag.ndim)
fmax_hz = util.ensure_tensor(fmax_hz, ndim=mag.ndim)
assert torch.all(fmin_hz < fmax_hz)
# build mask
nbins = mag.shape[-2]
bins_hz = torch.linspace(0, self.sample_rate / 2, nbins, device=self.device)
bins_hz = bins_hz[None, None, :, None].repeat(
self.batch_size, 1, 1, mag.shape[-1]
)
mask = (fmin_hz <= bins_hz) & (bins_hz < fmax_hz)
mask = mask.to(self.device)
mag = mag.masked_fill(mask, val)
phase = phase.masked_fill(mask, val)
self.stft_data = mag * torch.exp(1j * phase)
return self
def mask_timesteps(
self,
tmin_s: typing.Union[torch.Tensor, np.ndarray, float],
tmax_s: typing.Union[torch.Tensor, np.ndarray, float],
val: float = 0.0,
):
"""Masks timesteps between ``tmin_s`` and ``tmax_s``, and fills them
with the value specified by ``val``. Useful for implementing SpecAug.
The min and max can be different for every item in the batch.
Parameters
----------
tmin_s : typing.Union[torch.Tensor, np.ndarray, float]
Lower end of timesteps to mask out.
tmax_s : typing.Union[torch.Tensor, np.ndarray, float]
Upper end of timesteps to mask out.
val : float, optional
Value to fill in, by default 0.0
Returns
-------
AudioSignal
Signal with ``stft_data`` manipulated. Apply ``.istft()`` to get the
masked audio data.
"""
# SpecAug
mag, phase = self.magnitude, self.phase
tmin_s = util.ensure_tensor(tmin_s, ndim=mag.ndim)
tmax_s = util.ensure_tensor(tmax_s, ndim=mag.ndim)
assert torch.all(tmin_s < tmax_s)
# build mask
nt = mag.shape[-1]
bins_t = torch.linspace(0, self.signal_duration, nt, device=self.device)
bins_t = bins_t[None, None, None, :].repeat(
self.batch_size, 1, mag.shape[-2], 1
)
mask = (tmin_s <= bins_t) & (bins_t < tmax_s)
mag = mag.masked_fill(mask, val)
phase = phase.masked_fill(mask, val)
self.stft_data = mag * torch.exp(1j * phase)
return self
def mask_low_magnitudes(
self, db_cutoff: typing.Union[torch.Tensor, np.ndarray, float], val: float = 0.0
):
"""Mask away magnitudes below a specified threshold, which
can be different for every item in the batch.
Parameters
----------
db_cutoff : typing.Union[torch.Tensor, np.ndarray, float]
Decibel value for which things below it will be masked away.
val : float, optional
Value to fill in for masked portions, by default 0.0
Returns
-------
AudioSignal
Signal with ``stft_data`` manipulated. Apply ``.istft()`` to get the
masked audio data.
"""
mag = self.magnitude
log_mag = self.log_magnitude()
db_cutoff = util.ensure_tensor(db_cutoff, ndim=mag.ndim)
mask = log_mag < db_cutoff
mag = mag.masked_fill(mask, val)
self.magnitude = mag
return self
def shift_phase(self, shift: typing.Union[torch.Tensor, np.ndarray, float]):
"""Shifts the phase by a constant value.
Parameters
----------
shift : typing.Union[torch.Tensor, np.ndarray, float]
What to shift the phase by.
Returns
-------
AudioSignal
Signal with ``stft_data`` manipulated. Apply ``.istft()`` to get the
masked audio data.
"""
shift = util.ensure_tensor(shift, ndim=self.phase.ndim)
self.phase = self.phase + shift
return self
def corrupt_phase(self, scale: typing.Union[torch.Tensor, np.ndarray, float]):
"""Corrupts the phase randomly by some scaled value.
Parameters
----------
scale : typing.Union[torch.Tensor, np.ndarray, float]
Standard deviation of noise to add to the phase.
Returns
-------
AudioSignal
Signal with ``stft_data`` manipulated. Apply ``.istft()`` to get the
masked audio data.
"""
scale = util.ensure_tensor(scale, ndim=self.phase.ndim)
self.phase = self.phase + scale * torch.randn_like(self.phase)
return self
def preemphasis(self, coef: float = 0.85):
"""Applies pre-emphasis to audio signal.
Parameters
----------
coef : float, optional
How much pre-emphasis to apply, lower values do less. 0 does nothing.
by default 0.85
Returns
-------
AudioSignal
Pre-emphasized signal.
"""
kernel = torch.tensor([1, -coef, 0]).view(1, 1, -1).to(self.device)
x = self.audio_data.reshape(-1, 1, self.signal_length)
x = torch.nn.functional.conv1d(x, kernel, padding=1)
self.audio_data = x.reshape(*self.audio_data.shape)
return self