""" Core Utilities Module for BackgroundFX Pro Contains FileManager, VideoUtils, ImageUtils, and ValidationUtils """ # Set OMP_NUM_THREADS at the very beginning to prevent libgomp errors import os if 'OMP_NUM_THREADS' not in os.environ: os.environ['OMP_NUM_THREADS'] = '4' os.environ['MKL_NUM_THREADS'] = '4' import shutil import tempfile import logging from pathlib import Path from typing import Optional, List, Union, Tuple, Dict, Any from datetime import datetime import subprocess import re import cv2 import numpy as np import torch from PIL import Image, ImageEnhance, ImageFilter, ImageDraw logger = logging.getLogger(__name__) # ============================================================================ # VALIDATION UTILS CLASS # ============================================================================ class ValidationUtils: """Validation utilities for BackgroundFX Pro application.""" # Supported formats SUPPORTED_VIDEO_FORMATS = {'.mp4', '.avi', '.mov', '.mkv', '.webm', '.flv', '.wmv', '.m4v'} SUPPORTED_IMAGE_FORMATS = {'.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.webp'} # Size limits (in bytes) MAX_VIDEO_SIZE = 500 * 1024 * 1024 # 500MB MAX_IMAGE_SIZE = 50 * 1024 * 1024 # 50MB MIN_VIDEO_SIZE = 1024 # 1KB (to avoid empty files) # Video constraints MAX_VIDEO_DURATION = 300 # 5 minutes in seconds MIN_VIDEO_DURATION = 1 # 1 second minimum MAX_RESOLUTION = (3840, 2160) # 4K MIN_RESOLUTION = (320, 240) # Minimum reasonable resolution MAX_FPS = 120 MIN_FPS = 10 @staticmethod def validate_video_file(file_path, check_content=False): """ Validate video file for processing. Args: file_path: Path to the video file check_content: Whether to perform deep content validation Returns: tuple: (is_valid, error_message) """ from pathlib import Path if not file_path: return False, "No file path provided" path = Path(file_path) # Check if file exists if not path.exists(): return False, f"File not found: {file_path}" # Check file extension if path.suffix.lower() not in ValidationUtils.SUPPORTED_VIDEO_FORMATS: return False, f"Unsupported video format: {path.suffix}. Supported formats: {', '.join(ValidationUtils.SUPPORTED_VIDEO_FORMATS)}" # Check file size file_size = path.stat().st_size if file_size > ValidationUtils.MAX_VIDEO_SIZE: size_mb = file_size / (1024 * 1024) return False, f"Video file too large: {size_mb:.1f}MB (max: {ValidationUtils.MAX_VIDEO_SIZE / (1024 * 1024):.0f}MB)" if file_size < ValidationUtils.MIN_VIDEO_SIZE: return False, "Video file appears to be empty or corrupted" # Deep content validation if requested if check_content: try: cap = cv2.VideoCapture(str(file_path)) if not cap.isOpened(): return False, "Unable to open video file - may be corrupted" # Get video properties fps = cap.get(cv2.CAP_PROP_FPS) frame_count = cap.get(cv2.CAP_PROP_FRAME_COUNT) width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) # Calculate duration duration = frame_count / fps if fps > 0 else 0 cap.release() # Validate properties if duration > ValidationUtils.MAX_VIDEO_DURATION: return False, f"Video too long: {duration:.1f}s (max: {ValidationUtils.MAX_VIDEO_DURATION}s)" if duration < ValidationUtils.MIN_VIDEO_DURATION: return False, f"Video too short: {duration:.1f}s (min: {ValidationUtils.MIN_VIDEO_DURATION}s)" if width > ValidationUtils.MAX_RESOLUTION[0] or height > ValidationUtils.MAX_RESOLUTION[1]: return False, f"Video resolution too high: {width}x{height} (max: {ValidationUtils.MAX_RESOLUTION[0]}x{ValidationUtils.MAX_RESOLUTION[1]})" if width < ValidationUtils.MIN_RESOLUTION[0] or height < ValidationUtils.MIN_RESOLUTION[1]: return False, f"Video resolution too low: {width}x{height} (min: {ValidationUtils.MIN_RESOLUTION[0]}x{ValidationUtils.MIN_RESOLUTION[1]})" if fps > ValidationUtils.MAX_FPS: return False, f"Frame rate too high: {fps:.1f} fps (max: {ValidationUtils.MAX_FPS} fps)" if fps < ValidationUtils.MIN_FPS: return False, f"Frame rate too low: {fps:.1f} fps (min: {ValidationUtils.MIN_FPS} fps)" except Exception as e: return False, f"Error validating video content: {str(e)}" return True, "Video file is valid" @staticmethod def validate_image_file(file_path, check_content=False): """ Validate image file for background replacement. Args: file_path: Path to the image file check_content: Whether to perform deep content validation Returns: tuple: (is_valid, error_message) """ from pathlib import Path if not file_path: return False, "No file path provided" path = Path(file_path) # Check if file exists if not path.exists(): return False, f"File not found: {file_path}" # Check file extension if path.suffix.lower() not in ValidationUtils.SUPPORTED_IMAGE_FORMATS: return False, f"Unsupported image format: {path.suffix}. Supported formats: {', '.join(ValidationUtils.SUPPORTED_IMAGE_FORMATS)}" # Check file size file_size = path.stat().st_size if file_size > ValidationUtils.MAX_IMAGE_SIZE: size_mb = file_size / (1024 * 1024) return False, f"Image file too large: {size_mb:.1f}MB (max: {ValidationUtils.MAX_IMAGE_SIZE / (1024 * 1024):.0f}MB)" # Deep content validation if requested if check_content: try: img = cv2.imread(str(file_path)) if img is None: return False, "Unable to read image file - may be corrupted" height, width = img.shape[:2] # Check dimensions if width > ValidationUtils.MAX_RESOLUTION[0] or height > ValidationUtils.MAX_RESOLUTION[1]: return False, f"Image resolution too high: {width}x{height} (max: {ValidationUtils.MAX_RESOLUTION[0]}x{ValidationUtils.MAX_RESOLUTION[1]})" if width < ValidationUtils.MIN_RESOLUTION[0] or height < ValidationUtils.MIN_RESOLUTION[1]: return False, f"Image resolution too low: {width}x{height} (min: {ValidationUtils.MIN_RESOLUTION[0]}x{ValidationUtils.MIN_RESOLUTION[1]})" except Exception as e: return False, f"Error validating image content: {str(e)}" return True, "Image file is valid" @staticmethod def validate_processing_params(params): """ Validate processing parameters. Args: params: Dictionary of processing parameters Returns: tuple: (is_valid, error_message) """ if not params: return False, "No parameters provided" # Validate confidence threshold if 'confidence_threshold' in params: conf = params['confidence_threshold'] if not isinstance(conf, (int, float)): return False, "Confidence threshold must be a number" if conf < 0 or conf > 1: return False, "Confidence threshold must be between 0 and 1" # Validate mask dilation if 'mask_dilation' in params: dilation = params['mask_dilation'] if not isinstance(dilation, int): return False, "Mask dilation must be an integer" if dilation < 0 or dilation > 50: return False, "Mask dilation must be between 0 and 50" # Validate edge smoothing if 'edge_smoothing' in params: smooth = params['edge_smoothing'] if not isinstance(smooth, int): return False, "Edge smoothing must be an integer" if smooth < 0 or smooth > 100: return False, "Edge smoothing must be between 0 and 100" # Validate color adjustment if 'color_adjustment' in params: color_adj = params['color_adjustment'] if not isinstance(color_adj, bool): return False, "Color adjustment must be a boolean" # Validate output quality if 'output_quality' in params: quality = params['output_quality'] if not isinstance(quality, int): return False, "Output quality must be an integer" if quality < 1 or quality > 100: return False, "Output quality must be between 1 and 100" # Validate processing method if 'processing_method' in params: method = params['processing_method'] valid_methods = {'sam2', 'matanyone', 'cv_fallback', 'auto'} if method not in valid_methods: return False, f"Invalid processing method. Must be one of: {', '.join(valid_methods)}" return True, "Parameters are valid" @staticmethod def validate_output_path(output_path, create_dirs=False): """ Validate output path for saving results. Args: output_path: Path where output will be saved create_dirs: Whether to create directories if they don't exist Returns: tuple: (is_valid, error_message) """ from pathlib import Path if not output_path: return False, "No output path provided" path = Path(output_path) parent_dir = path.parent # Check if parent directory exists if not parent_dir.exists(): if create_dirs: try: parent_dir.mkdir(parents=True, exist_ok=True) except Exception as e: return False, f"Failed to create output directory: {str(e)}" else: return False, f"Output directory does not exist: {parent_dir}" # Check write permissions if not os.access(parent_dir, os.W_OK): return False, f"No write permission for directory: {parent_dir}" # Check if file already exists if path.exists(): if not os.access(path, os.W_OK): return False, f"Cannot overwrite existing file: {output_path}" return True, "Output path is valid" @staticmethod def sanitize_filename(filename): """ Sanitize filename to be safe for filesystem. Args: filename: Original filename Returns: str: Sanitized filename """ from pathlib import Path # Get the stem and suffix separately path = Path(filename) stem = path.stem suffix = path.suffix # Remove or replace invalid characters # Keep only alphanumeric, dash, underscore, and dot stem = re.sub(r'[^\w\-_.]', '_', stem) # Remove multiple underscores stem = re.sub(r'_+', '_', stem) # Remove leading/trailing underscores stem = stem.strip('_') # Ensure filename is not empty if not stem: stem = 'output' # Limit length (keep it reasonable for most filesystems) max_length = 200 if len(stem) > max_length: stem = stem[:max_length] return f"{stem}{suffix}" @staticmethod def validate_memory_available(required_mb=1000): """ Check if sufficient memory is available. Args: required_mb: Required memory in megabytes Returns: tuple: (is_sufficient, available_mb, error_message) """ try: import psutil mem = psutil.virtual_memory() available_mb = mem.available / (1024 * 1024) if available_mb < required_mb: return False, available_mb, f"Insufficient memory: {available_mb:.0f}MB available, {required_mb:.0f}MB required" return True, available_mb, f"Sufficient memory available: {available_mb:.0f}MB" except ImportError: # If psutil not available, assume sufficient memory return True, -1, "Memory check skipped (psutil not available)" except Exception as e: return True, -1, f"Memory check failed: {str(e)}" @staticmethod def validate_gpu_available(): """ Check if GPU is available for processing. Returns: tuple: (is_available, device_info) """ try: if torch.cuda.is_available(): device_name = torch.cuda.get_device_name(0) memory_gb = torch.cuda.get_device_properties(0).total_memory / (1024**3) return True, f"GPU available: {device_name} ({memory_gb:.1f}GB)" else: return False, "No GPU available - will use CPU" except ImportError: return False, "PyTorch not available for GPU check" except Exception as e: return False, f"GPU check failed: {str(e)}" @staticmethod def validate_url(url): """ Validate URL format. Args: url: URL string to validate Returns: tuple: (is_valid, error_message) """ if not url: return False, "No URL provided" # Basic URL pattern url_pattern = re.compile( r'^https?://' # http:// or https:// r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+[A-Z]{2,6}\.?|' # domain... r'localhost|' # localhost... r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})' # ...or ip r'(?::\d+)?' # optional port r'(?:/?|[/?]\S+)$', re.IGNORECASE) if url_pattern.match(url): return True, "Valid URL" else: return False, "Invalid URL format" # ============================================================================ # FILE MANAGER CLASS # ============================================================================ class FileManager: """Manages file operations for BackgroundFX Pro""" def __init__(self, base_dir: Optional[str] = None): """Initialize FileManager""" if base_dir: self.base_dir = Path(base_dir) else: self.base_dir = Path(tempfile.gettempdir()) / "backgroundfx_pro" self.base_dir.mkdir(parents=True, exist_ok=True) # Create subdirectories self.uploads_dir = self.base_dir / "uploads" self.outputs_dir = self.base_dir / "outputs" self.temp_dir = self.base_dir / "temp" self.cache_dir = self.base_dir / "cache" for dir_path in [self.uploads_dir, self.outputs_dir, self.temp_dir, self.cache_dir]: dir_path.mkdir(parents=True, exist_ok=True) logger.info(f"FileManager initialized with base directory: {self.base_dir}") def save_upload(self, file_path: Union[str, Path], filename: Optional[str] = None) -> Path: """Save an uploaded file to the uploads directory""" file_path = Path(file_path) if filename: dest_path = self.uploads_dir / filename else: timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") dest_path = self.uploads_dir / f"{timestamp}_{file_path.name}" shutil.copy2(file_path, dest_path) logger.info(f"Saved upload: {dest_path}") return dest_path def create_output_path(self, filename: str, subfolder: Optional[str] = None) -> Path: """Create a path for an output file""" if subfolder: output_dir = self.outputs_dir / subfolder output_dir.mkdir(parents=True, exist_ok=True) else: output_dir = self.outputs_dir timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") name_parts = filename.rsplit('.', 1) if len(name_parts) == 2: output_path = output_dir / f"{name_parts[0]}_{timestamp}.{name_parts[1]}" else: output_path = output_dir / f"{filename}_{timestamp}" return output_path def get_temp_path(self, filename: Optional[str] = None, extension: str = ".tmp") -> Path: """Get a temporary file path""" if filename: temp_path = self.temp_dir / filename else: timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f") temp_path = self.temp_dir / f"temp_{timestamp}{extension}" return temp_path def cleanup_temp(self, max_age_hours: int = 24): """Clean up old temporary files""" try: current_time = datetime.now().timestamp() max_age_seconds = max_age_hours * 3600 for temp_file in self.temp_dir.iterdir(): if temp_file.is_file(): file_age = current_time - temp_file.stat().st_mtime if file_age > max_age_seconds: temp_file.unlink() logger.debug(f"Deleted old temp file: {temp_file}") logger.info("Temp directory cleanup completed") except Exception as e: logger.warning(f"Error during temp cleanup: {e}") def get_cache_path(self, key: str, extension: str = ".cache") -> Path: """Get a cache file path based on a key""" safe_key = "".join(c if c.isalnum() or c in '-_' else '_' for c in key) return self.cache_dir / f"{safe_key}{extension}" def list_outputs(self, subfolder: Optional[str] = None, extension: Optional[str] = None) -> List[Path]: """List output files""" if subfolder: search_dir = self.outputs_dir / subfolder else: search_dir = self.outputs_dir if not search_dir.exists(): return [] if extension: pattern = f"*{extension}" else: pattern = "*" return sorted(search_dir.glob(pattern), key=lambda p: p.stat().st_mtime, reverse=True) def delete_file(self, file_path: Union[str, Path]) -> bool: """Safely delete a file""" try: file_path = Path(file_path) if file_path.exists() and file_path.is_file(): file_path.unlink() logger.info(f"Deleted file: {file_path}") return True return False except Exception as e: logger.error(f"Error deleting file {file_path}: {e}") return False def get_file_info(self, file_path: Union[str, Path]) -> dict: """Get information about a file""" file_path = Path(file_path) if not file_path.exists(): return {"exists": False} stat = file_path.stat() return { "exists": True, "name": file_path.name, "size": stat.st_size, "size_mb": stat.st_size / (1024 * 1024), "created": datetime.fromtimestamp(stat.st_ctime), "modified": datetime.fromtimestamp(stat.st_mtime), "extension": file_path.suffix, "path": str(file_path.absolute()) } # ============================================================================ # VIDEO UTILS CLASS # ============================================================================ class VideoUtils: """Utilities for video processing""" @staticmethod def get_video_info(video_path: Union[str, Path]) -> Dict[str, Any]: """Get detailed video information""" video_path = str(video_path) cap = cv2.VideoCapture(video_path) if not cap.isOpened(): logger.error(f"Failed to open video: {video_path}") return {"error": "Failed to open video"} try: info = { "fps": cap.get(cv2.CAP_PROP_FPS), "frame_count": int(cap.get(cv2.CAP_PROP_FRAME_COUNT)), "width": int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), "height": int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)), "codec": VideoUtils._fourcc_to_string(int(cap.get(cv2.CAP_PROP_FOURCC))), "duration": cap.get(cv2.CAP_PROP_FRAME_COUNT) / cap.get(cv2.CAP_PROP_FPS) if cap.get(cv2.CAP_PROP_FPS) > 0 else 0 } path = Path(video_path) if path.exists(): info["file_size_mb"] = path.stat().st_size / (1024 * 1024) return info finally: cap.release() @staticmethod def _fourcc_to_string(fourcc: int) -> str: """Convert fourcc code to string""" return "".join([chr((fourcc >> 8 * i) & 0xFF) for i in range(4)]) @staticmethod def extract_frames(video_path: Union[str, Path], output_dir: Union[str, Path], frame_interval: int = 1, max_frames: Optional[int] = None) -> List[Path]: """Extract frames from video""" video_path = str(video_path) output_dir = Path(output_dir) output_dir.mkdir(parents=True, exist_ok=True) cap = cv2.VideoCapture(video_path) if not cap.isOpened(): logger.error(f"Failed to open video: {video_path}") return [] frame_paths = [] frame_count = 0 extracted_count = 0 try: while True: ret, frame = cap.read() if not ret: break if frame_count % frame_interval == 0: frame_path = output_dir / f"frame_{frame_count:06d}.png" cv2.imwrite(str(frame_path), frame) frame_paths.append(frame_path) extracted_count += 1 if max_frames and extracted_count >= max_frames: break frame_count += 1 logger.info(f"Extracted {len(frame_paths)} frames from video") return frame_paths finally: cap.release() @staticmethod def create_video_from_frames(frame_paths: List[Union[str, Path]], output_path: Union[str, Path], fps: float = 30.0, codec: str = 'mp4v') -> bool: """Create video from frame images""" if not frame_paths: logger.error("No frames provided") return False first_frame = cv2.imread(str(frame_paths[0])) if first_frame is None: logger.error(f"Failed to read first frame: {frame_paths[0]}") return False height, width, layers = first_frame.shape fourcc = cv2.VideoWriter_fourcc(*codec) out = cv2.VideoWriter(str(output_path), fourcc, fps, (width, height)) try: for frame_path in frame_paths: frame = cv2.imread(str(frame_path)) if frame is not None: out.write(frame) else: logger.warning(f"Failed to read frame: {frame_path}") logger.info(f"Created video: {output_path}") return True except Exception as e: logger.error(f"Error creating video: {e}") return False finally: out.release() @staticmethod def resize_video(input_path: Union[str, Path], output_path: Union[str, Path], target_width: Optional[int] = None, target_height: Optional[int] = None, maintain_aspect: bool = True) -> bool: """Resize video to target dimensions""" cap = cv2.VideoCapture(str(input_path)) if not cap.isOpened(): logger.error(f"Failed to open video: {input_path}") return False orig_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) orig_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) fps = cap.get(cv2.CAP_PROP_FPS) fourcc = int(cap.get(cv2.CAP_PROP_FOURCC)) if maintain_aspect: if target_width and not target_height: aspect = orig_width / orig_height target_height = int(target_width / aspect) elif target_height and not target_width: aspect = orig_width / orig_height target_width = int(target_height * aspect) if not target_width: target_width = orig_width if not target_height: target_height = orig_height out = cv2.VideoWriter(str(output_path), fourcc, fps, (target_width, target_height)) try: while True: ret, frame = cap.read() if not ret: break resized = cv2.resize(frame, (target_width, target_height)) out.write(resized) logger.info(f"Resized video saved to: {output_path}") return True except Exception as e: logger.error(f"Error resizing video: {e}") return False finally: cap.release() out.release() @staticmethod def extract_audio(video_path: Union[str, Path], audio_path: Union[str, Path]) -> bool: """Extract audio from video using ffmpeg""" try: cmd = [ 'ffmpeg', '-i', str(video_path), '-vn', '-acodec', 'copy', str(audio_path), '-y' ] result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode == 0: logger.info(f"Audio extracted to: {audio_path}") return True else: logger.error(f"Failed to extract audio: {result.stderr}") return False except FileNotFoundError: logger.error("ffmpeg not found. Please install ffmpeg.") return False except Exception as e: logger.error(f"Error extracting audio: {e}") return False @staticmethod def add_audio_to_video(video_path: Union[str, Path], audio_path: Union[str, Path], output_path: Union[str, Path]) -> bool: """Add audio track to video using ffmpeg""" try: cmd = [ 'ffmpeg', '-i', str(video_path), '-i', str(audio_path), '-c:v', 'copy', '-c:a', 'aac', '-map', '0:v:0', '-map', '1:a:0', str(output_path), '-y' ] result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode == 0: logger.info(f"Video with audio saved to: {output_path}") return True else: logger.error(f"Failed to add audio: {result.stderr}") return False except FileNotFoundError: logger.error("ffmpeg not found. Please install ffmpeg.") return False except Exception as e: logger.error(f"Error adding audio: {e}") return False # ============================================================================ # IMAGE UTILS CLASS # ============================================================================ class ImageUtils: """Utilities for image processing and manipulation""" @staticmethod def load_image(image_path: Union[str, Path]) -> Optional[Image.Image]: """Load an image using PIL""" try: return Image.open(str(image_path)) except Exception as e: logger.error(f"Failed to load image {image_path}: {e}") return None @staticmethod def resize_image(image: Image.Image, max_width: Optional[int] = None, max_height: Optional[int] = None, maintain_aspect: bool = True) -> Image.Image: """Resize image to fit within max dimensions""" if not max_width and not max_height: return image width, height = image.size if maintain_aspect: scale = 1.0 if max_width: scale = min(scale, max_width / width) if max_height: scale = min(scale, max_height / height) new_width = int(width * scale) new_height = int(height * scale) else: new_width = max_width or width new_height = max_height or height return image.resize((new_width, new_height), Image.Resampling.LANCZOS) @staticmethod def convert_to_cv2(pil_image: Image.Image) -> np.ndarray: """Convert PIL Image to OpenCV format""" if pil_image.mode != 'RGB': pil_image = pil_image.convert('RGB') np_image = np.array(pil_image) return cv2.cvtColor(np_image, cv2.COLOR_RGB2BGR) @staticmethod def convert_from_cv2(cv2_image: np.ndarray) -> Image.Image: """Convert OpenCV image to PIL format""" rgb_image = cv2.cvtColor(cv2_image, cv2.COLOR_BGR2RGB) return Image.fromarray(rgb_image) @staticmethod def apply_blur(image: Image.Image, radius: float = 5.0) -> Image.Image: """Apply Gaussian blur to image""" return image.filter(ImageFilter.GaussianBlur(radius=radius)) @staticmethod def adjust_brightness(image: Image.Image, factor: float = 1.0) -> Image.Image: """Adjust image brightness""" enhancer = ImageEnhance.Brightness(image) return enhancer.enhance(factor) @staticmethod def adjust_contrast(image: Image.Image, factor: float = 1.0) -> Image.Image: """Adjust image contrast""" enhancer = ImageEnhance.Contrast(image) return enhancer.enhance(factor) @staticmethod def adjust_saturation(image: Image.Image, factor: float = 1.0) -> Image.Image: """Adjust image saturation""" enhancer = ImageEnhance.Color(image) return enhancer.enhance(factor) @staticmethod def crop_center(image: Image.Image, crop_width: int, crop_height: int) -> Image.Image: """Crop image from center""" width, height = image.size left = (width - crop_width) // 2 top = (height - crop_height) // 2 right = left + crop_width bottom = top + crop_height return image.crop((left, top, right, bottom)) @staticmethod def create_thumbnail(image: Image.Image, size: Tuple[int, int] = (128, 128)) -> Image.Image: """Create thumbnail preserving aspect ratio""" img_copy = image.copy() img_copy.thumbnail(size, Image.Resampling.LANCZOS) return img_copy @staticmethod def apply_mask(image: Image.Image, mask: Image.Image, alpha: float = 1.0) -> Image.Image: """Apply mask to image""" if image.mode != 'RGBA': image = image.convert('RGBA') if mask.mode != 'L': mask = mask.convert('L') if mask.size != image.size: mask = mask.resize(image.size, Image.Resampling.LANCZOS) if alpha < 1.0: mask = ImageEnhance.Brightness(mask).enhance(alpha) image.putalpha(mask) return image @staticmethod def composite_images(foreground: Image.Image, background: Image.Image, position: Tuple[int, int] = (0, 0), alpha: float = 1.0) -> Image.Image: """Composite foreground image over background""" if foreground.mode != 'RGBA': foreground = foreground.convert('RGBA') if background.mode != 'RGBA': background = background.convert('RGBA') if alpha < 1.0: foreground = foreground.copy() foreground.putalpha( ImageEnhance.Brightness(foreground.split()[3]).enhance(alpha) ) output = background.copy() output.paste(foreground, position, foreground) return output @staticmethod def get_image_info(image_path: Union[str, Path]) -> Dict[str, Any]: """Get image file information""" try: image_path = Path(image_path) if not image_path.exists(): return {"exists": False} with Image.open(str(image_path)) as img: info = { "exists": True, "filename": image_path.name, "format": img.format, "mode": img.mode, "size": img.size, "width": img.width, "height": img.height, "file_size_mb": image_path.stat().st_size / (1024 * 1024) } if hasattr(img, '_getexif') and img._getexif(): info["has_exif"] = True else: info["has_exif"] = False return info except Exception as e: logger.error(f"Error getting image info for {image_path}: {e}") return {"exists": False, "error": str(e)} @staticmethod def save_image(image: Image.Image, output_path: Union[str, Path], quality: int = 95, optimize: bool = True) -> bool: """Save image with specified quality""" try: output_path = Path(output_path) output_path.parent.mkdir(parents=True, exist_ok=True) save_kwargs = {} ext = output_path.suffix.lower() if ext in ['.jpg', '.jpeg']: save_kwargs['quality'] = quality save_kwargs['optimize'] = optimize elif ext == '.png': save_kwargs['optimize'] = optimize image.save(str(output_path), **save_kwargs) logger.info(f"Saved image to: {output_path}") return True except Exception as e: logger.error(f"Failed to save image to {output_path}: {e}") return False # ============================================================================ # DEFAULT INSTANCES # ============================================================================ def validate_video_file(file_path: str) -> tuple: """Validate if file is a valid video file.""" import os import cv2 if not os.path.exists(file_path): return False, f"File not found: {file_path}" try: cap = cv2.VideoCapture(file_path) ret = cap.isOpened() cap.release() if ret: return True, "Video file is valid" else: return False, "Unable to open video file - may be corrupted" except Exception as e: return False, f"Error validating video: {str(e)}"