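# Hugging Face file-viewer residue (not part of the module), kept as comments:
#   uploader: MogensR
#   commit 8468955 - "Rename audio_processor.py to processing/audio/audio_processor.py"
#   viewer links: raw / history / blame
#   file size: 23.3 kB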
"""
Audio Processing Module
Handles audio extraction, processing, and integration with FFmpeg operations
"""
import os
import subprocess
import tempfile
import logging
import time
from pathlib import Path
from typing import Optional, Dict, Any, List, Tuple
# Import from core
from core.exceptions import AudioProcessingError
logger = logging.getLogger(__name__)
class AudioProcessor:
"""
Comprehensive audio processing for video background replacement
"""
def __init__(self, temp_dir: Optional[str] = None):
self.temp_dir = temp_dir or tempfile.gettempdir()
self.ffmpeg_available = self._check_ffmpeg_availability()
self.ffprobe_available = self._check_ffprobe_availability()
# Audio processing statistics
self.stats = {
'audio_extractions': 0,
'audio_merges': 0,
'total_processing_time': 0.0,
'failed_operations': 0
}
if not self.ffmpeg_available:
logger.warning("FFmpeg not available - audio processing will be limited")
logger.info(f"AudioProcessor initialized (FFmpeg: {self.ffmpeg_available}, FFprobe: {self.ffprobe_available})")
def _check_ffmpeg_availability(self) -> bool:
"""Check if FFmpeg is available on the system"""
try:
result = subprocess.run(
['ffmpeg', '-version'],
capture_output=True,
text=True,
timeout=10
)
return result.returncode == 0
except (subprocess.TimeoutExpired, FileNotFoundError, subprocess.SubprocessError):
return False
def _check_ffprobe_availability(self) -> bool:
"""Check if FFprobe is available on the system"""
try:
result = subprocess.run(
['ffprobe', '-version'],
capture_output=True,
text=True,
timeout=10
)
return result.returncode == 0
except (subprocess.TimeoutExpired, FileNotFoundError, subprocess.SubprocessError):
return False
def get_audio_info(self, video_path: str) -> Dict[str, Any]:
"""
Get comprehensive audio information from video file
Args:
video_path: Path to the video file
Returns:
Dictionary containing audio information
"""
if not self.ffprobe_available:
return {'has_audio': False, 'error': 'FFprobe not available'}
try:
# Get audio stream information
result = subprocess.run([
'ffprobe', '-v', 'quiet', '-select_streams', 'a:0',
'-show_entries', 'stream=codec_name,sample_rate,channels,duration,bit_rate',
'-of', 'csv=p=0', video_path
], capture_output=True, text=True, timeout=30)
if result.returncode != 0:
return {
'has_audio': False,
'error': 'No audio stream found',
'ffprobe_error': result.stderr
}
# Parse audio information
audio_data = result.stdout.strip().split(',')
if len(audio_data) >= 1 and audio_data[0]:
info = {
'has_audio': True,
'codec': audio_data[0] if len(audio_data) > 0 else 'unknown',
'sample_rate': audio_data[1] if len(audio_data) > 1 else 'unknown',
'channels': audio_data[2] if len(audio_data) > 2 else 'unknown',
'duration': audio_data[3] if len(audio_data) > 3 else 'unknown',
'bit_rate': audio_data[4] if len(audio_data) > 4 else 'unknown'
}
# Convert string values to appropriate types
try:
if info['sample_rate'] != 'unknown':
info['sample_rate'] = int(info['sample_rate'])
if info['channels'] != 'unknown':
info['channels'] = int(info['channels'])
if info['duration'] != 'unknown':
info['duration'] = float(info['duration'])
if info['bit_rate'] != 'unknown':
info['bit_rate'] = int(info['bit_rate'])
except ValueError:
pass # Keep as string if conversion fails
return info
else:
return {'has_audio': False, 'error': 'Audio stream data empty'}
except subprocess.TimeoutExpired:
return {'has_audio': False, 'error': 'FFprobe timeout'}
except Exception as e:
logger.error(f"Error getting audio info: {e}")
return {'has_audio': False, 'error': str(e)}
def extract_audio(self, video_path: str, output_path: Optional[str] = None,
audio_format: str = 'aac', quality: str = 'high') -> Optional[str]:
"""
Extract audio from video file
Args:
video_path: Path to input video
output_path: Output path for audio (auto-generated if None)
audio_format: Output audio format (aac, mp3, wav)
quality: Audio quality (low, medium, high)
Returns:
Path to extracted audio file or None if failed
"""
if not self.ffmpeg_available:
raise AudioProcessingError("extract", "FFmpeg not available", video_path)
start_time = time.time()
try:
# Check if input has audio
audio_info = self.get_audio_info(video_path)
if not audio_info.get('has_audio', False):
logger.info(f"No audio found in {video_path}")
return None
# Generate output path if not provided
if output_path is None:
timestamp = int(time.time())
output_path = os.path.join(
self.temp_dir,
f"extracted_audio_{timestamp}.{audio_format}"
)
# Quality settings
quality_settings = {
'low': {'aac': ['-b:a', '96k'], 'mp3': ['-b:a', '128k'], 'wav': []},
'medium': {'aac': ['-b:a', '192k'], 'mp3': ['-b:a', '192k'], 'wav': []},
'high': {'aac': ['-b:a', '320k'], 'mp3': ['-b:a', '320k'], 'wav': []}
}
codec_settings = {
'aac': ['-c:a', 'aac'],
'mp3': ['-c:a', 'libmp3lame'],
'wav': ['-c:a', 'pcm_s16le']
}
# Build FFmpeg command
cmd = ['ffmpeg', '-y', '-i', video_path]
cmd.extend(codec_settings.get(audio_format, ['-c:a', 'aac']))
cmd.extend(quality_settings.get(quality, {}).get(audio_format, []))
cmd.extend(['-vn', output_path]) # -vn excludes video
# Execute command
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=300 # 5 minute timeout
)
if result.returncode != 0:
raise AudioProcessingError(
"extract",
f"FFmpeg failed: {result.stderr}",
video_path,
output_path
)
if not os.path.exists(output_path):
raise AudioProcessingError(
"extract",
"Output audio file was not created",
video_path,
output_path
)
# Update statistics
processing_time = time.time() - start_time
self.stats['audio_extractions'] += 1
self.stats['total_processing_time'] += processing_time
logger.info(f"Audio extracted successfully in {processing_time:.1f}s: {output_path}")
return output_path
except subprocess.TimeoutExpired:
self.stats['failed_operations'] += 1
raise AudioProcessingError("extract", "FFmpeg timeout during extraction", video_path)
except Exception as e:
self.stats['failed_operations'] += 1
if isinstance(e, AudioProcessingError):
raise
else:
raise AudioProcessingError("extract", f"Unexpected error: {str(e)}", video_path)
def add_audio_to_video(self, original_video: str, processed_video: str,
output_path: Optional[str] = None,
audio_quality: str = 'high') -> str:
"""
Add audio from original video to processed video
Args:
original_video: Path to original video with audio
processed_video: Path to processed video without audio
output_path: Output path (auto-generated if None)
audio_quality: Audio quality setting
Returns:
Path to final video with audio
"""
if not self.ffmpeg_available:
logger.warning("FFmpeg not available - returning processed video without audio")
return processed_video
start_time = time.time()
try:
# Check if original video has audio
audio_info = self.get_audio_info(original_video)
if not audio_info.get('has_audio', False):
logger.info("Original video has no audio - returning processed video")
return processed_video
# Generate output path if not provided
if output_path is None:
timestamp = int(time.time())
output_path = os.path.join(
self.temp_dir,
f"final_with_audio_{timestamp}.mp4"
)
# Quality settings for audio encoding
quality_settings = {
'low': ['-b:a', '96k'],
'medium': ['-b:a', '192k'],
'high': ['-b:a', '320k']
}
# Build FFmpeg command to combine video and audio
cmd = [
'ffmpeg', '-y',
'-i', processed_video, # Video input
'-i', original_video, # Audio source
'-c:v', 'copy', # Copy video stream as-is
'-c:a', 'aac', # Encode audio as AAC
]
# Add quality settings
cmd.extend(quality_settings.get(audio_quality, quality_settings['high']))
# Map streams and set duration
cmd.extend([
'-map', '0:v:0', # Video from first input
'-map', '1:a:0', # Audio from second input
'-shortest', # Match shortest stream duration
output_path
])
# Execute command
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=600 # 10 minute timeout
)
if result.returncode != 0:
logger.warning(f"Audio merge failed: {result.stderr}")
logger.warning("Returning processed video without audio")
return processed_video
if not os.path.exists(output_path):
logger.warning("Output video with audio was not created")
return processed_video
# Verify the output file
if os.path.getsize(output_path) == 0:
logger.warning("Output video file is empty")
try:
os.remove(output_path)
except:
pass
return processed_video
# Clean up original processed video if successful
try:
if output_path != processed_video:
os.remove(processed_video)
logger.debug("Cleaned up intermediate processed video")
except Exception as e:
logger.warning(f"Could not clean up intermediate file: {e}")
# Update statistics
processing_time = time.time() - start_time
self.stats['audio_merges'] += 1
self.stats['total_processing_time'] += processing_time
logger.info(f"Audio merged successfully in {processing_time:.1f}s: {output_path}")
return output_path
except subprocess.TimeoutExpired:
self.stats['failed_operations'] += 1
logger.warning("Audio merge timeout - returning processed video without audio")
return processed_video
except Exception as e:
self.stats['failed_operations'] += 1
logger.warning(f"Audio merge error: {e} - returning processed video without audio")
return processed_video
def sync_audio_video(self, video_path: str, audio_path: str,
output_path: str, offset_ms: float = 0.0) -> bool:
"""
Synchronize separate audio and video files
Args:
video_path: Path to video file
audio_path: Path to audio file
output_path: Output path for synchronized file
offset_ms: Audio offset in milliseconds (positive = delay audio)
Returns:
True if successful, False otherwise
"""
if not self.ffmpeg_available:
raise AudioProcessingError("sync", "FFmpeg not available")
try:
cmd = ['ffmpeg', '-y', '-i', video_path, '-i', audio_path]
# Add audio offset if specified
if offset_ms != 0.0:
offset_seconds = offset_ms / 1000.0
cmd.extend(['-itsoffset', str(offset_seconds)])
cmd.extend([
'-c:v', 'copy', # Copy video as-is
'-c:a', 'aac', # Encode audio as AAC
'-b:a', '192k', # Audio bitrate
'-shortest', # Match shortest stream
output_path
])
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=600
)
if result.returncode != 0:
raise AudioProcessingError(
"sync",
f"Synchronization failed: {result.stderr}",
video_path
)
return os.path.exists(output_path) and os.path.getsize(output_path) > 0
except subprocess.TimeoutExpired:
raise AudioProcessingError("sync", "Synchronization timeout", video_path)
except Exception as e:
if isinstance(e, AudioProcessingError):
raise
else:
raise AudioProcessingError("sync", f"Unexpected error: {str(e)}", video_path)
def adjust_audio_levels(self, input_path: str, output_path: str,
volume_factor: float = 1.0, normalize: bool = False) -> bool:
"""
Adjust audio levels in a video file
Args:
input_path: Input video path
output_path: Output video path
volume_factor: Volume multiplication factor (1.0 = no change)
normalize: Whether to normalize audio levels
Returns:
True if successful, False otherwise
"""
if not self.ffmpeg_available:
raise AudioProcessingError("adjust_levels", "FFmpeg not available")
try:
cmd = ['ffmpeg', '-y', '-i', input_path, '-c:v', 'copy']
# Build audio filter
audio_filters = []
if volume_factor != 1.0:
audio_filters.append(f"volume={volume_factor}")
if normalize:
audio_filters.append("loudnorm")
if audio_filters:
cmd.extend(['-af', ','.join(audio_filters)])
cmd.extend(['-c:a', 'aac', '-b:a', '192k', output_path])
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=600
)
if result.returncode != 0:
raise AudioProcessingError(
"adjust_levels",
f"Level adjustment failed: {result.stderr}",
input_path
)
return os.path.exists(output_path) and os.path.getsize(output_path) > 0
except Exception as e:
if isinstance(e, AudioProcessingError):
raise
else:
raise AudioProcessingError("adjust_levels", f"Unexpected error: {str(e)}", input_path)
def get_supported_formats(self) -> Dict[str, List[str]]:
"""Get supported audio and video formats"""
if not self.ffmpeg_available:
return {'audio': [], 'video': []}
try:
# Get supported formats from FFmpeg
result = subprocess.run(
['ffmpeg', '-formats'],
capture_output=True,
text=True,
timeout=30
)
if result.returncode != 0:
return {'audio': ['aac', 'mp3', 'wav'], 'video': ['mp4', 'avi', 'mov']}
# Parse output (simplified - could be more comprehensive)
lines = result.stdout.split('\n')
audio_formats = []
video_formats = []
for line in lines:
if 'aac' in line.lower():
audio_formats.append('aac')
elif 'mp3' in line.lower():
audio_formats.append('mp3')
elif 'wav' in line.lower():
audio_formats.append('wav')
elif 'mp4' in line.lower():
video_formats.append('mp4')
elif 'avi' in line.lower():
video_formats.append('avi')
elif 'mov' in line.lower():
video_formats.append('mov')
return {
'audio': list(set(audio_formats)) or ['aac', 'mp3', 'wav'],
'video': list(set(video_formats)) or ['mp4', 'avi', 'mov']
}
except Exception as e:
logger.warning(f"Could not get supported formats: {e}")
return {'audio': ['aac', 'mp3', 'wav'], 'video': ['mp4', 'avi', 'mov']}
def validate_audio_video_compatibility(self, video_path: str, audio_path: str) -> Dict[str, Any]:
"""
Validate compatibility between video and audio files
Returns:
Dictionary with compatibility information
"""
if not self.ffprobe_available:
return {'compatible': False, 'error': 'FFprobe not available'}
try:
# Get video info
video_result = subprocess.run([
'ffprobe', '-v', 'quiet', '-select_streams', 'v:0',
'-show_entries', 'stream=duration', '-of', 'csv=p=0', video_path
], capture_output=True, text=True, timeout=30)
# Get audio info
audio_result = subprocess.run([
'ffprobe', '-v', 'quiet', '-select_streams', 'a:0',
'-show_entries', 'stream=duration', '-of', 'csv=p=0', audio_path
], capture_output=True, text=True, timeout=30)
if video_result.returncode != 0 or audio_result.returncode != 0:
return {'compatible': False, 'error': 'Could not read file information'}
try:
video_duration = float(video_result.stdout.strip())
audio_duration = float(audio_result.stdout.strip())
duration_diff = abs(video_duration - audio_duration)
duration_diff_percent = (duration_diff / max(video_duration, audio_duration)) * 100
return {
'compatible': duration_diff_percent < 5.0, # 5% tolerance
'video_duration': video_duration,
'audio_duration': audio_duration,
'duration_difference': duration_diff,
'duration_difference_percent': duration_diff_percent,
'recommendation': (
'Compatible' if duration_diff_percent < 5.0
else 'Duration mismatch - consider trimming/extending'
)
}
except ValueError:
return {'compatible': False, 'error': 'Invalid duration values'}
except Exception as e:
return {'compatible': False, 'error': str(e)}
def get_stats(self) -> Dict[str, Any]:
"""Get audio processing statistics"""
return {
'ffmpeg_available': self.ffmpeg_available,
'ffprobe_available': self.ffprobe_available,
'audio_extractions': self.stats['audio_extractions'],
'audio_merges': self.stats['audio_merges'],
'total_processing_time': self.stats['total_processing_time'],
'failed_operations': self.stats['failed_operations'],
'success_rate': (
(self.stats['audio_extractions'] + self.stats['audio_merges']) /
max(1, self.stats['audio_extractions'] + self.stats['audio_merges'] + self.stats['failed_operations'])
) * 100
}
def cleanup_temp_files(self, max_age_hours: int = 24):
"""Clean up temporary audio files older than specified age"""
try:
temp_path = Path(self.temp_dir)
current_time = time.time()
cutoff_time = current_time - (max_age_hours * 3600)
cleaned_files = 0
for file_path in temp_path.glob("*audio*.{aac,mp3,wav,mp4}"):
if file_path.stat().st_mtime < cutoff_time:
try:
file_path.unlink()
cleaned_files += 1
except Exception as e:
logger.warning(f"Could not delete temp file {file_path}: {e}")
if cleaned_files > 0:
logger.info(f"Cleaned up {cleaned_files} temporary audio files")
except Exception as e:
logger.warning(f"Error during temp file cleanup: {e}")