File size: 4,235 Bytes
f7fb447
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
"""Functionality for filtering signals."""

import numpy as np
from scipy.signal import bilinear, firwin, kaiserord, lfilter

# Default microphone-to-centre distance. Presumably expressed in centimetres:
# NonCoincidentMicsCorrection divides it by 100 before using it — TODO confirm.
MIC2CENTER = 3
# Default speed of sound (presumably in m/s — TODO confirm units).
SOUND_SPEED = 340
# Transition-band width, in Hz, used to size the low-pass FIR filter.
FILTER_TRANSITION_WIDTH_HZ = 250.0
# Ripple bound, in dB, handed to kaiserord when designing the low-pass filter.
FILTER_RIPPLE_DB = 60.0


class NonCoincidentMicsCorrection:
    """Frequency-response correction for Ambisonics B-format signals.

    The correction filters are defined as analog transfer functions of the
    microphone-to-centre propagation delay, converted to digital filters
    with the bilinear transform and applied with ``scipy.signal.lfilter``.

    Parameters
    ----------
    sample_rate : int
        Sampling rate handed to the bilinear transform.
    mic2center : float, optional
        Distance between a microphone and the array centre. It is divided
        by 100 on storage (presumably centimetres converted to metres).
    sound_speed : float, optional
        Speed of sound, in stored-distance units per second (presumably
        m/s).
    """

    def __init__(
        self,
        sample_rate: int,
        mic2center: float = MIC2CENTER,
        sound_speed: float = SOUND_SPEED,
    ) -> None:
        self.sample_rate = sample_rate
        self.mic2center = mic2center / 100
        self.sound_speed = sound_speed
        # Time a wavefront needs to travel from a microphone to the centre.
        self.delay2center = self.mic2center / self.sound_speed

    def _filter(
        # pylint: disable=invalid-name
        self,
        b: np.ndarray,
        a: np.ndarray,
        array: np.ndarray,
    ) -> np.ndarray:
        """Discretize an analog filter and apply it to ``array``.

        Parameters
        ----------
        b : np.ndarray
            Numerator coefficients of the analog transfer function, in
            descending powers of ``s``
        a : np.ndarray
            Denominator coefficients of the analog transfer function, in
            descending powers of ``s``
        array : np.ndarray
            Array to be filtered

        Returns
        -------
        np.ndarray
            Filtered array
        """
        # Analog to digital conversion. bilinear returns the numerator and
        # denominator coefficients of the digital filter (not zeros/poles).
        b_digital, a_digital = bilinear(b, a, self.sample_rate)

        return lfilter(b_digital, a_digital, array)

    def correct_axis(self, axis_signal: np.ndarray) -> np.ndarray:
        """Applies correction filter to axis array signal

        The analog response, with ``tau = delay2center``, is::

            sqrt(6) * (1 + j*w*tau/3 - (w*tau)**2 / 3) / (1 + j*w*tau/3)

        Parameters
        ----------
        axis_signal : np.ndarray
            Array containing axis signal

        Returns
        -------
        np.ndarray
            Axis array signal corrected
        """
        tau = self.delay2center

        # bilinear expects real polynomial coefficients in descending powers
        # of s. Substituting s = j*w turns the response above into
        # sqrt(6) * ((tau*s)**2 / 3 + tau*s/3 + 1) / (tau*s/3 + 1).
        # The linear term uses the delay (tau), like every other term; the
        # previous code used the raw distance there.
        numerator = np.sqrt(6) * np.array([tau**2 / 3, tau / 3, 1.0])
        denominator = np.array([tau / 3, 1.0])

        # NOTE(review): the numerator is one order higher than the
        # denominator, so the analog gain keeps rising with frequency —
        # confirm the discretized filter is acceptable close to Nyquist.
        return self._filter(numerator, denominator, axis_signal)

    def correct_omni(self, omni_signal: np.ndarray) -> np.ndarray:
        """Applies correction filter to omnidirectional array signal

        The analog response, with ``tau = delay2center``, is::

            (1 + j*w*tau - (w*tau)**2 / 3) / (1 + j*w*tau/3)

        Parameters
        ----------
        omni_signal : np.ndarray
            Array containing omnidirectional signal

        Returns
        -------
        np.ndarray
            Omnidirectional array signal corrected
        """
        tau = self.delay2center

        # Same conversion as in correct_axis: with s = j*w the response is
        # ((tau*s)**2 / 3 + tau*s + 1) / (tau*s/3 + 1), written here as real
        # coefficients in descending powers of s.
        numerator = np.array([tau**2 / 3, tau, 1.0])
        denominator = np.array([tau / 3, 1.0])

        # NOTE(review): improper transfer function (numerator order exceeds
        # denominator order) — confirm behaviour close to Nyquist.
        return self._filter(numerator, denominator, omni_signal)


def apply_low_pass_filter(
    signal: np.ndarray, cutoff_frequency: int, sample_rate: int
) -> np.ndarray:
    """Low-pass a signal with a Kaiser-window FIR filter.

    The number of taps and the Kaiser beta are derived by ``kaiserord`` from
    the module-level ripple and transition-width settings, so the filter
    order follows from the sample rate instead of being hard-coded.

    Args:
        signal (np.ndarray): signal to filter.
        cutoff_frequency (int): cutoff frequency, in the same unit as
            ``sample_rate``.
        sample_rate (int): sample rate of the signal.

    Returns:
        np.ndarray: filtered signal.
    """
    half_rate = sample_rate / 2.0

    # Both the transition width and the cutoff are expressed as fractions
    # of the Nyquist rate.
    num_taps, beta = kaiserord(
        FILTER_RIPPLE_DB, FILTER_TRANSITION_WIDTH_HZ / half_rate
    )
    normalized_cutoff = cutoff_frequency / half_rate
    taps = firwin(num_taps, normalized_cutoff, window=("kaiser", beta))

    # FIR filtering: the denominator is the constant 1.0.
    return lfilter(taps, 1.0, signal)


def moving_average_filter(array: np.ndarray, window_size: int) -> np.ndarray:
    """Smooth a one-dimensional signal with an unweighted moving average.

    Parameters
    ----------
    array : np.ndarray
        One-dimensional signal to smooth.
    window_size : int
        Number of consecutive samples averaged for each output value.
        Must be at least 1.

    Returns
    -------
    np.ndarray
        Mean of every fully overlapping window (``mode="valid"``). For
        ``window_size <= len(array)`` the result has
        ``len(array) - window_size + 1`` elements.

    Raises
    ------
    ValueError
        If ``window_size`` is smaller than 1. ``numpy.convolve`` also raises
        ``ValueError`` when ``array`` is empty.

    Notes
    -----
    When ``window_size`` exceeds ``len(array)``, ``numpy.convolve`` swaps its
    operands, so the result has ``window_size - len(array) + 1`` elements
    rather than being empty.
    """
    # Fail early with a clear message instead of NumPy's errors about
    # negative dimensions or an empty kernel.
    if window_size < 1:
        raise ValueError(f"window_size must be at least 1, got {window_size}")

    # Uniform kernel whose weights sum to one.
    window = np.ones(window_size) / window_size
    return np.convolve(array, window, mode="valid")