ecapa-aam / feature_extraction_ecapa.py
yangwang825's picture
Create feature_extraction_ecapa.py
efdbdac
import numpy as np
from typing import List, Optional, Union
from transformers.feature_extraction_sequence_utils import SequenceFeatureExtractor
from transformers.feature_extraction_utils import BatchFeature
from transformers.utils import PaddingStrategy, TensorType, logging
logger = logging.get_logger(__name__)
class EcapaFeatureExtractor(SequenceFeatureExtractor):
model_input_names = ["input_values", "attention_mask"]
def __init__(
self,
feature_size=1,
sampling_rate=16000,
padding_value=0.0,
return_attention_mask=False,
do_normalize=True,
**kwargs,
):
super().__init__(feature_size=feature_size, sampling_rate=sampling_rate, padding_value=padding_value, **kwargs)
self.return_attention_mask = return_attention_mask
self.do_normalize = do_normalize
@staticmethod
def zero_mean_unit_var_norm(
input_values: List[np.ndarray], attention_mask: List[np.ndarray], padding_value: float = 0.0
) -> List[np.ndarray]:
"""
Every array in the list is normalized to have zero mean and unit variance
"""
if attention_mask is not None:
attention_mask = np.array(attention_mask, np.int32)
normed_input_values = []
for vector, length in zip(input_values, attention_mask.sum(-1)):
normed_slice = (vector - vector[:length].mean()) / np.sqrt(vector[:length].var() + 1e-7)
if length < normed_slice.shape[0]:
normed_slice[length:] = padding_value
normed_input_values.append(normed_slice)
else:
normed_input_values = [(x - x.mean()) / np.sqrt(x.var() + 1e-7) for x in input_values]
return normed_input_values
def __call__(
self,
raw_speech: Union[np.ndarray, List[float], List[np.ndarray], List[List[float]]],
padding: Union[bool, str, PaddingStrategy] = False,
max_length: Optional[int] = None,
truncation: bool = False,
pad_to_multiple_of: Optional[int] = None,
return_attention_mask: Optional[bool] = None,
return_tensors: Optional[Union[str, TensorType]] = None,
sampling_rate: Optional[int] = None,
**kwargs,
) -> BatchFeature:
if sampling_rate is not None:
if sampling_rate != self.sampling_rate:
raise ValueError(
f"The model corresponding to this feature extractor: {self} was trained using a sampling rate of"
f" {self.sampling_rate}. Please make sure that the provided `raw_speech` input was sampled with"
f" {self.sampling_rate} and not {sampling_rate}."
)
else:
logger.warning(
"It is strongly recommended to pass the ``sampling_rate`` argument to this function. "
"Failing to do so can result in silent errors that might be hard to debug."
)
is_batched_numpy = isinstance(raw_speech, np.ndarray) and len(raw_speech.shape) > 1
if is_batched_numpy and len(raw_speech.shape) > 2:
raise ValueError(f"Only mono-channel audio is supported for input to {self}")
is_batched = is_batched_numpy or (
isinstance(raw_speech, (list, tuple)) and (isinstance(raw_speech[0], (np.ndarray, tuple, list)))
)
# always return batch
if not is_batched:
raw_speech = [raw_speech]
# convert into correct format for padding
encoded_inputs = BatchFeature({"input_values": raw_speech})
padded_inputs = self.pad(
encoded_inputs,
padding=padding,
max_length=max_length,
truncation=truncation,
pad_to_multiple_of=pad_to_multiple_of,
return_attention_mask=return_attention_mask,
)
# convert input values to correct format
input_values = padded_inputs["input_values"]
if not isinstance(input_values[0], np.ndarray):
padded_inputs["input_values"] = [np.asarray(array, dtype=np.float32) for array in input_values]
elif (
not isinstance(input_values, np.ndarray)
and isinstance(input_values[0], np.ndarray)
and input_values[0].dtype is np.dtype(np.float64)
):
padded_inputs["input_values"] = [array.astype(np.float32) for array in input_values]
elif isinstance(input_values, np.ndarray) and input_values.dtype is np.dtype(np.float64):
padded_inputs["input_values"] = input_values.astype(np.float32)
# convert attention_mask to correct format
attention_mask = padded_inputs.get("attention_mask")
if attention_mask is not None:
padded_inputs["attention_mask"] = [np.asarray(array, dtype=np.int32) for array in attention_mask]
# zero-mean and unit-variance normalization
if self.do_normalize:
attention_mask = (
attention_mask
if self._get_padding_strategies(padding, max_length=max_length) is not PaddingStrategy.DO_NOT_PAD
else None
)
padded_inputs["input_values"] = self.zero_mean_unit_var_norm(
padded_inputs["input_values"], attention_mask=attention_mask, padding_value=self.padding_value
)
if return_tensors is not None:
padded_inputs = padded_inputs.convert_to_tensors(return_tensors)
return padded_inputs