| | """ |
| | Audio Processing Module for Speech Pathology Diagnosis |
| | |
| | This module provides audio processing utilities including: |
| | - Audio loading, resampling, and normalization |
| | - Audio chunking for phone-level analysis |
| | - Voice Activity Detection (VAD) integration |
| | - Streaming audio buffer management |
| | """ |
| |
|
| | import logging |
| | import numpy as np |
| | import librosa |
| | import soundfile as sf |
| | import webrtcvad |
| | from typing import List, Optional, Tuple, Union, Iterator |
| | from pathlib import Path |
| | from dataclasses import dataclass |
| | from collections import deque |
| | import io |
| |
|
| | from config import AudioConfig |
| |
|
| | logger = logging.getLogger(__name__) |
| |
|
| |
|
@dataclass
class AudioChunk:
    """
    Container for an audio chunk with metadata.

    Produced by AudioProcessor.chunk_audio; times are expressed relative to
    the start of the source audio.

    Attributes:
        data: Audio samples as numpy array
        sample_rate: Sample rate in Hz
        start_time_ms: Start time in milliseconds
        end_time_ms: End time in milliseconds
        is_speech: Whether VAD detected speech in this chunk
        chunk_index: Index of chunk in sequence
    """
    data: np.ndarray
    sample_rate: int
    start_time_ms: float
    end_time_ms: float
    is_speech: bool = False  # only meaningful when chunking ran with apply_vad=True
    chunk_index: int = 0     # 0-based position in the yielded chunk sequence
| |
|
| |
|
class AudioProcessor:
    """
    Audio processing utility for speech pathology diagnosis.

    Handles:
    - Loading audio from files or arrays
    - Resampling to target sample rate (16kHz)
    - Normalization to [-1, 1] range
    - Chunking audio into phone-level frames (20ms)
    - Voice Activity Detection (VAD) integration
    """

    def __init__(self, audio_config: Optional[AudioConfig] = None):
        """
        Initialize AudioProcessor.

        Args:
            audio_config: Audio configuration. Uses default if None.
        """
        # Local import — presumably to avoid a circular dependency with
        # config at module load time. TODO(review): confirm.
        from config import default_audio_config

        self.config = audio_config or default_audio_config
        self.target_sr = self.config.sample_rate
        self.chunk_duration_ms = self.config.chunk_duration_ms
        self.hop_length_ms = self.config.hop_length_ms

        # Convert millisecond durations to sample counts at the target rate.
        self.chunk_size_samples = int(self.chunk_duration_ms * self.target_sr / 1000)
        self.hop_size_samples = int(self.hop_length_ms * self.target_sr / 1000)

        # VAD is best-effort: if webrtcvad cannot be initialized, speech
        # detection degrades to "everything is speech" (see _detect_speech).
        try:
            self.vad = webrtcvad.Vad(self.config.vad_aggressiveness)
            logger.info(f"VAD initialized with aggressiveness={self.config.vad_aggressiveness}")
        except Exception as e:
            logger.warning(f"Failed to initialize VAD: {e}. VAD features will be disabled.")
            self.vad = None

        logger.info(f"AudioProcessor initialized: target_sr={self.target_sr}Hz, "
                    f"chunk_duration={self.chunk_duration_ms}ms, "
                    f"hop_length={self.hop_length_ms}ms")

    def load_audio(
        self,
        audio_source: Union[str, Path, np.ndarray, bytes],
        target_sr: Optional[int] = None
    ) -> Tuple[np.ndarray, int]:
        """
        Load audio from file, array, or bytes.

        The returned audio is mono, peak-normalized to [-1, 1], at target_sr.

        Args:
            audio_source: Audio file path, numpy array, or bytes.
                NOTE(review): a raw numpy array carries no sample-rate
                metadata; it is presumed to already be at the configured
                target sample rate — confirm with callers.
            target_sr: Target sample rate (defaults to config sample_rate)

        Returns:
            Tuple of (audio_array, sample_rate)

        Raises:
            ValueError: If audio cannot be loaded or the source type is
                unsupported (all underlying failures are wrapped).
        """
        target_sr = target_sr or self.target_sr

        try:
            if isinstance(audio_source, (str, Path)):
                # File path: librosa handles decoding, mono mixdown and
                # resampling to target_sr in one call.
                logger.debug(f"Loading audio from file: {audio_source}")
                audio_array, sr = librosa.load(str(audio_source), sr=target_sr, mono=True)
                logger.info(f"Loaded audio: {len(audio_array)} samples, {sr}Hz, "
                            f"{len(audio_array)/sr:.2f}s duration")

            elif isinstance(audio_source, bytes):
                # Encoded audio bytes (e.g. WAV file contents) via BytesIO.
                logger.debug("Loading audio from bytes")
                audio_io = io.BytesIO(audio_source)
                audio_array, sr = librosa.load(audio_io, sr=target_sr, mono=True)
                logger.info(f"Loaded audio from bytes: {len(audio_array)} samples, {sr}Hz")

            elif isinstance(audio_source, np.ndarray):
                audio_array = audio_source
                if audio_array.ndim > 1:
                    audio_array = librosa.to_mono(audio_array)

                # The array's true sample rate is unknown, so it is assumed
                # to be self.target_sr; resample only when the caller asked
                # for a different rate.
                if len(audio_array) > 0 and target_sr != self.target_sr:
                    audio_array = librosa.resample(
                        audio_array,
                        orig_sr=self.target_sr,
                        target_sr=target_sr
                    )
                # BUG FIX: sr was previously assigned only inside a
                # conditional, leaving it unbound for empty arrays and
                # raising NameError at the return below.
                sr = target_sr
                logger.debug(f"Using audio array: {len(audio_array)} samples")

            else:
                raise ValueError(f"Unsupported audio source type: {type(audio_source)}")

            # Peak-normalize regardless of source so downstream chunking and
            # VAD see a consistent amplitude range.
            audio_array = self.normalize_audio(audio_array)

            return audio_array, sr

        except Exception as e:
            logger.error(f"Failed to load audio: {e}", exc_info=True)
            raise ValueError(f"Cannot load audio: {e}") from e

    def normalize_audio(self, audio: np.ndarray) -> np.ndarray:
        """
        Normalize audio to [-1, 1] range (peak normalization).

        Silent audio (all zeros) and empty arrays are returned unchanged.

        Args:
            audio: Audio array

        Returns:
            Normalized audio array
        """
        if len(audio) == 0:
            return audio

        max_val = np.abs(audio).max()
        if max_val > 0:
            audio = audio / max_val

        # Guard against floating-point rounding pushing values past +/-1.
        audio = np.clip(audio, -1.0, 1.0)

        return audio

    def resample_audio(
        self,
        audio: np.ndarray,
        orig_sr: int,
        target_sr: Optional[int] = None
    ) -> np.ndarray:
        """
        Resample audio to target sample rate.

        Args:
            audio: Audio array
            orig_sr: Original sample rate
            target_sr: Target sample rate (defaults to config sample_rate)

        Returns:
            Resampled audio array (the input is returned as-is when the
            rates already match).
        """
        target_sr = target_sr or self.target_sr

        if orig_sr == target_sr:
            return audio

        logger.debug(f"Resampling from {orig_sr}Hz to {target_sr}Hz")
        resampled = librosa.resample(audio, orig_sr=orig_sr, target_sr=target_sr)
        return resampled

    def chunk_audio(
        self,
        audio: np.ndarray,
        sample_rate: Optional[int] = None,
        apply_vad: bool = False
    ) -> Iterator[AudioChunk]:
        """
        Chunk audio into overlapping frames for phone-level analysis.

        Audio shorter than one chunk is yielded as a single (short) chunk.
        Otherwise, full chunks of chunk_size_samples are yielded every
        hop_size_samples; trailing samples that do not fill a complete
        chunk are dropped.

        Args:
            audio: Audio array
            sample_rate: Sample rate (defaults to config sample_rate)
            apply_vad: Whether to apply VAD to detect speech chunks

        Yields:
            AudioChunk objects
        """
        sample_rate = sample_rate or self.target_sr

        if len(audio) < self.chunk_size_samples:
            # Too short for a full frame: emit everything as one chunk.
            chunk = AudioChunk(
                data=audio,
                sample_rate=sample_rate,
                start_time_ms=0.0,
                end_time_ms=len(audio) / sample_rate * 1000,
                is_speech=self._detect_speech(audio, sample_rate) if apply_vad else False,
                chunk_index=0
            )
            yield chunk
            return

        chunk_index = 0
        for start_sample in range(0, len(audio) - self.chunk_size_samples + 1,
                                  self.hop_size_samples):
            end_sample = start_sample + self.chunk_size_samples

            chunk_data = audio[start_sample:end_sample]
            start_time_ms = start_sample / sample_rate * 1000
            end_time_ms = end_sample / sample_rate * 1000

            is_speech = False
            if apply_vad:
                is_speech = self._detect_speech(chunk_data, sample_rate)

            chunk = AudioChunk(
                data=chunk_data,
                sample_rate=sample_rate,
                start_time_ms=start_time_ms,
                end_time_ms=end_time_ms,
                is_speech=is_speech,
                chunk_index=chunk_index
            )

            yield chunk
            chunk_index += 1

    def _detect_speech(self, audio_chunk: np.ndarray, sample_rate: int) -> bool:
        """
        Detect if audio chunk contains speech using VAD.

        Fails open: returns True when VAD is unavailable, when the sample
        rate is unsupported, or when detection raises.

        Args:
            audio_chunk: Audio chunk array, expected in [-1, 1] (values
                outside that range would wrap during int16 conversion)
            sample_rate: Sample rate

        Returns:
            True if speech detected, False otherwise
        """
        if self.vad is None:
            return True

        # webrtcvad only supports these rates.
        if sample_rate not in [8000, 16000, 32000, 48000]:
            logger.warning(f"VAD requires sample rate 8/16/32/48kHz, got {sample_rate}Hz. Skipping VAD.")
            return True

        # webrtcvad expects 10/20/30ms frames; other durations may make
        # is_speech raise, which is caught below.
        frame_duration_ms = len(audio_chunk) / sample_rate * 1000
        if frame_duration_ms not in [10, 20, 30]:
            logger.debug(f"Frame duration {frame_duration_ms}ms not optimal for VAD. Using anyway.")

        try:
            # Convert float [-1, 1] samples to 16-bit PCM bytes, the only
            # format webrtcvad accepts.
            int16_audio = (audio_chunk * 32767).astype(np.int16)

            audio_bytes = int16_audio.tobytes()

            is_speech = self.vad.is_speech(audio_bytes, sample_rate)

            return is_speech

        except Exception as e:
            logger.warning(f"VAD detection failed: {e}. Assuming speech.")
            return True

    def get_speech_segments(
        self,
        audio: np.ndarray,
        sample_rate: Optional[int] = None,
        min_speech_duration_ms: float = 100.0
    ) -> List[Tuple[float, float]]:
        """
        Get speech segments from audio using VAD.

        Segment boundaries are quantized to chunk start times (hop
        resolution). Without a working VAD, the whole recording is
        reported as one segment.

        Args:
            audio: Audio array
            sample_rate: Sample rate
            min_speech_duration_ms: Minimum duration of speech segment to include

        Returns:
            List of (start_ms, end_ms) tuples for speech segments
        """
        sample_rate = sample_rate or self.target_sr

        if self.vad is None:
            # No VAD available: treat everything as speech.
            duration_ms = len(audio) / sample_rate * 1000
            return [(0.0, duration_ms)]

        speech_segments = []
        in_speech = False
        speech_start_ms = 0.0

        # Track speech/non-speech transitions across chunk decisions.
        for chunk in self.chunk_audio(audio, sample_rate, apply_vad=True):
            if chunk.is_speech and not in_speech:
                # Speech onset.
                in_speech = True
                speech_start_ms = chunk.start_time_ms
            elif not chunk.is_speech and in_speech:
                # Speech offset: keep the segment only if long enough.
                in_speech = False
                duration_ms = chunk.start_time_ms - speech_start_ms
                if duration_ms >= min_speech_duration_ms:
                    speech_segments.append((speech_start_ms, chunk.start_time_ms))

        # Close out a segment still open at end of audio.
        if in_speech:
            duration_ms = len(audio) / sample_rate * 1000 - speech_start_ms
            if duration_ms >= min_speech_duration_ms:
                speech_segments.append((speech_start_ms, len(audio) / sample_rate * 1000))

        logger.info(f"Detected {len(speech_segments)} speech segments")
        return speech_segments

    def process_audio_file(
        self,
        file_path: Union[str, Path],
        apply_vad: bool = False
    ) -> Tuple[np.ndarray, int, List[AudioChunk]]:
        """
        Complete audio processing pipeline: load, normalize, chunk.

        Args:
            file_path: Path to audio file
            apply_vad: Whether to apply VAD

        Returns:
            Tuple of (audio_array, sample_rate, chunks_list)

        Raises:
            ValueError: Propagated from load_audio on any load failure.
        """
        logger.info(f"Processing audio file: {file_path}")

        # Load (also normalizes — see load_audio).
        audio, sr = self.load_audio(file_path)

        # Materialize all chunks up front.
        chunks = list(self.chunk_audio(audio, sr, apply_vad=apply_vad))

        logger.info(f"Processed audio: {len(audio)} samples, {len(chunks)} chunks")

        return audio, sr, chunks
| |
|
| |
|
class StreamingAudioBuffer:
    """
    Buffer for managing streaming audio chunks.

    Maintains a sliding window buffer for real-time audio processing.
    Handles chunk accumulation, overflow, and underflow scenarios.
    Overflow policy: the oldest samples are dropped (sliding window);
    incoming chunks are never rejected.
    """

    # Same logger object as the module-level one (identical name); bound
    # here so the class is self-contained.
    _log = logging.getLogger(__name__)

    def __init__(
        self,
        buffer_duration_ms: float = 1000.0,
        chunk_duration_ms: float = 20.0,
        sample_rate: int = 16000
    ):
        """
        Initialize streaming audio buffer.

        Args:
            buffer_duration_ms: Maximum buffer duration in milliseconds
            chunk_duration_ms: Expected chunk duration in milliseconds
            sample_rate: Sample rate in Hz
        """
        self.sample_rate = sample_rate
        self.chunk_duration_ms = chunk_duration_ms
        self.buffer_duration_ms = buffer_duration_ms

        # Convert millisecond durations to sample counts.
        self.buffer_size_samples = int(buffer_duration_ms * sample_rate / 1000)
        self.chunk_size_samples = int(chunk_duration_ms * sample_rate / 1000)

        # maxlen makes the deque a sliding window: appending past capacity
        # automatically discards the oldest samples.
        self.buffer = deque(maxlen=self.buffer_size_samples)

        # Running statistics (see get_stats).
        self.total_samples_received = 0
        self.total_chunks_received = 0
        self.overflow_count = 0
        self.underflow_count = 0

        self._log.info(f"StreamingAudioBuffer initialized: "
                       f"buffer_duration={buffer_duration_ms}ms, "
                       f"chunk_duration={chunk_duration_ms}ms, "
                       f"sample_rate={sample_rate}Hz")

    def add_chunk(self, audio_chunk: np.ndarray) -> bool:
        """
        Add audio chunk to buffer.

        On overflow the oldest samples are dropped to make room and
        overflow_count is incremented; the new chunk is always accepted.

        Args:
            audio_chunk: Audio chunk array

        Returns:
            True always (the chunk is never rejected; the previous
            docstring's "False on overflow" never matched the code).
        """
        if len(audio_chunk) == 0:
            return True

        # Detect (and count) overflow before appending.
        if len(self.buffer) + len(audio_chunk) > self.buffer_size_samples:
            self.overflow_count += 1
            self._log.warning(f"Buffer overflow! Dropping oldest samples. "
                              f"Buffer: {len(self.buffer)}/{self.buffer_size_samples} samples")

        # No manual popleft loop needed: deque(maxlen=...) discards the
        # oldest samples automatically on extend (the previous per-sample
        # loop did the same work in pure Python).
        self.buffer.extend(audio_chunk)
        self.total_samples_received += len(audio_chunk)
        self.total_chunks_received += 1

        return True

    def get_chunk(self, chunk_duration_ms: Optional[float] = None) -> Optional[np.ndarray]:
        """
        Get next chunk from buffer, consuming it.

        Args:
            chunk_duration_ms: Chunk duration in milliseconds (defaults to configured)

        Returns:
            Audio chunk array or None if buffer doesn't have enough samples
            (an underflow, which increments underflow_count)
        """
        chunk_duration_ms = chunk_duration_ms or self.chunk_duration_ms
        chunk_size_samples = int(chunk_duration_ms * self.sample_rate / 1000)

        if len(self.buffer) < chunk_size_samples:
            self.underflow_count += 1
            return None

        # Consume exactly one chunk's worth of the oldest samples.
        chunk = np.array([self.buffer.popleft() for _ in range(chunk_size_samples)])

        return chunk

    def get_buffer(self, max_samples: Optional[int] = None) -> np.ndarray:
        """
        Get buffer contents without consuming them.

        Args:
            max_samples: Maximum number of samples to return (None = all),
                taken from the oldest end

        Returns:
            Audio array from buffer
        """
        if max_samples is None:
            return np.array(self.buffer)
        else:
            return np.array(list(self.buffer)[:max_samples])

    def clear(self):
        """Clear the buffer (statistics counters are preserved)."""
        self.buffer.clear()
        self._log.debug("Buffer cleared")

    def get_stats(self) -> dict:
        """
        Get buffer statistics.

        Returns:
            Dictionary with buffer statistics (fill level, capacity,
            utilization ratio, totals, and overflow/underflow counts)
        """
        return {
            "buffer_size_samples": len(self.buffer),
            "buffer_capacity_samples": self.buffer_size_samples,
            "buffer_utilization": len(self.buffer) / self.buffer_size_samples,
            "total_samples_received": self.total_samples_received,
            "total_chunks_received": self.total_chunks_received,
            "overflow_count": self.overflow_count,
            "underflow_count": self.underflow_count,
            "buffer_duration_ms": len(self.buffer) / self.sample_rate * 1000
        }

    def has_enough_data(self, chunk_duration_ms: Optional[float] = None) -> bool:
        """
        Check if buffer has enough data for a chunk (non-consuming).

        Args:
            chunk_duration_ms: Chunk duration in milliseconds

        Returns:
            True if buffer has enough samples
        """
        chunk_duration_ms = chunk_duration_ms or self.chunk_duration_ms
        chunk_size_samples = int(chunk_duration_ms * self.sample_rate / 1000)
        return len(self.buffer) >= chunk_size_samples

    def get_available_duration_ms(self) -> float:
        """
        Get available audio duration in buffer in milliseconds.

        Returns:
            Duration in milliseconds
        """
        return len(self.buffer) / self.sample_rate * 1000
| |
|
| |
|