# Hugging Face Space file: app.py (user mastefan, commit 43e4f43 — verified).
# Fencing Scoreboard Clips - YOLO x AutoGluon (Gradio)
# ----------------------------------------------------
# Goal (AGENTS): Build a cohesive app that: upload video -> frame timestamps ->
# YOLO scoreboard detect + gray-mask background -> color feature timeseries ->
# AutoGluon Tabular detector -> multi-event 4s clips in a Gradio gallery.
#
# Plan (AGENTS):
# 1) Load YOLO weights from HF Hub; load AutoGluon Tabular predictor from HF Hub.
# 2) For each (skipped) frame: YOLO infer -> gray-mask non-scoreboard parts
# (keep color inside any bbox with conf>=0.85), then compute red/green features.
# 3) Roll features to add z-scores/diffs. Predict with AG Tabular.
# 4) Find local events with persistence + spacing; group & cut (-2s, +2s).
# 5) Gradio UI: video in → gallery of clips + status text out.
#
# Fencing Scoreboard Clips - YOLO x AutoGluon (Gradio)
import os, cv2, zipfile, shutil, tempfile, subprocess, pathlib
import numpy as np, pandas as pd
from typing import List, Tuple
import gradio as gr
# --- Patch for FastAI/AutoGluon deserialization ---
import fastai.tabular.core as ftc
from fastai.data.load import _FakeLoader, DataLoader
class TabWeightedDL(DataLoader):
    """Stub dataloader so old AutoGluon FastAI pickles can be deserialized.

    Older AutoGluon models reference ``fastai.tabular.core.TabWeightedDL``,
    which no longer exists; this shim simply delegates to ``DataLoader``.
    """
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

# Register the stub under the attribute name the pickled models expect.
ftc.TabWeightedDL = TabWeightedDL
# =====================================================
# Configuration
# =====================================================
YOLO_REPO_ID = "mastefan/fencing-scoreboard-yolov8"  # HF repo holding the YOLOv8 scoreboard weights
YOLO_FILENAME = "best.pt"  # weights file inside the YOLO repo
AG_REPO_ID = "emkessle/2024-24679-fencing-touch-predictor"  # HF repo with the AutoGluon predictor
AG_ZIP_NAME = "autogluon_predictor_dir.zip"  # zipped AutoGluon predictor directory
FRAME_SKIP = 2  # process every Nth frame (raised adaptively for long videos)
KEEP_CONF = 0.85  # min YOLO confidence to keep a bbox in color
YOLO_CONF = 0.30  # YOLO inference confidence threshold
YOLO_IOU = 0.50  # YOLO NMS IoU threshold
CLIP_PAD_S = 2.0  # seconds of padding before/after each detected touch
MIN_SEP_S = 1.2  # minimum separation between detected events (seconds)
GROUP_GAP_S = 1.5  # events closer than this are merged into one (seconds)
DEBUG_MODE = False # set True to save debug images/CSVs
# =====================================================
# Dependency setup
# =====================================================
def _pip(pkgs):
    """Quietly install *pkgs* into the running interpreter via pip."""
    import sys
    import subprocess
    cmd = [sys.executable, "-m", "pip", "install", "--quiet", *pkgs]
    subprocess.check_call(cmd)
# Import heavy dependencies, installing them on the fly when absent.
# Fix: the original bare `except:` also swallowed SystemExit/KeyboardInterrupt;
# only a failed import should trigger the pip fallback.
try:
    from ultralytics import YOLO
except ImportError:
    _pip(["ultralytics"])
    from ultralytics import YOLO
try:
    from autogluon.tabular import TabularPredictor
except ImportError:
    _pip(["autogluon.tabular"])
    from autogluon.tabular import TabularPredictor
try:
    from huggingface_hub import hf_hub_download
except ImportError:
    _pip(["huggingface_hub"])
    from huggingface_hub import hf_hub_download
# =====================================================
# Model loading
# =====================================================
# Local cache directory for weights/predictors downloaded from the HF Hub.
CACHE_DIR = pathlib.Path("hf_assets"); CACHE_DIR.mkdir(exist_ok=True)
def load_yolo_from_hub():
    """Download the scoreboard-detector weights from the HF Hub and load YOLO."""
    weights_path = hf_hub_download(
        repo_id=YOLO_REPO_ID, filename=YOLO_FILENAME, cache_dir=CACHE_DIR
    )
    print(f"[INFO] Loaded YOLO weights from {weights_path}")
    return YOLO(weights_path)
def load_autogluon_tabular_from_hub():
    """Download, unzip, and load the AutoGluon predictor from the HF Hub.

    FastAI submodel artifacts are deleted from the extracted directory first,
    because their pickles fail to deserialize under current fastai versions.
    """
    zip_path = hf_hub_download(repo_id=AG_REPO_ID, filename=AG_ZIP_NAME, cache_dir=CACHE_DIR)
    extract_dir = CACHE_DIR / "ag_predictor_native"
    if extract_dir.exists():
        shutil.rmtree(extract_dir)
    with zipfile.ZipFile(zip_path, "r") as archive:
        archive.extractall(extract_dir)
    # --- delete fastai models before loading to avoid deserialization errors ---
    fastai_paths = list(extract_dir.rglob("*fastai*"))
    for path in fastai_paths:
        try:
            if path.is_dir():
                shutil.rmtree(path)
            else:
                path.unlink()
        except Exception as e:
            print(f"[WARN] Could not remove {path}: {e}")
    print(f"[CLEANUP] Removed {len(fastai_paths)} FastAI model files.")
    # Load normally (skip the Python-version check baked into the artifact).
    from autogluon.tabular import TabularPredictor
    predictor = TabularPredictor.load(str(extract_dir), require_py_version_match=False)
    print(f"[INFO] Loaded AutoGluon predictor from {extract_dir}")
    return predictor
# Lazily-initialized singletons: models are fetched from the Hub on first use.
_YOLO = None
_AGP = None

def yolo():
    """Return the cached YOLO model, loading it on first call."""
    global _YOLO
    if _YOLO is None:
        _YOLO = load_yolo_from_hub()
    return _YOLO

def ag_predictor():
    """Return the cached AutoGluon predictor, loading it on first call."""
    global _AGP
    if _AGP is None:
        _AGP = load_autogluon_tabular_from_hub()
    return _AGP
# =====================================================
# Image + feature utilities
# =====================================================
# Directory where masked debug frames are written when debug mode is enabled.
DEBUG_DIR = pathlib.Path("debug_frames"); DEBUG_DIR.mkdir(exist_ok=True)
def isolate_scoreboard_color(frame_bgr: np.ndarray,
                             conf: float = YOLO_CONF,
                             iou: float = YOLO_IOU,
                             keep_conf: float = KEEP_CONF,
                             debug: bool = False,
                             frame_id: int = None) -> np.ndarray:
    """
    Gray out everything in the frame except the detected scoreboard.

    - Runs YOLO on the frame and picks the LARGEST bbox among candidates
      meeting the confidence threshold.
    - Primary threshold: >= max(0.85, keep_conf)
    - Fallback threshold: >= max(0.70, primary - 0.03)
    - The entire chosen bbox is restored to color; everything else stays
      grayscale.
    - Rejects low-saturation ROIs (flat/neutral areas unlikely to be a lit
      scoreboard).

    Returns a BGR image of the same size as the input.
    """
    H, W = frame_bgr.shape[:2]
    # Start fully grayscale (re-expanded to 3 channels so color can be pasted back).
    gray = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2GRAY)
    gray = cv2.cvtColor(gray, cv2.COLOR_GRAY2BGR)
    primary_thr = max(0.85, keep_conf)
    fallback_thr = max(0.70, primary_thr - 0.03)
    chosen_box = None
    res = yolo().predict(frame_bgr, conf=conf, iou=iou, verbose=False)
    if len(res):
        r = res[0]
        if getattr(r, "boxes", None) is not None and len(r.boxes) > 0:
            boxes = r.boxes.xyxy.cpu().numpy()
            scores = r.boxes.conf.cpu().numpy()
            candidates = list(zip(boxes, scores))
            # Prefer largest box meeting primary threshold
            strong = [(b, s) for (b, s) in candidates if float(s) >= primary_thr]
            if strong:
                chosen_box, _ = max(strong, key=lambda bs: (bs[0][2]-bs[0][0]) * (bs[0][3]-bs[0][1]))
            else:
                # Fallback: largest box meeting fallback threshold
                medium = [(b, s) for (b, s) in candidates if float(s) >= fallback_thr]
                if medium:
                    chosen_box, _ = max(medium, key=lambda bs: (bs[0][2]-bs[0][0]) * (bs[0][3]-bs[0][1]))
    if chosen_box is not None:
        x1, y1, x2, y2 = [int(round(v)) for v in chosen_box]
        # Clamp the bbox to image bounds before slicing.
        x1, y1 = max(0, x1), max(0, y1)
        x2, y2 = min(W-1, x2), min(H-1, y2)
        if x2 > x1 and y2 > y1:
            # Single safeguard: reject very low-saturation ROIs
            roi_color = frame_bgr[y1:y2, x1:x2]
            if roi_color.size > 0:
                hsv = cv2.cvtColor(roi_color, cv2.COLOR_BGR2HSV)
                sat_mean = hsv[:, :, 1].mean()
                if sat_mean < 30:
                    print(f"[WARN] Rejected bbox due to low saturation (mean={sat_mean:.1f})")
                    chosen_box = None
            # If accepted, restore whole bbox to color
            if chosen_box is not None:
                gray[y1:y2, x1:x2] = frame_bgr[y1:y2, x1:x2]
    # Optional debug save (green rectangle marks the accepted bbox, if any).
    if debug and frame_id is not None:
        dbg = gray.copy()
        if chosen_box is not None:
            x1, y1, x2, y2 = [int(round(v)) for v in chosen_box]
            cv2.rectangle(dbg, (x1, y1), (x2, y2), (0, 255, 0), 2)
        out_path = DEBUG_DIR / f"frame_{frame_id:06d}.jpg"
        cv2.imwrite(str(out_path), dbg)
        print(f"[DEBUG] Saved debug frame → {out_path}")
    return gray
def _count_color_pixels(rgb, ch):
R, G, B = rgb[:,:,0], rgb[:,:,1], rgb[:,:,2]
if ch==0: mask=(R>150)&(R>1.2*G)&(R>1.2*B)
else: mask=(G>100)&(G>1.05*R)&(G>1.05*B)
return int(np.sum(mask))
def color_pixel_ratio(rgb,ch): return _count_color_pixels(rgb,ch)/(rgb.shape[0]*rgb.shape[1]+1e-9)
def rolling_z(series, win=40):
med = series.rolling(win,min_periods=5).median()
mad = series.rolling(win,min_periods=5).apply(lambda x: np.median(np.abs(x-np.median(x))),raw=True)
mad = mad.replace(0, mad[mad>0].min() if (mad>0).any() else 1.0)
return (series-med)/mad
# =====================================================
# Video feature extraction
# =====================================================
def extract_feature_timeseries(video_path:str, frame_skip:int=FRAME_SKIP, debug:bool=DEBUG_MODE):
    """Decode *video_path*, mask each sampled frame to the scoreboard, and
    build the color-feature timeseries.

    Returns (df, fps). df has one row per processed frame (frame_id,
    timestamp, red/green pixel ratios, their diffs, and robust rolling
    z-scores); it is empty when the video cannot be opened.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        return pd.DataFrame(), 0.0
    fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
    total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) or 1
    print(f"[INFO] Reading {total} frames @ {fps:.2f}fps ...")
    # Adaptive sampling: skip more frames on longer videos to bound runtime.
    # BUGFIX: test the larger threshold first — the original
    # `if total > 1200: ... elif total > 2400: ...` order made the
    # frame_skip = 6 branch unreachable.
    if total > 2400:
        frame_skip = 6
    elif total > 1200:
        frame_skip = 4
    else:
        frame_skip = FRAME_SKIP
    rec, idx = [], 0
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        if idx % frame_skip == 0:
            ts = idx / fps
            masked = isolate_scoreboard_color(frame, debug=debug, frame_id=idx)
            rgb = cv2.cvtColor(masked, cv2.COLOR_BGR2RGB)
            rec.append({
                "frame_id": idx, "timestamp": ts,
                "red_ratio": color_pixel_ratio(rgb, 0),
                "green_ratio": color_pixel_ratio(rgb, 1)
            })
        idx += 1
    cap.release()
    df = pd.DataFrame(rec)
    if df.empty:
        return df, fps
    # Derived features consumed by the AutoGluon predictor.
    df["red_diff"] = df["red_ratio"].diff().fillna(0)
    df["green_diff"] = df["green_ratio"].diff().fillna(0)
    df["z_red"] = rolling_z(df["red_ratio"])
    df["z_green"] = rolling_z(df["green_ratio"])
    print(f"[INFO] Extracted {len(df)} processed frames.")
    return df, fps
# =====================================================
# Predictor & event logic
# =====================================================
def predict_scores(df):
    """Predict illumination likelihoods using AutoGluon regression ensemble.

    Averages per-model predictions over every loadable submodel, then
    rescales the mean into [0, 1] using the 5th/95th percentiles so the
    downstream peak-picker sees a comparable range regardless of model
    output scale. Returns a Series aligned with df (zeros if no submodel
    could predict).
    """
    feats = ["red_ratio", "green_ratio", "red_diff", "green_diff", "z_red", "z_green"]
    X = df[feats].copy()
    ag = ag_predictor()
    # Get model list (older AG versions have .model_names(), newer have .model_names)
    try:
        models_all = ag.model_names()
    except Exception:
        models_all = ag.model_names
    print(f"[INFO] Evaluating models: {models_all}")
    preds_total = []
    for m in models_all:
        try:
            print(f"[INFO] → Predicting with {m}")
            # NOTE(review): relies on the private `_learner` API to address
            # individual submodels — may break across AutoGluon versions.
            y_pred = ag._learner.predict(X, model=m)
            preds_total.append(pd.Series(y_pred, name=m))
        except Exception as e:
            # A submodel that fails to deserialize/predict is simply skipped.
            print(f"[WARN] Skipping model {m}: {e}")
    if not preds_total:
        print("[ERROR] No usable models, returning zeros.")
        return pd.Series(np.zeros(len(df)))
    # Average predictions from working models
    y_mean = pd.concat(preds_total, axis=1).mean(axis=1)
    # Normalize 0–1 for consistency (the `or 1.0` guards a zero percentile range)
    rng = (y_mean.quantile(0.95) - y_mean.quantile(0.05)) or 1.0
    score = ((y_mean - y_mean.quantile(0.05)) / rng).clip(0, 1)
    print(f"[INFO] Used {len(preds_total)} valid submodels for regression scoring.")
    return score
def pick_events(df, score, fps):
    """Locate persistent score spikes and return grouped event times (seconds)."""
    z = rolling_z(score, 35)
    strong = z > 4.0
    # Persistence filter: at least 2 of the last 3 samples above threshold.
    keep = strong.rolling(3, min_periods=1).sum() >= 2
    min_dist = max(1, int(MIN_SEP_S * fps))
    values = score.values
    events = []
    last_idx = -min_dist
    for i in range(1, len(values) - 1):
        is_local_peak = values[i] > values[i - 1] and values[i] > values[i + 1]
        if keep.iloc[i] and is_local_peak and (i - last_idx) >= min_dist:
            events.append(float(df.iloc[i]["timestamp"]))
            last_idx = i
    # Fallback: if nothing survived the filters, take the global maximum.
    if not events and len(values) > 0:
        events = [float(df.iloc[int(np.argmax(values))]["timestamp"])]
    # Merge events closer together than GROUP_GAP_S into a single event.
    grouped = []
    for t in sorted(events):
        if not grouped or (t - grouped[-1]) > GROUP_GAP_S:
            grouped.append(t)
    return grouped
# =====================================================
# Clip utilities
# =====================================================
def _probe_duration(video_path):
try:
import ffmpeg
meta=ffmpeg.probe(video_path)
return float(meta["format"]["duration"])
except: return 0.0
def cut_clip(video_path, start, end, out_path):
    """Cut [start, end] seconds from *video_path* into *out_path*.

    Tries a fast stream-copy with the ffmpeg CLI first; falls back to a
    moviepy re-encode when ffmpeg is missing or the copy fails.

    Fix: the bare `except: pass` around the subprocess call also swallowed
    SystemExit/KeyboardInterrupt; narrowed to OSError (ffmpeg binary absent
    or not executable) — a non-zero ffmpeg exit already falls through via
    the returncode check.
    """
    try:
        cmd = ["ffmpeg", "-y", "-ss", str(start), "-to", str(end),
               "-i", video_path, "-c", "copy", out_path]
        sp = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        if sp.returncode == 0 and os.path.exists(out_path):
            return out_path
    except OSError:
        pass  # ffmpeg not installed — use the slow re-encode path below
    from moviepy.editor import VideoFileClip
    clip = VideoFileClip(video_path).subclip(start, end)
    clip.write_videofile(out_path, codec="libx264", audio_codec="aac",
                         verbose=False, logger=None)
    return out_path
def extract_score_clips(video_path: str, debug: bool = DEBUG_MODE):
    """Run the full detection pipeline on one video.

    Returns (clips, status_message) where clips is a list of
    (file_path, caption) tuples suitable for a Gradio gallery.
    """
    df, fps = extract_feature_timeseries(video_path, FRAME_SKIP, debug)
    if df.empty:
        return [], "No frames processed."
    score = predict_scores(df)
    events = pick_events(df, score, fps)
    print(f"[INFO] Detected {len(events)} potential events: {events}")
    # Prefer the container-reported duration; fall back to the last timestamp.
    duration = _probe_duration(video_path) or float(df["timestamp"].max() + CLIP_PAD_S + 0.5)
    base = os.path.splitext(os.path.basename(video_path))[0]
    clips = []
    for i, t in enumerate(events):
        start = max(0, t - CLIP_PAD_S)
        end = min(duration, t + CLIP_PAD_S)
        clip_path = os.path.join(tempfile.gettempdir(), f"{base}_score_{i+1:02d}.mp4")
        print(f"[INFO] Cutting clip {i+1}: {start:.2f}s→{end:.2f}s")
        cut_clip(video_path, start, end, clip_path)
        clips.append((clip_path, f"Touch {i+1} @ {t:.2f}s"))
    return clips, f"✅ Detected {len(clips)} event(s)."
# =====================================================
# Progress GUI helpers
# =====================================================
# Inline CSS for the Gradio UI: centered layout plus an animated progress bar
# whose fencer emoji slides along with the completion percentage.
CSS = """
.gradio-container {max-width:900px;margin:auto;}
.full-width{width:100%!important;}
.progress-bar{width:100%;height:30px;background:#e0e0e0;border-radius:15px;margin:15px 0;position:relative;overflow:hidden;}
.progress-fill{height:100%;background:#4CAF50;border-radius:15px;text-align:center;line-height:30px;color:white;font-weight:bold;transition:width .3s;}
.fencer{position:absolute;top:-5px;font-size:24px;transition:left .3s;transform:scaleX(-1);}
"""
def _make_progress_bar(percent:int,final_text:str=None,label:str=""):
text=f"{percent}%" if not final_text else final_text
return f"""
<div class="progress-bar">
<div id="progress-fill" class="progress-fill" style="width:{percent}%">{label} {text}</div>
<div id="fencer" class="fencer" style="left:{percent}%">🤺</div>
</div>
"""
def run_with_progress(video_file):
    """Drive the UI pipeline, yielding (clips, status_markdown, progress_html).

    Fix: the original called extract_feature_timeseries() here and then
    extract_score_clips(), which re-extracts the exact same features — the
    whole video was decoded and YOLO-inferred twice. The pipeline now runs
    once, inside extract_score_clips(). (The dropped 40/60% yields were
    emitted only after the heavy work had already finished, so no real
    progress information is lost.)
    """
    if not video_file:
        yield [], "Please upload a video.", _make_progress_bar(0)
        return
    print("[GUI] Starting processing...")
    yield [], "🔄 Extracting frames...", _make_progress_bar(20, "", "Pipeline")
    clips, msg = extract_score_clips(video_file, DEBUG_MODE)
    if not clips and "No frames" in msg:
        yield [], "❌ No frames processed!", _make_progress_bar(100, "No Frames ❌", "Pipeline")
        return
    yield [], "🔄 Scoring...", _make_progress_bar(80, "", "Pipeline")
    final = _make_progress_bar(100, f"Detected {len(clips)} Touches ⚡", "Pipeline")
    print("[GUI] Finished.")
    yield clips, msg, final
# =====================================================
# Gradio interface
# =====================================================
# Gradio UI: video upload → progress bar → status text → gallery of clips.
with gr.Blocks(css=CSS,title="Fencing Scoreboard Detector") as demo:
    gr.Markdown("## 🤺 Fencing Score Detector\nUpload a fencing bout video and automatically detect scoreboard lights using YOLO + AutoGluon.")
    in_video=gr.Video(label="Upload Bout Video",elem_classes="full-width",height=400)
    run_btn=gr.Button("⚡ Detect Touches",elem_classes="full-width")
    # Progress bar and gallery start hidden; wrapped_run toggles visibility.
    progress_html=gr.HTML(value="",label="Progress",visible=False)
    status=gr.Markdown("Ready.")
    gallery=gr.Gallery(label="Detected Clips",columns=1,height=400,visible=False)
    def wrapped_run(video_file):
        # Re-yield every pipeline update, revealing the progress bar immediately
        # and the gallery once clips exist.
        print("[SYSTEM] User started detection.")
        yield [],"Processing started...",gr.update(value=_make_progress_bar(0),visible=True)
        for clips,msg,bar in run_with_progress(video_file):
            print(f"[SYSTEM] {msg}")
            yield gr.update(value=clips,visible=bool(clips)),msg,gr.update(value=bar,visible=True)
    run_btn.click(fn=wrapped_run,inputs=in_video,outputs=[gallery,status,progress_html])
if __name__=="__main__":
    # debug=True enables Gradio's verbose in-browser error reporting.
    demo.launch(debug=True)