#!/usr/bin/env python3 """ BackgroundFX Pro – Main Application Entry Point Refactored modular architecture – orchestrates specialised components """ from __future__ import annotations # ── Critical environment defaults (set before any imports) ──────────────── import os # Set critical defaults directly - HF now supports underscores in env vars os.environ.setdefault("OMP_NUM_THREADS", "2") os.environ.setdefault("TOKENIZERS_PARALLELISM", "false") os.environ.setdefault("PYTORCH_CUDA_ALLOC_CONF", "max_split_size_mb:128") os.environ.setdefault("LOG_LEVEL", "info") os.environ.setdefault("APP_ENV", "production") # Ensure reasonable cache defaults if not set from pathlib import Path _base_cache = Path.home() / ".cache" os.environ.setdefault("HF_HOME", str(_base_cache / "huggingface")) os.environ.setdefault("TRANSFORMERS_CACHE", str(_base_cache / "huggingface" / "hub")) os.environ.setdefault("TORCH_HOME", str(_base_cache / "torch")) # Synthesize CLOUDINARY_URL from parts if missing def _ensure_cloudinary_url(): if os.getenv("CLOUDINARY_URL"): return key = os.getenv("CLOUDINARY_API_KEY") sec = os.getenv("CLOUDINARY_API_SECRET") name = os.getenv("CLOUDINARY_CLOUD_NAME") if key and sec and name: os.environ["CLOUDINARY_URL"] = f"cloudinary://{key}:{sec}@{name}" _ensure_cloudinary_url() # If you use early_env in your project, keep this import (harmless if absent) try: import early_env # sets OMP/MKL/OPENBLAS + torch threads safely except Exception: pass import logging import threading import traceback import sys import time from typing import Optional, Tuple, Dict, Any, Callable # ── Logging ────────────────────────────────────────────────────────────────── logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", ) logger = logging.getLogger("core.app") # ── Ensure project root importable ─────────────────────────────────────────── PROJECT_FILE = Path(__file__).resolve() CORE_DIR = PROJECT_FILE.parent ROOT = CORE_DIR.parent if str(ROOT) not in sys.path: sys.path.insert(0, str(ROOT)) # Create loader directories if they don't exist loaders_dir = ROOT / "models" / "loaders" loaders_dir.mkdir(parents=True, exist_ok=True) # ── Gradio schema patch (HF quirk) ─────────────────────────────────────────── try: import gradio_client.utils as gc_utils _orig_get_type = gc_utils.get_type def _patched_get_type(schema): if not isinstance(schema, dict): if isinstance(schema, bool): return "boolean" if isinstance(schema, str): return "string" if isinstance(schema, (int, float)): return "number" return "string" return _orig_get_type(schema) gc_utils.get_type = _patched_get_type logger.info("Gradio schema patch applied") except Exception as e: logger.warning(f"Gradio patch failed: {e}") # ── Core config + components ───────────────────────────────────────────────── try: from config.app_config import get_config except ImportError: # Dummy if missing class DummyConfig: def to_dict(self): return {} get_config = lambda: DummyConfig() from utils.hardware.device_manager import DeviceManager from utils.system.memory_manager import MemoryManager # Try to import the new split loaders first, fall back to old if needed try: from models.loaders.model_loader import ModelLoader logger.info("Using split loader architecture") except ImportError: logger.warning("Split loaders not found, using legacy loader") # Fall back to old loader if split architecture isn't available yet from models.model_loader import ModelLoader # type: ignore from processing.video.video_processor import CoreVideoProcessor from processing.audio.audio_processor import AudioProcessor from utils.monitoring.progress_tracker import ProgressTracker from utils.cv_processing import validate_video_file # ── Optional Two-Stage import ──────────────────────────────────────────────── TWO_STAGE_AVAILABLE = False TWO_STAGE_IMPORT_ORIGIN = "" TWO_STAGE_IMPORT_ERROR = "" CHROMA_PRESETS: Dict[str, Dict[str, Any]] = {"standard": {}} TwoStageProcessor = None # type: ignore # Try multiple import paths for two-stage processor two_stage_paths = [ "processors.two_stage", # Your fixed version "processing.two_stage.two_stage_processor", "processing.two_stage", ] for import_path in two_stage_paths: try: exec(f"from {import_path} import TwoStageProcessor, CHROMA_PRESETS") TWO_STAGE_AVAILABLE = True TWO_STAGE_IMPORT_ORIGIN = import_path logger.info(f"Two-stage import OK ({import_path})") break except Exception as e: TWO_STAGE_IMPORT_ERROR = str(e) continue if not TWO_STAGE_AVAILABLE: logger.warning(f"Two-stage import FAILED from all paths: {TWO_STAGE_IMPORT_ERROR}") # ── Quiet startup self-check (async by default) ────────────────────────────── # Place the helper in tools/startup_selfcheck.py (with tools/__init__.py present) try: from tools.startup_selfcheck import schedule_startup_selfcheck except Exception: schedule_startup_selfcheck = None # graceful if the helper isn't shipped # Dummy exceptions if core.exceptions not available class ModelLoadingError(Exception): pass class VideoProcessingError(Exception): pass # ╔══════════════════════════════════════════════════════════════════════════╗ # ║ VideoProcessor class ║ # ╚══════════════════════════════════════════════════════════════════════════╝ class VideoProcessor: """ Main orchestrator – coordinates all specialised components. """ def __init__(self): self.config = get_config() self._patch_config_defaults(self.config) # avoid AttributeError on older configs self.device_manager = DeviceManager() self.memory_manager = MemoryManager(self.device_manager.get_optimal_device()) self.model_loader = ModelLoader(self.device_manager, self.memory_manager) self.audio_processor = AudioProcessor() self.core_processor: Optional[CoreVideoProcessor] = None self.two_stage_processor: Optional[Any] = None self.models_loaded = False self.loading_lock = threading.Lock() self.cancel_event = threading.Event() self.progress_tracker: Optional[ProgressTracker] = None logger.info(f"VideoProcessor on device: {self.device_manager.get_optimal_device()}") # ── Config hardening: add missing fields safely ─────────────────────────── @staticmethod def _patch_config_defaults(cfg: Any) -> None: defaults = { # video / i/o "use_nvenc": False, "prefer_mp4": True, "video_codec": "mp4v", "audio_copy": True, "ffmpeg_path": "ffmpeg", # model/resource guards "max_model_size": 0, "max_model_size_bytes": 0, # housekeeping "output_dir": str((Path(__file__).resolve().parent.parent) / "outputs"), # MatAnyone settings to ensure it's enabled "matanyone_enabled": True, "use_matanyone": True, } for k, v in defaults.items(): if not hasattr(cfg, k): setattr(cfg, k, v) Path(cfg.output_dir).mkdir(parents=True, exist_ok=True) # ── Progress helper ─────────────────────────────────────────────────────── def _init_progress(self, video_path: str, cb: Optional[Callable] = None): try: import cv2 cap = cv2.VideoCapture(video_path) total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) cap.release() if total <= 0: total = 100 self.progress_tracker = ProgressTracker(total, cb) except Exception as e: logger.warning(f"Progress init failed: {e}") self.progress_tracker = ProgressTracker(100, cb) # ── Model loading ───────────────────────────────────────────────────────── def load_models(self, progress_callback: Optional[Callable] = None) -> str: with self.loading_lock: if self.models_loaded: return "Models already loaded and validated" try: self.cancel_event.clear() if progress_callback: progress_callback(0.0, f"Loading on {self.device_manager.get_optimal_device()}") sam2_loaded, mat_loaded = self.model_loader.load_all_models( progress_callback=progress_callback, cancel_event=self.cancel_event ) if self.cancel_event.is_set(): return "Model loading cancelled" # Get the actual models sam2_predictor = sam2_loaded.model if sam2_loaded else None mat_model = mat_loaded.model if mat_loaded else None # NOTE: stateful callable adapter # Initialize core processor self.core_processor = CoreVideoProcessor(config=self.config, models=self.model_loader) # Initialize two-stage processor if available self.two_stage_processor = None if TWO_STAGE_AVAILABLE and TwoStageProcessor and (sam2_predictor or mat_model): try: self.two_stage_processor = TwoStageProcessor( sam2_predictor=sam2_predictor, matanyone_model=mat_model ) logger.info("Two-stage processor initialised") except Exception as e: logger.warning(f"Two-stage init failed: {e}") self.two_stage_processor = None self.models_loaded = True msg = self.model_loader.get_load_summary() # Add status about processors if self.two_stage_processor: msg += "\n✅ Two-stage processor ready" else: msg += "\n⚠️ Two-stage processor not available" if mat_model: msg += "\n✅ MatAnyone refinement active" else: msg += "\n⚠️ MatAnyone not loaded (edges may be rough)" logger.info(msg) return msg except (AttributeError, ModelLoadingError) as e: self.models_loaded = False err = f"Model loading failed: {e}" logger.error(err) return err except Exception as e: self.models_loaded = False err = f"Unexpected error during model loading: {e}" logger.error(f"{err}\n{traceback.format_exc()}") return err # ── Public entry – process video ───────────────────────────────────────── def process_video( self, video_path: str, background_choice: str, custom_background_path: Optional[str] = None, progress_callback: Optional[Callable] = None, use_two_stage: bool = False, chroma_preset: str = "standard", key_color_mode: str = "auto", preview_mask: bool = False, preview_greenscreen: bool = False, ) -> Tuple[Optional[str], Optional[str], str]: # ===== BACKGROUND PATH DEBUG & FIX ===== logger.info("=" * 60) logger.info("BACKGROUND PATH DEBUGGING") logger.info(f"background_choice: {background_choice}") logger.info(f"custom_background_path type: {type(custom_background_path)}") logger.info(f"custom_background_path value: {custom_background_path}") # Fix 1: Handle if Gradio sends a dict if isinstance(custom_background_path, dict): original = custom_background_path custom_background_path = custom_background_path.get('name') or custom_background_path.get('path') logger.info(f"Extracted path from dict: {original} -> {custom_background_path}") # Fix 2: Handle PIL Image objects try: from PIL import Image if isinstance(custom_background_path, Image.Image): import tempfile with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as tmp: custom_background_path.save(tmp.name) custom_background_path = tmp.name logger.info(f"Saved PIL Image to: {custom_background_path}") except ImportError: pass # Fix 3: Verify file exists when using custom background if background_choice == "custom" or custom_background_path: if custom_background_path: if Path(custom_background_path).exists(): logger.info(f"✅ Background file exists: {custom_background_path}") else: logger.warning(f"⚠️ Background file does not exist: {custom_background_path}") # Try to find it in Gradio temp directories import glob patterns = [ "/tmp/gradio*/**/*.jpg", "/tmp/gradio*/**/*.jpeg", "/tmp/gradio*/**/*.png", "/tmp/**/*.jpg", "/tmp/**/*.jpeg", "/tmp/**/*.png", ] for pattern in patterns: files = glob.glob(pattern, recursive=True) if files: # Get the most recent file newest = max(files, key=os.path.getmtime) logger.info(f"Found potential background: {newest}") # Only use it if it was created in the last 5 minutes if (time.time() - os.path.getmtime(newest)) < 300: custom_background_path = newest logger.info(f"✅ Using recent temp file: {custom_background_path}") break else: logger.error("❌ Custom background mode but path is None!") logger.info(f"Final custom_background_path: {custom_background_path}") logger.info("=" * 60) if not self.models_loaded or not self.core_processor: return None, None, "Models not loaded. Please click 'Load Models' first." if self.cancel_event.is_set(): return None, None, "Processing cancelled" self._init_progress(video_path, progress_callback) ok, why = validate_video_file(video_path) if not ok: return None, None, f"Invalid video: {why}" try: # Log which mode we're using mode = "two-stage" if use_two_stage else "single-stage" matanyone_status = "enabled" if self.model_loader.get_matanyone() else "disabled" logger.info(f"Processing video in {mode} mode, MatAnyone: {matanyone_status}") # IMPORTANT: start each video with a clean MatAnyone memory self._reset_matanyone_session() if use_two_stage: if not TWO_STAGE_AVAILABLE or self.two_stage_processor is None: return None, None, "Two-stage processing not available" final, green, msg = self._process_two_stage( video_path, background_choice, custom_background_path, progress_callback, chroma_preset, key_color_mode, ) return final, green, msg else: final, green, msg = self._process_single_stage( video_path, background_choice, custom_background_path, progress_callback, preview_mask, preview_greenscreen, ) return final, green, msg except VideoProcessingError as e: logger.error(f"Processing failed: {e}") return None, None, f"Processing failed: {e}" except Exception as e: logger.error(f"Unexpected processing error: {e}\n{traceback.format_exc()}") return None, None, f"Unexpected error: {e}" # ── Private – per-video MatAnyone reset ────────────────────────────────── def _reset_matanyone_session(self): """ Ensure a fresh MatAnyone memory per video. The MatAnyone loader we use returns a callable *stateful adapter*. If present, reset() clears its InferenceCore memory. """ try: mat = self.model_loader.get_matanyone() except Exception: mat = None if mat is not None and hasattr(mat, "reset") and callable(mat.reset): try: mat.reset() logger.info("MatAnyone session reset for new video") except Exception as e: logger.warning(f"MatAnyone session reset failed (continuing): {e}") # ── Private – single-stage ─────────────────────────────────────────────── def _process_single_stage( self, video_path: str, background_choice: str, custom_background_path: Optional[str], progress_callback: Optional[Callable], preview_mask: bool, preview_greenscreen: bool, ) -> Tuple[Optional[str], Optional[str], str]: # Additional debug logging for single-stage logger.info(f"[Single-stage] background_choice: {background_choice}") logger.info(f"[Single-stage] custom_background_path: {custom_background_path}") ts = int(time.time()) out_dir = Path(self.config.output_dir) / "single_stage" out_dir.mkdir(parents=True, exist_ok=True) out_path = str(out_dir / f"processed_{ts}.mp4") # Process video via your CoreVideoProcessor result = self.core_processor.process_video( input_path=video_path, output_path=out_path, bg_config={ "background_choice": background_choice, "custom_path": custom_background_path, }, progress_callback=progress_callback, ) if not result: return None, None, "Video processing failed" # Mux audio unless preview-only if not (preview_mask or preview_greenscreen): try: final_path = self.audio_processor.add_audio_to_video( original_video=video_path, processed_video=out_path ) except Exception as e: logger.warning(f"Audio mux failed, returning video without audio: {e}") final_path = out_path else: final_path = out_path # Build status message try: mat_loaded = bool(self.model_loader.get_matanyone()) except Exception: mat_loaded = False matanyone_status = "✓" if mat_loaded else "✗" msg = ( "Processing completed.\n" f"Frames: {result.get('frames', 'unknown')}\n" f"Background: {background_choice}\n" f"Mode: Single-stage\n" f"MatAnyone: {matanyone_status}\n" f"Device: {self.device_manager.get_optimal_device()}" ) return final_path, None, msg # No green in single-stage # ── Private – two-stage ───────────────────────────────────────────────── def _process_two_stage( self, video_path: str, background_choice: str, custom_background_path: Optional[str], progress_callback: Optional[Callable], chroma_preset: str, key_color_mode: str, ) -> Tuple[Optional[str], Optional[str], str]: if self.two_stage_processor is None: return None, None, "Two-stage processor not available" # Additional debug logging for two-stage logger.info(f"[Two-stage] background_choice: {background_choice}") logger.info(f"[Two-stage] custom_background_path: {custom_background_path}") import cv2 cap = cv2.VideoCapture(video_path) if not cap.isOpened(): return None, None, "Could not open input video" w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) or 1280 h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) or 720 cap.release() # Prepare background try: background = self.core_processor.prepare_background( background_choice, custom_background_path, w, h ) except Exception as e: logger.error(f"Background preparation failed: {e}") return None, None, f"Failed to prepare background: {e}" if background is None: return None, None, "Failed to prepare background" ts = int(time.time()) out_dir = Path(self.config.output_dir) / "two_stage" out_dir.mkdir(parents=True, exist_ok=True) final_out = str(out_dir / f"final_{ts}.mp4") chroma_cfg = CHROMA_PRESETS.get(chroma_preset, CHROMA_PRESETS.get("standard", {})) logger.info(f"Two-stage with preset: {chroma_preset} | key_color: {key_color_mode}") # (Per-video reset already called in process_video) final_path, green_path, stage2_msg = self.two_stage_processor.process_full_pipeline( video_path, background, final_out, key_color_mode=key_color_mode, chroma_settings=chroma_cfg, progress_callback=progress_callback, ) if final_path is None: return None, None, stage2_msg # Mux audio try: final_with_audio = self.audio_processor.add_audio_to_video( original_video=video_path, processed_video=final_path ) except Exception as e: logger.warning(f"Audio mux failed: {e}") final_with_audio = final_path try: mat_loaded = bool(self.model_loader.get_matanyone()) except Exception: mat_loaded = False matanyone_status = "✓" if mat_loaded else "✗" msg = ( "Two-stage processing completed.\n" f"Background: {background_choice}\n" f"Chroma Preset: {chroma_preset}\n" f"MatAnyone: {matanyone_status}\n" f"Device: {self.device_manager.get_optimal_device()}" ) return final_with_audio, green_path, msg # ── Status helpers ─────────────────────────────────────────────────────── def get_status(self) -> Dict[str, Any]: status = { "models_loaded": self.models_loaded, "two_stage_available": bool(TWO_STAGE_AVAILABLE and self.two_stage_processor), "two_stage_origin": TWO_STAGE_IMPORT_ORIGIN or "", "device": str(self.device_manager.get_optimal_device()), "core_processor_loaded": self.core_processor is not None, "config": self._safe_config_dict(), "memory_usage": self._safe_memory_usage(), } try: status["sam2_loaded"] = self.model_loader.get_sam2() is not None status["matanyone_loaded"] = self.model_loader.get_matanyone() is not None status["model_info"] = self.model_loader.get_model_info() except Exception: status["sam2_loaded"] = False status["matanyone_loaded"] = False if self.progress_tracker: status["progress"] = self.progress_tracker.get_all_progress() return status def _safe_config_dict(self) -> Dict[str, Any]: try: return self.config.to_dict() except Exception: keys = ["use_nvenc", "prefer_mp4", "video_codec", "audio_copy", "ffmpeg_path", "max_model_size", "max_model_size_bytes", "output_dir", "matanyone_enabled"] return {k: getattr(self.config, k, None) for k in keys} def _safe_memory_usage(self) -> Dict[str, Any]: try: return self.memory_manager.get_memory_usage() except Exception: return {} def cancel_processing(self): self.cancel_event.set() logger.info("Cancellation requested") def cleanup_resources(self): try: self.memory_manager.cleanup_aggressive() except Exception: pass try: self.model_loader.cleanup() except Exception: pass logger.info("Resources cleaned up") # ── Singleton + thin wrappers (used by UI callbacks) ──────────────────────── processor = VideoProcessor() def load_models_with_validation(progress_callback: Optional[Callable] = None) -> str: return processor.load_models(progress_callback) def process_video_fixed( video_path: str, background_choice: str, custom_background_path: Optional[str], progress_callback: Optional[Callable] = None, use_two_stage: bool = False, chroma_preset: str = "standard", key_color_mode: str = "auto", preview_mask: bool = False, preview_greenscreen: bool = False, ) -> Tuple[Optional[str], Optional[str], str]: return processor.process_video( video_path, background_choice, custom_background_path, progress_callback, use_two_stage, chroma_preset, key_color_mode, preview_mask, preview_greenscreen, ) def get_model_status() -> Dict[str, Any]: return processor.get_status() def get_cache_status() -> Dict[str, Any]: return processor.get_status() PROCESS_CANCELLED = processor.cancel_event # ── CLI entrypoint (must exist; app.py imports main) ───────────────────────── def main(): try: logger.info("Starting BackgroundFX Pro") logger.info(f"Device: {processor.device_manager.get_optimal_device()}") logger.info(f"Two-stage available: {TWO_STAGE_AVAILABLE}") # 🔹 Quiet model self-check (defaults to async; set SELF_CHECK_MODE=sync to block) if schedule_startup_selfcheck is not None: try: schedule_startup_selfcheck(mode=os.getenv("SELF_CHECK_MODE", "async")) except Exception as e: logger.error(f"Startup self-check skipped: {e}", exc_info=True) # Log model loader type try: from models.loaders.model_loader import ModelLoader logger.info("Using split loader architecture") except Exception: logger.info("Using legacy loader") # FIXED: Move UI import inside main() to avoid circular dependency # and add better error handling try: # Import here to break circular dependency from ui import ui_components # Now get the create_interface function if hasattr(ui_components, 'create_interface'): create_interface = ui_components.create_interface else: logger.error("create_interface not found in ui_components") logger.error(f"Available attributes: {dir(ui_components)}") raise ImportError("create_interface function not found") except ImportError as e: logger.error(f"Failed to import UI components: {e}") import traceback traceback.print_exc() # Try alternate import method try: logger.info("Trying alternate import method...") import importlib ui_components = importlib.import_module('ui.ui_components') create_interface = getattr(ui_components, 'create_interface') logger.info("Alternate import successful") except Exception as e2: logger.error(f"Alternate import also failed: {e2}") logger.info("System initialized but UI unavailable. Exiting.") return # Create and launch the interface try: demo = create_interface() demo.queue().launch( server_name="0.0.0.0", server_port=7860, show_error=True, debug=False, ) except Exception as e: logger.error(f"Failed to launch Gradio interface: {e}") import traceback traceback.print_exc() except Exception as e: logger.error(f"Fatal error in main: {e}") import traceback traceback.print_exc() finally: processor.cleanup_resources() if __name__ == "__main__": main()