#!/usr/bin/env python3 """ Audio Processing Module Handles audio extraction, processing, and integration with FFmpeg operations. Upgrades: - Prefer lossless audio stream-copy for muxing (no generational loss). - Safe fallback to AAC re-encode when needed. - Optional EBU R128 loudness normalization (two-pass loudnorm). - Optional audio/video offset with sample-accurate filters. - Robust ffprobe-based audio detection and metadata. - MoviePy fallback when ffmpeg is unavailable. """ from __future__ import annotations import os import re import json import time import math import shutil import logging import tempfile import subprocess from pathlib import Path from typing import Optional, Dict, Any, List from core.exceptions import AudioProcessingError logger = logging.getLogger(__name__) class AudioProcessor: """ Comprehensive audio processing for video background replacement. """ def __init__(self, temp_dir: Optional[str] = None): self.temp_dir = temp_dir or tempfile.gettempdir() self.ffmpeg_path = shutil.which("ffmpeg") self.ffprobe_path = shutil.which("ffprobe") self.ffmpeg_available = self.ffmpeg_path is not None self.ffprobe_available = self.ffprobe_path is not None self.stats = { "audio_extractions": 0, "audio_merges": 0, "total_processing_time": 0.0, "failed_operations": 0, } if not self.ffmpeg_available: logger.warning("FFmpeg not available - audio processing will be limited") logger.info( "AudioProcessor initialized (FFmpeg: %s, FFprobe: %s)", self.ffmpeg_available, self.ffprobe_available, ) # ------------------------------- # Utilities # ------------------------------- def _run(self, cmd: List[str], tag: str = "") -> subprocess.CompletedProcess: logger.info("ffmpeg%s: %s", f"[{tag}]" if tag else "", " ".join(cmd)) return subprocess.run(cmd, text=True, capture_output=True) def _has_audio(self, path: str) -> bool: if not os.path.isfile(path): return False if self.ffprobe_available: try: proc = subprocess.run( [ self.ffprobe_path, "-v", "error", "-select_streams", "a:0", "-show_entries", "stream=index", "-of", "csv=p=0", path, ], text=True, capture_output=True, check=False, ) return bool(proc.stdout.strip()) except Exception: pass # fallback heuristic via ffmpeg demuxer messages if self.ffmpeg_available: try: proc = subprocess.run( [self.ffmpeg_path, "-hide_banner", "-loglevel", "error", "-i", path, "-f", "null", "-"], text=True, capture_output=True, ) return "Audio:" in (proc.stderr or "") except Exception: return False return False # ------------------------------- # Metadata # ------------------------------- def get_audio_info(self, video_path: str) -> Dict[str, Any]: """ Get comprehensive audio information from a media file. """ if not self.ffprobe_available: return {"has_audio": False, "error": "FFprobe not available"} try: proc = subprocess.run( [ self.ffprobe_path, "-v", "error", "-select_streams", "a:0", "-show_entries", "stream=codec_name,sample_rate,channels,bit_rate,duration", "-of", "json", video_path, ], text=True, capture_output=True, check=False, ) if proc.returncode != 0: return {"has_audio": False, "error": proc.stderr.strip()} data = json.loads(proc.stdout or "{}") streams = data.get("streams", []) if not streams: return {"has_audio": False, "error": "No audio stream found"} s = streams[0] info = { "has_audio": True, "codec": s.get("codec_name", "unknown"), "sample_rate": int(s["sample_rate"]) if s.get("sample_rate") else "unknown", "channels": int(s["channels"]) if s.get("channels") else "unknown", "duration": float(s["duration"]) if s.get("duration") else "unknown", "bit_rate": int(s["bit_rate"]) if s.get("bit_rate") else "unknown", } return info except Exception as e: logger.error("Error getting audio info: %s", e) return {"has_audio": False, "error": str(e)} # ------------------------------- # Extraction # ------------------------------- def extract_audio( self, video_path: str, output_path: Optional[str] = None, audio_format: str = "aac", quality: str = "high", ) -> Optional[str]: """ Extract audio from a media file to a separate file. """ if not self.ffmpeg_available: raise AudioProcessingError("extract", "FFmpeg not available", video_path) start = time.time() info = self.get_audio_info(video_path) if not info.get("has_audio", False): logger.info("No audio found in %s", video_path) return None if output_path is None: output_path = os.path.join(self.temp_dir, f"extracted_audio_{int(time.time())}.{audio_format}") quality_map = { "low": {"aac": ["-b:a", "96k"], "mp3": ["-b:a", "128k"], "wav": []}, "medium": {"aac": ["-b:a", "192k"], "mp3": ["-b:a", "192k"], "wav": []}, "high": {"aac": ["-b:a", "320k"], "mp3": ["-b:a", "320k"], "wav": []}, } codec_map = {"aac": ["-c:a", "aac"], "mp3": ["-c:a", "libmp3lame"], "wav": ["-c:a", "pcm_s16le"]} cmd = [self.ffmpeg_path, "-y", "-i", video_path] cmd += codec_map.get(audio_format, ["-c:a", "aac"]) cmd += quality_map.get(quality, {}).get(audio_format, []) cmd += ["-vn", output_path] proc = self._run(cmd, "extract") if proc.returncode != 0: self.stats["failed_operations"] += 1 raise AudioProcessingError("extract", f"FFmpeg failed: {proc.stderr}", video_path, output_path) if not os.path.exists(output_path): self.stats["failed_operations"] += 1 raise AudioProcessingError("extract", "Output audio file was not created", video_path, output_path) self.stats["audio_extractions"] += 1 self.stats["total_processing_time"] += (time.time() - start) logger.info("Audio extracted: %s", output_path) return output_path # ------------------------------- # Loudness normalization (EBU R128, two-pass) # ------------------------------- def _measure_loudness(self, src_with_audio: str, stream_selector: str = "1:a:0") -> Optional[Dict[str, float]]: """ First pass loudnorm to measure levels. Returns dict with input_i, input_tp, input_lra, input_thresh, target_offset. We run ffmpeg with -filter_complex on the selected audio input and parse the printed JSON (stderr). """ # Build a dummy graph that takes the audio stream and measures it # We’ll map it but discard the output (null muxer) cmd = [ self.ffmpeg_path, "-hide_banner", "-nostats", "-loglevel", "warning", "-i", src_with_audio, "-vn", "-af", "loudnorm=I=-16:TP=-1.5:LRA=11:print_format=json", "-f", "null", "-" ] proc = self._run(cmd, "loudnorm-pass1") txt = (proc.stderr or "") + (proc.stdout or "") # Extract JSON block m = re.search(r"\{\s*\"input_i\"[^\}]+\}", txt, re.MULTILINE | re.DOTALL) if not m: logger.warning("Could not parse loudnorm analysis output.") return None try: data = json.loads(m.group(0)) # Legacy ffmpeg uses keys like "input_i", "input_tp", "input_lra", "input_thresh", "target_offset" return { "input_i": float(data.get("input_i")), "input_tp": float(data.get("input_tp")), "input_lra": float(data.get("input_lra")), "input_thresh": float(data.get("input_thresh")), "target_offset": float(data.get("target_offset")), } except Exception as e: logger.warning("Loudnorm analysis JSON parse error: %s", e) return None def _build_loudnorm_filter(self, measured: Dict[str, float], target_I=-16.0, target_TP=-1.5, target_LRA=11.0) -> str: """ Build the second-pass loudnorm filter string using measured values. """ # Some ffmpeg builds call these "measured_*" or "input_*"; we used "input_*" names above. return ( "loudnorm=" f"I={target_I}:TP={target_TP}:LRA={target_LRA}:" f"measured_I={measured['input_i']}:" f"measured_TP={measured['input_tp']}:" f"measured_LRA={measured['input_lra']}:" f"measured_thresh={measured['input_thresh']}:" f"offset={measured['target_offset']}:" "linear=true:print_format=summary" ) # ------------------------------- # Muxing (video + audio) # ------------------------------- def add_audio_to_video( self, original_video: str, processed_video: str, output_path: Optional[str] = None, audio_quality: str = "high", normalize: bool = False, normalize_I: float = -16.0, normalize_TP: float = -1.5, normalize_LRA: float = 11.0, offset_ms: float = 0.0, ) -> str: """ Add/mux the audio from original_video into processed_video. Strategy: 1) If no audio in original → return processed (or copy to desired name). 2) If ffmpeg present: a) If normalize/offset requested → re-encode AAC with filters (two-pass loudnorm). b) Else try stream-copy (lossless): -c:a copy. If that fails, AAC re-encode. 3) If ffmpeg missing → fallback to MoviePy (re-encode). Returns path to the muxed video (MP4). """ if not os.path.isfile(processed_video): raise FileNotFoundError(f"Processed video not found: {processed_video}") if output_path is None: base = os.path.splitext(os.path.basename(processed_video))[0] output_path = os.path.join(os.path.dirname(processed_video), f"{base}_with_audio.mp4") # If no audio available, just return the processed video (copied to expected name) if not self._has_audio(original_video): logger.info("Original has no audio; returning processed video unchanged.") if processed_video != output_path: shutil.copy2(processed_video, output_path) return output_path if not self.ffmpeg_available: logger.warning("FFmpeg not available – using MoviePy fallback.") return self._moviepy_mux(original_video, processed_video, output_path) start = time.time() # If normalization or offset requested → we must re-encode audio with filters. if normalize or abs(offset_ms) > 1e-3: # Two-pass loudnorm if normalize=True filter_chain = [] if abs(offset_ms) > 1e-3: if offset_ms > 0: # Positive delay: adelay per channel. Use stereo-safe form. ms = int(round(offset_ms)) filter_chain.append(f"adelay={ms}|{ms}") else: # Negative offset: trim audio start and reset PTS secs = abs(offset_ms) / 1000.0 filter_chain.append(f"atrim=start={secs},asetpts=PTS-STARTPTS") if normalize: measured = self._measure_loudness(original_video) if measured: filter_chain.append(self._build_loudnorm_filter(measured, normalize_I, normalize_TP, normalize_LRA)) else: # Fallback to single-pass loudnorm filter_chain.append(f"loudnorm=I={normalize_I}:TP={normalize_TP}:LRA={normalize_LRA}") afilter = ",".join(filter_chain) if filter_chain else None # Build re-encode command: copy video, re-encode audio AAC (web-safe), filters applied cmd = [ self.ffmpeg_path, "-hide_banner", "-loglevel", "error", "-i", processed_video, # 0 = video "-i", original_video, # 1 = audio "-map", "0:v:0", "-map", "1:a:0", "-c:v", "copy", "-c:a", "aac", "-b:a", "192k", "-ac", "2", "-ar", "48000", "-shortest", "-movflags", "+faststart", "-y", output_path, ] if afilter: # Apply audio filter chain to the mapped audio input cmd = [ self.ffmpeg_path, "-hide_banner", "-loglevel", "error", "-i", processed_video, "-i", original_video, "-map", "0:v:0", "-filter_complex", f"[1:a]{afilter}[aout]", "-map", "[aout]", "-c:v", "copy", "-c:a", "aac", "-b:a", "192k", "-ac", "2", "-ar", "48000", "-shortest", "-movflags", "+faststart", "-y", output_path, ] proc = self._run(cmd, "mux-reencode-filters") if proc.returncode == 0 and os.path.exists(output_path) and os.path.getsize(output_path) > 0: self.stats["audio_merges"] += 1 self.stats["total_processing_time"] += (time.time() - start) logger.info("Audio merged with filters (normalize=%s, offset_ms=%.2f): %s", normalize, offset_ms, output_path) return output_path logger.warning("Filtered mux failed; stderr: %s", proc.stderr) # Else: try pure stream-copy (lossless) cmd_copy = [ self.ffmpeg_path, "-hide_banner", "-loglevel", "error", "-i", processed_video, # 0 = video "-i", original_video, # 1 = audio "-map", "0:v:0", "-map", "1:a:0", "-c:v", "copy", "-c:a", "copy", "-shortest", "-movflags", "+faststart", "-y", output_path, ] proc = self._run(cmd_copy, "mux-copy") if proc.returncode == 0 and os.path.exists(output_path) and os.path.getsize(output_path) > 0: self.stats["audio_merges"] += 1 self.stats["total_processing_time"] += (time.time() - start) logger.info("Audio merged (stream-copy): %s", output_path) return output_path # Last resort: AAC re-encode without filters quality_map = {"low": ["-b:a", "96k"], "medium": ["-b:a", "192k"], "high": ["-b:a", "320k"]} cmd_aac = [ self.ffmpeg_path, "-hide_banner", "-loglevel", "error", "-i", processed_video, "-i", original_video, "-map", "0:v:0", "-map", "1:a:0", "-c:v", "copy", "-c:a", "aac", *quality_map.get(audio_quality, quality_map["high"]), "-ac", "2", "-ar", "48000", "-shortest", "-movflags", "+faststart", "-y", output_path, ] proc = self._run(cmd_aac, "mux-aac") if proc.returncode == 0 and os.path.exists(output_path) and os.path.getsize(output_path) > 0: self.stats["audio_merges"] += 1 self.stats["total_processing_time"] += (time.time() - start) logger.info("Audio merged (AAC re-encode): %s", output_path) return output_path # Fallback: MoviePy (re-encodes) logger.warning("FFmpeg mux failed; using MoviePy fallback.") return self._moviepy_mux(original_video, processed_video, output_path) # ------------------------------- # Fallback: MoviePy # ------------------------------- def _moviepy_mux(self, original_video: str, processed_video: str, output_path: str) -> str: try: from moviepy.editor import VideoFileClip, AudioFileClip except Exception as e: self.stats["failed_operations"] += 1 raise AudioProcessingError("mux", f"MoviePy unavailable and ffmpeg failed: {e}", processed_video) with VideoFileClip(processed_video) as v_clip: try: a_clip = AudioFileClip(original_video) except Exception as e: logger.warning("MoviePy could not load audio from %s (%s). Returning processed video.", original_video, e) if processed_video != output_path: shutil.copy2(processed_video, output_path) return output_path v_clip = v_clip.set_audio(a_clip) v_clip.write_videofile( output_path, codec="libx264", audio_codec="aac", audio_bitrate="192k", temp_audiofile=os.path.join(self.temp_dir, "temp-audio.m4a"), remove_temp=True, threads=2, preset="medium", ) return output_path # ------------------------------- # Sync helper (explicit) # ------------------------------- def sync_audio_video( self, video_path: str, audio_path: str, output_path: str, offset_ms: float = 0.0, normalize: bool = False, normalize_I: float = -16.0, normalize_TP: float = -1.5, normalize_LRA: float = 11.0, ) -> bool: """ Synchronize a separate audio file with a video (copy video, re-encode audio AAC). Positive offset_ms delays audio; negative trims audio start. """ if not self.ffmpeg_available: raise AudioProcessingError("sync", "FFmpeg not available") filter_chain = [] if abs(offset_ms) > 1e-3: if offset_ms > 0: ms = int(round(offset_ms)) filter_chain.append(f"adelay={ms}|{ms}") else: secs = abs(offset_ms) / 1000.0 filter_chain.append(f"atrim=start={secs},asetpts=PTS-STARTPTS") if normalize: measured = self._measure_loudness(audio_path) if measured: filter_chain.append(self._build_loudnorm_filter(measured, normalize_I, normalize_TP, normalize_LRA)) else: filter_chain.append(f"loudnorm=I={normalize_I}:TP={normalize_TP}:LRA={normalize_LRA}") afilter = ",".join(filter_chain) if filter_chain else None if afilter: cmd = [ self.ffmpeg_path, "-hide_banner", "-loglevel", "error", "-i", video_path, "-i", audio_path, "-map", "0:v:0", "-filter_complex", f"[1:a]{afilter}[aout]", "-map", "[aout]", "-c:v", "copy", "-c:a", "aac", "-b:a", "192k", "-ac", "2", "-ar", "48000", "-shortest", "-movflags", "+faststart", "-y", output_path, ] else: cmd = [ self.ffmpeg_path, "-hide_banner", "-loglevel", "error", "-i", video_path, "-i", audio_path, "-map", "0:v:0", "-map", "1:a:0", "-c:v", "copy", "-c:a", "aac", "-b:a", "192k", "-ac", "2", "-ar", "48000", "-shortest", "-movflags", "+faststart", "-y", output_path, ] proc = self._run(cmd, "sync") return proc.returncode == 0 and os.path.exists(output_path) and os.path.getsize(output_path) > 0 # ------------------------------- # Levels (simple convenience) # ------------------------------- def adjust_audio_levels( self, input_path: str, output_path: str, volume_factor: float = 1.0, normalize: bool = False, normalize_I: float = -16.0, normalize_TP: float = -1.5, normalize_LRA: float = 11.0, ) -> bool: """ Adjust levels on a single-file video (copy video, re-encode audio AAC). """ if not self.ffmpeg_available: raise AudioProcessingError("adjust_levels", "FFmpeg not available") filters = [] if volume_factor != 1.0: filters.append(f"volume={volume_factor}") if normalize: measured = self._measure_loudness(input_path) if measured: filters.append(self._build_loudnorm_filter(measured, normalize_I, normalize_TP, normalize_LRA)) else: filters.append(f"loudnorm=I={normalize_I}:TP={normalize_TP}:LRA={normalize_LRA}") if filters: cmd = [ self.ffmpeg_path, "-hide_banner", "-loglevel", "error", "-i", input_path, "-c:v", "copy", "-af", ",".join(filters), "-c:a", "aac", "-b:a", "192k", "-ac", "2", "-ar", "48000", "-movflags", "+faststart", "-y", output_path, ] else: # nothing to do; copy shutil.copy2(input_path, output_path) return True proc = self._run(cmd, "adjust-levels") if proc.returncode != 0: raise AudioProcessingError("adjust_levels", proc.stderr, input_path) return os.path.exists(output_path) and os.path.getsize(output_path) > 0 # ------------------------------- # Housekeeping / stats # ------------------------------- def get_stats(self) -> Dict[str, Any]: tot_ops = self.stats["audio_extractions"] + self.stats["audio_merges"] + self.stats["failed_operations"] successes = self.stats["audio_extractions"] + self.stats["audio_merges"] success_rate = (successes / max(1, tot_ops)) * 100.0 return { "ffmpeg_available": self.ffmpeg_available, "ffprobe_available": self.ffprobe_available, "audio_extractions": self.stats["audio_extractions"], "audio_merges": self.stats["audio_merges"], "total_processing_time": self.stats["total_processing_time"], "failed_operations": self.stats["failed_operations"], "success_rate": success_rate, } def cleanup_temp_files(self, max_age_hours: int = 24): """ Clean up temporary audio/video files older than specified age in temp_dir. """ try: temp_path = Path(self.temp_dir) cutoff = time.time() - (max_age_hours * 3600) cleaned = 0 # Pathlib doesn't support brace expansion; iterate explicitly for ext in (".aac", ".mp3", ".wav", ".mp4", ".m4a"): for p in temp_path.glob(f"*audio*{ext}"): try: if p.stat().st_mtime < cutoff: p.unlink() cleaned += 1 except Exception as e: logger.warning("Could not delete temp file %s: %s", p, e) if cleaned: logger.info("Cleaned up %d temporary audio files", cleaned) except Exception as e: logger.warning("Temp file cleanup error: %s", e)