#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Core Video Processor for BackgroundFX Pro - Minimal, safe implementation used by core/app.py - Works with split/legacy loaders - Keeps exact behavior you shared; only fixes typing imports + integrates prepare_background() and related helpers. NOTE: - Requires utils.cv_processing helpers already present in your project: segment_person_hq, refine_mask_hq, replace_background_hq, create_professional_background, PROFESSIONAL_BACKGROUNDS, _create_gradient_background_local, validate_video_file """ from __future__ import annotations import os import time import shutil import logging import threading from typing import Optional, Any, Dict, Callable, Tuple, List import numpy as np import cv2 # --------------------------------------------------------------------- # Project logger (non-fatal fallback to std logging) # --------------------------------------------------------------------- try: from utils.logger import get_logger _log = get_logger("processing.video.video_processor") except Exception: logging.basicConfig(level=logging.INFO) _log = logging.getLogger("processing.video.video_processor") # --------------------------------------------------------------------- # Config type (import if available; otherwise annotations are postponed) # --------------------------------------------------------------------- try: from config.processor_config import ProcessorConfig # your project config except Exception: # keep runtime happy if only used for typing ProcessorConfig = Any # type: ignore # --------------------------------------------------------------------- # Small env helpers (use project ones if you have them) # --------------------------------------------------------------------- try: from utils.system.env_utils import env_bool as _env_bool # type: ignore from utils.system.env_utils import env_int as _env_int # type: ignore except Exception: def _env_bool(name: str, default: bool = False) -> bool: v = os.environ.get(name) if v is None: return bool(default) return str(v).strip().lower() in ("1", "true", "yes", "y", "on") def _env_int(name: str, default: int = 0) -> int: try: return int(os.environ.get(name, default)) except Exception: return int(default) # --------------------------------------------------------------------- # CV helpers from your utils module # --------------------------------------------------------------------- from utils.cv_processing import ( segment_person_hq, refine_mask_hq, replace_background_hq, create_professional_background, PROFESSIONAL_BACKGROUNDS, validate_video_file, ) # Optional local gradient helper (present in some layouts) try: from utils.cv_processing import _create_gradient_background_local # type: ignore except Exception: _create_gradient_background_local = None # type: ignore # --------------------------------------------------------------------- # Optional FFmpeg pipe; code falls back to OpenCV if unavailable # --------------------------------------------------------------------- try: from utils.video.ffmpeg_pipe import FFmpegPipe as _FFmpegPipe # type: ignore except Exception: _FFmpegPipe = None # type: ignore class CoreVideoProcessor: """ Minimal, safe implementation used by core/app.py. Orchestrates SAM2 → MatAnyone refinement → background compositing, with robust fallbacks (OpenCV writer when FFmpeg/NVENC unavailable). """ def __init__(self, config: Optional[ProcessorConfig] = None, models: Optional[Any] = None): self.log = _log self.config = config or ProcessorConfig() self.models = models if self.models is None: self.log.warning("CoreVideoProcessor initialized without a models provider; will use fallbacks.") self._ffmpeg = shutil.which("ffmpeg") # -------- Back-compat safe config flags (do not require attrs on user config) self._use_windowed = _env_bool( "MATANYONE_WINDOWED", bool(getattr(self.config, "use_windowed", False)), ) self._window_size = max(1, _env_int("MATANYONE_WINDOW", int(getattr(self.config, "window_size", 8)))) self._max_model_size = int(os.environ.get("MAX_MODEL_SIZE", getattr(self.config, "max_model_size", 1280) or 0)) or None # state for temporal smoothing self._prev_mask: Optional[np.ndarray] = None # Legacy per-frame stateful chunking (used only if windowed=False) try: self._chunk_size = max(1, int(os.environ.get("MATANYONE_CHUNK", "12"))) except Exception: self._chunk_size = 12 self._chunk_idx = 0 # ---------------- ADDED METHOD ---------------- def prepare_background(self, background_choice: str, custom_background_path: Optional[str], width: int, height: int) -> np.ndarray: """ Prepares a background image for compositing. If a valid custom background path is given, loads and resizes it. Otherwise, uses a preset. Returns: np.ndarray RGB (H, W, 3) uint8 """ from utils.cv_processing import create_professional_background if custom_background_path: try: img = cv2.imread(custom_background_path, cv2.IMREAD_COLOR) if img is not None: img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) img = cv2.resize(img, (width, height), interpolation=cv2.INTER_LANCZOS4) return img else: self.log.warning(f"Failed to load custom background from '{custom_background_path}', using preset.") except Exception as e: self.log.warning(f"Exception loading custom background: {e}, using preset.") # fallback to preset return create_professional_background(background_choice, width, height) # ---------- mask post-processing (stability + crispness) ---------- def _iou(self, a: np.ndarray, b: np.ndarray, thr: float = 0.5) -> float: a_bin = (a >= thr).astype(np.uint8) b_bin = (b >= thr).astype(np.uint8) inter = np.count_nonzero(cv2.bitwise_and(a_bin, b_bin)) union = np.count_nonzero(cv2.bitwise_or(a_bin, b_bin)) return (inter / union) if union else 0.0 def _harden(self, m: np.ndarray) -> np.ndarray: g = float(getattr(self.config, "mask_gamma", 0.90)) if abs(g - 1.0) > 1e-6: m = np.clip(m, 0, 1) ** g lo = float(getattr(self.config, "hard_low", 0.35)) hi = float(getattr(self.config, "hard_high", 0.70)) if hi > lo + 1e-6: m = (m - lo) / (hi - lo) m = np.clip(m, 0.0, 1.0) k = int(getattr(self.config, "dilate_px", 6)) if k > 0: se = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (2*k+1, 2*k+1)) m = cv2.dilate(m, se, iterations=1) eb = int(getattr(self.config, "edge_blur_px", 1)) if eb > 0: m = cv2.GaussianBlur(m, (2*eb+1, 2*eb+1), 0) return np.clip(m, 0.0, 1.0) def _stabilize(self, m: np.ndarray) -> np.ndarray: if self._prev_mask is None: self._prev_mask = m return m thr = float(getattr(self.config, "min_iou_to_accept", 0.05)) if self._iou(self._prev_mask, m, 0.5) < thr: return self._prev_mask a = float(getattr(self.config, "temporal_ema_alpha", 0.75)) m_ema = a * self._prev_mask + (1.0 - a) * m self._prev_mask = m_ema return m_ema # ---------- Single frame (fallback path) ---------- def process_frame(self, frame_bgr: np.ndarray, background_rgb: np.ndarray) -> Dict[str, Any]: H, W = frame_bgr.shape[:2] max_side = max(H, W) scale = 1.0 proc_frame_bgr = frame_bgr # Model-only downscale mms = self._max_model_size if mms and max_side > mms: scale = mms / float(max_side) newW = int(round(W * scale)) newH = int(round(H * scale)) proc_frame_bgr = cv2.resize(frame_bgr, (newW, newH), interpolation=cv2.INTER_AREA) self.log.debug(f"Model-only downscale: {W}x{H} -> {newW}x{newH} (scale={scale:.3f})") proc_frame_rgb = cv2.cvtColor(proc_frame_bgr, cv2.COLOR_BGR2RGB) predictor = None try: if self.models and hasattr(self.models, "get_sam2"): predictor = self.models.get_sam2() except Exception as e: self.log.warning(f"SAM2 predictor unavailable: {e}") mask_small = segment_person_hq(proc_frame_rgb, predictor, use_sam2=True) matanyone = None try: if self.models and hasattr(self.models, "get_matanyone"): matanyone = self.models.get_matanyone() except Exception as e: self.log.warning(f"MatAnyOne unavailable: {e}") if matanyone is not None and hasattr(matanyone, "reset") and self._chunk_idx == 0: try: matanyone.reset() except Exception: pass mask_small_ref = refine_mask_hq( proc_frame_rgb, mask_small, matanyone=matanyone, use_matanyone=True, frame_idx=self._chunk_idx, ) self._chunk_idx = (self._chunk_idx + 1) % max(1, self._chunk_size) if self._chunk_idx == 0: try: import torch if torch.cuda.is_available(): torch.cuda.empty_cache() except Exception: pass mask_small_ref = np.clip(mask_small_ref.astype(np.float32), 0.0, 1.0) mask_stable = self._stabilize(mask_small_ref) mask_stable = self._harden(mask_stable) if scale != 1.0: mask_full = cv2.resize(mask_stable, (W, H), interpolation=cv2.INTER_LINEAR) else: mask_full = mask_stable frame_rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB) out_rgb = replace_background_hq(frame_rgb, mask_full, background_rgb) out_bgr = cv2.cvtColor(out_rgb, cv2.COLOR_RGB2BGR) return {"frame": out_bgr, "mask": mask_full} # ---------- Build background once per video ---------- def _prepare_background_from_config( self, bg_config: Optional[Dict[str, Any]], width: int, height: int ) -> np.ndarray: if bg_config and bg_config.get("custom_path"): path = bg_config["custom_path"] img_bgr = cv2.imread(path, cv2.IMREAD_COLOR) if img_bgr is None: self.log.warning(f"Custom background at '{path}' could not be read. Falling back to preset.") else: img_bgr = cv2.resize(img_bgr, (width, height), interpolation=cv2.INTER_LANCZOS4) return cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB) if bg_config and isinstance(bg_config.get("gradient"), dict) and _create_gradient_background_local: try: return _create_gradient_background_local(bg_config["gradient"], width, height) except Exception as e: self.log.warning(f"Gradient generation failed: {e}. Falling back to preset.") choice = None if bg_config and "background_choice" in bg_config: choice = bg_config["background_choice"] if not choice: choice = getattr(self.config, "background_preset", "office") if choice not in PROFESSIONAL_BACKGROUNDS: self.log.warning(f"Unknown background preset '{choice}'; using 'office'.") choice = "office" return create_professional_background(choice, width, height) # RGB # ---------- Windowed two-phase helpers ---------- def _model_downscale(self, frame_bgr: np.ndarray) -> Tuple[np.ndarray, float]: H, W = frame_bgr.shape[:2] max_side = max(H, W) mms = self._max_model_size if mms and max_side > mms: s = mms / float(max_side) newW = int(round(W * s)) newH = int(round(H * s)) small = cv2.resize(frame_bgr, (newW, newH), interpolation=cv2.INTER_AREA) return small, s return frame_bgr, 1.0 def _prepare_sam2_gpu(self, predictor): try: import torch if predictor is None or not torch.cuda.is_available(): return if hasattr(predictor, "to"): try: predictor.to("cuda") # type: ignore[attr-defined] return except Exception: pass if hasattr(predictor, "model") and hasattr(predictor.model, "to"): try: predictor.model.to("cuda") # type: ignore[attr-defined] except Exception: pass except Exception: pass def _release_sam2_gpu(self, predictor): try: if predictor is None: return for name in ("reset_image", "release_image", "clear_image", "clear_state"): if hasattr(predictor, name) and callable(getattr(predictor, name)): try: getattr(predictor, name)() except Exception: pass for name in ("to", "cpu"): if hasattr(predictor, name): try: if name == "to": predictor.to("cpu") # type: ignore[attr-defined] else: predictor.cpu() # type: ignore[attr-defined] except Exception: pass except Exception: pass try: import torch if torch.cuda.is_available(): torch.cuda.empty_cache() except Exception: pass # ---------- Full video ---------- def process_video( self, input_path: str, output_path: str, bg_config: Optional[Dict[str, Any]] = None, progress_callback: Optional[Callable[[int, int, float], None]] = None, stop_event: Optional[threading.Event] = None ) -> Dict[str, Any]: ok, msg = validate_video_file(input_path) if not ok: raise ValueError(f"Invalid or unreadable video: {msg}") cap = cv2.VideoCapture(input_path) if not cap.isOpened(): raise RuntimeError(f"Could not open video: {input_path}") width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) fps = cap.get(cv2.CAP_PROP_FPS) total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) fps_out = getattr(self.config, "write_fps", None) or (fps if fps and fps > 0 else 25.0) background_rgb = self._prepare_background_from_config(bg_config, width, height) self._prev_mask = None ffmpeg_pipe: _FFmpegPipe | None = None # type: ignore writer: cv2.VideoWriter | None = None ffmpeg_failed_reason = None if getattr(self.config, "use_nvenc", True) and shutil.which("ffmpeg") and _FFmpegPipe is not None: try: ffmpeg_pipe = _FFmpegPipe(width, height, float(fps_out), output_path, self.config, log=self.log) # type: ignore except Exception as e: ffmpeg_failed_reason = str(e) self.log.warning("FFmpeg NVENC pipeline unavailable. Falling back to OpenCV. Reason: %s", e) if ffmpeg_pipe is None: fourcc = cv2.VideoWriter_fourcc(*"mp4v") writer = cv2.VideoWriter(output_path, fourcc, float(fps_out), (width, height)) if not writer.isOpened(): cap.release() raise RuntimeError(f"Could not open VideoWriter for: {output_path}") predictor = None matanyone = None try: if self.models and hasattr(self.models, "get_sam2"): predictor = self.models.get_sam2() except Exception as e: self.log.warning(f"SAM2 predictor unavailable: {e}") try: if self.models and hasattr(self.models, "get_matanyone"): matanyone = self.models.get_matanyone() except Exception as e: self.log.warning(f"MatAnyOne unavailable: {e}") use_windowed = bool(self._use_windowed and predictor is not None and matanyone is not None) frame_count = 0 start_time = time.time() try: if not use_windowed: while True: ret, frame_bgr = cap.read() if not ret: break if stop_event is not None and stop_event.is_set(): self.log.info("Processing stopped by user request.") break result = self.process_frame(frame_bgr, background_rgb) out_bgr = np.ascontiguousarray(result["frame"]) if ffmpeg_pipe is not None: try: ffmpeg_pipe.write(out_bgr) # type: ignore[attr-defined] except Exception as e: self.log.warning("Switching to OpenCV writer after FFmpeg error at frame %d: %s", frame_count, e) try: ffmpeg_pipe.close() # type: ignore[attr-defined] except Exception: pass ffmpeg_pipe = None if writer is None: fourcc = cv2.VideoWriter_fourcc(*"mp4v") writer = cv2.VideoWriter(output_path, fourcc, float(fps_out), (width, height)) if not writer.isOpened(): raise RuntimeError(f"FFmpeg failed and VideoWriter could not open: {output_path}") writer.write(out_bgr) else: writer.write(out_bgr) frame_count += 1 if progress_callback: elapsed = time.time() - start_time fps_live = frame_count / elapsed if elapsed > 0 else 0.0 try: progress_callback(frame_count, total_frames, fps_live) except Exception: pass else: WINDOW = max(1, int(self._window_size)) while True: frames_bgr: List[np.ndarray] = [] for _ in range(WINDOW): ret, fr = cap.read() if not ret: break frames_bgr.append(fr) if not frames_bgr: break if stop_event is not None and stop_event.is_set(): self.log.info("Processing stopped by user request.") break frames_small_bgr: List[np.ndarray] = [] scales: List[float] = [] for fr in frames_bgr: fr_small, s = self._model_downscale(fr) frames_small_bgr.append(fr_small) scales.append(s) scale = scales[0] if scales else 1.0 frames_small_rgb = [cv2.cvtColor(fb, cv2.COLOR_BGR2RGB) for fb in frames_small_bgr] self._prepare_sam2_gpu(predictor) try: mask_small = segment_person_hq(frames_small_rgb[0], predictor, use_sam2=True) except Exception as e: self.log.warning(f"SAM2 segmentation error on window start: {e}") mask_small = segment_person_hq(frames_small_rgb[0], None, use_sam2=False) self._release_sam2_gpu(predictor) if hasattr(matanyone, "reset"): try: matanyone.reset() except Exception: pass for j, fr_rgb_small in enumerate(frames_small_rgb): try: if j == 0: m2d = mask_small if m2d.ndim == 3: m2d = m2d[..., 0] alpha_small = matanyone(fr_rgb_small, m2d) else: alpha_small = matanyone(fr_rgb_small) alpha_small = np.clip(alpha_small.astype(np.float32), 0.0, 1.0) alpha_stable = self._stabilize(alpha_small) alpha_harden = self._harden(alpha_stable) if scale != 1.0: H, W = frames_bgr[j].shape[:2] alpha_full = cv2.resize(alpha_harden, (W, H), interpolation=cv2.INTER_LINEAR) else: alpha_full = alpha_harden frame_rgb_full = cv2.cvtColor(frames_bgr[j], cv2.COLOR_BGR2RGB) out_rgb = replace_background_hq(frame_rgb_full, alpha_full, background_rgb) out_bgr = cv2.cvtColor(out_rgb, cv2.COLOR_RGB2BGR) out_bgr = np.ascontiguousarray(out_bgr) if ffmpeg_pipe is not None: try: ffmpeg_pipe.write(out_bgr) # type: ignore[attr-defined] except Exception as e: self.log.warning("Switching to OpenCV writer after FFmpeg error at frame %d: %s", frame_count, e) try: ffmpeg_pipe.close() # type: ignore[attr-defined] except Exception: pass ffmpeg_pipe = None if writer is None: fourcc = cv2.VideoWriter_fourcc(*"mp4v") writer = cv2.VideoWriter(output_path, fourcc, float(fps_out), (width, height)) if not writer.isOpened(): raise RuntimeError(f"FFmpeg failed and VideoWriter could not open: {output_path}") writer.write(out_bgr) else: writer.write(out_bgr) frame_count += 1 except Exception as e: self.log.warning(f"MatAnyone failed at window frame {j}: {e}") if j == 0: alpha_small_fb = np.clip(mask_small.astype(np.float32), 0.0, 1.0) else: alpha_small_fb = self._prev_mask if self._prev_mask is not None else np.zeros_like(alpha_small, dtype=np.float32) if scale != 1.0: H, W = frames_bgr[j].shape[:2] alpha_full_fb = cv2.resize(alpha_small_fb, (W, H), interpolation=cv2.INTER_LINEAR) else: alpha_full_fb = alpha_small_fb frame_rgb_full = cv2.cvtColor(frames_bgr[j], cv2.COLOR_BGR2RGB) out_rgb_fb = replace_background_hq(frame_rgb_full, alpha_full_fb, background_rgb) out_bgr_fb = cv2.cvtColor(out_rgb_fb, cv2.COLOR_RGB2BGR) if ffmpeg_pipe is not None: try: ffmpeg_pipe.write(np.ascontiguousarray(out_bgr_fb)) # type: ignore[attr-defined] except Exception: try: ffmpeg_pipe.close() # type: ignore[attr-defined] except Exception: pass ffmpeg_pipe = None if writer is None: fourcc = cv2.VideoWriter_fourcc(*"mp4v") writer = cv2.VideoWriter(output_path, fourcc, float(fps_out), (width, height)) if not writer.isOpened(): raise RuntimeError(f"FFmpeg failed and VideoWriter could not open: {output_path}") writer.write(np.ascontiguousarray(out_bgr_fb)) else: writer.write(np.ascontiguousarray(out_bgr_fb)) frame_count += 1 if progress_callback: elapsed = time.time() - start_time fps_live = frame_count / elapsed if elapsed > 0 else 0.0 try: progress_callback(frame_count, total_frames, fps_live) except Exception: pass del frames_bgr, frames_small_bgr, frames_small_rgb, mask_small try: import torch if torch.cuda.is_available(): torch.cuda.empty_cache() except Exception: pass finally: cap.release() if writer is not None: writer.release() if ffmpeg_pipe is not None: try: ffmpeg_pipe.close() # type: ignore[attr-defined] except Exception: pass if ffmpeg_failed_reason: self.log.info("Completed via OpenCV writer (FFmpeg initially failed): %s", ffmpeg_failed_reason) self.log.info("Processed %d frames → %s", frame_count, output_path) return { "frames": frame_count, "width": width, "height": height, "fps_out": float(fps_out), "output_path": output_path, }