|
import numpy as np |
|
import iSparrow.preprocessor_base as ppb |
|
|
|
|
|
class Preprocessor(ppb.PreprocessorBase):
    """Preprocessor registered under the name "google_perch_lite".

    Splits a raw audio signal into fixed-length, optionally overlapping
    chunks, zero-padding the final chunk when it is shorter than a full
    window.
    """

    def __init__(
        self,
        sample_rate: int = 32000,
        sample_secs: float = 5.0,
        resample_type: str = "kaiser_fast",
        **kwargs
    ):
        """Set up the preprocessor by forwarding everything to the base class.

        Args:
            sample_rate: Desired sampling rate; defaults to 32000.
            sample_secs: Length of one chunk in seconds; defaults to 5.0.
            resample_type: Resampling method name handed to the base class;
                defaults to "kaiser_fast".
            **kwargs: Extra keyword arguments passed through unchanged to
                ``PreprocessorBase.__init__``.
        """
        super().__init__(
            "google_perch_lite",
            sample_rate=sample_rate,
            sample_secs=sample_secs,
            resample_type=resample_type,
            **kwargs
        )

    def process_audio_data(self, rawdata: np.ndarray) -> list:
        """Cut ``rawdata`` into chunks of ``sample_secs`` seconds.

        Consecutive chunks start ``(sample_secs - overlap)`` seconds apart.
        Iteration stops at the first chunk shorter than 1.5 seconds; a chunk
        that is shorter than a full window (but at least 1.5 seconds long)
        is zero-padded to the full window length.

        Relies on ``self.actual_sampling_rate`` and ``self.overlap``, which
        are not set in this class (presumably provided by the base class).

        Args:
            rawdata: The audio samples to split.

        Returns:
            The list of chunks, which is also stored in ``self.chunks``.

        Raises:
            RuntimeError: If the actual sampling rate differs from the
                desired ``sample_rate``.
        """
        if self.actual_sampling_rate != self.sample_rate:
            # Must be an f-string, otherwise the placeholders are emitted
            # literally instead of the actual rates.
            raise RuntimeError(
                f"Sampling rate is not the desired one. Desired sampling rate: {self.sample_rate}, actual sampling rate: {self.actual_sampling_rate}"
            )

        seconds = self.sample_secs
        minlen = 1.5  # chunks shorter than this many seconds are dropped

        # Loop-invariant lengths, expressed in samples.
        window_len = int(seconds * self.actual_sampling_rate)
        min_len = int(minlen * self.actual_sampling_rate)
        # NOTE(review): if overlap equals sample_secs the step is 0 and
        # range() raises ValueError; a larger overlap yields no chunks.
        step = int((seconds - self.overlap) * self.sample_rate)

        self.chunks = []

        for start in range(0, len(rawdata), step):
            split = rawdata[start : start + window_len]

            # Everything from here on would be even shorter: stop.
            if len(split) < min_len:
                break

            # Zero-pad a trailing partial chunk up to the full window length.
            if len(split) < window_len:
                padded = np.zeros(window_len)
                padded[: len(split)] = split
                split = padded

            self.chunks.append(split)

        print(
            "process audio data google: complete, read ",
            len(self.chunks),
            "chunks.",
            flush=True,
        )

        return self.chunks

    @classmethod
    def from_cfg(cls, cfg: dict):
        """Build a preprocessor from a configuration dictionary.

        Args:
            cfg: Keyword arguments for the constructor. Only the keys
                "sample_rate", "sample_secs", "resample_type", "duration"
                and "actual_sampling_rate" are accepted.

        Returns:
            A new instance created as ``cls(**cfg)``.

        Raises:
            RuntimeError: If ``cfg`` contains any key outside the accepted
                set.
        """
        allowed = [
            "sample_rate",
            "sample_secs",
            "resample_type",
            "duration",
            "actual_sampling_rate",
        ]

        unexpected = [key for key in cfg if key not in allowed]
        if unexpected:
            raise RuntimeError("Erroneous keyword arguments in preprocessor config")

        return cls(**cfg)
|
|