"""Functionality for intensity computation and related signal processing.""" from typing import Tuple import numpy as np from aira.engine.filtering import apply_low_pass_filter FILTER_CUTOFF = 5000 OVERLAP_RATIO = 0.5 def analysis_crop_2d( analysis_length: float, sample_rate: int, intensity_directions: np.ndarray, ): """_summary_ Parameters ---------- analysis_length : float _description_ sample_rate : int _description_ intensity_directions : np.ndarray _description_ Returns ------- _type_ _description_ """ # Get analysis length max index analysis_length_idx = int(analysis_length * sample_rate) # Slice from intensity max to analysis length from intensity max earliest_peak_index = np.argmax(np.abs(intensity_directions), axis=1).min() intensity_directions_cropped = intensity_directions[ :, earliest_peak_index : earliest_peak_index + analysis_length_idx ] return intensity_directions_cropped def intensity_thresholding( threshold: float, intensity: np.ndarray, azimuth: np.ndarray, elevation: np.ndarray, reflections: np.ndarray, ) -> Tuple[np.ndarray]: """_summary_ Parameters ---------- threshold : _type_ _description_ """ reflex_to_direct = intensity_to_dB(intensity) - intensity_to_dB(intensity[0]) thresholding_mask = reflex_to_direct > threshold return ( reflex_to_direct[thresholding_mask], azimuth[thresholding_mask], elevation[thresholding_mask], reflections[thresholding_mask], ) def intensity_to_dB(intensity_array: np.ndarray) -> np.ndarray: """Converts intensity to dB scale using 1e-12 as intensity reference Parameters ---------- intensity_array : np.ndarray Intensity array Returns ------- np.ndarray Intensity array in dB scale """ return 10 * np.log10(intensity_array / 1e-12) def min_max_normalization(array: np.ndarray) -> np.ndarray: """Returns the input array normalized by its minimum and maximum value. Parameters ---------- array : np.ndarray Array to be normalized Returns ------- np.ndarray Array normalized """ return (array - array.min() * 1.1) / (array.max() - array.min() * 1.1) def integrate_intensity_directions( intensity_directions: np.ndarray, duration_secs: float, sample_rate: int, ) -> np.ndarray: """Integrate the intensity signal with Hamming windows of length `duration_secs`. Args: intensity_directions (np.ndarray): X, Y and Z intensity signals. duration_secs (float): the length of the window to apply, in seconds. sample_rate (int): sampling rate of the signal. Returns: np.ndarray: the integrated signal, of shape (3, ...) """ if intensity_directions.shape[0] == 4: intensity_directions = intensity_directions[1:, :] elif (intensity_directions.shape[0] < 3) or (intensity_directions.shape[0] > 4): raise ValueError(f"Unexpected input shape {intensity_directions.shape}") # Convert integration time to samples duration_samples = np.round(duration_secs * sample_rate).astype(np.int64) # Padding and windowing hop_size = int(duration_samples * (1 - OVERLAP_RATIO)) intensity_directions = np.concatenate( [ intensity_directions, np.zeros((3, intensity_directions.shape[1] % hop_size)), ], axis=1, ) output_shape = ( 3, int(intensity_directions.shape[1] / duration_samples / OVERLAP_RATIO) - 1, ) intensity_windowed = np.zeros(output_shape) time = np.zeros(output_shape[1]) window = np.hamming(duration_samples) for i in range(0, output_shape[1]): intensity_segment = intensity_directions[ :, i * hop_size : i * hop_size + duration_samples ] intensity_windowed[:, i] = np.mean(intensity_segment * window, axis=1) time[i] = i * hop_size / sample_rate # Add direct sound first with no windowing intensity_windowed = np.insert( intensity_windowed, 0, intensity_directions[:, 0], axis=1 ) return intensity_windowed, time def convert_bformat_to_intensity(signal: np.ndarray) -> Tuple[np.ndarray]: """Integrate and compute intensities for a B-format Ambisonics recording. Args: signal (np.ndarray): input B-format Ambisonics signal. Shape: (4, N). Returns: Tuple[np.ndarray]: integrated intensity, azimuth and elevation. """ # signal_filtered = apply_low_pass_filter(signal, cutoff_frequency, sample_rate) signal_filtered = signal # Calculate intensity from directions intensity_directions = ( signal_filtered[0, :] * signal_filtered[1:, :] ) # Intensity = pressure (W channel) * pressure gradient (XYZ channels) return intensity_directions