# Copyright (c) 2023 Amphion.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

import math
import random
import os
import json

import numpy as np
import parselmouth
import torch
import torchaudio
from tqdm import tqdm
from audiomentations import TimeStretch
from pedalboard import (
    Pedalboard,
    HighShelfFilter,
    LowShelfFilter,
    PeakFilter,
    PitchShift,
)

from utils.util import has_existed

PRAAT_CHANGEGENDER_PITCHMEDIAN_DEFAULT = 0.0
PRAAT_CHANGEGENDER_FORMANTSHIFTRATIO_DEFAULT = 1.0
PRAAT_CHANGEGENDER_PITCHSHIFTRATIO_DEFAULT = 1.0
PRAAT_CHANGEGENDER_PITCHRANGERATIO_DEFAULT = 1.0
PRAAT_CHANGEGENDER_DURATIONFACTOR_DEFAULT = 1.0


def wav_to_Sound(wav, sr: int) -> parselmouth.Sound:
    """Convert a waveform to a parselmouth.Sound object

    Args:
        wav (np.ndarray/torch.Tensor): waveform of shape (n_channels, n_samples)
        sr (int): sampling rate

    Returns:
        parselmouth.Sound: a parselmouth.Sound object
    """
    assert wav.shape == (1, len(wav[0])), "wav must be of shape (1, n_samples)"
    sound = None
    if isinstance(wav, np.ndarray):
        sound = parselmouth.Sound(wav[0], sampling_frequency=sr)
    elif isinstance(wav, torch.Tensor):
        sound = parselmouth.Sound(wav[0].numpy(), sampling_frequency=sr)
    assert sound is not None, "wav must be either np.ndarray or torch.Tensor"
    return sound


def get_pitch_median(wav, sr: int):
    """Get the median pitch of a waveform

    Args:
        wav (np.ndarray/torch.Tensor/parselmouth.Sound): waveform of shape (n_channels, n_samples)
        sr (int): sampling rate

    Returns:
        parselmouth.Pitch, float: a parselmouth.Pitch object and the median pitch
    """
    if not isinstance(wav, parselmouth.Sound):
        sound = wav_to_Sound(wav, sr)
    else:
        sound = wav

    pitch_median = PRAAT_CHANGEGENDER_PITCHMEDIAN_DEFAULT

    # To Pitch: Time step (s) (standard value: 0.0), Pitch floor (Hz) (standard
    # value: 75.0), Pitch ceiling (Hz) (standard value: 600.0)
    pitch = parselmouth.praat.call(sound, "To Pitch", 0.8 / 75, 75, 600)

    # Get quantile: From time (s), To time (s), Quantile (0.5 is the 50% quantile,
    # i.e., the median), Units (Hertz or Bark)
    pitch_median = parselmouth.praat.call(pitch, "Get quantile", 0.0, 0.0, 0.5, "Hertz")

    return pitch, pitch_median

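
# Illustrative sketch (not part of the original pipeline): how the two helpers
# above combine to estimate the median pitch of a clip. "example.wav" is a
# hypothetical path to a mono audio file; any (1, n_samples) waveform works.
def _demo_pitch_median():
    wav, sr = torchaudio.load("example.wav")  # shape (1, n_samples)
    sound = wav_to_Sound(wav, sr)
    _, median_hz = get_pitch_median(sound, sr)
    print(f"Median pitch: {median_hz:.1f} Hz")
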
def change_gender(
    sound,
    pitch=None,
    formant_shift_ratio: float = PRAAT_CHANGEGENDER_FORMANTSHIFTRATIO_DEFAULT,
    new_pitch_median: float = PRAAT_CHANGEGENDER_PITCHMEDIAN_DEFAULT,
    pitch_range_ratio: float = PRAAT_CHANGEGENDER_PITCHRANGERATIO_DEFAULT,
    duration_factor: float = PRAAT_CHANGEGENDER_DURATIONFACTOR_DEFAULT,
) -> parselmouth.Sound:
    """Invoke the "Change gender" command in Praat

    Args:
        sound (parselmouth.Sound): a parselmouth.Sound object
        pitch (parselmouth.Pitch, optional): a parselmouth.Pitch object. Defaults to None.
        formant_shift_ratio (float, optional): formant shift ratio. 1.0 means no change;
            greater than 1.0 shifts formants up, less than 1.0 shifts them down.
        new_pitch_median (float, optional): new pitch median.
        pitch_range_ratio (float, optional): pitch range ratio. 1.0 means no change;
            greater than 1.0 widens the pitch range, less than 1.0 narrows it.
        duration_factor (float, optional): duration factor. 1.0 means no change;
            greater than 1.0 lengthens the audio, less than 1.0 shortens it.

    Returns:
        parselmouth.Sound: a parselmouth.Sound object
    """
    if pitch is None:
        new_sound = parselmouth.praat.call(
            sound,
            "Change gender",
            75,
            600,
            formant_shift_ratio,
            new_pitch_median,
            pitch_range_ratio,
            duration_factor,
        )
    else:
        new_sound = parselmouth.praat.call(
            (sound, pitch),
            "Change gender",
            formant_shift_ratio,
            new_pitch_median,
            pitch_range_ratio,
            duration_factor,
        )
    return new_sound


def apply_formant_and_pitch_shift(
    sound: parselmouth.Sound,
    formant_shift_ratio: float = PRAAT_CHANGEGENDER_FORMANTSHIFTRATIO_DEFAULT,
    pitch_shift_ratio: float = PRAAT_CHANGEGENDER_PITCHSHIFTRATIO_DEFAULT,
    pitch_range_ratio: float = PRAAT_CHANGEGENDER_PITCHRANGERATIO_DEFAULT,
    duration_factor: float = PRAAT_CHANGEGENDER_DURATIONFACTOR_DEFAULT,
) -> parselmouth.Sound:
    """Use Praat's "Change gender" command to manipulate pitch and formant.

    "Change gender": Praat -> Sound Object -> Convert -> Change gender.
    Refer to the Praat help for more details.
    https://github.com/YannickJadoul/Parselmouth/issues/25#issuecomment-608632887 might help.
    """
    pitch = None
    new_pitch_median = PRAAT_CHANGEGENDER_PITCHMEDIAN_DEFAULT
    if pitch_shift_ratio != 1.0:
        pitch, pitch_median = get_pitch_median(sound, sound.sampling_frequency)
        new_pitch_median = pitch_median * pitch_shift_ratio

        # refer to https://github.com/praat/praat/issues/1926#issuecomment-974909408
        pitch_minimum = parselmouth.praat.call(
            pitch, "Get minimum", 0.0, 0.0, "Hertz", "Parabolic"
        )
        new_median = pitch_median * pitch_shift_ratio
        scaled_minimum = pitch_minimum * pitch_shift_ratio
        result_minimum = new_median + (scaled_minimum - new_median) * pitch_range_ratio
        if result_minimum < 0:
            new_pitch_median = PRAAT_CHANGEGENDER_PITCHMEDIAN_DEFAULT
            pitch_range_ratio = PRAAT_CHANGEGENDER_PITCHRANGERATIO_DEFAULT

        if math.isnan(new_pitch_median):
            new_pitch_median = PRAAT_CHANGEGENDER_PITCHMEDIAN_DEFAULT
            pitch_range_ratio = PRAAT_CHANGEGENDER_PITCHRANGERATIO_DEFAULT

    new_sound = change_gender(
        sound,
        pitch,
        formant_shift_ratio,
        new_pitch_median,
        pitch_range_ratio,
        duration_factor,
    )
    return new_sound


# Function used in EQ
def pedalboard_equalizer(wav: np.ndarray, sr: int) -> np.ndarray:
    """Use pedalboard to apply a randomized 10-band equalizer"""
    board = Pedalboard()

    cutoff_low_freq = 60
    cutoff_high_freq = 10000
    q_min = 2
    q_max = 5

    random_all_freq = True
    num_filters = 10
    if random_all_freq:
        key_freqs = [random.uniform(1, 12000) for _ in range(num_filters)]
    else:
        key_freqs = [
            power_ratio(float(z) / (num_filters - 1), cutoff_low_freq, cutoff_high_freq)
            for z in range(num_filters)
        ]
    q_values = [
        power_ratio(random.uniform(0, 1), q_min, q_max) for _ in range(num_filters)
    ]
    gains = [random.uniform(-12, 12) for _ in range(num_filters)]

    # low-shelf filter
    board.append(
        LowShelfFilter(
            cutoff_frequency_hz=key_freqs[0], gain_db=gains[0], q=q_values[0]
        )
    )
    # peaking filters
    for i in range(1, 9):
        board.append(
            PeakFilter(
                cutoff_frequency_hz=key_freqs[i], gain_db=gains[i], q=q_values[i]
            )
        )
    # high-shelf filter
    board.append(
        HighShelfFilter(
            cutoff_frequency_hz=key_freqs[9], gain_db=gains[9], q=q_values[9]
        )
    )

    # Apply the pedalboard to the audio
    processed_audio = board(wav, sr)
    return processed_audio


def power_ratio(r: float, a: float, b: float):
    """Interpolate between a and b on a logarithmic scale (r in [0, 1])"""
    return a * math.pow(b / a, r)


def audiomentations_time_stretch(wav: np.ndarray, sr: int) -> np.ndarray:
    """Use audiomentations to do time stretch"""
    transform = TimeStretch(
        min_rate=0.8, max_rate=1.25, leave_length_unchanged=False, p=1.0
    )
    augmented_wav = transform(wav, sample_rate=sr)
    return augmented_wav

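
# Illustrative sketch (not part of the original pipeline): chaining the random
# EQ and time stretch above on a raw numpy waveform, mirroring what
# wav_manipulation does below. "example.wav" is a hypothetical mono file; both
# helpers expect float arrays of shape (1, n_samples).
def _demo_eq_and_stretch():
    wav, sr = torchaudio.load("example.wav")
    wav_numpy = wav.numpy()
    wav_numpy = pedalboard_equalizer(wav_numpy, sr)  # randomized 10-band EQ
    wav_numpy = audiomentations_time_stretch(wav_numpy, sr)  # rate in [0.8, 1.25]
    print(wav_numpy.shape)  # length changes since leave_length_unchanged=False
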
def formant_and_pitch_shift(
    sound: parselmouth.Sound, fs: bool, ps: bool
) -> parselmouth.Sound:
    """Randomly apply a formant shift (fs) or a pitch shift (ps) to a sound"""
    formant_shift_ratio = PRAAT_CHANGEGENDER_FORMANTSHIFTRATIO_DEFAULT
    pitch_shift_ratio = PRAAT_CHANGEGENDER_PITCHSHIFTRATIO_DEFAULT
    pitch_range_ratio = PRAAT_CHANGEGENDER_PITCHRANGERATIO_DEFAULT

    assert fs != ps, "fs and ps are mutually exclusive"

    if fs:
        formant_shift_ratio = random.uniform(1.0, 1.4)
        use_reciprocal = random.uniform(-1, 1) > 0
        if use_reciprocal:
            formant_shift_ratio = 1.0 / formant_shift_ratio
        # only use Praat to change the formant
        new_sound = apply_formant_and_pitch_shift(
            sound,
            formant_shift_ratio=formant_shift_ratio,
        )
        return new_sound

    if ps:
        board = Pedalboard()
        board.append(PitchShift(random.uniform(-12, 12)))
        wav_numpy = sound.values
        wav_numpy = board(wav_numpy, sound.sampling_frequency)
        # use pedalboard to change the pitch
        new_sound = parselmouth.Sound(
            wav_numpy, sampling_frequency=sound.sampling_frequency
        )
        return new_sound


def wav_manipulation(
    wav: torch.Tensor,
    sr: int,
    aug_type: str = "None",
    formant_shift: bool = False,
    pitch_shift: bool = False,
    time_stretch: bool = False,
    equalizer: bool = False,
) -> torch.Tensor:
    assert aug_type == "None" or aug_type in [
        "formant_shift",
        "pitch_shift",
        "time_stretch",
        "equalizer",
    ], "aug_type must be one of formant_shift, pitch_shift, time_stretch, equalizer"

    assert aug_type == "None" or (
        formant_shift == False
        and pitch_shift == False
        and time_stretch == False
        and equalizer == False
    ), "if aug_type is specified, the other flags must be False"

    if aug_type != "None":
        if aug_type == "formant_shift":
            formant_shift = True
        if aug_type == "pitch_shift":
            pitch_shift = True
        if aug_type == "equalizer":
            equalizer = True
        if aug_type == "time_stretch":
            time_stretch = True

    wav_numpy = wav.numpy()

    if equalizer:
        wav_numpy = pedalboard_equalizer(wav_numpy, sr)
    if time_stretch:
        wav_numpy = audiomentations_time_stretch(wav_numpy, sr)

    sound = wav_to_Sound(wav_numpy, sr)
    if formant_shift or pitch_shift:
        sound = formant_and_pitch_shift(sound, formant_shift, pitch_shift)
    wav = torch.from_numpy(sound.values).float()  # shape (1, n_samples)
    return wav

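
# Illustrative sketch (not part of the original pipeline): applying a single
# augmentation to one utterance through wav_manipulation, the module's main
# entry point. "example.wav" and the output path are hypothetical; aug_type
# could also be "pitch_shift", "time_stretch", or "equalizer".
def _demo_wav_manipulation():
    wav, sr = torchaudio.load("example.wav")  # shape (1, n_samples)
    new_wav = wav_manipulation(wav, sr, aug_type="formant_shift")
    torchaudio.save("example_formant_shift.wav", new_wav, sr)
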
def augment_dataset(cfg, dataset) -> list:
    """Augment a dataset with formant_shift, pitch_shift, time_stretch, equalizer

    Args:
        cfg (dict): configuration
        dataset (str): dataset name

    Returns:
        list: augmented dataset names
    """
    # load metadata
    dataset_path = os.path.join(cfg.preprocess.processed_dir, dataset)
    split = ["train", "test"] if "eval" not in dataset else ["test"]
    augment_datasets = []

    aug_types = [
        "formant_shift" if cfg.preprocess.use_formant_shift else None,
        "pitch_shift" if cfg.preprocess.use_pitch_shift else None,
        "time_stretch" if cfg.preprocess.use_time_stretch else None,
        "equalizer" if cfg.preprocess.use_equalizer else None,
    ]
    aug_types = filter(None, aug_types)

    for aug_type in aug_types:
        print("Augmenting {} with {}...".format(dataset, aug_type))
        new_dataset = dataset + "_" + aug_type
        augment_datasets.append(new_dataset)
        new_dataset_path = os.path.join(cfg.preprocess.processed_dir, new_dataset)

        for dataset_type in split:
            metadata_path = os.path.join(dataset_path, "{}.json".format(dataset_type))
            augmented_metadata = []
            new_metadata_path = os.path.join(
                new_dataset_path, "{}.json".format(dataset_type)
            )
            os.makedirs(new_dataset_path, exist_ok=True)
            new_dataset_wav_dir = os.path.join(new_dataset_path, "wav")
            os.makedirs(new_dataset_wav_dir, exist_ok=True)
            if has_existed(new_metadata_path):
                continue

            with open(metadata_path, "r") as f:
                metadata = json.load(f)

            for utt in tqdm(metadata):
                original_wav_path = utt["Path"]
                original_wav, sr = torchaudio.load(original_wav_path)
                new_wav = wav_manipulation(original_wav, sr, aug_type=aug_type)
                new_wav_path = os.path.join(new_dataset_wav_dir, utt["Uid"] + ".wav")
                torchaudio.save(new_wav_path, new_wav, sr)
                new_utt = {
                    "Dataset": utt["Dataset"] + "_" + aug_type,
                    "index": utt["index"],
                    "Singer": utt["Singer"],
                    "Uid": utt["Uid"],
                    "Path": new_wav_path,
                    "Duration": utt["Duration"],
                }
                augmented_metadata.append(new_utt)

            with open(new_metadata_path, "w") as f:
                json.dump(augmented_metadata, f, indent=4, ensure_ascii=False)

    return augment_datasets
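
# Illustrative sketch (not part of the original module): driving augment_dataset
# with a minimal stand-in config. SimpleNamespace merely mimics the attributes
# read above; real callers pass the framework's parsed configuration. The paths
# and dataset name here are hypothetical.
def _demo_augment_dataset():
    from types import SimpleNamespace

    cfg = SimpleNamespace(
        preprocess=SimpleNamespace(
            processed_dir="data/processed",
            use_formant_shift=True,
            use_pitch_shift=False,
            use_time_stretch=False,
            use_equalizer=True,
        )
    )
    # Expects data/processed/mydataset/{train,test}.json metadata files.
    new_names = augment_dataset(cfg, "mydataset")
    print(new_names)  # e.g. ["mydataset_formant_shift", "mydataset_equalizer"]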