from scipy.signal import butter, filtfilt, savgol_filter, find_peaks
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import KMeans
import plotly.graph_objects as go
import pandas as pd
import numpy as np
import librosa
import pywt


# GENERAL HELPER FUNCTIONS
def denoise_audio(audiodata: np.ndarray, sr: int) -> tuple[np.ndarray, int]:
    """
    Enhanced denoising of audio signals optimized for heart sounds.

    Uses a combination of bandpass filtering, adaptive wavelet denoising,
    and improved spectral subtraction.

    Parameters:
    -----------
    audiodata : np.ndarray
        Input audio signal (1D numpy array)
    sr : int
        Sampling rate in Hz

    Returns:
    --------
    tuple[np.ndarray, int]
        Tuple containing (denoised_signal, sampling_rate)
    """
    # Input validation and conversion
    if not isinstance(audiodata, np.ndarray) or audiodata.ndim != 1:
        raise ValueError("audiodata must be a 1D numpy array")
    if not isinstance(sr, int) or sr <= 0:
        raise ValueError("sr must be a positive integer")

    # Convert to float32 and peak-normalize (guard against all-zero input)
    audio = audiodata.astype(np.float32)
    peak = np.max(np.abs(audio))
    if peak > 0:
        audio = audio / peak

    # 1. Bandpass filter: keep the 20-200 Hz range typical of heart sounds
    nyquist = sr / 2
    low, high = 20 / nyquist, 200 / nyquist
    order = 4  # Filter order
    b, a = butter(order, [low, high], btype='band')
    filtered = filtfilt(b, a, audio)

    # 2. Adaptive wavelet denoising
    def apply_wavelet_denoising(sig):
        # Use sym4 wavelet (well suited to biomedical signals)
        wavelet = 'sym4'
        level = min(6, pywt.dwt_max_level(len(sig), pywt.Wavelet(wavelet).dec_len))
        # Decompose signal
        coeffs = pywt.wavedec(sig, wavelet, level=level)
        # Adaptive thresholding based on level
        for i in range(1, len(coeffs)):
            # Universal threshold from the median absolute deviation
            sigma = np.median(np.abs(coeffs[i])) / 0.6745
            threshold = sigma * np.sqrt(2 * np.log(len(coeffs[i])))
            # Higher decomposition levels get lower thresholds
            level_factor = 1 - (i / len(coeffs))
            coeffs[i] = pywt.threshold(coeffs[i], threshold * level_factor, mode='soft')
        return pywt.waverec(coeffs, wavelet)

    # Apply wavelet denoising
    denoised = apply_wavelet_denoising(filtered)
    # Wavelet reconstruction can differ in length by one sample; fix it
    if len(denoised) != len(audio):
        denoised = librosa.util.fix_length(denoised, size=len(audio))

    # 3. Spectral subtraction with noise estimated from low-energy frames
    def spectral_subtract(sig):
        frame_length = int(sr * 0.04)  # 40 ms frames
        hop_length = frame_length // 2
        # Compute STFT
        D = librosa.stft(sig, n_fft=frame_length, hop_length=hop_length)
        mag, phase = np.abs(D), np.angle(D)
        # Estimate the noise spectrum from the quietest 15% of frames
        frame_energy = np.sum(mag**2, axis=0)
        noise_threshold = np.percentile(frame_energy, 15)
        noise_frames = mag[:, frame_energy < noise_threshold]
        if noise_frames.size > 0:
            noise_spectrum = np.median(noise_frames, axis=1)
            # Frequency-dependent oversubtraction factor
            freq_bins = np.fft.rfftfreq(frame_length, 1 / sr)
            alpha = 1.0 + 0.01 * (freq_bins / nyquist)
            alpha = alpha[:len(noise_spectrum)].reshape(-1, 1)
            # Subtract, flooring at 1% of the original magnitude
            mag_clean = np.maximum(mag - alpha * noise_spectrum.reshape(-1, 1), 0.01 * mag)
            # Reconstruct signal
            D_clean = mag_clean * np.exp(1j * phase)
            return librosa.istft(D_clean, hop_length=hop_length)
        return sig

    # Apply spectral subtraction
    final = spectral_subtract(denoised)

    # Final peak normalization (guarded as above)
    peak = np.max(np.abs(final))
    if peak > 0:
        final = final / peak
    return final, sr
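
# Example usage (a minimal sketch; "heart.wav" is a hypothetical local file):
#
#   sr, audio = getaudiodata("heart.wav")   # loader defined below
#   denoised, sr = denoise_audio(audio, sr)
#   # `denoised` has the same length as `audio`, is peak-normalized, and has
#   # its energy concentrated in the 20-200 Hz heart-sound band.
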
def getaudiodata(filepath: str, target_sr: int = 16000) -> tuple[int, np.ndarray]:
    """
    Load and process audio data with consistent output properties.

    Parameters:
    -----------
    filepath : str
        Path to the audio file
    target_sr : int
        Target sampling rate (default: 16000 Hz)

    Returns:
    --------
    tuple[int, np.ndarray]
        Sampling rate and processed audio data with consistent properties:
        - dtype: float32
        - shape: (N,) mono audio
        - amplitude range: [-0.95, 0.95]
        - no NaN or Inf values
        - C-contiguous memory layout
    """
    # Load audio at the requested sampling rate (librosa returns mono by default)
    audiodata, sr = librosa.load(filepath, sr=target_sr)

    # Ensure numpy array
    audiodata = np.asarray(audiodata)

    # Convert to mono if multichannel (librosa stacks channels on axis 0)
    if audiodata.ndim > 1:
        audiodata = np.mean(audiodata, axis=0)

    # Replace any NaN or Inf values
    audiodata = np.nan_to_num(audiodata, nan=0.0, posinf=0.0, neginf=0.0)

    # Normalize to prevent clipping while maintaining relative amplitudes
    max_abs = np.max(np.abs(audiodata))
    if max_abs > 0:  # Avoid division by zero
        audiodata = audiodata * (0.95 / max_abs)

    # Ensure float32 dtype and contiguous memory layout
    audiodata = np.ascontiguousarray(audiodata, dtype=np.float32)
    return sr, audiodata
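
# Example usage (sketch; the path is a hypothetical placeholder):
#
#   sr, audio = getaudiodata("recordings/patient01.wav", target_sr=16000)
#   assert audio.dtype == np.float32 and audio.ndim == 1
#   assert np.max(np.abs(audio)) <= 0.95
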
def getBeats(audiodata: np.ndarray, sr: int, method='envelope') -> tuple[float, np.ndarray, np.ndarray]:
    """
    Advanced heartbeat detection optimized for peak detection with improved sensitivity.

    Parameters:
    -----------
    audiodata : np.ndarray
        Audio time series
    sr : int
        Sampling rate
    method : str
        Detection method: 'envelope' (default), 'onset', or 'fusion'

    Returns:
    --------
    tempo : float
        Estimated heart rate in BPM
    peak_times : np.ndarray
        Times of detected heartbeat peaks in seconds
    cleaned_audio : np.ndarray
        Cleaned audio signal
    """
    # Denoise and normalize
    audiodata, sr = denoise_audio(audiodata, sr)
    cleaned_audio = audiodata / np.max(np.abs(audiodata))

    def get_envelope_peaks():
        """Detect peaks using the RMS envelope with lenient thresholds."""
        hop_length = int(sr * 0.01)    # 10 ms hop
        frame_length = int(sr * 0.04)  # 40 ms window
        # Calculate RMS energy envelope
        rms = librosa.feature.rms(
            y=cleaned_audio,
            frame_length=frame_length,
            hop_length=hop_length
        )[0]
        # Smooth the envelope (mild smoothing to preserve peaks)
        rms_smooth = savgol_filter(rms, 7, 3)
        # Find peaks with lenient thresholds
        peaks, properties = find_peaks(
            rms_smooth,
            distance=int(0.2 * (sr / hop_length)),  # Min 0.2 s between peaks (300 BPM max)
            height=np.mean(rms_smooth) + 0.1 * np.std(rms_smooth),  # Low height threshold
            prominence=np.mean(rms_smooth) * 0.1,   # Low prominence threshold
            width=(int(0.01 * (sr / hop_length)), int(0.2 * (sr / hop_length)))  # 10-200 ms
        )
        # Refine each envelope peak to the local amplitude maximum of the signal
        refined_peaks = []
        window_size = int(0.05 * sr)  # 50 ms refinement window
        for peak in peaks:
            # Convert envelope peak to sample domain
            sample_idx = peak * hop_length
            # Define window boundaries
            start = max(0, sample_idx - window_size // 2)
            end = min(len(cleaned_audio), sample_idx + window_size // 2)
            # Find the maximum amplitude within the window
            window = np.abs(cleaned_audio[int(start):int(end)])
            max_idx = np.argmax(window)
            refined_peaks.append(start + max_idx)
        return np.array(refined_peaks), rms_smooth

    def get_onset_peaks():
        """Detect peaks via onset strength with lenient thresholds."""
        onset_env = librosa.onset.onset_strength(
            y=cleaned_audio,
            sr=sr,
            hop_length=256,       # Small hop for better temporal resolution
            aggregate=np.median,
            n_mels=128
        )
        # Lenient threshold
        threshold = np.mean(onset_env) + 0.3 * np.std(onset_env)
        # Get onset positions (the peak-picking threshold is passed as `delta`)
        onset_frames = librosa.onset.onset_detect(
            onset_envelope=onset_env,
            sr=sr,
            hop_length=256,
            backtrack=True,
            delta=threshold,
            pre_max=20,    # 20 frames before peak
            post_max=20,   # 20 frames after peak
            pre_avg=25,    # 25 frames before for mean
            post_avg=25,   # 25 frames after for mean
            wait=10        # Wait 10 frames before detecting next onset
        )
        # Refine onset positions to local amplitude maxima
        refined_peaks = []
        window_size = int(0.05 * sr)  # 50 ms window
        for frame in onset_frames:
            # Convert frame to sample index (hop_length=256)
            sample_idx = frame * 256
            # Define window boundaries
            start = max(0, sample_idx - window_size // 2)
            end = min(len(cleaned_audio), sample_idx + window_size // 2)
            # Find the maximum amplitude within the window
            window = np.abs(cleaned_audio[int(start):int(end)])
            max_idx = np.argmax(window)
            refined_peaks.append(start + max_idx)
        return np.array(refined_peaks), onset_env

    # Apply selected method
    if method == 'envelope':
        peaks, _ = get_envelope_peaks()
    elif method == 'onset':
        peaks, _ = get_onset_peaks()
    else:  # fusion: merge peaks from both methods
        env_peaks, _ = get_envelope_peaks()
        onset_peaks, _ = get_onset_peaks()
        # Merge nearby peaks (within 50 ms)
        all_peaks = np.sort(np.concatenate([env_peaks, onset_peaks]))
        merged_peaks = []
        last_peak = -np.inf
        for peak in all_peaks:
            if (peak - last_peak) / sr > 0.05:  # 50 ms minimum separation
                merged_peaks.append(peak)
                last_peak = peak
        peaks = np.array(merged_peaks)

    # Convert peak sample indices to times
    peak_times = peaks / sr

    # Estimate tempo from inter-peak intervals
    if len(peak_times) > 1:
        intervals = np.diff(peak_times)
        tempos = 60 / intervals  # Convert intervals to BPM
        # Discard physiologically impossible tempos
        valid_tempos = tempos[(tempos >= 30) & (tempos <= 300)]
        # Median for robustness against outlier intervals
        tempo = np.median(valid_tempos) if len(valid_tempos) > 0 else 0.0
    else:
        tempo = 0.0

    return tempo, peak_times, cleaned_audio
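
# Example usage (sketch; `sr, audio` as returned by getaudiodata above):
#
#   tempo, peak_times, cleaned = getBeats(audio, sr, method='fusion')
#   print(f"Estimated heart rate: {tempo:.1f} BPM ({len(peak_times)} beats)")
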
def plotBeattimes(beattimes: np.ndarray,
                  audiodata: np.ndarray,
                  sr: int,
                  beattimes2: np.ndarray = None) -> go.Figure:
    """
    Plot audio waveform with beat markers for one or two sets of beat times.

    Parameters:
    -----------
    beattimes : np.ndarray
        Primary array of beat times in seconds (S1 beats if beattimes2 is provided)
    audiodata : np.ndarray
        Audio time series data
    sr : int
        Sampling rate
    beattimes2 : np.ndarray, optional
        Secondary array of beat times in seconds (S2 beats)

    Returns:
    --------
    go.Figure
        Plotly figure with waveform and beat markers
    """
    def to_seconds(times):
        """Coerce beat times to a float array (handles '0,5'-style decimal strings)."""
        if len(times) > 0 and isinstance(times[0], str):
            return np.array([float(t.replace(',', '.')) for t in times])
        return np.asarray(times, dtype=float)

    # Calculate time array for the full audio
    time = np.arange(len(audiodata)) / sr

    # Create the figure
    fig = go.Figure()

    # Add waveform
    fig.add_trace(
        go.Scatter(
            x=time,
            y=audiodata,
            mode='lines',
            name='Waveform',
            line=dict(color='blue', width=1)
        )
    )

    # Process primary beat times; keep only beats that fall inside the signal
    beattimes = to_seconds(beattimes)
    beat_indices = np.round(beattimes * sr).astype(int)
    valid = beat_indices < len(audiodata)
    beattimes, beat_indices = beattimes[valid], beat_indices[valid]
    beat_amplitudes = audiodata[beat_indices]

    # Name the trace according to whether secondary beats are provided
    beat_name = "Beats S1" if beattimes2 is not None else "Beats"

    # Add primary beat markers
    fig.add_trace(
        go.Scatter(
            x=beattimes,
            y=beat_amplitudes,
            mode='markers',
            name=beat_name,
            marker=dict(
                color='red',
                size=8,
                symbol='circle',
                line=dict(color='darkred', width=1)
            )
        )
    )

    # Add primary beat vertical lines
    for beat_time in beattimes:
        fig.add_vline(
            x=beat_time,
            line=dict(color="rgba(255, 0, 0, 0.2)", width=1),
            layer="below"
        )

    # Process and plot secondary beat times if provided
    if beattimes2 is not None:
        beattimes2 = to_seconds(beattimes2)
        beat_indices2 = np.round(beattimes2 * sr).astype(int)
        valid2 = beat_indices2 < len(audiodata)
        beattimes2, beat_indices2 = beattimes2[valid2], beat_indices2[valid2]
        beat_amplitudes2 = audiodata[beat_indices2]

        # Add secondary beat markers
        fig.add_trace(
            go.Scatter(
                x=beattimes2,
                y=beat_amplitudes2,
                mode='markers',
                name="Beats S2",
                marker=dict(
                    color='green',
                    size=8,
                    symbol='circle',
                    line=dict(color='darkgreen', width=1)
                )
            )
        )

        # Add secondary beat vertical lines
        for beat_time in beattimes2:
            fig.add_vline(
                x=beat_time,
                line=dict(color="rgba(0, 255, 0, 0.2)", width=1),
                layer="below"
            )

    # Update layout
    fig.update_layout(
        title="Audio Waveform with Beat Detection",
        xaxis_title="Time (seconds)",
        yaxis_title="Amplitude",
        showlegend=True,  # Show legend to distinguish beat types
        hovermode='closest',
        plot_bgcolor='white',
        legend=dict(
            yanchor="top",
            y=0.99,
            xanchor="left",
            x=0.01
        )
    )
    return fig
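
# Example usage (sketch; reuses the getBeats outputs above):
#
#   fig = plotBeattimes(peak_times, cleaned, sr)
#   fig.show()
#
# With separate S1/S2 beat times (hypothetical arrays s1_times, s2_times):
#
#   fig = plotBeattimes(s1_times, cleaned, sr, beattimes2=s2_times)
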
def iterate_beat_segments(beat_times, sr, audio):
    """
    Iterate over audio segments between beats marked with label 1 (S1).

    Parameters:
    - beat_times: DataFrame with 'Beattimes' and 'Label (S1=1/S2=0)' columns
    - sr: Sample rate of the audio
    - audio: np.ndarray of audio data

    Returns:
    - List of segment metrics with associated beat information
    """
    # Use positional indexing throughout (guards against a non-default index)
    beat_times = beat_times.reset_index(drop=True)

    # Get positions where label is 1 (S1 beats)
    label_ones = beat_times[beat_times['Label (S1=1/S2=0)'] == 1].index.tolist()
    segment_metrics = []

    # Iterate through consecutive pairs of S1 beats
    for i in range(len(label_ones) - 1):
        start_idx = label_ones[i]
        end_idx = label_ones[i + 1]
        # Get all beats between the two S1 beats (inclusive)
        segment_beats = beat_times.iloc[start_idx:end_idx + 1]
        # Build a list of (label, beattime) tuples
        beat_info = list(zip(segment_beats['Label (S1=1/S2=0)'],
                             segment_beats['Beattimes']))
        # Convert segment boundaries to sample indices
        start_sample = librosa.time_to_samples(segment_beats.iloc[0]['Beattimes'], sr=sr)
        end_sample = librosa.time_to_samples(segment_beats.iloc[-1]['Beattimes'], sr=sr)
        # Extract audio segment
        segment = audio[start_sample:end_sample]
        # Analyze non-empty segments
        if len(segment) > 0:
            segment_metrics.append(segment_analysis(segment, sr, beat_info))

    return segment_metrics
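
# Example input (sketch; the column names must match exactly):
#
#   beat_df = pd.DataFrame({
#       'Beattimes': [0.10, 0.42, 0.95, 1.27],   # seconds
#       'Label (S1=1/S2=0)': [1, 0, 1, 0],
#   })
#   metrics = iterate_beat_segments(beat_df, sr, audio)
#   # One metrics dict per S1-to-S1 cardiac cycle.
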
def segment_analysis(segment, sr, s1s2: list):
    """
    Analyze an audio segment and compute various metrics.

    Parameters:
    - segment: np.ndarray of audio segment data
    - sr: Sample rate of the audio
    - s1s2: list of (label, beattime) tuples, label 1 for S1 and 0 for S2

    Returns:
    - Dict of computed metrics
    """
    # Duration
    duration = len(segment) / sr

    # RMS Energy
    rms_energy = np.sqrt(np.mean(segment**2))

    # Frequency spectrum, restricted to the 20-200 Hz heart-sound band
    fft = np.abs(np.fft.rfft(segment))
    freqs = np.fft.rfftfreq(len(segment), d=1/sr)
    mask = (freqs >= 20) & (freqs <= 200)
    if np.any(mask):
        dominant_freq_idx = np.argmax(fft[mask])
        mean_frequency = freqs[mask][dominant_freq_idx]  # dominant in-band frequency
    else:
        mean_frequency = 0.0  # segment too short to resolve the band

    # Durations between alternating S1/S2 beats (same units as the beat times)
    s1_to_s2_duration = []
    s2_to_s1_duration = []
    prev = s1s2[0]
    for i in range(1, len(s1s2)):
        if prev[0] == 0 and s1s2[i][0] == 1:
            s2_to_s1_duration.append(s1s2[i][1] - prev[1])
        elif prev[0] == 1 and s1s2[i][0] == 0:
            s1_to_s2_duration.append(s1s2[i][1] - prev[1])
        prev = s1s2[i]

    return {
        "rms_energy": rms_energy,
        "mean_frequency": mean_frequency,
        "duration": duration,
        "s1_to_s2_duration": s1_to_s2_duration,
        "s2_to_s1_duration": s2_to_s1_duration,
        "segment": segment
    }
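
# Worked example of the S1/S2 pairing above: for
# s1s2 = [(1, 0.10), (0, 0.42), (1, 0.95)], the loop yields
# s1_to_s2_duration = [0.32]  (S1 at 0.10 s -> S2 at 0.42 s) and
# s2_to_s1_duration = [0.53]  (S2 at 0.42 s -> S1 at 0.95 s).
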
def find_s1s2(df: pd.DataFrame):
    """
    Cluster detected peaks into two groups (S1/S2 candidates) using the
    distances to the previous and next peaks as features.

    Parameters:
    - df: DataFrame with a 'Beattimes' column (peak times in seconds)

    Returns:
    - np.ndarray of shape (n_peaks, 4): [time, dist_prev, dist_next, label]
    """
    times = df['Beattimes'].to_numpy()
    n_peaks = len(times)

    # Initialize the feature array
    feature_array = np.zeros((n_peaks, 4))
    # Column 0: peak times
    feature_array[:, 0] = times
    # Column 1: distance to previous peak (first peak copies the second)
    feature_array[1:, 1] = np.diff(times)
    feature_array[0, 1] = feature_array[1, 1]
    # Column 2: distance to next peak (last peak copies the second-to-last)
    feature_array[:-1, 2] = np.diff(times)
    feature_array[-1, 2] = feature_array[-2, 2]

    # Features: distances to previous and next peaks
    X = feature_array[:, 1:3]

    # Scale features
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)

    # Two-cluster K-means (n_init pinned for consistency across sklearn versions)
    kmeans = KMeans(n_clusters=2, random_state=42, n_init=10)
    labels = kmeans.fit_predict(X_scaled)

    # Column 3: cluster labels
    feature_array[:, 3] = labels
    return feature_array
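
# Example usage (sketch): K-means labels are arbitrary, so 0/1 may map to
# either S1 or S2; downstream code must decide which cluster is S1 (e.g.
# the label that precedes the shorter, systolic interval):
#
#   features = find_s1s2(pd.DataFrame({'Beattimes': peak_times}))
#   labels = features[:, 3].astype(int)
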
# ANALYZE
def compute_segment_metrics(beattimes: pd.DataFrame, sr: int, audio: np.ndarray):
    segment_metrics = iterate_beat_segments(beattimes, sr, audio)
    print("segment_metrics", segment_metrics)
    return segment_metrics
def compute_hrv(s1_to_s2, s2_to_s1, sampling_rate):
    """
    Compute Heart Rate Variability (rolling RMSSD) with debug statements.

    Assumes the interval arrays are expressed in samples; they are
    converted to seconds using `sampling_rate`.
    """
    # Convert to numpy arrays if not already
    s1_to_s2 = np.array(s1_to_s2)
    s2_to_s1 = np.array(s2_to_s1)

    # The two arrays can differ in length by one; truncate to the shorter
    n = min(len(s1_to_s2), len(s2_to_s1))
    s1_to_s2, s2_to_s1 = s1_to_s2[:n], s2_to_s1[:n]

    # Debug: Print input values
    print("First few s1_to_s2 values:", s1_to_s2[:5])
    print("First few s2_to_s1 values:", s2_to_s1[:5])

    # RR intervals (full cardiac cycle = S1->S2 plus S2->S1)
    rr_intervals = s1_to_s2 + s2_to_s1
    print("First few RR intervals (samples):", rr_intervals[:5])

    # Convert to seconds
    rr_intervals = rr_intervals / sampling_rate
    print("First few RR intervals (seconds):", rr_intervals[:5])

    # Cumulative time of each heartbeat
    time = np.cumsum(rr_intervals)

    # Instantaneous heart rate in beats per minute
    heart_rate = 60 / rr_intervals
    print("First few heart rate values:", heart_rate[:5])

    # Rolling-window RMSSD (approximately a 30-second window)
    window_size = int(30 / np.mean(rr_intervals))
    print("Window size:", window_size)

    hrv_values = []
    for i in range(len(rr_intervals)):
        window_start = max(0, i - window_size)
        window_data = rr_intervals[window_start:i + 1]
        if len(window_data) > 1:
            # Debug: Print window data occasionally
            if i % 100 == 0:
                print(f"\nWindow {i}:")
                print("Window data:", window_data)
                print("Successive differences:", np.diff(window_data))
            successive_diffs = np.diff(window_data)
            rmssd = np.sqrt(np.mean(successive_diffs ** 2)) * 1000  # Convert to ms
            hrv_values.append(rmssd)
        else:
            hrv_values.append(np.nan)

    hrv_values = np.array(hrv_values)

    # Debug: Print HRV statistics
    print("\nHRV Statistics:")
    print("Min HRV:", np.nanmin(hrv_values))
    print("Max HRV:", np.nanmax(hrv_values))
    print("Mean HRV:", np.nanmean(hrv_values))
    print("Number of valid HRV values:", np.sum(~np.isnan(hrv_values)))

    # Drop NaN values (typically the first window)
    valid_idx = ~np.isnan(hrv_values)
    time = time[valid_idx]
    hrv_values = hrv_values[valid_idx]
    heart_rate = heart_rate[valid_idx]

    return time, hrv_values, heart_rate
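
# Example usage (sketch): compute_hrv divides by `sampling_rate`, i.e. it
# expects intervals in samples. segment_analysis returns durations in
# seconds, so convert first (an assumption about the intended wiring):
#
#   s1s2 = np.array(m["s1_to_s2_duration"]) * sr   # m: one metrics dict
#   s2s1 = np.array(m["s2_to_s1_duration"]) * sr
#   t, hrv_ms, hr_bpm = compute_hrv(s1s2, s2s1, sr)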