yuancwang
init
b725c5a
raw
history blame
No virus
6.96 kB
# Copyright (c) 2023 Amphion.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import os
import torch
from tqdm import tqdm
from collections import OrderedDict
from models.tts.base.tts_inferece import TTSInference
from models.tts.fastspeech2.fs2_dataset import FS2TestDataset, FS2TestCollator
from utils.util import load_config
from utils.io import save_audio
from models.tts.fastspeech2.fs2 import FastSpeech2
from models.vocoders.vocoder_inference import synthesis
from pathlib import Path
from processors.phone_extractor import phoneExtractor
from text.text_token_collation import phoneIDCollation
import numpy as np
import json
class FastSpeech2Inference(TTSInference):
def __init__(self, args, cfg):
TTSInference.__init__(self, args, cfg)
self.args = args
self.cfg = cfg
self.infer_type = args.mode
def _build_model(self):
self.model = FastSpeech2(self.cfg)
return self.model
def load_model(self, state_dict):
raw_dict = state_dict["model"]
clean_dict = OrderedDict()
for k, v in raw_dict.items():
if k.startswith("module."):
clean_dict[k[7:]] = v
else:
clean_dict[k] = v
self.model.load_state_dict(clean_dict)
def _build_test_dataset(self):
return FS2TestDataset, FS2TestCollator
@staticmethod
def _parse_vocoder(vocoder_dir):
r"""Parse vocoder config"""
vocoder_dir = os.path.abspath(vocoder_dir)
ckpt_list = [ckpt for ckpt in Path(vocoder_dir).glob("*.pt")]
# last step (different from the base *int(x.stem)*)
ckpt_list.sort(
key=lambda x: int(x.stem.split("_")[-2].split("-")[-1]), reverse=True
)
ckpt_path = str(ckpt_list[0])
vocoder_cfg = load_config(
os.path.join(vocoder_dir, "args.json"), lowercase=True
)
return vocoder_cfg, ckpt_path
@torch.inference_mode()
def inference_for_batches(self):
y_pred = []
for i, batch in tqdm(enumerate(self.test_dataloader)):
y_pred, mel_lens, _ = self._inference_each_batch(batch)
y_ls = y_pred.chunk(self.test_batch_size)
tgt_ls = mel_lens.chunk(self.test_batch_size)
j = 0
for it, l in zip(y_ls, tgt_ls):
l = l.item()
it = it.squeeze(0)[:l].detach().cpu()
uid = self.test_dataset.metadata[i * self.test_batch_size + j]["Uid"]
torch.save(it, os.path.join(self.args.output_dir, f"{uid}.pt"))
j += 1
vocoder_cfg, vocoder_ckpt = self._parse_vocoder(self.args.vocoder_dir)
res = synthesis(
cfg=vocoder_cfg,
vocoder_weight_file=vocoder_ckpt,
n_samples=None,
pred=[
torch.load(
os.path.join(self.args.output_dir, "{}.pt".format(item["Uid"]))
).numpy()
for item in self.test_dataset.metadata
],
)
for it, wav in zip(self.test_dataset.metadata, res):
uid = it["Uid"]
save_audio(
os.path.join(self.args.output_dir, f"{uid}.wav"),
wav.numpy(),
self.cfg.preprocess.sample_rate,
add_silence=True,
turn_up=True,
)
os.remove(os.path.join(self.args.output_dir, f"{uid}.pt"))
def _inference_each_batch(self, batch_data):
device = self.accelerator.device
control_values = (
self.args.pitch_control,
self.args.energy_control,
self.args.duration_control,
)
for k, v in batch_data.items():
batch_data[k] = v.to(device)
pitch_control, energy_control, duration_control = control_values
output = self.model(
batch_data,
p_control=pitch_control,
e_control=energy_control,
d_control=duration_control,
)
pred_res = output["postnet_output"]
mel_lens = output["mel_lens"].cpu()
return pred_res, mel_lens, 0
def inference_for_single_utterance(self):
text = self.args.text
control_values = (
self.args.pitch_control,
self.args.energy_control,
self.args.duration_control,
)
pitch_control, energy_control, duration_control = control_values
# get phone symbol file
phone_symbol_file = None
if self.cfg.preprocess.phone_extractor != "lexicon":
phone_symbol_file = os.path.join(
self.exp_dir, self.cfg.preprocess.symbols_dict
)
assert os.path.exists(phone_symbol_file)
# convert text to phone sequence
phone_extractor = phoneExtractor(self.cfg)
phone_seq = phone_extractor.extract_phone(text) # phone_seq: list
# convert phone sequence to phone id sequence
phon_id_collator = phoneIDCollation(
self.cfg, symbols_dict_file=phone_symbol_file
)
phone_seq = ["{"] + phone_seq + ["}"]
phone_id_seq = phon_id_collator.get_phone_id_sequence(self.cfg, phone_seq)
# convert phone sequence to phone id sequence
phone_id_seq = np.array(phone_id_seq)
phone_id_seq = torch.from_numpy(phone_id_seq)
# get speaker id if multi-speaker training and use speaker id
speaker_id = None
if self.cfg.preprocess.use_spkid and self.cfg.train.multi_speaker_training:
spk2id_file = os.path.join(self.exp_dir, self.cfg.preprocess.spk2id)
with open(spk2id_file, "r") as f:
spk2id = json.load(f)
speaker_id = spk2id[self.args.speaker_name]
speaker_id = torch.from_numpy(np.array([speaker_id], dtype=np.int32))
else:
speaker_id = torch.Tensor(0).view(-1)
with torch.no_grad():
x_tst = phone_id_seq.to(self.device).unsqueeze(0)
x_tst_lengths = torch.LongTensor([phone_id_seq.size(0)]).to(self.device)
if speaker_id is not None:
speaker_id = speaker_id.to(self.device)
data = {}
data["texts"] = x_tst
data["text_len"] = x_tst_lengths
data["spk_id"] = speaker_id
output = self.model(
data,
p_control=pitch_control,
e_control=energy_control,
d_control=duration_control,
)
pred_res = output["postnet_output"]
vocoder_cfg, vocoder_ckpt = self._parse_vocoder(self.args.vocoder_dir)
audio = synthesis(
cfg=vocoder_cfg,
vocoder_weight_file=vocoder_ckpt,
n_samples=None,
pred=pred_res,
)
return audio[0]