|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import itertools |
|
|
import os |
|
|
import random |
|
|
import tempfile |
|
|
import unittest |
|
|
|
|
|
import numpy as np |
|
|
from datasets import Audio, load_dataset |
|
|
|
|
|
from transformers import ClvpFeatureExtractor |
|
|
from transformers.testing_utils import ( |
|
|
check_json_file_has_correct_format, |
|
|
cleanup, |
|
|
require_torch, |
|
|
slow, |
|
|
torch_device, |
|
|
) |
|
|
from transformers.utils.import_utils import is_torch_available |
|
|
|
|
|
from ...test_sequence_feature_extraction_common import SequenceFeatureExtractionTestMixin |
|
|
|
|
|
|
|
|
if is_torch_available(): |
|
|
import torch |
|
|
|
|
|
# Module-level generator used by `floats_list` whenever the caller does not pass its own `rng`.
global_rng = random.Random()


def floats_list(shape, scale=1.0, rng=None, name=None):
    """Build a nested list of random Python floats.

    Args:
        shape: Indexable with at least two entries; `shape[0]` is the number of
            rows and `shape[1]` the number of values per row.
        scale: Factor every drawn sample is multiplied by.
        rng: Object exposing a `random()` method. When `None`, the module-level
            `global_rng` is used.
        name: Not used by this function.

    Returns:
        A list holding `shape[0]` lists, each with `shape[1]` values computed as
        `rng.random() * scale`.
    """
    if rng is None:
        rng = global_rng

    # The inner comprehension completes a whole row before the next one starts,
    # so samples are drawn from `rng` in row-major order.
    return [[rng.random() * scale for _ in range(shape[1])] for _ in range(shape[0])]
|
|
|
|
|
|
|
|
@require_torch
class ClvpFeatureExtractionTester:
    """Stores feature-extractor settings and builds the inputs used by the CLVP feature-extraction tests."""

    def __init__(
        self,
        parent,
        batch_size=7,
        min_seq_length=400,
        max_seq_length=2000,
        feature_size=10,
        hop_length=160,
        chunk_length=8,
        padding_value=0.0,
        sampling_rate=4_000,
        return_attention_mask=False,
    ):
        """Record the test configuration.

        Args:
            parent: Object that owns this tester; only stored on the instance here.
            batch_size: Number of inputs built when `equal_length=True`.
            min_seq_length: First length of the variable-length batch.
            max_seq_length: Exclusive upper bound of the variable-length batch, and
                the length of every input when `equal_length=True`.
            feature_size, hop_length, chunk_length, padding_value, sampling_rate,
            return_attention_mask: Stored and returned unchanged by
                `prepare_feat_extract_dict`.
        """
        self.parent = parent
        self.batch_size = batch_size
        self.min_seq_length = min_seq_length
        self.max_seq_length = max_seq_length

        # Step between consecutive lengths of the variable-length batch.
        num_steps = self.batch_size - 1
        length_span = self.max_seq_length - self.min_seq_length
        # A single-item batch has no gap to divide by. Use the whole span as the step so
        # that (for a positive span) the length range yields one entry instead of raising
        # ZeroDivisionError.
        self.seq_length_diff = length_span // num_steps if num_steps else length_span

        self.padding_value = padding_value
        self.sampling_rate = sampling_rate
        self.return_attention_mask = return_attention_mask
        self.feature_size = feature_size
        self.chunk_length = chunk_length
        self.hop_length = hop_length

    def prepare_feat_extract_dict(self):
        """Return the stored settings as a keyword dictionary."""
        return {
            "feature_size": self.feature_size,
            "hop_length": self.hop_length,
            "chunk_length": self.chunk_length,
            "padding_value": self.padding_value,
            "sampling_rate": self.sampling_rate,
            "return_attention_mask": self.return_attention_mask,
        }

    def prepare_inputs_for_common(self, equal_length=False, numpify=False):
        """Build a batch of random inputs.

        Args:
            equal_length: When true, produce `batch_size` inputs that all have
                `max_seq_length` rows. Otherwise produce one input per length in
                `range(min_seq_length, max_seq_length, seq_length_diff)`.
            numpify: When true, convert every input with `np.asarray`.

        Returns:
            A list of inputs; each is a `floats_list` result with `feature_size`
            values per row (or its NumPy conversion).
        """
        if equal_length:
            lengths = [self.max_seq_length] * self.batch_size
        else:
            # Lengths grow monotonically from the minimum towards the maximum.
            lengths = range(self.min_seq_length, self.max_seq_length, self.seq_length_diff)

        speech_inputs = [floats_list((length, self.feature_size)) for length in lengths]
        if numpify:
            speech_inputs = [np.asarray(x) for x in speech_inputs]
        return speech_inputs
|
|
|
|
|
|
|
|
@require_torch
class ClvpFeatureExtractionTest(SequenceFeatureExtractionTestMixin, unittest.TestCase):
    """Tests for `ClvpFeatureExtractor`: serialization round trips, calling conventions, padding dtype and a slow integration check."""

    feature_extraction_class = ClvpFeatureExtractor

    def setUp(self):
        self.feat_extract_tester = ClvpFeatureExtractionTester(self)

    def tearDown(self):
        super().tearDown()
        # Run the shared cleanup helper for the test device after every test.
        cleanup(torch_device)

    def test_feat_extract_from_and_save_pretrained(self):
        # Save to a temporary directory, reload, and compare both the serialized
        # dictionary and the mel filters of the two extractors.
        original = self.feature_extraction_class(**self.feat_extract_dict)

        with tempfile.TemporaryDirectory() as tmpdirname:
            saved_file = original.save_pretrained(tmpdirname)[0]
            check_json_file_has_correct_format(saved_file)
            reloaded = self.feature_extraction_class.from_pretrained(tmpdirname)

        original_dict, reloaded_dict = original.to_dict(), reloaded.to_dict()
        self.assertTrue(np.allclose(original.mel_filters, reloaded.mel_filters))
        self.assertEqual(original_dict, reloaded_dict)

    def test_feat_extract_to_json_file(self):
        # Same round trip as above, but through an explicit JSON file path.
        original = self.feature_extraction_class(**self.feat_extract_dict)

        with tempfile.TemporaryDirectory() as tmpdirname:
            json_file_path = os.path.join(tmpdirname, "feat_extract.json")
            original.to_json_file(json_file_path)
            reloaded = self.feature_extraction_class.from_json_file(json_file_path)

        original_dict, reloaded_dict = original.to_dict(), reloaded.to_dict()
        self.assertTrue(np.allclose(original.mel_filters, reloaded.mel_filters))
        self.assertEqual(original_dict, reloaded_dict)

    def test_call(self):
        feature_extractor = self.feature_extraction_class(**self.feat_extract_tester.prepare_feat_extract_dict())

        def encode(audio):
            # Default call path used by every comparison below.
            return feature_extractor(audio, return_tensors="np").input_features

        def assert_pairwise_close(left, right):
            # Compare the two batches item by item with the shared tolerance.
            for lhs, rhs in zip(left, right):
                self.assertTrue(np.allclose(lhs, rhs, atol=1e-3))

        # Three flat waveforms with 800, 1000 and 1200 samples.
        speech_inputs = [floats_list((1, length))[0] for length in range(800, 1400, 200)]
        np_speech_inputs = [np.asarray(waveform) for waveform in speech_inputs]

        # Output must be 3-D and its second-to-last axis must equal the feature size.
        input_features = feature_extractor(np_speech_inputs, padding="max_length", return_tensors="np").input_features
        self.assertTrue(input_features.ndim == 3)
        self.assertTrue(input_features.shape[-2] == feature_extractor.feature_size)

        # A single un-batched input gives the same result as a list or as an array.
        from_list = encode(speech_inputs[0])
        from_array = encode(np_speech_inputs[0])
        self.assertTrue(np.allclose(from_list, from_array, atol=1e-3))

        # A batch of lists matches a batch of arrays.
        from_list = encode(speech_inputs)
        from_array = encode(np_speech_inputs)
        assert_pairwise_close(from_list, from_array)

        # A 2-D array of equal-length waveforms matches the equivalent list of lists.
        speech_inputs = [floats_list((1, length))[0] for length in (800, 800, 800)]
        np_speech_inputs = np.asarray(speech_inputs)
        from_list = encode(speech_inputs)
        from_array = encode(np_speech_inputs)
        assert_pairwise_close(from_list, from_array)

        # Inputs longer than `n_samples` must encode like their pre-truncated copies.
        max_samples = feature_extractor.n_samples
        speech_inputs = [floats_list((1, length))[0] for length in range(200, (max_samples + 500), 200)]
        np_speech_inputs = [np.asarray(waveform) for waveform in speech_inputs]

        speech_inputs_truncated = [waveform[:max_samples] for waveform in speech_inputs]
        np_speech_inputs_truncated = [np.asarray(waveform) for waveform in speech_inputs_truncated]

        from_full = encode(np_speech_inputs)
        from_truncated = encode(np_speech_inputs_truncated)
        assert_pairwise_close(from_full, from_truncated)

    def test_double_precision_pad(self):
        import torch

        feature_extractor = self.feature_extraction_class(**self.feat_extract_tester.prepare_feat_extract_dict())
        np_speech_inputs = np.random.rand(100, 32).astype(np.float64)
        py_speech_inputs = np_speech_inputs.tolist()

        # float64 input, whether nested lists or an array, must be padded to float32
        # for both the NumPy and the PyTorch return types (checked in that order).
        expected_dtypes = (("np", np.float32), ("pt", torch.float32))
        for inputs in [py_speech_inputs, np_speech_inputs]:
            for tensor_type, expected_dtype in expected_dtypes:
                processed = feature_extractor.pad([{"input_features": inputs}], return_tensors=tensor_type)
                self.assertTrue(processed.input_features.dtype == expected_dtype)

    def _load_datasamples(self, num_samples):
        # Load the dummy LibriSpeech validation split and cast its audio column to 22050 Hz.
        ds = load_dataset("hf-internal-testing/librispeech_asr_dummy", "clean", split="validation")
        ds = ds.cast_column("audio", Audio(sampling_rate=22050))

        # Sort by id and take the first `num_samples` audio entries.
        speech_samples = ds.sort("id").select(range(num_samples))[:num_samples]["audio"]

        arrays = [sample["array"] for sample in speech_samples]
        sampling_rates = [sample["sampling_rate"] for sample in speech_samples]
        return arrays, sampling_rates

    @slow
    def test_integration(self):
        # Reference values for the first 30 frames of feature row 0.
        # fmt: off
        EXPECTED_INPUT_FEATURES = torch.tensor(
            [
                0.9271, 1.1405, 1.4419, 1.2470, 1.2438, 1.1787, 1.0595, 1.0570, 1.1070,
                1.2205, 1.2376, 1.2997, 1.1131, 1.0843, 1.0459, 1.1858, 1.2323, 1.3582,
                1.3401, 1.3770, 1.4173, 1.3381, 1.2291, 1.0854, 1.2116, 1.1873, 1.2178,
                1.2137, 1.3001, 1.4274
            ]
        )
        # fmt: on

        input_speech, sr = self._load_datasamples(1)

        feature_extractor = ClvpFeatureExtractor.from_pretrained("susnato/clvp_dev")
        input_features = feature_extractor(input_speech, sampling_rate=sr[0], return_tensors="pt").input_features

        self.assertEqual(input_features.shape, (1, 80, 517))
        first_row_head = input_features[0, 0, :30]
        torch.testing.assert_close(first_row_head, EXPECTED_INPUT_FEATURES, rtol=1e-4, atol=1e-4)
|
|
|