# Copyright (c) 2023 Amphion.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

import os

import numpy as np
import torch
import torchaudio


def save_feature(process_dir, feature_dir, item, feature, overrides=True):
    """Save a feature array to ``<process_dir>/<feature_dir>/<item>.npy``.

    Args:
        process_dir (str): root directory that stores features
        feature_dir (str): sub-directory for one feature type (mel, energy, ...)
        item (str): uid, used as the output file stem
        feature (ndarray or tensor): feature to save
        overrides (bool, optional): whether to overwrite an existing file.
            Defaults to True.
    """
    process_dir = os.path.join(process_dir, feature_dir)
    os.makedirs(process_dir, exist_ok=True)
    out_path = os.path.join(process_dir, item + ".npy")

    # Write when the file is absent, or when overwriting is allowed.
    if overrides or not os.path.exists(out_path):
        np.save(out_path, feature)


def save_txt(process_dir, feature_dir, item, feature, overrides=True):
    """Save text lines to ``<process_dir>/<feature_dir>/<item>.txt``.

    Args:
        process_dir (str): root directory that stores features
        feature_dir (str): sub-directory for this feature type
        item (str): uid, used as the output file stem
        feature (iterable of str): lines to write (writelines adds no newlines)
        overrides (bool, optional): whether to overwrite an existing file.
            Defaults to True.
    """
    process_dir = os.path.join(process_dir, feature_dir)
    os.makedirs(process_dir, exist_ok=True)
    out_path = os.path.join(process_dir, item + ".txt")

    # Write when the file is absent, or when overwriting is allowed.
    # ``with`` guarantees the handle is closed even if writelines raises.
    if overrides or not os.path.exists(out_path):
        with open(out_path, "w") as f:
            f.writelines(feature)


def save_audio(path, waveform, fs, add_silence=False, turn_up=False, volume_peak=0.9):
    """Save a waveform as a 16-bit signed PCM audio file.

    Args:
        path (str): target file path
        waveform (ndarray): 1-D mono or [n_channel x T] audio samples
        fs (int): sample rate in Hz
        add_silence (bool, optional): pad 0.05 s of silence at both ends.
            Defaults to False.
        turn_up (bool, optional): rescale so the absolute peak equals
            ``volume_peak``. Defaults to False.
        volume_peak (float, optional): target peak amplitude. Defaults to 0.9.
    """
    if turn_up:
        # Rescale so the absolute peak reaches volume_peak.
        peak = max(waveform.max(), abs(waveform.min()))
        if peak > 0:  # guard: silent input would otherwise divide by zero
            waveform = waveform * (volume_peak / peak)

    if add_silence:
        # 0.05 s of zeros on each side.
        silence_len = fs // 20
        silence = np.zeros((silence_len,), dtype=waveform.dtype)
        waveform = np.concatenate([silence, waveform, silence])

    waveform = torch.as_tensor(waveform, dtype=torch.float32, device="cpu")
    if len(waveform.size()) == 1:
        waveform = waveform[None, :]
    elif waveform.size(0) != 1:
        # Stereo (or multi-channel) to mono by averaging channels.
        waveform = torch.mean(waveform, dim=0, keepdim=True)

    torchaudio.save(path, waveform, fs, encoding="PCM_S", bits_per_sample=16)


async def async_load_audio(path, sample_rate: int = 24000):
    r"""Load an audio file asynchronously, down-mixed to mono.

    Args:
        path: The source loading path.
        sample_rate: The target sample rate, will automatically resample if
            necessary.

    Returns:
        waveform: The waveform object. Should be [1 x sequence_len].

    Raises:
        ValueError: if the loaded waveform contains NaN or Inf samples.
    """

    async def use_torchaudio_load(path):
        return torchaudio.load(path)

    waveform, sr = await use_torchaudio_load(path)
    waveform = torch.mean(waveform, dim=0, keepdim=True)

    if sr != sample_rate:
        waveform = torchaudio.functional.resample(waveform, sr, sample_rate)

    # Fix: ``or`` on two multi-element tensors raises "Boolean value of
    # Tensor ... is ambiguous"; reduce each check with torch.any first.
    if torch.any(torch.isnan(waveform)) or torch.any(torch.isinf(waveform)):
        raise ValueError("NaN or Inf found in waveform.")
    return waveform


async def async_save_audio(
    path,
    waveform,
    sample_rate: int = 24000,
    add_silence: bool = False,
    volume_peak: float = 0.9,
):
    r"""Save an audio file asynchronously as 16-bit signed PCM.

    Args:
        path: The target saving path.
        waveform: The waveform object. Should be [n_channel x sequence_len].
        sample_rate: Sample rate.
        add_silence: If ``true``, concat 0.05s silence to beginning and end.
        volume_peak: Turn up volume for larger number, vice versa.
    """

    async def use_torchaudio_save(path, waveform, sample_rate):
        torchaudio.save(
            path, waveform, sample_rate, encoding="PCM_S", bits_per_sample=16
        )

    waveform = torch.as_tensor(waveform, device="cpu", dtype=torch.float32)
    # Leading (channel) dims, used to shape the silence padding below.
    shape = waveform.size()[:-1]

    # Rescale so the absolute peak reaches |volume_peak|.
    peak = max(waveform.max(), abs(waveform.min()))
    if peak > 0:  # guard: silent input would otherwise divide by zero
        waveform = waveform * (abs(volume_peak) / peak)

    if add_silence:
        silence_len = sample_rate // 20  # 0.05 s on each side
        # Fix: Tensor.type() returns a *string* ('torch.FloatTensor'), which
        # torch.zeros rejects as a dtype; use the actual dtype instead.
        silence = torch.zeros((*shape, silence_len), dtype=waveform.dtype)
        waveform = torch.cat((silence, waveform, silence), dim=-1)

    if waveform.dim() == 1:
        waveform = waveform[None]

    await use_torchaudio_save(path, waveform, sample_rate)


def load_mel_extrema(cfg, dataset_name, split):
    """Load the precomputed per-dataset mel min/max statistics.

    Args:
        cfg: config object exposing ``OUTPUT_PATH`` and
            ``data.process_version``
        dataset_name (str): dataset sub-directory name
        split (str): split identifier; only the part after the last ``_``
            selects the statistics sub-directory

    Returns:
        tuple: (mel_min, mel_max) numpy arrays loaded from
        ``mel_min.npy`` / ``mel_max.npy``.
    """
    dataset_dir = os.path.join(
        cfg.OUTPUT_PATH,
        "preprocess/{}_version".format(cfg.data.process_version),
        dataset_name,
    )

    min_file = os.path.join(
        dataset_dir,
        "mel_min_max",
        split.split("_")[-1],
        "mel_min.npy",
    )
    max_file = os.path.join(
        dataset_dir,
        "mel_min_max",
        split.split("_")[-1],
        "mel_max.npy",
    )

    mel_min = np.load(min_file)
    mel_max = np.load(max_file)
    return mel_min, mel_max