jhtonyKoo's picture
Upload 61 files
2777fde
"""
Module with common functions for loading training data and preparing minibatches.
AI Music Technology Group, Sony Group Corporation
AI Speech and Sound Group, Sony Europe
This implementation originally belongs to Sony Group Corporation,
which has been introduced in the work "Automatic music mixing with deep learning and out-of-domain data".
Original repo link: https://github.com/sony/FxNorm-automix
"""
import numpy as np
import os
import sys
import functools
import scipy.io.wavfile as wav
import soundfile as sf
from typing import Tuple
currentdir = os.path.dirname(os.path.realpath(__file__))
sys.path.append(currentdir)
from common_audioeffects import AugmentationChain
from common_miscellaneous import uprint
def load_wav(file_path, mmap=False, convert_float=False):
"""
Load a WAV file in C_CONTIGUOUS format.
Args:
file_path: Path to WAV file (16bit, 24bit or 32bit PCM supported)
mmap: If `True`, then we do not load the WAV data into memory but use a memory-mapped representation
Returns:
fs: Sample rate
samples: Numpy array (np.int16 or np.int32) with audio [n_samples x n_channels]
"""
fs, samples = wav.read(file_path, mmap=mmap)
# ensure that we have a 2d array (monaural files are just loaded as vectors)
if samples.ndim == 1:
samples = samples[:, np.newaxis]
# make sure that we have loaded an integer PCM WAV file as we assume this later
# when we scale the amplitude
assert(samples.dtype == np.int16 or samples.dtype == np.int32)
if convert_float:
conversion_scale = 1. / (1. + np.iinfo(samples.dtype).max)
samples = samples.astype(dtype=np.float32) * conversion_scale
return fs, samples
def save_wav(file_path, fs, samples, subtype='PCM_16'):
"""
Save a WAV file (16bit or 32bit PCM).
Important note: We save here using the same conversion as is used in
`generate_data`, i.e., we multiply by `1 + np.iinfo(np.int16).max`
or `1 + np.iinfo(np.int32).max` which is a different behavior
than `libsndfile` as described here:
http://www.mega-nerd.com/libsndfile/FAQ.html#Q010
Args:
file_path: Path where to store the WAV file
fs: Sample rate
samples: Numpy array (float32 with values in [-1, 1) and shape [n_samples x n_channels])
subtype: Either `PCM_16` or `PCM_24` or `PCM_32` in order to store as 16bit, 24bit or 32bit PCM file
"""
assert subtype in ['PCM_16', 'PCM_24', 'PCM_32'], subtype
if subtype == 'PCM_16':
dtype = np.int16
else:
dtype = np.int32
# convert to int16 (check for clipping)
samples = samples * (1 + np.iinfo(dtype).max)
if np.min(samples) < np.iinfo(dtype).min or np.max(samples) > np.iinfo(dtype).max:
uprint(f'WARNING: Clipping occurs for {file_path}.')
samples_ = samples / (1 + np.iinfo(dtype).max)
print('max value ', np.max(np.abs(samples_)))
samples = np.clip(samples, np.iinfo(dtype).min, np.iinfo(dtype).max)
samples = samples.astype(dtype)
# store WAV file
sf.write(file_path, samples, fs, subtype=subtype)
def load_files_lists(path):
"""
Auxiliary function to find the paths for all mixtures in a database.
Args:
path: path to the folder containing the files to list
Returns:
list_of_directories: list of directories (= list of songs) in `path`
"""
# get directories in `path`
list_of_directories = []
for folder in os.listdir(path):
list_of_directories.append(folder)
return list_of_directories
def create_dataset(path, accepted_sampling_rates, sources, mapped_sources, n_channels=-1, load_to_memory=False,
debug=False, verbose=False):
"""
Prepare data in `path` for training/validation/test set generation.
Args:
path: path to the dataset
accepted_sampling_rates: list of accepted sampling rates
sources: list of sources
mapped_sources: list of mapped sources
n_channels: number of channels
load_to_memory: whether to load to main memory
debug: if `True`, then we load only `NUM_SAMPLES_SMALL_DATASET`
Raises:
ValueError: mapping of sources not possible is data is not loaded into memory
Returns:
data: list of dictionaries with function handles (to load the data)
directories: list of directories
"""
NUM_SAMPLES_SMALL_DATASET = 16
# source mapping currently only works if we load everything into the memory
if mapped_sources and not load_to_memory:
raise ValueError('Mapping of sources only supported if data is loaded into the memory.')
# get directories for dataset
directories = load_files_lists(path)
# load all songs for dataset
if debug:
data = [dict() for _x in range(np.minimum(NUM_SAMPLES_SMALL_DATASET, len(directories)))]
else:
data = [dict() for _x in range(len(directories))]
material_length = {} # in seconds
for i, d in enumerate(directories):
if verbose:
uprint(f'Processing mixture ({i+1} of {len(directories)}): {d}')
# add names of all files in this folder
files = os.listdir(os.path.join(path, d))
for f in files:
src_name = os.path.splitext(f)[0]
if ((src_name not in sources
and src_name not in mapped_sources)):
if verbose:
uprint(f'\tIgnoring unknown source from file {f}')
else:
if src_name not in sources:
src_name = mapped_sources[src_name]
if verbose:
uprint(f'\tAdding function handle for "{src_name}" from file {f}')
_data = load_wav(os.path.join(path, d, f), mmap=not load_to_memory)
# determine properties from loaded data
_samplingrate = _data[0]
_n_channels = _data[1].shape[1]
_duration = _data[1].shape[0] / _samplingrate
# collect statistics about data for each source
if src_name in material_length:
material_length[src_name] += _duration
else:
material_length[src_name] = _duration
# make sure that sample rate and number of channels matches
if n_channels != -1 and _n_channels != n_channels:
raise ValueError(f'File has {_n_channels} '
f'channels but expected {n_channels}.')
if _samplingrate not in accepted_sampling_rates:
raise ValueError(f'File has fs = {_samplingrate}Hz '
f'but expected {accepted_sampling_rates}Hz.')
# if we already loaded data for this source then append data
if src_name in data[i]:
_data = (_data[0], np.vstack((_data[1],
data[i][src_name].keywords['file_path_or_data'][1])))
data[i][src_name] = functools.partial(generate_data,
file_path_or_data=_data)
if debug and i == NUM_SAMPLES_SMALL_DATASET-1:
# load only first `NUM_SAMPLES_SMALL_DATASET` songs
break
# delete all entries where we did not find an source file
idx_empty = [_ for _ in range(len(data)) if len(data[_]) == 0]
for idx in sorted(idx_empty, reverse=True):
del data[idx]
return data, directories
def create_dataset_mixing(path, accepted_sampling_rates, sources, mapped_sources, n_channels=-1, load_to_memory=False,
debug=False, pad_wrap_samples=None):
"""
Prepare data in `path` for training/validation/test set generation.
Args:
path: path to the dataset
accepted_sampling_rates: list of accepted sampling rates
sources: list of sources
mapped_sources: list of mapped sources
n_channels: number of channels
load_to_memory: whether to load to main memory
debug: if `True`, then we load only `NUM_SAMPLES_SMALL_DATASET`
Raises:
ValueError: mapping of sources not possible is data is not loaded into memory
Returns:
data: list of dictionaries with function handles (to load the data)
directories: list of directories
"""
NUM_SAMPLES_SMALL_DATASET = 16
# source mapping currently only works if we load everything into the memory
if mapped_sources and not load_to_memory:
raise ValueError('Mapping of sources only supported if data is loaded into the memory.')
# get directories for dataset
directories = load_files_lists(path)
directories.sort()
# load all songs for dataset
uprint(f'\nCreating dataset for path={path} ...')
if debug:
data = [dict() for _x in range(np.minimum(NUM_SAMPLES_SMALL_DATASET, len(directories)))]
else:
data = [dict() for _x in range(len(directories))]
material_length = {} # in seconds
for i, d in enumerate(directories):
uprint(f'Processing mixture ({i+1} of {len(directories)}): {d}')
# add names of all files in this folder
files = os.listdir(os.path.join(path, d))
_data_mix = []
_stems_name = []
for f in files:
src_name = os.path.splitext(f)[0]
if ((src_name not in sources
and src_name not in mapped_sources)):
uprint(f'\tIgnoring unknown source from file {f}')
else:
if src_name not in sources:
src_name = mapped_sources[src_name]
uprint(f'\tAdding function handle for "{src_name}" from file {f}')
_data = load_wav(os.path.join(path, d, f), mmap=not load_to_memory)
if pad_wrap_samples:
_data = (_data[0], np.pad(_data[1], [(pad_wrap_samples, 0), (0,0)], 'wrap'))
# determine properties from loaded data
_samplingrate = _data[0]
_n_channels = _data[1].shape[1]
_duration = _data[1].shape[0] / _samplingrate
# collect statistics about data for each source
if src_name in material_length:
material_length[src_name] += _duration
else:
material_length[src_name] = _duration
# make sure that sample rate and number of channels matches
if n_channels != -1 and _n_channels != n_channels:
if _n_channels == 1: # Converts mono to stereo with repeated channels
_data = (_data[0], np.repeat(_data[1], 2, axis=-1))
print("Converted file to stereo by repeating mono channel")
else:
raise ValueError(f'File has {_n_channels} '
f'channels but expected {n_channels}.')
if _samplingrate not in accepted_sampling_rates:
raise ValueError(f'File has fs = {_samplingrate}Hz '
f'but expected {accepted_sampling_rates}Hz.')
# if we already loaded data for this source then append data
if src_name in data[i]:
_data = (_data[0], np.vstack((_data[1],
data[i][src_name].keywords['file_path_or_data'][1])))
_data_mix.append(_data)
_stems_name.append(src_name)
data[i]["-".join(_stems_name)] = functools.partial(generate_data,
file_path_or_data=_data_mix)
if debug and i == NUM_SAMPLES_SMALL_DATASET-1:
# load only first `NUM_SAMPLES_SMALL_DATASET` songs
break
# delete all entries where we did not find an source file
idx_empty = [_ for _ in range(len(data)) if len(data[_]) == 0]
for idx in sorted(idx_empty, reverse=True):
del data[idx]
uprint(f'Finished preparation of dataset. '
f'Found in total the following material (in {len(data)} directories):')
for src in material_length:
uprint(f'\t{src}: {material_length[src] / 60.0 / 60.0:.2f} hours')
return data, directories
def generate_data(file_path_or_data, random_sample_size=None):
"""
Load one stem/several stems specified by `file_path_or_data`.
Alternatively, can also be the result of `wav.read()` if the data has already been loaded previously.
If `file_path_or_data` is a tuple/list, then we load several files and will return also a tuple/list.
This is useful for cases where we want to make sure to have the same random chunk for several stems.
If `random_sample_chunk_size` is not None, then only `random_sample_chunk_size` samples are randomly selected.
Args:
file_path_or_data: either path to data or the data itself
random_sample_size: if `random_sample_size` is not None, only `random_sample_size` samples are randomly selected
Returns:
samples: data with size `num_samples x num_channels` or a list of samples
"""
needs_wrapping = False
if isinstance(file_path_or_data, str):
needs_wrapping = True # single file path -> wrap
if ((type(file_path_or_data[0]) is not list
and type(file_path_or_data[0]) is not tuple)):
needs_wrapping = True # single data -> wrap
if needs_wrapping:
file_path_or_data = (file_path_or_data,)
# create list where we store all samples
samples = [None] * len(file_path_or_data)
# load samples from wav file
for i, fpod in enumerate(file_path_or_data):
if isinstance(fpod, str):
_fs, samples[i] = load_wav(fpod)
else:
_fs, samples[i] = fpod
# if `random_sample_chunk_size` is not None, then only select subset
if random_sample_size is not None:
# get maximum length of all stems (at least `random_sample_chunk_size`)
max_length = random_sample_size
for s in samples:
max_length = np.maximum(max_length, s.shape[0])
# make sure that we can select enough audio and that all have the same length `max_length`
# (for short loops, `random_sample_chunk_size` can be larger than `s.shape[0]`)
for i, s in enumerate(samples):
if s.shape[0] < max_length:
required_padding = max_length - s.shape[0]
zeros = np.zeros((required_padding // 2 + 1, s.shape[1]),
dtype=s.dtype, order='F')
samples[i] = np.concatenate([zeros, s, zeros])
# select random part of audio
idx_start = np.random.randint(max_length)
for i, s in enumerate(samples):
if idx_start + random_sample_size < s.shape[0]:
samples[i] = s[idx_start:idx_start + random_sample_size]
else:
samples[i] = np.concatenate([s[idx_start:],
s[:random_sample_size - (s.shape[0] - idx_start)]])
# convert from `int16/int32` to `float32` precision (this will also make a copy)
for i, s in enumerate(samples):
conversion_scale = 1. / (1. + np.iinfo(s.dtype).max)
samples[i] = s.astype(dtype=np.float32) * conversion_scale
if len(samples) == 1:
return samples[0]
else:
return samples
def create_minibatch(data: list, sources: list,
present_prob: dict, overlap_prob: dict,
augmenter: AugmentationChain, augmenter_padding: Tuple[int],
batch_size: int, n_samples: int, n_channels: int, idx_songs: dict):
"""
Create a minibatch.
This function also handles the case that we do not have a source in one mixture.
This can, e.g., happen for instrumental pieces that do not have vocals.
Args:
data (list): data to create the minibatch from.
sources (list): list of sources.
present_prob (dict): probability of a source to be present.
overlap_prob (dict): probability of overlap.
augmenter (AugmentationChain): audio effect chain that we want to apply for data augmentation
augmenter_padding (tuple of ints): padding that we should apply to left/right side of data to avoid
boundary effects of `augmenter`.
batch_size (int): number of training samples in one minibatch.
n_samples (int): number of time samples.
n_channels (int): number of channels.
idx_songs (dict): index of songs.
Returns:
inp (Numpy array): minibatch, input to the network (i.e. the mixture) of size
`batch_size x n_samples x n_channels`
tar (dict with Numpy arrays): dictionary which contains for each source the targets,
each of the `c_contiguous` ndarrays is `batch_size x n_samples x n_channels`
"""
# initialize numpy arrays which keep input/targets
shp = (batch_size, n_samples, n_channels)
inp = np.zeros(shape=shp, dtype=np.float32, order='C')
tar = {src: np.zeros(shape=shp, dtype=np.float32, order='C') for src in sources}
# use padding to avoid boundary effects of augmenter
pad_left = None if augmenter_padding[0] == 0 else augmenter_padding[0]
pad_right = None if augmenter_padding[1] == 0 else -augmenter_padding[1]
def augm(i, s, n):
return augmenter(data[i][s](random_sample_size=n+sum(augmenter_padding)))[pad_left:pad_right]
# create mini-batch
for src in sources:
for j in range(batch_size):
# get song index for this source
_idx_song = idx_songs[src][j]
# determine whether this source is present/whether we overlap
is_present = src not in present_prob or np.random.rand() < present_prob[src]
is_overlap = src in overlap_prob and np.random.rand() < overlap_prob[src]
# if song contains source, then add it to input/targetg]
if src in data[_idx_song] and is_present:
tar[src][j, ...] = augm(_idx_song, src, n_samples)
# overlap source with same source from randomly choosen other song
if is_overlap:
idx_overlap_ = np.random.randint(len(data))
if idx_overlap_ != _idx_song and src in data[idx_overlap_]:
tar[src][j, ...] += augm(idx_overlap_, src, n_samples)
# compute input
inp += tar[src]
# make sure that all have not too large amplitude (check only mixture)
maxabs_amp = np.maximum(1.0, 1e-6 + np.max(np.abs(inp), axis=(1, 2), keepdims=True))
inp /= maxabs_amp
for src in sources:
tar[src] /= maxabs_amp
return inp, tar
def create_minibatch_mixing(data: list, sources: list, inputs: list, outputs: list,
present_prob: dict, overlap_prob: dict,
augmenter: AugmentationChain, augmenter_padding: Tuple[int], augmenter_sources: list,
batch_size: int, n_samples: int, n_channels: int, idx_songs: dict):
"""
Create a minibatch.
This function also handles the case that we do not have a source in one mixture.
This can, e.g., happen for instrumental pieces that do not have vocals.
Args:
data (list): data to create the minibatch from.
sources (list): list of sources.
present_prob (dict): probability of a source to be present.
overlap_prob (dict): probability of overlap.
augmenter (AugmentationChain): audio effect chain that we want to apply for data augmentation
augmenter_padding (tuple of ints): padding that we should apply to left/right side of data to avoid
boundary effects of `augmenter`.
augmenter_sources (list): list of sources to augment
batch_size (int): number of training samples in one minibatch.
n_samples (int): number of time samples.
n_channels (int): number of channels.
idx_songs (dict): index of songs.
Returns:
inp (Numpy array): minibatch, input to the network (i.e. the mixture) of size
`batch_size x n_samples x n_channels`
tar (dict with Numpy arrays): dictionary which contains for each source the targets,
each of the `c_contiguous` ndarrays is `batch_size x n_samples x n_channels`
"""
# initialize numpy arrays which keep input/targets
shp = (batch_size, n_samples, n_channels)
stems = {src: np.zeros(shape=shp, dtype=np.float32, order='C') for src in inputs}
mix = {src: np.zeros(shape=shp, dtype=np.float32, order='C') for src in outputs}
# use padding to avoid boundary effects of augmenter
pad_left = None if augmenter_padding[0] == 0 else augmenter_padding[0]
pad_right = None if augmenter_padding[1] == 0 else -augmenter_padding[1]
def augm(i, n):
s = list(data[i])[0]
input_multitracks = data[i][s](random_sample_size=n+sum(augmenter_padding))
audio_tags = list(data[i])[0].split("-")
# Only applies augmentation to inputs, not output.
for k, tag in enumerate(audio_tags):
if tag in augmenter_sources:
input_multitracks[k] = augmenter(input_multitracks[k])[pad_left:pad_right]
else:
input_multitracks[k] = input_multitracks[k][pad_left:pad_right]
return input_multitracks
# create mini-batch
for src in outputs:
for j in range(batch_size):
# get song index for this source
_idx_song = idx_songs[src][j]
multitrack_audio = augm(_idx_song, n_samples)
audio_tags = list(data[_idx_song])[0].split("-")
for i, tag in enumerate(audio_tags):
if tag in inputs:
stems[tag][j, ...] = multitrack_audio[i]
if tag in outputs:
mix[tag][j, ...] = multitrack_audio[i]
return stems, mix