Spaces:
Sleeping
Sleeping
File size: 4,235 Bytes
f7fb447 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 |
"""Functionality for filtering signals."""
import numpy as np
from scipy.signal import bilinear, firwin, kaiserord, lfilter
MIC2CENTER = 3
SOUND_SPEED = 340
FILTER_TRANSITION_WIDTH_HZ = 250.0
FILTER_RIPPLE_DB = 60.0
class NonCoincidentMicsCorrection:
"""Class for correct frequency response in Ambisonics B-format representation."""
def __init__(
self,
sample_rate: int,
mic2center: float = MIC2CENTER,
sound_speed: float = SOUND_SPEED,
) -> None:
self.sample_rate = sample_rate
self.mic2center = mic2center / 100
self.sound_speed = sound_speed
self.delay2center = self.mic2center / self.sound_speed
def _filter(
# pylint: disable=invalid-name
self,
b: np.ndarray,
a: np.ndarray,
array: np.ndarray,
) -> np.ndarray:
"""Applies filter to array given numerator "b" and denominator "a" from
analog filter frequency response
Parameters
----------
b : np.ndarray
Array containing numerator's coefficients
a : np.ndarray
Array containing denominator's coefficients
array : np.ndarray
Array to be filtered
Returns
-------
np.ndarray
Filtered array
"""
# Analog to digital filter conversion
zeros, poles = bilinear(b, a, self.sample_rate)
# Filtering
array_filtered = lfilter(zeros, poles, array)
return array_filtered
def correct_axis(self, axis_signal: np.ndarray) -> np.ndarray:
"""Applies correction filter to axis array signal
Parameters
----------
axis_signal : np.ndarray
Array containing axis signal
Returns
-------
np.ndarray
Axis array signal corrected
"""
# Filter equations
# pylint: disable=invalid-name
b = np.sqrt(6) * np.array(
[1, 1j * (1 / 3) * self.mic2center, -(1 / 3) * self.delay2center**2]
)
# pylint: disable=invalid-name
a = np.array([1, 1j * (1 / 3) * self.delay2center])
axis_corrected = self._filter(b, a, axis_signal)
return axis_corrected
def correct_omni(self, omni_signal: np.ndarray) -> np.ndarray:
"""Applies correction filter to omnidirectional array signal
Parameters
----------
omni_signal : np.ndarray
Array containing omnidirectional signal
Returns
-------
np.ndarray
Omnidirectional array signal corrected
"""
# Filter equations
# pylint: disable=invalid-name
b = np.array([1, 1j * self.delay2center, -(1 / 3) * self.delay2center**2])
# pylint: disable=invalid-name
a = np.array([1, 1j * (1 / 3) * self.delay2center])
omni_corrected = self._filter(b, a, omni_signal)
return omni_corrected
def apply_low_pass_filter(
signal: np.ndarray, cutoff_frequency: int, sample_rate: int
) -> np.ndarray:
"""Filter a signal at the given cutoff with an optimized number of taps
(order of the filter).
Args:
signal (np.ndarray): signal to filter.
cutoff_frequency (int): cutoff frequency.
sample_rate (int): sample rate of the signal.
Returns:
np.ndarray: filtered signal.
"""
nyquist_rate = sample_rate / 2.0
# Compute FIR filter parameters and apply to signal.
transition_width_normalized = FILTER_TRANSITION_WIDTH_HZ / nyquist_rate
filter_length, filter_beta = kaiserord(
FILTER_RIPPLE_DB, transition_width_normalized
)
filter_coefficients = firwin(
filter_length, cutoff_frequency / nyquist_rate, window=("kaiser", filter_beta)
)
return lfilter(filter_coefficients, 1.0, signal)
def moving_average_filter(array: np.ndarray, window_size: int) -> np.ndarray:
"""_summary_
Parameters
----------
array : np.ndarray
_description_
window_size : int
_description_
Returns
-------
np.ndarray
_description_
"""
window = np.ones(window_size) / window_size
return np.convolve(array, window, mode="valid")
|