Spaces:
Running
on
CPU Upgrade
Running
on
CPU Upgrade
| import os | |
| import tempfile | |
| from typing import Optional | |
| import librosa | |
| import numpy as np | |
| import soundfile as sf | |
| from scipy.signal import butter, lfilter, filtfilt | |
def _load_audio(audio_path: str, mono: bool = False) -> tuple[np.ndarray, int]:
    """Load an audio file at its native sample rate.

    Args:
        audio_path: Path of the file handed to ``librosa.load``.
        mono: When True, request a down-mixed single-channel signal.

    Returns:
        ``(samples, sample_rate)``. A 1-D array is returned unchanged; a
        multi-channel array is returned as ``(n_samples, n_channels)``.
    """
    # sr=None keeps the file's own sample rate.
    y, sr = librosa.load(audio_path, sr=None, mono=mono, res_type="soxr_vhq")
    # The rest of this module indexes audio as audio[:, channel], so any
    # multi-channel result (not just stereo) has to be flipped from librosa's
    # channel-first layout to (samples, channels).
    if y.ndim > 1:
        y = y.T
    return y, int(sr)
def detect_noise_profile(audio: np.ndarray, sample_rate: int) -> dict:
    """Analyze audio to estimate coarse noise characteristics.

    Args:
        audio: Audio samples, either 1-D or ``(samples, channels)``;
            multi-channel input is averaged across channels before analysis.
        sample_rate: Sample rate of ``audio`` in Hz.

    Returns:
        Dictionary with the keys ``noise_floor``, ``steady_noise``,
        ``hiss_level``, ``snr_estimate`` (dB) and ``has_significant_noise``.
        Input shorter than one FFT frame (2048 samples) yields fixed
        placeholder values.
    """
    n_fft = 2048
    eps = 1e-10

    # Convert to mono for analysis if multi-channel.
    if audio.ndim > 1:
        audio = np.mean(audio, axis=1)

    # Too short for a single STFT frame: return neutral defaults.
    if len(audio) < n_fft:
        return {
            "noise_floor": 0.001,
            "steady_noise": 0.001,
            "hiss_level": 0.001,
            "snr_estimate": 20.0,
            "has_significant_noise": False,
        }

    magnitude = np.abs(librosa.stft(audio, n_fft=n_fft, hop_length=512))

    # Noise floor: the 10th percentile of all time-frequency magnitudes.
    noise_floor = float(np.percentile(magnitude, 10))

    freqs = librosa.fft_frequencies(sr=sample_rate, n_fft=n_fft)

    def _band_level(mask: np.ndarray) -> float:
        # An empty band (e.g. nothing above 4 kHz at low sample rates) would
        # make np.mean return NaN, so report it as silent instead.
        return float(np.mean(magnitude[mask, :])) if mask.any() else 0.0

    steady_noise = _band_level(freqs < 200)  # low-frequency content
    hiss_level = _band_level(freqs > 4000)  # high-frequency content

    # Rough SNR: average spectral power against the power of the noise floor.
    # (Previously the noise power was computed as x - x == 0, which made the
    # estimate meaningless.) eps on both sides keeps silence finite.
    signal_power = float(np.mean(magnitude**2))
    noise_power = noise_floor**2
    snr_estimate = 10.0 * np.log10((signal_power + eps) / (noise_power + eps))

    return {
        "noise_floor": noise_floor,
        "steady_noise": steady_noise,
        "hiss_level": hiss_level,
        "snr_estimate": float(snr_estimate),
        "has_significant_noise": bool(
            steady_noise > noise_floor * 2 or hiss_level > noise_floor * 1.5
        ),
    }
def spectral_subtraction(
    audio: np.ndarray, noise_profile: dict, sample_rate: int
) -> np.ndarray:
    """Run spectral-subtraction noise reduction on mono or multi-channel audio.

    Args:
        audio: Input samples, 1-D or ``(samples, channels)``.
        noise_profile: Noise profile from detect_noise_profile().
        sample_rate: Sample rate of audio.

    Returns:
        Cleaned audio; multi-channel input is rebuilt column by column.
    """
    # Single-channel input goes straight to the per-channel worker.
    if audio.ndim <= 1:
        return _process_channel_spectral_subtraction(audio, noise_profile, sample_rate)

    # Otherwise clean every column independently and stack them back together.
    per_channel = [
        _process_channel_spectral_subtraction(
            audio[:, idx], noise_profile, sample_rate
        )
        for idx in range(audio.shape[1])
    ]
    return np.column_stack(per_channel)
def _process_channel_spectral_subtraction(
    audio: np.ndarray, noise_profile: dict, sample_rate: int
) -> np.ndarray:
    """Process a single channel with gentle spectral subtraction.

    Args:
        audio: 1-D sample array for one channel.
        noise_profile: Mapping providing ``"noise_floor"`` (a magnitude) and
            ``"has_significant_noise"`` (bool).
        sample_rate: Unused here; kept so the signature matches the callers.

    Returns:
        The cleaned channel, same length as ``audio``. Input shorter than one
        FFT frame (2048 samples) is returned unchanged.
    """
    hop_length = 512

    # Ensure audio is long enough for STFT.
    if len(audio) < 2048:
        return audio

    stft = librosa.stft(audio, n_fft=2048, hop_length=hop_length)
    magnitude = np.abs(stft)
    phase = np.angle(stft)

    # Stronger reduction only when the profile flags noticeable noise.
    reduction_factor = 0.3 if noise_profile["has_significant_noise"] else 0.15

    # Subtract a scaled noise floor from every bin. Bins at or below the floor
    # lose `reduction_factor` of their magnitude; louder bins lose only the
    # fixed amount reduction_factor * noise_floor, so the programme material is
    # barely touched. (The previous gate did the opposite and attenuated loud
    # bins the most.) The result is never negative because at most the bin's
    # own magnitude times reduction_factor (< 1) is removed.
    noise_floor = noise_profile["noise_floor"]
    cleaned_magnitude = magnitude - reduction_factor * np.minimum(
        magnitude, noise_floor
    )

    # Rebuild the signal with the original phase.
    cleaned_stft = cleaned_magnitude * np.exp(1j * phase)
    return librosa.istft(cleaned_stft, hop_length=hop_length, length=len(audio))
def adaptive_filter(
    audio: np.ndarray, sample_rate: int, noise_type: str = "general"
) -> np.ndarray:
    """Filter mono or multi-channel audio according to the given noise type.

    Args:
        audio: Input samples, 1-D or ``(samples, channels)``.
        sample_rate: Sample rate of audio.
        noise_type: Type of noise to address ('general', 'hiss', 'hum', 'background').

    Returns:
        Filtered audio; multi-channel input is rebuilt column by column.
    """
    # Single-channel input goes straight to the per-channel worker.
    if audio.ndim <= 1:
        return _process_channel_adaptive_filter(audio, sample_rate, noise_type)

    # Otherwise filter every column independently and stack them back together.
    per_channel = [
        _process_channel_adaptive_filter(audio[:, idx], sample_rate, noise_type)
        for idx in range(audio.shape[1])
    ]
    return np.column_stack(per_channel)
def _process_channel_adaptive_filter(
    audio: np.ndarray, sample_rate: int, noise_type: str = "general"
) -> np.ndarray:
    """Process a single channel with a filter chosen by noise type.

    Args:
        audio: 1-D sample array for one channel.
        sample_rate: Sample rate of ``audio`` in Hz.
        noise_type: ``"hiss"`` (low-pass at 4 kHz), ``"hum"`` (band-stop
            filters around mains frequencies), ``"background"`` (spectral
            subtraction); anything else applies a gentle broadband low-pass.

    Returns:
        The filtered channel.
    """
    nyquist = sample_rate / 2.0

    if noise_type == "hiss":
        # Hiss lives above ~4 kHz, so it is removed with a LOW-pass filter.
        # (A high-pass here would keep the hiss and discard the signal.)
        cutoff = 4000
        if cutoff >= nyquist:
            # The recording cannot contain anything above the cutoff, and
            # butter() rejects critical frequencies at or beyond Nyquist.
            return audio.copy()
        b, a = butter(4, cutoff, fs=sample_rate, btype="low", output="ba")
        filtered_audio = lfilter(b, a, audio)
    elif noise_type == "hum":
        # Cascade of band-stop filters at common power-line frequencies.
        filtered_audio = audio.copy()
        hum_freqs = [50, 60, 100, 120, 180, 240]  # Common power line harmonics
        for freq in hum_freqs:
            low_edge, high_edge = freq * 0.9, freq * 1.1
            # Guard on the upper band edge (not the centre): butter() needs
            # every critical frequency strictly below Nyquist.
            if high_edge < nyquist:
                b, a = butter(
                    2,
                    [low_edge, high_edge],
                    fs=sample_rate,
                    btype="bandstop",
                    output="ba",
                )
                filtered_audio = lfilter(b, a, filtered_audio)
    elif noise_type == "background":
        # Spectral subtraction for background noise.
        noise_profile = detect_noise_profile(audio, sample_rate)
        filtered_audio = spectral_subtraction(audio, noise_profile, sample_rate)
    else:
        # General broadband noise reduction: gentle low-pass whose cutoff
        # (at most sample_rate / 2.5) always stays below Nyquist.
        cutoff = int(min(8000, sample_rate // 2.5))
        b, a = butter(4, cutoff, fs=sample_rate, btype="low", output="ba")
        filtered_audio = lfilter(b, a, audio)
    return filtered_audio
def remove_noise(
    audio_path: str,
    noise_type: str = "general",
    sensitivity: float = 0.5,
    output_path: Optional[str] = None,
    output_format: str = "wav",
) -> str:
    """
    Remove noise from an audio file and write the cleaned result to disk.

    The audio is loaded at its native sample rate, filtered according to
    ``noise_type``, rescaled only if the result would clip, and saved with
    soundfile.

    Args:
        audio_path: Path to the input audio file.
        noise_type: Type of noise to remove:
            - 'hiss': low-pass filter, cutoff between 4000 Hz (sensitivity 0)
              and 2000 Hz (sensitivity 1)
            - 'hum': zero-phase band-stop filters at 50/60/100 Hz and their
              first harmonics (sensitivity is not used)
            - 'rumble': high-pass filter, cutoff between 20 Hz and 100 Hz
            - anything else ('general', 'background'): spectral subtraction
              blended with the original signal
        sensitivity: Noise reduction sensitivity, clamped to 0.0-1.0
            (default: 0.5). Higher values remove more noise but may affect
            audio quality.
        output_path: Optional output directory (created if missing). When
            None, a fresh temporary directory is used.
        output_format: File extension for the output (default: 'wav').
            Whether a given format can be written depends on soundfile.

    Returns:
        Path to the cleaned audio file. The name has the form
        ``<input>_<noise_type>_removed_<YYYYmmdd_HHMMSS>.<output_format>``;
        if writing fails and the FLAC fallback succeeds, the extension is
        ``.flac`` instead.

    Raises:
        RuntimeError: If loading, filtering or saving fails; the original
            exception is attached as ``__cause__``.

    Examples:
        >>> remove_noise("noisy_recording.wav", "hiss", 0.7, "output", "wav")
        # e.g. 'output/noisy_recording_hiss_removed_20240101_120000.wav'
        >>> remove_noise("podcast.mp3", "background", 0.3, "output", "mp3")
        # e.g. 'output/podcast_background_removed_20240101_120000.mp3'
    """
    from datetime import datetime

    try:
        audio, sample_rate = _load_audio(audio_path, mono=False)

        # Keep sensitivity inside the documented range so derived cutoffs
        # can never become negative or otherwise invalid.
        sensitivity = float(np.clip(sensitivity, 0.0, 1.0))
        nyquist = sample_rate / 2.0

        # All filters run along axis 0 (samples), which covers both 1-D mono
        # and (samples, channels) input in one call.
        if noise_type == "hiss":
            # Hiss is high-frequency noise, so it is removed with a LOW-pass:
            # more sensitivity -> lower cutoff -> more hiss removed. The
            # cutoff is kept below Nyquist because butter() rejects anything
            # at or above it.
            cutoff = min(4000 - sensitivity * 2000, 0.9 * nyquist)
            b, a = butter(4, cutoff, fs=sample_rate, btype="low", output="ba")
            filtered_audio = filtfilt(b, a, audio, axis=0)
        elif noise_type == "hum":
            # Same notch frequencies as before, but de-duplicated so that
            # 100 Hz and 200 Hz are not filtered twice.
            hum_freqs = sorted(
                {
                    fundamental * harmonic
                    for fundamental in (50, 60, 100)
                    for harmonic in range(1, 6)
                }
            )
            filtered_audio = audio.copy()
            for freq in hum_freqs:
                low_edge, high_edge = freq * 0.95, freq * 1.05
                # Check the upper band edge, not the centre frequency.
                if high_edge >= nyquist:
                    continue
                b, a = butter(
                    2,
                    [low_edge, high_edge],
                    fs=sample_rate,
                    btype="bandstop",
                    output="ba",
                )
                filtered_audio = filtfilt(b, a, filtered_audio, axis=0)
        elif noise_type == "rumble":
            # High-pass filter for rumble removal, 20-100 Hz cutoff.
            cutoff = 20 + sensitivity * 80
            b, a = butter(4, cutoff, fs=sample_rate, btype="high", output="ba")
            filtered_audio = filtfilt(b, a, audio, axis=0)
        else:  # background or general
            noise_profile = detect_noise_profile(audio, sample_rate)
            cleaned = spectral_subtraction(audio, noise_profile, sample_rate)
            # Wet/dry mix: higher sensitivity must give MORE of the cleaned
            # signal (the weights were previously swapped).
            strength = 0.2 + sensitivity * 0.6
            filtered_audio = strength * cleaned + (1 - strength) * audio

        # Preserve the original dynamics; rescale only if clipping would occur.
        max_val = np.max(np.abs(filtered_audio))
        if max_val > 1.0:
            filtered_audio = filtered_audio / max_val * 0.95

        # Resolve the output directory.
        if output_path is None:
            output_path = tempfile.mkdtemp(suffix="_noise_removed")
        else:
            os.makedirs(output_path, exist_ok=True)

        # Output filename carries a timestamp so repeated runs do not collide.
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        input_filename = os.path.splitext(os.path.basename(audio_path))[0]
        output_filename = (
            f"{input_filename}_{noise_type}_removed_{timestamp}.{output_format}"
        )
        output_file = os.path.join(output_path, output_filename)

        # soundfile takes mono as 1-D and multi-channel as (samples, channels),
        # which is the layout used throughout this module.
        try:
            sf.write(output_file, filtered_audio, sample_rate)
            print("Successfully saved audio file using librosa/soundfile")
        except Exception as e:
            print(f"librosa/soundfile failed: {e}")
            # Fall back to FLAC. Swap only the real extension; str.replace
            # did nothing for non-wav formats and could also rewrite a
            # directory name containing ".wav".
            flac_path = os.path.splitext(output_file)[0] + ".flac"
            try:
                sf.write(flac_path, filtered_audio, sample_rate, format="FLAC")
            except Exception as e2:
                print(f"FLAC also failed: {e2}")
                raise RuntimeError(
                    "Could not save audio file with any method"
                ) from e2
            print(f"Successfully saved as FLAC: {flac_path}")
            return flac_path
        return output_file
    except Exception as e:
        raise RuntimeError(f"Error removing noise: {str(e)}") from e
def remove_noise_wrapper(audio_path: str, noise_reduction_factor: float = 0.5) -> str:
    """
    Run general noise removal and report failures as text (for MCP integration).

    Args:
        audio_path: Path to the input audio file
        noise_reduction_factor: Value forwarded to remove_noise as its
            sensitivity (default: 0.5)

    Returns:
        Path to the cleaned audio file, or a string starting with "Error: "
        when anything goes wrong
    """
    try:
        cleaned_path = remove_noise(audio_path, "general", noise_reduction_factor)
    except Exception as exc:
        # Never propagate: the caller expects a string either way.
        return f"Error: {str(exc)}"
    return cleaned_path
if __name__ == "__main__":
    # Script section for running audio cleaning locally.
    #
    # Usage:
    #     python tools/audio_cleaning.py input.wav
    #     python tools/audio_cleaning.py input.wav --reduction 0.7
    import argparse
    import sys

    parser = argparse.ArgumentParser(
        description="Remove noise from audio files",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python tools/audio_cleaning.py noisy.wav
  python tools/audio_cleaning.py noisy.wav --reduction 0.7
  python tools/audio_cleaning.py noisy.wav --output cleaned/
""",
    )
    parser.add_argument("audio_path", help="Path to the input audio file")
    parser.add_argument(
        "--reduction",
        type=float,
        default=0.5,
        help="Noise reduction factor (0.1-1.0, default: 0.5)",
    )
    parser.add_argument("--output", help="Output directory (default: output/)")
    args = parser.parse_args()

    print("Audio Cleaning Tool")
    print("=" * 25)
    print(f"Input: {args.audio_path}")
    print(f"Noise reduction: {args.reduction}")
    if args.output:
        print(f"Output directory: {args.output}")
    print()

    try:
        result = remove_noise(
            audio_path=args.audio_path,
            noise_type="general",
            sensitivity=args.reduction,
            # Fall back to the directory advertised in the --output help text.
            output_path=args.output or "output",
            output_format="wav",
        )
        # The status glyphs were mis-encoded ("ā") in the original source.
        print("✅ Audio cleaning completed!")
        print(f"Output saved to: {result}")
    except Exception as e:
        print(f"❌ Error: {e}")
        sys.exit(1)