File size: 4,526 Bytes
f7fb447
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
"""Input preprocessing module."""
from abc import ABC, abstractmethod
from enum import Enum

import numpy as np
from scipy.signal import fftconvolve

from aira.engine.filtering import NonCoincidentMicsCorrection
from aira.utils import convert_ambisonics_a_to_b


# pylint: disable=too-few-public-methods
class InputMode(Enum):
    """Enumeration of the input formats handled by the input processors.

    The processors in this module compare ``input_dict["input_mode"]`` against
    these members to decide whether they should act on the signals.
    """

    # Long Sine Sweep recordings; the mode LSSInputProcessor acts on.
    LSS = "lss"
    # A-format signals; the mode AFormatProcessor acts on.
    AFORMAT = "aformat"
    # B-format signals; the mode BFormatProcessor acts on.
    BFORMAT = "bformat"


# pylint: disable=too-few-public-methods
class InputProcessor(ABC):
    """Base interface for input processors.

    Concrete processors implement ``process``, which receives the input
    dictionary and returns it, possibly modified.
    """

    @abstractmethod
    def process(self, input_dict: dict) -> dict:
        """Process the input dictionary; must be overridden by subclasses.

        Parameters
        ----------
        input_dict : dict
            Dictionary holding the signals and their associated settings.

        Returns
        -------
        dict
            The processed dictionary.
        """


# pylint: disable=too-few-public-methods
class LSSInputProcessor(InputProcessor):
    """Turns Long Sine Sweep (LSS) recordings into impulse responses."""

    def process(self, input_dict: dict) -> dict:
        """Convolve every LSS recording with the inverse filter.

        When the dictionary is flagged as LSS, each 1-D slice along axis 1 of
        ``input_dict["stacked_signals"]`` is convolved (full convolution) with
        ``input_dict["inverse_filter"]`` and the dictionary is re-flagged as
        A-format. Any other input mode is passed through untouched.

        Parameters
        ----------
        input_dict : dict
            Dictionary with the keys ``"input_mode"``, ``"stacked_signals"``
            and, for LSS input, ``"inverse_filter"``.

        Returns
        -------
        dict
            The same dictionary object, with the signals replaced by their
            convolved versions and the mode set to ``InputMode.AFORMAT`` when
            the input was LSS.
        """
        if input_dict["input_mode"] == InputMode.LSS:

            def deconvolve(recording):
                # The filter is looked up per call, exactly where it is needed.
                return fftconvolve(
                    recording, input_dict["inverse_filter"], mode="full"
                )

            input_dict["stacked_signals"] = np.apply_along_axis(
                deconvolve, axis=1, arr=input_dict["stacked_signals"]
            )
            # The deconvolved signals are handed to the next stage as A-format.
            input_dict["input_mode"] = InputMode.AFORMAT

        return input_dict


# pylint: disable=too-few-public-methods
class AFormatProcessor(InputProcessor):
    """Converts A-format input signals into B-format."""

    def process(self, input_dict: dict) -> dict:
        """Convert the four A-format signals to B-format.

        When the dictionary is flagged as A-format, rows 0 to 3 of
        ``input_dict["stacked_signals"]`` are passed, in that order, to
        ``convert_ambisonics_a_to_b`` (imported from ``aira.utils``; see that
        function for the conversion details). The result replaces the stacked
        signals and the dictionary is re-flagged as B-format. Any other input
        mode is passed through untouched.

        Parameters
        ----------
        input_dict : dict
            Dictionary with the keys ``"input_mode"`` and ``"stacked_signals"``.

        Returns
        -------
        dict
            The same dictionary object, holding B-format signals when the
            input was A-format.
        """
        if input_dict["input_mode"] == InputMode.AFORMAT:
            a_format = input_dict["stacked_signals"]
            # One row per A-format channel, forwarded positionally in row order.
            channel_rows = [a_format[row, :] for row in range(4)]
            input_dict["stacked_signals"] = convert_ambisonics_a_to_b(*channel_rows)
            input_dict["input_mode"] = InputMode.BFORMAT

        return input_dict


# pylint: disable=too-few-public-methods
class BFormatProcessor(InputProcessor):
    """Processing when input data is in BFORMAT mode."""

    def process(self, input_dict: dict) -> dict:
        """Corrects B-format arrays frequency response for non-coincident microphones.

        The correction is applied only when the input is in B-format *and*
        frequency correction is enabled; otherwise the dictionary is returned
        untouched.

        Parameters
        ----------
        input_dict : dict
            Dictionary with B-format arrays. Keys read: ``"input_mode"``,
            ``"frequency_correction"`` (optional, treated as enabled when
            absent), ``"sample_rate"`` and ``"stacked_signals"``.

        Returns
        -------
        dict
            input_dict overwritten with B-format frequency corrected arrays.
        """
        # Both preconditions must hold. The previous single guard joined them
        # with ``and``, so it skipped the correction only when *both* failed
        # and a disabled ``frequency_correction`` flag was ignored for
        # B-format input.
        if input_dict["input_mode"] != InputMode.BFORMAT:
            return input_dict

        # Callers that never supplied the flag used to get the correction
        # applied, so a missing key keeps meaning "enabled".
        if not input_dict.get("frequency_correction", True):
            return input_dict

        frequency_corrector = NonCoincidentMicsCorrection(input_dict["sample_rate"])

        # Row 0 goes through the omni correction, the remaining rows through
        # the axis correction; results are written back in place.
        input_dict["stacked_signals"][0, :] = frequency_corrector.correct_omni(
            input_dict["stacked_signals"][0, :]
        )
        input_dict["stacked_signals"][1:, :] = frequency_corrector.correct_axis(
            input_dict["stacked_signals"][1:, :]
        )
        # The mode is already BFORMAT at this point, so it is left unchanged.
        return input_dict


# pylint: disable=too-few-public-methods
class InputProcessorChain:
    """Runs the input processors one after another over an input dictionary."""

    def __init__(self):
        # Order matters: each stage may re-flag the dictionary so that the
        # following stage picks it up (LSS -> A-format -> B-format).
        stage_types = (LSSInputProcessor, AFormatProcessor, BFormatProcessor)
        self.processors = [stage_type() for stage_type in stage_types]

    def process(self, input_dict: dict) -> np.ndarray:
        """Applies every processor in the chain for the input mode that was set.

        Parameters
        ----------
        input_dict : dict
            Contains the arrays and the input mode data.

        Returns
        -------
        np.ndarray
            The value stored under ``"stacked_signals"`` once every processor
            has run.
        """
        current = input_dict
        for stage in self.processors:
            current = stage.process(current)

        return current["stacked_signals"]