| |
| """ |
| VYNL AI Mastering Module |
| Reference matching + genre presets + loudness normalization |
| """ |
|
|
| import numpy as np |
| from pathlib import Path |
| import tempfile |
|
|
| try: |
| import librosa |
| import soundfile as sf |
| from scipy.ndimage import uniform_filter1d |
| HAS_LIBROSA = True |
| except ImportError: |
| HAS_LIBROSA = False |
| uniform_filter1d = None |
|
|
| try: |
| import pyloudnorm as pyln |
| HAS_PYLOUDNORM = True |
| except ImportError: |
| HAS_PYLOUDNORM = False |
|
|
| |
| |
| |
|
|
# Built-in mastering presets, keyed by display name.
#   eq_low / eq_mid / eq_high : band gains in dB (fed to apply_eq)
#   compression_ratio         : compressor ratio (N:1)
#   compression_threshold     : compressor threshold in dB
#   target_lufs               : loudness target used by normalize_loudness
PRESETS = {
    # Neutral EQ with moderate compression.
    'Balanced': dict(
        eq_low=0,
        eq_mid=0,
        eq_high=0,
        compression_ratio=3,
        compression_threshold=-18,
        target_lufs=-14,
    ),
    # Lifted lows, slightly reduced mids and highs.
    'Warm': dict(
        eq_low=2,
        eq_mid=-1,
        eq_high=-2,
        compression_ratio=2.5,
        compression_threshold=-16,
        target_lufs=-14,
    ),
    # Lifted highs, slightly louder target.
    'Bright': dict(
        eq_low=-1,
        eq_mid=1,
        eq_high=3,
        compression_ratio=3,
        compression_threshold=-18,
        target_lufs=-13,
    ),
    # Strongest compression and loudest target of the set.
    'Punchy': dict(
        eq_low=3,
        eq_mid=0,
        eq_high=1,
        compression_ratio=4,
        compression_threshold=-20,
        target_lufs=-12,
    ),
    # Same values as 'Balanced'; used by master_audio when the
    # 'Reference Match' preset is requested without a reference track.
    'Reference Match': dict(
        eq_low=0,
        eq_mid=0,
        eq_high=0,
        compression_ratio=3,
        compression_threshold=-18,
        target_lufs=-14,
    ),
}
|
|
| |
| |
| |
|
|
| def analyze_audio(audio_path): |
| """Analyze audio file for mastering metrics""" |
| if not HAS_LIBROSA: |
| return None |
|
|
| try: |
| y, sr = librosa.load(audio_path, sr=44100, mono=False) |
|
|
| |
| if y.ndim == 1: |
| y_mono = y |
| else: |
| y_mono = librosa.to_mono(y) |
|
|
| |
| peak_db = 20 * np.log10(np.max(np.abs(y_mono)) + 1e-10) |
|
|
| |
| rms = np.sqrt(np.mean(y_mono**2)) |
| rms_db = 20 * np.log10(rms + 1e-10) |
|
|
| |
| frame_length = int(sr * 0.1) |
| hop_length = frame_length // 2 |
|
|
| frames_rms = [] |
| for i in range(0, len(y_mono) - frame_length, hop_length): |
| frame = y_mono[i:i+frame_length] |
| frame_rms = np.sqrt(np.mean(frame**2)) |
| if frame_rms > 0: |
| frames_rms.append(20 * np.log10(frame_rms + 1e-10)) |
|
|
| if frames_rms: |
| dynamic_range = np.percentile(frames_rms, 95) - np.percentile(frames_rms, 5) |
| else: |
| dynamic_range = 0 |
|
|
| |
| lufs = -14 |
| if HAS_PYLOUDNORM: |
| try: |
| meter = pyln.Meter(sr) |
| lufs = meter.integrated_loudness(y_mono) |
| except: |
| pass |
|
|
| |
| spectral_centroid = np.mean(librosa.feature.spectral_centroid(y=y_mono, sr=sr)) |
|
|
| return { |
| 'peak_db': float(peak_db), |
| 'rms_db': float(rms_db), |
| 'lufs': float(lufs) if not np.isinf(lufs) else -24, |
| 'dynamic_range': float(dynamic_range), |
| 'spectral_centroid': float(spectral_centroid), |
| 'duration': float(len(y_mono) / sr), |
| 'sample_rate': sr, |
| } |
|
|
| except Exception as e: |
| return {'error': str(e)} |
|
|
|
|
def analyze_reference(reference_path, target_path):
    """Derive mastering settings that move the target toward a reference.

    Both files are analyzed with ``analyze_audio``. The high-band EQ is
    nudged by +/-2 dB when the spectral centroids differ by more than
    500 Hz, and the reference's measured loudness becomes the target LUFS
    when it lies within [-20, -6]; otherwise -14 is used.

    Args:
        reference_path: Path of the reference track.
        target_path: Path of the track that will be mastered.

    Returns:
        A new settings dict the caller is free to modify. When either
        analysis is unavailable or failed, this is a copy of
        ``PRESETS['Balanced']``; otherwise it also carries
        ``reference_lufs`` and ``reference_peak``.
    """
    ref_analysis = analyze_audio(reference_path)
    target_analysis = analyze_audio(target_path)

    # Return a copy on fallback: callers write overrides into the result,
    # and handing out the shared preset dict would let them corrupt PRESETS.
    for analysis in (ref_analysis, target_analysis):
        if not analysis or 'error' in analysis:
            return dict(PRESETS['Balanced'])

    # Positive difference: the reference is brighter than the target.
    centroid_diff = (
        ref_analysis['spectral_centroid'] - target_analysis['spectral_centroid']
    )
    if centroid_diff > 500:
        eq_high = 2
    elif centroid_diff < -500:
        eq_high = -2
    else:
        eq_high = 0

    # Only trust the reference loudness inside a plausible window. Written
    # as a negated range test so NaN also falls back to -14.
    target_lufs = ref_analysis['lufs']
    if not -20 <= target_lufs <= -6:
        target_lufs = -14

    return {
        'eq_low': 0,
        'eq_mid': 0,
        'eq_high': eq_high,
        'compression_ratio': 3,
        'compression_threshold': -18,
        'target_lufs': target_lufs,
        'reference_lufs': ref_analysis['lufs'],
        'reference_peak': ref_analysis['peak_db'],
    }
|
|
|
|
| |
| |
| |
|
|
def apply_eq(y, sr, low_db=0, mid_db=0, high_db=0):
    """Apply a three-band gain EQ in the STFT domain.

    Bins below 200 Hz receive ``low_db``, bins from 200 Hz up to (but not
    including) 4000 Hz receive ``mid_db``, and bins at or above 4000 Hz
    receive ``high_db``.

    Args:
        y: Audio samples passed to ``librosa.stft``.
        sr: Sample rate in Hz, used to map STFT bins to frequencies.
        low_db: Gain in dB for the low band.
        mid_db: Gain in dB for the mid band.
        high_db: Gain in dB for the high band.

    Returns:
        The equalized signal with the same length as ``y``, or ``y`` itself
        when librosa is not available.
    """
    if not HAS_LIBROSA:
        return y

    low_cutoff_hz, high_cutoff_hz = 200, 4000

    spectrum = librosa.stft(y)
    bin_freqs = librosa.fft_frequencies(sr=sr)

    # One linear gain per frequency bin, chosen by the band the bin is in.
    low_gain = 10 ** (low_db / 20)
    mid_gain = 10 ** (mid_db / 20)
    high_gain = 10 ** (high_db / 20)
    band_gains = np.where(
        bin_freqs < low_cutoff_hz,
        low_gain,
        np.where(bin_freqs < high_cutoff_hz, mid_gain, high_gain),
    )

    # Broadcast the per-bin gains across all time frames, then resynthesize.
    shaped = spectrum * band_gains[:, np.newaxis]
    return librosa.istft(shaped, length=len(y))
|
|
|
|
def apply_compression(y, sr, ratio=3, threshold_db=-18, attack_ms=10, release_ms=100):
    """Apply downward dynamic range compression with automatic makeup gain.

    The level envelope is the rectified signal smoothed by a moving average
    whose width is the attack time. Wherever the envelope exceeds the
    threshold, the gain is reduced according to ``ratio``. The result is
    then scaled by a makeup gain of at most 2x.

    Args:
        y: Audio samples (numpy array).
        sr: Sample rate in Hz.
        ratio: Compression ratio; values <= 1 return ``y`` unchanged.
        threshold_db: Threshold in dB relative to full scale.
        attack_ms: Width of the envelope smoothing window in milliseconds.
        release_ms: Accepted for interface compatibility; the current
            envelope follower does not use it.

    Returns:
        The compressed signal, or ``y`` itself when no compression applies.
    """
    if ratio <= 1 or np.size(y) == 0:
        return y

    # Imported here so the compressor only depends on scipy and keeps
    # working when the module-level optional-import guard did not bind it.
    from scipy.ndimage import uniform_filter1d

    threshold = 10 ** (threshold_db / 20)

    # A window of zero samples is rejected by uniform_filter1d, which
    # happens for very short attack times or low sample rates.
    attack_samples = max(1, int(sr * attack_ms / 1000))

    envelope = uniform_filter1d(np.abs(y), size=attack_samples)

    # Unity gain below the threshold, reduced gain above it.
    gain = np.ones_like(envelope)
    above_thresh = envelope > threshold
    gain[above_thresh] = (threshold / envelope[above_thresh]) ** (1 - 1 / ratio)

    y_compressed = y * gain

    # Makeup gain: inverse of the average reduction, capped at 2x.
    reduced = gain[gain < 1]
    makeup = 1 / np.mean(reduced) if reduced.size else 1
    y_compressed *= min(makeup, 2)

    return y_compressed
|
|
|
|
def apply_limiter(y, ceiling_db=-0.3):
    """Limit the signal with tanh saturation scaled to the ceiling.

    Args:
        y: Audio samples.
        ceiling_db: Output ceiling in dB relative to full scale.

    Returns:
        ``tanh(y / ceiling) * ceiling``, whose magnitude is bounded by the
        linear ceiling.
    """
    linear_ceiling = 10 ** (ceiling_db / 20)

    # Normalize to the ceiling, saturate, then scale back.
    saturated = np.tanh(y / linear_ceiling)
    return saturated * linear_ceiling
|
|
|
|
def normalize_loudness(y, sr, target_lufs=-14):
    """Scale the signal toward a target integrated loudness.

    With pyloudnorm available, the integrated loudness is measured, the
    gain needed to reach ``target_lufs`` is applied, and the result is
    passed through ``apply_limiter``. Without pyloudnorm the signal is
    peak-normalized to -1 dB instead.

    Args:
        y: Audio samples (numpy array).
        sr: Sample rate in Hz.
        target_lufs: Desired integrated loudness in LUFS.

    Returns:
        The normalized signal. ``y`` is returned unchanged when it is empty,
        when its loudness is not finite, or when measuring fails.
    """
    # Nothing to measure; the peak fallback below would raise on an empty
    # array whereas the metering path already returns the input unchanged.
    if np.size(y) == 0:
        return y

    if not HAS_PYLOUDNORM:
        # Fallback: peak normalization to -1 dB.
        peak = np.max(np.abs(y))
        if peak > 0:
            target_peak = 10 ** (-1 / 20)
            y = y * (target_peak / peak)
        return y

    try:
        current_lufs = pyln.Meter(sr).integrated_loudness(y)

        # No usable reading (inf or NaN), so there is no gain to derive.
        if not np.isfinite(current_lufs):
            return y

        gain_db = target_lufs - current_lufs
        gain = 10 ** (gain_db / 20)

        # Limit after the gain change to tame peaks pushed past the ceiling.
        return apply_limiter(y * gain)

    except Exception:
        # Best effort: leave the audio untouched when metering fails, but
        # let KeyboardInterrupt and SystemExit propagate.
        return y
|
|
|
|
| |
| |
| |
|
|
def master_audio(input_path, output_path=None, preset='Balanced',
                 reference_path=None, target_lufs=None,
                 eq_low=None, eq_mid=None, eq_high=None):
    """
    Master an audio file: EQ, compression, loudness normalization, limiter.

    Args:
        input_path: Path to input audio
        output_path: Path for output (optional, creates temp file if None)
        preset: Preset name or 'Reference Match'; unknown names fall back
            to 'Balanced'
        reference_path: Path to reference track (for Reference Match)
        target_lufs: Override target LUFS
        eq_low/mid/high: Override EQ settings (dB)

    Returns:
        (output_path, analysis_dict). On success analysis_dict has the keys
        'input', 'output', 'settings' and 'preset'. On failure the result is
        (None, {'error': message}).
    """
    if not HAS_LIBROSA:
        return None, {'error': 'librosa not installed'}

    try:
        # Processing runs on a 44.1 kHz mono signal.
        y, sr = librosa.load(input_path, sr=44100, mono=True)

        if preset == 'Reference Match' and reference_path:
            base_settings = analyze_reference(reference_path, input_path)
        else:
            base_settings = PRESETS.get(preset, PRESETS['Balanced'])

        # Always work on a private copy: the overrides below must never be
        # written into the shared PRESETS entries.
        settings = dict(base_settings)

        # Explicit arguments win over the preset / reference values.
        if eq_low is not None:
            settings['eq_low'] = eq_low
        if eq_mid is not None:
            settings['eq_mid'] = eq_mid
        if eq_high is not None:
            settings['eq_high'] = eq_high
        if target_lufs is not None:
            settings['target_lufs'] = target_lufs

        # Metrics of the untouched input, for the before/after report.
        input_analysis = analyze_audio(input_path)

        y_processed = y.copy()

        # 1. Tonal balance.
        y_processed = apply_eq(
            y_processed, sr,
            low_db=settings['eq_low'],
            mid_db=settings['eq_mid'],
            high_db=settings['eq_high']
        )

        # 2. Dynamics.
        y_processed = apply_compression(
            y_processed, sr,
            ratio=settings['compression_ratio'],
            threshold_db=settings['compression_threshold']
        )

        # 3. Loudness.
        y_processed = normalize_loudness(
            y_processed, sr,
            target_lufs=settings['target_lufs']
        )

        # 4. Final limiter at -0.3 dB.
        y_processed = apply_limiter(y_processed, ceiling_db=-0.3)

        # Without an explicit destination, write into a fresh temporary
        # directory. It is not removed here because the path is returned.
        if output_path is None:
            temp_dir = tempfile.mkdtemp()
            output_path = Path(temp_dir) / f"{Path(input_path).stem}_mastered.wav"

        sf.write(str(output_path), y_processed, sr)

        # Re-analyze the written file so the report reflects what is on disk.
        output_analysis = analyze_audio(str(output_path))

        result = {
            'input': input_analysis,
            'output': output_analysis,
            'settings': settings,
            'preset': preset,
        }

        return str(output_path), result

    except Exception as e:
        return None, {'error': str(e)}
|
|
|
|
def _format_metric(metrics, key):
    """Return ``metrics[key]`` formatted to one decimal, or 'N/A'.

    'N/A' is used when ``metrics`` is not a dict, when the key is missing,
    or when the stored value cannot be formatted as a number.
    """
    value = metrics.get(key) if isinstance(metrics, dict) else None
    try:
        return f"{value:.1f}"
    except (TypeError, ValueError):
        return 'N/A'


def _format_metrics_section(title, metrics):
    """Return the display lines for one analysis section (input or output)."""
    return [
        title,
        f" LUFS: {_format_metric(metrics, 'lufs')}",
        f" Peak: {_format_metric(metrics, 'peak_db')} dB",
        f" Dynamic Range: {_format_metric(metrics, 'dynamic_range')} dB",
    ]


def format_analysis(analysis):
    """Format a mastering result dict as human-readable text.

    Args:
        analysis: The dict returned as second element by ``master_audio``.
            It may contain the sections 'input', 'output' and 'settings',
            or a top-level 'error'.

    Returns:
        "Analysis unavailable" for an empty/None argument, "Error: ..." when
        a top-level error is present, otherwise one text block per section.
        Metrics that are missing (for example because a section is ``None``
        or an error dict) are shown as 'N/A' instead of raising.
    """
    if not analysis:
        return "Analysis unavailable"

    if 'error' in analysis:
        return f"Error: {analysis['error']}"

    lines = []

    if 'input' in analysis:
        lines.extend(_format_metrics_section("INPUT:", analysis['input']))

    if 'output' in analysis:
        lines.extend(_format_metrics_section("\nOUTPUT:", analysis['output']))

    if 'settings' in analysis:
        settings = analysis['settings']
        lines.append("\nSETTINGS:")
        lines.append(f" Target LUFS: {settings.get('target_lufs', -14)}")
        lines.append(f" EQ: Low {settings.get('eq_low', 0):+.0f} / Mid {settings.get('eq_mid', 0):+.0f} / High {settings.get('eq_high', 0):+.0f}")
        lines.append(f" Compression: {settings.get('compression_ratio', 3)}:1 @ {settings.get('compression_threshold', -18)} dB")

    return "\n".join(lines)
|
|
|
|
| |
| |
| |
|
|
# Command-line entry point:
#   python mastering.py <input.wav> [output.wav] [preset] [reference.wav]
if __name__ == "__main__":
    import sys

    if len(sys.argv) < 2:
        print("Usage: python mastering.py <input.wav> [output.wav] [preset] [reference.wav]")
        print("Presets: Balanced, Warm, Bright, Punchy, Reference Match")
        sys.exit(1)

    input_path = sys.argv[1]
    output_path = sys.argv[2] if len(sys.argv) > 2 else None
    preset = sys.argv[3] if len(sys.argv) > 3 else 'Balanced'
    # Optional reference track so the 'Reference Match' preset is usable
    # from the command line.
    reference_path = sys.argv[4] if len(sys.argv) > 4 else None

    print(f"Mastering: {input_path}")
    print(f"Preset: {preset}")

    out_path, analysis = master_audio(
        input_path, output_path, preset, reference_path=reference_path
    )

    if not out_path:
        print(f"Error: {analysis.get('error', 'Unknown error')}")
        # Signal the failure to the calling shell or script.
        sys.exit(1)

    print(f"\nOutput: {out_path}")
    print(format_analysis(analysis))
|
|