|
|
""" |
|
|
Audio Processing Module |
|
|
Handles audio extraction, processing, and integration with FFmpeg operations |
|
|
""" |
|
|
|
|
|
import os |
|
|
import subprocess |
|
|
import tempfile |
|
|
import logging |
|
|
import time |
|
|
from pathlib import Path |
|
|
from typing import Optional, Dict, Any, List, Tuple |
|
|
|
|
|
|
|
|
from core.exceptions import AudioProcessingError |
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
class AudioProcessor: |
|
|
""" |
|
|
Comprehensive audio processing for video background replacement |
|
|
""" |
|
|
|
|
|
def __init__(self, temp_dir: Optional[str] = None): |
|
|
self.temp_dir = temp_dir or tempfile.gettempdir() |
|
|
self.ffmpeg_available = self._check_ffmpeg_availability() |
|
|
self.ffprobe_available = self._check_ffprobe_availability() |
|
|
|
|
|
|
|
|
self.stats = { |
|
|
'audio_extractions': 0, |
|
|
'audio_merges': 0, |
|
|
'total_processing_time': 0.0, |
|
|
'failed_operations': 0 |
|
|
} |
|
|
|
|
|
if not self.ffmpeg_available: |
|
|
logger.warning("FFmpeg not available - audio processing will be limited") |
|
|
|
|
|
logger.info(f"AudioProcessor initialized (FFmpeg: {self.ffmpeg_available}, FFprobe: {self.ffprobe_available})") |
|
|
|
|
|
def _check_ffmpeg_availability(self) -> bool: |
|
|
"""Check if FFmpeg is available on the system""" |
|
|
try: |
|
|
result = subprocess.run( |
|
|
['ffmpeg', '-version'], |
|
|
capture_output=True, |
|
|
text=True, |
|
|
timeout=10 |
|
|
) |
|
|
return result.returncode == 0 |
|
|
except (subprocess.TimeoutExpired, FileNotFoundError, subprocess.SubprocessError): |
|
|
return False |
|
|
|
|
|
def _check_ffprobe_availability(self) -> bool: |
|
|
"""Check if FFprobe is available on the system""" |
|
|
try: |
|
|
result = subprocess.run( |
|
|
['ffprobe', '-version'], |
|
|
capture_output=True, |
|
|
text=True, |
|
|
timeout=10 |
|
|
) |
|
|
return result.returncode == 0 |
|
|
except (subprocess.TimeoutExpired, FileNotFoundError, subprocess.SubprocessError): |
|
|
return False |
|
|
|
|
|
def get_audio_info(self, video_path: str) -> Dict[str, Any]: |
|
|
""" |
|
|
Get comprehensive audio information from video file |
|
|
|
|
|
Args: |
|
|
video_path: Path to the video file |
|
|
|
|
|
Returns: |
|
|
Dictionary containing audio information |
|
|
""" |
|
|
if not self.ffprobe_available: |
|
|
return {'has_audio': False, 'error': 'FFprobe not available'} |
|
|
|
|
|
try: |
|
|
|
|
|
result = subprocess.run([ |
|
|
'ffprobe', '-v', 'quiet', '-select_streams', 'a:0', |
|
|
'-show_entries', 'stream=codec_name,sample_rate,channels,duration,bit_rate', |
|
|
'-of', 'csv=p=0', video_path |
|
|
], capture_output=True, text=True, timeout=30) |
|
|
|
|
|
if result.returncode != 0: |
|
|
return { |
|
|
'has_audio': False, |
|
|
'error': 'No audio stream found', |
|
|
'ffprobe_error': result.stderr |
|
|
} |
|
|
|
|
|
|
|
|
audio_data = result.stdout.strip().split(',') |
|
|
|
|
|
if len(audio_data) >= 1 and audio_data[0]: |
|
|
info = { |
|
|
'has_audio': True, |
|
|
'codec': audio_data[0] if len(audio_data) > 0 else 'unknown', |
|
|
'sample_rate': audio_data[1] if len(audio_data) > 1 else 'unknown', |
|
|
'channels': audio_data[2] if len(audio_data) > 2 else 'unknown', |
|
|
'duration': audio_data[3] if len(audio_data) > 3 else 'unknown', |
|
|
'bit_rate': audio_data[4] if len(audio_data) > 4 else 'unknown' |
|
|
} |
|
|
|
|
|
|
|
|
try: |
|
|
if info['sample_rate'] != 'unknown': |
|
|
info['sample_rate'] = int(info['sample_rate']) |
|
|
if info['channels'] != 'unknown': |
|
|
info['channels'] = int(info['channels']) |
|
|
if info['duration'] != 'unknown': |
|
|
info['duration'] = float(info['duration']) |
|
|
if info['bit_rate'] != 'unknown': |
|
|
info['bit_rate'] = int(info['bit_rate']) |
|
|
except ValueError: |
|
|
pass |
|
|
|
|
|
return info |
|
|
else: |
|
|
return {'has_audio': False, 'error': 'Audio stream data empty'} |
|
|
|
|
|
except subprocess.TimeoutExpired: |
|
|
return {'has_audio': False, 'error': 'FFprobe timeout'} |
|
|
except Exception as e: |
|
|
logger.error(f"Error getting audio info: {e}") |
|
|
return {'has_audio': False, 'error': str(e)} |
|
|
|
|
|
def extract_audio(self, video_path: str, output_path: Optional[str] = None, |
|
|
audio_format: str = 'aac', quality: str = 'high') -> Optional[str]: |
|
|
""" |
|
|
Extract audio from video file |
|
|
|
|
|
Args: |
|
|
video_path: Path to input video |
|
|
output_path: Output path for audio (auto-generated if None) |
|
|
audio_format: Output audio format (aac, mp3, wav) |
|
|
quality: Audio quality (low, medium, high) |
|
|
|
|
|
Returns: |
|
|
Path to extracted audio file or None if failed |
|
|
""" |
|
|
if not self.ffmpeg_available: |
|
|
raise AudioProcessingError("extract", "FFmpeg not available", video_path) |
|
|
|
|
|
start_time = time.time() |
|
|
|
|
|
try: |
|
|
|
|
|
audio_info = self.get_audio_info(video_path) |
|
|
if not audio_info.get('has_audio', False): |
|
|
logger.info(f"No audio found in {video_path}") |
|
|
return None |
|
|
|
|
|
|
|
|
if output_path is None: |
|
|
timestamp = int(time.time()) |
|
|
output_path = os.path.join( |
|
|
self.temp_dir, |
|
|
f"extracted_audio_{timestamp}.{audio_format}" |
|
|
) |
|
|
|
|
|
|
|
|
quality_settings = { |
|
|
'low': {'aac': ['-b:a', '96k'], 'mp3': ['-b:a', '128k'], 'wav': []}, |
|
|
'medium': {'aac': ['-b:a', '192k'], 'mp3': ['-b:a', '192k'], 'wav': []}, |
|
|
'high': {'aac': ['-b:a', '320k'], 'mp3': ['-b:a', '320k'], 'wav': []} |
|
|
} |
|
|
|
|
|
codec_settings = { |
|
|
'aac': ['-c:a', 'aac'], |
|
|
'mp3': ['-c:a', 'libmp3lame'], |
|
|
'wav': ['-c:a', 'pcm_s16le'] |
|
|
} |
|
|
|
|
|
|
|
|
cmd = ['ffmpeg', '-y', '-i', video_path] |
|
|
cmd.extend(codec_settings.get(audio_format, ['-c:a', 'aac'])) |
|
|
cmd.extend(quality_settings.get(quality, {}).get(audio_format, [])) |
|
|
cmd.extend(['-vn', output_path]) |
|
|
|
|
|
|
|
|
result = subprocess.run( |
|
|
cmd, |
|
|
capture_output=True, |
|
|
text=True, |
|
|
timeout=300 |
|
|
) |
|
|
|
|
|
if result.returncode != 0: |
|
|
raise AudioProcessingError( |
|
|
"extract", |
|
|
f"FFmpeg failed: {result.stderr}", |
|
|
video_path, |
|
|
output_path |
|
|
) |
|
|
|
|
|
if not os.path.exists(output_path): |
|
|
raise AudioProcessingError( |
|
|
"extract", |
|
|
"Output audio file was not created", |
|
|
video_path, |
|
|
output_path |
|
|
) |
|
|
|
|
|
|
|
|
processing_time = time.time() - start_time |
|
|
self.stats['audio_extractions'] += 1 |
|
|
self.stats['total_processing_time'] += processing_time |
|
|
|
|
|
logger.info(f"Audio extracted successfully in {processing_time:.1f}s: {output_path}") |
|
|
return output_path |
|
|
|
|
|
except subprocess.TimeoutExpired: |
|
|
self.stats['failed_operations'] += 1 |
|
|
raise AudioProcessingError("extract", "FFmpeg timeout during extraction", video_path) |
|
|
except Exception as e: |
|
|
self.stats['failed_operations'] += 1 |
|
|
if isinstance(e, AudioProcessingError): |
|
|
raise |
|
|
else: |
|
|
raise AudioProcessingError("extract", f"Unexpected error: {str(e)}", video_path) |
|
|
|
|
|
def add_audio_to_video(self, original_video: str, processed_video: str, |
|
|
output_path: Optional[str] = None, |
|
|
audio_quality: str = 'high') -> str: |
|
|
""" |
|
|
Add audio from original video to processed video |
|
|
|
|
|
Args: |
|
|
original_video: Path to original video with audio |
|
|
processed_video: Path to processed video without audio |
|
|
output_path: Output path (auto-generated if None) |
|
|
audio_quality: Audio quality setting |
|
|
|
|
|
Returns: |
|
|
Path to final video with audio |
|
|
""" |
|
|
if not self.ffmpeg_available: |
|
|
logger.warning("FFmpeg not available - returning processed video without audio") |
|
|
return processed_video |
|
|
|
|
|
start_time = time.time() |
|
|
|
|
|
try: |
|
|
|
|
|
audio_info = self.get_audio_info(original_video) |
|
|
if not audio_info.get('has_audio', False): |
|
|
logger.info("Original video has no audio - returning processed video") |
|
|
return processed_video |
|
|
|
|
|
|
|
|
if output_path is None: |
|
|
timestamp = int(time.time()) |
|
|
output_path = os.path.join( |
|
|
self.temp_dir, |
|
|
f"final_with_audio_{timestamp}.mp4" |
|
|
) |
|
|
|
|
|
|
|
|
quality_settings = { |
|
|
'low': ['-b:a', '96k'], |
|
|
'medium': ['-b:a', '192k'], |
|
|
'high': ['-b:a', '320k'] |
|
|
} |
|
|
|
|
|
|
|
|
cmd = [ |
|
|
'ffmpeg', '-y', |
|
|
'-i', processed_video, |
|
|
'-i', original_video, |
|
|
'-c:v', 'copy', |
|
|
'-c:a', 'aac', |
|
|
] |
|
|
|
|
|
|
|
|
cmd.extend(quality_settings.get(audio_quality, quality_settings['high'])) |
|
|
|
|
|
|
|
|
cmd.extend([ |
|
|
'-map', '0:v:0', |
|
|
'-map', '1:a:0', |
|
|
'-shortest', |
|
|
output_path |
|
|
]) |
|
|
|
|
|
|
|
|
result = subprocess.run( |
|
|
cmd, |
|
|
capture_output=True, |
|
|
text=True, |
|
|
timeout=600 |
|
|
) |
|
|
|
|
|
if result.returncode != 0: |
|
|
logger.warning(f"Audio merge failed: {result.stderr}") |
|
|
logger.warning("Returning processed video without audio") |
|
|
return processed_video |
|
|
|
|
|
if not os.path.exists(output_path): |
|
|
logger.warning("Output video with audio was not created") |
|
|
return processed_video |
|
|
|
|
|
|
|
|
if os.path.getsize(output_path) == 0: |
|
|
logger.warning("Output video file is empty") |
|
|
try: |
|
|
os.remove(output_path) |
|
|
except: |
|
|
pass |
|
|
return processed_video |
|
|
|
|
|
|
|
|
try: |
|
|
if output_path != processed_video: |
|
|
os.remove(processed_video) |
|
|
logger.debug("Cleaned up intermediate processed video") |
|
|
except Exception as e: |
|
|
logger.warning(f"Could not clean up intermediate file: {e}") |
|
|
|
|
|
|
|
|
processing_time = time.time() - start_time |
|
|
self.stats['audio_merges'] += 1 |
|
|
self.stats['total_processing_time'] += processing_time |
|
|
|
|
|
logger.info(f"Audio merged successfully in {processing_time:.1f}s: {output_path}") |
|
|
return output_path |
|
|
|
|
|
except subprocess.TimeoutExpired: |
|
|
self.stats['failed_operations'] += 1 |
|
|
logger.warning("Audio merge timeout - returning processed video without audio") |
|
|
return processed_video |
|
|
except Exception as e: |
|
|
self.stats['failed_operations'] += 1 |
|
|
logger.warning(f"Audio merge error: {e} - returning processed video without audio") |
|
|
return processed_video |
|
|
|
|
|
def sync_audio_video(self, video_path: str, audio_path: str, |
|
|
output_path: str, offset_ms: float = 0.0) -> bool: |
|
|
""" |
|
|
Synchronize separate audio and video files |
|
|
|
|
|
Args: |
|
|
video_path: Path to video file |
|
|
audio_path: Path to audio file |
|
|
output_path: Output path for synchronized file |
|
|
offset_ms: Audio offset in milliseconds (positive = delay audio) |
|
|
|
|
|
Returns: |
|
|
True if successful, False otherwise |
|
|
""" |
|
|
if not self.ffmpeg_available: |
|
|
raise AudioProcessingError("sync", "FFmpeg not available") |
|
|
|
|
|
try: |
|
|
cmd = ['ffmpeg', '-y', '-i', video_path, '-i', audio_path] |
|
|
|
|
|
|
|
|
if offset_ms != 0.0: |
|
|
offset_seconds = offset_ms / 1000.0 |
|
|
cmd.extend(['-itsoffset', str(offset_seconds)]) |
|
|
|
|
|
cmd.extend([ |
|
|
'-c:v', 'copy', |
|
|
'-c:a', 'aac', |
|
|
'-b:a', '192k', |
|
|
'-shortest', |
|
|
output_path |
|
|
]) |
|
|
|
|
|
result = subprocess.run( |
|
|
cmd, |
|
|
capture_output=True, |
|
|
text=True, |
|
|
timeout=600 |
|
|
) |
|
|
|
|
|
if result.returncode != 0: |
|
|
raise AudioProcessingError( |
|
|
"sync", |
|
|
f"Synchronization failed: {result.stderr}", |
|
|
video_path |
|
|
) |
|
|
|
|
|
return os.path.exists(output_path) and os.path.getsize(output_path) > 0 |
|
|
|
|
|
except subprocess.TimeoutExpired: |
|
|
raise AudioProcessingError("sync", "Synchronization timeout", video_path) |
|
|
except Exception as e: |
|
|
if isinstance(e, AudioProcessingError): |
|
|
raise |
|
|
else: |
|
|
raise AudioProcessingError("sync", f"Unexpected error: {str(e)}", video_path) |
|
|
|
|
|
def adjust_audio_levels(self, input_path: str, output_path: str, |
|
|
volume_factor: float = 1.0, normalize: bool = False) -> bool: |
|
|
""" |
|
|
Adjust audio levels in a video file |
|
|
|
|
|
Args: |
|
|
input_path: Input video path |
|
|
output_path: Output video path |
|
|
volume_factor: Volume multiplication factor (1.0 = no change) |
|
|
normalize: Whether to normalize audio levels |
|
|
|
|
|
Returns: |
|
|
True if successful, False otherwise |
|
|
""" |
|
|
if not self.ffmpeg_available: |
|
|
raise AudioProcessingError("adjust_levels", "FFmpeg not available") |
|
|
|
|
|
try: |
|
|
cmd = ['ffmpeg', '-y', '-i', input_path, '-c:v', 'copy'] |
|
|
|
|
|
|
|
|
audio_filters = [] |
|
|
|
|
|
if volume_factor != 1.0: |
|
|
audio_filters.append(f"volume={volume_factor}") |
|
|
|
|
|
if normalize: |
|
|
audio_filters.append("loudnorm") |
|
|
|
|
|
if audio_filters: |
|
|
cmd.extend(['-af', ','.join(audio_filters)]) |
|
|
|
|
|
cmd.extend(['-c:a', 'aac', '-b:a', '192k', output_path]) |
|
|
|
|
|
result = subprocess.run( |
|
|
cmd, |
|
|
capture_output=True, |
|
|
text=True, |
|
|
timeout=600 |
|
|
) |
|
|
|
|
|
if result.returncode != 0: |
|
|
raise AudioProcessingError( |
|
|
"adjust_levels", |
|
|
f"Level adjustment failed: {result.stderr}", |
|
|
input_path |
|
|
) |
|
|
|
|
|
return os.path.exists(output_path) and os.path.getsize(output_path) > 0 |
|
|
|
|
|
except Exception as e: |
|
|
if isinstance(e, AudioProcessingError): |
|
|
raise |
|
|
else: |
|
|
raise AudioProcessingError("adjust_levels", f"Unexpected error: {str(e)}", input_path) |
|
|
|
|
|
def get_supported_formats(self) -> Dict[str, List[str]]: |
|
|
"""Get supported audio and video formats""" |
|
|
if not self.ffmpeg_available: |
|
|
return {'audio': [], 'video': []} |
|
|
|
|
|
try: |
|
|
|
|
|
result = subprocess.run( |
|
|
['ffmpeg', '-formats'], |
|
|
capture_output=True, |
|
|
text=True, |
|
|
timeout=30 |
|
|
) |
|
|
|
|
|
if result.returncode != 0: |
|
|
return {'audio': ['aac', 'mp3', 'wav'], 'video': ['mp4', 'avi', 'mov']} |
|
|
|
|
|
|
|
|
lines = result.stdout.split('\n') |
|
|
audio_formats = [] |
|
|
video_formats = [] |
|
|
|
|
|
for line in lines: |
|
|
if 'aac' in line.lower(): |
|
|
audio_formats.append('aac') |
|
|
elif 'mp3' in line.lower(): |
|
|
audio_formats.append('mp3') |
|
|
elif 'wav' in line.lower(): |
|
|
audio_formats.append('wav') |
|
|
elif 'mp4' in line.lower(): |
|
|
video_formats.append('mp4') |
|
|
elif 'avi' in line.lower(): |
|
|
video_formats.append('avi') |
|
|
elif 'mov' in line.lower(): |
|
|
video_formats.append('mov') |
|
|
|
|
|
return { |
|
|
'audio': list(set(audio_formats)) or ['aac', 'mp3', 'wav'], |
|
|
'video': list(set(video_formats)) or ['mp4', 'avi', 'mov'] |
|
|
} |
|
|
|
|
|
except Exception as e: |
|
|
logger.warning(f"Could not get supported formats: {e}") |
|
|
return {'audio': ['aac', 'mp3', 'wav'], 'video': ['mp4', 'avi', 'mov']} |
|
|
|
|
|
def validate_audio_video_compatibility(self, video_path: str, audio_path: str) -> Dict[str, Any]: |
|
|
""" |
|
|
Validate compatibility between video and audio files |
|
|
|
|
|
Returns: |
|
|
Dictionary with compatibility information |
|
|
""" |
|
|
if not self.ffprobe_available: |
|
|
return {'compatible': False, 'error': 'FFprobe not available'} |
|
|
|
|
|
try: |
|
|
|
|
|
video_result = subprocess.run([ |
|
|
'ffprobe', '-v', 'quiet', '-select_streams', 'v:0', |
|
|
'-show_entries', 'stream=duration', '-of', 'csv=p=0', video_path |
|
|
], capture_output=True, text=True, timeout=30) |
|
|
|
|
|
|
|
|
audio_result = subprocess.run([ |
|
|
'ffprobe', '-v', 'quiet', '-select_streams', 'a:0', |
|
|
'-show_entries', 'stream=duration', '-of', 'csv=p=0', audio_path |
|
|
], capture_output=True, text=True, timeout=30) |
|
|
|
|
|
if video_result.returncode != 0 or audio_result.returncode != 0: |
|
|
return {'compatible': False, 'error': 'Could not read file information'} |
|
|
|
|
|
try: |
|
|
video_duration = float(video_result.stdout.strip()) |
|
|
audio_duration = float(audio_result.stdout.strip()) |
|
|
|
|
|
duration_diff = abs(video_duration - audio_duration) |
|
|
duration_diff_percent = (duration_diff / max(video_duration, audio_duration)) * 100 |
|
|
|
|
|
return { |
|
|
'compatible': duration_diff_percent < 5.0, |
|
|
'video_duration': video_duration, |
|
|
'audio_duration': audio_duration, |
|
|
'duration_difference': duration_diff, |
|
|
'duration_difference_percent': duration_diff_percent, |
|
|
'recommendation': ( |
|
|
'Compatible' if duration_diff_percent < 5.0 |
|
|
else 'Duration mismatch - consider trimming/extending' |
|
|
) |
|
|
} |
|
|
|
|
|
except ValueError: |
|
|
return {'compatible': False, 'error': 'Invalid duration values'} |
|
|
|
|
|
except Exception as e: |
|
|
return {'compatible': False, 'error': str(e)} |
|
|
|
|
|
def get_stats(self) -> Dict[str, Any]: |
|
|
"""Get audio processing statistics""" |
|
|
return { |
|
|
'ffmpeg_available': self.ffmpeg_available, |
|
|
'ffprobe_available': self.ffprobe_available, |
|
|
'audio_extractions': self.stats['audio_extractions'], |
|
|
'audio_merges': self.stats['audio_merges'], |
|
|
'total_processing_time': self.stats['total_processing_time'], |
|
|
'failed_operations': self.stats['failed_operations'], |
|
|
'success_rate': ( |
|
|
(self.stats['audio_extractions'] + self.stats['audio_merges']) / |
|
|
max(1, self.stats['audio_extractions'] + self.stats['audio_merges'] + self.stats['failed_operations']) |
|
|
) * 100 |
|
|
} |
|
|
|
|
|
def cleanup_temp_files(self, max_age_hours: int = 24): |
|
|
"""Clean up temporary audio files older than specified age""" |
|
|
try: |
|
|
temp_path = Path(self.temp_dir) |
|
|
current_time = time.time() |
|
|
cutoff_time = current_time - (max_age_hours * 3600) |
|
|
|
|
|
cleaned_files = 0 |
|
|
for file_path in temp_path.glob("*audio*.{aac,mp3,wav,mp4}"): |
|
|
if file_path.stat().st_mtime < cutoff_time: |
|
|
try: |
|
|
file_path.unlink() |
|
|
cleaned_files += 1 |
|
|
except Exception as e: |
|
|
logger.warning(f"Could not delete temp file {file_path}: {e}") |
|
|
|
|
|
if cleaned_files > 0: |
|
|
logger.info(f"Cleaned up {cleaned_files} temporary audio files") |
|
|
|
|
|
except Exception as e: |
|
|
logger.warning(f"Error during temp file cleanup: {e}") |