"""Functionality for filtering signals."""
import numpy as np
from scipy.signal import bilinear, firwin, kaiserord, lfilter

# Default capsule-to-centre distance. The constructor divides this value by
# 100 before use (presumably centimetres -> metres -- TODO confirm).
MIC2CENTER = 3
# Default speed of sound (presumably in m/s -- TODO confirm).
SOUND_SPEED = 340
# Kaiser-window FIR design parameters used by ``apply_low_pass_filter``.
FILTER_TRANSITION_WIDTH_HZ = 250.0
FILTER_RIPPLE_DB = 60.0


class NonCoincidentMicsCorrection:
    """Correct the frequency response of Ambisonics B-format signals.

    The correction filters are parameterised by ``delay2center``, the
    capsule-to-centre distance divided by the speed of sound.
    """

    def __init__(
        self,
        sample_rate: int,
        mic2center: float = MIC2CENTER,
        sound_speed: float = SOUND_SPEED,
    ) -> None:
        """Store the filter parameters.

        Parameters
        ----------
        sample_rate : int
            Sample rate handed to the bilinear transform.
        mic2center : float
            Capsule-to-centre distance; it is divided by 100 before being
            stored (presumably a centimetre value -- TODO confirm).
        sound_speed : float
            Speed of sound used to turn the distance into a delay.
        """
        self.sample_rate = sample_rate
        self.mic2center = mic2center / 100
        self.sound_speed = sound_speed
        self.delay2center = self.mic2center / self.sound_speed

    def _filter(  # pylint: disable=invalid-name
        self,
        b: np.ndarray,
        a: np.ndarray,
        array: np.ndarray,
    ) -> np.ndarray:
        """Apply the analog filter described by "b" and "a" to an array.

        The analog coefficients are converted to a digital filter with the
        bilinear transform at ``self.sample_rate`` and then applied with
        ``scipy.signal.lfilter``.

        Parameters
        ----------
        b : np.ndarray
            Array containing numerator's coefficients
        a : np.ndarray
            Array containing denominator's coefficients
        array : np.ndarray
            Array to be filtered

        Returns
        -------
        np.ndarray
            Filtered array
        """
        # NOTE(review): scipy.signal.bilinear treats ``b`` and ``a`` as
        # polynomial coefficients in ``s``, highest power first. The callers
        # in this class write them lowest power first with explicit ``1j``
        # factors; depending on the SciPy version the imaginary parts may
        # also be discarded by ``bilinear``. Confirm this is the intended
        # transfer function before relying on the result.

        # Analog to digital filter conversion
        digital_b, digital_a = bilinear(b, a, self.sample_rate)

        # Filtering
        return lfilter(digital_b, digital_a, array)

    def correct_axis(self, axis_signal: np.ndarray) -> np.ndarray:
        """Applies correction filter to axis array signal

        Parameters
        ----------
        axis_signal : np.ndarray
            Array containing axis signal

        Returns
        -------
        np.ndarray
            Axis array signal corrected
        """
        one_third = 1 / 3

        # Filter equations. Every term is expressed through the delay
        # (distance / sound speed); the first-order term previously used the
        # raw distance ``mic2center``, which was inconsistent with the other
        # coefficients of both correction filters.
        # pylint: disable=invalid-name
        b = np.sqrt(6) * np.array(
            [
                1,
                1j * one_third * self.delay2center,
                -one_third * self.delay2center**2,
            ]
        )
        # pylint: disable=invalid-name
        a = np.array([1, 1j * one_third * self.delay2center])

        return self._filter(b, a, axis_signal)

    def correct_omni(self, omni_signal: np.ndarray) -> np.ndarray:
        """Applies correction filter to omnidirectional array signal

        Parameters
        ----------
        omni_signal : np.ndarray
            Array containing omnidirectional signal

        Returns
        -------
        np.ndarray
            Omnidirectional array signal corrected
        """
        one_third = 1 / 3

        # Filter equations
        # pylint: disable=invalid-name
        b = np.array(
            [
                1,
                1j * self.delay2center,
                -one_third * self.delay2center**2,
            ]
        )
        # pylint: disable=invalid-name
        a = np.array([1, 1j * one_third * self.delay2center])

        return self._filter(b, a, omni_signal)


def apply_low_pass_filter(
    signal: np.ndarray, cutoff_frequency: int, sample_rate: int
) -> np.ndarray:
    """Filter a signal at the given cutoff with an optimized number of taps
    (order of the filter).

    The FIR filter is designed with a Kaiser window whose length and beta are
    derived from ``FILTER_RIPPLE_DB`` and ``FILTER_TRANSITION_WIDTH_HZ``, and
    it is applied causally with ``scipy.signal.lfilter``.

    Args:
        signal (np.ndarray): signal to filter.
        cutoff_frequency (int): cutoff frequency, in the same unit as
            ``sample_rate``.
        sample_rate (int): sample rate of the signal.

    Returns:
        np.ndarray: filtered signal.
    """
    nyquist_rate = sample_rate / 2.0

    # Both the transition width and the cutoff are expressed as a fraction of
    # the Nyquist rate, which is what kaiserord/firwin expect by default.
    transition_width_normalized = FILTER_TRANSITION_WIDTH_HZ / nyquist_rate
    filter_length, filter_beta = kaiserord(
        FILTER_RIPPLE_DB, transition_width_normalized
    )
    filter_coefficients = firwin(
        filter_length,
        cutoff_frequency / nyquist_rate,
        window=("kaiser", filter_beta),
    )

    return lfilter(filter_coefficients, 1.0, signal)


def moving_average_filter(array: np.ndarray, window_size: int) -> np.ndarray:
    """Smooth an array with an unweighted moving average.

    Parameters
    ----------
    array : np.ndarray
        One-dimensional array to smooth.
    window_size : int
        Number of consecutive samples averaged for each output value.

    Returns
    -------
    np.ndarray
        Averaged values for every position where the window and the array
        overlap completely (``mode="valid"``); for an array at least as long
        as the window this is ``len(array) - window_size + 1`` samples.

    Raises
    ------
    ValueError
        If ``window_size`` is smaller than 1.
    """
    if window_size < 1:
        raise ValueError("window_size must be a positive integer")

    window = np.ones(window_size) / window_size
    return np.convolve(array, window, mode="valid")