Spaces:
Sleeping
Sleeping
| """ | |
| Audio Feature Extractor - IMPROVED VERSION | |
| Extracts 14 voice features from audio to detect busy/distracted states. | |
| KEY IMPROVEMENTS: | |
| 1. HNR instead of SNR - Better for voice recordings (not affected by recording noise) | |
| 2. Smarter noise classification using multiple spectral features | |
| 3. Removed useless latency feature (t9_latency) from consideration | |
| """ | |
import warnings
from typing import Dict, List, Tuple

import librosa
import noisereduce as nr
import numpy as np
import soundfile as sf
import torch
from scipy import signal

# The emotion extractor lives alongside this module; fall back to a flat
# import when the file is executed as a script outside the package.
try:
    from .emotion_features import EmotionFeatureExtractor
except ImportError:
    from emotion_features import EmotionFeatureExtractor

warnings.filterwarnings("ignore")
class AudioFeatureExtractor:
    """Extract 14 audio features for busy detection (Enhanced with Silero VAD).

    Features v1-v10 (SNR, noise class, speech rate, pitch, energy, pauses)
    are computed locally; v11-v13 come from an optional emotion CNN.
    Heavyweight models (Silero VAD, emotion CNN) are cached at class level so
    repeated instantiation does not reload or re-download them.
    """

    # Class-level caches shared by all instances; populated lazily on first use.
    _vad_model_cache = None
    _vad_utils_cache = None
    _emotion_extractor_cache = None

    def __init__(self, sample_rate: int = 16000, use_emotion: bool = True,
                 config: Dict = None, emotion_models_dir: str = None):
        """Configure sample rates and load (or reuse cached) models.

        Args:
            sample_rate: Default audio sample rate in Hz; may be overridden
                by config['audio_sample_rate'].
            use_emotion: Enable emotion CNN features (can also be disabled
                via config['skip_emotion_features']).
            config: Optional settings dict (sample rates, duration limit,
                feature toggles).
            emotion_models_dir: Directory forwarded to EmotionFeatureExtractor.
        """
        self.config = config or {}
        self.sample_rate = self.config.get('audio_sample_rate', sample_rate)
        self.vad_sample_rate = self.config.get('vad_sample_rate', self.sample_rate)
        self.use_emotion = use_emotion and (not self.config.get('skip_emotion_features', False))
        self.skip_noise_reduction = bool(self.config.get('skip_noise_reduction', False))
        self.audio_duration_limit = self.config.get('audio_duration_limit', None)
        self.emotion_models_dir = emotion_models_dir
        print("Loading Silero VAD...")
        try:
            if AudioFeatureExtractor._vad_model_cache is None:
                AudioFeatureExtractor._vad_model_cache, AudioFeatureExtractor._vad_utils_cache = torch.hub.load(
                    repo_or_dir='snakers4/silero-vad',
                    model='silero_vad',
                    force_reload=False,
                    trust_repo=True
                )
            self.vad_model = AudioFeatureExtractor._vad_model_cache
            utils = AudioFeatureExtractor._vad_utils_cache
            # utils[0] is get_speech_timestamps in the silero-vad utils tuple.
            self.get_speech_timestamps = utils[0]
            print("[OK] Silero VAD loaded (cached)")
        except Exception as e:
            # vad_model stays None; every VAD-dependent method checks for it.
            print(f"[WARN] Failed to load Silero VAD: {e}. Fallback to energy VAD might be needed.")
            self.vad_model = None
        if self.use_emotion:
            print("Loading Emotion CNN...")
            try:
                if AudioFeatureExtractor._emotion_extractor_cache is None:
                    # Pass models dir to extractor
                    AudioFeatureExtractor._emotion_extractor_cache = EmotionFeatureExtractor(models_dir=self.emotion_models_dir)
                self.emotion_extractor = AudioFeatureExtractor._emotion_extractor_cache
                print("[OK] Emotion CNN loaded (cached)")
            except Exception as e:
                print(f"[WARN] Emotion features disabled: {e}")
                self.emotion_extractor = None
                self.use_emotion = False
        else:
            self.emotion_extractor = None

    def _prepare_vad_audio(self, audio: np.ndarray) -> Tuple[np.ndarray, List[Dict]]:
        """Prepare audio for VAD and return (vad-rate audio, speech timestamps).

        Timestamps are sample offsets at ``self.vad_sample_rate``; an empty
        list is returned when VAD is unavailable or the clip is too short.
        """
        # Silero VAD needs at least one 512-sample window.
        if self.vad_model is None or len(audio) < 512:
            return audio, []
        audio_vad = audio
        if self.vad_sample_rate != self.sample_rate:
            try:
                audio_vad = librosa.resample(audio, orig_sr=self.sample_rate, target_sr=self.vad_sample_rate)
            except Exception:
                audio_vad = audio  # best effort: run VAD at the native rate
        wav = torch.tensor(audio_vad, dtype=torch.float32).unsqueeze(0)
        try:
            speech_dict = self.get_speech_timestamps(wav, self.vad_model, sampling_rate=self.vad_sample_rate)
        except Exception:
            speech_dict = []
        return audio_vad, speech_dict

    def _split_speech_pause(self, audio: np.ndarray) -> Tuple[np.ndarray, np.ndarray, int]:
        """Return (speech audio, pause audio, sample rate used for VAD).

        Without a VAD model the whole clip is treated as speech. If VAD finds
        no speech, the whole clip is treated as pause.
        """
        if self.vad_model is None:
            return audio, np.array([], dtype=audio.dtype), self.sample_rate
        audio_vad, speech_dict = self._prepare_vad_audio(audio)
        if not speech_dict:
            return np.array([], dtype=audio_vad.dtype), audio_vad, self.vad_sample_rate
        mask = np.zeros(len(audio_vad), dtype=bool)
        for seg in speech_dict:
            start = max(0, int(seg.get('start', 0)))
            end = min(len(audio_vad), int(seg.get('end', 0)))
            if end > start:
                mask[start:end] = True
        speech_audio = audio_vad[mask]
        pause_audio = audio_vad[~mask]
        return speech_audio, pause_audio, self.vad_sample_rate

    def load_audio(self, audio_path: str) -> np.ndarray:
        """Load an audio file as mono float at self.sample_rate.

        Honors ``audio_duration_limit`` from config (None = full file).
        """
        audio, sr = librosa.load(
            audio_path,
            sr=self.sample_rate,
            mono=True,
            duration=self.audio_duration_limit
        )
        return audio

    def extract_snr(self, audio: np.ndarray) -> float:
        """
        V1: Signal-to-Noise Ratio (SNR)
        Signal power is calculated only during speech; noise power only during pauses.

        Returns dB clipped to [-10, 40]; 15.0 is the neutral default for
        clips too short to analyze or on failure.
        """
        # FIX: the original `len(audio) == 0 or len(audio) < 2048` was
        # redundant — the second comparison subsumes the first.
        if len(audio) < 2048:
            return 15.0  # Neutral default
        try:
            speech_audio, pause_audio, _ = self._split_speech_pause(audio)
            if len(speech_audio) == 0:
                return 0.0
            signal_power = float(np.mean(speech_audio ** 2))
            if signal_power <= 0:
                return 0.0
            if len(pause_audio) > 0:
                noise_power = float(np.mean(pause_audio ** 2))
            else:
                noise_power = 1e-8  # no pause detected: assume near-silent noise floor
            if noise_power <= 0:
                noise_power = 1e-8  # avoid log10(inf)
            snr_db = 10.0 * np.log10(signal_power / noise_power)
            return float(np.clip(snr_db, -10.0, 40.0))
        except Exception as e:
            print(f"SNR extraction failed: {e}")
            return 15.0

    def extract_hnr(self, audio: np.ndarray) -> float:
        """
        V1: Harmonics-to-Noise Ratio (HNR)
        Measures voice quality - higher = clearer voice
        IMPROVEMENT: HNR is better than SNR for voice because:
        - Not affected by recording equipment noise
        - Focuses on harmonic structure of speech
        - More robust to environmental noise
        Range: 0-30 dB (typical: 10-20 dB for clear speech)
        """
        # FIX: redundant `len(audio) == 0 or` removed (subsumed by < 2048).
        if len(audio) < 2048:
            return 15.0  # Neutral default
        try:
            # Method 1: Autocorrelation-based HNR (most accurate)
            frame_length = 2048
            hop_length = 512
            hnr_values = []
            for i in range(0, len(audio) - frame_length, hop_length):
                frame = audio[i:i+frame_length]
                # Only process frames with enough energy
                energy = np.sum(frame ** 2)
                if energy < 0.001:
                    continue
                # Autocorrelation (keep non-negative lags only)
                autocorr = np.correlate(frame, frame, mode='full')
                autocorr = autocorr[len(autocorr)//2:]
                # Normalize so autocorr[lag] is a correlation coefficient
                if autocorr[0] > 0:
                    autocorr = autocorr / autocorr[0]
                else:
                    continue
                # Search for the fundamental period within 75-400 Hz
                min_lag = int(self.sample_rate / 400)  # Max 400 Hz
                max_lag = int(self.sample_rate / 75)   # Min 75 Hz
                # FIX: also guard against an empty lag window (min_lag >= max_lag),
                # which would make np.argmax raise on unusual sample rates.
                if min_lag >= max_lag or max_lag >= len(autocorr):
                    continue
                peak_idx = np.argmax(autocorr[min_lag:max_lag]) + min_lag
                if peak_idx > 0 and autocorr[peak_idx] > 0.3:  # Minimum correlation threshold
                    # HNR: ratio of periodic to aperiodic power at the pitch lag
                    periodic_power = autocorr[peak_idx]
                    aperiodic_power = 1 - periodic_power
                    if aperiodic_power > 0:
                        hnr_db = 10 * np.log10(periodic_power / aperiodic_power)
                        # Clip to realistic range
                        hnr_db = np.clip(hnr_db, 0, 30)
                        hnr_values.append(hnr_db)
            if len(hnr_values) > 0:
                # Return median (more robust than mean)
                return float(np.median(hnr_values))
            # Method 2: Fallback using spectral flatness
            flatness = np.mean(librosa.feature.spectral_flatness(y=audio))
            # Convert to HNR-like scale (inverted: flat spectrum = noisy)
            hnr_proxy = (1 - np.clip(flatness, 0, 1)) * 25
            return float(hnr_proxy)
        except Exception as e:
            print(f"HNR extraction failed: {e}")
            return 15.0  # Safe default

    def classify_noise_type(self, audio: np.ndarray) -> Dict[str, float]:
        """
        V2: Background Noise Classification (one-hot encoded)
        IMPROVEMENT: Uses multiple spectral features for better accuracy:
        - Spectral centroid (frequency brightness)
        - Spectral rolloff (energy distribution)
        - Zero crossing rate (noisiness)
        - Low frequency energy (rumble)
        - High frequency energy (hiss)
        - Spectral contrast (texture)

        Returns a dict of probabilities over
        {'traffic', 'office', 'crowd', 'wind', 'clean'} summing to 1.
        NOTE(review): the per-class thresholds below are hand-tuned magic
        numbers — confirm against labeled data before adjusting.
        """
        if len(audio) < 512:
            return {'traffic': 0, 'office': 0, 'crowd': 0, 'wind': 0, 'clean': 1}
        try:
            # Extract comprehensive spectral features
            S = np.abs(librosa.stft(audio))
            if S.shape[1] == 0:
                return {'traffic': 0, 'office': 0, 'crowd': 0, 'wind': 0, 'clean': 1}
            # Feature 1: Spectral Centroid (brightness) - computed on pauses only
            # so speech content does not dominate the noise estimate.
            pause_audio = None
            if self.vad_model is not None:
                _, pause_audio, vad_sr = self._split_speech_pause(audio)
            else:
                vad_sr = self.sample_rate
            if pause_audio is not None and len(pause_audio) >= 512:
                S_pause = np.abs(librosa.stft(pause_audio))
                centroid = np.mean(librosa.feature.spectral_centroid(S=S_pause, sr=vad_sr))
            else:
                # Not enough pause material: fall back to the full clip.
                centroid = np.mean(librosa.feature.spectral_centroid(S=S, sr=self.sample_rate))
            # Feature 2: Spectral Rolloff (energy concentration)
            rolloff = np.mean(librosa.feature.spectral_rolloff(S=S, sr=self.sample_rate))
            # Feature 3: Zero Crossing Rate
            zcr = np.mean(librosa.feature.zero_crossing_rate(audio))
            # Feature 4: Low frequency energy (0-500 Hz)
            freqs = librosa.fft_frequencies(sr=self.sample_rate, n_fft=2048)
            low_freq_mask = freqs < 500
            low_energy = np.mean(S[low_freq_mask, :]) if np.any(low_freq_mask) else 0
            # Feature 5: High frequency energy (4000+ Hz)
            high_freq_mask = freqs > 4000
            high_energy = np.mean(S[high_freq_mask, :]) if np.any(high_freq_mask) else 0
            # Feature 6: Overall energy
            total_energy = np.mean(audio ** 2)
            # Feature 7: Spectral contrast (texture measure)
            contrast = np.mean(librosa.feature.spectral_contrast(S=S, sr=self.sample_rate))
            # Score each noise type based on features
            scores = {
                'traffic': 0.0,
                'office': 0.0,
                'crowd': 0.0,
                'wind': 0.0,
                'clean': 0.0
            }
            # Traffic: Low frequency dominant + rumble + consistent
            if low_energy > 0.002 and centroid < 2000 and contrast < 20:
                scores['traffic'] = low_energy * 100 + (2500 - centroid) / 1000
            # Office: Mid frequencies + keyboard clicks + air conditioning hum
            if 1500 < centroid < 3500 and 0.0005 < total_energy < 0.005:
                scores['office'] = (3500 - abs(centroid - 2500)) / 1000 + contrast / 30
            # Crowd: High ZCR + varying spectrum + speech-like energy
            if zcr > 0.08 and total_energy > 0.003 and contrast > 15:
                scores['crowd'] = zcr * 10 + total_energy * 50
            # Wind: Very high ZCR + high frequency energy + low contrast
            if zcr > 0.12 and high_energy > 0.001 and contrast < 15:
                scores['wind'] = zcr * 8 + high_energy * 100
            # Clean: Low energy + low ZCR + high contrast (speech only)
            if total_energy < 0.005 and zcr < 0.08 and contrast > 20:
                scores['clean'] = (0.005 - total_energy) * 200 + contrast / 30
            # If all scores are low, default to clean
            if max(scores.values()) < 0.1:
                scores['clean'] = 1.0
            # Normalize to probabilities
            total = sum(scores.values())
            if total > 0:
                scores = {k: v/total for k, v in scores.items()}
            else:
                scores['clean'] = 1.0
            return scores
        except Exception as e:
            print(f"Noise classification failed: {e}")
            return {'traffic': 0, 'office': 0, 'crowd': 0, 'wind': 0, 'clean': 1}

    def extract_speech_rate(self, audio: np.ndarray, transcript: str) -> float:
        """V3: Speech Rate (whitespace-separated words per second of audio)."""
        if not transcript:
            return 0.0
        word_count = len(transcript.split())
        duration = len(audio) / self.sample_rate
        if duration == 0:
            return 0.0
        return word_count / duration

    def extract_pitch_features(self, audio: np.ndarray) -> Tuple[float, float]:
        """V4-V5: Pitch Mean and Std (Hz) over voiced frames; (0, 0) on failure."""
        try:
            if len(audio) < 2048:
                return 0.0, 0.0
            # Use pyin (more robust than yin); C2-C7 covers the human voice range.
            f0, voiced_flag, voiced_probs = librosa.pyin(
                audio,
                fmin=librosa.note_to_hz('C2'),
                fmax=librosa.note_to_hz('C7'),
                sr=self.sample_rate
            )
            # Only use voiced frames (f0 is NaN in unvoiced regions)
            f0_voiced = f0[voiced_flag]
            if len(f0_voiced) == 0:
                return 0.0, 0.0
            return float(np.mean(f0_voiced)), float(np.std(f0_voiced))
        except Exception as e:
            print(f"Pitch extraction failed: {e}")
            return 0.0, 0.0

    def extract_energy_features(self, audio: np.ndarray) -> Tuple[float, float]:
        """V6-V7: RMS energy mean and coefficient of variation (std / mean)."""
        try:
            rms = librosa.feature.rms(y=audio)[0]
            e_mean = float(np.mean(rms))
            e_std = float(np.std(rms))
            # Normalize std by mean so the feature is level-independent.
            if e_mean > 0:
                e_std = e_std / e_mean
            else:
                e_std = 0.0
            return e_mean, e_std
        # FIX: the original bare `except:` also swallowed SystemExit and
        # KeyboardInterrupt; only catch ordinary exceptions.
        except Exception:
            return 0.0, 0.0

    def extract_pause_features(self, audio: np.ndarray) -> Tuple[float, float, int]:
        """
        V8-V10: Pause Ratio, Average Pause Duration, Mid-Pause Count
        Uses Silero VAD; returns (0, 0, 0) when VAD is unavailable,
        the clip is too short, or VAD fails.
        """
        if self.vad_model is None or len(audio) < 512:
            return 0.0, 0.0, 0
        try:
            audio_vad, speech_dict = self._prepare_vad_audio(audio)
            # Calculate speech duration (timestamps are in VAD-rate samples)
            speech_samples = sum(seg['end'] - seg['start'] for seg in speech_dict)
            total_samples = len(audio_vad)
            if total_samples == 0:
                return 0.0, 0.0, 0
            # Pause Ratio
            pause_samples = total_samples - speech_samples
            pause_ratio = pause_samples / total_samples
            # Calculate gaps between consecutive speech segments
            gaps = []
            if len(speech_dict) > 1:
                for i in range(len(speech_dict) - 1):
                    gap = speech_dict[i+1]['start'] - speech_dict[i]['end']
                    if gap > 0:
                        gaps.append(gap / self.vad_sample_rate)  # Convert to seconds
            avg_pause_dur = float(np.mean(gaps)) if gaps else 0.0
            # Mid-Pause Count (hesitation-like gaps of 0.3s - 1.0s)
            mid_pause_cnt = sum(1 for g in gaps if 0.3 <= g <= 1.0)
            return float(pause_ratio), float(avg_pause_dur), int(mid_pause_cnt)
        except Exception as e:
            print(f"VAD Error: {e}")
            return 0.0, 0.0, 0

    def extract_all(self, audio: np.ndarray, transcript: str = "") -> Dict[str, float]:
        """Extract all audio features (14 original + 3 emotion = 17 total).

        NOTE(review): when emotion extraction is disabled up front (rather
        than failing at runtime), the v11-v13 keys are absent from the
        result — confirm downstream consumers tolerate both shapes.
        """
        if audio.dtype != np.float32:
            audio = audio.astype(np.float32)
        features = {}
        # V1: SNR (speech-only signal vs pause-only noise)
        features['v1_snr'] = self.extract_snr(audio)
        # V2: Noise classification (IMPROVED)
        noise_class = self.classify_noise_type(audio)
        features['v2_noise_traffic'] = noise_class['traffic']
        features['v2_noise_office'] = noise_class['office']
        features['v2_noise_crowd'] = noise_class['crowd']
        features['v2_noise_wind'] = noise_class['wind']
        features['v2_noise_clean'] = noise_class['clean']
        # V3: Speech rate
        features['v3_speech_rate'] = self.extract_speech_rate(audio, transcript)
        # V4-V5: Pitch
        p_mean, p_std = self.extract_pitch_features(audio)
        features['v4_pitch_mean'] = p_mean
        features['v5_pitch_std'] = p_std
        # V6-V7: Energy
        e_mean, e_std = self.extract_energy_features(audio)
        features['v6_energy_mean'] = e_mean
        features['v7_energy_std'] = e_std
        # V8-V10: Pause features
        pause_ratio, avg_pause, mid_pause_cnt = self.extract_pause_features(audio)
        features['v8_pause_ratio'] = pause_ratio
        features['v9_avg_pause_dur'] = avg_pause
        features['v10_mid_pause_cnt'] = float(mid_pause_cnt)
        # V11-V13: Emotion features
        if self.use_emotion and self.emotion_extractor is not None:
            try:
                emotion_features = self.emotion_extractor.extract_all(audio, self.sample_rate)
                features.update(emotion_features)
            except Exception as e:
                print(f"⚠ Emotion features skipped: {e}")
                # Add zero values for compatibility
                features['v11_emotion_stress'] = 0.0
                features['v12_emotion_energy'] = 0.0
                features['v13_emotion_valence'] = 0.0
        return features

    def extract_basic(self, audio: np.ndarray, transcript: str = "") -> Dict[str, float]:
        """
        Extract a minimal set of audio features for fast decisions.
        Uses only low-cost features (skips pitch, noise class, emotion).
        """
        if audio.dtype != np.float32:
            audio = audio.astype(np.float32)
        features = {}
        features['v1_snr'] = self.extract_snr(audio)
        features['v3_speech_rate'] = self.extract_speech_rate(audio, transcript)
        e_mean, e_std = self.extract_energy_features(audio)
        features['v6_energy_mean'] = e_mean
        features['v7_energy_std'] = e_std
        pause_ratio, avg_pause, mid_pause_cnt = self.extract_pause_features(audio)
        features['v8_pause_ratio'] = pause_ratio
        features['v9_avg_pause_dur'] = avg_pause
        features['v10_mid_pause_cnt'] = float(mid_pause_cnt)
        return features
| if __name__ == "__main__": | |
| extractor = AudioFeatureExtractor() | |
| print("Audio Feature Extractor initialized successfully") | |
| print("Using HNR instead of SNR for better voice quality measurement") | |