| """OneVision Encoder Codec View. |
| |
| A simplified, dependency-light port of the codec_tools pipeline from |
| lmms-eval-ov2. The original tool relies on a bitcost-patched ffmpeg 5.1 to |
| score every macroblock by its actual encoding bit cost; we approximate that |
| saliency signal with a Sobel gradient magnitude per patch (high gradient = |
| high local complexity = roughly what the encoder would spend bits on). |
| |
| Pipeline (mirrors codec_tools/pipeline/process_video_bitcost_readiness.py): |
| 1. Uniformly sample N frames from the input video. |
| 2. smart_resize each frame so dims are multiples of `patch` and the |
| total pixel count <= max_pixels. |
| 3. Slice every frame into a patch grid; score each patch by its |
| Sobel gradient magnitude mean. |
| 4. Pick the top-K highest-scoring patches under the selected GOP |
| grouping. |
| 5. Render a "selection visualization" video: kept patches stay in |
| full color, dropped patches are faded to a gray-white wash so the |
| viewer can see exactly which patches the codec stage chose. |
| 6. Pack one canvas per GOP group: the first frame of each group is |
| kept whole as the I-frame, and later frames contribute only their |
| selected patches packed below it in time order. |
| """ |
|
|
| import json |
| import math |
| import os |
| import shutil |
| import subprocess |
| import tempfile |
| import time |
| from typing import List, Tuple |
|
|
| import cv2 |
| import gradio as gr |
| import imageio_ffmpeg |
| import matplotlib |
| matplotlib.use("Agg") |
| import matplotlib.pyplot as plt |
| import numpy as np |
|
|
|
|
# Patch sizes (px) offered in the UI; 14/28 match common ViT patch strides.
PATCH_CHOICES = [14, 16, 28]
# BGR colors for the two-tone outline drawn around kept patches
# (see overlay_selection): dark outer frame + bright inner accent.
PATCH_OUTLINE_OUTER_BGR = (42, 23, 15)
PATCH_OUTLINE_INNER_BGR = (11, 158, 245)


DEMO_VIDEO_PATH = os.path.join(
    os.path.dirname(os.path.abspath(__file__)),
    "examples", "demo_codec_heatmap.mp4",
)
# Positional preset mirroring process()'s parameter order:
# (video_path, sample_frames, patch_size, total_patches, max_pixels,
#  viz_mode, heatmap_alpha, start_sec, end_sec, saliency_signal,
#  score_log_scale, bitcost_pct, fade_strength, gop)
DEMO_PRESET = (
    DEMO_VIDEO_PATH,
    32,        # sample_frames
    14,        # patch_size
    1024,      # total_patches budget
    150000,    # max_pixels
    "sbs",     # viz_mode: selection | heatmap side-by-side
    0.55,      # heatmap_alpha
    0.0, 0.0,  # start_sec, end_sec (0 => whole video)
    "combined",  # saliency_signal
    True,      # score_log_scale
    96.0,      # bitcost_pct
    0.55,      # fade_strength
    "dynamic",  # gop grouping mode
)
|
|
|
|
def smart_resize(frame: np.ndarray, max_pixels: int, factor: int) -> np.ndarray:
    """Downscale `frame` so both dimensions are multiples of `factor` and the
    total pixel count stays within `max_pixels` (aspect ratio ~preserved)."""
    height, width = frame.shape[:2]
    if height * width > max_pixels:
        shrink = math.sqrt(max_pixels / (height * width))
        height = max(factor, int(height * shrink))
        width = max(factor, int(width * shrink))
    # Snap both dims down to the nearest multiple of `factor`, never below it.
    height = max(factor, height - height % factor)
    width = max(factor, width - width % factor)
    return cv2.resize(frame, (width, height), interpolation=cv2.INTER_AREA)
|
|
|
|
def sample_frame_ids(total: int, n: int) -> List[int]:
    """Uniformly sample `n` frame indices from [0, total); every frame when
    n >= total, empty when the video has no frames."""
    if total <= 0:
        return []
    if n >= total:
        return list(range(total))
    ids: List[int] = []
    for pos in np.linspace(0, total - 1, n):
        ids.append(int(round(pos)))
    return ids
|
|
|
|
def split_budget_evenly(total_k: int, n_parts: int) -> List[int]:
    """Split `total_k` into `n_parts` near-equal non-negative shares; the
    earlier parts absorb the remainder one unit each."""
    budget = max(0, int(total_k))
    parts = max(0, int(n_parts))
    if parts == 0:
        return []
    share, extra = divmod(budget, parts)
    return [share + 1] * extra + [share] * (parts - extra)
|
|
|
|
def sample_window_frame_ids(start: int, end: int, n: int) -> List[int]:
    """Uniformly sample up to `n` frame ids from the inclusive [start, end]
    window; empty for an empty window or non-positive count."""
    lo, hi = int(start), int(end)
    want = max(0, int(n))
    if want <= 0 or hi < lo:
        return []
    span = hi - lo + 1
    if want >= span:
        return list(range(lo, hi + 1))
    # Sample offsets within the window, then shift back to absolute ids.
    return [lo + offset for offset in sample_frame_ids(span, want)]
|
|
|
|
def decode_frames(video_path: str, frame_ids: List[int]) -> List[np.ndarray]:
    """Seek to each requested frame id and decode it (BGR).

    Frames that fail to decode are silently skipped, so the result may be
    shorter than `frame_ids`. Returns [] if the file cannot be opened.

    Fix vs original: the capture is now released in a `finally` block, so the
    underlying decoder handle is not leaked if `set`/`read` raises.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        return []
    frames: List[np.ndarray] = []
    try:
        for fid in frame_ids:
            # Random-access seek; slow but exact for sparse frame id lists.
            cap.set(cv2.CAP_PROP_POS_FRAMES, int(fid))
            ok, fr = cap.read()
            if ok:
                frames.append(fr)
    finally:
        cap.release()
    return frames
|
|
|
|
def video_metadata(video_path: str) -> dict:
    """Probe basic stream info via OpenCV, plus codec details via ffprobe
    when the binary is available on PATH (best-effort)."""
    capture = cv2.VideoCapture(video_path)
    frame_count = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))
    frame_rate = float(capture.get(cv2.CAP_PROP_FPS) or 0.0)
    width = int(capture.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(capture.get(cv2.CAP_PROP_FRAME_HEIGHT))
    capture.release()
    meta = {
        "total_frames": frame_count,
        "fps": round(frame_rate, 3),
        "width": width,
        "height": height,
    }
    if shutil.which("ffprobe"):
        probe_cmd = [
            "ffprobe", "-v", "quiet", "-select_streams", "v:0",
            "-show_entries", "stream=codec_name,bit_rate,pix_fmt,profile",
            "-of", "json", video_path,
        ]
        try:
            probe = subprocess.run(
                probe_cmd,
                capture_output=True, text=True, check=True, timeout=15,
            )
            stream = json.loads(probe.stdout).get("streams", [{}])[0]
            meta["codec"] = stream.get("codec_name")
            meta["pix_fmt"] = stream.get("pix_fmt")
            meta["profile"] = stream.get("profile")
            meta["bitrate_bps"] = stream.get("bit_rate")
        except Exception as exc:
            # ffprobe is optional enrichment; record why it failed and move on.
            meta["ffprobe_error"] = str(exc)
    return meta
|
|
|
|
def patch_score_grid(frame_bgr: np.ndarray, patch: int) -> np.ndarray:
    """Score each `patch`x`patch` cell by its mean Sobel gradient magnitude.

    Returns a float32 [hb, wb] grid, where hb/wb count the whole patches
    that fit the frame (partial edge patches are cropped off)."""
    luma = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2GRAY).astype(np.float32)
    grad_x = cv2.Sobel(luma, cv2.CV_32F, 1, 0, ksize=3)
    grad_y = cv2.Sobel(luma, cv2.CV_32F, 0, 1, ksize=3)
    magnitude = np.sqrt(grad_x * grad_x + grad_y * grad_y)
    rows, cols = magnitude.shape
    n_rows, n_cols = rows // patch, cols // patch
    cropped = magnitude[: n_rows * patch, : n_cols * patch]
    per_patch = cropped.reshape(n_rows, patch, n_cols, patch).mean(axis=(1, 3))
    return per_patch.astype(np.float32)
|
|
|
|
def patch_score_frame_diff(
    prev_bgr: np.ndarray, cur_bgr: np.ndarray, patch: int,
) -> np.ndarray:
    """Per-patch mean absolute difference vs the previous frame -- a proxy
    for motion / temporal complexity. Falls back to the Sobel gradient score
    when there is no shape-compatible previous frame."""
    if prev_bgr is None or prev_bgr.shape != cur_bgr.shape:
        return patch_score_grid(cur_bgr, patch)
    delta = cv2.absdiff(prev_bgr, cur_bgr).mean(axis=2).astype(np.float32)
    rows, cols = delta.shape
    n_rows, n_cols = rows // patch, cols // patch
    trimmed = delta[: n_rows * patch, : n_cols * patch]
    return trimmed.reshape(n_rows, patch, n_cols, patch).mean(axis=(1, 3))
|
|
|
|
def compute_score_grids(
    frames: List[np.ndarray], patch: int, signal: str,
) -> List[np.ndarray]:
    """Build per-frame patch score grids from one of three signals:
    - 'gradient': Sobel magnitude only (intra-frame complexity)
    - 'frame_diff': absdiff vs previous frame (temporal motion)
    - 'combined': 0.5 * gradient_norm + 0.5 * frame_diff_norm
    For 'combined', each component is independently shifted/scaled to [0, 1]
    across the whole sample so they contribute on equal footing."""
    choice = (signal or "gradient").lower()
    if choice == "gradient":
        return [patch_score_grid(frame, patch) for frame in frames]

    def _diff_grids() -> List[np.ndarray]:
        # Sequential absdiff grids; the first frame falls back to gradient.
        grids, last = [], None
        for frame in frames:
            grids.append(patch_score_frame_diff(last, frame, patch))
            last = frame
        return grids

    if choice == "frame_diff":
        return _diff_grids()

    def _norm01(stack: np.ndarray) -> np.ndarray:
        # Shift to zero, scale by the global peak; leave flat signals as-is.
        shifted = stack.astype(np.float32) - stack.min()
        peak = shifted.max()
        return shifted / peak if peak > 1e-8 else shifted

    grad = np.stack([patch_score_grid(f, patch) for f in frames], axis=0)
    diff = np.stack(_diff_grids(), axis=0)
    blended = 0.5 * _norm01(grad) + 0.5 * _norm01(diff)
    return [blended[i] for i in range(blended.shape[0])]
|
|
|
|
def topk_mask(score: np.ndarray, k: int) -> np.ndarray:
    """Per-frame top-K mask (legacy helper, no longer used by process()).

    Returns a uint8 array of `score`'s shape with 1s at the k highest cells."""
    scores_1d = score.ravel()
    total = scores_1d.size
    if k >= total:
        return np.ones_like(score, dtype=np.uint8)
    if k <= 0:
        return np.zeros_like(score, dtype=np.uint8)
    mask_1d = np.zeros(total, dtype=np.uint8)
    # argpartition: O(n) selection of the k largest indices (unordered).
    mask_1d[np.argpartition(scores_1d, -k)[-k:]] = 1
    return mask_1d.reshape(score.shape)
|
|
|
|
def global_topk_masks(
    grids: List[np.ndarray], total_k: int,
) -> Tuple[List[np.ndarray], int]:
    """Pick the top `total_k` highest-scoring patches GLOBALLY across all
    sampled frames; return one uint8 mask per frame plus the actual count.

    Frames with uniformly low energy may get zero patches while busy frames
    get many -- that is the point: the codec-style saliency lets the budget
    concentrate where it matters."""
    if not grids:
        return [], 0
    stacked = np.stack(grids, axis=0).astype(np.float32)
    n_frames, rows, cols = stacked.shape
    cell_count = stacked.size
    budget = int(total_k)
    if budget >= cell_count:
        return [np.ones((rows, cols), dtype=np.uint8) for _ in range(n_frames)], int(cell_count)
    if budget <= 0:
        return [np.zeros((rows, cols), dtype=np.uint8) for _ in range(n_frames)], 0
    flat_mask = np.zeros(cell_count, dtype=np.uint8)
    # Unordered top-`budget` selection over the flattened score volume.
    flat_mask[np.argpartition(stacked.reshape(-1), -budget)[-budget:]] = 1
    per_frame = flat_mask.reshape(n_frames, rows, cols)
    return [per_frame[i].astype(np.uint8) for i in range(n_frames)], budget
|
|
|
|
def build_dynamic_groups(
    grids: List[np.ndarray],
    min_group_frames: int = 8,
    max_group_frames: int = 64,
    preferred_group_frames: int = 32,
) -> List[Tuple[int, int]]:
    """Adaptive temporal grouping by cumulative saliency energy.

    Groups are energy-adaptive, but constrained to a practical codec-stream
    range: by default each group spans roughly 8-64 sampled frames, with a
    preference around 32 frames/group. Each group later becomes exactly one
    IPPP canvas whose first frame is kept whole as the I-frame.

    Args:
        grids: per-frame [hb, wb] patch score grids.
        min_group_frames: lower bound on frames per group (clamped to >= 1).
        max_group_frames: upper bound per group (clamped to >= min).
        preferred_group_frames: target length used to pick the group count.

    Returns:
        Inclusive (start, end) index pairs covering 0..len(grids)-1.
    """
    n = len(grids)
    if n == 0:
        return []

    # Sanitize bounds: 1 <= min_len <= max_len, preferred clamped in between.
    min_len = max(1, int(min_group_frames))
    max_len = max(min_len, int(max_group_frames))
    preferred = min(max_len, max(min_len, int(preferred_group_frames)))

    # Short clips fit in a single group.
    if n <= max_len:
        return [(0, n - 1)]

    # Pick a group count near n/preferred, feasible under the length bounds.
    min_groups = max(1, math.ceil(n / max_len))
    max_groups = max(1, n // min_len)
    target_groups = max(1, math.ceil(n / preferred))
    target_groups = min(max(target_groups, min_groups), max_groups)
    if target_groups <= 1:
        return [(0, n - 1)]

    energies = np.array([float(g.sum()) for g in grids], dtype=np.float64)
    total = energies.sum()
    if total <= 1e-8:
        # Degenerate case (essentially zero saliency everywhere): fall back
        # to equal-size groups; the last group absorbs any remainder.
        size = max(min_len, min(max_len, math.ceil(n / target_groups)))
        groups: List[Tuple[int, int]] = []
        cursor = 0
        while cursor < n and len(groups) < target_groups:
            end = min(n - 1, cursor + size - 1)
            if len(groups) == target_groups - 1:
                end = n - 1
            groups.append((cursor, end))
            cursor = end + 1
        return groups

    # Greedy sweep: close a group once it has ~1/target_groups of the total
    # energy (and >= min_len frames) or hits max_len, while always leaving
    # enough frames for every remaining group to reach min_len.
    target_per_group = total / target_groups
    groups = []
    start = 0
    cum = 0.0
    for i in range(n):
        cum += energies[i]
        group_len = i - start + 1
        groups_left = target_groups - len(groups) - 1
        frames_left_after = n - i - 1
        min_room_ok = frames_left_after >= groups_left * min_len
        threshold_hit = cum >= target_per_group and group_len >= min_len
        force_close = group_len >= max_len
        if len(groups) < target_groups - 1 and min_room_ok and (threshold_hit or force_close):
            groups.append((start, i))
            start = i + 1
            cum = 0.0
    # Trailing frames always form the final group.
    if start <= n - 1:
        groups.append((start, n - 1))
    return groups
|
|
|
|
def grouped_topk_masks(
    grids: List[np.ndarray], total_k: int, gop: str,
) -> Tuple[List[np.ndarray], int, List[Tuple[int, int]], str]:
    """Select patches under a GOP grouping strategy.

    GOP modes:
      - "global": one big group across the whole video -> top-K global.
      - "<int>" (e.g. "4"/"8"/"16"): fixed group size in frames; the
        budget is split equally across groups, top-K picked within each.
      - "dynamic": codec-stream-style adaptive groups (see
        build_dynamic_groups), defaulting to roughly 8-64 frames/group.

    Returns (per-frame masks, actual selected count, [(start,end),...] groups, resolved_label).
    """
    n = len(grids)
    if n == 0:
        return [], 0, [], gop

    mode = (gop or "global").strip().lower()

    # "global" and its aliases short-circuit to one group over everything.
    if mode in ("global", "none", "0", ""):
        masks, actual = global_topk_masks(grids, int(total_k))
        return masks, actual, [(0, n - 1)], "global"

    if mode == "dynamic":
        groups = build_dynamic_groups(grids)
        resolved_label = "codec-stream"
    else:
        # Fixed-size grouping; an unparsable mode degenerates to one group.
        try:
            g_size = max(1, int(mode))
        except ValueError:
            g_size = n
        groups = []
        cursor = 0
        while cursor < n:
            end = min(n - 1, cursor + g_size - 1)
            groups.append((cursor, end))
            cursor = end + 1
        resolved_label = mode

    num_groups = max(1, len(groups))
    target_k = max(0, int(total_k))

    # Capacity of each group = total number of patch cells in its frames.
    capacities = [
        sum(int(g.size) for g in grids[s:e + 1])
        for (s, e) in groups
    ]
    alloc = split_budget_evenly(target_k, num_groups)

    # Clamp each group's allocation to its capacity, then round-robin the
    # freed budget back into groups that still have room.
    leftover = 0
    for i, cap in enumerate(capacities):
        if alloc[i] > cap:
            leftover += alloc[i] - cap
            alloc[i] = cap
    while leftover > 0:
        progressed = False
        for i, cap in enumerate(capacities):
            if alloc[i] < cap and leftover > 0:
                alloc[i] += 1
                leftover -= 1
                progressed = True
        if not progressed:
            # Every group is saturated; the surplus budget is dropped.
            break

    # Independent top-K inside each group, written back per frame.
    out_masks = [np.zeros(g.shape, dtype=np.uint8) for g in grids]
    actual_total = 0
    for (s, e), group_k in zip(groups, alloc):
        sub = grids[s:e + 1]
        sub_masks, sub_actual = global_topk_masks(sub, group_k)
        for i, sm in enumerate(sub_masks):
            out_masks[s + i] = sm
        actual_total += sub_actual
    return out_masks, actual_total, groups, resolved_label
|
|
|
|
def faded_background(frame_bgr: np.ndarray, fade: float = 0.55) -> np.ndarray:
    """Gray-white wash of the frame: gray * (1 - fade) + 255 * fade."""
    luma = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2GRAY)
    washed = cv2.cvtColor(luma, cv2.COLOR_GRAY2BGR).astype(np.float32)
    washed *= (1.0 - fade)
    washed += 255.0 * fade
    return washed.astype(np.uint8)
|
|
|
|
def overlay_selection(
    frame_bgr: np.ndarray, mask_grid: np.ndarray, patch: int,
    outline: bool = True, fade: float = 0.55,
) -> np.ndarray:
    """Composite view: kept patches stay in full color, dropped patches are
    replaced by a gray-white wash; optionally each kept patch gets a
    two-tone (outer dark, inner bright) outline."""
    frame_h, frame_w = frame_bgr.shape[:2]
    # Expand the patch-grid mask to pixel resolution, cropped to the frame.
    pixel_mask = np.kron(mask_grid, np.ones((patch, patch), dtype=np.uint8))
    pixel_mask = pixel_mask[:frame_h, :frame_w]
    washed = faded_background(frame_bgr, fade=float(fade))
    composite = np.where(pixel_mask.astype(bool)[..., None], frame_bgr, washed)
    if not outline:
        return composite
    outer_px = 2 if patch >= 20 else 1
    # np.nonzero yields row-major order, matching the original nested loops.
    for row, col in zip(*np.nonzero(mask_grid)):
        y0, x0 = int(row) * patch, int(col) * patch
        y1, x1 = y0 + patch - 1, x0 + patch - 1
        cv2.rectangle(
            composite, (x0, y0), (x1, y1),
            PATCH_OUTLINE_OUTER_BGR, outer_px,
            lineType=cv2.LINE_AA,
        )
        # Inner accent only when the patch is large enough to nest a box.
        if patch >= 6 and (x1 - x0) >= 3 and (y1 - y0) >= 3:
            cv2.rectangle(
                composite, (x0 + 1, y0 + 1), (x1 - 1, y1 - 1),
                PATCH_OUTLINE_INNER_BGR, 1,
                lineType=cv2.LINE_AA,
            )
    return composite
|
|
|
|
| def _normalize_scores(grids: List[np.ndarray], pct: float = 99.0) -> np.ndarray: |
| """Stack into [N, hb, wb], shift by per-video min, divide by global pct. |
| Using the percentile (instead of max) suppresses outlier patches the same |
| way codec_tools does with bitcost_pct=99.""" |
| arr = np.stack(grids, axis=0).astype(np.float32) |
| arr = arr - arr.min() |
| cap = np.percentile(arr, pct) if arr.size else 1.0 |
| if cap <= 1e-8: |
| cap = float(arr.max() or 1.0) |
| arr = np.clip(arr / cap, 0.0, 1.0) |
| return arr |
|
|
|
|
def overlay_heatmap(
    frame_bgr: np.ndarray, score_grid: np.ndarray, patch: int,
    alpha: float = 0.55,
) -> np.ndarray:
    """Blend a continuous JET heatmap of patch scores over the frame.
    `score_grid` is expected in [0, 1]; low score = blue, high score = red."""
    frame_h, frame_w = frame_bgr.shape[:2]
    levels = (np.clip(score_grid, 0.0, 1.0) * 255.0).astype(np.uint8)
    # Upsample each patch score to a patch-sized pixel block.
    level_img = np.kron(levels, np.ones((patch, patch), dtype=np.uint8))
    level_img = level_img[:frame_h, :frame_w]
    colored = cv2.applyColorMap(level_img, cv2.COLORMAP_JET)
    return cv2.addWeighted(frame_bgr, 1.0 - alpha, colored, alpha, 0.0)
|
|
|
|
def overlay_sbs(
    frame_bgr: np.ndarray, mask_grid: np.ndarray, score_grid: np.ndarray,
    patch: int, alpha: float = 0.55, fade: float = 0.55,
) -> np.ndarray:
    """Side-by-side panel: [selection | heatmap] separated by a thin dark
    bar, each half labeled in its top-left corner."""
    sel_panel = overlay_selection(frame_bgr, mask_grid, patch, outline=True, fade=fade)
    heat_panel = overlay_heatmap(frame_bgr, score_grid, patch, alpha=alpha)
    panel_h, panel_w = sel_panel.shape[:2]
    divider = np.full((panel_h, 4, 3), 30, dtype=np.uint8)
    combined = np.concatenate([sel_panel, divider, heat_panel], axis=1)
    for label, x_pos in (("selection", 8), ("heatmap", panel_w + 12)):
        cv2.putText(combined, label, (x_pos, 22), cv2.FONT_HERSHEY_SIMPLEX,
                    0.6, (255, 255, 255), 2, cv2.LINE_AA)
    return combined
|
|
|
|
def write_mp4(frames: List[np.ndarray], path: str, fps: float) -> None:
    """Write BGR uint8 frames as a browser-friendly H.264 mp4 via
    imageio-ffmpeg's bundled ffmpeg binary.

    Frames are cropped to even dimensions (yuv420p/libx264 require them).

    Fix vs original: the raw input size (`-s WxH`) was declared from the
    *uncropped* first frame while each written frame was cropped to even
    dims, so odd-sized inputs produced a rawvideo byte-count mismatch. The
    declared size now matches the cropped frames.

    Raises:
        ValueError: if `frames` is empty.
        RuntimeError: if ffmpeg exits with a non-zero status.
    """
    if not frames:
        raise ValueError("no frames to write")
    # Even dimensions, fixed once for the whole stream.
    h = frames[0].shape[0] // 2 * 2
    w = frames[0].shape[1] // 2 * 2
    ff = imageio_ffmpeg.get_ffmpeg_exe()
    cmd = [
        ff, "-y", "-loglevel", "error",
        "-f", "rawvideo", "-vcodec", "rawvideo",
        "-s", f"{w}x{h}", "-pix_fmt", "bgr24",
        "-r", f"{fps:.3f}", "-i", "-",
        "-an", "-vcodec", "libx264", "-pix_fmt", "yuv420p",
        "-preset", "veryfast", "-crf", "23",
        "-movflags", "+faststart",
        path,
    ]
    proc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stderr=subprocess.PIPE)
    try:
        for f in frames:
            # Crop every frame to the declared even size before streaming.
            f = f[:h, :w]
            proc.stdin.write(np.ascontiguousarray(f).tobytes())
        proc.stdin.close()
        err = proc.stderr.read().decode("utf-8", errors="ignore")
        rc = proc.wait()
        if rc != 0:
            raise RuntimeError(f"ffmpeg failed (rc={rc}): {err}")
    finally:
        # Ensure the child never outlives an exception in the write loop.
        if proc.poll() is None:
            proc.kill()
|
|
|
|
def _build_ippp_canvas(
    frames: List[np.ndarray], masks: List[np.ndarray],
    i_idx: int, p_range: range, patch: int,
) -> Tuple[np.ndarray, int]:
    """Build one GOP canvas with explicit I/P sections.

    Layout:
      1. The group's first frame is copied whole as the I-frame.
      2. Each later P-frame gets its own packed section below, in time order.
      So GOP=4 becomes I|P|P|P, GOP=5 becomes I|P|P|P|P, etc.

    Args:
        frames: all sampled frames (BGR), indexed by `i_idx` / `p_range`.
        masks: per-frame [hb, wb] uint8 selection masks, parallel to frames.
        i_idx: index of this group's I-frame in `frames`.
        p_range: indices of the group's P-frames; ids past the end of
            `frames` are skipped.
        patch: patch size in pixels.

    Returns (canvas, n_patches) where n_patches is the number of selected
    P-frame patches packed under the I-frame."""
    i_frame = frames[i_idx]
    h, w = i_frame.shape[:2]
    hb, wb = h // patch, w // patch
    # Crop the I-frame to an exact multiple of the patch size.
    frame_h, frame_w = hb * patch, wb * patch
    i_crop = i_frame[:frame_h, :frame_w].copy()

    divider_h = 2  # height (px) of the colored bar above each P section
    p_sections: List[np.ndarray] = []
    n_patches = 0
    for k in p_range:
        if k >= len(frames):
            break
        f, m = frames[k], masks[k]
        # Collect this P-frame's selected patches in row-major order.
        packed_patches: List[np.ndarray] = []
        for i in range(m.shape[0]):
            for j in range(m.shape[1]):
                if m[i, j]:
                    packed_patches.append(
                        f[
                            i * patch:(i + 1) * patch,
                            j * patch:(j + 1) * patch,
                        ].copy()
                    )
        n_patches += len(packed_patches)
        # Pack into rows of width wb; even a patch-less frame keeps one
        # (blank) row so its section remains visible in the canvas.
        packed_rows = max(1, int(math.ceil(len(packed_patches) / max(1, wb))))
        packed_h = packed_rows * patch
        section_bg = np.full((packed_h, frame_w, 3), 246, dtype=np.uint8)
        for idx, tile in enumerate(packed_patches):
            row = idx // wb
            col = idx % wb
            y0 = row * patch
            x0 = col * patch
            section_bg[y0:y0 + patch, x0:x0 + patch] = tile
        p_sections.append(section_bg)

    # Assemble: I-frame on top, then divider + packed section per P-frame.
    total_h = frame_h + sum(divider_h + sec.shape[0] for sec in p_sections)
    canvas = np.full((total_h, frame_w, 3), 250, dtype=np.uint8)
    canvas[:frame_h, :frame_w] = i_crop

    y = frame_h
    for section in p_sections:
        canvas[y:y + divider_h, :] = (99, 102, 241)  # indigo divider bar
        y += divider_h
        sec_h = section.shape[0]
        canvas[y:y + sec_h, :frame_w] = section
        y += sec_h

    return canvas, n_patches
|
|
|
|
def pack_canvases_per_group(
    frames: List[np.ndarray],
    masks: List[np.ndarray],
    groups: List[Tuple[int, int]],
    patch: int,
    target_canvases: int = 1,
) -> Tuple[List[np.ndarray], List[Tuple[int, int, int]], int]:
    """Pack exactly one IPPP canvas per GOP group.

    Each group's first frame is kept whole as the I-frame; every later frame
    contributes a packed P section below it. `target_canvases` is kept only
    for API compatibility and is ignored.

    Returns:
        canvases: one np.ndarray per (valid) group.
        sub_ranges: (group_idx, start, end) tuples parallel to canvases.
        total_selected: I-frame patches (full grid each) + selected
            P-frame patches across all canvases.
    """
    if not groups or not frames:
        blank = np.full((patch, patch, 3), 255, dtype=np.uint8)
        return [blank], [(0, 0, 0)], 0

    canvases: List[np.ndarray] = []
    sub_ranges: List[Tuple[int, int, int]] = []
    total_selected = 0
    for group_idx, (start, end) in enumerate(groups):
        if start >= len(frames):
            # Group lies entirely past the decoded frames; nothing to pack.
            continue
        canvas, p_patch_count = _build_ippp_canvas(
            frames, masks, i_idx=start, p_range=range(start + 1, end + 1),
            patch=patch,
        )
        canvases.append(canvas)
        sub_ranges.append((group_idx, start, end))
        grid_rows = frames[start].shape[0] // patch
        grid_cols = frames[start].shape[1] // patch
        total_selected += grid_rows * grid_cols + p_patch_count

    if not canvases:
        canvases = [np.full((patch, patch, 3), 255, dtype=np.uint8)]
        sub_ranges = [(0, 0, 0)]
    return canvases, sub_ranges, total_selected
|
|
|
|
def make_charts(
    grids: List[np.ndarray],
    masks: List[np.ndarray],
    codec_frame_ids: List[int],
    uniform_frame_ids: List[int],
    uniform_requested_frames: int,
    uniform_full_frame_patches: int,
    fps: float,
    total_duration_sec: float,
    total_patches_budget: int,
    saliency_signal: str,
    groups: List[Tuple[int, int]] = None,
    gop_label: str = "global",
):
    """One overlaid step chart: cumulative patches selected vs time, for
    the codec saliency curve and a uniform full-frame sampling baseline.

    X = time (s)
    Y = cumulative count of selected patches
    The codec curve rises in bursts where saliency is high; the uniform
    baseline rises in equal steps because every sampled full frame
    contributes `patch_size^2` patches.

    Returns the matplotlib Figure (Agg backend; the caller owns closing it).
    """

    fig, ax = plt.subplots(
        figsize=(11.4, 4.8), dpi=220, constrained_layout=True,
    )

    # Guard against missing fps metadata; 25 fps is a safe display default.
    fps_safe = float(fps) if fps and fps > 0 else 25.0
    if grids:
        hb, wb = grids[0].shape
    else:
        hb = wb = 1
    grid_size = hb * wb  # NOTE(review): currently unused below
    all_frame_ids = list(codec_frame_ids) + list(uniform_frame_ids)
    # X-axis extent: explicit duration if given, else the latest plotted frame.
    duration = float(total_duration_sec) if total_duration_sec and total_duration_sec > 0 else (
        (max(all_frame_ids) / fps_safe) if all_frame_ids else 1.0
    )

    def _step(xs, cum):
        """Return (xx, yy) for a left-continuous step plot through (xs, cum)."""
        if not xs:
            return [0.0, duration], [0.0, 0.0]
        xx, yy = [0.0], [0.0]
        prev = 0.0
        for x, c in zip(xs, cum):
            xx.extend([x, x]); yy.extend([prev, c])
            prev = c
        xx.append(duration); yy.append(prev)
        return xx, yy

    # Codec curve: cumulative per-frame selected-patch counts over time.
    times = [fid / fps_safe for fid in codec_frame_ids]
    counts = [int(m.sum()) for m in masks]
    codec_cum = list(np.cumsum(counts)) if counts else []
    codec_total = int(codec_cum[-1]) if codec_cum else 0
    xx_c, yy_c = _step(times, codec_cum)

    # Uniform baseline: each sampled full frame costs a fixed patch count.
    budget_int = int(total_patches_budget)
    requested_uniform = max(0, int(uniform_requested_frames))
    full_frame_patches = max(1, int(uniform_full_frame_patches))
    n_uniform = len(uniform_frame_ids)
    uni_per_step = [full_frame_patches for _ in uniform_frame_ids]
    uni_cum = list(np.cumsum(uni_per_step)) if uni_per_step else []
    uni_total = int(uni_cum[-1]) if uni_cum else 0
    uni_times = [fid / fps_safe for fid in uniform_frame_ids]
    xx_u, yy_u = _step(uni_times, uni_cum)

    # Legend labels summarize totals / per-frame stats for both curves.
    if counts:
        c_min, c_max = int(min(counts)), int(max(counts))
        c_avg = codec_total / max(1, len(counts))
        codec_lbl = (
            f"Codec Β· {saliency_signal} ({codec_total:,} total Β· "
            f"per-frame min {c_min} Β· avg {c_avg:.1f} Β· max {c_max})"
        )
    else:
        codec_lbl = f"Codec Β· {saliency_signal} ({codec_total:,} patches)"
    if uni_per_step:
        unused = max(0, budget_int - uni_total)
        frame_part = (
            f"{n_uniform}/{requested_uniform} frames fit budget"
            if requested_uniform != n_uniform else f"{n_uniform} frames"
        )
        uni_lbl = (
            f"Uniform full frames ({frame_part} Β· {full_frame_patches}/frame Β· "
            f"{uni_total:,} total"
            + (f" Β· {unused:,} budget unused" if unused else "")
            + ")"
        )
    else:
        uni_lbl = (
            f"Uniform full frames (0/{requested_uniform} frames fit budget "
            f"{budget_int:,}; need {full_frame_patches} patches/frame)"
        )

    # Filled step curves: codec (indigo, solid) vs uniform (cyan, dashed).
    ax.fill_between(xx_c, yy_c, step=None, alpha=0.12, color="#4f46e5")
    ax.plot(xx_c, yy_c, color="#4f46e5", linewidth=2.8, label=codec_lbl)
    ax.fill_between(xx_u, yy_u, step=None, alpha=0.10, color="#06b6d4")
    ax.plot(
        xx_u, yy_u, color="#06b6d4", linewidth=2.8, linestyle="--",
        label=uni_lbl,
    )

    # Horizontal budget reference line with a right-aligned caption.
    budget = int(total_patches_budget)
    ax.axhline(budget, color="#94a3b8", linestyle=":", linewidth=1.1, alpha=0.85)
    ax.text(
        duration * 0.995, budget * 1.015,
        f"budget {budget:,}", color="#475569",
        fontsize=10.0, va="bottom", ha="right",
    )

    # Vertical dotted markers at GOP group boundaries (midpoint between the
    # last frame of one group and the first frame of the next).
    if groups and len(groups) > 1 and times:
        for (_, end_idx) in groups[:-1]:
            if end_idx + 1 < len(times):
                bx = (times[end_idx] + times[end_idx + 1]) / 2.0
            else:
                bx = times[end_idx]
            ax.axvline(
                bx, color="#cbd5e1", linestyle=(0, (3, 3)),
                alpha=0.55, linewidth=1.0,
            )

    # Title, axes cosmetics, and legend.
    n_groups = len(groups) if groups else 1
    gop_str = gop_label if gop_label in ("global", "codec-stream") else f"GOP={gop_label}"
    ax.set_title(
        f"Cumulative patches selected over time Β· {saliency_signal} Β· "
        f"{gop_str} ({n_groups} groups)",
        fontsize=13, color="#1e293b",
    )
    ax.set_xlabel("time (s)", fontsize=11)
    ax.set_ylabel("# patches selected (cumulative)", fontsize=11)
    ax.set_xlim(-duration * 0.02, duration * 1.02)
    ymax = max(budget, codec_total, uni_total) * 1.08 + 1
    ax.set_ylim(0, ymax)
    ax.tick_params(axis="both", labelsize=10)
    ax.grid(True, alpha=0.25, linestyle="--", axis="y")
    ax.spines[["top", "right"]].set_visible(False)
    ax.legend(loc="upper left", fontsize=10, frameon=False, handlelength=2.8)

    fig.patch.set_facecolor("white")
    return fig
|
|
|
|
| def process( |
| video_path, |
| sample_frames: int, |
| patch_size: int, |
| total_patches: int, |
| max_pixels: int, |
| viz_mode: str = "selection", |
| heatmap_alpha: float = 0.55, |
| start_sec: float = 0.0, |
| end_sec: float = 0.0, |
| saliency_signal: str = "gradient", |
| score_log_scale: bool = False, |
| bitcost_pct: float = 99.0, |
| fade_strength: float = 0.55, |
| gop: str = "global", |
| target_canvases: int = 1, |
| progress=gr.Progress(track_tqdm=False), |
| ): |
| if not video_path: |
| return None, [], "Please upload a video.", None |
|
|
| t0 = time.time() |
| progress(0.05, desc="Reading metadata") |
| meta = video_metadata(video_path) |
| total = meta.get("total_frames") or 0 |
| if total <= 0: |
| return None, [], json.dumps( |
| {"error": "Could not read frame count.", "metadata": meta}, |
| indent=2, ensure_ascii=False, |
| ), None |
|
|
| progress(0.10, desc="Sampling frames") |
| fps = float(meta.get("fps") or 0.0) |
| s_sec = max(0.0, float(start_sec or 0.0)) |
| e_sec = float(end_sec or 0.0) |
| if fps > 0 and (s_sec > 0 or e_sec > 0): |
| f_start = max(0, int(round(s_sec * fps))) |
| f_end = ( |
| min(total - 1, int(round(e_sec * fps)) - 1) |
| if e_sec > 0 else total - 1 |
| ) |
| if f_end <= f_start: |
| f_end = total - 1 |
| window_total = f_end - f_start + 1 |
| if int(sample_frames) >= window_total: |
| fids = list(range(f_start, f_end + 1)) |
| else: |
| fids = [ |
| int(round(x)) |
| for x in np.linspace(f_start, f_end, int(sample_frames)) |
| ] |
| else: |
| f_start, f_end = 0, total - 1 |
| fids = sample_frame_ids(total, int(sample_frames)) |
| raw = decode_frames(video_path, fids) |
| if not raw: |
| return None, [], json.dumps( |
| {"error": "Failed to decode frames.", "metadata": meta}, |
| indent=2, ensure_ascii=False, |
| ), None |
|
|
| progress(0.25, desc="smart_resize") |
| resized = [smart_resize(f, int(max_pixels), int(patch_size)) for f in raw] |
| th, tw = resized[0].shape[:2] |
| resized = [ |
| cv2.resize(f, (tw, th), interpolation=cv2.INTER_AREA) |
| if f.shape[:2] != (th, tw) else f |
| for f in resized |
| ] |
|
|
| progress(0.40, desc=f"Scoring patches ({saliency_signal})") |
| grids = compute_score_grids(resized, int(patch_size), saliency_signal) |
| if score_log_scale: |
| grids = [np.log1p(np.clip(g, 0.0, None)) for g in grids] |
| masks, actual_selected, groups, gop_resolved = grouped_topk_masks( |
| grids, int(total_patches), str(gop or "global"), |
| ) |
| norm_scores = _normalize_scores(grids, pct=float(bitcost_pct)) |
|
|
| mode = (viz_mode or "selection").lower() |
| if mode not in ("selection", "heatmap", "sbs"): |
| mode = "selection" |
| progress(0.60, desc=f"Rendering {mode} video") |
| if mode == "heatmap": |
| vis = [ |
| overlay_heatmap(f, s, int(patch_size), alpha=float(heatmap_alpha)) |
| for f, s in zip(resized, norm_scores) |
| ] |
| elif mode == "sbs": |
| vis = [ |
| overlay_sbs( |
| f, m, s, int(patch_size), |
| alpha=float(heatmap_alpha), fade=float(fade_strength), |
| ) |
| for f, m, s in zip(resized, masks, norm_scores) |
| ] |
| else: |
| vis = [ |
| overlay_selection(f, m, int(patch_size), fade=float(fade_strength)) |
| for f, m in zip(resized, masks) |
| ] |
|
|
| out_dir = tempfile.mkdtemp(prefix="codec_view_") |
| vis_path = os.path.join(out_dir, f"{mode}_vis.mp4") |
| vis_fps = max(2.0, min(8.0, (meta.get("fps") or 25.0) / 4.0)) |
| write_mp4(vis, vis_path, vis_fps) |
|
|
| progress(0.85, desc="Packing canvases (IPPP)") |
| canvases, sub_ranges, n_selected = pack_canvases_per_group( |
| resized, masks, groups, int(patch_size), |
| target_canvases=1, |
| ) |
| canvas_items: List[Tuple[str, str]] = [] |
| for idx, canv in enumerate(canvases): |
| cp = os.path.join(out_dir, f"canvas_{idx:03d}.png") |
| cv2.imwrite(cp, canv) |
| g_idx, ss, ee = sub_ranges[idx] if idx < len(sub_ranges) else (0, idx, idx) |
| src_start = int(fids[ss]) if ss < len(fids) else None |
| src_end = int(fids[ee]) if ee < len(fids) else None |
| p_frame_count = max(0, ee - ss) |
| structure_label = " ".join(["I"] + ["P"] * p_frame_count) |
| p_patch_count = int(sum(int(m.sum()) for m in masks[ss + 1:ee + 1])) |
| caption = ( |
| f"Canvas {idx + 1}/{len(canvases)} Β· group {g_idx + 1} Β· " |
| f"{structure_label} Β· sampled #{ss}-{ee} Β· src {src_start}-{src_end} Β· " |
| f"I src#{src_start} + {p_patch_count} P patches from " |
| f"{p_frame_count} frame{'s' if p_frame_count != 1 else ''}" |
| ) |
| canvas_items.append((cp, caption)) |
|
|
| hb, wb = grids[0].shape |
| grid_size = int(grids[0].shape[0] * grids[0].shape[1]) if grids else 0 |
| uniform_full_frame_patches = int(patch_size) * int(patch_size) |
| |
| |
| requested_budget = int(total_patches) |
| uniform_requested_frames = len(fids) |
| uniform_frame_count = min( |
| uniform_requested_frames, |
| requested_budget // max(1, uniform_full_frame_patches), |
| ) |
| uniform_frame_ids = sample_window_frame_ids(f_start, f_end, uniform_frame_count) |
| uniform_total = int(len(uniform_frame_ids) * uniform_full_frame_patches) |
| info = { |
| "input": meta, |
| "params": { |
| "sample_frames": int(sample_frames), |
| "patch_size": int(patch_size), |
| "total_patches_budget": int(total_patches), |
| "max_pixels": int(max_pixels), |
| "start_sec": float(s_sec), |
| "end_sec": float(e_sec) if e_sec > 0 else None, |
| "saliency_signal": saliency_signal, |
| "score_log_scale": bool(score_log_scale), |
| "bitcost_pct": float(bitcost_pct), |
| "fade_strength": float(fade_strength), |
| "gop": gop_resolved, |
| "canvas_policy": "one_canvas_per_group_with_per_frame_p_sections", |
| "i_frame_policy": "first_frame_full_in_each_group", |
| }, |
| "gop_groups": [ |
| { |
| "start_frame_idx": int(s), |
| "end_frame_idx": int(e), |
| "start_sample_idx": int(s), |
| "end_sample_idx": int(e), |
| "start_source_frame_id": int(fids[s]) if s < len(fids) else None, |
| "end_source_frame_id": int(fids[e]) if e < len(fids) else None, |
| "source_frame_ids": [int(fids[i]) for i in range(s, e + 1)], |
| "n_frames": int(e - s + 1), |
| "structure_label": " ".join(["I"] + ["P"] * max(0, e - s)), |
| "i_frame_source_id": int(fids[s]) if s < len(fids) else None, |
| "p_source_frame_ids": [int(fids[i]) for i in range(s + 1, e + 1)], |
| "p_frame_count": int(max(0, e - s)), |
| "p_frame_patch_counts": [int(masks[i].sum()) for i in range(s + 1, e + 1)], |
| "p_frame_selected_patches": int(sum(int(m.sum()) for m in masks[s + 1:e + 1])), |
| "selected": int(sum(int(m.sum()) for m in masks[s:e + 1])), |
| } |
| for (s, e) in groups |
| ], |
| "frame_window": { |
| "first_decoded": int(f_start), |
| "last_decoded": int(f_end), |
| "codec_frame_ids": [int(x) for x in fids], |
| "uniform_full_frame_ids": [int(x) for x in uniform_frame_ids], |
| }, |
| "codec_per_frame_patches": [int(m.sum()) for m in masks], |
| "uniform_baseline": { |
| "mode": "uniform_full_frame_sampling", |
| "requested_frames": int(uniform_requested_frames), |
| "frames": int(len(uniform_frame_ids)), |
| "patches_per_frame": int(uniform_full_frame_patches), |
| "frame_ids": [int(x) for x in uniform_frame_ids], |
| "requested_budget": requested_budget, |
| "unused_budget": int(max(0, requested_budget - uniform_total)), |
| "total_patches": uniform_total, |
| "explanation": ( |
| "Uniformly sample complete frames from the same time window. " |
| f"The baseline targets the same sampled-frame count as codec " |
| f"({uniform_requested_frames}), but each full frame costs " |
| f"{uniform_full_frame_patches} patches (= patch_size^2), so " |
| f"only {len(uniform_frame_ids)} full " |
| "frames may fit inside the requested budget." |
| ), |
| }, |
| "resized_frame_size": f"{tw}x{th}", |
| "patch_grid_per_frame": f"{hb}x{wb} = {hb * wb} patches", |
| "uniform_full_frame_patch_cost": int(uniform_full_frame_patches), |
| "actual_selected_total": int(actual_selected), |
| "total_selected_patches_incl_i_frames": int(n_selected), |
| "canvases": [ |
| { |
| "index": i, |
| "size": f"{canvases[i].shape[1]}x{canvases[i].shape[0]}", |
| "group": int(sub_ranges[i][0]) if i < len(sub_ranges) else None, |
| "sub_range": list(sub_ranges[i][1:3]) if i < len(sub_ranges) else None, |
| "sampled_indices": ( |
| [int(x) for x in range(sub_ranges[i][1], sub_ranges[i][2] + 1)] |
| if i < len(sub_ranges) else [] |
| ), |
| "source_frame_ids": ( |
| [int(fids[x]) for x in range(sub_ranges[i][1], sub_ranges[i][2] + 1)] |
| if i < len(sub_ranges) else [] |
| ), |
| "structure_label": ( |
| " ".join(["I"] + ["P"] * max(0, sub_ranges[i][2] - sub_ranges[i][1])) |
| if i < len(sub_ranges) else "I" |
| ), |
| "i_frame_source_id": ( |
| int(fids[sub_ranges[i][1]]) if i < len(sub_ranges) else None |
| ), |
| "p_source_frame_ids": ( |
| [int(fids[x]) for x in range(sub_ranges[i][1] + 1, sub_ranges[i][2] + 1)] |
| if i < len(sub_ranges) else [] |
| ), |
| "p_frame_count": ( |
| int(max(0, sub_ranges[i][2] - sub_ranges[i][1])) |
| if i < len(sub_ranges) else 0 |
| ), |
| "p_frame_patch_counts": ( |
| [int(masks[x].sum()) for x in range(sub_ranges[i][1] + 1, sub_ranges[i][2] + 1)] |
| if i < len(sub_ranges) else [] |
| ), |
| "p_frame_selected_patches": ( |
| int(sum(int(m.sum()) for m in masks[sub_ranges[i][1] + 1:sub_ranges[i][2] + 1])) |
| if i < len(sub_ranges) else 0 |
| ), |
| "structure": "Full I-frame on top; one packed P section per " |
| "later frame, in time order.", |
| } |
| for i in range(len(canvases)) |
| ], |
| "n_canvases": int(len(canvases)), |
| "vis_video_fps": round(vis_fps, 2), |
| "viz_mode": mode, |
| "heatmap_alpha": float(heatmap_alpha) if mode != "selection" else None, |
| "score_normalization": f"shift-min, /p{bitcost_pct:.1f}, clip" |
| + (" (log1p applied)" if score_log_scale else ""), |
| "elapsed_sec": round(time.time() - t0, 2), |
| } |
| progress(0.95, desc="Building charts") |
| duration_sec = (total / fps) if fps > 0 else 0.0 |
| chart_fig = make_charts( |
| grids, masks, fids, uniform_frame_ids, uniform_requested_frames, |
| uniform_full_frame_patches, |
| fps, duration_sec, |
| int(total_patches), saliency_signal, |
| groups=groups, gop_label=gop_resolved, |
| ) |
|
|
| progress(1.0, desc="Done") |
| return ( |
| vis_path, canvas_items, |
| json.dumps(info, indent=2, ensure_ascii=False), |
| chart_fig, |
| ) |
|
|
|
|
# Stylesheet handed to Gradio via `css=` (on Blocks for gradio<6, on
# launch() for gradio>=6 — see the _BLOCK_KW/_LAUNCH_KW routing below).
# Sections, in order: brand design tokens on :root, page width, the two
# layout rows (.ovc-main/.ovc-bottom), keyframe animations, hero banner,
# link pills, card surfaces, run/preset buttons, footer, component-width
# fixes, dark-placeholder re-tinting for Video/Image/Plot zones, the
# stats tile grid (filled by render_stats_html into a gr.HTML), and two
# mobile breakpoints (768px, 480px).
CUSTOM_CSS = """
:root, .gradio-container, .gradio-container.dark {
  --ovc-grad: linear-gradient(135deg, #4f46e5 0%, #2563eb 50%, #06b6d4 100%);
  --ovc-grad-soft: linear-gradient(135deg, rgba(79,70,229,0.10), rgba(6,182,212,0.10));
  --ovc-ring: rgba(79,70,229,0.18);
  --ovc-ring-strong: rgba(79,70,229,0.30);
  --ovc-line-soft: rgba(148,163,184,0.18);
  --ovc-line: rgba(148,163,184,0.26);
  --ovc-line-strong: rgba(100,116,139,0.38);
  --ovc-line-accent-soft: rgba(79,70,229,0.18);
  --ovc-line-accent: rgba(79,70,229,0.28);
  --ovc-line-accent-strong: rgba(79,70,229,0.40);
  --ovc-line-cyan: rgba(6,182,212,0.18);
  --ovc-surface-tint: rgba(248,250,252,0.74);
  --ovc-surface-accent: rgba(79,70,229,0.035);
  --ovc-shadow-soft: 0 1px 2px rgba(15,23,42,0.04), 0 10px 30px rgba(15,23,42,0.03);
  --ovc-shadow-accent: 0 8px 28px rgba(79,70,229,0.08);
}
.gradio-container {
  max-width: 1480px !important;
  margin: 0 auto !important;
  padding-left: 10px !important;
  padding-right: 10px !important;
}
.ovc-main {
  gap: 18px !important;
  align-items: flex-start !important;
}
.ovc-bottom {
  gap: 16px !important;
  align-items: stretch !important;
}
@keyframes ovc-shift {
  0% { background-position: 0% 50%; }
  50% { background-position: 100% 50%; }
  100% { background-position: 0% 50%; }
}
@keyframes ovc-pulse {
  0%, 100% { box-shadow: 0 6px 18px rgba(37, 99, 235, 0.32); }
  50% { box-shadow: 0 8px 26px rgba(37, 99, 235, 0.50); }
}
@keyframes ovc-fade-in {
  from { opacity: 0; transform: translateY(4px); }
  to { opacity: 1; transform: translateY(0); }
}

/* Hero */
#ovc-hero {
  text-align: center;
  padding: 44px 16px 22px;
  border-radius: 22px;
  background:
    radial-gradient(120% 80% at 50% -10%, rgba(79,70,229,0.20), transparent 60%),
    linear-gradient(180deg, rgba(79,70,229,0.06), rgba(6,182,212,0.03)),
    repeating-linear-gradient(0deg, rgba(99,102,241,0.05) 0 1px, transparent 1px 28px),
    repeating-linear-gradient(90deg, rgba(99,102,241,0.05) 0 1px, transparent 1px 28px);
  border: 1px solid var(--ovc-line-accent-soft);
  box-shadow: 0 10px 30px rgba(15,23,42,0.04);
  margin-bottom: 18px;
  position: relative;
  overflow: hidden;
}
#ovc-hero::after {
  content: "";
  position: absolute; inset: auto -20% -40% -20%;
  height: 60%;
  background: radial-gradient(60% 80% at 50% 0%, rgba(6,182,212,0.22), transparent 70%);
  pointer-events: none;
}
#ovc-hero h1 {
  font-size: 2.7rem;
  font-weight: 800;
  background: var(--ovc-grad);
  background-size: 200% 200%;
  animation: ovc-shift 9s ease-in-out infinite;
  -webkit-background-clip: text;
  background-clip: text;
  color: transparent;
  margin: 0 0 6px;
  letter-spacing: -0.028em;
  line-height: 1.04;
}
#ovc-hero p.tagline {
  font-size: 1.05rem;
  color: var(--body-text-color-subdued);
  margin: 0 auto 16px;
  max-width: 760px;
  line-height: 1.6;
}
.ovc-links {
  display: flex; flex-wrap: wrap; gap: 10px;
  justify-content: center; margin: 14px auto 6px;
  position: relative; z-index: 1;
}
.ovc-links a {
  text-decoration: none;
  font-weight: 600;
  font-size: 0.9rem;
  padding: 7px 14px;
  border-radius: 999px;
  background: var(--background-fill-primary, #fff);
  border: 1px solid var(--ovc-line-accent-soft);
  color: #4338ca;
  transition: transform 0.12s ease, box-shadow 0.18s ease,
    background 0.18s ease, color 0.18s ease, border-color 0.18s ease;
  display: inline-flex; align-items: center;
  box-shadow: 0 1px 2px rgba(15,23,42,0.04);
}
.ovc-links a:hover {
  background: var(--ovc-grad);
  color: #fff;
  border-color: transparent;
  transform: translateY(-1px);
  box-shadow: 0 8px 20px rgba(79,70,229,0.18);
}
.gradio-container.dark .ovc-links a {
  background: rgba(30,41,59,0.7);
  color: #c7d2fe;
  border-color: var(--ovc-line-accent-strong);
}

/* Cards */
.ovc-card {
  border-radius: 16px !important;
  padding: 16px 18px !important;
  border: 1px solid var(--ovc-line-soft) !important;
  background: var(--background-fill-primary) !important;
  box-shadow: var(--ovc-shadow-soft);
  transition: box-shadow 0.18s ease, border-color 0.18s ease, transform 0.18s ease;
  animation: ovc-fade-in 0.32s ease-out;
}
.ovc-card:hover {
  border-color: var(--ovc-line) !important;
  box-shadow: 0 8px 24px rgba(15,23,42,0.06);
}
.ovc-card + .ovc-card {
  margin-top: 2px;
}
/* Primary outputs: subtle accent ring + lift */
.ovc-card-primary {
  border: 1px solid var(--ovc-line-accent-soft) !important;
  background:
    linear-gradient(180deg, rgba(79,70,229,0.028), rgba(6,182,212,0.014)),
    var(--background-fill-primary) !important;
  box-shadow:
    inset 0 0 0 1px rgba(255,255,255,0.65),
    var(--ovc-shadow-accent) !important;
}
.ovc-card-primary:hover {
  border-color: var(--ovc-line-accent) !important;
  box-shadow:
    inset 0 0 0 1px rgba(255,255,255,0.72),
    0 10px 32px rgba(79,70,229,0.10) !important;
}
.ovc-card h3 {
  display: inline-flex;
  align-items: center;
  gap: 8px;
  font-size: 0.74rem !important;
  font-weight: 700 !important;
  text-transform: uppercase;
  letter-spacing: 0.10em;
  color: #3730a3 !important;
  background: rgba(79,70,229,0.06);
  border: 1px solid rgba(79,70,229,0.10);
  padding: 4px 10px !important;
  border-radius: 999px;
  margin: 0 0 12px !important;
}
.ovc-card h3::before {
  content: "";
  display: inline-block;
  width: 6px; height: 6px; border-radius: 50%;
  background: var(--ovc-grad);
  transform: translateY(0);
}

/* Run button */
#ovc-run button {
  width: 100%;
  height: 54px !important;
  font-size: 1.06rem !important;
  font-weight: 700 !important;
  letter-spacing: 0.01em;
  background: var(--ovc-grad) !important;
  background-size: 200% 200% !important;
  animation: ovc-shift 6s ease-in-out infinite, ovc-pulse 2.6s ease-in-out infinite;
  border: none !important;
  color: #fff !important;
  border-radius: 14px !important;
  transition: transform 0.06s ease;
}
#ovc-run button:hover {
  transform: translateY(-1px);
  animation-play-state: paused;
}
#ovc-run button:active { transform: translateY(0); }

/* Preset buttons */
.ovc-preset button {
  background: var(--ovc-grad-soft) !important;
  color: #4338ca !important;
  border: 1px solid rgba(79,70,229,0.16) !important;
  border-radius: 10px !important;
  font-weight: 600 !important;
  transition: all 0.15s ease;
}
.ovc-preset button:hover {
  background: var(--ovc-grad) !important;
  color: #fff !important;
  border-color: transparent !important;
}

/* Footer */
#ovc-footer {
  text-align: center;
  color: var(--body-text-color-subdued);
  font-size: 0.80rem;
  padding: 22px 8px 10px;
  margin-top: 14px;
  border-top: 1px solid rgba(100,116,139,0.22);
}
#ovc-footer code {
  background: rgba(79,70,229,0.08);
  padding: 1px 6px;
  border-radius: 4px;
}

/* Tighter spacing for sliders inside cards */
.ovc-card .gradio-slider { margin-bottom: 4px !important; }
.ovc-card .gradio-number,
.ovc-card .gradio-radio,
.ovc-card .gradio-checkbox,
.ovc-card .gradio-code,
.ovc-card .gradio-gallery,
.ovc-card .gradio-video,
.ovc-card .gradio-plot {
  width: 100% !important;
}

/* Tame Gradio's dark default placeholders inside our cards: blanket-override
   any background on the inner wrappers, then paint a brand-tinted gradient on
   the canonical containers. This lights up the empty Video/Image/Plot zones
   so they no longer look like black holes. */
.ovc-card .video-container,
.ovc-card .image-container,
.ovc-card .image-frame,
.ovc-card .preview,
.ovc-card .plot-container,
.ovc-card .empty,
.ovc-card video,
.ovc-card [data-testid="video"],
.ovc-card [data-testid="image"],
.ovc-card .icon-button,
.ovc-card .options,
.ovc-card .source-selection,
.ovc-card .upload-container {
  background: transparent !important;
  background-color: transparent !important;
}
.ovc-card .container,
.ovc-card .wrap,
.ovc-card .block,
.ovc-card fieldset,
.ovc-card [data-testid="block"],
.ovc-card [data-testid="video"],
.ovc-card [data-testid="image"],
.ovc-card [data-testid="plot"],
.ovc-card [data-testid="file-upload"],
.ovc-card [data-testid="upload"],
.ovc-card .video-container,
.ovc-card .image-container,
.ovc-card .plot-container {
  border-radius: 12px !important;
}
.ovc-card .block,
.ovc-card fieldset,
.ovc-card [data-testid="block"],
.ovc-card [data-testid="video"],
.ovc-card [data-testid="image"],
.ovc-card [data-testid="plot"],
.ovc-card [data-testid="file-upload"],
.ovc-card [data-testid="upload"],
.ovc-card .wrap,
.ovc-card .container {
  border: 1px solid transparent !important;
  box-shadow: none !important;
  outline: none !important;
  background: transparent !important;
}
.ovc-card .video-container,
.ovc-card .image-container,
.ovc-card .plot-container,
.ovc-card-primary .video-container,
.ovc-card-primary .image-container,
.ovc-card-primary .plot-container {
  background:
    linear-gradient(180deg, rgba(79,70,229,0.028), rgba(6,182,212,0.018)),
    var(--ovc-surface-tint) !important;
  border: 1px solid rgba(79,70,229,0.18) !important;
  box-shadow: inset 0 1px 0 rgba(255,255,255,0.68) !important;
}
.ovc-card .upload-container,
.ovc-card .video-container,
.ovc-card .image-container,
.ovc-card .plot-container {
  width: 100% !important;
  min-width: 0 !important;
}
.ovc-card .upload-container,
.ovc-card [data-testid="file-upload"] {
  min-height: 220px !important;
  background:
    linear-gradient(180deg, rgba(79,70,229,0.03), rgba(6,182,212,0.02)),
    rgba(248,250,252,0.72) !important;
  border: 1px solid rgba(79,70,229,0.16) !important;
}
.ovc-card .plot-container,
.ovc-card [data-testid="plot"] {
  min-height: 300px !important;
}
.ovc-card-primary .video-container,
.ovc-card [data-testid="video"] {
  min-height: 260px !important;
}
.ovc-card .gradio-video, .ovc-card .gradio-image, .ovc-card .gradio-plot {
  border-color: transparent !important;
  background: transparent !important;
  box-shadow: none !important;
  outline: none !important;
}
.ovc-card video,
.ovc-card img,
.ovc-card canvas,
.ovc-card svg {
  border: none !important;
  outline: none !important;
  box-shadow: none !important;
}
/* Empty placeholder text inside Gradio components */
.ovc-card .empty, .ovc-card .empty p, .ovc-card .empty span {
  color: #94a3b8 !important;
}

/* Stats tile grid (rendered into a gr.HTML by render_stats_html) */
.ovc-stats {
  display: grid;
  grid-template-columns: repeat(auto-fit, minmax(150px, 1fr));
  gap: 10px;
}
.ovc-stat {
  padding: 12px 14px;
  border-radius: 14px;
  background: linear-gradient(135deg, rgba(79,70,229,0.055), rgba(6,182,212,0.03));
  border: 1px solid rgba(79,70,229,0.12);
  transition: transform 0.18s ease, box-shadow 0.18s ease;
}
.ovc-stat:hover {
  transform: translateY(-1px);
  box-shadow: 0 6px 18px rgba(79,70,229,0.10);
}
.ovc-stat .value {
  font-size: 1.55rem; font-weight: 800;
  background: var(--ovc-grad);
  -webkit-background-clip: text; background-clip: text; color: transparent;
  letter-spacing: -0.02em;
  line-height: 1.1;
  word-break: break-word;
}
.ovc-stat .label {
  font-size: 0.74rem; color: #64748b;
  text-transform: uppercase; letter-spacing: 0.06em;
  margin-top: 4px;
  font-weight: 600;
}

/* βββ Mobile / narrow viewport adjustments βββββββββββββββββββββββββββ */
@media (max-width: 768px) {
  .gradio-container { padding: 6px !important; }

  /* Force the controls/outputs row to stack vertically on phones */
  .gradio-container .ovc-main {
    flex-direction: column !important;
    gap: 12px !important;
  }
  .gradio-container .ovc-bottom {
    flex-direction: column !important;
    gap: 12px !important;
  }
  .gradio-container .ovc-main > div {
    width: 100% !important;
    min-width: 0 !important;
    max-width: 100% !important;
    flex: 1 1 100% !important;
  }

  /* Hero scales down */
  #ovc-hero { padding: 28px 14px 16px; border-radius: 16px; margin-bottom: 12px; }
  #ovc-hero h1 { font-size: 2.05rem; letter-spacing: -0.02em; }
  #ovc-hero p.tagline { font-size: 0.96rem; line-height: 1.5; margin-bottom: 12px; }
  .ovc-links { gap: 6px; margin-top: 10px; }
  .ovc-links a { font-size: 0.78rem; padding: 5px 10px; }
  /* Cards tighter */
  .ovc-card { padding: 12px 14px !important; border-radius: 14px !important; }
  .ovc-card h3 { font-size: 0.70rem !important; margin-bottom: 8px !important; }

  /* Run button */
  #ovc-run button { height: 48px !important; font-size: 0.98rem !important; }

  /* Stats tile sizing */
  .ovc-stats { grid-template-columns: repeat(auto-fit, minmax(115px, 1fr)); gap: 8px; }
  .ovc-stat { padding: 10px 12px; }
  .ovc-stat .value { font-size: 1.25rem; }
  .ovc-stat .label { font-size: 0.68rem; }

  /* Outputs: shorter video so it does not dominate the screen */
  .ovc-card video { max-height: 280px !important; }
  .ovc-card .upload-container,
  .ovc-card [data-testid="file-upload"] {
    min-height: 180px !important;
  }
}

@media (max-width: 480px) {
  #ovc-hero { padding: 22px 12px 14px; }
  #ovc-hero h1 { font-size: 1.7rem; }
  #ovc-hero p.tagline { font-size: 0.9rem; }
  /* Put each link on a row of two (browsers will pack 2 per row at this size) */
  .ovc-links a { font-size: 0.74rem; padding: 4px 9px; }
  #ovc-run button { height: 46px !important; font-size: 0.94rem !important; }
}
"""
|
|
# Soft indigo/blue theme matched to the hero gradient in CUSTOM_CSS.
# Built in two steps: construct the base Soft theme, then override a few
# tokens via .set(). Routed to Blocks() or launch() by _BLOCK_KW/_LAUNCH_KW.
_theme_base = gr.themes.Soft(
    primary_hue="indigo",
    secondary_hue="blue",
    neutral_hue="slate",
    font=[gr.themes.GoogleFont("Inter"), "system-ui", "sans-serif"],
)
THEME = _theme_base.set(
    body_background_fill="*neutral_50",
    block_radius="14px",
    button_primary_background_fill="*primary_500",
    button_primary_background_fill_hover="*primary_600",
)
|
|
# Static hero banner injected once via gr.HTML at the top of the Blocks
# layout; styled by the #ovc-hero / .ovc-links rules in CUSTOM_CSS. Plain
# HTML string — no templating or runtime substitution happens here.
HERO_HTML = """
<div id="ovc-hero">
  <h1>OneVision Encoder</h1>
  <p class="tagline">
    Codec-style patch saliency for video understanding — see which
    patches the encoder picks from your video and pack them into the
    canvas LLaVA-OneVision consumes.
  </p>
  <div class="ovc-links">
    <a href="https://www.lmms-lab.com/onevision-encoder/index.html" target="_blank" rel="noopener">π Homepage</a>
    <a href="https://huggingface.co/collections/lmms-lab-encoder/onevision-encoder" target="_blank" rel="noopener">π€ Models</a>
    <a href="https://arxiv.org/abs/2602.08683" target="_blank" rel="noopener">π Tech Report</a>
    <a href="docs/model_card.md" target="_blank" rel="noopener">π Model Card</a>
    <a href="docs/data_card.md" target="_blank" rel="noopener">π Data Card</a>
  </div>
</div>
"""
|
|
# Gradio moved the theme/css kwargs between releases: on major versions
# below 6 they belong to Blocks(...), from 6 onward to launch(...).
# Detect the installed major version (best effort: an unparsable version
# string falls back to 4) and stage the styling kwargs into whichever
# keyword dict feeds the right call site.
try:
    _GR_MAJOR = int(gr.__version__.split(".", 1)[0])
except Exception:
    _GR_MAJOR = 4

_BLOCK_KW: dict = {"title": "OneVision Encoder"}
_LAUNCH_KW: dict = {}
_style_kw = _LAUNCH_KW if _GR_MAJOR >= 6 else _BLOCK_KW
_style_kw["theme"] = THEME
_style_kw["css"] = CUSTOM_CSS
|
|
|
|
# (label, value) pairs for the two gr.Radio controls in the UI below:
# Gradio displays the first element and passes the second to `process`.
# Visualization modes for the rendered preview video.
VIZ_CHOICES = [
    ("Selection β kept patches in color, others fade to gray-white", "selection"),
    ("Heatmap β full-frame JET overlay (blue=low, red=high)", "heatmap"),
    ("Both", "sbs"),
]
# Saliency scoring signals; per the module docstring, these stand in for
# the real encoder bit-cost signal.
SIGNAL_CHOICES = [
    ("Gradient β intra-frame Sobel (sharp edges, textures, text)", "gradient"),
    ("Frame diff β inter-frame motion (movers, action)", "frame_diff"),
    ("Combined β 0.5Β·gradient + 0.5Β·frame_diff (general purpose)", "combined"),
]
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# UI definition. Layout:
#   hero banner
#   row .ovc-main   — left column: inputs + pipeline controls;
#                     right column: selection video + cumulative chart
#   row .ovc-bottom — left: packed-canvas gallery; right: raw JSON record
#   footer, then event wiring for the Run and demo-preset buttons.
# ---------------------------------------------------------------------------
with gr.Blocks(**_BLOCK_KW) as demo:
    gr.HTML(HERO_HTML)

    with gr.Row(equal_height=False, elem_classes="ovc-main"):
        # ---- Left column: input video and pipeline controls ----
        with gr.Column(scale=4, min_width=360):
            with gr.Group(elem_classes="ovc-card"):
                gr.Markdown("### Input")
                video_in = gr.Video(label="Video", sources=["upload"], height=260)
                with gr.Row(elem_classes="ovc-preset"):
                    # Hidden entirely when the bundled demo clip is missing.
                    btn_demo = gr.Button(
                        "Load demo video", size="sm",
                        visible=os.path.exists(DEMO_VIDEO_PATH),
                    )

            with gr.Group(elem_classes="ovc-card"):
                gr.Markdown("### Pipeline")
                viz_mode = gr.Radio(
                    VIZ_CHOICES, value="selection",
                    label="Visualization mode",
                )
                sample_frames = gr.Slider(
                    4, 64, value=32, step=1, label="Sampled frames",
                )
                # One patch budget shared by the codec path and the uniform
                # full-frame baseline (see the info text).
                top_k = gr.Slider(
                    16, 16384, value=1024, step=16,
                    label="Total patches budget (whole video)",
                    info="The single budget shared across the whole video. "
                         "The uniform full-frame baseline will fit as many "
                         "complete frames as this budget allows, where one full "
                         "frame costs patch_size^2 patches; the codec path spends "
                         "the same budget on saliency-selected patches.",
                )
                patch_size = gr.Radio(
                    PATCH_CHOICES, value=14, label="Patch size (px)",
                )
                gop = gr.Radio(
                    [
                        ("GOP = 4 β fixed 4-frame groups", "4"),
                        ("GOP = 8 β fixed 8-frame groups", "8"),
                        ("GOP = 16 β fixed 16-frame groups", "16"),
                        ("Codec-stream: adaptive groups by saliency energy", "dynamic"),
                    ],
                    value="8",
                    label="GOP (group of pictures)",
                    info="Splits sampled frames into GOP groups. Each group "
                         "produces exactly one GOP canvas: the group's first "
                         "frame stays whole as the I-frame, and each later "
                         "frame gets its own P section below it. So GOP=4 "
                         "means each group is I P P P. For fixed GOP=N, the "
                         "number of packed canvases is ceil(sampled_frames / N). "
                         "Example: 32 sampled frames with GOP=4 gives 8 "
                         "canvases; 32 sampled frames with GOP=8 gives 4. "
                         "Codec-stream mode adaptively groups by saliency "
                         "energy, targeting roughly 8-64 sampled frames per group.",
                )

            # Optional clipping of the analysis window; 0/0 means whole video.
            with gr.Accordion("Time window", open=False):
                with gr.Row():
                    start_sec = gr.Number(value=0.0, precision=2, label="Start (s)")
                    end_sec = gr.Number(value=0.0, precision=2, label="End (s)")
                gr.Markdown(
                    "<small>Set both to 0 to use the full video.</small>",
                )

            with gr.Accordion("Saliency", open=False):
                saliency_signal = gr.Radio(
                    SIGNAL_CHOICES, value="gradient",
                    label="Scoring signal",
                )
                score_log_scale = gr.Checkbox(
                    value=False,
                    label="Apply log1p to scores",
                    info="Compresses dynamic range β brings up mid-energy patches.",
                )
                bitcost_pct = gr.Slider(
                    80.0, 99.9, value=99.0, step=0.1,
                    label="Heatmap normalization percentile",
                    info="Higher = harder to saturate red; lower = more vivid.",
                )

            with gr.Accordion("Visual style", open=False):
                heatmap_alpha = gr.Slider(
                    0.1, 0.9, value=0.55, step=0.05,
                    label="Heatmap blend Ξ±",
                )
                fade_strength = gr.Slider(
                    0.0, 0.9, value=0.55, step=0.05,
                    label="Selection fade strength",
                )
                max_pixels = gr.Slider(
                    40000, 400000, value=150000, step=10000,
                    label="Max pixels per frame",
                )

            with gr.Row(elem_id="ovc-run"):
                run_btn = gr.Button("Run pipeline", variant="primary")

        # ---- Right column: primary outputs ----
        with gr.Column(scale=7, min_width=560):
            with gr.Group(elem_classes="ovc-card ovc-card-primary"):
                gr.Markdown("### Patch selection visualization")
                vis_out = gr.Video(
                    label="", show_label=False, autoplay=True, height=460,
                )

            with gr.Group(elem_classes="ovc-card ovc-card-primary"):
                gr.Markdown("### Cumulative patches over time")
                gr.Markdown(
                    "<small><b>Indigo</b>: codec method β selects patches "
                    "within frames according to saliency, so the curve rises "
                    "in bursts. <b>Cyan (dashed)</b>: uniform full-frame "
                    "sampling β evenly samples complete frames from the same "
                    "time window, targeting the same sampled-frame count as "
                    "codec when the budget allows. Each step costs "
                    "<b>patch_size^2</b> patches, regardless of the preview "
                    "frame resolution. The dotted line marks the requested "
                    "budget.</small>"
                )
                chart_out = gr.Plot(label="", show_label=False)

    # ---- Bottom row: packed canvases + raw JSON record ----
    with gr.Row(equal_height=False, elem_classes="ovc-bottom"):
        with gr.Column(scale=7, min_width=420):
            with gr.Group(elem_classes="ovc-card"):
                gr.Markdown("### Packed canvases (one per GOP group)")
                gr.Markdown(
                    "<small>Each canvas is one GOP group rendered in "
                    "<b>I/P structure</b>: the group's first frame is "
                    "the <b>I-frame</b> kept whole on top, and each "
                    "later frame gets its own packed <b>P-frame</b> "
                    "section below in time order. Fixed GOP=N means "
                    "<b>one canvas per N sampled frames</b>.</small>"
                )
                canvas_out = gr.Gallery(
                    label="", show_label=False,
                    columns=2, rows=2, height=520,
                    object_fit="contain",
                    preview=True,
                )
        with gr.Column(scale=5, min_width=340):
            with gr.Group(elem_classes="ovc-card"):
                gr.Markdown("### Raw JSON")
                gr.Markdown(
                    "<small>Full reproducible record of this run "
                    "(params, frame ids, group spans). Collapsed by "
                    "default β click to expand.</small>"
                )
                with gr.Accordion("Show full JSON", open=False):
                    info_out = gr.Code(
                        label="", language="json", lines=22,
                    )

    gr.HTML(
        '<div id="ovc-footer">'
        '<b>OneVision Encoder</b> Β· codec-style patch saliency demo Β· '
        'Sobel + frame-diff stand in for the ffmpeg bitcost patch Β· '
        'GOP-aware top-K patch selection with one IPPP canvas per group.'
        '</div>'
    )

    # Wire the Run button to the processing pipeline.
    # NOTE(review): this positional input order must match the `process`
    # signature defined earlier in the file — confirm when changing either side.
    run_btn.click(
        process,
        inputs=[
            video_in, sample_frames, patch_size, top_k, max_pixels,
            viz_mode, heatmap_alpha,
            start_sec, end_sec,
            saliency_signal, score_log_scale, bitcost_pct, fade_strength,
            gop,
        ],
        outputs=[vis_out, canvas_out, info_out, chart_out],
    )

    # Demo-preset button: fills every listed control from DEMO_PRESET, whose
    # tuple order must match this outputs list one-to-one.
    btn_demo.click(
        lambda: DEMO_PRESET,
        inputs=None,
        outputs=[
            video_in, sample_frames, patch_size, top_k, max_pixels,
            viz_mode, heatmap_alpha, start_sec, end_sec,
            saliency_signal, score_log_scale, bitcost_pct, fade_strength,
            gop,
        ],
    )
|
|
|
|
if __name__ == "__main__":
    # Serve on all interfaces; honor a PORT override from the environment
    # (e.g. a hosting platform), falling back to Gradio's default 7860.
    # _LAUNCH_KW carries theme/css on gradio>=6 and is empty otherwise.
    serve_port = int(os.environ.get("PORT", 7860))
    demo.launch(
        server_name="0.0.0.0",
        server_port=serve_port,
        **_LAUNCH_KW,
    )
|
|