from typing import List, Optional, Union

import numpy as np

from transformers.feature_extraction_sequence_utils import SequenceFeatureExtractor
from transformers.feature_extraction_utils import BatchFeature
from transformers.utils import PaddingStrategy, TensorType, logging


logger = logging.get_logger(__name__)


class SvectorFeatureExtractor(SequenceFeatureExtractor):
    """
    Feature extractor that batches, pads and (optionally) normalizes raw speech.

    Calling an instance wraps the raw waveform(s) in a `BatchFeature`, pads them
    through `self.pad`, casts the values to `float32` and, when `do_normalize`
    is set, rescales every sequence to zero mean and unit variance.

    Args:
        feature_size: Forwarded unchanged to the parent constructor.
        sampling_rate: Forwarded unchanged to the parent constructor; `__call__`
            compares the caller-supplied rate against `self.sampling_rate`.
        padding_value: Forwarded to the parent constructor; also written into
            padded positions after normalization (read back as
            `self.padding_value`).
        return_attention_mask: Stored on the instance. NOTE(review): `__call__`
            does not read it directly; presumably `self.pad` falls back to it
            when its own `return_attention_mask` argument is `None` -- confirm
            against the parent class.
        do_normalize: Whether `__call__` applies zero-mean / unit-variance
            normalization.
        **kwargs: Forwarded unchanged to the parent constructor.
    """

    model_input_names = ["input_values", "attention_mask"]

    def __init__(
        self,
        feature_size=1,
        sampling_rate=16000,
        padding_value=0.0,
        return_attention_mask=False,
        do_normalize=True,
        **kwargs,
    ):
        super().__init__(feature_size=feature_size, sampling_rate=sampling_rate, padding_value=padding_value, **kwargs)
        self.return_attention_mask = return_attention_mask
        self.do_normalize = do_normalize

    @staticmethod
    def zero_mean_unit_var_norm(
        input_values: List[np.ndarray],
        attention_mask: Optional[List[np.ndarray]],
        padding_value: float = 0.0,
    ) -> List[np.ndarray]:
        """
        Every array in the list is normalized to have zero mean and unit variance

        Args:
            input_values: Sequences to normalize.
            attention_mask: Optional masks, one per sequence. When given, the
                number of valid samples of each sequence is the sum of its mask
                along the last axis; mean and variance are computed over that
                leading slice only, and every position from that length onward
                is overwritten with `padding_value`. When `None`, statistics are
                computed over each full sequence.
            padding_value: Value written into the padded tail of each sequence.

        Returns:
            A new list holding the normalized arrays.
        """
        if attention_mask is not None:
            attention_mask = np.array(attention_mask, np.int32)
            normed_input_values = []

            for vector, length in zip(input_values, attention_mask.sum(-1)):
                # Statistics come from the valid (unpadded) prefix only.
                normed_slice = (vector - vector[:length].mean()) / np.sqrt(vector[:length].var() + 1e-7)
                if length < normed_slice.shape[0]:
                    # Normalization shifted the padding away from its original
                    # value; restore it.
                    normed_slice[length:] = padding_value

                normed_input_values.append(normed_slice)
        else:
            normed_input_values = [(x - x.mean()) / np.sqrt(x.var() + 1e-7) for x in input_values]

        return normed_input_values

    def __call__(
        self,
        raw_speech: Union[np.ndarray, List[float], List[np.ndarray], List[List[float]]],
        padding: Union[bool, str, PaddingStrategy] = False,
        max_length: Optional[int] = None,
        truncation: bool = False,
        pad_to_multiple_of: Optional[int] = None,
        return_attention_mask: Optional[bool] = None,
        return_tensors: Optional[Union[str, TensorType]] = None,
        sampling_rate: Optional[int] = None,
        **kwargs,
    ) -> BatchFeature:
        """
        Pad, cast and optionally normalize one or several raw speech sequences.

        Args:
            raw_speech: A single sequence (1-D array or list of floats) or a
                batch (2-D array, or list/tuple of arrays/lists/tuples). A
                single sequence is wrapped into a batch of one.
            padding, max_length, truncation, pad_to_multiple_of,
            return_attention_mask: Forwarded unchanged to `self.pad`.
            return_tensors: When not `None`, the result is converted with
                `BatchFeature.convert_to_tensors(return_tensors)`.
            sampling_rate: Sampling rate of `raw_speech`. Checked against
                `self.sampling_rate`; a warning is logged when omitted.
            **kwargs: Accepted for interface compatibility; not used here.

        Returns:
            The padded `BatchFeature` with `"input_values"` cast to `float32`
            and, if `self.pad` produced one, `"attention_mask"` cast to `int32`.

        Raises:
            ValueError: If `sampling_rate` is given and differs from
                `self.sampling_rate`, or if `raw_speech` is a numpy array with
                more than two dimensions.
        """
        if sampling_rate is not None:
            if sampling_rate != self.sampling_rate:
                raise ValueError(
                    f"The model corresponding to this feature extractor: {self} was trained using a sampling rate of"
                    f" {self.sampling_rate}. Please make sure that the provided `raw_speech` input was sampled with"
                    f" {self.sampling_rate} and not {sampling_rate}."
                )
        else:
            logger.warning(
                "It is strongly recommended to pass the ``sampling_rate`` argument to this function. "
                "Failing to do so can result in silent errors that might be hard to debug."
            )

        is_batched_numpy = isinstance(raw_speech, np.ndarray) and len(raw_speech.shape) > 1
        if is_batched_numpy and len(raw_speech.shape) > 2:
            raise ValueError(f"Only mono-channel audio is supported for input to {self}")
        is_batched = is_batched_numpy or (
            isinstance(raw_speech, (list, tuple)) and (isinstance(raw_speech[0], (np.ndarray, tuple, list)))
        )

        # always return batch
        if not is_batched:
            raw_speech = [raw_speech]

        # convert into correct format for padding
        encoded_inputs = BatchFeature({"input_values": raw_speech})

        padded_inputs = self.pad(
            encoded_inputs,
            padding=padding,
            max_length=max_length,
            truncation=truncation,
            pad_to_multiple_of=pad_to_multiple_of,
            return_attention_mask=return_attention_mask,
        )

        # convert input values to correct format
        # Dtypes are compared with `==` rather than `is`: identity only holds
        # for NumPy's cached singleton dtype objects, so an equal-but-distinct
        # float64 dtype would otherwise skip the float32 down-cast silently.
        float64 = np.dtype(np.float64)
        input_values = padded_inputs["input_values"]
        if not isinstance(input_values[0], np.ndarray):
            padded_inputs["input_values"] = [np.asarray(array, dtype=np.float32) for array in input_values]
        elif (
            not isinstance(input_values, np.ndarray)
            and isinstance(input_values[0], np.ndarray)
            and input_values[0].dtype == float64
        ):
            padded_inputs["input_values"] = [array.astype(np.float32) for array in input_values]
        elif isinstance(input_values, np.ndarray) and input_values.dtype == float64:
            padded_inputs["input_values"] = input_values.astype(np.float32)

        # convert attention_mask to correct format
        attention_mask = padded_inputs.get("attention_mask")
        if attention_mask is not None:
            padded_inputs["attention_mask"] = [np.asarray(array, dtype=np.int32) for array in attention_mask]

        # zero-mean and unit-variance normalization
        if self.do_normalize:
            # Without padding there is no padded tail to exclude, so the mask is
            # dropped and each sequence is normalized over its full length.
            attention_mask = (
                attention_mask
                if self._get_padding_strategies(padding, max_length=max_length) is not PaddingStrategy.DO_NOT_PAD
                else None
            )
            padded_inputs["input_values"] = self.zero_mean_unit_var_norm(
                padded_inputs["input_values"], attention_mask=attention_mask, padding_value=self.padding_value
            )

        if return_tensors is not None:
            padded_inputs = padded_inputs.convert_to_tensors(return_tensors)

        return padded_inputs