""" Audio effects for data augmentation. Several audio effects can be combined into an augmentation chain. Important note: We assume that the parallelization during training is done using multi-processing and not multi-threading. Hence, we do not need the `@sox.sox_context()` decorators as discussed in this [thread](https://github.com/pseeth/soxbindings/issues/4). AI Music Technology Group, Sony Group Corporation AI Speech and Sound Group, Sony Europe This implementation originally belongs to Sony Group Corporation, which has been introduced in the work "Automatic music mixing with deep learning and out-of-domain data". Original repo link: https://github.com/sony/FxNorm-automix This work modifies a few implementations from the original repo to suit the task. """ from itertools import permutations import logging import numpy as np import pymixconsole as pymc from pymixconsole.parameter import Parameter from pymixconsole.parameter_list import ParameterList from pymixconsole.processor import Processor from random import shuffle from scipy.signal import oaconvolve import soxbindings as sox from typing import List, Optional, Tuple, Union from numba import jit # prevent pysox from logging warnings regarding non-opimal timestretch factors logging.getLogger('sox').setLevel(logging.ERROR) # Monkey-Patch `Processor` for convenience # (a) Allow `None` as blocksize if processor can work on variable-length audio def new_init(self, name, parameters, block_size, sample_rate, dtype='float32'): """ Initialize processor. Args: self: Reference to object name (str): Name of processor. parameters (parameter_list): Parameters for this processor. block_size (int): Size of blocks for blockwise processing. Can also be `None` if full audio can be processed at once. sample_rate (int): Sample rate of input audio. Use `None` if effect is independent of this value. dtype (str): data type of samples """ self.name = name self.parameters = parameters self.block_size = block_size self.sample_rate = sample_rate self.dtype = dtype # (b) make code simpler def new_update(self, parameter_name): """ Update processor after randomization of parameters. Args: self: Reference to object. parameter_name (str): Parameter whose value has changed. """ pass # (c) representation for nice print def new_repr(self): """ Create human-readable representation. Args: self: Reference to object. Returns: string representation of object. """ return f'Processor(name={self.name!r}, parameters={self.parameters!r}' Processor.__init__ = new_init Processor.__repr__ = new_repr Processor.update = new_update class AugmentationChain: """Basic audio Fx chain which is used for data augmentation.""" def __init__(self, fxs: Optional[List[Tuple[Union[Processor, 'AugmentationChain'], float, bool]]] = [], shuffle: Optional[bool] = False, parallel: Optional[bool] = False, parallel_weight_factor = None, randomize_param_value=True): """ Create augmentation chain from the dictionary `fxs`. Args: fxs (list of tuples): First tuple element is an instances of `pymc.processor` or `AugmentationChain` that we want to use for data augmentation. Second element gives probability that effect should be applied. Third element defines, whether the processed signal is normalized by the RMS of the input. shuffle (bool): If `True` then order of Fx are changed whenever chain is applied. """ self.fxs = fxs self.shuffle = shuffle self.parallel = parallel self.parallel_weight_factor = parallel_weight_factor self.randomize_param_value = randomize_param_value def apply_processor(self, x, processor: Processor, rms_normalize): """ Pass audio in `x` through `processor` and output the respective processed audio. Args: x (Numpy array): Input audio of shape `n_samples` x `n_channels`. processor (Processor): Audio effect that we want to apply. rms_normalize (bool): If `True`, the processed signal is normalized by the RMS of the signal. Returns: Numpy array: Processed audio of shape `n_samples` x `n_channels` (same size as `x') """ n_samples_input = x.shape[0] if processor.block_size is None: y = processor.process(x) else: # make sure that n_samples is a multiple of `processor.block_size` if x.shape[0] % processor.block_size != 0: n_pad = processor.block_size - x.shape[0] % processor.block_size x = np.pad(x, ((0, n_pad), (0, 0)), mode='reflective') y = np.zeros_like(x) for idx in range(0, x.shape[0], processor.block_size): y[idx:idx+processor.block_size, :] = processor.process(x[idx:idx+processor.block_size, :]) if rms_normalize: # normalize output energy such that it is the same as the input energy scale = np.sqrt(np.mean(np.square(x)) / np.maximum(1e-7, np.mean(np.square(y)))) y *= scale # return audio of same length as x return y[:n_samples_input, :] def apply_same_processor(self, x_list, processor: Processor, rms_normalize): for i in range(len(x_list)): x_list[i] = self.apply_processor(x_list[i], processor, rms_normalize) return x_list def __call__(self, x_list): """ Apply the same augmentation chain to audio tracks in list `x_list`. Args: x_list (list of Numpy array) : List of audio samples of shape `n_samples` x `n_channels`. Returns: y_list (list of Numpy array) : List of processed audio of same shape as `x_list` where the same effects have been applied. """ # randomly shuffle effect order if `self.shuffle` is True if self.shuffle: shuffle(self.fxs) # apply effects with probabilities given in `self.fxs` y_list = x_list.copy() for fx, p, rms_normalize in self.fxs: if np.random.rand() < p: if isinstance(fx, Processor): # randomize all effect parameters (also calls `update()` for each processor) if self.randomize_param_value: fx.randomize() else: fx.update(None) # apply processor y_list = self.apply_same_processor(y_list, fx, rms_normalize) else: y_list = fx(y_list) if self.parallel: # weighting factor of input signal in the range of (0.0 ~ 0.5) weight_in = self.parallel_weight_factor if self.parallel_weight_factor else np.random.rand() / 2. for i in range(len(y_list)): y_list[i] = weight_in*x_list[i] + (1-weight_in)*y_list[i] return y_list def __repr__(self): """ Human-readable representation. Returns: string representation of object. """ return f'AugmentationChain(fxs={self.fxs!r}, shuffle={self.shuffle!r})' # %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% DISTORTION %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% def hard_clip(x, threshold_dB, drive): """ Hard clip distortion. Args: x: input audio threshold_dB: threshold drive: drive Returns: (Numpy array): distorted audio """ drive_linear = np.power(10., drive / 20.).astype(np.float32) threshold_linear = 10. ** (threshold_dB / 20.) return np.clip(x * drive_linear, -threshold_linear, threshold_linear) def overdrive(x, drive, colour, sample_rate): """ Overdrive distortion. Args: x: input audio drive: Controls the amount of distortion (dB). colour: Controls the amount of even harmonic content in the output(dB) sample_rate: sampling rate Returns: (Numpy array): distorted audio """ scale = np.max(np.abs(x)) if scale > 0.9: clips = True x = x * (0.9 / scale) else: clips = False tfm = sox.Transformer() tfm.overdrive(gain_db=drive, colour=colour) y = tfm.build_array(input_array=x, sample_rate_in=sample_rate).astype(np.float32) if clips: y *= scale / 0.9 # rescale output to original scale return y def hyperbolic_tangent(x, drive): """ Hyperbolic Tanh distortion. Args: x: input audio drive: drive Returns: (Numpy array): distorted audio """ drive_linear = np.power(10., drive / 20.).astype(np.float32) return np.tanh(2. * x * drive_linear) def soft_sine(x, drive): """ Soft sine distortion. Args: x: input audio drive: drive Returns: (Numpy array): distorted audio """ drive_linear = np.power(10., drive / 20.).astype(np.float32) y = np.clip(x * drive_linear, -np.pi/4.0, np.pi/4.0) return np.sin(2. * y) def bit_crusher(x, bits): """ Bit crusher distortion. Args: x: input audio bits: bits Returns: (Numpy array): distorted audio """ return np.rint(x * (2 ** bits)) / (2 ** bits) class Distortion(Processor): """ Distortion processor. Processor parameters: mode (str): Currently supports the following five modes: hard_clip, waveshaper, soft_sine, tanh, bit_crusher. Each mode has different parameters such as threshold, factor, or bits. threshold (float): threshold drive (float): drive factor (float): factor limit_range (float): limit range bits (int): bits """ def __init__(self, sample_rate, name='Distortion', parameters=None): """ Initialize processor. Args: sample_rate (int): sample rate. name (str): Name of processor. parameters (parameter_list): Parameters for this processor. """ super().__init__(name, None, block_size=None, sample_rate=sample_rate) if not parameters: self.parameters = ParameterList() self.parameters.add(Parameter('mode', 'hard_clip', 'string', options=['hard_clip', 'overdrive', 'soft_sine', 'tanh', 'bit_crusher'])) self.parameters.add(Parameter('threshold', 0.0, 'float', units='dB', maximum=0.0, minimum=-20.0)) self.parameters.add(Parameter('drive', 0.0, 'float', units='dB', maximum=20.0, minimum=0.0)) self.parameters.add(Parameter('colour', 20.0, 'float', maximum=100.0, minimum=0.0)) self.parameters.add(Parameter('bits', 12, 'int', maximum=12, minimum=8)) def process(self, x): """ Process audio. Args: x (Numpy array): input audio of size `n_samples x n_channels`. Returns: (Numpy array): distorted audio of size `n_samples x n_channels`. """ if self.parameters.mode.value == 'hard_clip': y = hard_clip(x, self.parameters.threshold.value, self.parameters.drive.value) elif self.parameters.mode.value == 'overdrive': y = overdrive(x, self.parameters.drive.value, self.parameters.colour.value, self.sample_rate) elif self.parameters.mode.value == 'soft_sine': y = soft_sine(x, self.parameters.drive.value) elif self.parameters.mode.value == 'tanh': y = hyperbolic_tangent(x, self.parameters.drive.value) elif self.parameters.mode.value == 'bit_crusher': y = bit_crusher(x, self.parameters.bits.value) # If the output has low amplitude, (some distortion settigns can "crush" down the amplitude) # Then it`s normalised to the input's amplitude x_max = np.max(np.abs(x)) + 1e-8 o_max = np.max(np.abs(y)) + 1e-8 if x_max > o_max: y = y*(x_max/o_max) return y # %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% EQUALISER %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% class Equaliser(Processor): """ Five band parametric equaliser (two shelves and three central bands). All gains are set in dB values and range from `MIN_GAIN` dB to `MAX_GAIN` dB. This processor is implemented as cascade of five biquad IIR filters that are implemented using the infamous cookbook formulae from RBJ. Processor parameters: low_shelf_gain (float), low_shelf_freq (float) first_band_gain (float), first_band_freq (float), first_band_q (float) second_band_gain (float), second_band_freq (float), second_band_q (float) third_band_gain (float), third_band_freq (float), third_band_q (float) original from https://github.com/csteinmetz1/pymixconsole/blob/master/pymixconsole/processors/equaliser.py """ def __init__(self, n_channels, sample_rate, gain_range=(-15.0, 15.0), q_range=(0.1, 2.0), bands=['low_shelf', 'first_band', 'second_band', 'third_band', 'high_shelf'], hard_clip=False, name='Equaliser', parameters=None): """ Initialize processor. Args: n_channels (int): Number of audio channels. sample_rate (int): Sample rate of audio. gain_range (tuple of floats): minimum and maximum gain that can be used. q_range (tuple of floats): minimum and maximum q value. hard_clip (bool): Whether we clip to [-1, 1.] after processing. name (str): Name of processor. parameters (parameter_list): Parameters for this processor. """ super().__init__(name, parameters=parameters, block_size=None, sample_rate=sample_rate) self.n_channels = n_channels MIN_GAIN, MAX_GAIN = gain_range MIN_Q, MAX_Q = q_range if not parameters: self.parameters = ParameterList() # low shelf parameters ------- self.parameters.add(Parameter('low_shelf_gain', 0.0, 'float', minimum=MIN_GAIN, maximum=MAX_GAIN)) self.parameters.add(Parameter('low_shelf_freq', 80.0, 'float', minimum=30.0, maximum=200.0)) # first band parameters ------ self.parameters.add(Parameter('first_band_gain', 0.0, 'float', minimum=MIN_GAIN, maximum=MAX_GAIN)) self.parameters.add(Parameter('first_band_freq', 400.0, 'float', minimum=200.0, maximum=1000.0)) self.parameters.add(Parameter('first_band_q', 0.7, 'float', minimum=MIN_Q, maximum=MAX_Q)) # second band parameters ----- self.parameters.add(Parameter('second_band_gain', 0.0, 'float', minimum=MIN_GAIN, maximum=MAX_GAIN)) self.parameters.add(Parameter('second_band_freq', 2000.0, 'float', minimum=1000.0, maximum=3000.0)) self.parameters.add(Parameter('second_band_q', 0.7, 'float', minimum=MIN_Q, maximum=MAX_Q)) # third band parameters ------ self.parameters.add(Parameter('third_band_gain', 0.0, 'float', minimum=MIN_GAIN, maximum=MAX_GAIN)) self.parameters.add(Parameter('third_band_freq', 4000.0, 'float', minimum=3000.0, maximum=8000.0)) self.parameters.add(Parameter('third_band_q', 0.7, 'float', minimum=MIN_Q, maximum=MAX_Q)) # high shelf parameters ------ self.parameters.add(Parameter('high_shelf_gain', 0.0, 'float', minimum=MIN_GAIN, maximum=MAX_GAIN)) self.parameters.add(Parameter('high_shelf_freq', 8000.0, 'float', minimum=5000.0, maximum=10000.0)) self.bands = bands self.filters = self.setup_filters() self.hard_clip = hard_clip def setup_filters(self): """ Create IIR filters. Returns: IIR filters """ filters = {} for band in self.bands: G = getattr(self.parameters, band + '_gain').value fc = getattr(self.parameters, band + '_freq').value rate = self.sample_rate if band in ['low_shelf', 'high_shelf']: Q = 0.707 filter_type = band else: Q = getattr(self.parameters, band + '_q').value filter_type = 'peaking' filters[band] = pymc.components.iirfilter.IIRfilter(G, Q, fc, rate, filter_type, n_channels=self.n_channels) return filters def update_filter(self, band): """ Update filters. Args: band (str): Band that should be updated. """ self.filters[band].G = getattr(self.parameters, band + '_gain').value self.filters[band].fc = getattr(self.parameters, band + '_freq').value self.filters[band].rate = self.sample_rate if band in ['first_band', 'second_band', 'third_band']: self.filters[band].Q = getattr(self.parameters, band + '_q').value def update(self, parameter_name=None): """ Update processor after randomization of parameters. Args: parameter_name (str): Parameter whose value has changed. """ if parameter_name is not None: bands = ['_'.join(parameter_name.split('_')[:2])] else: bands = self.bands for band in bands: self.update_filter(band) for _band, iirfilter in self.filters.items(): iirfilter.reset_state() def reset_state(self): """Reset state.""" for _band, iirfilter in self.filters.items(): iirfilter.reset_state() def process(self, x): """ Process audio. Args: x (Numpy array): input audio of size `n_samples x n_channels`. Returns: (Numpy array): equalized audio of size `n_samples x n_channels`. """ for _band, iirfilter in self.filters.items(): iirfilter.reset_state() x = iirfilter.apply_filter(x) if self.hard_clip: x = np.clip(x, -1.0, 1.0) # make sure that we have float32 as IIR filtering returns float64 x = x.astype(np.float32) # make sure that we have two dimensions (if `n_channels == 1`) if x.ndim == 1: x = x[:, np.newaxis] return x # %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% COMPRESSOR %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% @jit(nopython=True) def compressor_process(x, threshold, attack_time, release_time, ratio, makeup_gain, sample_rate, yL_prev): """ Apply compressor. Args: x (Numpy array): audio data. threshold: threshold in dB. attack_time: attack_time in ms. release_time: release_time in ms. ratio: ratio. makeup_gain: makeup_gain. sample_rate: sample rate. yL_prev: internal state of the envelop gain. Returns: compressed audio. """ M = x.shape[0] x_g = np.zeros(M) x_l = np.zeros(M) y_g = np.zeros(M) y_l = np.zeros(M) c = np.zeros(M) yL_prev = 0. alpha_attack = np.exp(-1/(0.001 * sample_rate * attack_time)) alpha_release = np.exp(-1/(0.001 * sample_rate * release_time)) for i in np.arange(M): if np.abs(x[i]) < 0.000001: x_g[i] = -120.0 else: x_g[i] = 20 * np.log10(np.abs(x[i])) if ratio > 1: if x_g[i] >= threshold: y_g[i] = threshold + (x_g[i] - threshold) / ratio else: y_g[i] = x_g[i] elif ratio < 1: if x_g[i] <= threshold: y_g[i] = threshold + (x_g[i] - threshold) / (1/ratio) else: y_g[i] = x_g[i] x_l[i] = x_g[i] - y_g[i] if x_l[i] > yL_prev: y_l[i] = alpha_attack * yL_prev + (1 - alpha_attack) * x_l[i] else: y_l[i] = alpha_release * yL_prev + (1 - alpha_release) * x_l[i] c[i] = np.power(10.0, (makeup_gain - y_l[i]) / 20.0) yL_prev = y_l[i] y = x * c return y, yL_prev class Compressor(Processor): """ Single band stereo dynamic range compressor. Processor parameters: threshold (float) attack_time (float) release_time (float) ratio (float) makeup_gain (float) """ def __init__(self, sample_rate, name='Compressor', parameters=None): """ Initialize processor. Args: sample_rate (int): Sample rate of input audio. name (str): Name of processor. parameters (parameter_list): Parameters for this processor. """ super().__init__(name=name, parameters=parameters, block_size=None, sample_rate=sample_rate) if not parameters: self.parameters = ParameterList() self.parameters.add(Parameter('threshold', -20.0, 'float', units='dB', minimum=-80.0, maximum=-5.0)) self.parameters.add(Parameter('attack_time', 2.0, 'float', units='ms', minimum=1., maximum=20.0)) self.parameters.add(Parameter('release_time', 100.0, 'float', units='ms', minimum=50.0, maximum=500.0)) self.parameters.add(Parameter('ratio', 4.0, 'float', minimum=4., maximum=40.0)) # we remove makeup_gain parameter inside the Compressor # store internal state (for block-wise processing) self.yL_prev = None def process(self, x): """ Process audio. Args: x (Numpy array): input audio of size `n_samples x n_channels`. Returns: (Numpy array): compressed audio of size `n_samples x n_channels`. """ if self.yL_prev is None: self.yL_prev = [0.] * x.shape[1] if not self.parameters.threshold.value == 0.0 or not self.parameters.ratio.value == 1.0: y = np.zeros_like(x) for ch in range(x.shape[1]): y[:, ch], self.yL_prev[ch] = compressor_process(x[:, ch], self.parameters.threshold.value, self.parameters.attack_time.value, self.parameters.release_time.value, self.parameters.ratio.value, 0.0, # makeup_gain = 0 self.sample_rate, self.yL_prev[ch]) else: y = x return y def update(self, parameter_name=None): """ Update processor after randomization of parameters. Args: parameter_name (str): Parameter whose value has changed. """ self.yL_prev = None # %%%%%%%%%%%%%%%%%%%%%%%%%% CONVOLUTIONAL REVERB %%%%%%%%%%%%%%%%%%%%%%%%%%%%% class ConvolutionalReverb(Processor): """ Convolutional Reverb. Processor parameters: wet_dry (float): Wet/dry ratio. decay (float): Applies a fade out to the impulse response. pre_delay (float): Value in ms. Shifts the IR in time and allows. A positive value produces a traditional delay between the dry signal and the wet. A negative delay is, in reality, zero delay, but effectively trims off the start of IR, so the reverb response begins at a point further in. """ def __init__(self, impulse_responses, sample_rate, name='ConvolutionalReverb', parameters=None): """ Initialize processor. Args: impulse_responses (list): List with impulse responses created by `common_dataprocessing.create_dataset` sample_rate (int): Sample rate that we should assume (used for fade-out computation) name (str): Name of processor. parameters (parameter_list): Parameters for this processor. Raises: ValueError: if no impulse responses are provided. """ super().__init__(name=name, parameters=parameters, block_size=None, sample_rate=sample_rate) if impulse_responses is None: raise ValueError('List of impulse responses must be provided for ConvolutionalReverb processor.') self.impulse_responses = impulse_responses if not parameters: self.parameters = ParameterList() self.max_ir_num = len(max(impulse_responses, key=len)) self.parameters.add(Parameter('index', 0, 'int', minimum=0, maximum=len(impulse_responses))) self.parameters.add(Parameter('index_ir', 0, 'int', minimum=0, maximum=self.max_ir_num)) self.parameters.add(Parameter('wet', 1.0, 'float', minimum=1.0, maximum=1.0)) self.parameters.add(Parameter('dry', 0.0, 'float', minimum=0.0, maximum=0.0)) self.parameters.add(Parameter('decay', 1.0, 'float', minimum=1.0, maximum=1.0)) self.parameters.add(Parameter('pre_delay', 0, 'int', units='ms', minimum=0, maximum=0)) def update(self, parameter_name=None): """ Update processor after randomization of parameters. Args: parameter_name (str): Parameter whose value has changed. """ # we sample IR with a uniform random distribution according to RT60 values chosen_ir_duration = self.impulse_responses[self.parameters.index.value] chosen_ir_idx = self.parameters.index_ir.value % len(chosen_ir_duration) self.h = np.copy(chosen_ir_duration[chosen_ir_idx]['impulse_response']()) # fade out the impulse based on the decay setting (starting from peak value) if self.parameters.decay.value < 1.: idx_peak = np.argmax(np.max(np.abs(self.h), axis=1), axis=0) fstart = np.minimum(self.h.shape[0], idx_peak + int(self.parameters.decay.value * (self.h.shape[0] - idx_peak))) fstop = np.minimum(self.h.shape[0], fstart + int(0.020*self.sample_rate)) # constant 20 ms fade out flen = fstop - fstart fade = np.arange(1, flen+1, dtype=self.dtype)/flen fade = np.power(0.1, fade * 5) self.h[fstart:fstop, :] *= fade[:, np.newaxis] self.h = self.h[:fstop] def process(self, x): """ Process audio. Args: x (Numpy array): input audio of size `n_samples x n_channels`. Returns: (Numpy array): reverbed audio of size `n_samples x n_channels`. """ # reshape IR to the correct size n_channels = x.shape[1] if self.h.shape[1] == 1 and n_channels > 1: self.h = np.hstack([self.h] * n_channels) # repeat mono IR for multi-channel input if self.h.shape[1] > 1 and n_channels == 1: self.h = self.h[:, np.random.randint(self.h.shape[1]), np.newaxis] # randomly choose one IR channel if self.parameters.wet.value == 0.0: return x else: # perform convolution to get wet signal y = oaconvolve(x, self.h, mode='full', axes=0) # cut out wet signal (compensating for the delay that the IR is introducing + predelay) idx = np.argmax(np.max(np.abs(self.h), axis=1), axis=0) idx += int(0.001 * np.abs(self.parameters.pre_delay.value) * self.sample_rate) idx = np.clip(idx, 0, self.h.shape[0]-1) y = y[idx:idx+x.shape[0], :] # return weighted sum of dry and wet signal return self.parameters.dry.value * x + self.parameters.wet.value * y # %%%%%%%%%%%%%%%%%%%%%%%%%%%%% HAAS EFFECT %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% def haas_process(x, delay, feedback, wet_channel): """ Add Haas effect to audio. Args: x (Numpy array): input audio. delay: Delay that we apply to one of the channels (in samples). feedback: Feedback value. wet_channel: Which channel we process (`left` or `right`). Returns: (Numpy array): Audio with Haas effect. """ y = np.copy(x) if wet_channel == 'left': y[:, 0] += feedback * np.roll(x[:, 0], delay) elif wet_channel == 'right': y[:, 1] += feedback * np.roll(x[:, 1], delay) return y class Haas(Processor): """ Haas Effect Processor. Randomly selects one channel and applies a short delay to it. Processor parameters: delay (int) feedback (float) wet_channel (string) """ def __init__(self, sample_rate, delay_range=(-0.040, 0.040), name='Haas', parameters=None, ): """ Initialize processor. Args: sample_rate (int): Sample rate of input audio. delay_range (tuple of floats): minimum/maximum delay for Haas effect. name (str): Name of processor. parameters (parameter_list): Parameters for this processor. """ super().__init__(name=name, parameters=parameters, block_size=None, sample_rate=sample_rate) if not parameters: self.parameters = ParameterList() self.parameters.add(Parameter('delay', int(delay_range[1] * sample_rate), 'int', units='samples', minimum=int(delay_range[0] * sample_rate), maximum=int(delay_range[1] * sample_rate))) self.parameters.add(Parameter('feedback', 0.35, 'float', minimum=0.33, maximum=0.66)) self.parameters.add(Parameter('wet_channel', 'left', 'string', options=['left', 'right'])) def process(self, x): """ Process audio. Args: x (Numpy array): input audio of size `n_samples x n_channels`. Returns: (Numpy array): audio with Haas effect of size `n_samples x n_channels`. """ assert x.shape[1] == 1 or x.shape[1] == 2, 'Haas effect only works with monaural or stereo audio.' if x.shape[1] < 2: x = np.repeat(x, 2, axis=1) y = haas_process(x, self.parameters.delay.value, self.parameters.feedback.value, self.parameters.wet_channel.value) return y def update(self, parameter_name=None): """ Update processor after randomization of parameters. Args: parameter_name (str): Parameter whose value has changed. """ self.reset_state() def reset_state(self): """Reset state.""" self.read_idx = 0 self.write_idx = self.parameters.delay.value self.buffer = np.zeros((65536, 2)) # %%%%%%%%%%%%%%%%%%%%%%%%%%%%%% PANNER %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% class Panner(Processor): """ Simple stereo panner. If input is mono, output is stereo. Original edited from https://github.com/csteinmetz1/pymixconsole/blob/master/pymixconsole/processors/panner.py """ def __init__(self, name='Panner', parameters=None): """ Initialize processor. Args: name (str): Name of processor. parameters (parameter_list): Parameters for this processor. """ # default processor class constructor super().__init__(name=name, parameters=parameters, block_size=None, sample_rate=None) if not parameters: self.parameters = ParameterList() self.parameters.add(Parameter('pan', 0.5, 'float', minimum=0., maximum=1.)) self.parameters.add(Parameter('pan_law', '-4.5dB', 'string', options=['-4.5dB', 'linear', 'constant_power'])) # setup the coefficents based on default params self.update() def _calculate_pan_coefficents(self): """ Calculate panning coefficients from the chosen pan law. Based on the set pan law determine the gain value to apply for the left and right channel to achieve panning effect. This operates on the assumption that the input channel is mono. The output data will be stereo at the moment, but could be expanded to a higher channel count format. The panning value is in the range [0, 1], where 0 means the signal is panned completely to the left, and 1 means the signal is apanned copletely to the right. Raises: ValueError: `self.parameters.pan_law` is not supported. """ self.gains = np.zeros(2, dtype=self.dtype) # first scale the linear [0, 1] to [0, pi/2] theta = self.parameters.pan.value * (np.pi/2) if self.parameters.pan_law.value == 'linear': self.gains[0] = ((np.pi/2) - theta) * (2/np.pi) self.gains[1] = theta * (2/np.pi) elif self.parameters.pan_law.value == 'constant_power': self.gains[0] = np.cos(theta) self.gains[1] = np.sin(theta) elif self.parameters.pan_law.value == '-4.5dB': self.gains[0] = np.sqrt(((np.pi/2) - theta) * (2/np.pi) * np.cos(theta)) self.gains[1] = np.sqrt(theta * (2/np.pi) * np.sin(theta)) else: raise ValueError(f'Invalid pan_law {self.parameters.pan_law.value}.') def process(self, x): """ Process audio. Args: x (Numpy array): input audio of size `n_samples x n_channels`. Returns: (Numpy array): panned audio of size `n_samples x n_channels`. """ assert x.shape[1] == 1 or x.shape[1] == 2, 'Panner only works with monaural or stereo audio.' if x.shape[1] < 2: x = np.repeat(x, 2, axis=1) return x * self.gains def update(self, parameter_name=None): """ Update processor after randomization of parameters. Args: parameter_name (str): Parameter whose value has changed. """ self._calculate_pan_coefficents() def reset_state(self): """Reset state.""" self._output_buffer = np.empty([self.block_size, 2]) self.update() # %%%%%%%%%%%%%%%%%%%%%%%%%%%%%% STEREO IMAGER %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% class MidSideImager(Processor): def __init__(self, name='IMAGER', parameters=None): super().__init__(name, parameters=parameters, block_size=None, sample_rate=None) if not parameters: self.parameters = ParameterList() # values of 0.0~1.0 indicate making the signal more centered while 1.0~2.0 means making the signal more wider self.parameters.add(Parameter("bal", 0.0, "float", processor=self, minimum=0.0, maximum=2.0)) def process(self, data): """ # input shape : [signal length, 2] ### note! stereo imager won't work if the input signal is a mono signal (left==right) ### if you want to apply stereo imager to a mono signal, first stereoize it with Haas effects """ # to mid-side channels mid, side = self.lr_to_ms(data[:,0], data[:,1]) # apply mid-side weights according to energy mid_e, side_e = np.sum(mid**2), np.sum(side**2) total_e = mid_e + side_e # apply weights max_side_multiplier = np.sqrt(total_e / (side_e + 1e-3)) # compute current multiply factor cur_bal = round(getattr(self.parameters, "bal").value, 3) side_gain = cur_bal if cur_bal <= 1. else max_side_multiplier * (cur_bal-1) # multiply weighting factor new_side = side * side_gain new_side_e = side_e * (side_gain ** 2) left_mid_e = total_e - new_side_e mid_gain = np.sqrt(left_mid_e / (mid_e + 1e-3)) new_mid = mid * mid_gain # convert back to left-right channels left, right = self.ms_to_lr(new_mid, new_side) imaged = np.stack([left, right], 1) return imaged # left-right channeled signal to mid-side signal def lr_to_ms(self, left, right): mid = left + right side = left - right return mid, side # mid-side channeled signal to left-right signal def ms_to_lr(self, mid, side): left = (mid + side) / 2 right = (mid - side) / 2 return left, right def update(self, parameter_name=None): return parameter_name # %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% GAIN %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% class Gain(Processor): """ Gain Processor. Applies gain in dB and can also randomly inverts polarity. Processor parameters: gain (float): Gain that should be applied (dB scale). invert (bool): If True, then we also invert the waveform. """ def __init__(self, name='Gain', parameters=None): """ Initialize processor. Args: name (str): Name of processor. parameters (parameter_list): Parameters for this processor. """ super().__init__(name, parameters=parameters, block_size=None, sample_rate=None) if not parameters: self.parameters = ParameterList() # self.parameters.add(Parameter('gain', 1.0, 'float', units='dB', minimum=-12.0, maximum=6.0)) self.parameters.add(Parameter('gain', 1.0, 'float', units='dB', minimum=-6.0, maximum=9.0)) self.parameters.add(Parameter('invert', False, 'bool')) def process(self, x): """ Process audio. Args: x (Numpy array): input audio of size `n_samples x n_channels`. Returns: (Numpy array): gain-augmented audio of size `n_samples x n_channels`. """ gain = 10 ** (self.parameters.gain.value / 20.) if self.parameters.invert.value: gain = -gain return gain * x # %%%%%%%%%%%%%%%%%%%%%%% SIMPLE CHANNEL SWAP %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% class SwapChannels(Processor): """ Swap channels in multi-channel audio. Processor parameters: index (int) Selects the permutation that we are using. Please note that "no permutation" is one of the permutations in `self.permutations` at index `0`. """ def __init__(self, n_channels, name='SwapChannels', parameters=None): """ Initialize processor. Args: n_channels (int): Number of channels in audio that we want to process. name (str): Name of processor. parameters (parameter_list): Parameters for this processor. """ super().__init__(name=name, parameters=parameters, block_size=None, sample_rate=None) self.permutations = tuple(permutations(range(n_channels), n_channels)) if not parameters: self.parameters = ParameterList() self.parameters.add(Parameter('index', 0, 'int', minimum=0, maximum=len(self.permutations))) def process(self, x): """ Process audio. Args: x (Numpy array): input audio of size `n_samples x n_channels`. Returns: (Numpy array): channel-swapped audio of size `n_samples x n_channels`. """ return x[:, self.permutations[self.parameters.index.value]] # %%%%%%%%%%%%%%%%%%%%%%% Monauralize %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% class Monauralize(Processor): """ Monauralizes audio (i.e., removes spatial information). Process parameters: seed_channel (int): channel that we use for overwriting the others. """ def __init__(self, n_channels, name='Monauralize', parameters=None): """ Initialize processor. Args: n_channels (int): Number of channels in audio that we want to process. name (str): Name of processor. parameters (parameter_list): Parameters for this processor. """ super().__init__(name=name, parameters=parameters, block_size=None, sample_rate=None) if not parameters: self.parameters = ParameterList() self.parameters.add(Parameter('seed_channel', 0, 'int', minimum=0, maximum=n_channels)) def process(self, x): """ Process audio. Args: x (Numpy array): input audio of size `n_samples x n_channels`. Returns: (Numpy array): monauralized audio of size `n_samples x n_channels`. """ return np.tile(x[:, [self.parameters.seed_channel.value]], (1, x.shape[1])) # %%%%%%%%%%%%%%%%%%%%%%%%%%%%%% PITCH SHIFT %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% class PitchShift(Processor): """ Simple pitch shifter using SoX and soxbindings (https://github.com/pseeth/soxbindings). Processor parameters: steps (float): Pitch shift as positive/negative semitones quick (bool): If True, this effect will run faster but with lower sound quality. """ def __init__(self, sample_rate, fix_length=True, name='PitchShift', parameters=None): """ Initialize processor. Args: sample_rate (int): Sample rate of input audio. fix_length (bool): If True, then output has same length as input. name (str): Name of processor. parameters (parameter_list): Parameters for this processor. """ super().__init__(name=name, parameters=parameters, block_size=None, sample_rate=sample_rate) if not parameters: self.parameters = ParameterList() self.parameters.add(Parameter('steps', 0.0, 'float', minimum=-6., maximum=6.)) self.parameters.add(Parameter('quick', False, 'bool')) self.fix_length = fix_length self.clips = False def process(self, x): """ Process audio. Args: x (Numpy array): input audio of size `n_samples x n_channels`. Returns: (Numpy array): pitch-shifted audio of size `n_samples x n_channels`. """ if self.parameters.steps.value == 0.0: y = x else: scale = np.max(np.abs(x)) if scale > 0.9: clips = True x = x * (0.9 / scale) else: clips = False tfm = sox.Transformer() tfm.pitch(self.parameters.steps.value, quick=bool(self.parameters.quick.value)) y = tfm.build_array(input_array=x, sample_rate_in=self.sample_rate).astype(np.float32) if clips: y *= scale / 0.9 # rescale output to original scale if self.fix_length: n_samples_input = x.shape[0] n_samples_output = y.shape[0] if n_samples_input < n_samples_output: idx1 = (n_samples_output - n_samples_input) // 2 idx2 = idx1 + n_samples_input y = y[idx1:idx2] elif n_samples_input > n_samples_output: n_pad = n_samples_input - n_samples_output y = np.pad(y, ((n_pad//2, n_pad - n_pad//2), (0, 0))) return y # %%%%%%%%%%%%%%%%%%%%%%%%%%%%%% TIME STRETCH %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% class TimeStretch(Processor): """ Simple time stretcher using SoX and soxbindings (https://github.com/pseeth/soxbindings). Processor parameters: factor (float): Time stretch factor. quick (bool): If True, this effect will run faster but with lower sound quality. stretch_type (str): Algorithm used for stretching (`tempo` or `stretch`). audio_type (str): Sets which time segments are most optmial when finding the best overlapping points for time stretching. """ def __init__(self, sample_rate, fix_length=True, name='TimeStretch', parameters=None): """ Initialize processor. Args: sample_rate (int): Sample rate of input audio. fix_length (bool): If True, then output has same length as input. name (str): Name of processor. parameters (parameter_list): Parameters for this processor. """ super().__init__(name=name, parameters=parameters, block_size=None, sample_rate=sample_rate) if not parameters: self.parameters = ParameterList() self.parameters.add(Parameter('factor', 1.0, 'float', minimum=1/1.33, maximum=1.33)) self.parameters.add(Parameter('quick', False, 'bool')) self.parameters.add(Parameter('stretch_type', 'tempo', 'string', options=['tempo', 'stretch'])) self.parameters.add(Parameter('audio_type', 'l', 'string', options=['m', 's', 'l'])) self.fix_length = fix_length def process(self, x): """ Process audio. Args: x (Numpy array): input audio of size `n_samples x n_channels`. Returns: (Numpy array): time-stretched audio of size `n_samples x n_channels`. """ if self.parameters.factor.value == 1.0: y = x else: scale = np.max(np.abs(x)) if scale > 0.9: clips = True x = x * (0.9 / scale) else: clips = False tfm = sox.Transformer() if self.parameters.stretch_type.value == 'stretch': tfm.stretch(self.parameters.factor.value) elif self.parameters.stretch_type.value == 'tempo': tfm.tempo(self.parameters.factor.value, audio_type=self.parameters.audio_type.value, quick=bool(self.parameters.quick.value)) y = tfm.build_array(input_array=x, sample_rate_in=self.sample_rate).astype(np.float32) if clips: y *= scale / 0.9 # rescale output to original scale if self.fix_length: n_samples_input = x.shape[0] n_samples_output = y.shape[0] if n_samples_input < n_samples_output: idx1 = (n_samples_output - n_samples_input) // 2 idx2 = idx1 + n_samples_input y = y[idx1:idx2] elif n_samples_input > n_samples_output: n_pad = n_samples_input - n_samples_output y = np.pad(y, ((n_pad//2, n_pad - n_pad//2), (0, 0))) return y # %%%%%%%%%%%%%%%%%%%%%%%%%%%%%% PLAYBACK SPEED %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% class PlaybackSpeed(Processor): """ Simple playback speed effect using SoX and soxbindings (https://github.com/pseeth/soxbindings). Processor parameters: factor (float): Playback speed factor. """ def __init__(self, sample_rate, fix_length=True, name='PlaybackSpeed', parameters=None): """ Initialize processor. Args: sample_rate (int): Sample rate of input audio. fix_length (bool): If True, then output has same length as input. name (str): Name of processor. parameters (parameter_list): Parameters for this processor. """ super().__init__(name=name, parameters=parameters, block_size=None, sample_rate=sample_rate) if not parameters: self.parameters = ParameterList() self.parameters.add(Parameter('factor', 1.0, 'float', minimum=1./1.33, maximum=1.33)) self.fix_length = fix_length def process(self, x): """ Process audio. Args: x (Numpy array): input audio of size `n_samples x n_channels`. Returns: (Numpy array): resampled audio of size `n_samples x n_channels`. """ if self.parameters.factor.value == 1.0: y = x else: scale = np.max(np.abs(x)) if scale > 0.9: clips = True x = x * (0.9 / scale) else: clips = False tfm = sox.Transformer() tfm.speed(self.parameters.factor.value) y = tfm.build_array(input_array=x, sample_rate_in=self.sample_rate).astype(np.float32) if clips: y *= scale / 0.9 # rescale output to original scale if self.fix_length: n_samples_input = x.shape[0] n_samples_output = y.shape[0] if n_samples_input < n_samples_output: idx1 = (n_samples_output - n_samples_input) // 2 idx2 = idx1 + n_samples_input y = y[idx1:idx2] elif n_samples_input > n_samples_output: n_pad = n_samples_input - n_samples_output y = np.pad(y, ((n_pad//2, n_pad - n_pad//2), (0, 0))) return y # %%%%%%%%%%%%%%%%%%%%%%%%%%%%%% BEND %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% class Bend(Processor): """ Simple bend effect using SoX and soxbindings (https://github.com/pseeth/soxbindings). Processor parameters: n_bends (int): Number of segments or intervals to pitch shift """ def __init__(self, sample_rate, pitch_range=(-600, 600), fix_length=True, name='Bend', parameters=None): """ Initialize processor. Args: sample_rate (int): Sample rate of input audio. pitch_range (tuple of ints): min and max pitch bending ranges in cents fix_length (bool): If True, then output has same length as input. name (str): Name of processor. parameters (parameter_list): Parameters for this processor. """ super().__init__(name=name, parameters=parameters, block_size=None, sample_rate=sample_rate) if not parameters: self.parameters = ParameterList() self.parameters.add(Parameter('n_bends', 2, 'int', minimum=2, maximum=10)) self.pitch_range_min, self.pitch_range_max = pitch_range def process(self, x): """ Process audio. Args: x (Numpy array): input audio of size `n_samples x n_channels`. Returns: (Numpy array): pitch-bended audio of size `n_samples x n_channels`. """ n_bends = self.parameters.n_bends.value max_length = x.shape[0] / self.sample_rate # Generates random non-overlapping segments delta = 1. / self.sample_rate boundaries = np.sort(delta + np.random.rand(n_bends-1) * (max_length - delta)) start, end = np.zeros(n_bends), np.zeros(n_bends) start[0] = delta for i, b in enumerate(boundaries): end[i] = b start[i+1] = b end[-1] = max_length # randomly sample pitch-shifts in cents cents = np.random.randint(self.pitch_range_min, self.pitch_range_max+1, n_bends) # remove segment if cent value is zero or start == end (as SoX does not allow such values) idx_keep = np.logical_and(cents != 0, start != end) n_bends, start, end, cents = sum(idx_keep), start[idx_keep], end[idx_keep], cents[idx_keep] scale = np.max(np.abs(x)) if scale > 0.9: clips = True x = x * (0.9 / scale) else: clips = False tfm = sox.Transformer() tfm.bend(n_bends=int(n_bends), start_times=list(start), end_times=list(end), cents=list(cents)) y = tfm.build_array(input_array=x, sample_rate_in=self.sample_rate).astype(np.float32) if clips: y *= scale / 0.9 # rescale output to original scale return y # %%%%%%%%%%%%%%%%%%%%%%%%%%%%%% ALGORITHMIC REVERB %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% class AlgorithmicReverb(Processor): def __init__(self, name="algoreverb", parameters=None, sample_rate=44100, **kwargs): super().__init__(name=name, parameters=parameters, block_size=None, sample_rate=sample_rate, **kwargs) if not parameters: self.parameters = ParameterList() self.parameters.add(Parameter("room_size", 0.5, "float", minimum=0.05, maximum=0.85)) self.parameters.add(Parameter("damping", 0.1, "float", minimum=0.0, maximum=1.0)) self.parameters.add(Parameter("dry_mix", 0.9, "float", minimum=0.0, maximum=1.0)) self.parameters.add(Parameter("wet_mix", 0.1, "float", minimum=0.0, maximum=1.0)) self.parameters.add(Parameter("width", 0.7, "float", minimum=0.0, maximum=1.0)) # Tuning self.stereospread = 23 self.scalegain = 0.2 def process(self, data): if data.ndim >= 2: dataL = data[:,0] if data.shape[1] == 2: dataR = data[:,1] else: dataR = data[:,0] else: dataL = data dataR = data output = np.zeros((data.shape[0], 2)) xL, xR = self.process_filters(dataL.copy(), dataR.copy()) wet1_g = self.parameters.wet_mix.value * ((self.parameters.width.value/2) + 0.5) wet2_g = self.parameters.wet_mix.value * ((1-self.parameters.width.value)/2) dry_g = self.parameters.dry_mix.value output[:,0] = (wet1_g * xL) + (wet2_g * xR) + (dry_g * dataL) output[:,1] = (wet1_g * xR) + (wet2_g * xL) + (dry_g * dataR) return output def process_filters(self, dataL, dataR): xL = self.combL1.process(dataL.copy() * self.scalegain) xL += self.combL2.process(dataL.copy() * self.scalegain) xL += self.combL3.process(dataL.copy() * self.scalegain) xL += self.combL4.process(dataL.copy() * self.scalegain) xL = self.combL5.process(dataL.copy() * self.scalegain) xL += self.combL6.process(dataL.copy() * self.scalegain) xL += self.combL7.process(dataL.copy() * self.scalegain) xL += self.combL8.process(dataL.copy() * self.scalegain) xR = self.combR1.process(dataR.copy() * self.scalegain) xR += self.combR2.process(dataR.copy() * self.scalegain) xR += self.combR3.process(dataR.copy() * self.scalegain) xR += self.combR4.process(dataR.copy() * self.scalegain) xR = self.combR5.process(dataR.copy() * self.scalegain) xR += self.combR6.process(dataR.copy() * self.scalegain) xR += self.combR7.process(dataR.copy() * self.scalegain) xR += self.combR8.process(dataR.copy() * self.scalegain) yL1 = self.allpassL1.process(xL) yL2 = self.allpassL2.process(yL1) yL3 = self.allpassL3.process(yL2) yL4 = self.allpassL4.process(yL3) yR1 = self.allpassR1.process(xR) yR2 = self.allpassR2.process(yR1) yR3 = self.allpassR3.process(yR2) yR4 = self.allpassR4.process(yR3) return yL4, yR4 def update(self, parameter_name): rs = self.parameters.room_size.value dp = self.parameters.damping.value ss = self.stereospread # initialize allpass and feedback comb-filters # (with coefficients optimized for fs=44.1kHz) self.allpassL1 = pymc.components.allpass.Allpass(556, rs, self.block_size) self.allpassR1 = pymc.components.allpass.Allpass(556+ss, rs, self.block_size) self.allpassL2 = pymc.components.allpass.Allpass(441, rs, self.block_size) self.allpassR2 = pymc.components.allpass.Allpass(441+ss, rs, self.block_size) self.allpassL3 = pymc.components.allpass.Allpass(341, rs, self.block_size) self.allpassR3 = pymc.components.allpass.Allpass(341+ss, rs, self.block_size) self.allpassL4 = pymc.components.allpass.Allpass(225, rs, self.block_size) self.allpassR4 = pymc.components.allpass.Allpass(255+ss, rs, self.block_size) self.combL1 = pymc.components.comb.Comb(1116, dp, rs, self.block_size) self.combR1 = pymc.components.comb.Comb(1116+ss, dp, rs, self.block_size) self.combL2 = pymc.components.comb.Comb(1188, dp, rs, self.block_size) self.combR2 = pymc.components.comb.Comb(1188+ss, dp, rs, self.block_size) self.combL3 = pymc.components.comb.Comb(1277, dp, rs, self.block_size) self.combR3 = pymc.components.comb.Comb(1277+ss, dp, rs, self.block_size) self.combL4 = pymc.components.comb.Comb(1356, dp, rs, self.block_size) self.combR4 = pymc.components.comb.Comb(1356+ss, dp, rs, self.block_size) self.combL5 = pymc.components.comb.Comb(1422, dp, rs, self.block_size) self.combR5 = pymc.components.comb.Comb(1422+ss, dp, rs, self.block_size) self.combL6 = pymc.components.comb.Comb(1491, dp, rs, self.block_size) self.combR6 = pymc.components.comb.Comb(1491+ss, dp, rs, self.block_size) self.combL7 = pymc.components.comb.Comb(1557, dp, rs, self.block_size) self.combR7 = pymc.components.comb.Comb(1557+ss, dp, rs, self.block_size) self.combL8 = pymc.components.comb.Comb(1617, dp, rs, self.block_size) self.combR8 = pymc.components.comb.Comb(1617+ss, dp, rs, self.block_size)