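"""Tests of the timestamp-parsing helpers parse_bpe_timestamps_and_texts()
and parse_timestamps_and_texts() from icefall.utils.
"""
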
from pathlib import Path

import k2
import sentencepiece as spm
import torch

from icefall.lexicon import Lexicon
from icefall.utils import parse_bpe_timestamps_and_texts, parse_timestamps_and_texts

ICEFALL_DIR = Path(__file__).resolve().parent.parent


def test_parse_bpe_timestamps_and_texts():
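    """Check parse_bpe_timestamps_and_texts() on two linear FSAs whose
    labels simulate frame-level CTC alignments and whose aux_labels hold
    BPE token ids.

    Requires egs/librispeech/ASR/data/lang_bpe_500; returns early if it
    is missing.
    """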
    lang_dir = ICEFALL_DIR / "egs/librispeech/ASR/data/lang_bpe_500"
    if not lang_dir.is_dir():
        print(f"{lang_dir} does not exist.")
        return

    sp = spm.SentencePieceProcessor()
    sp.load(str(lang_dir / "bpe.model"))

    text_1 = "HELLO WORLD"
    token_ids_1 = sp.encode(text_1, out_type=int)

    # The test assumes text_1 is encoded into four BPE pieces
    # (e.g. ['▁HE', 'LL', 'O', '▁WORLD']): "HELLO" is pieces 0-2 and
    # "WORLD" is piece 3.
    # labels_1 simulates a frame-level alignment: a token may repeat on
    # consecutive frames and 0 is the blank id, so "HELLO" occupies
    # frames 0-3 and "WORLD" occupies frames 6-8.
    labels_1 = (
        token_ids_1[0:1] * 2
        + token_ids_1[1:3]
        + [0] * 2
        + token_ids_1[3:4] * 3
        + [0] * 2
    )

    # aux_labels_1 holds the token ids at token level: each id appears
    # once, on the first frame of its span, padded with blanks; the
    # trailing -1 belongs to the final arc of the FSA.
    aux_labels_1 = (
        token_ids_1[0:1]
        + [0]
        + token_ids_1[1:3]
        + [0] * 2
        + token_ids_1[3:4]
        + [0] * 4
        + [-1]
    )
    fsa_1 = k2.linear_fsa(labels_1)
    fsa_1.aux_labels = torch.tensor(aux_labels_1).to(torch.int32)

    text_2 = "SAY GOODBYE"
    token_ids_2 = sp.encode(text_2, out_type=int)

    # The test assumes text_2 is encoded into five BPE pieces: "SAY" is
    # piece 0 (frame 0) and "GOODBYE" is pieces 1-4 (frames 3-7).
    labels_2 = (
        token_ids_2[0:1] + [0] * 2 + token_ids_2[1:2] * 2 + token_ids_2[2:5] + [0] * 2
    )

    aux_labels_2 = (
        token_ids_2[0:1]
        + [0] * 2
        + token_ids_2[1:2]
        + [0]
        + token_ids_2[2:5]
        + [0] * 2
        + [-1]
    )
    fsa_2 = k2.linear_fsa(labels_2)
    fsa_2.aux_labels = torch.tensor(aux_labels_2).to(torch.int32)

    fsa_vec = k2.create_fsa_vec([fsa_1, fsa_2])

    utt_index_pairs, utt_words = parse_bpe_timestamps_and_texts(fsa_vec, sp)
    assert utt_index_pairs[0] == [(0, 3), (6, 8)], utt_index_pairs[0]
    assert utt_words[0] == ["HELLO", "WORLD"], utt_words[0]
    assert utt_index_pairs[1] == [(0, 0), (3, 7)], utt_index_pairs[1]
    assert utt_words[1] == ["SAY", "GOODBYE"], utt_words[1]


def test_parse_timestamps_and_texts():
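    """Check parse_timestamps_and_texts() on the same simulated
    alignments, but with ragged aux_labels holding word ids looked up
    from the lexicon's word table.
    """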
    lang_dir = ICEFALL_DIR / "egs/librispeech/ASR/data/lang_bpe_500"
    if not lang_dir.is_dir():
        print(f"{lang_dir} does not exist.")
        return

    lexicon = Lexicon(lang_dir)

    sp = spm.SentencePieceProcessor()
    sp.load(str(lang_dir / "bpe.model"))
    word_table = lexicon.word_table

    text_1 = "HELLO WORLD"
    token_ids_1 = sp.encode(text_1, out_type=int)
    word_ids_1 = [word_table[s] for s in text_1.split()]

    # Same simulated frame-level alignment as in the BPE test above:
    # "HELLO" covers frames 0-3 and "WORLD" covers frames 6-8.
    labels_1 = (
        token_ids_1[0:1] * 2
        + token_ids_1[1:3]
        + [0] * 2
        + token_ids_1[3:4] * 3
        + [0] * 2
    )

    # Ragged aux_labels: one sublist of word ids per arc (including the
    # final arc); each word id sits on the arc where the word starts.
    aux_labels_1 = [word_ids_1[0:1]] + [[]] * 5 + [word_ids_1[1:2]] + [[]] * 5
    fsa_1 = k2.linear_fsa(labels_1)
    fsa_1.aux_labels = k2.RaggedTensor(aux_labels_1)

    text_2 = "SAY GOODBYE"
    token_ids_2 = sp.encode(text_2, out_type=int)
    word_ids_2 = [word_table[s] for s in text_2.split()]

    labels_2 = (
        token_ids_2[0:1] + [0] * 2 + token_ids_2[1:2] * 2 + token_ids_2[2:5] + [0] * 2
    )

    aux_labels_2 = [word_ids_2[0:1]] + [[]] * 2 + [word_ids_2[1:2]] + [[]] * 7
    fsa_2 = k2.linear_fsa(labels_2)
    fsa_2.aux_labels = k2.RaggedTensor(aux_labels_2)

    fsa_vec = k2.create_fsa_vec([fsa_1, fsa_2])

    utt_index_pairs, utt_words = parse_timestamps_and_texts(fsa_vec, word_table)
    assert utt_index_pairs[0] == [(0, 3), (6, 8)], utt_index_pairs[0]
    assert utt_words[0] == ["HELLO", "WORLD"], utt_words[0]
    assert utt_index_pairs[1] == [(0, 0), (3, 7)], utt_index_pairs[1]
    assert utt_words[1] == ["SAY", "GOODBYE"], utt_words[1]


if __name__ == "__main__":
    test_parse_bpe_timestamps_and_texts()
    test_parse_timestamps_and_texts()