Spaces:
Sleeping
Sleeping
File size: 4,971 Bytes
f7fb447 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 |
"""Functionality for intensity computation and related signal processing."""
from typing import Optional, Tuple

import numpy as np

from aira.engine.filtering import apply_low_pass_filter
# Low-pass cutoff value. Nothing in the visible part of this module reads it;
# presumably a frequency in Hz meant for `apply_low_pass_filter` — TODO confirm.
FILTER_CUTOFF = 5000
# Fraction of an integration window shared with the next one; used by
# `integrate_intensity_directions` to derive the hop size between windows.
OVERLAP_RATIO = 0.5
def analysis_crop_2d(
    analysis_length: float,
    sample_rate: int,
    intensity_directions: np.ndarray,
):
    """Crop the intensity signals to an analysis window starting at the earliest peak.

    The absolute maximum of every row is located along axis 1; the smallest of
    those indices is used as the start of the crop for all rows.

    Parameters
    ----------
    analysis_length : float
        Length of the analysis window. It is multiplied by `sample_rate` and
        truncated to an integer number of samples.
    sample_rate : int
        Sampling rate used to convert `analysis_length` into samples.
    intensity_directions : np.ndarray
        2-D array whose rows are the signals to crop and whose second axis is
        the sample axis.

    Returns
    -------
    np.ndarray
        The columns of `intensity_directions` from the earliest peak up to
        `analysis_length * sample_rate` samples after it. Fewer columns are
        returned when the input ends before the window does.
    """
    # Window length expressed in samples
    window_samples = int(analysis_length * sample_rate)

    # Index of the absolute peak of each row; the crop starts at the earliest one
    peak_per_row = np.abs(intensity_directions).argmax(axis=1)
    start = peak_per_row.min()
    stop = start + window_samples

    return intensity_directions[:, start:stop]
def intensity_thresholding(
    threshold: float,
    intensity: np.ndarray,
    azimuth: np.ndarray,
    elevation: np.ndarray,
    reflections: np.ndarray,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    """Keep only the entries whose level relative to the first entry exceeds a threshold.

    Every intensity value is converted to dB and referenced to the level of
    ``intensity[0]`` (presumably the direct sound — confirm against the
    caller). Entries whose relative level is strictly greater than
    `threshold` are kept, and the same selection is applied to the
    accompanying arrays.

    Parameters
    ----------
    threshold : float
        Lower bound, in dB relative to ``intensity[0]``. The comparison is
        strict, so entries exactly at the threshold are discarded.
    intensity : np.ndarray
        Intensity values. Its first entry is the reference level.
    azimuth : np.ndarray
        Array indexed with the same boolean mask as `intensity`.
    elevation : np.ndarray
        Array indexed with the same boolean mask as `intensity`.
    reflections : np.ndarray
        Array indexed with the same boolean mask as `intensity`.

    Returns
    -------
    Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]
        The relative levels in dB, followed by `azimuth`, `elevation` and
        `reflections`, each restricted to the entries above the threshold.
    """
    # Level of each entry relative to the first one (0 dB for the first entry)
    reflex_to_direct = intensity_to_dB(intensity) - intensity_to_dB(intensity[0])
    thresholding_mask = reflex_to_direct > threshold
    return (
        reflex_to_direct[thresholding_mask],
        azimuth[thresholding_mask],
        elevation[thresholding_mask],
        reflections[thresholding_mask],
    )
def intensity_to_dB(intensity_array: np.ndarray) -> np.ndarray:
    """Express intensity values in decibels relative to 1e-12.

    Parameters
    ----------
    intensity_array : np.ndarray
        Intensity values to convert.

    Returns
    -------
    np.ndarray
        ``10 * log10(intensity_array / 1e-12)``, computed element-wise.
    """
    reference_intensity = 1e-12
    ratio_to_reference = intensity_array / reference_intensity
    return 10 * np.log10(ratio_to_reference)
def min_max_normalization(array: np.ndarray) -> np.ndarray:
    """Rescale an array using its maximum and a scaled version of its minimum.

    The offset subtracted from the data is ``array.min() * 1.1`` rather than
    the plain minimum. The maximum therefore maps to 1, while the minimum maps
    to 0 only when it is itself 0.

    Parameters
    ----------
    array : np.ndarray
        Array to be normalized.

    Returns
    -------
    np.ndarray
        ``(array - offset) / (array.max() - offset)`` with
        ``offset = array.min() * 1.1``.
    """
    offset = array.min() * 1.1
    span = array.max() - offset
    shifted = array - offset
    return shifted / span
def integrate_intensity_directions(
    intensity_directions: np.ndarray,
    duration_secs: float,
    sample_rate: int,
    overlap_ratio: Optional[float] = None,
) -> Tuple[np.ndarray, np.ndarray]:
    """Integrate the intensity signal with overlapping Hamming windows.

    The signal is split into windows of `duration_secs` that advance by
    ``window_length * (1 - overlap_ratio)`` samples. Each window is weighted
    with a Hamming window and averaged. The signal is zero-padded at the end
    so that the last window is complete and no trailing samples are dropped.
    The first, unwindowed sample of every channel (treated as the direct
    sound) is prepended as column 0 of the result.

    Args:
        intensity_directions (np.ndarray): X, Y and Z intensity signals with
            shape (3, N). An input of shape (4, N) is also accepted; its first
            row is discarded.
        duration_secs (float): the length of the window to apply, in seconds.
        sample_rate (int): sampling rate of the signal.
        overlap_ratio (Optional[float]): fraction of a window shared with the
            next one, in ``[0, 1)``. Defaults to the module-level
            ``OVERLAP_RATIO``.

    Returns:
        Tuple[np.ndarray, np.ndarray]: the integrated signal, of shape
        (3, n_windows + 1) with the direct sound in column 0, and the start
        time in seconds of each window, of shape (n_windows,). The time array
        has no entry for the prepended direct-sound column.

    Raises:
        ValueError: if the input is not a non-empty 2-D array with 3 or 4
            rows, if `overlap_ratio` is outside ``[0, 1)``, or if the window
            is too short to yield a hop of at least one sample.
    """
    if overlap_ratio is None:
        overlap_ratio = OVERLAP_RATIO
    if not 0 <= overlap_ratio < 1:
        raise ValueError(f"overlap_ratio must be in [0, 1), got {overlap_ratio}")

    if intensity_directions.ndim != 2 or intensity_directions.shape[0] not in (3, 4):
        raise ValueError(f"Unexpected input shape {intensity_directions.shape}")
    if intensity_directions.shape[0] == 4:
        # Drop the first row and keep the three directional rows
        intensity_directions = intensity_directions[1:, :]

    n_samples = intensity_directions.shape[1]
    if n_samples == 0:
        raise ValueError(f"Unexpected input shape {intensity_directions.shape}")

    # Convert integration time to samples
    duration_samples = int(np.round(duration_secs * sample_rate))
    hop_size = int(duration_samples * (1 - overlap_ratio))
    if hop_size < 1:
        raise ValueError(
            f"Window of {duration_samples} samples with overlap {overlap_ratio} "
            "gives a hop size below one sample"
        )

    # Number of windows needed so that the last one reaches the final sample
    # (ceil division done with integers), then pad up to exactly that length.
    samples_past_first_window = max(n_samples - duration_samples, 0)
    n_windows = -(-samples_past_first_window // hop_size) + 1
    padded_length = (n_windows - 1) * hop_size + duration_samples
    intensity_directions = np.concatenate(
        [
            intensity_directions,
            np.zeros((3, padded_length - n_samples)),
        ],
        axis=1,
    )

    window = np.hamming(duration_samples)
    intensity_windowed = np.zeros((3, n_windows))
    for i in range(n_windows):
        start = i * hop_size
        intensity_segment = intensity_directions[:, start : start + duration_samples]
        intensity_windowed[:, i] = np.mean(intensity_segment * window, axis=1)
    frame_times = np.arange(n_windows) * hop_size / sample_rate

    # Add direct sound first with no windowing
    intensity_windowed = np.insert(
        intensity_windowed, 0, intensity_directions[:, 0], axis=1
    )
    return intensity_windowed, frame_times
def convert_bformat_to_intensity(signal: np.ndarray) -> np.ndarray:
    """Compute the directional intensity signals of a B-format Ambisonics recording.

    Each directional channel is multiplied sample by sample with the first
    channel. No filtering or integration is applied to the input here.

    Args:
        signal (np.ndarray): input B-format Ambisonics signal. Shape: (4, N),
            with the pressure (W) channel in row 0 and the X, Y and Z
            channels in rows 1 to 3.

    Returns:
        np.ndarray: the intensity signals, one row per directional channel.
        Shape: (3, N) for a (4, N) input.
    """
    # Intensity = pressure (W channel) * pressure gradient (XYZ channels)
    intensity_directions = signal[0, :] * signal[1:, :]
    return intensity_directions
|