Spaces:
Running
Running
| """ | |
| VisionAI — Object Detection & Human Pose Estimation using YOLO | |
| Semester Project | |
| Key features: | |
| • weapon_detection.pt — custom weapon model (bundled) | |
| • Pose Threat Analysis — classifies each detected person's pose as: | |
| 🟢 NORMAL — relaxed / standing / walking | |
| 🟡 SUSPICIOUS — crouching / leaning / unusual angle | |
| 🔴 THREATENING — raised arms / aggressive / weapon + person together | |
| • FPS-based video scanning (choose how many frames/sec to analyse) | |
| • Works on HuggingFace free tier (CPU-safe) | |
| """ | |
| import cv2 | |
| import json | |
| import math | |
| import tempfile | |
| import numpy as np | |
| from PIL import Image, ImageDraw, ImageFont | |
| import gradio as gr | |
| from ultralytics import YOLO | |
try:
    import spaces
except ImportError:
    class spaces:
        """No-op stand-in used when the ``spaces`` package is not installed.

        Only ``GPU`` is provided; it hands the decorated function back
        unchanged so decorated code still runs.
        """

        @staticmethod
        def GPU(fn=None, **_options):
            """Support both ``@spaces.GPU`` and ``@spaces.GPU(duration=...)``.

            Args:
                fn: The function being decorated when used without arguments.
                **_options: Decorator options; accepted and ignored.

            Returns:
                ``fn`` itself, or a pass-through decorator when called with
                options only.
            """
            if callable(fn):
                return fn
            # Called as ``spaces.GPU(...)``: the function arrives in a second call.
            return lambda wrapped: wrapped
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # MODEL LOADING | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| print("=" * 60) | |
| print("[VisionAI] Loading models ...") | |
| def _load(path, label): | |
| try: | |
| m = YOLO(path) | |
| print(f" β {label} ({path})") | |
| return m | |
| except Exception as e: | |
| print(f" β οΈ {label} skipped β {e}") | |
| return None | |
# Each model keeps its own module-level handle: the combined pipeline reads
# MODEL_WEAPON and MODEL_POSE directly.
MODEL_OD = _load("yolo11m.pt", "Object Detection")
MODEL_POSE = _load("yolo11m-pose.pt", "Pose Estimation")
MODEL_SEG = _load("yolo11m-seg.pt", "Segmentation")
MODEL_CLS = _load("yolo11m-cls.pt", "Classification")
MODEL_OBB = _load("yolo11m-obb.pt", "OBB Detection")
MODEL_WEAPON = _load("weapon_detection.pt", "Weapon Detection ★")

# Ordered task registry: only models that actually loaded get an entry.
MODELS = {
    task: model
    for task, model in (
        ("object_detection", MODEL_OD),
        ("pose", MODEL_POSE),
        ("segmentation", MODEL_SEG),
        ("classification", MODEL_CLS),
        ("obb", MODEL_OBB),
        ("weapon", MODEL_WEAPON),
    )
    if model
}

# User-facing name for every task key.
TASK_DISPLAY = {
    "object_detection": "🔍 Object Detection",
    "pose": "🦴 Pose Estimation",
    "segmentation": "🎭 Segmentation",
    "classification": "🏷️ Classification",
    "obb": "📦 OBB Detection",
    "weapon": "🔫 Weapon Detection",
}

OVERLAY_TASKS = [
    t for t in ("object_detection", "pose", "segmentation", "obb", "weapon")
    if t in MODELS
]
ALL_TASKS = list(MODELS)

print(f"[VisionAI] ✅ {len(MODELS)} models loaded: {ALL_TASKS}")
print("=" * 60)
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # POSE THREAT ANALYSER | |
| # COCO 17 keypoints: | |
| # 0-nose 1-left_eye 2-right_eye 3-left_ear 4-right_ear | |
| # 5-left_shoulder 6-right_shoulder | |
| # 7-left_elbow 8-right_elbow | |
| # 9-left_wrist 10-right_wrist | |
| # 11-left_hip 12-right_hip | |
| # 13-left_knee 14-right_knee | |
| # 15-left_ankle 16-right_ankle | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# Threat levels, ordered from least to most severe.
THREAT_NORMAL = "NORMAL"
THREAT_SUSPICIOUS = "SUSPICIOUS"
THREAT_THREATENING = "THREATENING"

# Display colour per level as (R, G, B).
THREAT_COLOR = {
    THREAT_NORMAL: (34, 197, 94),       # green
    THREAT_SUSPICIOUS: (234, 179, 8),   # yellow
    THREAT_THREATENING: (239, 68, 68),  # red
}

# Status marker per level, used in text output.
THREAT_EMOJI = {
    THREAT_NORMAL: "🟢",
    THREAT_SUSPICIOUS: "🟡",
    THREAT_THREATENING: "🔴",
}
| def _kp(kps, idx): | |
| """Return (x, y, visible) for keypoint index. visible=True if coords > 0.""" | |
| if idx >= len(kps): | |
| return 0, 0, False | |
| x, y = float(kps[idx][0]), float(kps[idx][1]) | |
| return x, y, (x > 1 and y > 1) | |
| def _angle(a, b, c): | |
| """Angle at point b formed by a-b-c (degrees).""" | |
| ax, ay = a[0]-b[0], a[1]-b[1] | |
| cx, cy = c[0]-b[0], c[1]-b[1] | |
| dot = ax*cx + ay*cy | |
| mag = (math.hypot(ax,ay) * math.hypot(cx,cy)) + 1e-6 | |
| return math.degrees(math.acos(max(-1, min(1, dot/mag)))) | |
def analyse_pose_threat(kps, weapon_in_frame=False):
    """Score one person's pose and map the score to a threat level.

    Args:
        kps: One person's keypoints as ``[x, y]`` pairs indexed in COCO-17
            order. Indices beyond the list are treated as not visible.
        weapon_in_frame: True when a weapon was detected in the same frame.

    Returns:
        ``(level, reason, score)``: ``level`` is one of the ``THREAT_*``
        constants, ``reason`` is a comma-separated list of the cues that
        fired (``"relaxed posture"`` when none did), and ``score`` is the
        accumulated integer score.
    """
    # Only the keypoints the rules below actually read are extracted.
    ls_x, ls_y, ls_v = _kp(kps, 5)    # left shoulder
    rs_x, rs_y, rs_v = _kp(kps, 6)    # right shoulder
    le_x, le_y, le_v = _kp(kps, 7)    # left elbow
    re_x, re_y, re_v = _kp(kps, 8)    # right elbow
    lw_x, lw_y, lw_v = _kp(kps, 9)    # left wrist
    rw_x, rw_y, rw_v = _kp(kps, 10)   # right wrist
    _, lh_y, lh_v = _kp(kps, 11)      # left hip
    _, lk_y, lk_v = _kp(kps, 13)      # left knee
    la_x, la_y, la_v = _kp(kps, 15)   # left ankle
    ra_x, _, ra_v = _kp(kps, 16)      # right ankle

    both_shoulders = ls_v and rs_v
    reasons = []
    score = 0

    # 1. Arms raised: wrist at least 20 px above its shoulder
    #    (y grows downward in image coordinates).
    arms_raised = 0
    if lw_v and ls_v and lw_y < ls_y - 20:
        arms_raised += 1
    if rw_v and rs_v and rw_y < rs_y - 20:
        arms_raised += 1
    if arms_raised == 2:
        score += 3
        reasons.append("both arms raised")
    elif arms_raised == 1:
        score += 1
        reasons.append("one arm raised")

    # 2. Arms extended: wrist more than 120 px sideways from the shoulder
    #    midpoint. Needs both shoulders to locate the body centre.
    if both_shoulders:
        body_cx = (ls_x + rs_x) / 2
        if lw_v and abs(lw_x - body_cx) > 120:
            score += 1
            reasons.append("left arm extended")
        if rw_v and abs(rw_x - body_cx) > 120:
            score += 1
            reasons.append("right arm extended")

    # 3. Sharply bent elbow (under 70 degrees).
    if lw_v and le_v and ls_v:
        ang = _angle((ls_x, ls_y), (le_x, le_y), (lw_x, lw_y))
        if ang < 70:
            score += 2
            reasons.append(f"left arm bent aggressively ({ang:.0f}°)")
    if rw_v and re_v and rs_v:
        ang = _angle((rs_x, rs_y), (re_x, re_y), (rw_x, rw_y))
        if ang < 70:
            score += 2
            reasons.append(f"right arm bent aggressively ({ang:.0f}°)")

    # 4. Crouching: knee sits close to hip height relative to the hip-ankle
    #    distance. Only the left leg is inspected.
    if lk_v and lh_v and la_v:
        leg_h = abs(lh_y - la_y) + 1e-6
        if (lk_y - lh_y) / leg_h < 0.15:
            score += 1
            reasons.append("crouching posture")

    # 5. Tilted shoulders: vertical offset relative to horizontal offset.
    if both_shoulders:
        shoulder_tilt = abs(ls_y - rs_y) / (abs(ls_x - rs_x) + 1e-6)
        if shoulder_tilt > 0.45:
            score += 1
            reasons.append(f"body tilted ({shoulder_tilt:.2f})")

    # 6. A weapon in the frame dominates the score.
    if weapon_in_frame:
        score += 4
        reasons.append("weapon detected nearby")

    # 7. Wide stance: ankles more than 1.8 shoulder-widths apart.
    if la_v and ra_v and both_shoulders:
        shoulder_w = abs(ls_x - rs_x) + 1e-6
        if abs(la_x - ra_x) / shoulder_w > 1.8:
            score += 1
            reasons.append("wide aggressive stance")

    if score >= 6:
        level = THREAT_THREATENING
    elif score >= 2:
        level = THREAT_SUSPICIOUS
    else:
        level = THREAT_NORMAL

    reason_str = ", ".join(reasons) if reasons else "relaxed posture"
    return level, reason_str, score
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # OVERLAY DRAWING | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def _ascii_label(text):
    """Strip characters OpenCV's Hershey fonts cannot draw (anything non-ASCII)."""
    return text.encode("ascii", "ignore").decode("ascii").strip()


def draw_threat_overlay(frame_bgr, persons):
    """Draw a per-person threat badge and an overall status banner.

    Args:
        frame_bgr: BGR image to annotate; it is copied, not modified.
        persons: Dicts with keys ``bbox`` (x1, y1, x2, y2), ``threat``,
            ``reason`` and ``score``.

    Returns:
        The annotated BGR copy.
    """
    out = frame_bgr.copy()
    for p in persons:
        x1, y1, x2, y2 = [int(v) for v in p["bbox"]]
        threat = p["threat"]
        red, green, blue = THREAT_COLOR[threat]
        bgr = (blue, green, red)  # THREAT_COLOR is RGB, cv2 wants BGR

        cv2.rectangle(out, (x1, y1), (x2, y2), bgr, 2)

        # cv2.putText renders non-ASCII characters as '?', so the emoji is
        # dropped from anything drawn onto the frame.
        label = _ascii_label(f"{THREAT_EMOJI[threat]} {threat}")
        (tw, th), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_DUPLEX, 0.6, 1)
        # Badge sits above the box; clamp so it stays visible at the top edge.
        badge_top = max(y1 - th - 8, 0)
        cv2.rectangle(out, (x1, badge_top), (x1 + tw + 8, badge_top + th + 8), bgr, -1)
        cv2.putText(out, label, (x1 + 4, badge_top + th + 4),
                    cv2.FONT_HERSHEY_DUPLEX, 0.6, (255, 255, 255), 1, cv2.LINE_AA)

        # Reason sub-label below the box, truncated to 50 characters.
        reason_short = _ascii_label(p["reason"][:50])
        cv2.putText(out, reason_short, (x1 + 2, y2 + 16),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.42, bgr, 1, cv2.LINE_AA)

    # Frame-wide banner reflects the highest-scoring person.
    if persons:
        worst = max(persons, key=lambda p: p["score"])
        w_threat = worst["threat"]
        red, green, blue = THREAT_COLOR[w_threat]
        w_bgr = (blue, green, red)
        banner = _ascii_label(
            f" {THREAT_EMOJI[w_threat]} OVERALL: {w_threat} ({len(persons)} person(s) detected)"
        )
        (bw, bh), _ = cv2.getTextSize(banner, cv2.FONT_HERSHEY_DUPLEX, 0.7, 1)
        cv2.rectangle(out, (0, 0), (bw + 16, bh + 12), w_bgr, -1)
        cv2.putText(out, banner, (8, bh + 4),
                    cv2.FONT_HERSHEY_DUPLEX, 0.7, (255, 255, 255), 1, cv2.LINE_AA)
    return out
def run_combined_analysis(frame_np, conf, iou, img_size):
    """Run weapon detection and pose threat analysis on one frame.

    Args:
        frame_np: RGB image as an ``H x W x 3`` uint8 numpy array.
        conf: Confidence threshold passed to both models.
        iou: IoU threshold passed to both models.
        img_size: Inference image size passed to both models.

    Returns:
        ``(image, analysis)``: the annotated frame as an RGB PIL image and a
        JSON-serialisable dict with keys ``persons_detected``,
        ``weapon_detected``, ``weapons``, ``persons`` and ``overall_threat``.
    """
    # Ultralytics treats numpy input as BGR and r.plot() returns BGR, so
    # convert once and stay in BGR until the final hand-off to PIL.
    frame_bgr = cv2.cvtColor(frame_np, cv2.COLOR_RGB2BGR)

    # Step 1: weapon detection.
    weapon_dets = []
    if MODEL_WEAPON:
        w_res = MODEL_WEAPON.predict(source=frame_bgr, conf=conf, iou=iou,
                                     imgsz=img_size, verbose=False)
        for r in w_res:
            if r.boxes is None:
                continue
            for box in r.boxes:
                weapon_dets.append({
                    "label": MODEL_WEAPON.names[int(box.cls)],
                    "confidence": round(float(box.conf), 3),
                    "bbox": [round(v, 1) for v in box.xyxy[0].tolist()],
                })
    weapon_in_frame = bool(weapon_dets)

    # Step 2: pose estimation plus per-person threat scoring.
    persons = []
    anno_bgr = frame_bgr.copy()
    if MODEL_POSE:
        p_res = MODEL_POSE.predict(source=frame_bgr, conf=conf, iou=iou,
                                   imgsz=img_size, verbose=False)
        for r in p_res:
            anno_bgr = r.plot()  # skeleton overlay, BGR
            if r.boxes is None or r.keypoints is None:
                continue
            for i, box in enumerate(r.boxes):
                if MODEL_POSE.names[int(box.cls)] != "person":
                    continue
                kps = r.keypoints.xy[i].tolist()
                threat, reason, score = analyse_pose_threat(kps, weapon_in_frame)
                persons.append({
                    "id": i,
                    "bbox": [round(v, 1) for v in box.xyxy[0].tolist()],
                    "threat": threat,
                    "reason": reason,
                    "score": score,
                    "keypoints_count": sum(1 for k in kps if k[0] > 1 and k[1] > 1),
                })

    # Step 3: weapon boxes on top of the skeletons. Label is ASCII only
    # because cv2.putText cannot draw emoji.
    for wd in weapon_dets:
        x1, y1, x2, y2 = [int(v) for v in wd["bbox"]]
        cv2.rectangle(anno_bgr, (x1, y1), (x2, y2), (0, 0, 220), 3)
        lbl = f"{wd['label']} {wd['confidence']:.0%}"
        cv2.putText(anno_bgr, lbl, (x1, y1 - 6),
                    cv2.FONT_HERSHEY_DUPLEX, 0.6, (0, 0, 220), 1)

    # Step 4: threat badges and banner.
    anno_bgr = draw_threat_overlay(anno_bgr, persons)
    out_pil = Image.fromarray(cv2.cvtColor(anno_bgr, cv2.COLOR_BGR2RGB))

    # Overall threat is the most severe level among the detected persons.
    severity = (THREAT_NORMAL, THREAT_SUSPICIOUS, THREAT_THREATENING)
    analysis = {
        "persons_detected": len(persons),
        "weapon_detected": weapon_in_frame,
        "weapons": weapon_dets,
        "persons": persons,
        "overall_threat": max((p["threat"] for p in persons),
                              key=severity.index, default=THREAT_NORMAL),
    }
    return out_pil, analysis
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # CORE HELPERS (single-model path) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def predict(model, frame_np, conf, iou, img_size):
    """Call ``model.predict`` on one frame with this app's fixed options.

    Verbose output is off; labels and confidences are shown on plots.
    """
    options = {
        "source": frame_np,
        "conf": conf,
        "iou": iou,
        "imgsz": img_size,
        "verbose": False,
        "show_labels": True,
        "show_conf": True,
    }
    return model.predict(**options)
def extract_dets(results, task, model):
    """Flatten prediction results into a list of plain dicts.

    For ``task == "classification"`` each entry holds ``label`` and
    ``confidence`` for the top-5 classes. For every other task each box
    yields ``id``, ``label``, ``confidence`` and ``bbox``; pose results also
    get ``keypoints`` when the result carries them.
    """
    collected = []
    classifying = task == "classification"
    wants_keypoints = task == "pose"

    for res in results:
        if classifying:
            probs = res.probs
            if probs is None:
                continue
            collected.extend(
                {"label": model.names[cls_id], "confidence": round(float(score), 3)}
                for cls_id, score in zip(probs.top5, probs.top5conf.tolist())
            )
            continue

        if res.boxes is None:
            continue
        for det_id, box in enumerate(res.boxes):
            entry = {
                "id": det_id,
                "label": model.names[int(box.cls)],
                "confidence": round(float(box.conf), 3),
                "bbox": [round(coord, 1) for coord in box.xyxy[0].tolist()],
            }
            if wants_keypoints and res.keypoints is not None:
                points = res.keypoints.xy[det_id].tolist()
                entry["keypoints"] = [[round(kx, 1), round(ky, 1)] for kx, ky in points]
            collected.append(entry)
    return collected
def to_pil(results):
    """Plot the first result and return it as a PIL image.

    The plotted array's channel order is reversed before conversion.
    Returns ``None`` when ``results`` is empty.
    """
    pending = iter(results)
    try:
        first = next(pending)
    except StopIteration:
        return None
    return Image.fromarray(first.plot()[..., ::-1])
def resize_frame(frame, src_w, src_h, max_side=640):
    """Compute even output dimensions capped at ``max_side`` and resize if needed.

    Returns ``(frame, width, height, scale)``. When the source already fits,
    the scale is 1.0, the dimensions are the source size rounded down to even
    numbers, and the frame is returned untouched. Passing ``frame=None``
    yields just the computed geometry.
    """
    scale = min(max_side / max(src_w, src_h), 1.0)
    shrinking = scale < 1.0

    if shrinking:
        out_w = int(src_w * scale) & ~1
        out_h = int(src_h * scale) & ~1
    else:
        scale = 1.0
        out_w = src_w & ~1
        out_h = src_h & ~1

    if frame is None:
        return None, out_w, out_h, scale
    if shrinking:
        return cv2.resize(frame, (out_w, out_h)), out_w, out_h, scale
    return frame, out_w, out_h, scale
| def _frame_interval(src_fps, scan_fps): | |
| return max(1, round(src_fps / min(scan_fps, src_fps))) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # INFERENCE FUNCTIONS | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # ββ COMBINED IMAGE (Pose + OD + Weapon + Threat) ββββββββββββββββββ | |
def infer_combined_image(image, conf, iou, img_size):
    """Run the combined threat analysis on one uploaded image.

    Returns ``(annotated_image, json_text)``; with no image the first element
    is ``None`` and the text is an error object.
    """
    if image is not None:
        rgb = np.array(image.convert("RGB"))
        annotated, report = run_combined_analysis(rgb, conf, iou, img_size)
        return annotated, json.dumps(report, indent=2)
    return None, '{"error":"No image"}'
| # ββ SINGLE MODEL IMAGE ββββββββββββββββββββββββββββββββββββββββββββ | |
def infer_image(image, task, conf, iou, img_size):
    """Run one model on an uploaded image.

    Args:
        image: PIL image, or ``None`` when nothing was uploaded.
        task: Key into ``MODELS`` selecting the model.
        conf: Confidence threshold.
        iou: IoU threshold.
        img_size: Inference image size.

    Returns:
        ``(annotated_image, json_text)``. The image is ``None`` and the text
        is an error object when there is no input or the model is not loaded.
    """
    if image is None:
        return None, '{"error":"No image"}'
    model = MODELS.get(task)
    if model is None:
        # The model failed to load at start-up; report instead of raising KeyError.
        return None, json.dumps({"error": f"Model for task '{task}' is not loaded"})

    # Ultralytics treats numpy input as BGR, and to_pil() flips the plotted
    # BGR result back to RGB, so hand the model a BGR copy.
    img_bgr = np.ascontiguousarray(np.array(image.convert("RGB"))[..., ::-1])
    results = predict(model, img_bgr, conf, iou, img_size)
    dets = extract_dets(results, task, model)
    out_img = to_pil(results)
    payload = {"task": TASK_DISPLAY.get(task, task), "count": len(dets), "detections": dets}
    return out_img, json.dumps(payload, indent=2)
| # ββ COMBINED VIDEO (Pose Threat per frame) ββββββββββββββββββββββββ | |
def infer_combined_video(video_path, conf, iou, img_size,
                         scan_fps=1, max_frames=300, progress=gr.Progress()):
    """Run the combined threat analysis over a video at a reduced frame rate.

    Only every ``interval``-th source frame is analysed, and only analysed
    frames are written to the output video.

    Args:
        video_path: Path of the uploaded video, or ``None``.
        conf: Confidence threshold.
        iou: IoU threshold.
        img_size: Inference image size.
        scan_fps: Frames per second to analyse; falsy values mean 1.
        max_frames: Upper bound on analysed frames.
        progress: Gradio progress callback.

    Returns:
        ``(output_path, json_text)``. The path is ``None`` and the text is an
        error object when there is no input or the video cannot be opened.
    """
    if video_path is None:
        return None, '{"error":"No video"}'

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        cap.release()
        return None, '{"error":"Could not open video"}'

    vw = None
    try:
        src_fps = cap.get(cv2.CAP_PROP_FPS) or 25.0
        src_w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) or 640
        src_h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) or 480
        total_src = max(int(cap.get(cv2.CAP_PROP_FRAME_COUNT)), 1)

        scan_fps = float(scan_fps) if scan_fps else 1.0
        interval = _frame_interval(src_fps, scan_fps)
        out_fps = max(src_fps / interval, 1.0)
        _, out_w, out_h, _ = resize_frame(None, src_w, src_h)
        out_size = (out_w, out_h)

        # Reserve a path and close the handle before OpenCV opens the file.
        with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as handle:
            tmp = handle.name

        # Try H.264 first (smaller, browser-compatible), fall back to mp4v.
        vw = cv2.VideoWriter(tmp, cv2.VideoWriter_fourcc(*"avc1"), out_fps, out_size)
        if not vw.isOpened():
            vw.release()
            vw = cv2.VideoWriter(tmp, cv2.VideoWriter_fourcc(*"mp4v"), out_fps, out_size)

        frame_idx = 0
        proc_count = 0
        threat_counts = {THREAT_NORMAL: 0, THREAT_SUSPICIOUS: 0, THREAT_THREATENING: 0}
        total_weapons = 0
        frame_cap = int(max_frames)

        progress(0, desc="Starting …")
        while proc_count < frame_cap:
            ret, frame = cap.read()
            if not ret:
                break
            if frame_idx % interval == 0:
                # The writer silently drops frames whose size differs from
                # out_size, which covers down-scaling and odd source dimensions.
                if (frame.shape[1], frame.shape[0]) != out_size:
                    frame = cv2.resize(frame, out_size)
                frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                # One analysis pass gives both the annotated frame and the stats.
                out_pil, analysis = run_combined_analysis(frame_rgb, conf, iou, img_size)
                vw.write(cv2.cvtColor(np.array(out_pil), cv2.COLOR_RGB2BGR))

                for p in analysis["persons"]:
                    threat_counts[p["threat"]] += 1
                total_weapons += len(analysis["weapons"])
                proc_count += 1
                ot = analysis["overall_threat"]
                progress(min(frame_idx / total_src, 0.99),
                         desc=f"Frame {frame_idx}/{total_src} | {THREAT_EMOJI[ot]} {ot}")
            frame_idx += 1
    finally:
        # Release native handles even if inference raises mid-video.
        cap.release()
        if vw is not None:
            vw.release()

    progress(1.0, desc="✅ Done!")
    payload = {
        "source_fps": round(src_fps, 2),
        "scan_fps": round(scan_fps, 2),
        "frame_interval": interval,
        "frames_scanned": proc_count,
        "total_frames": frame_idx,
        "resolution": f"{out_w}x{out_h}",
        "weapon_detections": total_weapons,
        "pose_threat_summary": {
            f"{THREAT_EMOJI[THREAT_NORMAL]} NORMAL": threat_counts[THREAT_NORMAL],
            f"{THREAT_EMOJI[THREAT_SUSPICIOUS]} SUSPICIOUS": threat_counts[THREAT_SUSPICIOUS],
            f"{THREAT_EMOJI[THREAT_THREATENING]} THREATENING": threat_counts[THREAT_THREATENING],
        },
    }
    return tmp, json.dumps(payload, indent=2)
| # ββ SINGLE MODEL VIDEO ββββββββββββββββββββββββββββββββββββββββββββ | |
def infer_video(video_path, task, conf, iou, img_size,
                scan_fps=1, max_frames=300, progress=gr.Progress()):
    """Run one model over a video at a reduced frame rate.

    Only every ``interval``-th source frame is analysed, and only analysed
    frames are written to the output video.

    Args:
        video_path: Path of the uploaded video, or ``None``.
        task: Key into ``MODELS`` selecting the model.
        conf: Confidence threshold.
        iou: IoU threshold.
        img_size: Inference image size.
        scan_fps: Frames per second to analyse; falsy values mean 1.
        max_frames: Upper bound on analysed frames.
        progress: Gradio progress callback.

    Returns:
        ``(output_path, json_text)``. The path is ``None`` and the text is an
        error object when there is no input, the model is not loaded, or the
        video cannot be opened.
    """
    if video_path is None:
        return None, '{"error":"No video"}'
    model = MODELS.get(task)
    if model is None:
        # The model failed to load at start-up; report instead of raising KeyError.
        return None, json.dumps({"error": f"Model for task '{task}' is not loaded"})

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        cap.release()
        return None, '{"error":"Could not open video"}'

    vw = None
    try:
        src_fps = cap.get(cv2.CAP_PROP_FPS) or 25.0
        src_w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) or 640
        src_h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) or 480
        total_src = max(int(cap.get(cv2.CAP_PROP_FRAME_COUNT)), 1)

        scan_fps = float(scan_fps) if scan_fps else 1.0
        interval = _frame_interval(src_fps, scan_fps)
        out_fps = max(src_fps / interval, 1.0)
        _, out_w, out_h, _ = resize_frame(None, src_w, src_h)
        out_size = (out_w, out_h)

        # Reserve a path and close the handle before OpenCV opens the file.
        with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as handle:
            tmp = handle.name

        # Try H.264 first (smaller, browser-compatible), fall back to mp4v.
        vw = cv2.VideoWriter(tmp, cv2.VideoWriter_fourcc(*"avc1"), out_fps, out_size)
        if not vw.isOpened():
            vw.release()
            vw = cv2.VideoWriter(tmp, cv2.VideoWriter_fourcc(*"mp4v"), out_fps, out_size)

        frame_idx = 0
        proc_count = 0
        total_dets = 0
        frame_cap = int(max_frames)

        progress(0, desc="Starting …")
        while proc_count < frame_cap:
            ret, frame = cap.read()
            if not ret:
                break
            if frame_idx % interval == 0:
                # The writer silently drops frames whose size differs from
                # out_size, which covers down-scaling and odd source dimensions.
                if (frame.shape[1], frame.shape[0]) != out_size:
                    frame = cv2.resize(frame, out_size)
                # Start from the raw frame so an empty result list never
                # re-writes the previous iteration's annotation.
                annotated = frame
                for r in predict(model, frame, conf, iou, img_size):
                    # cv2 frames are BGR and r.plot() returns BGR: no conversion.
                    annotated = r.plot()
                    if r.boxes is not None:
                        total_dets += len(r.boxes)
                vw.write(annotated)
                proc_count += 1
                progress(min(frame_idx / total_src, 0.99),
                         desc=f"Frame {frame_idx}/{total_src} | {total_dets} dets")
            frame_idx += 1
    finally:
        # Release native handles even if inference raises mid-video.
        cap.release()
        if vw is not None:
            vw.release()

    progress(1.0, desc="✅ Done!")
    payload = {
        "task": TASK_DISPLAY.get(task, task),
        "source_fps": round(src_fps, 2),
        "scan_fps": round(scan_fps, 2),
        "frame_interval": interval,
        "frames_scanned": proc_count,
        "resolution": f"{out_w}x{out_h}",
        "total_detections": total_dets,
        "avg_detections_per_scanned_frame": round(total_dets / max(proc_count, 1), 2),
    }
    return tmp, json.dumps(payload, indent=2)
| # ββ WEBCAM β COMBINED (Pose Threat + Weapon live) βββββββββββββββββ | |
def stream_webcam_combined(frame, conf, iou, img_size):
    """Annotate one webcam frame with the combined threat analysis.

    Returns the annotated frame as a numpy array, or ``None`` when no frame
    was delivered. The analysis dict is discarded.
    """
    if frame is not None:
        annotated = run_combined_analysis(frame, conf, iou, img_size)[0]
        return np.array(annotated)
    return None
| # ββ WEBCAM β SINGLE MODEL βββββββββββββββββββββββββββββββββββββββββ | |
def stream_webcam(frame, task, conf, iou, img_size):
    """Annotate one webcam frame with a single model.

    Args:
        frame: RGB frame as a numpy array, or ``None``.
        task: Key into ``MODELS`` selecting the model.
        conf: Confidence threshold.
        iou: IoU threshold.
        img_size: Inference image size.

    Returns:
        The annotated RGB frame; the unmodified frame when the model is not
        loaded or produced no result; ``None`` when no frame was delivered.
    """
    if frame is None:
        return None
    model = MODELS.get(task)
    if model is None:
        # Model failed to load: keep the live feed running instead of raising.
        return frame

    # Ultralytics treats numpy input as BGR and plots in BGR, so flip the
    # channels on the way in and again on the way out.
    frame_bgr = np.ascontiguousarray(frame[..., ::-1])
    for r in predict(model, frame_bgr, conf, iou, img_size):
        return np.ascontiguousarray(r.plot()[..., ::-1])
    return frame
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # UI HELPERS | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def shared_controls(default_conf=0.25):
    """Create the confidence / IoU / image-size sliders in one row.

    Returns the three sliders in that order.
    """
    with gr.Row():
        sliders = (
            gr.Slider(0.05, 0.95, value=default_conf, step=0.05, label="Confidence"),
            gr.Slider(0.05, 0.95, value=0.45, step=0.05, label="IoU Threshold"),
            gr.Slider(320, 1280, value=640, step=32, label="Image Size"),
        )
    return sliders
def video_controls():
    """Create the video-only controls in one row.

    Returns:
        ``(scan_fps, max_frames)``: a radio group choosing how many frames
        per second to analyse and a slider capping the analysed frames.
    """
    with gr.Row():
        scan_fps = gr.Radio(
            choices=[1, 2, 3, 5, 8, 10, 15, 24], value=5, type="value",
            label="Scan FPS · frames per second to analyse · higher = thorough but slower",
        )
        max_frames = gr.Slider(50, 600, value=200, step=50, label="Max Frames Cap")
    return scan_fps, max_frames
# Display order for task pickers; only tasks whose model loaded are offered.
_order = ["object_detection", "pose", "segmentation", "classification", "obb", "weapon"]
TASK_CHOICES = [(TASK_DISPLAY[key], key) for key in filter(MODELS.__contains__, _order)]
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # CSS | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Stylesheet passed to gr.Blocks(css=CSS): page colours plus the .hero, .threat-banner, .threat-legend, .tip and .weapon-note boxes used by the HTML snippets in the UI. | |
| CSS = """ | |
| body,.gradio-container{ | |
| background:#060c1a!important;color:#e2e8f0!important; | |
| font-family:'Segoe UI',system-ui,sans-serif | |
| } | |
| .hero{ | |
| background:linear-gradient(135deg,#0d1b2a,#1a2744,#0f3460); | |
| border-radius:16px;padding:2rem;margin-bottom:1rem; | |
| border:1px solid #1e3a5f;text-align:center | |
| } | |
| .hero h1{ | |
| font-size:2rem;font-weight:800; | |
| background:linear-gradient(90deg,#38bdf8,#818cf8,#34d399); | |
| -webkit-background-clip:text;-webkit-text-fill-color:transparent;margin:0 | |
| } | |
| .hero p{color:#94a3b8;margin:.4rem 0 0} | |
| .threat-banner{ | |
| background:linear-gradient(135deg,rgba(99,102,241,.12),rgba(34,211,238,.08)); | |
| border:1px solid rgba(99,102,241,.4);border-radius:12px; | |
| padding:.85rem 1.25rem;margin-bottom:.75rem;font-size:.9rem | |
| } | |
| .threat-legend{ | |
| display:flex;gap:1rem;flex-wrap:wrap;margin-top:.5rem;font-size:.82rem | |
| } | |
| .tl-normal{color:#22c55e} .tl-sus{color:#eab308} .tl-threat{color:#ef4444} | |
| .tip{ | |
| background:rgba(52,211,153,.08);border:1px solid rgba(52,211,153,.3); | |
| border-radius:8px;padding:.5rem 1rem;color:#6ee7b7;font-size:.84rem;margin-bottom:.5rem | |
| } | |
| .weapon-note{ | |
| background:rgba(239,68,68,.08);border:1px solid rgba(239,68,68,.25); | |
| border-radius:8px;padding:.5rem 1rem;color:#fca5a5;font-size:.84rem;margin-bottom:.5rem | |
| } | |
| """ | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # GRADIO UI | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# Colour-coded legend of the three threat levels, embedded in the banners.
THREAT_LEGEND_HTML = """
<div class="threat-legend">
<span class="tl-normal">🟢 NORMAL — relaxed / standing / walking</span>
<span class="tl-sus">🟡 SUSPICIOUS — crouching / leaning / unusual posture</span>
<span class="tl-threat">🔴 THREATENING — raised arms / aggressive / weapon present</span>
</div>"""
def _task_selector(task):
    """Return the component that feeds the task key to a handler.

    A string becomes a fixed ``gr.State``; a list of ``(label, key)`` choices
    becomes a radio group preselecting the first choice.
    """
    if isinstance(task, str):
        return gr.State(task)
    return gr.Radio(choices=task, value=task[0][1], label="Select Model")


def _image_tab(task, *, button, out_label, json_label, default_conf=0.25):
    """Build an "Image" tab wired to ``infer_image``; left empty if ``task`` is empty."""
    with gr.Tab("📷 Image"):
        if not task:
            return
        selector = _task_selector(task)
        with gr.Row():
            with gr.Column():
                img_in = gr.Image(type="pil", label="Upload Image")
                conf, iou, size = shared_controls(default_conf)
                run_btn = gr.Button(button, variant="primary")
            with gr.Column():
                img_out = gr.Image(type="pil", label=out_label)
                json_out = gr.Code(label=json_label, language="json")
        run_btn.click(infer_image,
                      [img_in, selector, conf, iou, size],
                      [img_out, json_out])


def _video_tab(task, *, button, default_conf=0.25):
    """Build a "Video" tab wired to ``infer_video``; left empty if ``task`` is empty."""
    with gr.Tab("🎬 Video"):
        if not task:
            return
        selector = _task_selector(task)
        with gr.Row():
            with gr.Column():
                vid_in = gr.Video(label="Upload Video")
                conf, iou, size = shared_controls(default_conf)
                scan_fps, frame_cap = video_controls()
                run_btn = gr.Button(button, variant="primary")
            with gr.Column():
                vid_out = gr.Video(label="Annotated Video")
                json_out = gr.Code(label="Summary JSON", language="json")
        run_btn.click(infer_video,
                      [vid_in, selector, conf, iou, size, scan_fps, frame_cap],
                      [vid_out, json_out])


def _webcam_tab(task, *, out_label, default_conf=0.25):
    """Build a "Webcam" tab streaming frames through ``stream_webcam`` for ``task``."""
    with gr.Tab("📡 Webcam"):
        with gr.Row():
            with gr.Column(scale=1):
                conf, iou, size = shared_controls(default_conf)
            with gr.Column(scale=2):
                cam_in = gr.Image(sources=["webcam"], streaming=True,
                                  type="numpy", label="Webcam")
                cam_out = gr.Image(streaming=True, label=out_label)
        cam_in.stream(lambda f, c, i, s: stream_webcam(f, task, c, i, s),
                      [cam_in, conf, iou, size],
                      [cam_out])


with gr.Blocks(css=CSS, title="VisionAI — Object Detection & Pose Estimation") as app:
    gr.HTML("""
<div class="hero">
<h1>🤖 VisionAI — Object Detection & Human Pose Estimation</h1>
<p>YOLO11 · Pose Threat Analysis · Weapon Detection (weapon_detection.pt) · FPS-based Video Scanning
<br><small style="color:#64748b">Semester Project — all models pre-loaded at startup</small></p>
</div>""")

    with gr.Tabs():
        # TAB 1 — Pose threat analysis (primary feature)
        with gr.Tab("🎯 Pose Threat Analysis"):
            gr.HTML(f"""
<div class="threat-banner">
<strong>Pose Threat Analysis</strong> — Runs Pose Estimation + Weapon Detection together.
Each detected person is classified by posture:
{THREAT_LEGEND_HTML}
</div>""")
            with gr.Tabs():
                with gr.Tab("📷 Image"):
                    with gr.Row():
                        with gr.Column():
                            ta_img_in = gr.Image(type="pil", label="Upload Image")
                            conf_tai, iou_tai, sz_tai = shared_controls()
                            btn_tai = gr.Button("🎯 Analyse Threat", variant="primary")
                        with gr.Column():
                            ta_img_out = gr.Image(type="pil", label="Annotated Result")
                            ta_img_json = gr.Code(label="Threat Analysis JSON", language="json")
                    btn_tai.click(infer_combined_image,
                                  [ta_img_in, conf_tai, iou_tai, sz_tai],
                                  [ta_img_out, ta_img_json])

                with gr.Tab("🎬 Video"):
                    gr.HTML('<div class="tip">⚡ Pose threat is evaluated on every scanned frame. '
                            'Use Scan FPS 3–5 on free tier.</div>')
                    with gr.Row():
                        with gr.Column():
                            ta_vid_in = gr.Video(label="Upload Video")
                            conf_tav, iou_tav, sz_tav = shared_controls()
                            fs_tav, mf_tav = video_controls()
                            btn_tav = gr.Button("🎯 Analyse Video Threats", variant="primary")
                        with gr.Column():
                            ta_vid_out = gr.Video(label="Annotated Output")
                            ta_vid_json = gr.Code(label="Threat Summary JSON", language="json")
                    btn_tav.click(infer_combined_video,
                                  [ta_vid_in, conf_tav, iou_tav, sz_tav, fs_tav, mf_tav],
                                  [ta_vid_out, ta_vid_json])

                with gr.Tab("📡 Live Webcam"):
                    gr.HTML(f"""
<div class="threat-banner">
📡 <strong>Live Pose Threat Detection</strong> — real-time per-person threat classification.
{THREAT_LEGEND_HTML}
</div>""")
                    with gr.Row():
                        with gr.Column(scale=1):
                            conf_taw, iou_taw, sz_taw = shared_controls(default_conf=0.30)
                            gr.Markdown(
                                "**Tips for live accuracy:**\n"
                                "- Stand in full view of camera\n"
                                "- Ensure good lighting\n"
                                "- Image Size 320 = faster on CPU\n"
                                "- Raise both arms to test 🔴 THREATENING\n"
                            )
                        with gr.Column(scale=2):
                            ta_cam_in = gr.Image(sources=["webcam"], streaming=True,
                                                 type="numpy", label="Webcam Feed")
                            ta_cam_out = gr.Image(streaming=True,
                                                  label="🎯 Live Threat Analysis")
                    ta_cam_in.stream(stream_webcam_combined,
                                     [ta_cam_in, conf_taw, iou_taw, sz_taw],
                                     [ta_cam_out])

        # TAB 2 — Weapon detection
        with gr.Tab("🔫 Weapon Detection"):
            gr.HTML("""
<div class="weapon-note">
🔫 <strong>Custom Weapon Detection Model</strong> (weapon_detection.pt) —
detects firearms and other weapons. Combined with pose analysis for full threat assessment.
</div>""")
            with gr.Tabs():
                _image_tab("weapon", button="🔫 Detect Weapons", out_label="Result",
                           json_label="Detection JSON", default_conf=0.20)
                _video_tab("weapon", button="🔫 Detect Weapons in Video", default_conf=0.20)
                _webcam_tab("weapon", out_label="🔫 Weapon Detection Live", default_conf=0.20)

        # TAB 3 — Object detection
        with gr.Tab("🔍 Object Detection"):
            with gr.Tabs():
                _image_tab("object_detection", button="▶ Run Detection",
                           out_label="Result", json_label="JSON")
                _video_tab("object_detection", button="▶ Process Video")
                _webcam_tab("object_detection", out_label="Live Detection")

        # TAB 4 — Pose estimation (standalone)
        with gr.Tab("🦴 Pose Estimation"):
            with gr.Tabs():
                _image_tab("pose", button="▶ Estimate Pose",
                           out_label="Skeleton Result", json_label="Keypoints JSON")
                _video_tab("pose", button="▶ Process Video")
                _webcam_tab("pose", out_label="Live Skeleton")

        # TAB 5 — Remaining models, chosen with a radio group
        with gr.Tab("🧩 More Models"):
            other_choices = [(TASK_DISPLAY[t], t) for t in
                             ("segmentation", "classification", "obb") if t in MODELS]
            with gr.Tabs():
                _image_tab(other_choices, button="▶ Run",
                           out_label="Result", json_label="JSON")
                _video_tab(other_choices, button="▶ Process Video")

    gr.HTML("""
<div style="text-align:center;padding:1.5rem;color:#475569;font-size:.82rem;
margin-top:1rem;border-top:1px solid #1e293b;">
VisionAI · Object Detection & Human Pose Estimation · YOLO11 · weapon_detection.pt · Semester Project
</div>""")
if __name__ == "__main__":
    # Listen on all interfaces on port 7860 and surface errors in the UI.
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "show_error": True,
    }
    app.launch(**launch_options)