Spaces:
Running
on
A10G
Running
on
A10G
# Copyright (c) 2023 Amphion. | |
# This source code is licensed under the MIT license found in the | |
# LICENSE file in the root directory of this source tree. | |
from pathlib import Path | |
from typing import List, Tuple | |
import os | |
import numpy as np | |
import torch | |
from text.symbol_table import SymbolTable | |
from text import text_to_sequence | |
''' | |
TextToken: map text to id | |
''' | |
# TextTokenCollator is modified from | |
# https://github.com/lifeiteng/vall-e/blob/9c69096d603ce13174fb5cb025f185e2e9b36ac7/valle/data/collation.py | |
class TextTokenCollator: | |
def __init__( | |
self, | |
text_tokens: List[str], | |
add_eos: bool = True, | |
add_bos: bool = True, | |
pad_symbol: str = "<pad>", | |
bos_symbol: str = "<bos>", | |
eos_symbol: str = "<eos>", | |
): | |
self.pad_symbol = pad_symbol | |
self.add_eos = add_eos | |
self.add_bos = add_bos | |
self.bos_symbol = bos_symbol | |
self.eos_symbol = eos_symbol | |
unique_tokens = [pad_symbol] | |
if add_bos: | |
unique_tokens.append(bos_symbol) | |
if add_eos: | |
unique_tokens.append(eos_symbol) | |
unique_tokens.extend(sorted(text_tokens)) | |
self.token2idx = {token: idx for idx, token in enumerate(unique_tokens)} | |
self.idx2token = unique_tokens | |
def index( | |
self, tokens_list: List[str] | |
) -> Tuple[torch.Tensor, torch.Tensor]: | |
seqs, seq_lens = [], [] | |
for tokens in tokens_list: | |
assert ( | |
all([True if s in self.token2idx else False for s in tokens]) | |
is True | |
) | |
seq = ( | |
([self.bos_symbol] if self.add_bos else []) | |
+ list(tokens) | |
+ ([self.eos_symbol] if self.add_eos else []) | |
) | |
seqs.append(seq) | |
seq_lens.append(len(seq)) | |
max_len = max(seq_lens) | |
for k, (seq, seq_len) in enumerate(zip(seqs, seq_lens)): | |
seq.extend([self.pad_symbol] * (max_len - seq_len)) | |
tokens = torch.from_numpy( | |
np.array( | |
[[self.token2idx[token] for token in seq] for seq in seqs], | |
dtype=np.int64, | |
) | |
) | |
tokens_lens = torch.IntTensor(seq_lens) | |
return tokens, tokens_lens | |
def __call__(self, text): | |
tokens_seq = [p for p in text] | |
seq = ( | |
([self.bos_symbol] if self.add_bos else []) | |
+ tokens_seq | |
+ ([self.eos_symbol] if self.add_eos else []) | |
) | |
token_ids = [self.token2idx[token] for token in seq] | |
token_lens = len(tokens_seq) + self.add_eos + self.add_bos | |
return token_ids, token_lens | |
def get_text_token_collater(text_tokens_file: str) -> TextTokenCollator: | |
text_tokens_path = Path(text_tokens_file) | |
unique_tokens = SymbolTable.from_file(text_tokens_path) | |
collater = TextTokenCollator( | |
unique_tokens.symbols, add_bos=True, add_eos=True | |
) | |
token2idx = collater.token2idx | |
return collater, token2idx | |
class phoneIDCollation: | |
def __init__(self, cfg, dataset=None, symbols_dict_file=None) -> None: | |
if cfg.preprocess.phone_extractor != 'lexicon': | |
### get text token collator | |
if symbols_dict_file is None: | |
assert dataset is not None | |
symbols_dict_file = os.path.join( | |
cfg.preprocess.processed_dir, | |
dataset, | |
cfg.preprocess.symbols_dict | |
) | |
self.text_token_colloator, token2idx = get_text_token_collater(symbols_dict_file) | |
# # unique_tokens = SymbolTable.from_file(symbols_dict_path) | |
# # text_tokenizer = TextToken(unique_tokens.symbols, add_bos=True, add_eos=True) | |
# # update phone symbols dict file with pad_symbol or optional tokens (add_bos and add_eos) in TextTokenCollator | |
# phone_symbol_dict = SymbolTable() | |
# for s in sorted(list(set(token2idx.keys()))): | |
# phone_symbol_dict.add(s) | |
# phone_symbol_dict.to_file(symbols_dict_file) | |
def get_phone_id_sequence(self, cfg, phones_seq): | |
if cfg.preprocess.phone_extractor == 'lexicon': | |
phones_seq = ' '.join(phones_seq) | |
sequence = text_to_sequence(phones_seq, cfg.preprocess.text_cleaners) | |
else: | |
sequence, seq_len = self.text_token_colloator(phones_seq) | |
return sequence | |