Spaces:
Running
Running
| """Performance optimization utilities for the caption generation tool.""" | |
| import os | |
| import hashlib | |
| import logging | |
| from pathlib import Path | |
| from typing import Dict, List, Optional, Union | |
| from contextlib import contextmanager | |
| from config import MODEL_CACHE_DIR, MAX_AUDIO_LENGTH_SEC, TEMP_FILE_PREFIX | |
| logger = logging.getLogger(__name__) | |
class ModelCacheManager:
    """Manages local model caching to avoid repeated downloads.

    Cache entries are flat files named ``model_<hash>`` inside ``cache_dir``,
    keyed by a short hash of the model identifier.
    """

    def __init__(self, cache_dir: str = MODEL_CACHE_DIR):
        """Create the manager, ensuring the cache directory exists.

        Args:
            cache_dir: Directory in which cached model files are stored.
        """
        self.cache_dir = Path(cache_dir)
        # parents=True so a missing intermediate directory does not raise
        # FileNotFoundError on first run (exist_ok alone does not cover that).
        self.cache_dir.mkdir(parents=True, exist_ok=True)

    def _entry_path(self, model_id: str) -> Path:
        """Return the deterministic cache path for *model_id* (may not exist)."""
        # MD5 is used only as a short, stable filename hash — not for security.
        model_hash = hashlib.md5(model_id.encode()).hexdigest()[:8]
        return self.cache_dir / f"model_{model_hash}"

    def get_model_path(self, model_id: str) -> Optional[Path]:
        """Return the cached model's path, or None if it is not cached."""
        model_path = self._entry_path(model_id)
        return model_path if model_path.exists() else None

    def cache_model(self, model_id: str, model_data: bytes) -> Path:
        """Cache raw model bytes locally and return the cached file's path.

        Args:
            model_id: Identifier of the model (e.g. a hub name).
            model_data: The model payload to persist.

        Returns:
            Path of the newly written cache file.
        """
        model_path = self._entry_path(model_id)
        model_path.write_bytes(model_data)
        logger.info(f"Cached model {model_id} to {model_path}")
        return model_path
class AudioValidator:
    """Enhanced audio validation with performance checks."""

    # @staticmethod added: the original definition had no `self`, so calling
    # it on an instance raised TypeError; class-level calls keep working.
    @staticmethod
    def validate_audio_duration(audio_path: Union[str, Path]) -> float:
        """Validate that an audio file's duration is within processing limits.

        Uses ffprobe to read the duration from container metadata, which is
        fast because the audio samples are never decoded.

        Args:
            audio_path: Path to the audio file to probe.

        Returns:
            The duration in seconds.

        Raises:
            ValueError: If the duration exceeds MAX_AUDIO_LENGTH_SEC.
            RuntimeError: If ffprobe fails, times out, or emits no parseable
                duration.
        """
        import subprocess
        audio_path = Path(audio_path)
        cmd = [
            'ffprobe', '-v', 'quiet', '-show_entries',
            'format=duration', '-of', 'default=noprint_wrappers=1:nokey=1',
            str(audio_path)
        ]
        try:
            # check=True so a non-zero ffprobe exit actually raises
            # CalledProcessError (without it, that except arm was dead code).
            result = subprocess.run(
                cmd, capture_output=True, text=True, timeout=10, check=True
            )
            duration = float(result.stdout.strip())
        except (subprocess.TimeoutExpired, subprocess.CalledProcessError, ValueError) as e:
            raise RuntimeError(f"Failed to validate audio duration: {e}") from e
        # Checked outside the try: previously this ValueError was swallowed by
        # the except clause above and re-raised as a misleading RuntimeError.
        if duration > MAX_AUDIO_LENGTH_SEC:
            raise ValueError(
                f"Audio too long: {duration:.1f}s (max: {MAX_AUDIO_LENGTH_SEC}s). "
                "Consider splitting into smaller segments."
            )
        return duration
# @contextmanager added: the original contained `yield` and was documented as
# a context manager, but without the decorator `with temp_file_manager() ...`
# raised AttributeError (a bare generator has no __enter__/__exit__).
@contextmanager
def temp_file_manager(suffix: str = ".tmp", prefix: str = TEMP_FILE_PREFIX):
    """Context manager for safe temporary file handling.

    Yields:
        str: The path of a newly created named temporary file. The file is
        always removed on exit, even if the managed block raises.
    """
    import tempfile
    temp_files = []
    try:
        with tempfile.NamedTemporaryFile(
            suffix=suffix, prefix=prefix, delete=False
        ) as f:
            temp_files.append(f.name)
            # NOTE(review): yielding inside the `with` keeps the handle open
            # while the caller uses the path; on Windows that can prevent
            # reopening the file — confirm this is intended.
            yield f.name
    finally:
        # Best-effort cleanup of every temp file created in this context.
        for temp_file in temp_files:
            try:
                Path(temp_file).unlink()
            except OSError:
                logger.warning(f"Failed to clean up temp file: {temp_file}")
class MemoryOptimizer:
    """Memory usage optimization utilities."""

    # @staticmethod added to both methods: the originals had no `self`, so
    # calling them on a MemoryOptimizer instance raised TypeError.
    @staticmethod
    def estimate_memory_usage(audio_duration: float, word_count: int) -> Dict[str, float]:
        """Estimate memory requirements for processing.

        Args:
            audio_duration: Audio length in seconds.
            word_count: Number of words to align.

        Returns:
            Dict with per-component estimates in MB plus a recommended RAM
            figure in GB (never below 4 GB).
        """
        # Rough estimates based on typical usage patterns.
        audio_mb = audio_duration * 0.5  # ~500KB per second for 16kHz mono
        model_mb = 1200  # facebook/mms-300m model size
        alignment_mb = word_count * 0.01  # Alignment metadata
        total_mb = audio_mb + model_mb + alignment_mb
        return {
            "audio_mb": audio_mb,
            "model_mb": model_mb,
            "alignment_mb": alignment_mb,
            "total_mb": total_mb,
            # 1.5x headroom over the estimate, floored at 4 GB.
            "recommended_ram_gb": max(4.0, total_mb / 1024 * 1.5)
        }

    @staticmethod
    def check_available_memory() -> float:
        """Return the currently available system memory in GB."""
        import psutil
        memory = psutil.virtual_memory()
        return memory.available / (1024**3)
class BatchProcessor:
    """Optimized batch processing with concurrency control."""

    def __init__(self, max_concurrent: int = 4):
        # Upper bound on simultaneously running worker threads.
        self.max_concurrent = max_concurrent

    def process_batch_optimized(self, audio_script_pairs: List[tuple],
                                output_dir: Path) -> List[Dict]:
        """Process multiple files with optimal resource usage.

        Jobs are submitted largest-audio-first so long-running files start
        early, keeping worker threads busy and minimizing idle tail time.
        Failures are captured per-file as ``{"error": ...}`` entries rather
        than aborting the whole batch.
        """
        from concurrent.futures import ThreadPoolExecutor, as_completed

        # Attach each pair's audio file size so jobs can be ordered by cost.
        sized_jobs = [
            (Path(audio).stat().st_size, audio, script)
            for audio, script in audio_script_pairs
        ]
        sized_jobs.sort(reverse=True)  # largest files first

        outcomes: List[Dict] = []
        with ThreadPoolExecutor(max_workers=self.max_concurrent) as pool:
            pending = [
                pool.submit(self._process_single_optimized,
                            audio, script, output_dir)
                for _, audio, script in sized_jobs
            ]
            # Collect in completion order; a failed job contributes an
            # error record instead of raising.
            for done in as_completed(pending):
                try:
                    outcomes.append(done.result())
                except Exception as e:
                    logger.error(f"Batch processing error: {e}")
                    outcomes.append({"error": str(e)})
        return outcomes

    def _process_single_optimized(self, audio_path: str, script_path: str,
                                  output_dir: Path) -> Dict:
        """Process single file with optimizations."""
        # Placeholder: the real alignment pipeline would be invoked here.
        return {
            "audio_path": audio_path,
            "script_path": script_path,
            "status": "processed",
            "output_path": output_dir / f"{Path(audio_path).stem}.srt",
        }