AIRA / aira /engine /intensity.py
nahue-passano
initial commit
f7fb447
raw
history blame
4.97 kB
"""Functionality for intensity computation and related signal processing."""
from typing import Tuple
import numpy as np
from aira.engine.filtering import apply_low_pass_filter
FILTER_CUTOFF = 5000
OVERLAP_RATIO = 0.5
def analysis_crop_2d(
analysis_length: float,
sample_rate: int,
intensity_directions: np.ndarray,
):
"""_summary_
Parameters
----------
analysis_length : float
_description_
sample_rate : int
_description_
intensity_directions : np.ndarray
_description_
Returns
-------
_type_
_description_
"""
# Get analysis length max index
analysis_length_idx = int(analysis_length * sample_rate)
# Slice from intensity max to analysis length from intensity max
earliest_peak_index = np.argmax(np.abs(intensity_directions), axis=1).min()
intensity_directions_cropped = intensity_directions[
:, earliest_peak_index : earliest_peak_index + analysis_length_idx
]
return intensity_directions_cropped
def intensity_thresholding(
threshold: float,
intensity: np.ndarray,
azimuth: np.ndarray,
elevation: np.ndarray,
reflections: np.ndarray,
) -> Tuple[np.ndarray]:
"""_summary_
Parameters
----------
threshold : _type_
_description_
"""
reflex_to_direct = intensity_to_dB(intensity) - intensity_to_dB(intensity[0])
thresholding_mask = reflex_to_direct > threshold
return (
reflex_to_direct[thresholding_mask],
azimuth[thresholding_mask],
elevation[thresholding_mask],
reflections[thresholding_mask],
)
def intensity_to_dB(intensity_array: np.ndarray) -> np.ndarray:
"""Converts intensity to dB scale using 1e-12 as intensity reference
Parameters
----------
intensity_array : np.ndarray
Intensity array
Returns
-------
np.ndarray
Intensity array in dB scale
"""
return 10 * np.log10(intensity_array / 1e-12)
def min_max_normalization(array: np.ndarray) -> np.ndarray:
"""Returns the input array normalized by its minimum and maximum value.
Parameters
----------
array : np.ndarray
Array to be normalized
Returns
-------
np.ndarray
Array normalized
"""
return (array - array.min() * 1.1) / (array.max() - array.min() * 1.1)
def integrate_intensity_directions(
intensity_directions: np.ndarray,
duration_secs: float,
sample_rate: int,
) -> np.ndarray:
"""Integrate the intensity signal with Hamming windows of length `duration_secs`.
Args:
intensity_directions (np.ndarray): X, Y and Z intensity signals.
duration_secs (float): the length of the window to apply, in seconds.
sample_rate (int): sampling rate of the signal.
Returns:
np.ndarray: the integrated signal, of shape (3, ...)
"""
if intensity_directions.shape[0] == 4:
intensity_directions = intensity_directions[1:, :]
elif (intensity_directions.shape[0] < 3) or (intensity_directions.shape[0] > 4):
raise ValueError(f"Unexpected input shape {intensity_directions.shape}")
# Convert integration time to samples
duration_samples = np.round(duration_secs * sample_rate).astype(np.int64)
# Padding and windowing
hop_size = int(duration_samples * (1 - OVERLAP_RATIO))
intensity_directions = np.concatenate(
[
intensity_directions,
np.zeros((3, intensity_directions.shape[1] % hop_size)),
],
axis=1,
)
output_shape = (
3,
int(intensity_directions.shape[1] / duration_samples / OVERLAP_RATIO) - 1,
)
intensity_windowed = np.zeros(output_shape)
time = np.zeros(output_shape[1])
window = np.hamming(duration_samples)
for i in range(0, output_shape[1]):
intensity_segment = intensity_directions[
:, i * hop_size : i * hop_size + duration_samples
]
intensity_windowed[:, i] = np.mean(intensity_segment * window, axis=1)
time[i] = i * hop_size / sample_rate
# Add direct sound first with no windowing
intensity_windowed = np.insert(
intensity_windowed, 0, intensity_directions[:, 0], axis=1
)
return intensity_windowed, time
def convert_bformat_to_intensity(signal: np.ndarray) -> Tuple[np.ndarray]:
"""Integrate and compute intensities for a B-format Ambisonics recording.
Args:
signal (np.ndarray): input B-format Ambisonics signal. Shape: (4, N).
Returns:
Tuple[np.ndarray]: integrated intensity, azimuth and elevation.
"""
# signal_filtered = apply_low_pass_filter(signal, cutoff_frequency, sample_rate)
signal_filtered = signal
# Calculate intensity from directions
intensity_directions = (
signal_filtered[0, :] * signal_filtered[1:, :]
) # Intensity = pressure (W channel) * pressure gradient (XYZ channels)
return intensity_directions