Spaces:
Runtime error
Runtime error
| #!/usr/bin/env python3 | |
| """ | |
| Crossfade engine for Synesthesia runtime. | |
| Pure-function audio utilities: equal-power and linear crossfade, | |
| loop-tail failsafe extension, and lightweight audio fingerprint extraction. | |
| All functions operate on numpy arrays — no external audio library dependencies. | |
| """ | |
| from __future__ import annotations | |
| import enum | |
| import time | |
| from typing import Any | |
| import numpy as np | |
| # --------------------------------------------------------------------------- | |
| # Crossfade mode enum | |
| # --------------------------------------------------------------------------- | |
class CrossfadeMode(enum.Enum):
    """Gain-curve shape used when blending two clips."""

    # Straight-line gain ramps; summed power dips slightly mid-fade.
    LINEAR = "linear"
    # sqrt() gain ramps; perceived loudness stays constant through the fade.
    EQUAL_POWER = "equal_power"
| # --------------------------------------------------------------------------- | |
| # Crossfade functions | |
| # --------------------------------------------------------------------------- | |
def equal_power_crossfade(
    clip_a: np.ndarray,
    clip_b: np.ndarray,
    crossfade_samples: int,
) -> np.ndarray:
    """Join two clips with sqrt-shaped (equal-power) gain ramps.

    The outgoing clip is weighted by ``sqrt(1 - t)`` and the incoming one
    by ``sqrt(t)``, so the summed acoustic power stays perceptually
    constant across the transition region.

    Parameters
    ----------
    clip_a, clip_b : np.ndarray
        Audio clips, mono ``(n,)`` or multi-channel ``(n, channels)``.
    crossfade_samples : int
        Length of the overlap region. Clamped to the shorter clip; a
        non-positive value degrades to a plain concatenation.

    Returns
    -------
    np.ndarray
        Contiguous output: ``clip_a`` head, blended overlap, ``clip_b`` tail.
    """
    region = min(crossfade_samples, len(clip_a), len(clip_b))
    if region <= 0:
        # Nothing to blend: butt-join the clips.
        return np.concatenate([clip_a, clip_b])
    ramp = np.linspace(0.0, 1.0, region, dtype=np.float32)
    if clip_a.ndim == 2:
        # Column vector so the ramp broadcasts across all channels.
        ramp = ramp[:, np.newaxis]
    outgoing = clip_a[-region:] * np.sqrt(1.0 - ramp)
    incoming = clip_b[:region] * np.sqrt(ramp)
    return np.concatenate([clip_a[:-region], outgoing + incoming, clip_b[region:]])
def linear_crossfade(
    clip_a: np.ndarray,
    clip_b: np.ndarray,
    crossfade_samples: int,
) -> np.ndarray:
    """Blend the tail of *clip_a* into the head of *clip_b* with straight
    gain ramps.

    Same contract as :func:`equal_power_crossfade`: the fade region is
    clamped to the shorter clip, and a non-positive region degrades to a
    plain concatenation.
    """
    region = min(crossfade_samples, len(clip_a), len(clip_b))
    if region <= 0:
        return np.concatenate([clip_a, clip_b])
    ramp = np.linspace(0.0, 1.0, region, dtype=np.float32)
    if clip_a.ndim == 2:
        # Column vector so the ramp broadcasts across all channels.
        ramp = ramp[:, np.newaxis]
    a_tail = clip_a[-region:]
    b_head = clip_b[:region]
    mixed = a_tail * (1.0 - ramp) + b_head * ramp
    return np.concatenate([clip_a[:-region], mixed, clip_b[region:]])
def crossfade(
    clip_a: np.ndarray,
    clip_b: np.ndarray,
    crossfade_samples: int,
    mode: CrossfadeMode = CrossfadeMode.EQUAL_POWER,
) -> np.ndarray:
    """Blend two clips using the curve selected by *mode*.

    Equal-power is the default; any other mode value falls through to the
    linear implementation.
    """
    blend = (
        equal_power_crossfade
        if mode is CrossfadeMode.EQUAL_POWER
        else linear_crossfade
    )
    return blend(clip_a, clip_b, crossfade_samples)
| # --------------------------------------------------------------------------- | |
| # Failsafe: loop-tail extension | |
| # --------------------------------------------------------------------------- | |
def loop_tail_extend(
    clip: np.ndarray,
    extension_samples: int,
    fade_samples: int = 2048,
) -> np.ndarray:
    """Extend a clip by looping its tail audio.

    **FAILSAFE**: called when the next clip is not ready at the crossfade
    deadline. The tail of *clip* is repeated to fill *extension_samples*
    with a short fade at each loop seam to avoid clicks. Playback must
    **never** output silence — this fills the gap.

    Parameters
    ----------
    clip : np.ndarray
        Source audio clip (full clip, not just the tail), mono ``(n,)`` or
        multi-channel ``(n, channels)``. Assumed float audio — an integer
        dtype would fail the in-place fade multiply (TODO confirm callers
        always pass float).
    extension_samples : int
        Number of extra samples to generate.
    fade_samples : int
        Short fade applied at each loop-seam to prevent clicks.

    Returns
    -------
    np.ndarray
        *clip* plus exactly *extension_samples* of looped material.

        Bug fix: the previous version returned
        ``len(clip) + extension_samples - junction_fade`` samples because
        the junction crossfade consumed part of the original tail without
        generating compensating loop material — which would drift
        sample-accurate playback deadlines. The overlap is now paid for by
        generating ``junction_fade`` extra loop samples.
    """
    if extension_samples <= 0:
        return clip
    if len(clip) == 0:
        # Degenerate input: nothing to loop. Emit silence of the requested
        # length rather than crash with a division by zero below.
        return np.zeros((extension_samples,) + clip.shape[1:], dtype=clip.dtype)
    # Use the last 5 seconds (or the entire clip if shorter) as the loop
    # source. NOTE(review): the 48000 assumes a 48 kHz sample rate — the
    # function is not told the actual rate; confirm against callers.
    loop_source_len = min(len(clip), 48000 * 5)
    loop_source = clip[-loop_source_len:].copy()
    # Fade the loop source in and out so each tiled repetition meets its
    # neighbour at (near-)zero amplitude — no clicks at the seams.
    fade = min(fade_samples, len(loop_source) // 4)
    if fade > 0:
        ramp_up = np.linspace(0.0, 1.0, fade, dtype=np.float32)
        ramp_down = np.linspace(1.0, 0.0, fade, dtype=np.float32)
        if loop_source.ndim == 2:
            ramp_up = ramp_up[:, np.newaxis]
            ramp_down = ramp_down[:, np.newaxis]
        loop_source[:fade] *= ramp_up
        loop_source[-fade:] *= ramp_down
    # The junction crossfade overlaps the last `junction_fade` samples of
    # the original clip, so generate that much *extra* loop material to
    # keep the total output length at len(clip) + extension_samples.
    junction_fade = min(fade, len(clip), extension_samples)
    needed = extension_samples + junction_fade
    repeats = needed // len(loop_source) + 2  # +2 guarantees enough material
    reps = (repeats,) if loop_source.ndim == 1 else (repeats, 1)
    extension = np.tile(loop_source, reps)[:needed]
    if junction_fade == 0:
        return np.concatenate([clip, extension])
    # Linear crossfade from the clip's real tail into the looped material.
    t = np.linspace(0.0, 1.0, junction_fade, dtype=np.float32)
    if clip.ndim == 2:
        t = t[:, np.newaxis]
    blended = clip[-junction_fade:] * (1.0 - t) + extension[:junction_fade] * t
    return np.concatenate([clip[:-junction_fade], blended, extension[junction_fade:]])
| # --------------------------------------------------------------------------- | |
| # Audio fingerprint extraction (≤ 20 ms target) | |
| # --------------------------------------------------------------------------- | |
| def _mel_filterbank(sample_rate: int, n_fft: int, n_mels: int = 40) -> np.ndarray: | |
| """Build a mel-scale filterbank matrix (numpy-only, no librosa).""" | |
| def _hz_to_mel(f: float) -> float: | |
| return 2595.0 * np.log10(1.0 + f / 700.0) | |
| def _mel_to_hz(m: float) -> float: | |
| return 700.0 * (10.0 ** (m / 2595.0) - 1.0) | |
| mel_low = _hz_to_mel(0.0) | |
| mel_high = _hz_to_mel(sample_rate / 2.0) | |
| mel_points = np.linspace(mel_low, mel_high, n_mels + 2) | |
| hz_points = np.array([_mel_to_hz(m) for m in mel_points]) | |
| bin_points = np.floor((n_fft + 1) * hz_points / sample_rate).astype(int) | |
| filters = np.zeros((n_mels, n_fft // 2 + 1), dtype=np.float32) | |
| for i in range(n_mels): | |
| lo, mid, hi = bin_points[i], bin_points[i + 1], bin_points[i + 2] | |
| if mid > lo: | |
| filters[i, lo:mid] = (np.arange(lo, mid) - lo) / (mid - lo) | |
| if hi > mid: | |
| filters[i, mid:hi] = (hi - np.arange(mid, hi)) / (hi - mid) | |
| return filters | |
| # Pre-computed filterbanks (lazily populated) | |
| _FILTERBANK_CACHE: dict[tuple[int, int, int], np.ndarray] = {} | |
| def _get_filterbank(sample_rate: int, n_fft: int = 2048, n_mels: int = 40) -> np.ndarray: | |
| key = (sample_rate, n_fft, n_mels) | |
| if key not in _FILTERBANK_CACHE: | |
| _FILTERBANK_CACHE[key] = _mel_filterbank(sample_rate, n_fft, n_mels) | |
| return _FILTERBANK_CACHE[key] | |
def extract_audio_fingerprint(
    audio: np.ndarray,
    sample_rate: int,
    tail_seconds: float = 2.0,
    n_mels: int = 40,
) -> dict[str, Any]:
    """Extract lightweight spectral features from the tail of an audio clip.

    Target runtime: **≤ 20 ms**. Uses only numpy — no librosa.

    Parameters
    ----------
    audio : np.ndarray
        Full audio clip, mono ``(n,)`` or multi-channel ``(n, channels)``;
        only the first channel of multi-channel input is analysed.
    sample_rate : int
        Sample rate in Hz.
    tail_seconds : float
        How many seconds from the end of the clip to analyse.
        Bug fix: values ≤ 0 now yield an empty analysis window (all-zero
        features) — previously ``audio[-0:]`` silently analysed the
        *entire* clip.
    n_mels : int
        Number of mel bands.

    Returns
    -------
    dict
        ``mel_spectrogram_mean``, ``spectral_centroid``, ``tempo_envelope``,
        ``extraction_time_ms``
    """
    _start = time.monotonic()
    # Take the tail and ensure mono.
    tail_len = int(sample_rate * tail_seconds)
    # audio[-0:] would be the whole array, so slice explicitly when the
    # requested window is empty.
    tail = audio[-tail_len:] if tail_len > 0 else audio[:0]
    if tail.ndim == 2:
        tail = tail[:, 0]
    # copy=False avoids a full copy when the input is already float32.
    tail = tail.astype(np.float32, copy=False)

    n_fft = 2048
    hop = n_fft // 2  # 50% overlap

    # ---- Mel spectrogram mean ----
    fb = _get_filterbank(sample_rate, n_fft, n_mels)
    # Average the magnitude spectra of all frames into a single spectrum
    # (cheaper than keeping the full spectrogram; num_frames is clamped to
    # 1 so an empty/short tail still yields one zero-padded frame).
    num_frames = max(1, (len(tail) - n_fft) // hop + 1)
    spec_sum = np.zeros(n_fft // 2 + 1, dtype=np.float64)
    for i in range(num_frames):
        frame = tail[i * hop: i * hop + n_fft]
        if len(frame) < n_fft:
            frame = np.pad(frame, (0, n_fft - len(frame)))  # zero-pad last frame
        spec_sum += np.abs(np.fft.rfft(frame))
    spec_mean = (spec_sum / num_frames).astype(np.float32)
    mel_mean = (fb @ spec_mean).tolist()

    # ---- Spectral centroid ----
    freqs = np.fft.rfftfreq(n_fft, 1.0 / sample_rate)
    total_mag = np.sum(spec_mean) + 1e-8  # epsilon guards silent input
    centroid = float(np.sum(freqs * spec_mean) / total_mag)

    # ---- Tempo envelope (RMS slope over 50 ms windows) ----
    slope = _rms_slope(tail, sample_rate)

    elapsed_ms = (time.monotonic() - _start) * 1000.0
    return {
        "mel_spectrogram_mean": mel_mean,
        "spectral_centroid": centroid,
        "tempo_envelope": slope,
        "extraction_time_ms": round(elapsed_ms, 2),
    }


def _rms_slope(tail: np.ndarray, sample_rate: int) -> float:
    """Linear-regression slope of per-50 ms-window RMS energy.

    A crude loudness-trend proxy: positive means the tail is getting
    louder, negative means it is decaying. Returns 0.0 when there is not
    enough material for at least two full windows.
    """
    window_size = int(0.05 * sample_rate)  # 50 ms windows
    if window_size <= 0 or len(tail) <= window_size:
        return 0.0
    num_windows = len(tail) // window_size
    if num_windows < 2:
        return 0.0
    # Vectorized RMS: one reshape + reduction instead of a Python loop.
    windows = tail[: num_windows * window_size].reshape(num_windows, window_size)
    rms_values = np.sqrt(np.mean(windows ** 2, axis=1))
    x = np.arange(num_windows, dtype=np.float32)
    return float(np.polyfit(x, rms_values, 1)[0])
class CrossfadeEngine:
    """Mutable playback state for the crossfade runtime.

    NOTE(review): only the active-clip index is visible here — the rest of
    the engine's behaviour presumably lives elsewhere in the project;
    confirm before relying on this docstring.
    """

    def __init__(self):
        # Index of the currently-active clip/deck; starts at the first slot.
        self.active_index = 0
class CrossfadeState:
    """String constants naming the engine's fade phases.

    NOTE(review): plain class attributes rather than an enum.Enum —
    presumably compared by string value elsewhere; confirm before
    converting to an Enum.
    """

    IDLE = "idle"      # no transition in progress
    FADING = "fading"  # a crossfade is currently being rendered