"""
Minimal HF Space app: video background replacement with optional MatAnyone,
CPU-safe fallback.

Based on huggingface_space_setup/app_fixed.py but makes torch optional for Spaces.
"""

import os
import sys
import tempfile
from pathlib import Path
from typing import Optional

import cv2
import numpy as np
from PIL import Image
from moviepy.editor import VideoFileClip
import gradio as gr

# -----------------------------------------------------------------------------
# Make vendored third-party sources importable (if present)
# Expected layout:
#   hf_space/third_party/sam2/       <- from facebookresearch/segment-anything-2 (sam2 folder only)
#   hf_space/third_party/matanyone/  <- from pq-yang/MatAnyone (package root)
# -----------------------------------------------------------------------------
BASE_DIR = Path(__file__).resolve().parent
TP_DIR = BASE_DIR / "third_party"
VENDORED_PATHS = [TP_DIR / "sam2", TP_DIR / "matanyone"]
for p in VENDORED_PATHS:
    if str(p) not in sys.path:
        sys.path.insert(0, str(p))

# -----------------------------------------------------------------------------
# Optional dependencies
# -----------------------------------------------------------------------------
# torch is optional on Spaces CPU. It is probed FIRST because the SAM2 setup
# below needs to know whether CUDA is usable.
try:
    import torch  # type: ignore

    TORCH_AVAILABLE = True
    CUDA_AVAILABLE = torch.cuda.is_available()
except Exception:
    # Broad on purpose: a broken torch install can fail with more than
    # ImportError, and the app must still start on CPU-only Spaces.
    TORCH_AVAILABLE = False
    CUDA_AVAILABLE = False

# Optional SAM2 (Segment Anything 2) Tiny support via env.
# Some Spaces disallow underscores in variable NAMES; accept aliases without
# underscores.
SAM2_CONFIG = os.getenv("SAM2_CONFIG") or os.getenv("SAM2CONFIG")  # e.g. "Configs/sam2_hiera_tiny.yaml"
SAM2_CHECKPOINT = os.getenv("SAM2_CHECKPOINT") or os.getenv("SAM2CHECKPOINT")  # e.g. "checkpoints/sam2_hiera_tiny.pt"
SAM2_AVAILABLE = False
SAM2_ERR = None
SAM2_PREDICTOR = None


def _init_sam2():
    """Build the SAM2 image predictor from SAM2_CONFIG / SAM2_CHECKPOINT.

    The imports are local so a missing ``sam2`` package only disables SAM2
    instead of breaking the Space. Any failure propagates to the caller.
    """
    from sam2.sam2_image_predictor import SAM2ImagePredictor  # type: ignore
    from sam2.build_sam import build_sam2  # type: ignore

    model = build_sam2(
        SAM2_CONFIG,
        SAM2_CHECKPOINT,
        device="cuda" if CUDA_AVAILABLE else "cpu",
    )
    return SAM2ImagePredictor(model)


if SAM2_CONFIG and SAM2_CHECKPOINT:
    try:
        # API may differ between SAM2 revisions; never let that break the Space.
        SAM2_PREDICTOR = _init_sam2()
        SAM2_AVAILABLE = True
        print("SAM2 initialized")
    except Exception as e:
        SAM2_ERR = str(e)
        SAM2_AVAILABLE = False
        print(f"SAM2 not available: {SAM2_ERR}")

# Try to import MatAnyone - fallback to rembg if not available
try:
    from matanyone import InferenceCore  # type: ignore

    MATANYONE_AVAILABLE = True
    print("MatAnyone available")
except Exception:
    MATANYONE_AVAILABLE = False
    print("MatAnyone not available - using rembg fallback")

# rembg is probed independently of MatAnyone: the MatAnyone code paths fall
# back to rembg at runtime, so `remove` must exist even when MatAnyone imports.
try:
    from rembg import remove  # type: ignore

    REMBG_AVAILABLE = True
except Exception as e:
    remove = None
    REMBG_AVAILABLE = False
    print(f"rembg not available: {e}")


class MatAnyoneProcessor:
    """Lazy wrapper around MatAnyone's ``InferenceCore`` for whole-video matting."""

    def __init__(self):
        self.processor = None
        # Only consider CUDA if torch is present (CUDA_AVAILABLE is False otherwise).
        self.device = "cuda" if CUDA_AVAILABLE else "cpu"
        self.initialized = False

    def initialize(self) -> bool:
        """Create the MatAnyone inference core.

        Returns:
            True on success; False if MatAnyone is not importable or its
            construction raised (the error is printed, not propagated).
        """
        if not MATANYONE_AVAILABLE:
            return False
        try:
            print(f"Initializing MatAnyone on {self.device}...")
            self.processor = InferenceCore()
            self.initialized = True
            return True
        except Exception as e:
            print(f"MatAnyone init failed: {e}")
            return False

    @staticmethod
    def _sam2_center_mask(frame):
        """Ask SAM2 for a mask of the subject inside a central box prompt.

        Args:
            frame: BGR image as read by OpenCV.

        Returns:
            A 2-D uint8 mask (0/255), or None when the predictor output cannot
            be reduced to a single 2-D mask. Predictor errors propagate.
        """
        # SAM2 expects RGB
        rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        SAM2_PREDICTOR.set_image(rgb)  # type: ignore[attr-defined]

        # Prompt-free heuristic: assume the subject sits in a central box.
        h, w = rgb.shape[:2]
        box = np.array([int(w * 0.25), int(h * 0.2), int(w * 0.75), int(h * 0.85)])
        masks, scores, _ = SAM2_PREDICTOR.predict(box=box[None, :])  # type: ignore[attr-defined]

        masks = np.asarray(masks)
        if masks.ndim == 3:
            # Several candidate masks (C, H, W): keep the best-scoring one so
            # a plain 2-D image is written to disk.
            scores = np.asarray(scores).ravel()
            best = int(np.argmax(scores)) if scores.size == masks.shape[0] else 0
            masks = masks[best]
        if masks.ndim != 2:
            return None
        return (masks > 0).astype(np.uint8) * 255

    def create_simple_mask(self, video_path, output_path):
        """Write a first-frame subject mask for ``video_path`` to ``output_path``.

        Uses SAM2 with a central box prompt when it is configured; otherwise
        (or when SAM2 fails) writes a plain central rectangle.

        Returns:
            ``output_path``.

        Raises:
            ValueError: if the first frame of the video cannot be read.
        """
        cap = cv2.VideoCapture(video_path)
        try:
            ret, frame = cap.read()
        finally:
            cap.release()
        if not ret:
            raise ValueError("Could not read video frame")

        # Try SAM2 Tiny if configured
        if SAM2_AVAILABLE and SAM2_PREDICTOR is not None:
            try:
                mask = self._sam2_center_mask(frame)
                if mask is not None:
                    cv2.imwrite(output_path, mask)
                    return output_path
            except Exception as e:
                # If the SAM2 API differs, fall back to the rectangle mask below.
                print(f"SAM2 mask fallback due to error: {e}")

        # Fallback: simple central rectangle mask
        h, w = frame.shape[:2]
        mask = np.zeros((h, w), dtype=np.uint8)
        mx, my = int(w * 0.2), int(h * 0.15)
        mask[my:h - my, mx:w - mx] = 255
        cv2.imwrite(output_path, mask)
        return output_path

    def process_video(self, input_path, output_dir) -> Optional[str]:
        """Run MatAnyone on ``input_path``, writing its results into ``output_dir``.

        Returns:
            Path of the alpha-matte video, or None when MatAnyone could not be
            initialised or processing failed (the error is printed).
        """
        if not self.initialized and not self.initialize():
            return None
        try:
            mask_path = os.path.join(output_dir, "mask.png")
            self.create_simple_mask(input_path, mask_path)
            _fg_path, alpha_path = self.processor.process_video(  # type: ignore[attr-defined]
                input_path=input_path,
                mask_path=mask_path,
                output_path=output_dir,
                max_size=1080,
                save_frames=False,
            )
            return alpha_path
        except Exception as e:
            print(f"MatAnyone processing failed: {e}")
            return None


matanyone_processor = MatAnyoneProcessor()


def _blend(alpha, foreground, background):
    """Alpha-composite ``foreground`` over ``background``.

    Args:
        alpha: float array (H, W, 1) with values in [0, 1].
        foreground: (H, W, 3) image.
        background: (H, W, 3) image.

    Returns:
        The composite as a uint8 (H, W, 3) array.
    """
    return (alpha * foreground + (1 - alpha) * background).astype(np.uint8)


def _load_background(video_path, bg_image_path):
    """Load the background image as RGB, resized to the video's frame size.

    Raises:
        ValueError: if the image cannot be loaded or the video's first frame
            cannot be read.
    """
    bg_img = cv2.imread(bg_image_path)
    if bg_img is None:
        raise ValueError("Could not load background image")
    bg_img = cv2.cvtColor(bg_img, cv2.COLOR_BGR2RGB)

    # Probe the first frame only to learn the frame size.
    cap = cv2.VideoCapture(video_path)
    try:
        ret, frame = cap.read()
    finally:
        cap.release()
    if not ret:
        raise ValueError("Could not read the input video")

    h, w = frame.shape[:2]
    return cv2.resize(bg_img, (w, h))


def process_frame_rembg(frame, bg_img):
    """Replace the background of one frame using rembg.

    Args:
        frame: uint8 RGB frame (H, W, 3), as delivered by moviepy. No channel
            swap is applied, so the output keeps the input's channel order.
        bg_img: RGB background already resized to (H, W, 3).

    Returns:
        uint8 RGB composite.

    Raises:
        RuntimeError: if rembg is not installed.
    """
    if not REMBG_AVAILABLE:
        raise RuntimeError("rembg is not installed; cannot remove background")
    cutout = remove(Image.fromarray(frame)).convert("RGBA")  # type: ignore[misc]
    cutout_np = np.array(cutout)
    alpha = cutout_np[:, :, 3:4] / 255.0
    return _blend(alpha, cutout_np[:, :, :3], bg_img)


def process_frame_matanyone(alpha_frame, original_frame, bg_img):
    """Composite one frame using a MatAnyone alpha-matte frame.

    Args:
        alpha_frame: uint8 matte frame; channel 0 is used as alpha (0..255).
        original_frame: uint8 RGB source frame.
        bg_img: RGB background matching ``original_frame``'s size.

    Returns:
        uint8 RGB composite. If compositing fails, the frame is processed with
        rembg instead.
    """
    try:
        if alpha_frame.ndim == 2:
            alpha_frame = alpha_frame[:, :, None]
        if alpha_frame.shape[:2] != original_frame.shape[:2]:
            # The matte may come back at a different resolution than the
            # source; bring it to the source size.
            h, w = original_frame.shape[:2]
            alpha_frame = cv2.resize(alpha_frame, (w, h))
            if alpha_frame.ndim == 2:  # cv2.resize drops a single channel axis
                alpha_frame = alpha_frame[:, :, None]
        alpha = alpha_frame[:, :, 0:1] / 255.0
        return _blend(alpha, original_frame, bg_img)
    except Exception as e:
        print(f"MatAnyone frame failed: {e}")
        return process_frame_rembg(original_frame, bg_img)


def process_video_matanyone(video_path, bg_image_path):
    """Replace the video's background using MatAnyone alpha mattes.

    Falls back to :func:`process_video_rembg` when MatAnyone produces no
    alpha video.

    Returns:
        Path of the written output video.

    Raises:
        ValueError: if the background image or the input video is unreadable.
    """
    bg_img = _load_background(video_path, bg_image_path)

    # The alpha video lives in the temp dir, so all reading of it must finish
    # before the `with` block exits.
    with tempfile.TemporaryDirectory() as temp_dir:
        alpha_video_path = matanyone_processor.process_video(video_path, temp_dir)
        if alpha_video_path is None:
            return process_video_rembg(video_path, bg_image_path)

        original_clip = VideoFileClip(video_path)
        try:
            alpha_clip = VideoFileClip(alpha_video_path)
            try:
                def process_func(get_frame, t):
                    # moviepy frames are already uint8 RGB in 0..255; no
                    # rescaling is needed in either direction.
                    return process_frame_matanyone(
                        alpha_clip.get_frame(t), get_frame(t), bg_img
                    )

                new_clip = original_clip.fl(process_func)
                output_path = "matanyone_output.mp4"
                new_clip.write_videofile(output_path, audio=False, logger=None)
            finally:
                alpha_clip.close()
        finally:
            original_clip.close()
    return output_path


def process_video_rembg(video_path, bg_image_path):
    """Replace the video's background frame by frame using rembg.

    Returns:
        Path of the written output video.

    Raises:
        ValueError: if the background image or the input video is unreadable.
        RuntimeError: if rembg is not installed.
    """
    bg_img = _load_background(video_path, bg_image_path)

    clip = VideoFileClip(video_path)
    try:
        def process_func(get_frame, t):
            # moviepy frames are already uint8 RGB in 0..255.
            return process_frame_rembg(get_frame(t), bg_img)

        new_clip = clip.fl(process_func)
        output_path = "rembg_output.mp4"
        new_clip.write_videofile(output_path, audio=False, logger=None)
    finally:
        clip.close()
    return output_path


def gradio_interface(video_file, bg_image, use_matanyone=True):
    """Gradio callback: run background replacement and report what was used.

    Args:
        video_file: uploaded video, either a path or an object with ``.name``.
        bg_image: background image, either a path or an object with ``.name``.
        use_matanyone: prefer MatAnyone when it is available.

    Returns:
        ``(video_path, file_path, status_text)``; both paths are None when
        inputs are missing or processing failed.
    """
    if video_file is None or bg_image is None:
        return None, None, "Please upload both video and background image"

    video_path = getattr(video_file, "name", video_file)
    bg_path = getattr(bg_image, "name", bg_image)

    try:
        if use_matanyone and MATANYONE_AVAILABLE:
            out_path = process_video_matanyone(video_path, bg_path)
            device_label = "GPU" if CUDA_AVAILABLE else "CPU"
            method_used = f"MatAnyone ({device_label})"
        else:
            out_path = process_video_rembg(video_path, bg_path)
            method_used = "rembg (CPU)"
    except Exception as e:
        # Top-level UI boundary: surface the failure in the status box.
        print(f"Processing failed: {e}")
        return None, None, f"Processing failed: {e}"

    status = f"Method: {method_used}\nGPU available: {CUDA_AVAILABLE}"
    return out_path, out_path, status


with gr.Blocks(title="VideoBackgroundReplacer (HF Space)") as demo:
    gr.Markdown("# VideoBackgroundReplacer (HF Space)")
    with gr.Row():
        with gr.Column():
            video_input = gr.Video(label="Input Video")
            bg_input = gr.Image(label="Background Image", type="filepath")
            use_matanyone = gr.Checkbox(
                label="Use MatAnyone (GPU if available)",
                value=MATANYONE_AVAILABLE,
                interactive=MATANYONE_AVAILABLE,
            )
            btn = gr.Button("Process", variant="primary")
        with gr.Column():
            out_video = gr.Video(label="Output")
            out_file = gr.File(label="Download")
            status = gr.Textbox(label="Status", lines=4)

    btn.click(
        fn=gradio_interface,
        inputs=[video_input, bg_input, use_matanyone],
        outputs=[out_video, out_file, status],
    )

if __name__ == "__main__":
    demo.launch()