import os
import random
from typing import Optional, Callable
import json
import glob
import csv

import numpy as np
import torch
import librosa
import pyloudnorm as pyln
from pedalboard import Pedalboard, Limiter, Gain, Compressor, Clipping

from .dataset import (
    MusdbTrainDataset,
    MusdbValidDataset,
    apply_limitaug,
    # apply_limitaug_loudnorm,
)
from utils import (
    load_wav_arbitrary_position_stereo,
    load_wav_specific_position_stereo,
    db2linear,
    str2bool,
)


class DelimitTrainDataset(MusdbTrainDataset):
    def __init__(
        self,
        target: str = "all",
        root: str = None,
        seq_duration: Optional[float] = 6.0,
        samples_per_track: int = 64,
        source_augmentations: Optional[Callable] = lambda audio: audio,
        sample_rate: int = 44100,
        seed: int = 42,
        limitaug_method: str = "limitaug",
        limitaug_mode: str = "normal_L",
        limitaug_custom_target_lufs: float = None,
        limitaug_custom_target_lufs_std: float = None,
        target_loudnorm_lufs: float = -14.0,
        target_limitaug_mode: str = None,
        target_limitaug_custom_target_lufs: float = None,
        target_limitaug_custom_target_lufs_std: float = None,
        custom_limiter_attack_range: list = [2.0, 2.0],
        custom_limiter_release_range: list = [200.0, 200.0],
        *args,
        **kwargs,
    ) -> None:
        """
        Parameters
        ----------
        limitaug_method : str
            choose from ["linear_gain_increase", "limitaug", "limitaug_then_loudnorm", "only_loudnorm"]
        limitaug_mode : str
            choose from ["uniform", "normal", "normal_L", "normal_XL", "normal_short_term", "normal_L_short_term", "normal_XL_short_term", "custom"]
        limitaug_custom_target_lufs : float
            valid only when limitaug_mode == "custom"
        target_loudnorm_lufs : float
            valid only when limitaug_method is "limitaug_then_loudnorm" or "only_loudnorm".
            Defaults to -14. To the best of my knowledge, Spotify and YouTube Music use
            -14 LUFS as their reference loudness normalization level; there is no other
            special reason for the choice of -14 as target_loudnorm_lufs.
        target : str
            name of the target source to be separated, defaults to ``all``
        root : str
            root path of MUSDB
        seq_duration : float
            training is performed in chunks of ``seq_duration`` (in seconds);
            defaults to 6.0. ``None`` loads the full audio track
        samples_per_track : int
            sets the number of samples yielded from each track per epoch,
            defaults to 64
        source_augmentations : callable
            augmentation function that takes multi-channel audio of shape
            (channels, samples) as input and returns audio of the same shape.
            Defaults to no augmentation (input = output)
        seed : int
            controls randomness of dataset iterations
        args, kwargs : additional keyword arguments
            used to add further control for the musdb dataset
            initialization function.
        """
        super().__init__(
            target=target,
            root=root,
            seq_duration=seq_duration,
            samples_per_track=samples_per_track,
            source_augmentations=source_augmentations,
            sample_rate=sample_rate,
            seed=seed,
            limitaug_method=limitaug_method,
            limitaug_mode=limitaug_mode,
            limitaug_custom_target_lufs=limitaug_custom_target_lufs,
            limitaug_custom_target_lufs_std=limitaug_custom_target_lufs_std,
            target_loudnorm_lufs=target_loudnorm_lufs,
            custom_limiter_attack_range=custom_limiter_attack_range,
            custom_limiter_release_range=custom_limiter_release_range,
            *args,
            **kwargs,
        )
        self.target_limitaug_mode = target_limitaug_mode
        self.target_limitaug_custom_target_lufs = target_limitaug_custom_target_lufs
        self.target_limitaug_custom_target_lufs_std = target_limitaug_custom_target_lufs_std
        self.limitaug_mode_statistics["target_custom"] = [
            target_limitaug_custom_target_lufs,
            target_limitaug_custom_target_lufs_std,
        ]

    # Get a limitaug result of the linear mixture (no individual stem target is used here)
    def get_limitaug_mixture(self, mixture):
        if self.limitaug_method == "limitaug":
            self.board[1].release_ms = random.uniform(30.0, 200.0)
            target_lufs = self.sample_target_lufs()
            mixture_limited, mixture_lufs = apply_limitaug(
                mixture,
                self.board,
                self.meter,
                self.sample_rate,
                target_lufs=target_lufs,
            )

        elif self.limitaug_method == "limitaug_then_loudnorm":
            self.board[1].release_ms = random.uniform(30.0, 200.0)
            target_lufs = self.sample_target_lufs()
            mixture_limited, mixture_lufs = apply_limitaug(
                mixture,
                self.board,
                self.meter,
                self.sample_rate,
                target_lufs=target_lufs,
                target_loudnorm_lufs=self.target_loudnorm_lufs,
            )

        # Apply LimitAug using the custom limiter
        elif self.limitaug_method == "custom_limiter_limitaug":
            # Change attack time of the first compressor of the limiter
            self.board[1].attack_ms = random.uniform(
                self.custom_limiter_attack_range[0], self.custom_limiter_attack_range[1]
            )
            # Change release time of the first compressor of the limiter
            self.board[1].release_ms = random.uniform(
                self.custom_limiter_release_range[0],
                self.custom_limiter_release_range[1],
            )
            # Change release time of the second compressor of the limiter
            self.board[2].release_ms = random.uniform(30.0, 200.0)
            target_lufs = self.sample_target_lufs()
            mixture_limited, mixture_lufs = apply_limitaug(
                mixture,
                self.board,
                self.meter,
                self.sample_rate,
                target_lufs=target_lufs,
                target_loudnorm_lufs=self.target_loudnorm_lufs,
            )

        # When we want to force the network to output an appropriately compressed target
        if self.target_limitaug_mode:
            mixture_target_lufs = random.gauss(
                self.limitaug_mode_statistics[self.target_limitaug_mode][0],
                self.limitaug_mode_statistics[self.target_limitaug_mode][1],
            )
            mixture, target_lufs = apply_limitaug(
                mixture,
                self.board,
                self.meter,
                self.sample_rate,
                target_lufs=mixture_target_lufs,
                loudness=mixture_lufs,
            )

        if np.isinf(mixture_lufs):
            mixture_loudnorm = mixture
        else:
            augmented_gain = self.target_loudnorm_lufs - mixture_lufs
            mixture_loudnorm = mixture * db2linear(augmented_gain, eps=0.0)

        return mixture_limited, mixture_loudnorm
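
    # Note on the final loudness-normalization step above: assuming db2linear implements
    # the standard dB-to-amplitude conversion, the applied gain is
    #     gain = 10 ** ((target_loudnorm_lufs - mixture_lufs) / 20)
    # For example, with target_loudnorm_lufs = -14 and a measured mixture loudness of
    # -9.2 LUFS, augmented_gain = -4.8 dB, i.e. a linear gain of roughly 0.575.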

    def __getitem__(self, index):
        audio_sources = []

        for k, source in enumerate(self.sources):
            if source == self.target:  # if source is the target (e.g. 'vocals')
                # yield `samples_per_track` chunks from each track per epoch
                track_path = self.train_list[index // self.samples_per_track]
                audio_path = f"{track_path}/{source}.wav"
                audio = load_wav_arbitrary_position_stereo(
                    audio_path, self.sample_rate, self.seq_duration
                )
            else:
                track_path = random.choice(self.train_list)
                audio_path = f"{track_path}/{source}.wav"
                audio = load_wav_arbitrary_position_stereo(
                    audio_path, self.sample_rate, self.seq_duration
                )
            audio = self.source_augmentations(audio)
            audio_sources.append(audio)

        stems = np.stack(audio_sources, axis=0)

        # apply linear mix over source index=0
        # here, the linear mixture is a target, unlike in MusdbTrainDataset
        mixture = stems.sum(0)

        mixture_limited, mixture_loudnorm = self.get_limitaug_mixture(mixture)
        # mixture_limited is the input and mixture_loudnorm is the target of the model.

        mixture_limited = np.clip(mixture_limited, -1.0, 1.0)

        mixture_limited = torch.as_tensor(mixture_limited, dtype=torch.float32)
        mixture_loudnorm = torch.as_tensor(mixture_loudnorm, dtype=torch.float32)

        return mixture_limited, mixture_loudnorm


class OzoneTrainDataset(DelimitTrainDataset):
    def __init__(
        self,
        target: str = "all",
        root: str = None,
        ozone_root: str = None,
        use_fixed: float = 0.1,  # ratio of fixed samples
        seq_duration: Optional[float] = 6.0,
        samples_per_track: int = 64,
        source_augmentations: Optional[Callable] = lambda audio: audio,
        sample_rate: int = 44100,
        seed: int = 42,
        limitaug_method: str = "limitaug",
        limitaug_mode: str = "normal_L",
        limitaug_custom_target_lufs: float = None,
        limitaug_custom_target_lufs_std: float = None,
        target_loudnorm_lufs: float = -14.0,
        target_limitaug_mode: str = None,
        target_limitaug_custom_target_lufs: float = None,
        target_limitaug_custom_target_lufs_std: float = None,
        custom_limiter_attack_range: list = [2.0, 2.0],
        custom_limiter_release_range: list = [200.0, 200.0],
        *args,
        **kwargs,
    ) -> None:
        super().__init__(
            target,
            root,
            seq_duration,
            samples_per_track,
            source_augmentations,
            sample_rate,
            seed,
            limitaug_method,
            limitaug_mode,
            limitaug_custom_target_lufs,
            limitaug_custom_target_lufs_std,
            target_loudnorm_lufs,
            target_limitaug_mode,
            target_limitaug_custom_target_lufs,
            target_limitaug_custom_target_lufs_std,
            custom_limiter_attack_range,
            custom_limiter_release_range,
            *args,
            **kwargs,
        )
        self.ozone_root = ozone_root
        self.use_fixed = use_fixed
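        # Assumed directory layout under ozone_root (inferred from the paths used below):
        #   ozone_train_fixed/<song_name>.wav      pre-generated limited mixture of each full train song
        #   ozone_train_random/<segment_name>.wav  pre-generated limited mixture of a random segment
        #   ozone_train_random_*.csv               per-segment metadata used to rebuild the unlimited mixture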
        self.list_train_fixed = glob.glob(f"{self.ozone_root}/ozone_train_fixed/*.wav")
        # self.list_train_random = glob.glob(
        #     f"{self.ozone_root}/ozone_train_random/*.wav"
        # )
        # self.dict_train_random = {}
        self.list_dict_train_random = []

        # Load information of pre-generated random training examples
        list_csv_files = glob.glob(f"{self.ozone_root}/ozone_train_random_*.csv")
        list_csv_files.sort()
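        # Each CSV row is expected to contain 19 columns (matching the indexing below):
        # segment_name, max_threshold, max_character,
        # then (name, start_sec, gain, channelswap) for vocals, bass, drums and other.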
        for csv_file in list_csv_files:
            with open(csv_file, "r") as f:
                reader = csv.reader(f)
                next(reader)  # skip the header row
                for row in reader:
                    self.list_dict_train_random.append(
                        {
                            row[0]: {
                                "max_threshold": float(row[1]),
                                "max_character": float(row[2]),
                                "vocals": {
                                    "name": row[3],
                                    "start_sec": float(row[4]),
                                    "gain": float(row[5]),
                                    "channelswap": str2bool(row[6]),
                                },
                                "bass": {
                                    "name": row[7],
                                    "start_sec": float(row[8]),
                                    "gain": float(row[9]),
                                    "channelswap": str2bool(row[10]),
                                },
                                "drums": {
                                    "name": row[11],
                                    "start_sec": float(row[12]),
                                    "gain": float(row[13]),
                                    "channelswap": str2bool(row[14]),
                                },
                                "other": {
                                    "name": row[15],
                                    "start_sec": float(row[16]),
                                    "gain": float(row[17]),
                                    "channelswap": str2bool(row[18]),
                                },
                            }
                        }
                    )

    def __getitem__(self, idx):
        use_fixed_prob = random.random()

        if use_fixed_prob <= self.use_fixed:
            # Fixed examples
            audio_path = random.choice(self.list_train_fixed)
            song_name = os.path.basename(audio_path).replace(".wav", "")
            mixture_limited, start_pos_sec = load_wav_arbitrary_position_stereo(
                audio_path, self.sample_rate, self.seq_duration, return_pos=True
            )
            audio_sources = []
            track_path = f"{self.root}/train/{song_name}"
            for source in self.sources:
                audio_path = f"{track_path}/{source}.wav"
                audio = load_wav_specific_position_stereo(
                    audio_path,
                    self.sample_rate,
                    self.seq_duration,
                    start_position=start_pos_sec,
                )
                audio_sources.append(audio)
        else:
            # Random examples
            # Load mixture_limited (pre-generated)
            dict_seg = random.choice(self.list_dict_train_random)
            seg_name = list(dict_seg.keys())[0]
            audio_path = f"{self.ozone_root}/ozone_train_random/{seg_name}.wav"
            dict_seg_info = dict_seg[seg_name]

            mixture_limited, sr = librosa.load(
                audio_path, sr=self.sample_rate, mono=False
            )

            # Load mixture_unlimited (from the original musdb18, using the metadata)
            audio_sources = []
            for source in self.sources:
                dict_seg_source_info = dict_seg_info[source]
                audio_path = (
                    f"{self.root}/train/{dict_seg_source_info['name']}/{source}.wav"
                )
                audio = load_wav_specific_position_stereo(
                    audio_path,
                    self.sample_rate,
                    self.seq_duration,
                    start_position=dict_seg_source_info["start_sec"],
                )
                # apply the gain and channel-swap recorded in the segment metadata
                audio = audio * dict_seg_source_info["gain"]
                if dict_seg_source_info["channelswap"]:
                    audio = np.flip(audio, axis=0)
                audio_sources.append(audio)

        stems = np.stack(audio_sources, axis=0)
        mixture = stems.sum(axis=0)

        mixture_lufs = self.meter.integrated_loudness(mixture.T)
        if np.isinf(mixture_lufs):
            mixture_loudnorm = mixture
        else:
            augmented_gain = self.target_loudnorm_lufs - mixture_lufs
            mixture_loudnorm = mixture * db2linear(augmented_gain, eps=0.0)

        return mixture_limited, mixture_loudnorm


class DelimitValidDataset(MusdbValidDataset):
    def __init__(
        self,
        target: str = "vocals",
        root: str = None,
        delimit_valid_root: str = None,
        valid_target_lufs: float = -8.05,  # From Table 1 of the paper: the average loudness of commercial music.
        target_loudnorm_lufs: float = -14.0,
        delimit_valid_L_root: str = None,  # Used when the compressed (normal_L) mixture is the target.
        use_custom_limiter: bool = False,
        custom_limiter_attack_range: list = [0.1, 10.0],
        custom_limiter_release_range: list = [30.0, 200.0],
        *args,
        **kwargs,
    ) -> None:
        super().__init__(target=target, root=root, *args, **kwargs)
        self.delimit_valid_root = delimit_valid_root
        if self.delimit_valid_root:
            with open(f"{self.delimit_valid_root}/valid_loudness.json", "r") as f:
                self.dict_valid_loudness = json.load(f)
        self.delimit_valid_L_root = delimit_valid_L_root
        if self.delimit_valid_L_root:
            with open(f"{self.delimit_valid_L_root}/valid_loudness.json", "r") as f:
                self.dict_valid_L_loudness = json.load(f)
        self.valid_target_lufs = valid_target_lufs
        self.target_loudnorm_lufs = target_loudnorm_lufs
        self.meter = pyln.Meter(self.sample_rate)
        self.use_custom_limiter = use_custom_limiter

        if self.use_custom_limiter:
            print("Using custom limiter limitaug for validation!")
            self.custom_limiter_attack_range = custom_limiter_attack_range
            self.custom_limiter_release_range = custom_limiter_release_range
            self.board = Pedalboard(
                [
                    Gain(gain_db=0.0),
                    Compressor(
                        threshold_db=-10.0, ratio=4.0, attack_ms=2.0, release_ms=200.0
                    ),  # attack_ms and release_ms will be changed later.
                    Compressor(
                        threshold_db=0.0,
                        ratio=1000.0,
                        attack_ms=0.001,
                        release_ms=100.0,
                    ),
                    Gain(gain_db=3.75),
                    Clipping(threshold_db=0.0),
                ]
            )  # This implementation is the same as the JUCE Limiter.
            # However, we want the first compressor to have a variable attack and release time,
            # therefore we use this custom limiter instead of the JUCE Limiter.
        else:
            self.board = Pedalboard(
                [Gain(gain_db=0.0), Limiter(threshold_db=0.0, release_ms=100.0)]
            )  # Currently, we are using a limiter with a release time of 100 ms.

    def __getitem__(self, index):
        audio_sources = []
        target_ind = None

        track_path = self.valid_list[index]
        song_name = os.path.basename(track_path)

        for k, source in enumerate(self.sources):
            # memorize index of target source
            if source == self.target:  # if source is the target (e.g. 'vocals')
                target_ind = k
            audio_path = f"{track_path}/{source}.wav"
            audio = librosa.load(audio_path, mono=False, sr=self.sample_rate)[0]
            audio = torch.as_tensor(audio, dtype=torch.float32)
            audio_sources.append(audio)

        stems = np.stack(audio_sources, axis=0)

        # apply linear mix over source index=0
        # here, the linear mixture is a target, unlike in MusdbTrainDataset
        mixture = stems.sum(0)

        if self.delimit_valid_root:  # a pre-processed delimit valid dataset exists
            audio_path = f"{self.delimit_valid_root}/valid/{song_name}.wav"
            mixture_limited = librosa.load(
                audio_path, mono=False, sr=self.sample_rate
            )[0]
            mixture_lufs = self.dict_valid_loudness[song_name]
        else:
            if self.use_custom_limiter:
                custom_limiter_attack = random.uniform(
                    self.custom_limiter_attack_range[0],
                    self.custom_limiter_attack_range[1],
                )
                self.board[1].attack_ms = custom_limiter_attack

                custom_limiter_release = random.uniform(
                    self.custom_limiter_release_range[0],
                    self.custom_limiter_release_range[1],
                )
                self.board[1].release_ms = custom_limiter_release

                mixture_limited, mixture_lufs = apply_limitaug(
                    mixture,
                    self.board,
                    self.meter,
                    self.sample_rate,
                    target_lufs=self.valid_target_lufs,
                )
            else:
                mixture_limited, mixture_lufs = apply_limitaug(
                    mixture,
                    self.board,
                    self.meter,
                    self.sample_rate,
                    target_lufs=self.valid_target_lufs,
                )  # mixture_limited is the limiter-applied mixture

        # mixture_limited is the input and mixture_loudnorm is the target of the model.
        if self.delimit_valid_L_root:
            audio_L_path = f"{self.delimit_valid_L_root}/valid/{song_name}.wav"
            mixture_loudnorm = librosa.load(
                audio_L_path, mono=False, sr=self.sample_rate
            )[0]
            mixture_lufs = self.dict_valid_L_loudness[song_name]
            mixture = mixture_loudnorm

        augmented_gain = self.target_loudnorm_lufs - mixture_lufs
        mixture_loudnorm = mixture * db2linear(augmented_gain)

        if self.use_custom_limiter:
            return (
                mixture_limited,
                mixture_loudnorm,
                song_name,
                mixture_lufs,
                custom_limiter_attack,
                custom_limiter_release,
            )
        else:
            return mixture_limited, mixture_loudnorm, song_name, mixture_lufs


class OzoneValidDataset(MusdbValidDataset):
    def __init__(
        self,
        target: str = "all",
        root: str = None,
        ozone_root: str = None,
        target_loudnorm_lufs: float = -14.0,
        *args,
        **kwargs,
    ) -> None:
        super().__init__(target=target, root=root, *args, **kwargs)
        self.ozone_root = ozone_root
        self.target_loudnorm_lufs = target_loudnorm_lufs

        with open(f"{self.ozone_root}/valid_loudness.json", "r") as f:
            self.dict_valid_loudness = json.load(f)

    def __getitem__(self, index):
        audio_sources = []
        track_path = self.valid_list[index]
        song_name = os.path.basename(track_path)

        for k, source in enumerate(self.sources):
            audio_path = f"{track_path}/{source}.wav"
            audio = librosa.load(audio_path, mono=False, sr=self.sample_rate)[0]
            audio_sources.append(audio)

        stems = np.stack(audio_sources, axis=0)
        mixture = stems.sum(0)

        audio_path = f"{self.ozone_root}/ozone_train_fixed/{song_name}.wav"
        mixture_limited = librosa.load(audio_path, mono=False, sr=self.sample_rate)[0]

        mixture_lufs = self.dict_valid_loudness[song_name]
        augmented_gain = self.target_loudnorm_lufs - mixture_lufs
        mixture_loudnorm = mixture * db2linear(augmented_gain)

        return mixture_limited, mixture_loudnorm, song_name, mixture_lufs
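

if __name__ == "__main__":
    # Minimal smoke-test sketch (an illustrative example, not part of the training code).
    # It assumes a local MUSDB18-HQ copy at ./data/musdb18hq with the usual
    # train/<song>/<source>.wav layout. Because this module uses a relative import
    # (from .dataset import ...), run it as a module from the project root,
    # e.g. `python -m <package>.delimit_dataset`.
    from torch.utils.data import DataLoader

    dataset = DelimitTrainDataset(
        target="all",
        root="./data/musdb18hq",
        seq_duration=6.0,
        samples_per_track=4,
    )
    loader = DataLoader(dataset, batch_size=2, shuffle=True, num_workers=0)
    mixture_limited, mixture_loudnorm = next(iter(loader))
    print(mixture_limited.shape, mixture_loudnorm.shape)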