| """ |
| Audio Processor - Handles audio blending and processing |
| """ |
|
|
import logging
from datetime import datetime
from pathlib import Path
from typing import Optional, Tuple

import numpy as np
import torch
import torchaudio
from scipy import signal
from scipy.ndimage import median_filter
|
|
logger = logging.getLogger(__name__)


| class AudioProcessor: |
| """Handles audio processing, blending, and effects.""" |
| |
| def __init__(self, config: dict): |
| """ |
| Initialize audio processor. |
| |
| Args: |
| config: Configuration dictionary |
| """ |
| self.config = config |
| self.sample_rate = config.get("sample_rate", 44100) |
| |
| def blend_clip( |
| self, |
| new_clip_path: str, |
| previous_clip: Optional[np.ndarray], |
| lead_in: float = 2.0, |
| lead_out: float = 2.0 |
| ) -> str: |
| """ |
        Blend the head of the new clip with the tail of the previous clip
        using a constant-amplitude crossfade.
| |
| Args: |
| new_clip_path: Path to new audio clip |
| previous_clip: Previous clip as numpy array |
| lead_in: Lead-in duration in seconds for blending |
| lead_out: Lead-out duration in seconds for blending |
| |
| Returns: |
            Path to the blended clip, or the original path if blending
            is skipped or fails
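
        Example (illustrative sketch; the path and config keys shown
        here are hypothetical):

            proc = AudioProcessor({"sample_rate": 44100, "output_dir": "outputs"})
            out_path = proc.blend_clip("clips/new.wav", previous_clip=prev_np)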
| """ |
        try:
            # Load the new clip and resample it to the processor rate if needed
            new_audio, sr = torchaudio.load(new_clip_path)
            if sr != self.sample_rate:
                resampler = torchaudio.transforms.Resample(sr, self.sample_rate)
                new_audio = resampler(new_audio)

            new_np = new_audio.numpy()

            # Nothing to blend against on the first clip
            if previous_clip is None:
                return new_clip_path
| |
| |
            # Convert blend durations to sample counts
            lead_in_samples = int(lead_in * self.sample_rate)
            lead_out_samples = int(lead_out * self.sample_rate)

            # Match channel counts by duplicating mono to stereo
            if previous_clip.shape[0] != new_np.shape[0]:
                if previous_clip.shape[0] == 1 and new_np.shape[0] == 2:
                    previous_clip = np.repeat(previous_clip, 2, axis=0)
                elif previous_clip.shape[0] == 2 and new_np.shape[0] == 1:
                    new_np = np.repeat(new_np, 2, axis=0)
| |
| |
            # Only blend when both clips are long enough for the overlap
            if previous_clip.shape[1] >= lead_out_samples and new_np.shape[1] >= lead_in_samples:
                prev_tail = previous_clip[:, -lead_out_samples:]
                new_head = new_np[:, :lead_in_samples]

                # Constant-amplitude crossfade curves: cos^2 + sin^2 = 1, so
                # the summed gain stays at unity across the overlap
                fade_out = np.cos(np.linspace(0, np.pi / 2, lead_out_samples)) ** 2
                fade_in = np.sin(np.linspace(0, np.pi / 2, lead_in_samples)) ** 2

                # If lead-in and lead-out differ, blend over the shorter span
                if lead_in_samples != lead_out_samples:
                    blend_length = min(lead_in_samples, lead_out_samples)
                    prev_tail = prev_tail[:, -blend_length:]
                    new_head = new_head[:, :blend_length]
                    fade_out = fade_out[-blend_length:]
                    fade_in = fade_in[:blend_length]

                # Mix the overlap and write it back over the head of the new clip
                blended_region = prev_tail * fade_out + new_head * fade_in
                result = new_np.copy()
                result[:, :blended_region.shape[1]] = blended_region
| |
            else:
                # Clips too short for the requested overlap; skip blending
                result = new_np

            # Tame any peaks introduced by summing the overlap
            result = self._apply_compression(result)

            # Save the blended clip under a timestamped name
            output_dir = Path(self.config.get("output_dir", "outputs"))
            output_dir.mkdir(parents=True, exist_ok=True)

            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            output_path = output_dir / f"blended_{timestamp}.wav"
| |
            # Write a 16-bit PCM WAV via torchaudio
            result_tensor = torch.from_numpy(result).float()
            torchaudio.save(
                str(output_path),
                result_tensor,
                self.sample_rate,
                encoding="PCM_S",
                bits_per_sample=16
            )
| |
| logger.info(f"✅ Blended clip saved: {output_path}") |
| return str(output_path) |
| |
        except Exception as e:
            logger.error(f"Blending failed: {e}")
            # Fall back to the original, unblended clip
            return new_clip_path
| |
| def crossfade( |
| self, |
| audio1: np.ndarray, |
| audio2: np.ndarray, |
| fade_duration: float = 2.0 |
| ) -> np.ndarray: |
| """ |
| Create crossfade between two audio segments. |
| |
| Args: |
| audio1: First audio segment |
| audio2: Second audio segment |
| fade_duration: Duration of crossfade in seconds |
| |
| Returns: |
| Crossfaded audio |
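
        Example (illustrative; the overlap shortens the combined length
        by one fade duration):

            >>> proc = AudioProcessor({"sample_rate": 44100})
            >>> a = np.zeros((1, 4 * 44100))  # hypothetical 4 s mono clip
            >>> b = np.ones((1, 4 * 44100))   # hypothetical 4 s mono clip
            >>> out = proc.crossfade(a, b, fade_duration=2.0)
            >>> out.shape[1] == a.shape[1] + b.shape[1] - 2 * 44100
            True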
| """ |
        fade_samples = int(fade_duration * self.sample_rate)
        # Never fade over more samples than either segment contains
        fade_samples = min(fade_samples, audio1.shape[1], audio2.shape[1])

        # Match channel counts by duplicating channels as needed
        if audio1.shape[0] != audio2.shape[0]:
            target_channels = max(audio1.shape[0], audio2.shape[0])
            if audio1.shape[0] < target_channels:
                audio1 = np.repeat(audio1, target_channels // audio1.shape[0], axis=0)
            if audio2.shape[0] < target_channels:
                audio2 = np.repeat(audio2, target_channels // audio2.shape[0], axis=0)

        # Overlapping regions: tail of the first segment, head of the second
        fade_out_region = audio1[:, -fade_samples:]
        fade_in_region = audio2[:, :fade_samples]
| |
| |
        # Constant-amplitude curves: cos^2 + sin^2 = 1 across the overlap
        fade_out_curve = np.cos(np.linspace(0, np.pi / 2, fade_samples)) ** 2
        fade_in_curve = np.sin(np.linspace(0, np.pi / 2, fade_samples)) ** 2

        faded = fade_out_region * fade_out_curve + fade_in_region * fade_in_curve

        # Stitch together: untouched start of the first segment, blended
        # overlap, remainder of the second segment
        result = np.concatenate([
            audio1[:, :-fade_samples],
            faded,
            audio2[:, fade_samples:]
        ], axis=1)

        return result
| |
| def _apply_compression(self, audio: np.ndarray, threshold: float = 0.8) -> np.ndarray: |
| """ |
        Apply gentle tanh-based soft limiting to prevent clipping.
| |
| Args: |
| audio: Input audio |
| threshold: Compression threshold |
| |
| Returns: |
| Compressed audio |
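
        Example (illustrative; tanh keeps the output magnitude strictly
        below the threshold):

            >>> proc = AudioProcessor({"sample_rate": 44100})
            >>> hot = np.array([[0.0, 0.5, 1.2]])
            >>> float(np.abs(proc._apply_compression(hot, threshold=0.8)).max()) < 0.8
            True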
| """ |
| |
        peak = np.abs(audio).max()

        if peak > threshold:
            # Scale the peak down to the threshold, then pass the signal
            # through tanh as a soft limiter; output magnitude stays
            # strictly below the threshold
            compressed = np.tanh(audio * (threshold / peak)) * threshold
            return compressed

        return audio
| |
| def normalize_audio(self, audio: np.ndarray, target_db: float = -3.0) -> np.ndarray: |
| """ |
| Normalize audio to target dB level. |
| |
| Args: |
| audio: Input audio |
| target_db: Target level in dB |
| |
| Returns: |
| Normalized audio |
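
        Example (illustrative; a 0.5 peak is about -6 dBFS, so normalizing
        to -3 dBFS lands the peak at roughly 0.708):

            >>> proc = AudioProcessor({"sample_rate": 44100})
            >>> quiet = np.full((1, 1000), 0.5)
            >>> round(float(np.abs(proc.normalize_audio(quiet, target_db=-3.0)).max()), 3)
            0.708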
| """ |
| |
        # Silence cannot be normalized
        peak = np.abs(audio).max()
        if peak == 0:
            return audio

        current_db = 20 * np.log10(peak)

        # Gain needed to bring the peak to the target level
        gain_db = target_db - current_db
        gain_linear = 10 ** (gain_db / 20)

        normalized = audio * gain_linear

        # Guard against floating-point overshoot
        normalized = np.clip(normalized, -1.0, 1.0)

        return normalized
| |
| def remove_clicks_pops(self, audio: np.ndarray) -> np.ndarray: |
| """ |
| Remove clicks and pops from audio. |
| |
| Args: |
| audio: Input audio |
| |
| Returns: |
| Cleaned audio |
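
        Example (illustrative; a single-sample spike is flattened by the
        3-tap median filter):

            >>> proc = AudioProcessor({"sample_rate": 44100})
            >>> x = np.array([[0.0, 0.0, 1.0, 0.0, 0.0]])
            >>> float(proc.remove_clicks_pops(x)[0, 2])
            0.0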
| """ |
| |
        # A 3-tap median filter flattens single-sample spikes while leaving
        # the surrounding waveform largely intact
        cleaned = np.zeros_like(audio)
        for ch in range(audio.shape[0]):
            cleaned[ch] = median_filter(audio[ch], size=3)

        return cleaned
| |
| def apply_fade( |
| self, |
| audio: np.ndarray, |
| fade_in: float = 0.0, |
| fade_out: float = 0.0 |
| ) -> np.ndarray: |
| """ |
| Apply fade in/out to audio. |
| |
| Args: |
| audio: Input audio |
| fade_in: Fade in duration in seconds |
| fade_out: Fade out duration in seconds |
| |
| Returns: |
| Faded audio |
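
        Example (illustrative; a toy 4 Hz sample rate keeps the numbers
        small):

            >>> proc = AudioProcessor({"sample_rate": 4})
            >>> x = np.ones((1, 8))
            >>> y = proc.apply_fade(x, fade_in=1.0, fade_out=1.0)
            >>> float(y[0, 0]), float(y[0, -1])
            (0.0, 0.0)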
| """ |
        result = audio.copy()

        # Quadratic fade-in from silence
        if fade_in > 0:
            fade_in_samples = min(int(fade_in * self.sample_rate), audio.shape[1])
            if fade_in_samples > 0:
                fade_curve = np.linspace(0, 1, fade_in_samples) ** 2
                result[:, :fade_in_samples] *= fade_curve

        # Quadratic fade-out to silence; skip zero-length fades, where a
        # "-0" slice would otherwise cover the whole array
        if fade_out > 0:
            fade_out_samples = min(int(fade_out * self.sample_rate), audio.shape[1])
            if fade_out_samples > 0:
                fade_curve = np.linspace(1, 0, fade_out_samples) ** 2
                result[:, -fade_out_samples:] *= fade_curve

        return result
| |
| def resample_audio( |
| self, |
| audio: np.ndarray, |
| orig_sr: int, |
| target_sr: int |
| ) -> np.ndarray: |
| """ |
| Resample audio to target sample rate. |
| |
| Args: |
| audio: Input audio |
| orig_sr: Original sample rate |
| target_sr: Target sample rate |
| |
| Returns: |
| Resampled audio |
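
        Example (illustrative; halving the rate halves the sample count):

            >>> proc = AudioProcessor({"sample_rate": 44100})
            >>> x = np.zeros((2, 44100))
            >>> proc.resample_audio(x, orig_sr=44100, target_sr=22050).shape
            (2, 22050)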
| """ |
        if orig_sr == target_sr:
            return audio

        # FFT-based resampling via scipy; the sample count scales with
        # the rate ratio
        num_samples = int(audio.shape[1] * target_sr / orig_sr)
        resampled = signal.resample(audio, num_samples, axis=1)

        return resampled
| |
| def match_loudness( |
| self, |
| audio1: np.ndarray, |
| audio2: np.ndarray |
| ) -> Tuple[np.ndarray, np.ndarray]: |
| """ |
| Match loudness between two audio segments. |
| |
| Args: |
| audio1: First audio segment |
| audio2: Second audio segment |
| |
| Returns: |
| Tuple of loudness-matched audio segments |
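
        Example (illustrative; the first segment is scaled to the
        second's RMS):

            >>> proc = AudioProcessor({"sample_rate": 44100})
            >>> a = np.full((1, 100), 0.1)
            >>> b = np.full((1, 100), 0.4)
            >>> a2, b2 = proc.match_loudness(a, b)
            >>> round(float(np.sqrt(np.mean(a2 ** 2))), 2)
            0.4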
| """ |
| |
        # RMS serves as a simple loudness proxy
        rms1 = np.sqrt(np.mean(audio1 ** 2))
        rms2 = np.sqrt(np.mean(audio2 ** 2))

        # A silent segment gives no usable reference, and rms1 == 0 would
        # divide by zero below
        if rms1 == 0 or rms2 == 0:
            return audio1, audio2

        # Scale the first segment so its RMS matches the second's
        gain = rms2 / rms1
        matched_audio1 = audio1 * gain

        # Re-normalize if the gain pushed the peak past full scale
        peak = np.abs(matched_audio1).max()
        if peak > 1.0:
            matched_audio1 = matched_audio1 / peak

        return matched_audio1, audio2
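

if __name__ == "__main__":
    # Minimal smoke-test sketch: exercises the pure-numpy helpers with
    # synthetic audio, so no files are needed. Values are illustrative only.
    logging.basicConfig(level=logging.INFO)
    proc = AudioProcessor({"sample_rate": 44100})

    t = np.linspace(0, 2.0, 2 * 44100, endpoint=False)
    tone = 0.5 * np.sin(2 * np.pi * 440.0 * t)[np.newaxis, :]  # (1, N) mono
    noise = 0.1 * np.random.randn(1, 2 * 44100)

    mixed = proc.crossfade(tone, noise, fade_duration=0.5)
    mixed = proc.normalize_audio(mixed, target_db=-3.0)
    mixed = proc.apply_fade(mixed, fade_in=0.1, fade_out=0.1)
    logger.info("Demo output: %d samples, peak %.3f",
                mixed.shape[1], float(np.abs(mixed).max()))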
|
|