Spaces:
Running
Running
# Copyright (c) 2023 Amphion. | |
# | |
# This source code is licensed under the MIT license found in the | |
# LICENSE file in the root directory of this source tree. | |
import torch | |
import random | |
import numpy as np | |
from torch.nn import functional as F | |
from torch.nn.utils.rnn import pad_sequence | |
from utils.data_utils import * | |
from models.vocoders.vocoder_dataset import VocoderDataset | |
class DiffusionVocoderDataset(VocoderDataset): | |
def __init__(self, cfg, dataset, is_valid=False): | |
""" | |
Args: | |
cfg: config | |
dataset: dataset name | |
is_valid: whether to use train or valid dataset | |
""" | |
super().__init__(cfg, dataset, is_valid) | |
eval_index = random.randint(0, len(self.metadata) - 1) | |
eval_utt_info = self.metadata[eval_index] | |
eval_utt = "{}_{}".format(eval_utt_info["Dataset"], eval_utt_info["Uid"]) | |
self.eval_audio = np.load(self.utt2audio_path[eval_utt]) | |
if cfg.preprocess.use_mel: | |
self.eval_mel = np.load(self.utt2mel_path[eval_utt]) | |
if cfg.preprocess.use_frame_pitch: | |
self.eval_pitch = np.load(self.utt2frame_pitch_path[eval_utt]) | |
def __getitem__(self, index): | |
utt_info = self.metadata[index] | |
dataset = utt_info["Dataset"] | |
uid = utt_info["Uid"] | |
utt = "{}_{}".format(dataset, uid) | |
single_feature = dict() | |
if self.cfg.preprocess.use_mel: | |
mel = np.load(self.utt2mel_path[utt]) | |
assert mel.shape[0] == self.cfg.preprocess.n_mel | |
if "target_len" not in single_feature.keys(): | |
single_feature["target_len"] = mel.shape[1] | |
if single_feature["target_len"] <= self.cfg.preprocess.cut_mel_frame: | |
mel = np.pad( | |
mel, | |
((0, 0), (0, self.cfg.preprocess.cut_mel_frame - mel.shape[-1])), | |
mode="constant", | |
) | |
else: | |
if "start" not in single_feature.keys(): | |
start = random.randint( | |
0, mel.shape[-1] - self.cfg.preprocess.cut_mel_frame | |
) | |
end = start + self.cfg.preprocess.cut_mel_frame | |
single_feature["start"] = start | |
single_feature["end"] = end | |
mel = mel[:, single_feature["start"] : single_feature["end"]] | |
single_feature["mel"] = mel | |
if self.cfg.preprocess.use_frame_pitch: | |
frame_pitch = np.load(self.utt2frame_pitch_path[utt]) | |
if "target_len" not in single_feature.keys(): | |
single_feature["target_len"] = len(frame_pitch) | |
aligned_frame_pitch = align_length( | |
frame_pitch, single_feature["target_len"] | |
) | |
if single_feature["target_len"] <= self.cfg.preprocess.cut_mel_frame: | |
aligned_frame_pitch = np.pad( | |
aligned_frame_pitch, | |
( | |
( | |
0, | |
self.cfg.preprocess.cut_mel_frame | |
* self.cfg.preprocess.hop_size | |
- audio.shape[-1], | |
) | |
), | |
mode="constant", | |
) | |
else: | |
if "start" not in single_feature.keys(): | |
start = random.randint( | |
0, | |
aligned_frame_pitch.shape[-1] | |
- self.cfg.preprocess.cut_mel_frame, | |
) | |
end = start + self.cfg.preprocess.cut_mel_frame | |
single_feature["start"] = start | |
single_feature["end"] = end | |
aligned_frame_pitch = aligned_frame_pitch[ | |
single_feature["start"] : single_feature["end"] | |
] | |
single_feature["frame_pitch"] = aligned_frame_pitch | |
if self.cfg.preprocess.use_audio: | |
audio = np.load(self.utt2audio_path[utt]) | |
assert "target_len" in single_feature.keys() | |
if ( | |
audio.shape[-1] | |
<= self.cfg.preprocess.cut_mel_frame * self.cfg.preprocess.hop_size | |
): | |
audio = np.pad( | |
audio, | |
( | |
( | |
0, | |
self.cfg.preprocess.cut_mel_frame | |
* self.cfg.preprocess.hop_size | |
- audio.shape[-1], | |
) | |
), | |
mode="constant", | |
) | |
else: | |
if "start" not in single_feature.keys(): | |
audio = audio[ | |
0 : self.cfg.preprocess.cut_mel_frame | |
* self.cfg.preprocess.hop_size | |
] | |
else: | |
audio = audio[ | |
single_feature["start"] | |
* self.cfg.preprocess.hop_size : single_feature["end"] | |
* self.cfg.preprocess.hop_size, | |
] | |
single_feature["audio"] = audio | |
return single_feature | |
class DiffusionVocoderCollator(object): | |
"""Zero-pads model inputs and targets based on number of frames per step""" | |
def __init__(self, cfg): | |
self.cfg = cfg | |
def __call__(self, batch): | |
packed_batch_features = dict() | |
# mel: [b, n_mels, frame] | |
# frame_pitch: [b, frame] | |
# audios: [b, frame * hop_size] | |
for key in batch[0].keys(): | |
if key in ["target_len", "start", "end"]: | |
continue | |
else: | |
values = [torch.from_numpy(b[key]) for b in batch] | |
packed_batch_features[key] = pad_sequence( | |
values, batch_first=True, padding_value=0 | |
) | |
return packed_batch_features | |