|
""" |
|
Module with common functions for loading training data and preparing minibatches. |
|
|
|
AI Music Technology Group, Sony Group Corporation |
|
AI Speech and Sound Group, Sony Europe |
|
|
|
This implementation originally belongs to Sony Group Corporation

and was introduced in the work "Automatic music mixing with deep learning and out-of-domain data".
|
Original repo link: https://github.com/sony/FxNorm-automix |
|
""" |
|
|
|
import numpy as np |
|
import os |
|
import sys |
|
import functools |
|
import scipy.io.wavfile as wav |
|
import soundfile as sf |
|
from typing import Tuple |
|
|
|
currentdir = os.path.dirname(os.path.realpath(__file__)) |
|
sys.path.append(currentdir) |
|
from common_audioeffects import AugmentationChain |
|
from common_miscellaneous import uprint |
|
|
|
|
|
def load_wav(file_path, mmap=False, convert_float=False):
    """
    Load a WAV file in C_CONTIGUOUS format.

    Args:
        file_path: Path to WAV file (16bit, 24bit or 32bit PCM supported)
        mmap: If `True`, then we do not load the WAV data into memory but use a memory-mapped representation
        convert_float: If `True`, convert the samples to np.float32 in [-1, 1) by dividing
            by `1 + np.iinfo(dtype).max` (same scaling convention as `generate_data`/`save_wav`)

    Raises:
        ValueError: if the file does not contain np.int16 or np.int32 samples

    Returns:
        fs: Sample rate
        samples: Numpy array with audio [n_samples x n_channels]
            (np.int16 or np.int32; np.float32 if `convert_float` is `True`)
    """
    fs, samples = wav.read(file_path, mmap=mmap)

    # Ensure a 2D layout [n_samples x n_channels] even for mono files.
    if samples.ndim == 1:
        samples = samples[:, np.newaxis]

    # Only integer PCM is supported. Use an explicit exception instead of
    # `assert`, which would be silently stripped under `python -O`.
    if samples.dtype not in (np.int16, np.int32):
        raise ValueError(f'Unsupported sample dtype {samples.dtype}; '
                         f'expected np.int16 or np.int32.')

    if convert_float:
        # Divide by (max + 1) so that full-scale PCM maps into [-1, 1).
        conversion_scale = 1. / (1. + np.iinfo(samples.dtype).max)
        samples = samples.astype(dtype=np.float32) * conversion_scale

    return fs, samples
|
|
|
|
|
def save_wav(file_path, fs, samples, subtype='PCM_16'):
    """
    Save a WAV file (16bit or 32bit PCM).

    Important note: We save here using the same conversion as is used in
    `generate_data`, i.e., we multiply by `1 + np.iinfo(np.int16).max`
    or `1 + np.iinfo(np.int32).max` which is a different behavior
    than `libsndfile` as described here:
    http://www.mega-nerd.com/libsndfile/FAQ.html#Q010

    Args:
        file_path: Path where to store the WAV file
        fs: Sample rate
        samples: Numpy array (float32 with values in [-1, 1) and shape [n_samples x n_channels])
        subtype: Either `PCM_16` or `PCM_24` or `PCM_32` in order to store as 16bit, 24bit or 32bit PCM file

    Raises:
        AssertionError: if `subtype` is not one of the supported PCM subtypes
    """
    assert subtype in ['PCM_16', 'PCM_24', 'PCM_32'], subtype

    # 24bit PCM is written from an int32 container; only PCM_16 uses int16.
    dtype = np.int16 if subtype == 'PCM_16' else np.int32

    # Scale to the integer range (same convention as `generate_data`).
    samples = samples * (1 + np.iinfo(dtype).max)
    if np.min(samples) < np.iinfo(dtype).min or np.max(samples) > np.iinfo(dtype).max:
        # Log consistently through `uprint` (original mixed `print` and `uprint`).
        uprint(f'WARNING: Clipping occurs for {file_path}.')
        samples_ = samples / (1 + np.iinfo(dtype).max)
        uprint(f'max value {np.max(np.abs(samples_))}')
        samples = np.clip(samples, np.iinfo(dtype).min, np.iinfo(dtype).max)
    samples = samples.astype(dtype)

    sf.write(file_path, samples, fs, subtype=subtype)
|
|
|
|
|
def load_files_lists(path):
    """
    List the entries of `path` (one entry per song/mixture directory).

    Args:
        path: folder whose contents should be listed

    Returns:
        list of directory names (= songs) found directly in `path`
    """
    return list(os.listdir(path))
|
|
|
|
|
def create_dataset(path, accepted_sampling_rates, sources, mapped_sources, n_channels=-1, load_to_memory=False,
                   debug=False, verbose=False):
    """
    Prepare data in `path` for training/validation/test set generation.

    Each sub-directory of `path` is treated as one song; every WAV file inside it
    whose base name matches an entry in `sources` (or `mapped_sources`) is registered
    with a lazy loading handle (a `functools.partial` around `generate_data`).

    Args:
        path: path to the dataset
        accepted_sampling_rates: list of accepted sampling rates
        sources: list of sources
        mapped_sources: mapping of alternative source names onto entries of `sources`
        n_channels: expected number of channels; `-1` disables the check
        load_to_memory: whether to load to main memory (otherwise files are memory-mapped)
        debug: if `True`, then we load only `NUM_SAMPLES_SMALL_DATASET`
        verbose: if `True`, log progress for every processed mixture/file

    Raises:
        ValueError: mapping of sources not possible if data is not loaded into memory,
            or a file has an unexpected channel count / sampling rate

    Returns:
        data: list of dictionaries with function handles (to load the data)
        directories: list of directories
    """
    NUM_SAMPLES_SMALL_DATASET = 16

    # Source mapping stacks raw arrays below, which requires them in main memory.
    if mapped_sources and not load_to_memory:
        raise ValueError('Mapping of sources only supported if data is loaded into the memory.')

    directories = load_files_lists(path)

    if debug:
        data = [dict() for _x in range(np.minimum(NUM_SAMPLES_SMALL_DATASET, len(directories)))]
    else:
        data = [dict() for _x in range(len(directories))]

    material_length = {}  # accumulated duration (in seconds) per source name
    for i, d in enumerate(directories):
        if verbose:
            uprint(f'Processing mixture ({i+1} of {len(directories)}): {d}')

        files = os.listdir(os.path.join(path, d))
        for f in files:
            src_name = os.path.splitext(f)[0]
            if ((src_name not in sources
                 and src_name not in mapped_sources)):
                if verbose:
                    uprint(f'\tIgnoring unknown source from file {f}')
            else:
                if src_name not in sources:
                    src_name = mapped_sources[src_name]
                if verbose:
                    uprint(f'\tAdding function handle for "{src_name}" from file {f}')

                # Memory-map unless the data should live in main memory.
                _data = load_wav(os.path.join(path, d, f), mmap=not load_to_memory)

                _samplingrate = _data[0]
                _n_channels = _data[1].shape[1]
                _duration = _data[1].shape[0] / _samplingrate

                # Keep track of the total amount of material per source.
                if src_name in material_length:
                    material_length[src_name] += _duration
                else:
                    material_length[src_name] = _duration

                if n_channels != -1 and _n_channels != n_channels:
                    raise ValueError(f'File has {_n_channels} '
                                     f'channels but expected {n_channels}.')

                if _samplingrate not in accepted_sampling_rates:
                    raise ValueError(f'File has fs = {_samplingrate}Hz '
                                     f'but expected {accepted_sampling_rates}Hz.')

                # If this source was already seen for this song, stack the new
                # samples on top of the previously registered ones.
                if src_name in data[i]:
                    _data = (_data[0], np.vstack((_data[1],
                                                  data[i][src_name].keywords['file_path_or_data'][1])))
                data[i][src_name] = functools.partial(generate_data,
                                                      file_path_or_data=_data)

        if debug and i == NUM_SAMPLES_SMALL_DATASET-1:
            # Only load a small subset of the data in debug mode.
            break

    # Drop songs for which no known source files were found.
    idx_empty = [_ for _ in range(len(data)) if len(data[_]) == 0]
    for idx in sorted(idx_empty, reverse=True):
        del data[idx]

    return data, directories
|
|
|
def create_dataset_mixing(path, accepted_sampling_rates, sources, mapped_sources, n_channels=-1, load_to_memory=False,
                          debug=False, pad_wrap_samples=None):
    """
    Prepare data in `path` for training/validation/test set generation (mixing task).

    In contrast to `create_dataset`, all stems of one song are registered under a single
    joint key (`"-".join` of the stem names) whose handle returns a list of aligned chunks.

    Args:
        path: path to the dataset
        accepted_sampling_rates: list of accepted sampling rates
        sources: list of sources
        mapped_sources: mapping of alternative source names onto entries of `sources`
        n_channels: expected number of channels; `-1` disables the check
            (mono files are up-mixed by repeating the single channel)
        load_to_memory: whether to load to main memory (otherwise files are memory-mapped)
        debug: if `True`, then we load only `NUM_SAMPLES_SMALL_DATASET`
        pad_wrap_samples: if not None, prepend this many 'wrap'-padded samples to each stem

    Raises:
        ValueError: mapping of sources not possible if data is not loaded into memory,
            or a file has an unexpected channel count / sampling rate

    Returns:
        data: list of dictionaries with function handles (to load the data)
        directories: sorted list of directories
    """
    NUM_SAMPLES_SMALL_DATASET = 16

    # Source mapping stacks raw arrays below, which requires them in main memory.
    if mapped_sources and not load_to_memory:
        raise ValueError('Mapping of sources only supported if data is loaded into the memory.')

    directories = load_files_lists(path)
    directories.sort()  # deterministic song order across runs

    uprint(f'\nCreating dataset for path={path} ...')

    if debug:
        data = [dict() for _x in range(np.minimum(NUM_SAMPLES_SMALL_DATASET, len(directories)))]
    else:
        data = [dict() for _x in range(len(directories))]

    material_length = {}  # accumulated duration (in seconds) per source name
    for i, d in enumerate(directories):
        uprint(f'Processing mixture ({i+1} of {len(directories)}): {d}')

        files = os.listdir(os.path.join(path, d))
        _data_mix = []    # (fs, samples) tuples for all stems of this song
        _stems_name = []  # corresponding source names (same order as `_data_mix`)
        for f in files:
            src_name = os.path.splitext(f)[0]
            if ((src_name not in sources
                 and src_name not in mapped_sources)):
                uprint(f'\tIgnoring unknown source from file {f}')
            else:
                if src_name not in sources:
                    src_name = mapped_sources[src_name]
                uprint(f'\tAdding function handle for "{src_name}" from file {f}')

                # Memory-map unless the data should live in main memory.
                _data = load_wav(os.path.join(path, d, f), mmap=not load_to_memory)

                if pad_wrap_samples:
                    # Prepend samples wrapped around from the end of the signal.
                    _data = (_data[0], np.pad(_data[1], [(pad_wrap_samples, 0), (0,0)], 'wrap'))

                _samplingrate = _data[0]
                _n_channels = _data[1].shape[1]
                _duration = _data[1].shape[0] / _samplingrate

                # Keep track of the total amount of material per source.
                if src_name in material_length:
                    material_length[src_name] += _duration
                else:
                    material_length[src_name] = _duration

                if n_channels != -1 and _n_channels != n_channels:
                    if _n_channels == 1:
                        # Up-mix mono by repeating the channel (hard-coded to stereo).
                        _data = (_data[0], np.repeat(_data[1], 2, axis=-1))
                        print("Converted file to stereo by repeating mono channel")
                    else:
                        raise ValueError(f'File has {_n_channels} '
                                         f'channels but expected {n_channels}.')

                if _samplingrate not in accepted_sampling_rates:
                    raise ValueError(f'File has fs = {_samplingrate}Hz '
                                     f'but expected {accepted_sampling_rates}Hz.')

                # If this source was already seen for this song, stack the new
                # samples on top of the previously registered ones.
                if src_name in data[i]:
                    _data = (_data[0], np.vstack((_data[1],
                                                  data[i][src_name].keywords['file_path_or_data'][1])))

                _data_mix.append(_data)
                _stems_name.append(src_name)

        # One joint handle per song so `generate_data` yields aligned chunks for all stems.
        # NOTE(review): a song without any known stem still gets an entry under key "" here,
        # so it is not removed by the empty-dict cleanup below — confirm this is intended.
        data[i]["-".join(_stems_name)] = functools.partial(generate_data,
                                                           file_path_or_data=_data_mix)

        if debug and i == NUM_SAMPLES_SMALL_DATASET-1:
            # Only load a small subset of the data in debug mode.
            break

    # Drop songs for which no handle at all was registered.
    idx_empty = [_ for _ in range(len(data)) if len(data[_]) == 0]
    for idx in sorted(idx_empty, reverse=True):
        del data[idx]

    uprint(f'Finished preparation of dataset. '
           f'Found in total the following material (in {len(data)} directories):')
    for src in material_length:
        uprint(f'\t{src}: {material_length[src] / 60.0 / 60.0:.2f} hours')
    return data, directories
|
|
|
|
|
def generate_data(file_path_or_data, random_sample_size=None):
    """
    Load one stem/several stems specified by `file_path_or_data`.

    Alternatively, can also be the result of `wav.read()` if the data has already been loaded previously.

    If `file_path_or_data` is a tuple/list of such items, then we load several files and will
    return also a tuple/list. This is useful for cases where we want to make sure to have the
    same random chunk for several stems.

    If `random_sample_size` is not None, then only `random_sample_size` samples are randomly selected.

    Args:
        file_path_or_data: either path to data or the data itself
        random_sample_size: if `random_sample_size` is not None, only `random_sample_size` samples are randomly selected

    Returns:
        samples: float32 data with size `num_samples x num_channels` or a list of such arrays
    """
    # Wrap a single item into a 1-tuple so the code below can always iterate
    # over a collection of stems.
    needs_wrapping = False
    if isinstance(file_path_or_data, str):
        needs_wrapping = True  # a single file path
    if ((type(file_path_or_data[0]) is not list
         and type(file_path_or_data[0]) is not tuple)):
        needs_wrapping = True  # a single `(fs, samples)` tuple
    if needs_wrapping:
        file_path_or_data = (file_path_or_data,)

    samples = [None] * len(file_path_or_data)

    # Load each entry: either from disk, or reuse the already-loaded data.
    for i, fpod in enumerate(file_path_or_data):
        if isinstance(fpod, str):
            _fs, samples[i] = load_wav(fpod)
        else:
            _fs, samples[i] = fpod

    if random_sample_size is not None:
        # Determine the longest stem (but at least `random_sample_size` samples).
        max_length = random_sample_size
        for s in samples:
            max_length = np.maximum(max_length, s.shape[0])

        # Zero-pad shorter stems on both sides so every stem reaches `max_length`
        # (the `// 2 + 1` padding on each side slightly over-pads on purpose).
        for i, s in enumerate(samples):
            if s.shape[0] < max_length:
                required_padding = max_length - s.shape[0]
                zeros = np.zeros((required_padding // 2 + 1, s.shape[1]),
                                 dtype=s.dtype, order='F')
                samples[i] = np.concatenate([zeros, s, zeros])

        # One shared random start index keeps all stems time-aligned.
        idx_start = np.random.randint(max_length)

        # Cut out the chunk, wrapping around to the beginning if it runs past the end.
        for i, s in enumerate(samples):
            if idx_start + random_sample_size < s.shape[0]:
                samples[i] = s[idx_start:idx_start + random_sample_size]
            else:
                samples[i] = np.concatenate([s[idx_start:],
                                             s[:random_sample_size - (s.shape[0] - idx_start)]])

    # Convert integer PCM to float32 in [-1, 1) (same scaling as `load_wav(convert_float=True)`).
    # NOTE(review): assumes integer input dtype — `np.iinfo` would raise on float data.
    for i, s in enumerate(samples):
        conversion_scale = 1. / (1. + np.iinfo(s.dtype).max)
        samples[i] = s.astype(dtype=np.float32) * conversion_scale

    if len(samples) == 1:
        return samples[0]
    else:
        return samples
|
|
|
|
|
def create_minibatch(data: list, sources: list,
                     present_prob: dict, overlap_prob: dict,
                     augmenter: AugmentationChain, augmenter_padding: Tuple[int],
                     batch_size: int, n_samples: int, n_channels: int, idx_songs: dict):
    """
    Create a minibatch.

    This function also handles the case that we do not have a source in one mixture.
    This can, e.g., happen for instrumental pieces that do not have vocals.

    Args:
        data (list): data to create the minibatch from.
        sources (list): list of sources.
        present_prob (dict): probability of a source to be present.
        overlap_prob (dict): probability of overlap.
        augmenter (AugmentationChain): audio effect chain that we want to apply for data augmentation
        augmenter_padding (tuple of ints): padding that we should apply to left/right side of data to avoid
            boundary effects of `augmenter`.
        batch_size (int): number of training samples in one minibatch.
        n_samples (int): number of time samples.
        n_channels (int): number of channels.
        idx_songs (dict): index of songs.

    Returns:
        inp (Numpy array): minibatch, input to the network (i.e. the mixture) of size
            `batch_size x n_samples x n_channels`
        tar (dict with Numpy arrays): dictionary which contains for each source the targets,
            each of the `c_contiguous` ndarrays is `batch_size x n_samples x n_channels`
    """
    # Zero-initialized buffers: sources that end up absent stay silent.
    shp = (batch_size, n_samples, n_channels)
    inp = np.zeros(shape=shp, dtype=np.float32, order='C')
    tar = {src: np.zeros(shape=shp, dtype=np.float32, order='C') for src in sources}

    # Translate the padding into slice boundaries; `None` means "no trimming"
    # (a plain `-0` would produce an empty slice, hence the special case for 0).
    pad_left = None if augmenter_padding[0] == 0 else augmenter_padding[0]
    pad_right = None if augmenter_padding[1] == 0 else -augmenter_padding[1]

    def augm(i, s, n):
        # Load a random chunk with extra padding, augment it, then trim the padding.
        return augmenter(data[i][s](random_sample_size=n+sum(augmenter_padding)))[pad_left:pad_right]

    for src in sources:

        for j in range(batch_size):

            _idx_song = idx_songs[src][j]

            # Randomly drop a source and/or overlap it with the same source from another song.
            is_present = src not in present_prob or np.random.rand() < present_prob[src]
            is_overlap = src in overlap_prob and np.random.rand() < overlap_prob[src]

            if src in data[_idx_song] and is_present:
                tar[src][j, ...] = augm(_idx_song, src, n_samples)

            # Add the same source type from another (randomly chosen) song.
            if is_overlap:
                idx_overlap_ = np.random.randint(len(data))
                if idx_overlap_ != _idx_song and src in data[idx_overlap_]:
                    tar[src][j, ...] += augm(idx_overlap_, src, n_samples)

        # The network input is the sum (mixture) of all source targets.
        inp += tar[src]

    # Peak-normalize each mixture (and its targets) that exceeds full scale;
    # `np.maximum(1.0, ...)` leaves quieter mixtures untouched.
    maxabs_amp = np.maximum(1.0, 1e-6 + np.max(np.abs(inp), axis=(1, 2), keepdims=True))
    inp /= maxabs_amp
    for src in sources:
        tar[src] /= maxabs_amp

    return inp, tar
|
|
|
def create_minibatch_mixing(data: list, sources: list, inputs: list, outputs: list,
                            present_prob: dict, overlap_prob: dict,
                            augmenter: AugmentationChain, augmenter_padding: Tuple[int], augmenter_sources: list,
                            batch_size: int, n_samples: int, n_channels: int, idx_songs: dict):
    """
    Create a minibatch for the mixing task.

    This function also handles the case that we do not have a source in one mixture.
    This can, e.g., happen for instrumental pieces that do not have vocals.

    Args:
        data (list): data to create the minibatch from (as built by `create_dataset_mixing`).
        sources (list): list of sources.
        inputs (list): source names whose chunks are returned as network inputs (stems).
        outputs (list): source names whose chunks are returned as targets (mixes).
        present_prob (dict): probability of a source to be present.
        overlap_prob (dict): probability of overlap.
        augmenter (AugmentationChain): audio effect chain that we want to apply for data augmentation
        augmenter_padding (tuple of ints): padding that we should apply to left/right side of data to avoid
            boundary effects of `augmenter`.
        augmenter_sources (list): list of sources to augment
        batch_size (int): number of training samples in one minibatch.
        n_samples (int): number of time samples.
        n_channels (int): number of channels.
        idx_songs (dict): index of songs.

    Returns:
        stems (dict with Numpy arrays): for each source in `inputs` its (possibly augmented) audio,
            each of the `c_contiguous` ndarrays is `batch_size x n_samples x n_channels`
        mix (dict with Numpy arrays): for each source in `outputs` the corresponding target audio,
            each of the `c_contiguous` ndarrays is `batch_size x n_samples x n_channels`
    """
    # Zero-initialized buffers: tags missing from a song stay silent.
    shp = (batch_size, n_samples, n_channels)
    stems = {src: np.zeros(shape=shp, dtype=np.float32, order='C') for src in inputs}
    mix = {src: np.zeros(shape=shp, dtype=np.float32, order='C') for src in outputs}

    # Translate the padding into slice boundaries; `None` means "no trimming"
    # (a plain `-0` would produce an empty slice, hence the special case for 0).
    pad_left = None if augmenter_padding[0] == 0 else augmenter_padding[0]
    pad_right = None if augmenter_padding[1] == 0 else -augmenter_padding[1]

    def augm(i, n):
        # Each song has a single joint handle (key = "-".join of its stem names,
        # cf. `create_dataset_mixing`) returning a list of time-aligned chunks.
        s = list(data[i])[0]
        input_multitracks = data[i][s](random_sample_size=n+sum(augmenter_padding))
        audio_tags = list(data[i])[0].split("-")

        # Augment only the configured sources; trim the padding from all of them.
        for k, tag in enumerate(audio_tags):
            if tag in augmenter_sources:
                input_multitracks[k] = augmenter(input_multitracks[k])[pad_left:pad_right]
            else:
                input_multitracks[k] = input_multitracks[k][pad_left:pad_right]
        return input_multitracks

    # NOTE(review): `sources`, `present_prob` and `overlap_prob` are unused in this
    # function — presumably kept for signature parity with `create_minibatch`; confirm.
    for src in outputs:

        for j in range(batch_size):

            _idx_song = idx_songs[src][j]

            multitrack_audio = augm(_idx_song, n_samples)

            # Recover which stem each chunk belongs to from the dictionary key.
            audio_tags = list(data[_idx_song])[0].split("-")

            for i, tag in enumerate(audio_tags):
                if tag in inputs:
                    stems[tag][j, ...] = multitrack_audio[i]
                if tag in outputs:
                    mix[tag][j, ...] = multitrack_audio[i]

    return stems, mix
|
|
|
|