# NOTE: leftover page header from the hosting-site scrape (author FeilongTang,
# commit 46cbd59 "Increase cumulative chart resolution") — commented out so
# the module remains importable as Python.
"""OneVision Encoder Codec View.
A simplified, dependency-light port of the codec_tools pipeline from
lmms-eval-ov2. The original tool relies on a bitcost-patched ffmpeg 5.1 to
score every macroblock by its actual encoding bit cost; we approximate that
saliency signal with a Sobel gradient magnitude per patch (high gradient =
high local complexity = roughly what the encoder would spend bits on).
Pipeline (mirrors codec_tools/pipeline/process_video_bitcost_readiness.py):
1. Uniformly sample N frames from the input video.
2. smart_resize each frame so dims are multiples of `patch` and the
total pixel count <= max_pixels.
3. Slice every frame into a patch grid; score each patch by its
Sobel gradient magnitude mean.
4. Pick the top-K highest-scoring patches under the selected GOP
grouping.
5. Render a "selection visualization" video: kept patches stay in
full color, dropped patches are faded to a gray-white wash so the
viewer can see exactly which patches the codec stage chose.
6. Pack one canvas per GOP group: the first frame of each group is
kept whole as the I-frame, and later frames contribute only their
selected patches packed below it in time order.
"""
import json
import math
import os
import shutil
import subprocess
import tempfile
import time
from typing import List, Tuple
import cv2
import gradio as gr
import imageio_ffmpeg
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import numpy as np
# Patch sizes offered in the UI (ViT-style square patch grids).
PATCH_CHOICES = [14, 16, 28]
# Two-tone outline colors for kept patches, in BGR order as OpenCV expects.
PATCH_OUTLINE_OUTER_BGR = (42, 23, 15)  # dark slate
PATCH_OUTLINE_INNER_BGR = (11, 158, 245)  # amber
# Bundled example clip used to pre-fill the demo inputs.
DEMO_VIDEO_PATH = os.path.join(
    os.path.dirname(os.path.abspath(__file__)),
    "examples", "demo_codec_heatmap.mp4",
)
# Default argument tuple for process() — positional order must stay in sync
# with that function's signature.
DEMO_PRESET = (
    DEMO_VIDEO_PATH,  # video_in
    32,  # sample_frames
    14,  # patch_size
    1024,  # total_patches
    150000,  # max_pixels
    "sbs",  # viz_mode
    0.55,  # heatmap_alpha
    0.0, 0.0,  # start_sec, end_sec
    "combined",  # saliency_signal
    True,  # score_log_scale
    96.0,  # bitcost_pct
    0.55,  # fade_strength
    "dynamic",  # gop
)
def smart_resize(frame: np.ndarray, max_pixels: int, factor: int) -> np.ndarray:
    """Resize so h,w are multiples of `factor` and h*w <= max_pixels.

    Only ever downscales: when the frame exceeds the pixel budget it is
    scaled by sqrt(budget/pixels), then both dimensions are floored to the
    nearest multiple of `factor` (clamped at `factor` so the result is
    never empty).

    Args:
        frame: HxWx3 image array.
        max_pixels: upper bound on h*w after resizing.
        factor: patch size the output dims must be divisible by.
    """
    h, w = frame.shape[:2]
    pixels = h * w
    if pixels > max_pixels:
        scale = math.sqrt(max_pixels / pixels)
        h = max(factor, int(h * scale))
        w = max(factor, int(w * scale))
    h = max(factor, (h // factor) * factor)
    w = max(factor, (w // factor) * factor)
    if (h, w) == frame.shape[:2]:
        # Already conforming — skip the no-op cv2.resize (and its full copy).
        return frame
    return cv2.resize(frame, (w, h), interpolation=cv2.INTER_AREA)
def sample_frame_ids(total: int, n: int) -> List[int]:
    """Uniformly pick `n` frame indices in [0, total); all of them if n >= total."""
    if total <= 0:
        return []
    if n >= total:
        return list(range(total))
    anchors = np.linspace(0, total - 1, n)
    return [int(round(a)) for a in anchors]
def split_budget_evenly(total_k: int, n_parts: int) -> List[int]:
    """Split `total_k` into `n_parts` near-equal non-negative integer shares.

    Earlier parts absorb the remainder, so shares differ by at most one.
    """
    budget = max(0, int(total_k))
    parts = max(0, int(n_parts))
    if parts == 0:
        return []
    share, extra = divmod(budget, parts)
    shares = [share] * parts
    for i in range(extra):
        shares[i] += 1
    return shares
def sample_window_frame_ids(start: int, end: int, n: int) -> List[int]:
    """Sample up to `n` frame ids uniformly from the inclusive window [start, end]."""
    lo, hi = int(start), int(end)
    want = max(0, int(n))
    if want <= 0 or hi < lo:
        return []
    window = hi - lo + 1
    if want >= window:
        # Window is small enough to keep every frame.
        return list(range(lo, hi + 1))
    return [lo + offset for offset in sample_frame_ids(window, want)]
def decode_frames(video_path: str, frame_ids: List[int]) -> List[np.ndarray]:
    """Seek to each id in `frame_ids` and decode it; failed reads are skipped."""
    capture = cv2.VideoCapture(video_path)
    if not capture.isOpened():
        return []
    decoded: List[np.ndarray] = []
    for frame_id in frame_ids:
        # Random-access seek per frame; slower than sequential decode but
        # matches arbitrary (possibly sparse) id lists.
        capture.set(cv2.CAP_PROP_POS_FRAMES, int(frame_id))
        success, image = capture.read()
        if success:
            decoded.append(image)
    capture.release()
    return decoded
def video_metadata(video_path: str) -> dict:
    """Collect basic stream metadata via OpenCV, enriched via ffprobe when present.

    Always returns total_frames / fps / width / height. If ffprobe is on
    PATH, codec / pix_fmt / profile / bitrate_bps are added on success, or
    ffprobe_error with the failure message otherwise.
    """
    cap = cv2.VideoCapture(video_path)
    total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    # CAP_PROP_FPS can come back 0 for odd containers; coerce falsy to 0.0.
    fps = float(cap.get(cv2.CAP_PROP_FPS) or 0.0)
    w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    cap.release()
    meta = {
        "total_frames": total,
        "fps": round(fps, 3),
        "width": w,
        "height": h,
    }
    if shutil.which("ffprobe"):
        try:
            r = subprocess.run(
                [
                    "ffprobe", "-v", "quiet", "-select_streams", "v:0",
                    "-show_entries", "stream=codec_name,bit_rate,pix_fmt,profile",
                    "-of", "json", video_path,
                ],
                capture_output=True, text=True, check=True, timeout=15,
            )
            # First video stream only; IndexError on empty streams is caught below.
            data = json.loads(r.stdout).get("streams", [{}])[0]
            meta["codec"] = data.get("codec_name")
            meta["pix_fmt"] = data.get("pix_fmt")
            meta["profile"] = data.get("profile")
            meta["bitrate_bps"] = data.get("bit_rate")
        except Exception as e:
            # ffprobe is best-effort enrichment only; never fail metadata.
            meta["ffprobe_error"] = str(e)
    return meta
def patch_score_grid(frame_bgr: np.ndarray, patch: int) -> np.ndarray:
    """Score each `patch`x`patch` cell by its mean Sobel gradient magnitude.

    Returns an [hb, wb] float32 grid (hb/wb = whole patches that fit).
    """
    luma = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2GRAY).astype(np.float32)
    dx = cv2.Sobel(luma, cv2.CV_32F, 1, 0, ksize=3)
    dy = cv2.Sobel(luma, cv2.CV_32F, 0, 1, ksize=3)
    magnitude = np.sqrt(dx * dx + dy * dy)
    rows, cols = magnitude.shape
    hb = rows // patch
    wb = cols // patch
    # Drop any ragged border, then average within each patch cell.
    cropped = magnitude[: hb * patch, : wb * patch]
    per_patch = cropped.reshape(hb, patch, wb, patch).mean(axis=(1, 3))
    return per_patch.astype(np.float32)
def patch_score_frame_diff(
    prev_bgr: np.ndarray, cur_bgr: np.ndarray, patch: int,
) -> np.ndarray:
    """Per-patch mean absolute difference vs the previous frame (motion proxy).

    Falls back to the intra-frame gradient score when there is no usable
    previous frame (first frame, or a shape mismatch mid-sample).
    """
    if prev_bgr is None or prev_bgr.shape != cur_bgr.shape:
        return patch_score_grid(cur_bgr, patch)
    delta = cv2.absdiff(prev_bgr, cur_bgr).mean(axis=2).astype(np.float32)
    rows, cols = delta.shape
    hb, wb = rows // patch, cols // patch
    trimmed = delta[: hb * patch, : wb * patch]
    return trimmed.reshape(hb, patch, wb, patch).mean(axis=(1, 3))
def compute_score_grids(
    frames: List[np.ndarray], patch: int, signal: str,
) -> List[np.ndarray]:
    """Build one patch-score grid per frame from the requested signal.

    Signals:
      - 'gradient'   Sobel magnitude only (intra-frame complexity)
      - 'frame_diff' absdiff vs previous frame (temporal motion)
      - 'combined'   0.5 * gradient_norm + 0.5 * frame_diff_norm, where each
                     component is shifted to [0,1] across the whole sample so
                     both contribute on equal footing
    Unrecognized signals fall through to 'combined'.
    """
    sig = (signal or "gradient").lower()
    if sig == "gradient":
        return [patch_score_grid(f, patch) for f in frames]

    def _diff_grids() -> List[np.ndarray]:
        grids: List[np.ndarray] = []
        last = None
        for frame in frames:
            grids.append(patch_score_frame_diff(last, frame, patch))
            last = frame
        return grids

    if sig == "frame_diff":
        return _diff_grids()

    # 'combined' (and any unrecognized signal string)
    def _shift_to_unit(a: np.ndarray) -> np.ndarray:
        a = a.astype(np.float32) - a.min()
        peak = a.max()
        return a / peak if peak > 1e-8 else a

    grad = np.stack([patch_score_grid(f, patch) for f in frames], axis=0)
    diff = np.stack(_diff_grids(), axis=0)
    blended = 0.5 * _shift_to_unit(grad) + 0.5 * _shift_to_unit(diff)
    return list(blended)
def topk_mask(score: np.ndarray, k: int) -> np.ndarray:
    """Per-frame top-K mask (legacy helper, no longer used by process()).

    Returns a uint8 mask of `score`'s shape with 1 at the k largest entries.
    """
    values = score.flatten()
    if k >= values.size:
        return np.ones_like(score, dtype=np.uint8)
    if k <= 0:
        return np.zeros_like(score, dtype=np.uint8)
    selected = np.argpartition(values, -k)[-k:]
    mask = np.zeros(values.size, dtype=np.uint8)
    mask[selected] = 1
    return mask.reshape(score.shape)
def global_topk_masks(
    grids: List[np.ndarray], total_k: int,
) -> Tuple[List[np.ndarray], int]:
    """Pick the `total_k` highest-scoring patches GLOBALLY across all frames.

    Returns one uint8 mask per frame plus the count actually selected.
    Low-energy frames may receive zero patches while busy frames soak up
    the budget — that concentration is exactly what the codec-style
    saliency is for.
    """
    if not grids:
        return [], 0
    stacked = np.stack(grids, axis=0).astype(np.float32)  # [N, hb, wb]
    n_frames, hb, wb = stacked.shape
    scores = stacked.reshape(-1)
    budget = int(total_k)
    if budget >= scores.size:
        every = [np.ones((hb, wb), dtype=np.uint8) for _ in range(n_frames)]
        return every, int(scores.size)
    if budget <= 0:
        none = [np.zeros((hb, wb), dtype=np.uint8) for _ in range(n_frames)]
        return none, 0
    flat_mask = np.zeros(scores.size, dtype=np.uint8)
    flat_mask[np.argpartition(scores, -budget)[-budget:]] = 1
    shaped = flat_mask.reshape(n_frames, hb, wb)
    return [shaped[i].astype(np.uint8) for i in range(n_frames)], budget
def build_dynamic_groups(
    grids: List[np.ndarray],
    min_group_frames: int = 8,
    max_group_frames: int = 64,
    preferred_group_frames: int = 32,
) -> List[Tuple[int, int]]:
    """Adaptive temporal grouping by cumulative saliency energy.
    Groups are energy-adaptive, but constrained to a practical codec-stream
    range: by default each group spans roughly 8-64 sampled frames, with a
    preference around 32 frames/group. Each group later becomes exactly one
    IPPP canvas whose first frame is kept whole as the I-frame.

    Returns a list of inclusive (start, end) index pairs covering 0..n-1.
    """
    n = len(grids)
    if n == 0:
        return []
    # Sanitize size constraints: 1 <= min_len <= max_len, preferred inside.
    min_len = max(1, int(min_group_frames))
    max_len = max(min_len, int(max_group_frames))
    preferred = min(max_len, max(min_len, int(preferred_group_frames)))
    if n <= max_len:
        return [(0, n - 1)]
    # Bound the group count so each group can stay within [min_len, max_len].
    min_groups = max(1, math.ceil(n / max_len))
    max_groups = max(1, n // min_len)
    target_groups = max(1, math.ceil(n / preferred))
    target_groups = min(max(target_groups, min_groups), max_groups)
    if target_groups <= 1:
        return [(0, n - 1)]
    # Per-frame saliency energy = sum of the frame's patch scores.
    energies = np.array([float(g.sum()) for g in grids], dtype=np.float64)
    total = energies.sum()
    if total <= 1e-8:
        # Degenerate: pure even split, still respecting the group-size range.
        size = max(min_len, min(max_len, math.ceil(n / target_groups)))
        groups: List[Tuple[int, int]] = []
        cursor = 0
        while cursor < n and len(groups) < target_groups:
            end = min(n - 1, cursor + size - 1)
            if len(groups) == target_groups - 1:
                # Final group absorbs any remainder frames.
                end = n - 1
            groups.append((cursor, end))
            cursor = end + 1
        return groups
    # Greedy sweep: close a group once it holds ~1/target_groups of the total
    # energy (or hits max_len), while always leaving enough frames for the
    # remaining groups to reach min_len each.
    target_per_group = total / target_groups
    groups = []
    start = 0
    cum = 0.0
    for i in range(n):
        cum += energies[i]
        group_len = i - start + 1
        groups_left = target_groups - len(groups) - 1
        frames_left_after = n - i - 1
        min_room_ok = frames_left_after >= groups_left * min_len
        threshold_hit = cum >= target_per_group and group_len >= min_len
        force_close = group_len >= max_len
        if len(groups) < target_groups - 1 and min_room_ok and (threshold_hit or force_close):
            groups.append((start, i))
            start = i + 1
            cum = 0.0
    # Whatever remains forms the final group (may exceed the energy target).
    if start <= n - 1:
        groups.append((start, n - 1))
    return groups
def grouped_topk_masks(
    grids: List[np.ndarray], total_k: int, gop: str,
) -> Tuple[List[np.ndarray], int, List[Tuple[int, int]], str]:
    """Select patches under a GOP grouping strategy.
    GOP modes:
      - "global": one big group across the whole video — top-K global.
      - "<int>" (e.g. "4"/"8"/"16"): fixed group size in frames; the
        budget is split equally across groups, top-K picked within each.
      - "dynamic": codec-stream-style adaptive groups (see
        build_dynamic_groups), defaulting to roughly 8-64 frames/group.
    Returns (per-frame masks, actual selected count, [(start,end),...] groups, resolved_label).
    """
    n = len(grids)
    if n == 0:
        return [], 0, [], gop
    mode = (gop or "global").strip().lower()
    if mode in ("global", "none", "0", ""):
        masks, actual = global_topk_masks(grids, int(total_k))
        return masks, actual, [(0, n - 1)], "global"
    if mode == "dynamic":
        groups = build_dynamic_groups(grids)
        resolved_label = "codec-stream"
    else:
        try:
            g_size = max(1, int(mode))
        except ValueError:
            # Unparseable mode string: one group spanning everything.
            g_size = n
        groups = []
        cursor = 0
        while cursor < n:
            end = min(n - 1, cursor + g_size - 1)
            groups.append((cursor, end))
            cursor = end + 1
        resolved_label = mode
    num_groups = max(1, len(groups))
    target_k = max(0, int(total_k))
    # Each group's patch capacity caps how much budget it can absorb.
    capacities = [
        sum(int(g.size) for g in grids[s:e + 1])
        for (s, e) in groups
    ]
    alloc = split_budget_evenly(target_k, num_groups)
    leftover = 0
    for i, cap in enumerate(capacities):
        if alloc[i] > cap:
            leftover += alloc[i] - cap
            alloc[i] = cap
    # Round-robin redistribution of clipped budget to groups with headroom.
    while leftover > 0:
        progressed = False
        for i, cap in enumerate(capacities):
            if alloc[i] < cap and leftover > 0:
                alloc[i] += 1
                leftover -= 1
                progressed = True
        if not progressed:
            break
    # Initialize empty masks, then fill per-group selections.
    out_masks = [np.zeros(g.shape, dtype=np.uint8) for g in grids]
    actual_total = 0
    for (s, e), group_k in zip(groups, alloc):
        sub = grids[s:e + 1]
        sub_masks, sub_actual = global_topk_masks(sub, group_k)
        for i, sm in enumerate(sub_masks):
            out_masks[s + i] = sm
        actual_total += sub_actual
    return out_masks, actual_total, groups, resolved_label
def faded_background(frame_bgr: np.ndarray, fade: float = 0.55) -> np.ndarray:
    """Gray-white wash for dropped patches: gray * (1-fade) + white * fade."""
    mono = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2GRAY)
    mono_bgr = cv2.cvtColor(mono, cv2.COLOR_GRAY2BGR).astype(np.float32)
    white = np.full_like(mono_bgr, 255.0)
    washed = mono_bgr * (1.0 - fade) + white * fade
    return washed.astype(np.uint8)
def overlay_selection(
    frame_bgr: np.ndarray, mask_grid: np.ndarray, patch: int,
    outline: bool = True, fade: float = 0.55,
) -> np.ndarray:
    """Composite kept patches in full color over a gray-white washed copy.

    Kept cells of `mask_grid` stay untouched; dropped cells show the faded
    background. With `outline`, each kept patch gets a two-tone border.
    """
    h, w = frame_bgr.shape[:2]
    hb, wb = mask_grid.shape
    # Expand the patch-grid mask to pixel resolution, trimming ragged edges.
    keep_pixels = np.kron(mask_grid, np.ones((patch, patch), dtype=np.uint8))[:h, :w]
    washed = faded_background(frame_bgr, fade=float(fade))
    composite = np.where(keep_pixels.astype(bool)[..., None], frame_bgr, washed)
    if not outline:
        return composite
    # A two-tone stroke survives browser H.264/yuv420p encoding better
    # than a single 1 px saturated line.
    outer_px = 2 if patch >= 20 else 1
    for row in range(hb):
        for col in range(wb):
            if not mask_grid[row, col]:
                continue
            y0, x0 = row * patch, col * patch
            y1, x1 = y0 + patch - 1, x0 + patch - 1
            cv2.rectangle(
                composite, (x0, y0), (x1, y1),
                PATCH_OUTLINE_OUTER_BGR, outer_px,
                lineType=cv2.LINE_AA,
            )
            if patch >= 6 and (x1 - x0) >= 3 and (y1 - y0) >= 3:
                cv2.rectangle(
                    composite, (x0 + 1, y0 + 1), (x1 - 1, y1 - 1),
                    PATCH_OUTLINE_INNER_BGR, 1,
                    lineType=cv2.LINE_AA,
                )
    return composite
def _normalize_scores(grids: List[np.ndarray], pct: float = 99.0) -> np.ndarray:
"""Stack into [N, hb, wb], shift by per-video min, divide by global pct.
Using the percentile (instead of max) suppresses outlier patches the same
way codec_tools does with bitcost_pct=99."""
arr = np.stack(grids, axis=0).astype(np.float32)
arr = arr - arr.min()
cap = np.percentile(arr, pct) if arr.size else 1.0
if cap <= 1e-8:
cap = float(arr.max() or 1.0)
arr = np.clip(arr / cap, 0.0, 1.0)
return arr
def overlay_heatmap(
    frame_bgr: np.ndarray, score_grid: np.ndarray, patch: int,
    alpha: float = 0.55,
) -> np.ndarray:
    """Blend a JET colormap of per-patch scores (in [0, 1]) over the frame.

    Blue marks low-scoring patches, red high-scoring ones; `alpha` is the
    heatmap's weight in the blend.
    """
    h, w = frame_bgr.shape[:2]
    levels = (np.clip(score_grid, 0.0, 1.0) * 255.0).astype(np.uint8)
    # Upsample one value per patch to pixel resolution, trim ragged edges.
    upsampled = np.kron(levels, np.ones((patch, patch), dtype=np.uint8))[:h, :w]
    colored = cv2.applyColorMap(upsampled, cv2.COLORMAP_JET)
    return cv2.addWeighted(frame_bgr, 1.0 - alpha, colored, alpha, 0.0)
def overlay_sbs(
    frame_bgr: np.ndarray, mask_grid: np.ndarray, score_grid: np.ndarray,
    patch: int, alpha: float = 0.55, fade: float = 0.55,
) -> np.ndarray:
    """Compose [selection | heatmap] side by side with a 4 px separator and
    small corner labels on each panel."""
    sel_panel = overlay_selection(frame_bgr, mask_grid, patch, outline=True, fade=fade)
    heat_panel = overlay_heatmap(frame_bgr, score_grid, patch, alpha=alpha)
    h, w = sel_panel.shape[:2]
    divider = np.full((h, 4, 3), 30, dtype=np.uint8)
    panel = np.concatenate([sel_panel, divider, heat_panel], axis=1)
    cv2.putText(panel, "selection", (8, 22), cv2.FONT_HERSHEY_SIMPLEX,
                0.6, (255, 255, 255), 2, cv2.LINE_AA)
    cv2.putText(panel, "heatmap", (w + 12, 22), cv2.FONT_HERSHEY_SIMPLEX,
                0.6, (255, 255, 255), 2, cv2.LINE_AA)
    return panel
def write_mp4(frames: List[np.ndarray], path: str, fps: float) -> None:
    """Write H.264 mp4 via imageio-ffmpeg's bundled ffmpeg (browser-friendly).

    yuv420p requires even dimensions, so the first frame's size is floored
    to even values ONCE, that size is declared to ffmpeg, and every frame is
    cropped to it. (The original declared the uncropped size but piped
    cropped bytes, so any odd-dimension input produced a corrupted raw
    stream.)

    Raises:
        ValueError: if `frames` is empty.
        RuntimeError: if ffmpeg exits non-zero.
    """
    if not frames:
        raise ValueError("no frames to write")
    h, w = frames[0].shape[:2]
    # Floor to even once, up front, so -s matches the bytes we pipe.
    h -= h % 2
    w -= w % 2
    ff = imageio_ffmpeg.get_ffmpeg_exe()
    cmd = [
        ff, "-y", "-loglevel", "error",
        "-f", "rawvideo", "-vcodec", "rawvideo",
        "-s", f"{w}x{h}", "-pix_fmt", "bgr24",
        "-r", f"{fps:.3f}", "-i", "-",
        "-an", "-vcodec", "libx264", "-pix_fmt", "yuv420p",
        "-preset", "veryfast", "-crf", "23",
        "-movflags", "+faststart",
        path,
    ]
    proc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stderr=subprocess.PIPE)
    try:
        for f in frames:
            # Crop every frame to the declared size so the raw byte count
            # always matches what ffmpeg expects.
            proc.stdin.write(np.ascontiguousarray(f[:h, :w]).tobytes())
        proc.stdin.close()
        err = proc.stderr.read().decode("utf-8", errors="ignore")
        rc = proc.wait()
        if rc != 0:
            raise RuntimeError(f"ffmpeg failed (rc={rc}): {err}")
    finally:
        if proc.poll() is None:
            proc.kill()
def _build_ippp_canvas(
frames: List[np.ndarray], masks: List[np.ndarray],
i_idx: int, p_range: range, patch: int,
) -> Tuple[np.ndarray, int]:
"""Build one GOP canvas with explicit I/P sections.
Layout:
1. The group's first frame is copied whole as the I-frame.
2. Each later P-frame gets its own packed section below, in time order.
So GOP=4 becomes I|P|P|P, GOP=5 becomes I|P|P|P|P, etc.
Returns (canvas, n_patches) where n_patches is the number of selected
P-frame patches packed under the I-frame."""
i_frame = frames[i_idx]
h, w = i_frame.shape[:2]
hb, wb = h // patch, w // patch
frame_h, frame_w = hb * patch, wb * patch
i_crop = i_frame[:frame_h, :frame_w].copy()
divider_h = 2
p_sections: List[np.ndarray] = []
n_patches = 0
for k in p_range:
if k >= len(frames):
break
f, m = frames[k], masks[k]
packed_patches: List[np.ndarray] = []
for i in range(m.shape[0]):
for j in range(m.shape[1]):
if m[i, j]:
packed_patches.append(
f[
i * patch:(i + 1) * patch,
j * patch:(j + 1) * patch,
].copy()
)
n_patches += len(packed_patches)
packed_rows = max(1, int(math.ceil(len(packed_patches) / max(1, wb))))
packed_h = packed_rows * patch
section_bg = np.full((packed_h, frame_w, 3), 246, dtype=np.uint8)
for idx, tile in enumerate(packed_patches):
row = idx // wb
col = idx % wb
y0 = row * patch
x0 = col * patch
section_bg[y0:y0 + patch, x0:x0 + patch] = tile
p_sections.append(section_bg)
total_h = frame_h + sum(divider_h + sec.shape[0] for sec in p_sections)
canvas = np.full((total_h, frame_w, 3), 250, dtype=np.uint8)
canvas[:frame_h, :frame_w] = i_crop
y = frame_h
for section in p_sections:
canvas[y:y + divider_h, :] = (99, 102, 241)
y += divider_h
sec_h = section.shape[0]
canvas[y:y + sec_h, :frame_w] = section
y += sec_h
return canvas, n_patches
def pack_canvases_per_group(
    frames: List[np.ndarray],
    masks: List[np.ndarray],
    groups: List[Tuple[int, int]],
    patch: int,
    target_canvases: int = 1,
) -> Tuple[List[np.ndarray], List[Tuple[int, int, int]], int]:
    """Pack exactly one IPPP canvas per GOP group.

    Each group's first frame is kept whole as the I-frame; every later
    frame contributes its own packed P section below it. `target_canvases`
    is kept only for API compatibility and is ignored.

    Returns:
        canvases — one np.ndarray per (valid) group; a single blank
            patch-sized placeholder if nothing could be packed.
        sub_ranges — (group_idx, sub_start, sub_end) triples parallel to
            canvases, for captions / debugging.
        total_selected — I-frame patches (full grid each) plus selected
            P-frame patches across all canvases.
    """
    if not groups or not frames:
        placeholder = np.full((patch, patch, 3), 255, dtype=np.uint8)
        return [placeholder], [(0, 0, 0)], 0
    canvases: List[np.ndarray] = []
    sub_ranges: List[Tuple[int, int, int]] = []
    total_selected = 0
    for group_idx, (start, end) in enumerate(groups):
        if start >= len(frames):
            # Group starts past the decoded frames — nothing to pack.
            continue
        canvas, p_patches = _build_ippp_canvas(
            frames, masks, i_idx=start, p_range=range(start + 1, end + 1),
            patch=patch,
        )
        canvases.append(canvas)
        sub_ranges.append((group_idx, start, end))
        i_frame = frames[start]
        grid_cells = (i_frame.shape[0] // patch) * (i_frame.shape[1] // patch)
        total_selected += grid_cells + p_patches
    if not canvases:
        canvases = [np.full((patch, patch, 3), 255, dtype=np.uint8)]
        sub_ranges = [(0, 0, 0)]
    return canvases, sub_ranges, total_selected
def make_charts(
    grids: List[np.ndarray],
    masks: List[np.ndarray],
    codec_frame_ids: List[int],
    uniform_frame_ids: List[int],
    uniform_requested_frames: int,
    uniform_full_frame_patches: int,
    fps: float,
    total_duration_sec: float,
    total_patches_budget: int,
    saliency_signal: str,
    groups: List[Tuple[int, int]] = None,
    gop_label: str = "global",
):
    """One overlaid step chart: cumulative patches selected vs time, for
    the codec saliency curve and a uniform full-frame sampling baseline.
    X = time (s)
    Y = cumulative count of selected patches
    The codec curve rises in bursts where saliency is high; the uniform
    baseline rises in equal steps because every sampled full frame
    contributes `patch_size^2` patches.

    Returns the matplotlib Figure (caller owns it; Agg backend, no display).
    """
    # Use a larger physical canvas plus high DPI so Gradio/HF does not
    # upscale a small rasterized plot into a blurry chart.
    fig, ax = plt.subplots(
        figsize=(11.4, 4.8), dpi=220, constrained_layout=True,
    )
    # Containers sometimes report fps=0; keep the time axis finite.
    fps_safe = float(fps) if fps and fps > 0 else 25.0
    if grids:
        hb, wb = grids[0].shape
    else:
        hb = wb = 1
    grid_size = hb * wb  # NOTE(review): not referenced below — kept as-is.
    all_frame_ids = list(codec_frame_ids) + list(uniform_frame_ids)
    duration = float(total_duration_sec) if total_duration_sec and total_duration_sec > 0 else (
        (max(all_frame_ids) / fps_safe) if all_frame_ids else 1.0
    )
    # ─── Build step curves ──────────────────────────────────────────────
    def _step(xs, cum):
        """Return (xx, yy) for a left-continuous step plot through (xs, cum)."""
        if not xs:
            return [0.0, duration], [0.0, 0.0]
        xx, yy = [0.0], [0.0]
        prev = 0.0
        for x, c in zip(xs, cum):
            xx.extend([x, x]); yy.extend([prev, c])
            prev = c
        xx.append(duration); yy.append(prev)
        return xx, yy
    times = [fid / fps_safe for fid in codec_frame_ids]
    counts = [int(m.sum()) for m in masks]
    codec_cum = list(np.cumsum(counts)) if counts else []
    codec_total = int(codec_cum[-1]) if codec_cum else 0
    xx_c, yy_c = _step(times, codec_cum)
    # Uniform baseline: evenly sample COMPLETE frames from the same time
    # window, no codec saliency involved. Each sampled frame contributes a
    # fixed `patch_size^2` patch cost, independent of visualization resize.
    budget_int = int(total_patches_budget)
    requested_uniform = max(0, int(uniform_requested_frames))
    full_frame_patches = max(1, int(uniform_full_frame_patches))
    n_uniform = len(uniform_frame_ids)
    uni_per_step = [full_frame_patches for _ in uniform_frame_ids]
    uni_cum = list(np.cumsum(uni_per_step)) if uni_per_step else []
    uni_total = int(uni_cum[-1]) if uni_cum else 0
    uni_times = [fid / fps_safe for fid in uniform_frame_ids]
    xx_u, yy_u = _step(uni_times, uni_cum)
    # ─── Plot ───────────────────────────────────────────────────────────
    # Per-frame breakdown for the legend.
    if counts:
        c_min, c_max = int(min(counts)), int(max(counts))
        c_avg = codec_total / max(1, len(counts))
        codec_lbl = (
            f"Codec Β· {saliency_signal} ({codec_total:,} total Β· "
            f"per-frame min {c_min} Β· avg {c_avg:.1f} Β· max {c_max})"
        )
    else:
        codec_lbl = f"Codec Β· {saliency_signal} ({codec_total:,} patches)"
    if uni_per_step:
        unused = max(0, budget_int - uni_total)
        frame_part = (
            f"{n_uniform}/{requested_uniform} frames fit budget"
            if requested_uniform != n_uniform else f"{n_uniform} frames"
        )
        uni_lbl = (
            f"Uniform full frames ({frame_part} Β· {full_frame_patches}/frame Β· "
            f"{uni_total:,} total"
            + (f" Β· {unused:,} budget unused" if unused else "")
            + ")"
        )
    else:
        uni_lbl = (
            f"Uniform full frames (0/{requested_uniform} frames fit budget "
            f"{budget_int:,}; need {full_frame_patches} patches/frame)"
        )
    ax.fill_between(xx_c, yy_c, step=None, alpha=0.12, color="#4f46e5")
    ax.plot(xx_c, yy_c, color="#4f46e5", linewidth=2.8, label=codec_lbl)
    ax.fill_between(xx_u, yy_u, step=None, alpha=0.10, color="#06b6d4")
    ax.plot(
        xx_u, yy_u, color="#06b6d4", linewidth=2.8, linestyle="--",
        label=uni_lbl,
    )
    # Budget reference line
    budget = int(total_patches_budget)
    ax.axhline(budget, color="#94a3b8", linestyle=":", linewidth=1.1, alpha=0.85)
    ax.text(
        duration * 0.995, budget * 1.015,
        f"budget {budget:,}", color="#475569",
        fontsize=10.0, va="bottom", ha="right",
    )
    # Group boundaries: a vertical dashed rule between consecutive groups.
    if groups and len(groups) > 1 and times:
        for (_, end_idx) in groups[:-1]:
            if end_idx + 1 < len(times):
                # Midpoint between the last frame of one group and the first
                # frame of the next.
                bx = (times[end_idx] + times[end_idx + 1]) / 2.0
            else:
                bx = times[end_idx]
            ax.axvline(
                bx, color="#cbd5e1", linestyle=(0, (3, 3)),
                alpha=0.55, linewidth=1.0,
            )
    n_groups = len(groups) if groups else 1
    gop_str = gop_label if gop_label in ("global", "codec-stream") else f"GOP={gop_label}"
    ax.set_title(
        f"Cumulative patches selected over time Β· {saliency_signal} Β· "
        f"{gop_str} ({n_groups} groups)",
        fontsize=13, color="#1e293b",
    )
    ax.set_xlabel("time (s)", fontsize=11)
    ax.set_ylabel("# patches selected (cumulative)", fontsize=11)
    ax.set_xlim(-duration * 0.02, duration * 1.02)
    ymax = max(budget, codec_total, uni_total) * 1.08 + 1
    ax.set_ylim(0, ymax)
    ax.tick_params(axis="both", labelsize=10)
    ax.grid(True, alpha=0.25, linestyle="--", axis="y")
    ax.spines[["top", "right"]].set_visible(False)
    ax.legend(loc="upper left", fontsize=10, frameon=False, handlelength=2.8)
    fig.patch.set_facecolor("white")
    return fig
def process(
    video_path,
    sample_frames: int,
    patch_size: int,
    total_patches: int,
    max_pixels: int,
    viz_mode: str = "selection",
    heatmap_alpha: float = 0.55,
    start_sec: float = 0.0,
    end_sec: float = 0.0,
    saliency_signal: str = "gradient",
    score_log_scale: bool = False,
    bitcost_pct: float = 99.0,
    fade_strength: float = 0.55,
    gop: str = "global",
    target_canvases: int = 1,
    progress=gr.Progress(track_tqdm=False),
):
    """Run the whole codec-view pipeline end to end for the Gradio UI.

    Steps: read metadata -> sample frame ids (optionally inside the
    [start_sec, end_sec] window) -> decode -> smart_resize -> per-patch
    scoring -> GOP-grouped top-K selection -> render visualization video ->
    pack one IPPP canvas per group -> build the info JSON and the
    cumulative-selection chart.

    Returns (vis_video_path, [(canvas_path, caption), ...], info_json_str,
    chart_figure); on failure returns (None, [], message_or_error_json, None).
    """
    if not video_path:
        return None, [], "Please upload a video.", None
    t0 = time.time()
    progress(0.05, desc="Reading metadata")
    meta = video_metadata(video_path)
    total = meta.get("total_frames") or 0
    if total <= 0:
        return None, [], json.dumps(
            {"error": "Could not read frame count.", "metadata": meta},
            indent=2, ensure_ascii=False,
        ), None
    progress(0.10, desc="Sampling frames")
    fps = float(meta.get("fps") or 0.0)
    s_sec = max(0.0, float(start_sec or 0.0))
    e_sec = float(end_sec or 0.0)
    if fps > 0 and (s_sec > 0 or e_sec > 0):
        # Clip the sampling window to the requested time range.
        f_start = max(0, int(round(s_sec * fps)))
        f_end = (
            min(total - 1, int(round(e_sec * fps)) - 1)
            if e_sec > 0 else total - 1
        )
        if f_end <= f_start:
            # Degenerate window: fall back to "from start to end of video".
            f_end = total - 1
        window_total = f_end - f_start + 1
        if int(sample_frames) >= window_total:
            fids = list(range(f_start, f_end + 1))
        else:
            fids = [
                int(round(x))
                for x in np.linspace(f_start, f_end, int(sample_frames))
            ]
    else:
        f_start, f_end = 0, total - 1
        fids = sample_frame_ids(total, int(sample_frames))
    raw = decode_frames(video_path, fids)
    if not raw:
        return None, [], json.dumps(
            {"error": "Failed to decode frames.", "metadata": meta},
            indent=2, ensure_ascii=False,
        ), None
    progress(0.25, desc="smart_resize")
    resized = [smart_resize(f, int(max_pixels), int(patch_size)) for f in raw]
    th, tw = resized[0].shape[:2]
    # Force every frame to the first frame's size so grids/masks align.
    resized = [
        cv2.resize(f, (tw, th), interpolation=cv2.INTER_AREA)
        if f.shape[:2] != (th, tw) else f
        for f in resized
    ]
    progress(0.40, desc=f"Scoring patches ({saliency_signal})")
    grids = compute_score_grids(resized, int(patch_size), saliency_signal)
    if score_log_scale:
        # Compress the score dynamic range before selection.
        grids = [np.log1p(np.clip(g, 0.0, None)) for g in grids]
    masks, actual_selected, groups, gop_resolved = grouped_topk_masks(
        grids, int(total_patches), str(gop or "global"),
    )
    norm_scores = _normalize_scores(grids, pct=float(bitcost_pct))
    mode = (viz_mode or "selection").lower()
    if mode not in ("selection", "heatmap", "sbs"):
        mode = "selection"
    progress(0.60, desc=f"Rendering {mode} video")
    if mode == "heatmap":
        vis = [
            overlay_heatmap(f, s, int(patch_size), alpha=float(heatmap_alpha))
            for f, s in zip(resized, norm_scores)
        ]
    elif mode == "sbs":
        vis = [
            overlay_sbs(
                f, m, s, int(patch_size),
                alpha=float(heatmap_alpha), fade=float(fade_strength),
            )
            for f, m, s in zip(resized, masks, norm_scores)
        ]
    else:
        vis = [
            overlay_selection(f, m, int(patch_size), fade=float(fade_strength))
            for f, m in zip(resized, masks)
        ]
    out_dir = tempfile.mkdtemp(prefix="codec_view_")
    vis_path = os.path.join(out_dir, f"{mode}_vis.mp4")
    # Slow the playback (source fps / 4, clamped to [2, 8]) so patch
    # selections are readable frame by frame.
    vis_fps = max(2.0, min(8.0, (meta.get("fps") or 25.0) / 4.0))
    write_mp4(vis, vis_path, vis_fps)
    progress(0.85, desc="Packing canvases (IPPP)")
    canvases, sub_ranges, n_selected = pack_canvases_per_group(
        resized, masks, groups, int(patch_size),
        target_canvases=1,
    )
    canvas_items: List[Tuple[str, str]] = []
    for idx, canv in enumerate(canvases):
        cp = os.path.join(out_dir, f"canvas_{idx:03d}.png")
        cv2.imwrite(cp, canv)
        g_idx, ss, ee = sub_ranges[idx] if idx < len(sub_ranges) else (0, idx, idx)
        src_start = int(fids[ss]) if ss < len(fids) else None
        src_end = int(fids[ee]) if ee < len(fids) else None
        p_frame_count = max(0, ee - ss)
        structure_label = " ".join(["I"] + ["P"] * p_frame_count)
        p_patch_count = int(sum(int(m.sum()) for m in masks[ss + 1:ee + 1]))
        caption = (
            f"Canvas {idx + 1}/{len(canvases)} Β· group {g_idx + 1} Β· "
            f"{structure_label} Β· sampled #{ss}-{ee} Β· src {src_start}-{src_end} Β· "
            f"I src#{src_start} + {p_patch_count} P patches from "
            f"{p_frame_count} frame{'s' if p_frame_count != 1 else ''}"
        )
        canvas_items.append((cp, caption))
    hb, wb = grids[0].shape
    grid_size = int(grids[0].shape[0] * grids[0].shape[1]) if grids else 0
    uniform_full_frame_patches = int(patch_size) * int(patch_size)
    # Uniform full-frame sampling baseline: evenly sample complete frames
    # from the same time window, independent of codec saliency.
    requested_budget = int(total_patches)
    uniform_requested_frames = len(fids)
    uniform_frame_count = min(
        uniform_requested_frames,
        requested_budget // max(1, uniform_full_frame_patches),
    )
    uniform_frame_ids = sample_window_frame_ids(f_start, f_end, uniform_frame_count)
    uniform_total = int(len(uniform_frame_ids) * uniform_full_frame_patches)
    # Everything below builds the diagnostic JSON shown in the UI.
    info = {
        "input": meta,
        "params": {
            "sample_frames": int(sample_frames),
            "patch_size": int(patch_size),
            "total_patches_budget": int(total_patches),
            "max_pixels": int(max_pixels),
            "start_sec": float(s_sec),
            "end_sec": float(e_sec) if e_sec > 0 else None,
            "saliency_signal": saliency_signal,
            "score_log_scale": bool(score_log_scale),
            "bitcost_pct": float(bitcost_pct),
            "fade_strength": float(fade_strength),
            "gop": gop_resolved,
            "canvas_policy": "one_canvas_per_group_with_per_frame_p_sections",
            "i_frame_policy": "first_frame_full_in_each_group",
        },
        "gop_groups": [
            {
                "start_frame_idx": int(s),
                "end_frame_idx": int(e),
                "start_sample_idx": int(s),
                "end_sample_idx": int(e),
                "start_source_frame_id": int(fids[s]) if s < len(fids) else None,
                "end_source_frame_id": int(fids[e]) if e < len(fids) else None,
                "source_frame_ids": [int(fids[i]) for i in range(s, e + 1)],
                "n_frames": int(e - s + 1),
                "structure_label": " ".join(["I"] + ["P"] * max(0, e - s)),
                "i_frame_source_id": int(fids[s]) if s < len(fids) else None,
                "p_source_frame_ids": [int(fids[i]) for i in range(s + 1, e + 1)],
                "p_frame_count": int(max(0, e - s)),
                "p_frame_patch_counts": [int(masks[i].sum()) for i in range(s + 1, e + 1)],
                "p_frame_selected_patches": int(sum(int(m.sum()) for m in masks[s + 1:e + 1])),
                "selected": int(sum(int(m.sum()) for m in masks[s:e + 1])),
            }
            for (s, e) in groups
        ],
        "frame_window": {
            "first_decoded": int(f_start),
            "last_decoded": int(f_end),
            "codec_frame_ids": [int(x) for x in fids],
            "uniform_full_frame_ids": [int(x) for x in uniform_frame_ids],
        },
        "codec_per_frame_patches": [int(m.sum()) for m in masks],
        "uniform_baseline": {
            "mode": "uniform_full_frame_sampling",
            "requested_frames": int(uniform_requested_frames),
            "frames": int(len(uniform_frame_ids)),
            "patches_per_frame": int(uniform_full_frame_patches),
            "frame_ids": [int(x) for x in uniform_frame_ids],
            "requested_budget": requested_budget,
            "unused_budget": int(max(0, requested_budget - uniform_total)),
            "total_patches": uniform_total,
            "explanation": (
                "Uniformly sample complete frames from the same time window. "
                f"The baseline targets the same sampled-frame count as codec "
                f"({uniform_requested_frames}), but each full frame costs "
                f"{uniform_full_frame_patches} patches (= patch_size^2), so "
                f"only {len(uniform_frame_ids)} full "
                "frames may fit inside the requested budget."
            ),
        },
        "resized_frame_size": f"{tw}x{th}",
        "patch_grid_per_frame": f"{hb}x{wb} = {hb * wb} patches",
        "uniform_full_frame_patch_cost": int(uniform_full_frame_patches),
        "actual_selected_total": int(actual_selected),
        "total_selected_patches_incl_i_frames": int(n_selected),
        "canvases": [
            {
                "index": i,
                "size": f"{canvases[i].shape[1]}x{canvases[i].shape[0]}",
                "group": int(sub_ranges[i][0]) if i < len(sub_ranges) else None,
                "sub_range": list(sub_ranges[i][1:3]) if i < len(sub_ranges) else None,
                "sampled_indices": (
                    [int(x) for x in range(sub_ranges[i][1], sub_ranges[i][2] + 1)]
                    if i < len(sub_ranges) else []
                ),
                "source_frame_ids": (
                    [int(fids[x]) for x in range(sub_ranges[i][1], sub_ranges[i][2] + 1)]
                    if i < len(sub_ranges) else []
                ),
                "structure_label": (
                    " ".join(["I"] + ["P"] * max(0, sub_ranges[i][2] - sub_ranges[i][1]))
                    if i < len(sub_ranges) else "I"
                ),
                "i_frame_source_id": (
                    int(fids[sub_ranges[i][1]]) if i < len(sub_ranges) else None
                ),
                "p_source_frame_ids": (
                    [int(fids[x]) for x in range(sub_ranges[i][1] + 1, sub_ranges[i][2] + 1)]
                    if i < len(sub_ranges) else []
                ),
                "p_frame_count": (
                    int(max(0, sub_ranges[i][2] - sub_ranges[i][1]))
                    if i < len(sub_ranges) else 0
                ),
                "p_frame_patch_counts": (
                    [int(masks[x].sum()) for x in range(sub_ranges[i][1] + 1, sub_ranges[i][2] + 1)]
                    if i < len(sub_ranges) else []
                ),
                "p_frame_selected_patches": (
                    int(sum(int(m.sum()) for m in masks[sub_ranges[i][1] + 1:sub_ranges[i][2] + 1]))
                    if i < len(sub_ranges) else 0
                ),
                "structure": "Full I-frame on top; one packed P section per "
                             "later frame, in time order.",
            }
            for i in range(len(canvases))
        ],
        "n_canvases": int(len(canvases)),
        "vis_video_fps": round(vis_fps, 2),
        "viz_mode": mode,
        "heatmap_alpha": float(heatmap_alpha) if mode != "selection" else None,
        "score_normalization": f"shift-min, /p{bitcost_pct:.1f}, clip"
        + (" (log1p applied)" if score_log_scale else ""),
        "elapsed_sec": round(time.time() - t0, 2),
    }
    progress(0.95, desc="Building charts")
    duration_sec = (total / fps) if fps > 0 else 0.0
    chart_fig = make_charts(
        grids, masks, fids, uniform_frame_ids, uniform_requested_frames,
        uniform_full_frame_patches,
        fps, duration_sec,
        int(total_patches), saliency_signal,
        groups=groups, gop_label=gop_resolved,
    )
    progress(1.0, desc="Done")
    return (
        vis_path, canvas_items,
        json.dumps(info, indent=2, ensure_ascii=False),
        chart_fig,
    )
# Hand-rolled stylesheet injected into the Gradio app (via Blocks(css=...) on
# Gradio < 6 or launch(css=...) on Gradio >= 6 — see the version shim below).
# Sections, in order: brand CSS variables, layout container, keyframe
# animations, hero banner, cards, run/preset buttons, footer, component
# placeholder de-theming, stats tile grid, and mobile breakpoints.
# All selectors are scoped under app-specific classes/ids (ovc-*) so they do
# not leak into Gradio's own chrome.
CUSTOM_CSS = """
:root, .gradio-container, .gradio-container.dark {
--ovc-grad: linear-gradient(135deg, #4f46e5 0%, #2563eb 50%, #06b6d4 100%);
--ovc-grad-soft: linear-gradient(135deg, rgba(79,70,229,0.10), rgba(6,182,212,0.10));
--ovc-ring: rgba(79,70,229,0.18);
--ovc-ring-strong: rgba(79,70,229,0.30);
--ovc-line-soft: rgba(148,163,184,0.18);
--ovc-line: rgba(148,163,184,0.26);
--ovc-line-strong: rgba(100,116,139,0.38);
--ovc-line-accent-soft: rgba(79,70,229,0.18);
--ovc-line-accent: rgba(79,70,229,0.28);
--ovc-line-accent-strong: rgba(79,70,229,0.40);
--ovc-line-cyan: rgba(6,182,212,0.18);
--ovc-surface-tint: rgba(248,250,252,0.74);
--ovc-surface-accent: rgba(79,70,229,0.035);
--ovc-shadow-soft: 0 1px 2px rgba(15,23,42,0.04), 0 10px 30px rgba(15,23,42,0.03);
--ovc-shadow-accent: 0 8px 28px rgba(79,70,229,0.08);
}
.gradio-container {
max-width: 1480px !important;
margin: 0 auto !important;
padding-left: 10px !important;
padding-right: 10px !important;
}
.ovc-main {
gap: 18px !important;
align-items: flex-start !important;
}
.ovc-bottom {
gap: 16px !important;
align-items: stretch !important;
}
@keyframes ovc-shift {
0% { background-position: 0% 50%; }
50% { background-position: 100% 50%; }
100% { background-position: 0% 50%; }
}
@keyframes ovc-pulse {
0%, 100% { box-shadow: 0 6px 18px rgba(37, 99, 235, 0.32); }
50% { box-shadow: 0 8px 26px rgba(37, 99, 235, 0.50); }
}
@keyframes ovc-fade-in {
from { opacity: 0; transform: translateY(4px); }
to { opacity: 1; transform: translateY(0); }
}
/* Hero */
#ovc-hero {
text-align: center;
padding: 44px 16px 22px;
border-radius: 22px;
background:
radial-gradient(120% 80% at 50% -10%, rgba(79,70,229,0.20), transparent 60%),
linear-gradient(180deg, rgba(79,70,229,0.06), rgba(6,182,212,0.03)),
repeating-linear-gradient(0deg, rgba(99,102,241,0.05) 0 1px, transparent 1px 28px),
repeating-linear-gradient(90deg, rgba(99,102,241,0.05) 0 1px, transparent 1px 28px);
border: 1px solid var(--ovc-line-accent-soft);
box-shadow: 0 10px 30px rgba(15,23,42,0.04);
margin-bottom: 18px;
position: relative;
overflow: hidden;
}
#ovc-hero::after {
content: "";
position: absolute; inset: auto -20% -40% -20%;
height: 60%;
background: radial-gradient(60% 80% at 50% 0%, rgba(6,182,212,0.22), transparent 70%);
pointer-events: none;
}
#ovc-hero h1 {
font-size: 2.7rem;
font-weight: 800;
background: var(--ovc-grad);
background-size: 200% 200%;
animation: ovc-shift 9s ease-in-out infinite;
-webkit-background-clip: text;
background-clip: text;
color: transparent;
margin: 0 0 6px;
letter-spacing: -0.028em;
line-height: 1.04;
}
#ovc-hero p.tagline {
font-size: 1.05rem;
color: var(--body-text-color-subdued);
margin: 0 auto 16px;
max-width: 760px;
line-height: 1.6;
}
.ovc-links {
display: flex; flex-wrap: wrap; gap: 10px;
justify-content: center; margin: 14px auto 6px;
position: relative; z-index: 1;
}
.ovc-links a {
text-decoration: none;
font-weight: 600;
font-size: 0.9rem;
padding: 7px 14px;
border-radius: 999px;
background: var(--background-fill-primary, #fff);
border: 1px solid var(--ovc-line-accent-soft);
color: #4338ca;
transition: transform 0.12s ease, box-shadow 0.18s ease,
background 0.18s ease, color 0.18s ease, border-color 0.18s ease;
display: inline-flex; align-items: center;
box-shadow: 0 1px 2px rgba(15,23,42,0.04);
}
.ovc-links a:hover {
background: var(--ovc-grad);
color: #fff;
border-color: transparent;
transform: translateY(-1px);
box-shadow: 0 8px 20px rgba(79,70,229,0.18);
}
.gradio-container.dark .ovc-links a {
background: rgba(30,41,59,0.7);
color: #c7d2fe;
border-color: var(--ovc-line-accent-strong);
}
/* Cards */
.ovc-card {
border-radius: 16px !important;
padding: 16px 18px !important;
border: 1px solid var(--ovc-line-soft) !important;
background: var(--background-fill-primary) !important;
box-shadow: var(--ovc-shadow-soft);
transition: box-shadow 0.18s ease, border-color 0.18s ease, transform 0.18s ease;
animation: ovc-fade-in 0.32s ease-out;
}
.ovc-card:hover {
border-color: var(--ovc-line) !important;
box-shadow: 0 8px 24px rgba(15,23,42,0.06);
}
.ovc-card + .ovc-card {
margin-top: 2px;
}
/* Primary outputs: subtle accent ring + lift */
.ovc-card-primary {
border: 1px solid var(--ovc-line-accent-soft) !important;
background:
linear-gradient(180deg, rgba(79,70,229,0.028), rgba(6,182,212,0.014)),
var(--background-fill-primary) !important;
box-shadow:
inset 0 0 0 1px rgba(255,255,255,0.65),
var(--ovc-shadow-accent) !important;
}
.ovc-card-primary:hover {
border-color: var(--ovc-line-accent) !important;
box-shadow:
inset 0 0 0 1px rgba(255,255,255,0.72),
0 10px 32px rgba(79,70,229,0.10) !important;
}
.ovc-card h3 {
display: inline-flex;
align-items: center;
gap: 8px;
font-size: 0.74rem !important;
font-weight: 700 !important;
text-transform: uppercase;
letter-spacing: 0.10em;
color: #3730a3 !important;
background: rgba(79,70,229,0.06);
border: 1px solid rgba(79,70,229,0.10);
padding: 4px 10px !important;
border-radius: 999px;
margin: 0 0 12px !important;
}
.ovc-card h3::before {
content: "";
display: inline-block;
width: 6px; height: 6px; border-radius: 50%;
background: var(--ovc-grad);
transform: translateY(0);
}
/* Run button */
#ovc-run button {
width: 100%;
height: 54px !important;
font-size: 1.06rem !important;
font-weight: 700 !important;
letter-spacing: 0.01em;
background: var(--ovc-grad) !important;
background-size: 200% 200% !important;
animation: ovc-shift 6s ease-in-out infinite, ovc-pulse 2.6s ease-in-out infinite;
border: none !important;
color: #fff !important;
border-radius: 14px !important;
transition: transform 0.06s ease;
}
#ovc-run button:hover {
transform: translateY(-1px);
animation-play-state: paused;
}
#ovc-run button:active { transform: translateY(0); }
/* Preset buttons */
.ovc-preset button {
background: var(--ovc-grad-soft) !important;
color: #4338ca !important;
border: 1px solid rgba(79,70,229,0.16) !important;
border-radius: 10px !important;
font-weight: 600 !important;
transition: all 0.15s ease;
}
.ovc-preset button:hover {
background: var(--ovc-grad) !important;
color: #fff !important;
border-color: transparent !important;
}
/* Footer */
#ovc-footer {
text-align: center;
color: var(--body-text-color-subdued);
font-size: 0.80rem;
padding: 22px 8px 10px;
margin-top: 14px;
border-top: 1px solid rgba(100,116,139,0.22);
}
#ovc-footer code {
background: rgba(79,70,229,0.08);
padding: 1px 6px;
border-radius: 4px;
}
/* Tighter spacing for sliders inside cards */
.ovc-card .gradio-slider { margin-bottom: 4px !important; }
.ovc-card .gradio-number,
.ovc-card .gradio-radio,
.ovc-card .gradio-checkbox,
.ovc-card .gradio-code,
.ovc-card .gradio-gallery,
.ovc-card .gradio-video,
.ovc-card .gradio-plot {
width: 100% !important;
}
/* Tame Gradio's dark default placeholders inside our cards: blanket-override
any background on the inner wrappers, then paint a brand-tinted gradient on
the canonical containers. This lights up the empty Video/Image/Plot zones
so they no longer look like black holes. */
.ovc-card .video-container,
.ovc-card .image-container,
.ovc-card .image-frame,
.ovc-card .preview,
.ovc-card .plot-container,
.ovc-card .empty,
.ovc-card video,
.ovc-card [data-testid="video"],
.ovc-card [data-testid="image"],
.ovc-card .icon-button,
.ovc-card .options,
.ovc-card .source-selection,
.ovc-card .upload-container {
background: transparent !important;
background-color: transparent !important;
}
.ovc-card .container,
.ovc-card .wrap,
.ovc-card .block,
.ovc-card fieldset,
.ovc-card [data-testid="block"],
.ovc-card [data-testid="video"],
.ovc-card [data-testid="image"],
.ovc-card [data-testid="plot"],
.ovc-card [data-testid="file-upload"],
.ovc-card [data-testid="upload"],
.ovc-card .video-container,
.ovc-card .image-container,
.ovc-card .plot-container {
border-radius: 12px !important;
}
.ovc-card .block,
.ovc-card fieldset,
.ovc-card [data-testid="block"],
.ovc-card [data-testid="video"],
.ovc-card [data-testid="image"],
.ovc-card [data-testid="plot"],
.ovc-card [data-testid="file-upload"],
.ovc-card [data-testid="upload"],
.ovc-card .wrap,
.ovc-card .container {
border: 1px solid transparent !important;
box-shadow: none !important;
outline: none !important;
background: transparent !important;
}
.ovc-card .video-container,
.ovc-card .image-container,
.ovc-card .plot-container,
.ovc-card-primary .video-container,
.ovc-card-primary .image-container,
.ovc-card-primary .plot-container {
background:
linear-gradient(180deg, rgba(79,70,229,0.028), rgba(6,182,212,0.018)),
var(--ovc-surface-tint) !important;
border: 1px solid rgba(79,70,229,0.18) !important;
box-shadow: inset 0 1px 0 rgba(255,255,255,0.68) !important;
}
.ovc-card .upload-container,
.ovc-card .video-container,
.ovc-card .image-container,
.ovc-card .plot-container {
width: 100% !important;
min-width: 0 !important;
}
.ovc-card .upload-container,
.ovc-card [data-testid="file-upload"] {
min-height: 220px !important;
background:
linear-gradient(180deg, rgba(79,70,229,0.03), rgba(6,182,212,0.02)),
rgba(248,250,252,0.72) !important;
border: 1px solid rgba(79,70,229,0.16) !important;
}
.ovc-card .plot-container,
.ovc-card [data-testid="plot"] {
min-height: 300px !important;
}
.ovc-card-primary .video-container,
.ovc-card [data-testid="video"] {
min-height: 260px !important;
}
.ovc-card .gradio-video, .ovc-card .gradio-image, .ovc-card .gradio-plot {
border-color: transparent !important;
background: transparent !important;
box-shadow: none !important;
outline: none !important;
}
.ovc-card video,
.ovc-card img,
.ovc-card canvas,
.ovc-card svg {
border: none !important;
outline: none !important;
box-shadow: none !important;
}
/* Empty placeholder text inside Gradio components */
.ovc-card .empty, .ovc-card .empty p, .ovc-card .empty span {
color: #94a3b8 !important;
}
/* Stats tile grid (rendered into a gr.HTML by render_stats_html) */
.ovc-stats {
display: grid;
grid-template-columns: repeat(auto-fit, minmax(150px, 1fr));
gap: 10px;
}
.ovc-stat {
padding: 12px 14px;
border-radius: 14px;
background: linear-gradient(135deg, rgba(79,70,229,0.055), rgba(6,182,212,0.03));
border: 1px solid rgba(79,70,229,0.12);
transition: transform 0.18s ease, box-shadow 0.18s ease;
}
.ovc-stat:hover {
transform: translateY(-1px);
box-shadow: 0 6px 18px rgba(79,70,229,0.10);
}
.ovc-stat .value {
font-size: 1.55rem; font-weight: 800;
background: var(--ovc-grad);
-webkit-background-clip: text; background-clip: text; color: transparent;
letter-spacing: -0.02em;
line-height: 1.1;
word-break: break-word;
}
.ovc-stat .label {
font-size: 0.74rem; color: #64748b;
text-transform: uppercase; letter-spacing: 0.06em;
margin-top: 4px;
font-weight: 600;
}
/* ─── Mobile / narrow viewport adjustments ─────────────────────────── */
@media (max-width: 768px) {
.gradio-container { padding: 6px !important; }
/* Force the controls/outputs row to stack vertically on phones */
.gradio-container .ovc-main {
flex-direction: column !important;
gap: 12px !important;
}
.gradio-container .ovc-bottom {
flex-direction: column !important;
gap: 12px !important;
}
.gradio-container .ovc-main > div {
width: 100% !important;
min-width: 0 !important;
max-width: 100% !important;
flex: 1 1 100% !important;
}
/* Hero scales down */
#ovc-hero { padding: 28px 14px 16px; border-radius: 16px; margin-bottom: 12px; }
#ovc-hero h1 { font-size: 2.05rem; letter-spacing: -0.02em; }
#ovc-hero p.tagline { font-size: 0.96rem; line-height: 1.5; margin-bottom: 12px; }
.ovc-links { gap: 6px; margin-top: 10px; }
.ovc-links a { font-size: 0.78rem; padding: 5px 10px; }
/* Cards tighter */
.ovc-card { padding: 12px 14px !important; border-radius: 14px !important; }
.ovc-card h3 { font-size: 0.70rem !important; margin-bottom: 8px !important; }
/* Run button */
#ovc-run button { height: 48px !important; font-size: 0.98rem !important; }
/* Stats tile sizing */
.ovc-stats { grid-template-columns: repeat(auto-fit, minmax(115px, 1fr)); gap: 8px; }
.ovc-stat { padding: 10px 12px; }
.ovc-stat .value { font-size: 1.25rem; }
.ovc-stat .label { font-size: 0.68rem; }
/* Outputs: shorter video so it does not dominate the screen */
.ovc-card video { max-height: 280px !important; }
.ovc-card .upload-container,
.ovc-card [data-testid="file-upload"] {
min-height: 180px !important;
}
}
@media (max-width: 480px) {
#ovc-hero { padding: 22px 12px 14px; }
#ovc-hero h1 { font-size: 1.7rem; }
#ovc-hero p.tagline { font-size: 0.9rem; }
/* Put each link on a row of two (browsers will pack 2 per row at this size) */
.ovc-links a { font-size: 0.74rem; padding: 4px 9px; }
#ovc-run button { height: 46px !important; font-size: 0.94rem !important; }
}
"""
# App-wide Gradio theme: Soft base tuned to the indigo/blue/slate brand
# palette with Inter as the primary font. The .set() overrides reference
# Gradio theme tokens (``*neutral_50`` etc.), not literal colors.
_soft_base = gr.themes.Soft(
    primary_hue="indigo",
    secondary_hue="blue",
    neutral_hue="slate",
    font=[gr.themes.GoogleFont("Inter"), "system-ui", "sans-serif"],
)
THEME = _soft_base.set(
    body_background_fill="*neutral_50",
    block_radius="14px",
    button_primary_background_fill="*primary_500",
    button_primary_background_fill_hover="*primary_600",
)
# Landing banner rendered at the top of the Blocks layout; styled by the
# #ovc-hero / .ovc-links rules in CUSTOM_CSS. Fix: the link-label emoji were
# stored as UTF-8-read-as-cp1252 mojibake ("πŸ€—" etc.); restored to the
# intended characters. The Homepage glyph is reconstructed as 📍 (the
# mojibake's 4th byte was an invisible cp1252 code point) — TODO confirm
# against the deployed page.
HERO_HTML = """
<div id="ovc-hero">
  <h1>OneVision Encoder</h1>
  <p class="tagline">
    Codec-style patch saliency for video understanding &mdash; see which
    patches the encoder picks from your video and pack them into the
    canvas LLaVA-OneVision consumes.
  </p>
  <div class="ovc-links">
    <a href="https://www.lmms-lab.com/onevision-encoder/index.html" target="_blank" rel="noopener">📍&nbsp;Homepage</a>
    <a href="https://huggingface.co/collections/lmms-lab-encoder/onevision-encoder" target="_blank" rel="noopener">🤗&nbsp;Models</a>
    <a href="https://arxiv.org/abs/2602.08683" target="_blank" rel="noopener">📄&nbsp;Tech Report</a>
    <a href="docs/model_card.md" target="_blank" rel="noopener">📋&nbsp;Model Card</a>
    <a href="docs/data_card.md" target="_blank" rel="noopener">📊&nbsp;Data Card</a>
  </div>
</div>
"""
# Gradio 6 relocated the `theme` / `css` keywords from Blocks(...) onto
# launch(...). Sniff the installed major version and stage the styling
# kwargs on whichever call this version expects.
try:
    _GR_MAJOR = int(gr.__version__.split(".")[0])
except Exception:
    # Missing or unparseable version string: assume the legacy (<6) API.
    _GR_MAJOR = 4

_BLOCK_KW: dict = {"title": "OneVision Encoder"}
_LAUNCH_KW: dict = {}
_style_target = _LAUNCH_KW if _GR_MAJOR >= 6 else _BLOCK_KW
_style_target["theme"] = THEME
_style_target["css"] = CUSTOM_CSS
# (label, value) choice pairs for the Radio widgets in the UI. The second
# element is the canonical mode string the processing pipeline consumes; the
# first is the human-readable label. Fix: labels previously carried UTF-8
# mojibake ("β€”" for the em dash, "Β·" for the middle dot); restored to the
# intended characters.
VIZ_CHOICES = [
    ("Selection — kept patches in color, others fade to gray-white", "selection"),
    ("Heatmap — full-frame JET overlay (blue=low, red=high)", "heatmap"),
    ("Both", "sbs"),
]
SIGNAL_CHOICES = [
    ("Gradient — intra-frame Sobel (sharp edges, textures, text)", "gradient"),
    ("Frame diff — inter-frame motion (movers, action)", "frame_diff"),
    ("Combined — 0.5·gradient + 0.5·frame_diff (general purpose)", "combined"),
]
# ─── UI layout and event wiring ─────────────────────────────────────────
# Builds the full component tree. The positional order of run_btn.click's
# `inputs` must match process()'s signature, and btn_demo.click's `outputs`
# must match DEMO_PRESET's field order — keep all three in sync.
# Fix applied here: user-facing strings carried UTF-8-as-cp1252 mojibake
# ("β€”" for "—", "Β·" for "·") in the GOP labels, the chart legend, and the
# footer; restored to the intended characters. All other code is preserved.
with gr.Blocks(**_BLOCK_KW) as demo:
    gr.HTML(HERO_HTML)

    with gr.Row(equal_height=False, elem_classes="ovc-main"):
        # ─── Controls (narrow column) ────────────────────────────────────
        with gr.Column(scale=4, min_width=360):
            with gr.Group(elem_classes="ovc-card"):
                gr.Markdown("### Input")
                video_in = gr.Video(label="Video", sources=["upload"], height=260)
                with gr.Row(elem_classes="ovc-preset"):
                    # Hidden entirely when the bundled demo clip is absent.
                    btn_demo = gr.Button(
                        "Load demo video", size="sm",
                        visible=os.path.exists(DEMO_VIDEO_PATH),
                    )
            with gr.Group(elem_classes="ovc-card"):
                gr.Markdown("### Pipeline")
                viz_mode = gr.Radio(
                    VIZ_CHOICES, value="selection",
                    label="Visualization mode",
                )
                sample_frames = gr.Slider(
                    4, 64, value=32, step=1, label="Sampled frames",
                )
                top_k = gr.Slider(
                    16, 16384, value=1024, step=16,
                    label="Total patches budget (whole video)",
                    info="The single budget shared across the whole video. "
                    "The uniform full-frame baseline will fit as many "
                    "complete frames as this budget allows, where one full "
                    "frame costs patch_size^2 patches; the codec path spends "
                    "the same budget on saliency-selected patches.",
                )
                patch_size = gr.Radio(
                    PATCH_CHOICES, value=14, label="Patch size (px)",
                )
                gop = gr.Radio(
                    [
                        ("GOP = 4 — fixed 4-frame groups", "4"),
                        ("GOP = 8 — fixed 8-frame groups", "8"),
                        ("GOP = 16 — fixed 16-frame groups", "16"),
                        ("Codec-stream: adaptive groups by saliency energy", "dynamic"),
                    ],
                    value="8",
                    label="GOP (group of pictures)",
                    info="Splits sampled frames into GOP groups. Each group "
                    "produces exactly one GOP canvas: the group's first "
                    "frame stays whole as the I-frame, and each later "
                    "frame gets its own P section below it. So GOP=4 "
                    "means each group is I P P P. For fixed GOP=N, the "
                    "number of packed canvases is ceil(sampled_frames / N). "
                    "Example: 32 sampled frames with GOP=4 gives 8 "
                    "canvases; 32 sampled frames with GOP=8 gives 4. "
                    "Codec-stream mode adaptively groups by saliency "
                    "energy, targeting roughly 8-64 sampled frames per group.",
                )
                with gr.Accordion("Time window", open=False):
                    with gr.Row():
                        start_sec = gr.Number(value=0.0, precision=2, label="Start (s)")
                        end_sec = gr.Number(value=0.0, precision=2, label="End (s)")
                    gr.Markdown(
                        "<small>Set both to 0 to use the full video.</small>",
                    )
                with gr.Accordion("Saliency", open=False):
                    saliency_signal = gr.Radio(
                        SIGNAL_CHOICES, value="gradient",
                        label="Scoring signal",
                    )
                    score_log_scale = gr.Checkbox(
                        value=False,
                        label="Apply log1p to scores",
                        info="Compresses dynamic range — brings up mid-energy patches.",
                    )
                    bitcost_pct = gr.Slider(
                        80.0, 99.9, value=99.0, step=0.1,
                        label="Heatmap normalization percentile",
                        info="Higher = harder to saturate red; lower = more vivid.",
                    )
                with gr.Accordion("Visual style", open=False):
                    heatmap_alpha = gr.Slider(
                        0.1, 0.9, value=0.55, step=0.05,
                        label="Heatmap blend Ξ±",
                    )
                    fade_strength = gr.Slider(
                        0.0, 0.9, value=0.55, step=0.05,
                        label="Selection fade strength",
                    )
                    max_pixels = gr.Slider(
                        40000, 400000, value=150000, step=10000,
                        label="Max pixels per frame",
                    )
            with gr.Row(elem_id="ovc-run"):
                run_btn = gr.Button("Run pipeline", variant="primary")
        # ─── Outputs (wide column) ───────────────────────────────────────
        with gr.Column(scale=7, min_width=560):
            with gr.Group(elem_classes="ovc-card ovc-card-primary"):
                gr.Markdown("### Patch selection visualization")
                vis_out = gr.Video(
                    label="", show_label=False, autoplay=True, height=460,
                )
            with gr.Group(elem_classes="ovc-card ovc-card-primary"):
                gr.Markdown("### Cumulative patches over time")
                gr.Markdown(
                    "<small><b>Indigo</b>: codec method — selects patches "
                    "within frames according to saliency, so the curve rises "
                    "in bursts. <b>Cyan (dashed)</b>: uniform full-frame "
                    "sampling — evenly samples complete frames from the same "
                    "time window, targeting the same sampled-frame count as "
                    "codec when the budget allows. Each step costs "
                    "<b>patch_size^2</b> patches, regardless of the preview "
                    "frame resolution. The dotted line marks the requested "
                    "budget.</small>"
                )
                chart_out = gr.Plot(label="", show_label=False)

    with gr.Row(equal_height=False, elem_classes="ovc-bottom"):
        with gr.Column(scale=7, min_width=420):
            with gr.Group(elem_classes="ovc-card"):
                gr.Markdown("### Packed canvases (one per GOP group)")
                gr.Markdown(
                    "<small>Each canvas is one GOP group rendered in "
                    "<b>I/P structure</b>: the group's first frame is "
                    "the <b>I-frame</b> kept whole on top, and each "
                    "later frame gets its own packed <b>P-frame</b> "
                    "section below in time order. Fixed GOP=N means "
                    "<b>one canvas per N sampled frames</b>.</small>"
                )
                canvas_out = gr.Gallery(
                    label="", show_label=False,
                    columns=2, rows=2, height=520,
                    object_fit="contain",
                    preview=True,
                )
        with gr.Column(scale=5, min_width=340):
            with gr.Group(elem_classes="ovc-card"):
                gr.Markdown("### Raw JSON")
                gr.Markdown(
                    "<small>Full reproducible record of this run "
                    "(params, frame ids, group spans). Collapsed by "
                    "default — click to expand.</small>"
                )
                with gr.Accordion("Show full JSON", open=False):
                    info_out = gr.Code(
                        label="", language="json", lines=22,
                    )

    gr.HTML(
        '<div id="ovc-footer">'
        '<b>OneVision Encoder</b> · codec-style patch saliency demo · '
        'Sobel + frame-diff stand in for the ffmpeg bitcost patch · '
        'GOP-aware top-K patch selection with one IPPP canvas per group.'
        '</div>'
    )

    # Main pipeline trigger: input order mirrors process()'s signature.
    run_btn.click(
        process,
        inputs=[
            video_in, sample_frames, patch_size, top_k, max_pixels,
            viz_mode, heatmap_alpha,
            start_sec, end_sec,
            saliency_signal, score_log_scale, bitcost_pct, fade_strength,
            gop,
        ],
        outputs=[vis_out, canvas_out, info_out, chart_out],
    )
    # Preset loader: DEMO_PRESET tuple order must match this outputs list.
    btn_demo.click(
        lambda: DEMO_PRESET,
        inputs=None,
        outputs=[
            video_in, sample_frames, patch_size, top_k, max_pixels,
            viz_mode, heatmap_alpha, start_sec, end_sec,
            saliency_signal, score_log_scale, bitcost_pct, fade_strength,
            gop,
        ],
    )
if __name__ == "__main__":
    # Hosting platforms (e.g. HF Spaces) inject PORT; fall back to 7860.
    # _LAUNCH_KW carries theme/css only on Gradio >= 6 (see version shim).
    _port = int(os.environ.get("PORT", 7860))
    demo.launch(server_name="0.0.0.0", server_port=_port, **_LAUNCH_KW)