diff --git "a/miner.py" "b/miner.py" --- "a/miner.py" +++ "b/miner.py" @@ -1,167 +1,97 @@ """ -Score Vision SN44 — Unified miner v3.29 (2026-04-08). R9c vehicle FP16 (mAP50=0.929). Person: TTA consensus + 15% box shrink + NMS 0.35. -Dual-model: vehicle (YOLO11m INT8 1280) + person (YOLO12s FP16 960 TRT). -Pose model: YOLOv8n-pose FP16 640 for false-positive filtering + keypoint box refinement. -Vehicle weights loaded from secondary HF repo (meaculpitt/ScoreVision-Vehicle). -Person weights loaded from primary HF repo (template downloads automatically). - -Vehicle model (vehicle_weights.onnx): - Trained classes: 0=car, 1=bus, 2=truck, 3=motorcycle - Output: 0=bus, 1=car, 2=truck, 3=motorcycle. All classes scored (v3.20 bus fix). - Per-class confidence thresholds: car 0.45, truck 0.45, motorcycle 0.35. - Per-class aspect ratio bounds for FP filtering. - Single-pass (v3.19) — flip TTA removed for RTF improvement. - -Person model (person_weights.onnx): - YOLO12s FP16 960px end2end [1,300,6]. Single class: 0=person. - Background TRT build: starts on CUDA immediately, builds TRT FP16 engine in background - thread (~18min on fresh node), swaps to TRT atomically when ready. Cached thereafter. - SAHI-style tiling: full + 2 adaptive tiles + flip TTA, max-conf NMS merge. - -Pose model (pose_weights.onnx): - YOLOv8n-pose FP16 640px [1,56,8400]. 17 COCO keypoints. - Runs once on full image after person detection. - Anatomical keypoint scoring: weighted per-keypoint sum (head 0.38, upper 0.32, lower 0.30). - 1. Head keypoints visible → never suppress, always refine box. - 2. Score >= 0.15 → keep + refine. Score > 0 → keep as-is. Score == 0 + large + low-conf → suppress. - 3. Box refinement: blend detected box with tight keypoint bbox for better fit. - Face detector (optional): if face_session loaded, face inside box → never suppress. - -Vehicle + person models run on every image when hint='both'. All detections merged. -Vehicle eval uses cls_id 1-3. Person eval uses cls_id 0 only. +SN44 number plate detection miner — single-element chute for +manak0/Detect-number-plates-1-0. + +Adapted from the auto-generated detect-person-reference miner with four +substantive changes: + +1. Class set is the single class ``numberplate`` (the validator's exact + label string). +2. Lower confidence threshold (0.15 vs 0.25) because the validator's + plates are tiny — 5–92 px wide on a 1408 px frame, median ~30 px. + At standard 0.25 most true positives get filtered before NMS. +3. Standard NMS replaced with Gaussian Soft-NMS (sigma=0.5). Soft-NMS + decays scores of overlapping boxes instead of suppressing them + outright, which helps on plate-dense frames (parking lot, car + carrier, gas station forecourt) where standard NMS over-suppresses + adjacent plates. +4. CUDA library preload at import time so onnxruntime-gpu finds + libcudnn / libcublas from the nvidia-* pip wheels even when + LD_LIBRARY_PATH is not set (the chute container ships these wheels + but does not export them). + +Soft-NMS is inlined here rather than imported from /home/miner/utils +because the chute platform sandbox restricts non-stdlib imports beyond +the deps declared in chute_config.yml. The implementation is a +specialised single-class version of soft_nms_yolo from +/home/miner/utils/soft_nms.py — see that file for the full +multi-class / multi-backend version. 
""" - -import os import ctypes import glob as _glob import logging as _logging +import os _cuda_log = _logging.getLogger(__name__) -def _preload_cuda_libs(): - """Pre-load CUDA + TensorRT libs from pip packages so ORT GPU/TRT providers work. - Search order for TRT libs (libnvinfer.so, libnvonnxparser.so): - 1. sys.path entries containing tensorrt_libs/ subdirectory - 2. site.getsitepackages() + user site-packages for tensorrt_libs/ or tensorrt/ - 3. ctypes.util.find_library('nvinfer') as system-wide fallback - If not found, logs clearly and skips TRT — never attempts pip operations. +def _preload_cuda_libs() -> None: + """Pre-load CUDA + cuDNN + cuBLAS shared libs from nvidia-* pip wheels. + + Without this, onnxruntime-gpu's CUDAExecutionProvider silently falls + back to CPU because it can't dlopen libcudnn.so.9 — the nvidia + wheels ship the library inside `nvidia/cudnn/lib/` but do NOT add + that directory to the loader path. We import the wheel modules to + locate their lib dirs, prepend them to LD_LIBRARY_PATH for any + child processes, and ctypes.CDLL the .so files with RTLD_GLOBAL so + onnxruntime's dlopen sees them. """ try: - import ctypes.util as _ctypes_util - lib_dirs = [] - loaded = set() - - # ── CUDA libs from nvidia pip packages ── - for mod_name in ['nvidia.cudnn', 'nvidia.cublas', 'nvidia.cuda_runtime', - 'nvidia.cufft', 'nvidia.curand', 'nvidia.cusolver', - 'nvidia.cusparse', 'nvidia.nvjitlink']: + lib_dirs: list[str] = [] + for mod_name in ( + "nvidia.cudnn", + "nvidia.cublas", + "nvidia.cuda_runtime", + "nvidia.cufft", + "nvidia.curand", + "nvidia.cusolver", + "nvidia.cusparse", + "nvidia.nvjitlink", + ): try: - mod = __import__(mod_name, fromlist=['__file__']) - lib_dir = os.path.join(os.path.dirname(mod.__file__), 'lib') + mod = __import__(mod_name, fromlist=["__file__"]) + lib_dir = os.path.join(os.path.dirname(mod.__file__), "lib") if os.path.isdir(lib_dir) and lib_dir not in lib_dirs: lib_dirs.append(lib_dir) except ImportError: pass - # ── TensorRT libs — multi-strategy search ── - import sys as _sys - _trt_dir = None - - # Strategy 1: sys.path (covers standard pip installs) - for p in _sys.path: - for subdir in ('tensorrt_libs', 'tensorrt'): - candidate = os.path.join(p, subdir) - if os.path.isdir(candidate) and _glob.glob(os.path.join(candidate, 'libnvinfer*')): - _trt_dir = candidate - break - if _trt_dir: - break - - # Strategy 2: site-packages directories (covers user installs, venvs) - if not _trt_dir: - import site - search_dirs = list(site.getsitepackages()) if hasattr(site, 'getsitepackages') else [] - user_site = getattr(site, 'getusersitepackages', lambda: None)() - if user_site: - search_dirs.append(user_site) - # Also check common paths not always in site - search_dirs.extend([ - '/usr/local/lib/python3.12/dist-packages', - os.path.expanduser('~/.local/lib/python3.12/site-packages'), - '/home/miner/.local/lib/python3.12/site-packages', - ]) - for sp in search_dirs: - for subdir in ('tensorrt_libs', 'tensorrt'): - candidate = os.path.join(sp, subdir) - if os.path.isdir(candidate) and _glob.glob(os.path.join(candidate, 'libnvinfer*')): - _trt_dir = candidate - break - if _trt_dir: - break - - # Strategy 3: ctypes.util.find_library (system-wide LD search) - if not _trt_dir: - nvinfer_path = _ctypes_util.find_library('nvinfer') - if nvinfer_path: - _cuda_log.info('TRT found via system library: %s', nvinfer_path) - try: - ctypes.CDLL(nvinfer_path, mode=ctypes.RTLD_GLOBAL) - loaded.add('nvinfer') - except OSError as e: - _cuda_log.warning('Failed to load 
system nvinfer: %s', e) - - if _trt_dir: - if _trt_dir not in lib_dirs: - lib_dirs.append(_trt_dir) - _cuda_log.info('TRT libs directory: %s', _trt_dir) - elif 'nvinfer' not in loaded: - _cuda_log.info('TensorRT libs not found — TRT EP will be unavailable (CUDA EP still works)') - - if not lib_dirs and not loaded: - _cuda_log.warning('No CUDA or TRT libs found to preload') + if not lib_dirs: + _cuda_log.warning("no nvidia-* lib dirs found; ORT GPU may fall back to CPU") return - # Set LD_LIBRARY_PATH for any child processes / dlopen fallbacks - existing = os.environ.get('LD_LIBRARY_PATH', '') - os.environ['LD_LIBRARY_PATH'] = ':'.join(lib_dirs + ([existing] if existing else [])) + # Update LD_LIBRARY_PATH for any child processes / dlopen fallbacks + existing = os.environ.get("LD_LIBRARY_PATH", "") + os.environ["LD_LIBRARY_PATH"] = ":".join( + lib_dirs + ([existing] if existing else []) + ) - # Load CUDA libs (glob all .so in nvidia dirs) + # ctypes.CDLL each .so so the symbols are globally visible to ORT for lib_dir in lib_dirs: - if 'tensorrt' in lib_dir: - continue # TRT libs loaded selectively below - for so in sorted(_glob.glob(os.path.join(lib_dir, 'lib*.so*'))): + for so in sorted(_glob.glob(os.path.join(lib_dir, "lib*.so*"))): try: ctypes.CDLL(so, mode=ctypes.RTLD_GLOBAL) except OSError: pass + except Exception as e: # pragma: no cover - best effort + _cuda_log.warning("CUDA preload failed: %s", e) - # Load TRT libs selectively (only the essentials, not builder resources) - if _trt_dir: - for lib_name in ['libnvinfer.so', 'libnvinfer_plugin.so', 'libnvonnxparser.so']: - matches = _glob.glob(os.path.join(_trt_dir, lib_name + '*')) - if matches: - try: - ctypes.CDLL(matches[0], mode=ctypes.RTLD_GLOBAL) - loaded.add(lib_name.split('.')[0]) - except OSError as e: - _cuda_log.warning('Failed to load %s: %s', lib_name, e) - else: - _cuda_log.info('%s not found in %s', lib_name, _trt_dir) - - if loaded: - _cuda_log.info('Preloaded libs: %s', ', '.join(sorted(loaded))) - except Exception as e: - _cuda_log.warning('CUDA/TRT preload error: %s', e) _preload_cuda_libs() - from pathlib import Path import math -import time -import logging import cv2 import numpy as np @@ -169,371 +99,6 @@ import onnxruntime as ort from numpy import ndarray from pydantic import BaseModel -import json -import threading -from datetime import datetime, timezone -from concurrent.futures import ThreadPoolExecutor, as_completed -import inspect - -# ── Latency logger (per-request timing) ───────────────────────────────── -import logging as _lat_logging -_lat_logger = _lat_logging.getLogger("sv_latency") -_lat_logger.setLevel(_lat_logging.INFO) -_lat_logger.propagate = False -if not _lat_logger.handlers: - try: - import tempfile as _lat_tempfile - # Try /home/miner first (Lium), fall back to /tmp (Chutes cloud) - for _lat_path in ["/home/miner/latency.log", _lat_tempfile.gettempdir() + "/latency.log"]: - try: - _lat_fh = _lat_logging.FileHandler(_lat_path) - _lat_fh.setFormatter(_lat_logging.Formatter( - "%(asctime)s.%(msecs)03d %(message)s", datefmt="%Y-%m-%d %H:%M:%S")) - _lat_logger.addHandler(_lat_fh) - break - except (OSError, PermissionError): - continue - except Exception: - pass # No file logging — latency still logged via main logger - -logger = logging.getLogger(__name__) - -# ── Vehicle config ────────────────────────────────────────────────────────── -VEH_MODEL_TO_OUT: dict[int, int] = {0: 1, 1: 0, 2: 2, 3: 3} # bus→0 (validator expects bus at idx 0) -VEH_SKIP_CLS = set() # v3.20: bus now scored (cls_id=0). 
Element detection prevents collision. -VEH_NUM_CLASSES = 4 -VEH_CONF_THRES = 0.30 # Low decode threshold for TTA (final filter is per-class) -VEH_TTA_CONF = 0.20 # TTA flip pass decode threshold -VEH_NMS_IOU = 0.50 - -# ── Per-class vehicle confidence thresholds (output cls_id) ──────────────── -# Raising from uniform 0.35: reduces FP (avg 4.1 FFPI → target <2.0) -VEH_CLASS_CONF: dict[int, float] = { - 1: 0.60, # car — raised from 0.50, most FP-prone class (75% of training data) - 2: 0.45, # truck — keep - 3: 0.50, # motorcycle — raised from 0.45, small targets prone to FP - 0: 0.45, # bus — keep -} - -# ── Per-class vehicle aspect ratio bounds (min_ratio, max_ratio) ─────────── -# ratio = max(w,h) / min(w,h). Generous bounds to avoid suppressing valid detections. -VEH_CLASS_ASPECT: dict[int, float] = { - 1: 5.0, # car — rarely > 5:1 from any angle - 2: 6.0, # truck — can be elongated - 3: 4.5, # motorcycle — compact, rarely very elongated - 0: 8.0, # bus — elongated body -} - -# ── Per-class minimum area (pixels) ─────────────────────────────────────── -VEH_CLASS_MIN_AREA: dict[int, int] = { - 1: 196, # car — 14x14 min - 2: 256, # truck — 16x16 min (should be at least medium-sized) - 3: 100, # motorcycle — 10x10 min (can be very small in distance) - 0: 400, # bus — 20x20 min -} - -# ── Vehicle box sanity filters (global fallbacks) ───────────────────────── -VEH_MIN_WH = 20 # was 8. Kills tiny horizon artifacts (confirmed: h<25 extras on block 7900800) -VEH_MIN_AREA = 100 -VEH_MAX_ASPECT = 8.0 -VEH_MAX_AREA_RATIO = 0.95 -VEH_MAX_DET = 40 - -# ── Vehicle parts confirmation config ──────────────────────────────────── -# Cross-validates vehicle detections using person detections, OpenCV analysis, -# and optional license plate detector. Small/distant vehicles exempt. -VEH_PARTS_ENABLED = True # Master switch for parts confirmation -VEH_PARTS_SMALL_AREA = 0.004 # Below this area ratio: exempt from suppression -VEH_PARTS_FP_CONF = 0.50 # Below this conf + large + unconfirmed → suppress -VEH_PARTS_FP_CONF_STRICT = 0.55 # Stricter threshold when plate model loaded but no plate -VEH_PARTS_FP_AREA = 0.03 # Above this area ratio → eligible for FP suppression -# Confidence boosts for confirmed parts (additive) -VEH_PARTS_BOOST_DRIVER = 0.08 # Person in driver/passenger region -VEH_PARTS_BOOST_RIDER = 0.10 # Person on motorcycle (overlap + optional lean) -VEH_PARTS_BOOST_HL = 0.05 # Headlight pair detected -VEH_PARTS_BOOST_PLATE = 0.12 # License plate detected (Phase 2) -VEH_PARTS_BOOST_WINDOW = 0.06 # Bus window pattern on truck -# Headlight detection thresholds -VEH_PARTS_HL_MIN_PX = 60 # Min vehicle width (px) for headlight check -VEH_PARTS_HL_BRIGHT = 200 # Grayscale threshold for bright spots -VEH_PARTS_HL_MIN_BLOB = 15 # Min contour area for headlight candidate -# Window pattern detection (bus/coach) -VEH_PARTS_WINDOW_MIN_PX = 100 # Min vehicle width for window pattern check -VEH_PARTS_WINDOW_MIN_PEAKS = 3 # Min periodic edge peaks for window confirmation -# Motorcycle rider pose -VEH_PARTS_RIDER_LEAN_DEG = 15.0 # Min torso lean from vertical (degrees) for rider pose -# Plate detection thresholds -VEH_PARTS_PLATE_MIN_PX = 80 # plates visible at ~80px vehicle width (was 120) -VEH_PARTS_PLATE_CONF = 0.35 # Min plate detection confidence - -# ── Person config (TTA consensus) ─────────────────────────────────────────── -PER_CONF_LOW = 0.60 # Was 0.55. 
Raised 2026-04-05 to match top peer precision floor after - # observing the 3-way tied 52-box group (conf_min=0.585, composite=0.280) was - # beaten by top peer's 44-box response (conf_min=0.716, composite=0.377). - # 0.60 targets the precision/recall inflection point without the full 0.65+ - # aggression that might cost recall on sparse scenes. -PER_CONF_HIGH = 0.58 # NOTE: dead code, not referenced anywhere. Kept for reference only. -PER_CONSENSUS_IOU = 0.50 -PER_RTF_BUDGET = 8.0 - -# ── Person box sanity filters ────────────────────────────────────────────── -PER_MIN_WH = 8 -PER_MIN_AREA = 14 * 14 -PER_MAX_ASPECT = 6.0 -PER_MAX_AREA_RATIO = 0.80 - -# ── Person tiling config (SAHI-inspired) ──────────────────────────────────── -PER_TILE_OVERLAP = 0.20 # 20% overlap between tiles -PER_TILE_MIN_DIM_RATIO = 1.15 # tile when image dim > model_dim * this (~1104px for 960 model) -PER_TILE_CONF = 0.55 # raised from 0.40 to match PER_CONF_LOW -PER_NMS_IOU = 0.35 # NMS IoU for merging across passes — tightened to reduce FP duplicates -PER_MAX_DET = 100 # Loose safety ceiling ONLY — not a count cap. Strategy is confidence-floor: - # PER_CONF_LOW=0.60 is the real filter; any box above threshold passes. - # Raised from 50 after 2026-04-05 investigation: top peers emit 77+ boxes on - # crowd eval images, and the currently-running chute (rev 6b9d0d6) caps at 30 - # which is demonstrably hitting mAP50 0.39 on person crowd blocks. 50 would - # still clip. 100 gives real headroom — only triggers on pathological runaway - # FP cases where NMS has already failed. Previous values (10 spec'd, 50 first - # fix) were too tight. See FAILURE_ANALYSIS.md (2026-04-05). - -# ── TTA consensus thresholds (DMSC19-inspired graduated approach) ──────────── -# Cross-view confirmation eliminates the soft-NMS confidence decay bug. -# Instead of concatenate+soft-NMS (which decayed confs below floor), we match -# boxes across original+flip views and apply graduated confidence thresholds. 
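+
+# Illustrative sketch of the single-class Gaussian Soft-NMS described in
+# the module docstring. The inlined implementation used on the request
+# path lives further down this file; the name `_soft_nms_sketch` and the
+# exact loop structure here are illustrative, not the shipped code.
+def _soft_nms_sketch(boxes, scores, sigma=0.5, score_floor=0.20):
+    """boxes: [N,4] xyxy float, scores: [N]. Returns kept (boxes, scores)."""
+    boxes = np.asarray(boxes, dtype=np.float64).copy()
+    scores = np.asarray(scores, dtype=np.float64).copy()
+    keep_b, keep_s = [], []
+    while scores.size:
+        # Greedily keep the current highest-scoring box
+        i = int(np.argmax(scores))
+        top = boxes[i]
+        keep_b.append(top)
+        keep_s.append(float(scores[i]))
+        boxes = np.delete(boxes, i, axis=0)
+        scores = np.delete(scores, i)
+        if not scores.size:
+            break
+        # IoU of the kept box against every remaining box
+        xx1 = np.maximum(top[0], boxes[:, 0])
+        yy1 = np.maximum(top[1], boxes[:, 1])
+        xx2 = np.minimum(top[2], boxes[:, 2])
+        yy2 = np.minimum(top[3], boxes[:, 3])
+        inter = np.clip(xx2 - xx1, 0, None) * np.clip(yy2 - yy1, 0, None)
+        area_top = (top[2] - top[0]) * (top[3] - top[1])
+        areas = (boxes[:, 2] - boxes[:, 0]) * (boxes[:, 3] - boxes[:, 1])
+        iou = inter / (area_top + areas - inter + 1e-9)
+        # Gaussian decay instead of hard suppression, then drop the dead
+        scores = scores * np.exp(-(iou * iou) / sigma)
+        alive = scores >= score_floor
+        boxes, scores = boxes[alive], scores[alive]
+    return np.array(keep_b), np.array(keep_s)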
-PER_TTA_MATCH_IOU = 0.50  # IoU threshold for cross-view box matching
-PER_TTA_CONF_BOTH = 0.50  # Confirmed by both views: lower threshold (high confidence)
-PER_TTA_CONF_ORIG = 0.60  # Original-only: standard threshold (PER_CONF_LOW)
-PER_TTA_CONF_FLIP = 0.75  # Flip-only: strict (flip-only detections are likely FP)
-
-# ── Frame quality gating (Laplacian variance) ───────────────────────────────
-PER_BLUR_THRESHOLD = 50.0  # Laplacian variance below this = severely blurry
-PER_BLUR_CONF_PENALTY = 0.85  # multiply confs by this for blurry frames (reduce FP)
-
-# ── Adaptive CLAHE config ─────────────────────────────────────────────────
-PER_CLAHE_CLIP = 2.0  # mild CLAHE (was 12.0, too aggressive)
-PER_CLAHE_CONTRAST_THRESH = 40.0  # only apply CLAHE when L-channel std < this
-
-# ── Perspective scaling confidence penalty ─────────────────────────────────
-PERSP_DEVIATION_THRESH = 3.0  # ratio >3x or <1/3x triggers penalty
-PERSP_CONF_PENALTY = 0.85  # multiply conf by this for perspective violations
-PERSP_MIN_DETECTIONS = 3  # need ≥3 detections to estimate model
-PERSP_MIN_Y_SPREAD = 0.15  # min y-spread as fraction of image height
-
-# ── Pose FP filter + box refinement config ──────────────────────────────────
-POSE_CONF_THRESH = 0.25  # Minimum confidence for pose detection
-POSE_NMS_IOU = 0.65  # NMS IoU threshold for pose detections
-POSE_MATCH_IOU = 0.30  # IoU threshold to match pose to person box
-POSE_KP_CONF = 0.3  # Keypoint visibility threshold
-POSE_FP_MAX_CONF = 0.65  # Max conf below which unmatched large boxes are suppressed
-POSE_FP_MIN_AREA = 0.04  # Min area ratio (of image) for FP suppression to apply
-POSE_REFINE_BLEND = 0.25  # Blend factor for keypoint box refinement (0=original, 1=keypoint)
-POSE_KP_PAD = 0.10  # Padding around keypoint tight bbox
-
-# ── Anatomical keypoint scoring ─────────────────────────────────────────────
-# COCO keypoints: 0=nose 1=l_eye 2=r_eye 3=l_ear 4=r_ear
-# 5=l_shoulder 6=r_shoulder 7=l_elbow 8=r_elbow 9=l_wrist 10=r_wrist
-# 11=l_hip 12=r_hip 13=l_knee 14=r_knee 15=l_ankle 16=r_ankle
-POSE_HEAD_KP = [0, 1, 2, 3, 4]  # nose + eyes + ears
-POSE_UPPER_KP = [5, 6, 7, 8, 9, 10]  # shoulders + elbows + wrists
-POSE_LOWER_KP = [11, 12, 13, 14, 15, 16]  # hips + knees + ankles
-# Per-keypoint weights (head > upper > lower). Sum of all = 1.0.
-POSE_KP_WEIGHTS = np.array([ - 0.12, # 0 nose — strongest single indicator - 0.08, # 1 left_eye - 0.08, # 2 right_eye - 0.05, # 3 left_ear - 0.05, # 4 right_ear - 0.07, # 5 left_shoulder - 0.07, # 6 right_shoulder - 0.05, # 7 left_elbow - 0.05, # 8 right_elbow - 0.04, # 9 left_wrist - 0.04, # 10 right_wrist - 0.05, # 11 left_hip - 0.05, # 12 right_hip - 0.04, # 13 left_knee - 0.04, # 14 right_knee - 0.03, # 15 left_ankle - 0.04, # 16 right_ankle -], dtype=np.float32) # sums to 1.0 -POSE_ANAT_REFINE_THRESH = 0.15 # Score above which we refine box with keypoints -POSE_ANAT_SUPPRESS_THRESH = 0.0 # Score at or below which suppression is considered - -# ── TensorRT engine cache config ──────────────────────────────────────────── -TRT_CACHE_PATH = "/tmp/trt_engine_cache" -TRT_FP16 = True -TRT_WORKSPACE_GB = 4 - -# ── Shared ────────────────────────────────────────────────────────────────── -WBF_SKIP_THR = 0.0001 - -# ── Speed config ──────────────────────────────────────────────────────────── -ENABLE_TTA = True -ENABLE_PARALLEL = True - -# ── Secondary HF repo for vehicle weights ─────────────────────────────────── -VEHICLE_HF_REPO = "meaculpitt/ScoreVision-Vehicle" - - - -def _wbf_multi(boxes_list, scores_list, labels_list, iou_thr=0.55, skip_thr=0.0001): - """Weighted Boxes Fusion (multi-class). Boxes in [0,1] normalized coords.""" - if not boxes_list: - return np.empty((0, 4)), np.empty(0), np.empty(0) - - all_b, all_s, all_l = [], [], [] - for bx, sc, lb in zip(boxes_list, scores_list, labels_list): - for i in range(len(bx)): - if sc[i] < skip_thr: - continue - all_b.append(bx[i]) - all_s.append(sc[i]) - all_l.append(int(lb[i])) - - if not all_b: - return np.empty((0, 4)), np.empty(0), np.empty(0) - - all_b = np.array(all_b) - all_s = np.array(all_s) - all_l = np.array(all_l, dtype=int) - - fused_b, fused_s, fused_l = [], [], [] - for cls in np.unique(all_l): - m = all_l == cls - cb, cs = all_b[m], all_s[m] - order = cs.argsort()[::-1] - cb, cs = cb[order], cs[order] - - clusters, cboxes = [], [] - for i in range(len(cb)): - matched, best_iou = -1, iou_thr - for ci, cbox in enumerate(cboxes): - xx1 = max(cb[i, 0], cbox[0]) - yy1 = max(cb[i, 1], cbox[1]) - xx2 = min(cb[i, 2], cbox[2]) - yy2 = min(cb[i, 3], cbox[3]) - inter = max(0, xx2 - xx1) * max(0, yy2 - yy1) - a1 = (cb[i, 2] - cb[i, 0]) * (cb[i, 3] - cb[i, 1]) - a2 = (cbox[2] - cbox[0]) * (cbox[3] - cbox[1]) - iou = inter / (a1 + a2 - inter + 1e-9) - if iou > best_iou: - best_iou = iou - matched = ci - if matched >= 0: - clusters[matched].append(i) - idxs = clusters[matched] - w = cs[idxs] - cboxes[matched] = (cb[idxs] * w[:, None]).sum(0) / w.sum() - else: - clusters.append([i]) - cboxes.append(cb[i].copy()) - - for ci, idxs in enumerate(clusters): - fused_b.append(cboxes[ci]) - fused_s.append(cs[idxs].mean()) - fused_l.append(cls) - - if not fused_b: - return np.empty((0, 4)), np.empty(0), np.empty(0) - return np.array(fused_b), np.array(fused_s), np.array(fused_l) - - -def _wbf_single(boxes_list, scores_list, iou_thr=0.45, skip_thr=0.0001): - """Weighted Boxes Fusion (single-class). 
Boxes in [0,1] normalized coords.""" - if not boxes_list: - return np.empty((0, 4)), np.empty(0) - - all_b, all_s = [], [] - for bx, sc in zip(boxes_list, scores_list): - for i in range(len(bx)): - if sc[i] < skip_thr: - continue - all_b.append(bx[i]) - all_s.append(sc[i]) - - if not all_b: - return np.empty((0, 4)), np.empty(0) - - all_b = np.array(all_b) - all_s = np.array(all_s) - order = all_s.argsort()[::-1] - all_b, all_s = all_b[order], all_s[order] - - clusters, cboxes = [], [] - for i in range(len(all_b)): - matched, best_iou = -1, iou_thr - for ci, cbox in enumerate(cboxes): - xx1 = max(all_b[i, 0], cbox[0]) - yy1 = max(all_b[i, 1], cbox[1]) - xx2 = min(all_b[i, 2], cbox[2]) - yy2 = min(all_b[i, 3], cbox[3]) - inter = max(0, xx2 - xx1) * max(0, yy2 - yy1) - a1 = (all_b[i, 2] - all_b[i, 0]) * (all_b[i, 3] - all_b[i, 1]) - a2 = (cbox[2] - cbox[0]) * (cbox[3] - cbox[1]) - iou = inter / (a1 + a2 - inter + 1e-9) - if iou > best_iou: - best_iou = iou - matched = ci - if matched >= 0: - clusters[matched].append(i) - idxs = clusters[matched] - w = all_s[idxs] - cboxes[matched] = (all_b[idxs] * w[:, None]).sum(0) / w.sum() - else: - clusters.append([i]) - cboxes.append(all_b[i].copy()) - - fused_b, fused_s = [], [] - for ci, idxs in enumerate(clusters): - fused_b.append(cboxes[ci]) - fused_s.append(all_s[idxs].mean()) - - if not fused_b: - return np.empty((0, 4)), np.empty(0) - return np.array(fused_b), np.array(fused_s) - - -def _nms_per_class_boost(boxes, scores, labels, iou_thr=0.50): - """Per-class hard NMS with max-score cluster boosting. - Surviving box keeps its coordinates but gets the max confidence - among all boxes in its overlap cluster.""" - if len(boxes) == 0: - return np.empty((0, 4)), np.empty(0), np.empty(0, dtype=int) - - out_b, out_s, out_l = [], [], [] - for cls in np.unique(labels): - m = labels == cls - cb, cs = boxes[m], scores[m] - order = cs.argsort()[::-1] - cb, cs = cb[order], cs[order] - - suppressed = set() - for i in range(len(cb)): - if i in suppressed: - continue - max_score = float(cs[i]) - for j in range(i + 1, len(cb)): - if j in suppressed: - continue - xx1 = max(cb[i, 0], cb[j, 0]) - yy1 = max(cb[i, 1], cb[j, 1]) - xx2 = min(cb[i, 2], cb[j, 2]) - yy2 = min(cb[i, 3], cb[j, 3]) - inter = max(0, xx2 - xx1) * max(0, yy2 - yy1) - a1 = (cb[i, 2] - cb[i, 0]) * (cb[i, 3] - cb[i, 1]) - a2 = (cb[j, 2] - cb[j, 0]) * (cb[j, 3] - cb[j, 1]) - iou = inter / (a1 + a2 - inter + 1e-9) - if iou >= iou_thr: - max_score = max(max_score, float(cs[j])) - suppressed.add(j) - out_b.append(cb[i]) - out_s.append(max_score) - out_l.append(cls) - - if not out_b: - return np.empty((0, 4)), np.empty(0), np.empty(0, dtype=int) - return np.array(out_b), np.array(out_s), np.array(out_l, dtype=int) - class BoundingBox(BaseModel): x1: int @@ -551,1658 +116,237 @@ class TVFrameResult(BaseModel): class Miner: - def __init__(self, path_hf_repo: Path) -> None: - self.path_hf_repo = path_hf_repo - - # Vehicle model — download from secondary HF repo with safety guard - t0 = time.monotonic() - veh_path = None # Path to secondary repo snapshot (also used for plate model) - try: - from huggingface_hub import snapshot_download as _sd - veh_path = Path(_sd(VEHICLE_HF_REPO)) - veh_weights = str(veh_path / "vehicle_weights.onnx") - logger.info(f"[init] Vehicle weights from {VEHICLE_HF_REPO} in {time.monotonic()-t0:.1f}s") - except Exception as e: - # Fallback: try loading from primary repo (backward compat) - logger.warning(f"[init] Vehicle secondary repo failed ({e}), trying primary repo") - 
veh_weights = str(path_hf_repo / "vehicle_weights.onnx")
-            if not Path(veh_weights).exists():
-                raise FileNotFoundError(f"vehicle_weights.onnx not found in primary or secondary repo") from e
-
-        self.veh_session = ort.InferenceSession(
-            veh_weights,
-            providers=["CUDAExecutionProvider", "CPUExecutionProvider"],
-        )
-        veh_actual = self.veh_session.get_providers()
-        logger.warning(f"[init] Vehicle session ACTIVE providers: {veh_actual}")
-        if "CUDAExecutionProvider" not in veh_actual:
-            logger.error("[init] ⚠ VEHICLE IS ON CPU — CUDA EP NOT ACTIVE")
-        self.veh_input_name = self.veh_session.get_inputs()[0].name
-        veh_shape = self.veh_session.get_inputs()[0].shape
-        self.veh_h = int(veh_shape[2])
-        self.veh_w = int(veh_shape[3])
-
-        # FP32 fallback — lazy-loaded on first trigger to save ~300MB VRAM at startup
-        self.veh_session_fp32 = None
-        self._veh_fp32_path = None
-        try:
-            veh_fp32 = str(veh_path / "vehicle_weights_fp32.onnx") if veh_path else None
-            if veh_fp32 and Path(veh_fp32).exists():
-                self._veh_fp32_path = veh_fp32
-                logger.info("[init] Vehicle FP32 fallback available (lazy-load)")
-            else:
-                logger.info("[init] Vehicle FP32 fallback not available")
-        except Exception as e:
-            logger.warning(f"[init] Vehicle FP32 fallback path check failed: {e}")
+    """
+    Single-element ONNX miner for the manak0/Detect-number-plates-1-0
+    element. Auto-loaded by the chute platform; the platform passes the
+    snapshot path of the HF repo containing the element weights
+    (numberplate_weights.onnx here) as ``path_hf_repo`` and calls
+    ``predict_batch(batch_images, offset, n_keypoints)`` for each
+    request.
+    """
 
-        # Person model — CUDA immediately, TRT engine builds in background
-        per_onnx = str(path_hf_repo / "person_weights.onnx")
-        self.per_session = ort.InferenceSession(
-            per_onnx,
+    def __init__(self, path_hf_repo) -> None:
+        self.path_hf_repo = Path(path_hf_repo)
+        self.class_names = ['numberplate']
+        self.session = ort.InferenceSession(
+            str(self.path_hf_repo / "numberplate_weights.onnx"),
             providers=["CUDAExecutionProvider", "CPUExecutionProvider"],
         )
-        self.per_input_name = self.per_session.get_inputs()[0].name
-        per_shape = self.per_session.get_inputs()[0].shape
-        self.per_h = int(per_shape[2])
-        self.per_w = int(per_shape[3])
-        self._trt_ready = False
-        logger.info("[init] Person model: CUDA (TRT build starting in background)")
-
-        # Launch background TRT engine build
-        os.makedirs(TRT_CACHE_PATH, exist_ok=True)
-        threading.Thread(
-            target=self._build_trt_engine,
-            args=(per_onnx,),
-            daemon=True,
-            name="trt-builder",
-        ).start()
-
-        # Pose model — for FP filtering + box refinement
-        pose_path = path_hf_repo / "pose_weights.onnx"
-        if pose_path.exists():
-            self.pose_session = ort.InferenceSession(
-                str(pose_path),
-                providers=["CUDAExecutionProvider", "CPUExecutionProvider"],
-            )
-            self.pose_input_name = self.pose_session.get_inputs()[0].name
-            pose_shape = self.pose_session.get_inputs()[0].shape
-            self.pose_h = int(pose_shape[2])
-            self.pose_w = int(pose_shape[3])
-            logger.info(f"[init] Pose model loaded: {self.pose_h}x{self.pose_w}")
-        else:
-            self.pose_session = None
-            logger.info("[init] No pose model found, FP filter disabled")
-
-        # Face detector (SCRFD-500M) — confirms person boxes, prevents FP suppression
-        face_path = path_hf_repo / "face_weights.onnx"
-        if face_path.exists():
-            self.face_session = ort.InferenceSession(
-                str(face_path),
-                providers=["CUDAExecutionProvider", "CPUExecutionProvider"],
-            )
-            self.face_input_name = self.face_session.get_inputs()[0].name
-            logger.info("[init] Face model (SCRFD-500M) loaded")
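+        # Sanity-check sketch: if the CUDA preload failed, ORT silently
+        # builds a CPU-only session, which would blow the latency budget
+        # at 1408x768; get_providers() is the only place this surfaces.
+        if "CUDAExecutionProvider" not in self.session.get_providers():
+            _cuda_log.warning(
+                "ORT session is CPU-only (%s); CUDA preload likely failed",
+                self.session.get_providers(),
+            )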
- else: - self.face_session = None - logger.info("[init] No face model found") - - # License plate detector — loaded from secondary HF repo alongside vehicle weights - plate_path = veh_path / "plate_weights.onnx" if veh_path else None - if plate_path and plate_path.exists(): - self.plate_session = ort.InferenceSession( - str(plate_path), - providers=["CUDAExecutionProvider", "CPUExecutionProvider"], - ) - self.plate_input_name = self.plate_session.get_inputs()[0].name - plate_shape = self.plate_session.get_inputs()[0].shape - self.plate_h = int(plate_shape[2]) if isinstance(plate_shape[2], int) else 640 - self.plate_w = int(plate_shape[3]) if isinstance(plate_shape[3], int) else 640 - logger.info(f"[init] Plate model loaded: {self.plate_h}x{self.plate_w}") - else: - self.plate_session = None - logger.info("[init] No plate model found, plate confirmation disabled") - - - # Pose cache — populated by _pose_filter_refine, read by vehicle parts - self._cached_pose_data = None - - # Thread pool for parallel inference - self._executor = ThreadPoolExecutor(max_workers=2) - - # Log provider info - veh_prov = self.veh_session.get_providers() - per_prov = self.per_session.get_providers() - logger.info(f"Vehicle ORT providers: {veh_prov}") - logger.info(f"Person ORT providers: {per_prov} (TRT building in background)") - logger.info(f"TTA={ENABLE_TTA} PARALLEL={ENABLE_PARALLEL}") - - def _build_trt_engine(self, per_onnx): - """Build TRT FP16 engine in background, swap person session when ready. - - On fresh nodes: ~18 min to compile. Cached engine loads in <1s. - During build, inference uses CUDAExecutionProvider (passes RTF at ~78ms). - After build, atomically swaps to TRT session (~29ms pipeline). - """ - try: - trt_opts = { - "trt_fp16_enable": str(TRT_FP16).lower(), - "trt_max_workspace_size": str(TRT_WORKSPACE_GB << 30), - "trt_engine_cache_enable": "true", - "trt_engine_cache_path": TRT_CACHE_PATH, - } - t0 = time.monotonic() - logger.info("[trt-build] Creating TRT session (may take ~18min on fresh node)...") - trt_session = ort.InferenceSession( - per_onnx, - providers=[ - ("TensorrtExecutionProvider", trt_opts), - "CUDAExecutionProvider", - "CPUExecutionProvider", - ], - ) - - provs = trt_session.get_providers() - if "TensorrtExecutionProvider" not in provs: - logger.warning("[trt-build] TRT provider not active (%s), keeping CUDA", provs) - return - - # Run dummy inference to fully materialize the engine - inp_name = trt_session.get_inputs()[0].name - inp_shape = trt_session.get_inputs()[0].shape - dummy = np.zeros((1, 3, int(inp_shape[2]), int(inp_shape[3])), dtype=np.float32) - trt_session.run(None, {inp_name: dummy}) - - dt = time.monotonic() - t0 - logger.info("[trt-build] TRT engine ready in %.1fs — swapping person session", dt) - - # Atomic swap — Python GIL makes single attribute assignment safe. - # Any in-flight inference holds a reference to the old session, which - # stays alive until that inference completes. - self.per_session = trt_session - self._trt_ready = True - - logger.info("[trt-build] Person model now using TensorRT FP16") - except Exception as e: - logger.warning("[trt-build] TRT build failed (%s), keeping CUDA", e) + self.input_name = self.session.get_inputs()[0].name + input_shape = self.session.get_inputs()[0].shape + # expected [N, C, H, W]; dynamic-export ONNX has string placeholders + # for spatial dims. 
We always run inference at 1408 (the validator's
+        # native frame width); the ONNX accepts variable shapes via dynamic
+        # axes, and inference at 1408 gives substantially better small-plate
+        # recall than the model's training resolution (verified on the 7
+        # starter assets: 43% recall at 960 vs 60% at 1408).
+        def _maybe_int(d, default):
+            try:
+                return int(d)
+            except (TypeError, ValueError):
+                return default
+        # Hard-pin to the validator's native 1408x768 (rectangular). This
+        # is just over half the pixel count of a 1408x1408 square pad
+        # (768/1408 ≈ 0.55) and matches the validator's exact frame shape,
+        # eliminating wasted padding rows. yolo11s's largest stride is 32,
+        # and both 1408 (44*32) and 768 (24*32) are multiples of it.
+        self.input_h = 768
+        self.input_w = 1408
+        # Record what the ONNX *declared*, for diagnostic logging only
+        self._onnx_declared_h = _maybe_int(input_shape[2], None)
+        self._onnx_declared_w = _maybe_int(input_shape[3], None)
+
+        # Pre-NMS confidence threshold, kept at the reference's 0.25.
+        # Lowering it to 0.15 for the tiny validator plates was tried and
+        # rejected: it produces too many decayed-score ghost detections at
+        # 1408 input resolution (verified on starter assets: F1 dropped
+        # from 0.625 to 0.462 at conf=0.15).
+        self.conf_threshold = 0.25
+        # Soft-NMS hyperparameters (Gaussian variant).
+        self.soft_nms_sigma = 0.5
+        # Final score floor after Soft-NMS decay. At 1408 the model emits
+        # more medium-confidence detections that survive decay, so this is
+        # kept fairly strict (0.20, just under the pre-NMS threshold):
+        # mildly-decayed adjacent plates survive, while heavily-decayed
+        # duplicates and ghosts are dropped.
+        self.score_threshold = 0.20
 
     def __repr__(self) -> str:
-        trt_status = "TRT" if self._trt_ready else "CUDA (TRT building)"
-        return f"Unified Miner v3.16 — person={trt_status}, background TRT engine build"
-
-    # ── Vehicle preprocessing (letterbox) ─────────────────────────────────
-
-    def _veh_letterbox(self, img):
-        h, w = img.shape[:2]
-        r = min(self.veh_h / h, self.veh_w / w)
-        nw, nh = int(round(w * r)), int(round(h * r))
-        img_r = cv2.resize(img, (nw, nh), interpolation=cv2.INTER_LINEAR)
-        dw, dh = self.veh_w - nw, self.veh_h - nh
-        pl, pt = dw // 2, dh // 2
-        img_p = cv2.copyMakeBorder(
-            img_r, pt, dh - pt, pl, dw - pl,
-            cv2.BORDER_CONSTANT, value=(114, 114, 114),
-        )
-        return img_p, r, pl, pt
-
-    def _veh_preprocess(self, image_bgr):
-        img_p, ratio, pl, pt = self._veh_letterbox(image_bgr)
-        rgb = cv2.cvtColor(img_p, cv2.COLOR_BGR2RGB)
-        inp = rgb.astype(np.float32) / 255.0
-        inp = np.ascontiguousarray(inp.transpose(2, 0, 1)[np.newaxis])
-        return inp, ratio, pl, pt
-
-    def _veh_decode(self, raw, ratio, pl, pt, ow, oh, conf_thresh):
-        pred = raw[0]
-        if pred.shape[0] < pred.shape[1]:
-            pred = pred.T
-        cls_scores = pred[:, 4:]
-        cls_ids = np.argmax(cls_scores, axis=1)
-        confs = np.max(cls_scores, axis=1)
-        mask = confs >= conf_thresh
-        if not mask.any():
-            return np.empty((0, 4)), np.empty(0), np.empty(0, dtype=int)
-        bx, confs, cls_ids = pred[mask, :4], confs[mask], cls_ids[mask]
-        cx, cy, bw, bh = bx[:, 0], bx[:, 1], bx[:, 2], bx[:, 3]
-        x1 = np.clip((cx - bw / 2 - pl) / ratio, 0, ow)
-        y1 = np.clip((cy - bh / 2 - pt) / ratio, 0, oh)
-        x2 = np.clip((cx + bw / 2 - pl) / ratio, 0, ow)
-        y2 = np.clip((cy + bh / 2 - pt) / ratio, 0, oh)
-        return np.stack([x1, y1, x2, y2], axis=1), confs, cls_ids
-
-    def _veh_run_pass(self, image_bgr, conf_thresh, session=None):
-        if session is None:
-            session = self.veh_session
-        oh, ow = image_bgr.shape[:2]
-        inp, ratio, pl, pt = self._veh_preprocess(image_bgr)
-        raw = session.run(None, {self.veh_input_name: inp})[0]
-        return
self._veh_decode(raw, ratio, pl, pt, ow, oh, conf_thresh) - - def _infer_vehicle_core(self, image_bgr, session=None): - """Core vehicle detection pipeline. session param allows FP32 fallback.""" - oh, ow = image_bgr.shape[:2] - - # Primary pass - boxes, confs, cls_ids = self._veh_run_pass(image_bgr, VEH_CONF_THRES, session) - - # Flip TTA pass — horizontal flip, mirror boxes back - if ENABLE_TTA: - flipped = cv2.flip(image_bgr, 1) - f_boxes, f_confs, f_cls = self._veh_run_pass(flipped, VEH_TTA_CONF, session) - if len(f_boxes) > 0: - # Mirror x-coords: x1'=ow-x2, x2'=ow-x1 - f_boxes[:, 0], f_boxes[:, 2] = ow - f_boxes[:, 2], ow - f_boxes[:, 0] - if len(boxes) > 0: - boxes = np.concatenate([boxes, f_boxes]) - confs = np.concatenate([confs, f_confs]) - cls_ids = np.concatenate([cls_ids, f_cls]) - else: - boxes, confs, cls_ids = f_boxes, f_confs, f_cls - - if len(boxes) == 0: - return [] - - # Remap model classes to output classes - out_cls = np.array([VEH_MODEL_TO_OUT[int(c)] for c in cls_ids]) - - # Per-class hard NMS with max-score cluster boosting - boxes, confs, out_cls = _nms_per_class_boost( - boxes, confs, out_cls, iou_thr=VEH_NMS_IOU) - - if len(boxes) == 0: - return [] - - # Per-class confidence filter + aspect ratio filter + bus suppression - img_area = float(oh * ow) - sane = [] - for i in range(len(boxes)): - cls = int(out_cls[i]) - - # Skip bus entirely (not scored by validator, just generates FP) - if cls in VEH_SKIP_CLS: - continue - - # Per-class confidence threshold - min_conf = VEH_CLASS_CONF.get(cls, VEH_CONF_THRES) - if confs[i] < min_conf: - continue - - bw = boxes[i, 2] - boxes[i, 0] - bh = boxes[i, 3] - boxes[i, 1] - - # Minimum dimension - if bw < VEH_MIN_WH or bh < VEH_MIN_WH: - continue - - area = bw * bh - - # Per-class minimum area - min_area = VEH_CLASS_MIN_AREA.get(cls, VEH_MIN_AREA) - if area < min_area: - continue - - # Per-class aspect ratio filter - aspect = max(bw, bh) / max(min(bw, bh), 1e-6) - max_aspect = VEH_CLASS_ASPECT.get(cls, VEH_MAX_ASPECT) - if aspect > max_aspect: - continue - - # Max area ratio (covers entire image — likely FP) - if area / img_area > VEH_MAX_AREA_RATIO: - continue - - sane.append(i) - - if not sane: - return [] - boxes, confs, out_cls = boxes[sane], confs[sane], out_cls[sane] - - # Limit max detections - if len(boxes) > VEH_MAX_DET: - top_k = np.argsort(confs)[::-1][:VEH_MAX_DET] - boxes, confs, out_cls = boxes[top_k], confs[top_k], out_cls[top_k] - - out = [] - for i in range(len(boxes)): - b = boxes[i] - out.append(BoundingBox( - x1=max(0, min(ow, math.floor(b[0]))), - y1=max(0, min(oh, math.floor(b[1]))), - x2=max(0, min(ow, math.ceil(b[2]))), - y2=max(0, min(oh, math.ceil(b[3]))), - cls_id=int(out_cls[i]), - conf=max(0.0, min(1.0, float(confs[i]))), - )) - return out - - def _infer_vehicle(self, image_bgr): - """Vehicle detection with FP32 fallback on catastrophic INT8 failure. - - Runs INT8 model first. If it returns 0 boxes (true catastrophic failure, - see block 7905900), retries with FP32 model. Single-box results are - kept as-is — likely real sparse scenes, not INT8 degradation. 
- """ - if not hasattr(self, '_veh_providers_logged'): - provs = self.veh_session.get_providers() - logger.warning(f"[vehicle] First inference — active providers: {provs}") - self._veh_providers_logged = True - boxes = self._infer_vehicle_core(image_bgr, self.veh_session) - - if len(boxes) == 0 and (self.veh_session_fp32 or self._veh_fp32_path): - # Lazy-load FP32 session on first trigger - if self.veh_session_fp32 is None and self._veh_fp32_path: - try: - self.veh_session_fp32 = ort.InferenceSession( - self._veh_fp32_path, - providers=["CUDAExecutionProvider", "CPUExecutionProvider"], - ) - logger.info("[vehicle] FP32 fallback lazy-loaded") - except Exception as e: - logger.warning(f"[vehicle] FP32 lazy-load failed: {e}") - self._veh_fp32_path = None - if self.veh_session_fp32: - boxes_fp32 = self._infer_vehicle_core(image_bgr, self.veh_session_fp32) - if len(boxes_fp32) > len(boxes): - logger.warning( - f"[vehicle] INT8 degraded ({len(boxes)} boxes), " - f"FP32 fallback recovered ({len(boxes_fp32)} boxes)" - ) - return boxes_fp32 - - return boxes - - # ── Vehicle parts confirmation ─────────────────────────────────────── - - @staticmethod - def _veh_check_driver(vb, person_boxes): - """Check if any person detection overlaps the driver/passenger region. - - Driver region: upper 55% height, center 70% width of vehicle box. - A person's center inside this region → vehicle confirmed. - """ - if not person_boxes: - return False - vw = vb.x2 - vb.x1 - vh = vb.y2 - vb.y1 - dr_x1 = vb.x1 + vw * 0.15 - dr_y1 = vb.y1 - dr_x2 = vb.x2 - vw * 0.15 - dr_y2 = vb.y1 + vh * 0.55 - for pb in person_boxes: - pcx = (pb.x1 + pb.x2) / 2 - pcy = (pb.y1 + pb.y2) / 2 - if dr_x1 <= pcx <= dr_x2 and dr_y1 <= pcy <= dr_y2: - return True - return False - - def _veh_check_rider(self, moto_box, person_boxes): - """Check if motorcycle has a rider, optionally with forward-lean pose. - - Returns (has_overlap, has_lean_pose). - Uses cached pose keypoints from person pipeline to check torso angle. - Motorcycle riders lean forward (torso > 15° from vertical). 
- """ - if not person_boxes: - return False, False - mw = moto_box.x2 - moto_box.x1 - mh = moto_box.y2 - moto_box.y1 - mx = mw * 0.1 - my = mh * 0.1 - has_overlap = False - for pb in person_boxes: - pcx = (pb.x1 + pb.x2) / 2 - pcy = (pb.y1 + pb.y2) / 2 - if (moto_box.x1 - mx <= pcx <= moto_box.x2 + mx and - moto_box.y1 - my <= pcy <= moto_box.y2 + my): - has_overlap = True - break - if not has_overlap: - return False, False - - # Check forward-lean pose using cached pose data - if self._cached_pose_data is None: - return True, False - pose_boxes, pose_kps = self._cached_pose_data - if len(pose_boxes) == 0: - return True, False - - for j in range(len(pose_boxes)): - pb = pose_boxes[j] - pcx = (pb[0] + pb[2]) / 2 - pcy = (pb[1] + pb[3]) / 2 - if not (moto_box.x1 - mx <= pcx <= moto_box.x2 + mx and - moto_box.y1 - my <= pcy <= moto_box.y2 + my): - continue - kps = pose_kps[j] - # Need at least one shoulder + one hip visible - l_sh, r_sh = kps[5], kps[6] - l_hip, r_hip = kps[11], kps[12] - sh_vis = [k[:2] for k in [l_sh, r_sh] if k[2] >= POSE_KP_CONF] - hip_vis = [k[:2] for k in [l_hip, r_hip] if k[2] >= POSE_KP_CONF] - if not sh_vis or not hip_vis: - continue - sh_mid = np.mean(sh_vis, axis=0) - hip_mid = np.mean(hip_vis, axis=0) - dx = sh_mid[0] - hip_mid[0] - dy = hip_mid[1] - sh_mid[1] # positive = shoulder above hip - if dy <= 0: - continue - angle = math.degrees(math.atan2(abs(dx), dy)) - if angle >= VEH_PARTS_RIDER_LEAN_DEG: - return True, True - return True, False - - def _veh_check_headlights(self, vb, image_bgr): - """Detect bright symmetric pair in lower portion of vehicle box. - - Requires two bright blobs at similar y, on opposite sides of center, - with similar area. Only checks vehicles wider than VEH_PARTS_HL_MIN_PX. - """ - bw = vb.x2 - vb.x1 - bh = vb.y2 - vb.y1 - if bw < VEH_PARTS_HL_MIN_PX or bh < 30: - return False - - oh, ow = image_bgr.shape[:2] - y1 = max(0, min(oh, int(vb.y1 + bh * 0.65))) - y2 = max(0, min(oh, int(vb.y2))) - x1 = max(0, min(ow, int(vb.x1))) - x2 = max(0, min(ow, int(vb.x2))) - if y2 - y1 < 5 or x2 - x1 < 10: - return False - - roi = image_bgr[y1:y2, x1:x2] - gray = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY) - _, bright = cv2.threshold(gray, VEH_PARTS_HL_BRIGHT, 255, cv2.THRESH_BINARY) - contours, _ = cv2.findContours(bright, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) - - blobs = [] - for c in contours: - area = cv2.contourArea(c) - if area < VEH_PARTS_HL_MIN_BLOB: - continue - M = cv2.moments(c) - if M["m00"] < 1: - continue - blobs.append((M["m10"] / M["m00"], M["m01"] / M["m00"], area)) - - if len(blobs) < 2: - return False - - roi_mid = (x2 - x1) / 2.0 - roi_h = y2 - y1 - for i in range(len(blobs)): - for j in range(i + 1, len(blobs)): - b1, b2 = blobs[i], blobs[j] - if abs(b1[1] - b2[1]) > roi_h * 0.4: - continue - if max(b1[2], b2[2]) / max(min(b1[2], b2[2]), 1) > 3.0: - continue - if (b1[0] - roi_mid) * (b2[0] - roi_mid) < 0: - return True - return False - - def _veh_check_windows(self, vb, image_bgr): - """Detect repeated window pattern (bus/coach signature) using vertical edge periodicity. - - Extracts middle horizontal band, applies vertical Sobel, projects vertically, - and checks for 3+ regularly-spaced peaks (window frame edges). - Only for large vehicles (truck cls_id=2). 
- """ - bw = vb.x2 - vb.x1 - bh = vb.y2 - vb.y1 - if bw < VEH_PARTS_WINDOW_MIN_PX or bh < 40: - return False - - oh, ow = image_bgr.shape[:2] - # Middle 40% of height (window band on a bus/coach) - y1 = max(0, min(oh, int(vb.y1 + bh * 0.30))) - y2 = max(0, min(oh, int(vb.y1 + bh * 0.70))) - x1 = max(0, min(ow, int(vb.x1))) - x2 = max(0, min(ow, int(vb.x2))) - if y2 - y1 < 10 or x2 - x1 < 30: - return False - - roi = image_bgr[y1:y2, x1:x2] - gray = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY) - - # Vertical edge detection (window frames are vertical edges) - sobel_v = cv2.Sobel(gray, cv2.CV_64F, 1, 0, ksize=3) - abs_sobel = np.abs(sobel_v) - - # Project vertically: mean per column - projection = abs_sobel.mean(axis=0) - if len(projection) < 10: - return False - - # Smooth projection - ks = max(3, int(len(projection) * 0.02) | 1) - projection = np.convolve(projection, np.ones(ks) / ks, mode='same') - - # Find peaks above mean + 1 std - thresh = projection.mean() + projection.std() - peaks = [] - in_peak = False - pk_start = 0 - for i in range(len(projection)): - if projection[i] > thresh: - if not in_peak: - pk_start = i - in_peak = True - else: - if in_peak: - peaks.append((pk_start + i) // 2) - in_peak = False - if in_peak: - peaks.append((pk_start + len(projection) - 1) // 2) - - if len(peaks) < VEH_PARTS_WINDOW_MIN_PEAKS: - return False - - # Check regular spacing: gaps within 40% of median - gaps = [peaks[i + 1] - peaks[i] for i in range(len(peaks) - 1)] - if not gaps: - return False - med = sorted(gaps)[len(gaps) // 2] - if med < 5: - return False - regular = sum(1 for g in gaps if abs(g - med) / max(med, 1) < 0.4) - return regular >= len(gaps) * 0.6 - - def _veh_check_plate(self, vb, image_bgr): - """Run license plate detector on a vehicle crop. Returns True if plate found.""" - if self.plate_session is None: - return False - bw = vb.x2 - vb.x1 - if bw < VEH_PARTS_PLATE_MIN_PX: - return False - - oh, ow = image_bgr.shape[:2] - # Crop vehicle region with 5% padding - pad_x = int(bw * 0.05) - pad_y = int((vb.y2 - vb.y1) * 0.05) - cx1 = max(0, int(vb.x1) - pad_x) - cy1 = max(0, int(vb.y1) - pad_y) - cx2 = min(ow, int(vb.x2) + pad_x) - cy2 = min(oh, int(vb.y2) + pad_y) - crop = image_bgr[cy1:cy2, cx1:cx2] - if crop.size == 0: - return False - - # Letterbox to plate model input - ch, cw = crop.shape[:2] - r = min(self.plate_h / ch, self.plate_w / cw) - nw, nh = int(round(cw * r)), int(round(ch * r)) - img_r = cv2.resize(crop, (nw, nh), interpolation=cv2.INTER_LINEAR) - dw, dh = self.plate_w - nw, self.plate_h - nh - pl, pt = dw // 2, dh // 2 - img_p = cv2.copyMakeBorder( - img_r, pt, dh - pt, pl, dw - pl, - cv2.BORDER_CONSTANT, value=(114, 114, 114), - ) - rgb = cv2.cvtColor(img_p, cv2.COLOR_BGR2RGB) - inp = rgb.astype(np.float32) / 255.0 - inp = np.ascontiguousarray(inp.transpose(2, 0, 1)[np.newaxis]) - - raw = self.plate_session.run(None, {self.plate_input_name: inp})[0] - pred = raw[0] if raw.ndim == 3 else raw - - # Handle both [N,6] end2end (post-NMS) and [N, 5+nc] raw formats - if pred.shape[0] < pred.shape[1]: - pred = pred.T # transpose [5+nc, N] -> [N, 5+nc] - if pred.shape[1] < 5: - return False - # End2end post-NMS: few detections (< 500), col4=conf already final - if pred.shape[0] < 500 and pred.shape[1] == 6: - confs = pred[:, 4] - elif pred.shape[1] == 5: - confs = pred[:, 4] # single objectness score - else: - # Raw: x,y,w,h,objectness,cls_scores... 
→ conf = obj * max(cls) - confs = pred[:, 4] * np.max(pred[:, 5:], axis=1) - return bool((confs >= VEH_PARTS_PLATE_CONF).any()) - - def _vehicle_parts_confirm(self, vehicle_boxes, person_boxes, image_bgr): - """Parts-based confidence scoring for vehicle detections. - - Scoring hierarchy (confidence boosts are additive): - 1. License plate detected → +0.12 (strong, never suppress) - 2. Person (driver/rider) inside vehicle → +0.08-0.10 - 3. Headlight pair detected → +0.05 - 4. Bus window pattern on truck → +0.06 - 5. No parts but small/distant or high-conf → keep original - 6. Large + low-conf + no parts → suppress as FP - - Small/distant vehicles (area < 0.4% of image) are always exempt. - Bus (cls_id=4) suppressed in _infer_vehicle — window check applies to trucks. - """ - if not vehicle_boxes or not VEH_PARTS_ENABLED: - return vehicle_boxes - - oh, ow = image_bgr.shape[:2] - img_area = float(oh * ow) - has_plate_model = self.plate_session is not None - # Skip plate checks on crowded scenes (aerial/drone, plates invisible) - skip_plate = len(vehicle_boxes) > 20 - - result = [] - n_driver = 0 - n_rider = 0 - n_rider_lean = 0 - n_headlight = 0 - n_window = 0 - n_plate = 0 - n_suppressed = 0 - - for vb in vehicle_boxes: - bw = vb.x2 - vb.x1 - bh = vb.y2 - vb.y1 - area_ratio = (bw * bh) / img_area - - # Small/distant: exempt from parts check - if area_ratio < VEH_PARTS_SMALL_AREA: - result.append(vb) - continue - - boost = 0.0 - confirmed = False - - # Check 1: License plate (strongest signal) - if has_plate_model and not skip_plate and bw >= VEH_PARTS_PLATE_MIN_PX: - try: - if self._veh_check_plate(vb, image_bgr): - boost += VEH_PARTS_BOOST_PLATE - confirmed = True - n_plate += 1 - except Exception: - pass - - # Check 2: Driver/passenger inside car or truck - if vb.cls_id in (1, 2): - if self._veh_check_driver(vb, person_boxes): - boost += VEH_PARTS_BOOST_DRIVER - confirmed = True - n_driver += 1 - - # Check 3: Motorcycle rider (overlap + optional lean pose) - if vb.cls_id == 3: - has_overlap, has_lean = self._veh_check_rider(vb, person_boxes) - if has_overlap: - boost += VEH_PARTS_BOOST_RIDER - if has_lean: - boost += 0.05 # Extra for confirmed lean pose - n_rider_lean += 1 - confirmed = True - n_rider += 1 - - # Check 4: Headlight pair - if bw >= VEH_PARTS_HL_MIN_PX: - try: - if self._veh_check_headlights(vb, image_bgr): - boost += VEH_PARTS_BOOST_HL - confirmed = True - n_headlight += 1 - except Exception: - pass - - # Check 5: Window pattern (large trucks that might be buses) - if vb.cls_id == 2 and bw >= VEH_PARTS_WINDOW_MIN_PX: - try: - if self._veh_check_windows(vb, image_bgr): - boost += VEH_PARTS_BOOST_WINDOW - n_window += 1 - except Exception: - pass - - # Apply boost and decide - new_conf = min(1.0, vb.conf + boost) - - if confirmed: - result.append(BoundingBox( - x1=vb.x1, y1=vb.y1, x2=vb.x2, y2=vb.y2, - cls_id=vb.cls_id, conf=new_conf, - )) - elif area_ratio > VEH_PARTS_FP_AREA: - # Large vehicle — use stricter threshold if plate model loaded - fp_thresh = VEH_PARTS_FP_CONF_STRICT if (has_plate_model and not skip_plate) else VEH_PARTS_FP_CONF - if vb.conf < fp_thresh: - n_suppressed += 1 - else: - result.append(vb) - else: - result.append(vb) - - if n_driver or n_rider or n_headlight or n_window or n_plate or n_suppressed: - logger.info(f"[veh-parts] plate={n_plate} driver={n_driver} rider={n_rider}" - f"(lean={n_rider_lean}) hl={n_headlight} win={n_window} " - f"suppress={n_suppressed}, kept {len(result)}/{len(vehicle_boxes)}") - return result - - # ── Person preprocessing 
(letterbox) ────────────────────────────────── - - def _per_letterbox(self, img): - h, w = img.shape[:2] - r = min(self.per_h / h, self.per_w / w) - nw, nh = int(round(w * r)), int(round(h * r)) - interp = cv2.INTER_CUBIC if r > 1.0 else cv2.INTER_LINEAR - img_r = cv2.resize(img, (nw, nh), interpolation=interp) - dw, dh = self.per_w - nw, self.per_h - nh - pl, pt = dw // 2, dh // 2 - img_p = cv2.copyMakeBorder( - img_r, pt, dh - pt, pl, dw - pl, - cv2.BORDER_CONSTANT, value=(114, 114, 114), + return ( + f"NumberplateMiner session={type(self.session).__name__} " + f"input={self.input_h}x{self.input_w} classes={len(self.class_names)}" ) - return img_p, r, pl, pt - - def _per_preprocess(self, image_bgr): - img_p, ratio, pl, pt = self._per_letterbox(image_bgr) - rgb = cv2.cvtColor(img_p, cv2.COLOR_BGR2RGB) - inp = rgb.astype(np.float32) / 255.0 - inp = np.ascontiguousarray(inp.transpose(2, 0, 1)[np.newaxis]) - return inp, ratio, pl, pt - - def _per_enhance(self, img_bgr): - """Adaptive CLAHE: only apply to low-contrast frames, mild clip=2.0.""" - lab = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2LAB) - l, a, b = cv2.split(lab) - if float(l.std()) < PER_CLAHE_CONTRAST_THRESH: - clahe = cv2.createCLAHE(clipLimit=PER_CLAHE_CLIP, tileGridSize=(8, 8)) - l = clahe.apply(l) - return cv2.cvtColor(cv2.merge([l, a, b]), cv2.COLOR_LAB2BGR) - return img_bgr # skip CLAHE on normal-contrast images - @staticmethod - def _frame_blur_score(img_bgr): - """Laplacian variance blur metric. Lower = blurrier.""" - gray = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2GRAY) - return cv2.Laplacian(gray, cv2.CV_64F).var() + # ---------------------------------------------------------------- preproc + def _preprocess(self, image_bgr: ndarray): + """Letterbox the BGR image to (input_h, input_w), preserving aspect. - @staticmethod - def _perspective_penalty(boxes, confs, image_h): - """Apply confidence penalty to perspective-anomalous person detections. - - Model: expected_height(y) = alpha * (y_foot - y_vp), where y_vp = image_h / 3. - Alpha is estimated from the median height/distance ratio across detections. - Detections deviating >3x from expected get conf *= 0.85. - Fails open (returns confs unchanged) when model can't be estimated. + Returns the float32 NCHW tensor plus the metadata needed to undo + the letterbox during decode: (orig_h, orig_w, scale, dx, dy). 
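+
+        A minimal sketch of the inverse mapping the decode step needs,
+        in terms of the metadata returned here:
+
+            x_orig = (x_model - dx) / scale
+            y_orig = (y_model - dy) / scale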
""" - n = len(boxes) - if n < PERSP_MIN_DETECTIONS: - return confs - - y_vp = image_h / 3.0 - y_feet = boxes[:, 3] - heights = boxes[:, 3] - boxes[:, 1] - - valid = y_feet > (y_vp + 10) - if valid.sum() < PERSP_MIN_DETECTIONS: - return confs - - valid_y = y_feet[valid] - valid_h = heights[valid] - - y_spread = (valid_y.max() - valid_y.min()) / image_h - if y_spread < PERSP_MIN_Y_SPREAD: - return confs - - alpha = float(np.median(valid_h / (valid_y - y_vp))) - if alpha <= 0.01: - return confs - - new_confs = confs.copy() - for i in range(n): - if y_feet[i] <= y_vp: - continue - expected_h = alpha * (y_feet[i] - y_vp) - if expected_h <= 0: - continue - ratio = heights[i] / expected_h - if ratio > PERSP_DEVIATION_THRESH or ratio < (1.0 / PERSP_DEVIATION_THRESH): - new_confs[i] *= PERSP_CONF_PENALTY - - return new_confs - - def _per_decode(self, raw, ratio, pl, pt, oh, ow, conf_thresh): + h, w = image_bgr.shape[:2] + scale = min(self.input_h / h, self.input_w / w) + nh, nw = int(round(h * scale)), int(round(w * scale)) + resized = cv2.resize(image_bgr, (nw, nh)) + # Pad to (input_h, input_w) with grey (114) - ultralytics default + canvas = np.full((self.input_h, self.input_w, 3), 114, dtype=np.uint8) + dy = (self.input_h - nh) // 2 + dx = (self.input_w - nw) // 2 + canvas[dy:dy + nh, dx:dx + nw] = resized + rgb = cv2.cvtColor(canvas, cv2.COLOR_BGR2RGB) + x = rgb.astype(np.float32) / 255.0 + x = np.transpose(x, (2, 0, 1))[None, ...] + return x, (h, w, scale, dx, dy) + + # ---------------------------------------------------------------- decode + def _normalize_predictions(self, raw: np.ndarray) -> np.ndarray: + """Handle both common ultralytics export shapes ([1,C,N] and [1,N,C]).""" pred = raw[0] if pred.ndim != 2: - return np.empty((0, 4)), np.empty(0) - - # Auto-detect output format - if pred.shape[-1] == 6 and pred.shape[0] > pred.shape[1]: - # YOLO26 end2end: [N, 6] = [x1, y1, x2, y2, conf, class_id] - confs = pred[:, 4] - keep = confs >= conf_thresh - boxes, confs = pred[keep, :4], confs[keep] - if len(boxes) == 0: - return np.empty((0, 4)), np.empty(0) - boxes[:, 0] = np.floor((boxes[:, 0] - pl) / ratio) - boxes[:, 1] = np.floor((boxes[:, 1] - pt) / ratio) - boxes[:, 2] = np.ceil((boxes[:, 2] - pl) / ratio) - boxes[:, 3] = np.ceil((boxes[:, 3] - pt) / ratio) - boxes = np.clip(boxes, 0, [[ow, oh, ow, oh]]) - return boxes, confs - - # YOLO11 raw format: [5+nc, N] or [N, 5+nc] - if pred.shape[0] < pred.shape[1]: - pred = pred.T - if pred.shape[1] < 5: - return np.empty((0, 4)), np.empty(0) - cls_scores = pred[:, 4:] - confs = np.max(cls_scores, axis=1) - keep = confs >= conf_thresh - boxes, confs = pred[keep, :4], confs[keep] - if len(boxes) == 0: - return np.empty((0, 4)), np.empty(0) - cx, cy, bw, bh = boxes[:, 0], boxes[:, 1], boxes[:, 2], boxes[:, 3] - x1 = np.clip(np.floor((cx - bw / 2 - pl) / ratio), 0, ow) - y1 = np.clip(np.floor((cy - bh / 2 - pt) / ratio), 0, oh) - x2 = np.clip(np.ceil((cx + bw / 2 - pl) / ratio), 0, ow) - y2 = np.clip(np.ceil((cy + bh / 2 - pt) / ratio), 0, oh) - return np.stack([x1, y1, x2, y2], axis=1), confs - - def _per_run_pass(self, image_bgr, conf_thresh): - oh, ow = image_bgr.shape[:2] - inp, ratio, pl, pt = self._per_preprocess(image_bgr) - raw = self.per_session.run(None, {self.per_input_name: inp})[0] - return self._per_decode(raw, ratio, pl, pt, oh, ow, conf_thresh) - - def _generate_tiles(self, h, w): - """SAHI-inspired tile generation. - - Smart 2-tile split: horizontal for landscape, vertical for portrait. 
- Edge-aware: for landscape, split in upper portion to avoid cutting - through people standing in bottom third. - Returns: [(x1,y1,x2,y2), ...] — always starts with full image. - """ - tiles = [(0, 0, w, h)] # full image always first - - # Only tile if image significantly exceeds model input - if max(h, w) <= max(self.per_h, self.per_w) * PER_TILE_MIN_DIM_RATIO: - return tiles - - overlap_px_x = int(w * PER_TILE_OVERLAP) - overlap_px_y = int(h * PER_TILE_OVERLAP) - - if w >= h: - # Landscape: 2 horizontal tiles (left + right) - mid = w // 2 - tiles.append((0, 0, mid + overlap_px_x, h)) - tiles.append((mid - overlap_px_x, 0, w, h)) - else: - # Portrait: 2 vertical tiles (top + bottom) - # Edge-aware: bias split toward upper portion (people stand at bottom) - mid = int(h * 0.45) # split at 45% height, not 50% - tiles.append((0, 0, w, mid + overlap_px_y)) - tiles.append((0, mid - overlap_px_y, w, h)) - - return tiles - - def _per_run_tile(self, image_bgr, tile_region, conf_thresh): - """Run person model on a tile crop, return boxes in original coords.""" - x1t, y1t, x2t, y2t = tile_region - crop = image_bgr[y1t:y2t, x1t:x2t] - boxes, confs = self._per_run_pass(crop, conf_thresh) - if len(boxes) == 0: - return np.empty((0, 4)), np.empty(0) - # Shift back to original image coordinates - boxes[:, 0] += x1t - boxes[:, 1] += y1t - boxes[:, 2] += x1t - boxes[:, 3] += y1t - return boxes, confs - - @staticmethod - @staticmethod - def _nms_max_conf(boxes, scores, iou_thr, sigma=0.5, min_conf=0.20): - """Soft-NMS with Gaussian decay (replaces hard NMS). - - Instead of suppressing overlapping boxes entirely, decays their - confidence: score_j *= exp(-(iou^2) / sigma). This preserves - partially-occluded detections in crowds while still penalising - duplicates. Boxes whose confidence decays below min_conf are - removed. 
- """ - if len(boxes) == 0: - return np.empty((0, 4)), np.empty(0) - - b = boxes.copy().astype(np.float64) - s = scores.copy().astype(np.float64) - n = len(s) - indices = list(range(n)) - - for i in range(n): - # Find current max-confidence box - max_idx = i - for j in range(i + 1, n): - if s[indices[j]] > s[indices[max_idx]]: - max_idx = j - # Swap to front - indices[i], indices[max_idx] = indices[max_idx], indices[i] - - ix = indices[i] - # Decay overlapping boxes - for j in range(i + 1, n): - jx = indices[j] - xx1 = max(b[ix, 0], b[jx, 0]) - yy1 = max(b[ix, 1], b[jx, 1]) - xx2 = min(b[ix, 2], b[jx, 2]) - yy2 = min(b[ix, 3], b[jx, 3]) - inter = max(0.0, xx2 - xx1) * max(0.0, yy2 - yy1) - a1 = (b[ix, 2] - b[ix, 0]) * (b[ix, 3] - b[ix, 1]) - a2 = (b[jx, 2] - b[jx, 0]) * (b[jx, 3] - b[jx, 1]) - iou = inter / (a1 + a2 - inter + 1e-9) - if iou > 0: - s[jx] *= np.exp(-(iou * iou) / sigma) - - # Keep boxes above min_conf - keep = [indices[i] for i in range(n) if s[indices[i]] >= min_conf] - if not keep: - return np.empty((0, 4)), np.empty(0) - return b[keep], s[keep] - - # ── Pose FP filter + box refinement ────────────────────────────────── - - def _pose_run(self, image_bgr): - """Run pose model on full image, return (boxes [N,4], confs [N], keypoints [N,17,3]) in original coords.""" - if self.pose_session is None: - return np.empty((0, 4)), np.empty(0), np.empty((0, 17, 3)) - - oh, ow = image_bgr.shape[:2] - - # Letterbox to pose model input size - r = min(self.pose_h / oh, self.pose_w / ow) - nw, nh = int(round(ow * r)), int(round(oh * r)) - img_r = cv2.resize(image_bgr, (nw, nh), interpolation=cv2.INTER_LINEAR) - dw, dh = self.pose_w - nw, self.pose_h - nh - pl, pt = dw // 2, dh // 2 - img_p = cv2.copyMakeBorder( - img_r, pt, dh - pt, pl, dw - pl, - cv2.BORDER_CONSTANT, value=(114, 114, 114), - ) - - rgb = cv2.cvtColor(img_p, cv2.COLOR_BGR2RGB) - inp = rgb.astype(np.float32) / 255.0 - inp = np.ascontiguousarray(inp.transpose(2, 0, 1)[np.newaxis]) - - raw = self.pose_session.run(None, {self.pose_input_name: inp})[0] - - # raw shape: [1, 56, 8400] -> transpose to [8400, 56] - pred = raw[0] if raw.ndim == 3 else raw + raise ValueError(f"Unexpected prediction shape: {raw.shape}") if pred.shape[0] < pred.shape[1]: - pred = pred.T - - # Decode: cols 0-3=xywh, col 4=conf, cols 5-55=17*3 keypoints - confs = pred[:, 4] - keep = confs >= POSE_CONF_THRESH - if not keep.any(): - return np.empty((0, 4)), np.empty(0), np.empty((0, 17, 3)) - - pred = pred[keep] - confs = pred[:, 4] - - # Convert xywh to x1y1x2y2 in original coords - cx, cy, bw, bh = pred[:, 0], pred[:, 1], pred[:, 2], pred[:, 3] - x1 = np.clip((cx - bw / 2 - pl) / r, 0, ow) - y1 = np.clip((cy - bh / 2 - pt) / r, 0, oh) - x2 = np.clip((cx + bw / 2 - pl) / r, 0, ow) - y2 = np.clip((cy + bh / 2 - pt) / r, 0, oh) - boxes = np.stack([x1, y1, x2, y2], axis=1) - - # Decode keypoints: [N, 51] -> [N, 17, 3] - kp_raw = pred[:, 5:].reshape(-1, 17, 3).copy() - kp_raw[:, :, 0] = (kp_raw[:, :, 0] - pl) / r # x - kp_raw[:, :, 1] = (kp_raw[:, :, 1] - pt) / r # y - kp_raw[:, :, 0] = np.clip(kp_raw[:, :, 0], 0, ow) - kp_raw[:, :, 1] = np.clip(kp_raw[:, :, 1], 0, oh) - - # NMS on pose detections - order = np.argsort(-confs) - boxes = boxes[order] - confs = confs[order] - kp_raw = kp_raw[order] - - keep_idx = [] - suppressed = set() - for i in range(len(boxes)): - if i in suppressed: - continue - keep_idx.append(i) - for j in range(i + 1, len(boxes)): - if j in suppressed: - continue - xx1 = max(boxes[i, 0], boxes[j, 0]) - yy1 = max(boxes[i, 1], boxes[j, 1]) 
- xx2 = min(boxes[i, 2], boxes[j, 2]) - yy2 = min(boxes[i, 3], boxes[j, 3]) - inter = max(0, xx2 - xx1) * max(0, yy2 - yy1) - a1 = (boxes[i, 2] - boxes[i, 0]) * (boxes[i, 3] - boxes[i, 1]) - a2 = (boxes[j, 2] - boxes[j, 0]) * (boxes[j, 3] - boxes[j, 1]) - iou_val = inter / (a1 + a2 - inter + 1e-9) - if iou_val >= POSE_NMS_IOU: - suppressed.add(j) - - if not keep_idx: - return np.empty((0, 4)), np.empty(0), np.empty((0, 17, 3)) - keep_idx = np.array(keep_idx) - return boxes[keep_idx], confs[keep_idx], kp_raw[keep_idx] - - _FACE_SIZE = 640 - _FACE_STRIDES = (8, 16, 32) - _FACE_NUM_ANCHORS = 2 - _FACE_THRESH = 0.5 - _FACE_NMS_THRESH = 0.4 - - def _face_run(self, image_bgr): - """Run SCRFD-500M face detector. Returns (face_boxes [N,4], face_confs [N]).""" - if self.face_session is None: - return np.empty((0, 4)), np.empty(0) - - oh, ow = image_bgr.shape[:2] - sz = self._FACE_SIZE - - # Letterbox resize preserving aspect ratio (top-left aligned) - scale = min(sz / oh, sz / ow) - nw, nh = int(round(ow * scale)), int(round(oh * scale)) - resized = cv2.resize(image_bgr, (nw, nh), interpolation=cv2.INTER_LINEAR) - det_img = np.zeros((sz, sz, 3), dtype=np.uint8) - det_img[:nh, :nw, :] = resized - - # Preprocess: BGR→RGB, (pixel - 127.5) / 128.0 - blob = cv2.dnn.blobFromImage( - det_img, 1.0 / 128.0, (sz, sz), (127.5, 127.5, 127.5), swapRB=True, - ) - - outputs = self.face_session.run(None, {self.face_input_name: blob}) - - # Decode 3 stride levels: outputs[0:3]=scores, [3:6]=bboxes, [6:9]=kps - all_scores, all_boxes = [], [] - for idx, stride in enumerate(self._FACE_STRIDES): - scores = outputs[idx][:, 0] # (N,) - bbox_d = outputs[idx + 3] # (N, 4) distances - keep = scores >= self._FACE_THRESH - if not keep.any(): - continue - scores = scores[keep] - bbox_d = bbox_d[keep] - - # Generate anchor centers for kept positions - fh, fw = sz // stride, sz // stride - grid_y, grid_x = np.mgrid[:fh, :fw] - centers = np.stack([grid_x, grid_y], axis=-1).astype(np.float32).reshape(-1, 2) - centers = np.tile(centers, (1, self._FACE_NUM_ANCHORS)).reshape(-1, 2) * stride - centers = centers[keep] - - # distance → bbox: [x1, y1, x2, y2] - x1 = centers[:, 0] - bbox_d[:, 0] * stride - y1 = centers[:, 1] - bbox_d[:, 1] * stride - x2 = centers[:, 0] + bbox_d[:, 2] * stride - y2 = centers[:, 1] + bbox_d[:, 3] * stride - boxes = np.stack([x1, y1, x2, y2], axis=-1) / scale - - all_scores.append(scores) - all_boxes.append(boxes) - - if not all_scores: - return np.empty((0, 4)), np.empty(0) + pred = pred.transpose(1, 0) + return pred - scores = np.concatenate(all_scores) - boxes = np.concatenate(all_boxes) - - # NMS - order = scores.argsort()[::-1] - scores, boxes = scores[order], boxes[order] - keep = [] - x1, y1, x2, y2 = boxes[:, 0], boxes[:, 1], boxes[:, 2], boxes[:, 3] - areas = (x2 - x1) * (y2 - y1) - suppressed = np.zeros(len(scores), dtype=bool) - for i in range(len(scores)): - if suppressed[i]: - continue - keep.append(i) - xx1 = np.maximum(x1[i], x1[i + 1:]) - yy1 = np.maximum(y1[i], y1[i + 1:]) - xx2 = np.minimum(x2[i], x2[i + 1:]) - yy2 = np.minimum(y2[i], y2[i + 1:]) - inter = np.maximum(0, xx2 - xx1) * np.maximum(0, yy2 - yy1) - ovr = inter / (areas[i] + areas[i + 1:] - inter + 1e-6) - suppressed[i + 1:] |= ovr > self._FACE_NMS_THRESH - - return boxes[keep], scores[keep] - - @staticmethod - def _anatomical_score(kps, kp_conf_thresh=POSE_KP_CONF): - """Compute weighted anatomical score from keypoints [17, 3]. 
- - Returns (score, has_head, n_visible): - score: weighted sum of visible keypoints (0.0-1.0) - has_head: True if any head keypoint (nose/eyes/ears) is visible - n_visible: number of visible keypoints - """ - visible = kps[:, 2] >= kp_conf_thresh - n_visible = int(visible.sum()) - score = float((visible.astype(np.float32) * POSE_KP_WEIGHTS).sum()) - has_head = bool(visible[POSE_HEAD_KP].any()) - return score, has_head, n_visible - - def _refine_box_with_keypoints(self, pb, kps, ow, oh): - """Blend person box with tight keypoint bbox.""" - visible = kps[:, 2] >= POSE_KP_CONF - if not visible.any(): - return pb - vis_kps = kps[visible] - kp_x1 = float(vis_kps[:, 0].min()) - kp_y1 = float(vis_kps[:, 1].min()) - kp_x2 = float(vis_kps[:, 0].max()) - kp_y2 = float(vis_kps[:, 1].max()) - - # Pad around keypoint bbox - kp_w = kp_x2 - kp_x1 - kp_h = kp_y2 - kp_y1 - pad_x = kp_w * POSE_KP_PAD - pad_y = kp_h * POSE_KP_PAD - kp_x1 = max(0, kp_x1 - pad_x) - kp_y1 = max(0, kp_y1 - pad_y) - kp_x2 = min(ow, kp_x2 + pad_x) - kp_y2 = min(oh, kp_y2 + pad_y) - - a = POSE_REFINE_BLEND - return BoundingBox( - x1=max(0, min(ow, int(pb.x1 * (1 - a) + kp_x1 * a))), - y1=max(0, min(oh, int(pb.y1 * (1 - a) + kp_y1 * a))), - x2=max(0, min(ow, int(pb.x2 * (1 - a) + kp_x2 * a))), - y2=max(0, min(oh, int(pb.y2 * (1 - a) + kp_y2 * a))), - cls_id=0, - conf=pb.conf, - ) - - def _pose_filter_refine(self, person_boxes, image_bgr): - """Filter FP detections and refine boxes using anatomical keypoint scoring. - - Anatomical scoring: weighted sum of visible keypoints where head/face - keypoints (nose, eyes, ears) contribute most, upper body (shoulders, - elbows, wrists) next, lower body (hips, knees, ankles) least. - - Decision logic: - 1. Run pose model once on full image. - 2. Run face detector (if available) for additional confirmation. - 3. Match each person detection to best-overlapping pose detection. - 4. For matched boxes: - a. Head keypoints visible OR face detected → KEEP + refine (never suppress) - b. Anatomical score >= REFINE threshold → KEEP + refine - c. Anatomical score > 0 → KEEP as-is (partially visible person) - d. Anatomical score == 0 + large + low-conf → SUPPRESS (FP candidate) - 5. For unmatched boxes: - a. Face detected inside box → KEEP - b. Large + low-conf → SUPPRESS - c. 
Small or high-conf → KEEP (SAHI-detected or confident) - """ - if not person_boxes or self.pose_session is None: - return person_boxes - - oh, ow = image_bgr.shape[:2] - img_area = float(oh * ow) - - # Run pose model - t_pose = time.monotonic() - pose_boxes, pose_confs, pose_kps = self._pose_run(image_bgr) - dt_pose = (time.monotonic() - t_pose) * 1000 - - # Cache pose data for motorcycle rider check in vehicle parts confirmation - self._cached_pose_data = (pose_boxes, pose_kps) - - # Run face detector if available - face_boxes = np.empty((0, 4)) - if self.face_session is not None: - t_face = time.monotonic() - face_boxes, _ = self._face_run(image_bgr) - dt_face = (time.monotonic() - t_face) * 1000 - logger.info(f"[pose] {len(pose_boxes)} pose, {len(face_boxes)} faces " - f"in {dt_pose:.0f}+{dt_face:.0f}ms") - else: - logger.info(f"[pose] {len(pose_boxes)} pose detections in {dt_pose:.0f}ms") - - # Helper: check if any face detection is inside a person box - def has_face_inside(pb): - if len(face_boxes) == 0: - return False - for fb in face_boxes: - # Face center must be inside person box - fcx = (fb[0] + fb[2]) / 2 - fcy = (fb[1] + fb[3]) / 2 - if pb.x1 <= fcx <= pb.x2 and pb.y1 <= fcy <= pb.y2: - return True - return False - - if len(pose_boxes) == 0: - # No pose detections — use face detector or size/conf heuristic - result = [] - n_suppressed = 0 - for pb in person_boxes: - if has_face_inside(pb): - result.append(pb) - continue - bw = pb.x2 - pb.x1 - bh = pb.y2 - pb.y1 - area_ratio = (bw * bh) / img_area - if area_ratio > POSE_FP_MIN_AREA and pb.conf < POSE_FP_MAX_CONF: - n_suppressed += 1 - continue - result.append(pb) - if n_suppressed: - logger.info(f"[pose] Suppressed {n_suppressed} FP (no pose detections)") - return result - - # Match person detections to pose detections via IoU - result = [] - n_refined = 0 - n_suppressed = 0 - n_face_saved = 0 - - for pb in person_boxes: - pb_arr = np.array([pb.x1, pb.y1, pb.x2, pb.y2], dtype=float) - best_iou = 0.0 - best_idx = -1 - - for j in range(len(pose_boxes)): - xx1 = max(pb_arr[0], pose_boxes[j, 0]) - yy1 = max(pb_arr[1], pose_boxes[j, 1]) - xx2 = min(pb_arr[2], pose_boxes[j, 2]) - yy2 = min(pb_arr[3], pose_boxes[j, 3]) - inter = max(0, xx2 - xx1) * max(0, yy2 - yy1) - a1 = (pb_arr[2] - pb_arr[0]) * (pb_arr[3] - pb_arr[1]) - a2 = (pose_boxes[j, 2] - pose_boxes[j, 0]) * (pose_boxes[j, 3] - pose_boxes[j, 1]) - iou_val = inter / (a1 + a2 - inter + 1e-9) - if iou_val > best_iou: - best_iou = iou_val - best_idx = j - - if best_iou >= POSE_MATCH_IOU and best_idx >= 0: - # Matched to a pose detection — compute anatomical score - kps = pose_kps[best_idx] # [17, 3] - anat_score, has_head, n_vis = self._anatomical_score(kps) - - if has_head or has_face_inside(pb): - # Head/face visible → definitely a person, refine box - result.append(self._refine_box_with_keypoints(pb, kps, ow, oh)) - n_refined += 1 - elif anat_score >= POSE_ANAT_REFINE_THRESH: - # Good anatomical score → person confirmed, refine - result.append(self._refine_box_with_keypoints(pb, kps, ow, oh)) - n_refined += 1 - elif anat_score > POSE_ANAT_SUPPRESS_THRESH: - # Some keypoints visible but low score — keep as-is - result.append(pb) - else: - # Matched to pose bbox but ZERO keypoints visible - # Only suppress if also large and low confidence - bw = pb.x2 - pb.x1 - bh = pb.y2 - pb.y1 - area_ratio = (bw * bh) / img_area - if area_ratio > POSE_FP_MIN_AREA and pb.conf < POSE_FP_MAX_CONF: - n_suppressed += 1 - continue - result.append(pb) - else: - # Not matched to any pose detection - 
if has_face_inside(pb): - # Face detector confirms a person - result.append(pb) - n_face_saved += 1 - continue - - bw = pb.x2 - pb.x1 - bh = pb.y2 - pb.y1 - area_ratio = (bw * bh) / img_area - - if area_ratio > POSE_FP_MIN_AREA and pb.conf < POSE_FP_MAX_CONF: - # Large unmatched low-conf box — likely FP - n_suppressed += 1 - continue - else: - # Small box or high conf — keep - result.append(pb) - - if n_refined or n_suppressed or n_face_saved: - logger.info(f"[pose] Refined {n_refined}, suppressed {n_suppressed} FP, " - f"face-saved {n_face_saved}, " - f"kept {len(result)}/{len(person_boxes)}") - return result - - # ── Person inference with SAHI tiling ──────────────────────────────── - - @staticmethod - def _match_boxes_iou(boxes_a, boxes_b, iou_thr): - """Match boxes from two sets by IoU. Returns (matched_pairs, unmatched_a, unmatched_b). - - matched_pairs: list of (idx_a, idx_b, iou) tuples - unmatched_a: list of indices in boxes_a with no match - unmatched_b: list of indices in boxes_b with no match + # ---------------------------------------------------------------- soft NMS + def _soft_nms( + self, + dets: list[tuple[float, float, float, float, float, int]], + ) -> list[tuple[float, float, float, float, float, int]]: + """Gaussian Soft-NMS for a single class. + + Decays each remaining box's score by ``exp(-iou^2 / sigma)`` against + the highest-scoring picked box, then drops anything below + ``self.score_threshold``. Returns detections in descending decayed + score order. """ - if len(boxes_a) == 0: - return [], [], list(range(len(boxes_b))) - if len(boxes_b) == 0: - return [], list(range(len(boxes_a))), [] - - matched_pairs = [] - used_b = set() - - for i in range(len(boxes_a)): - best_iou = 0 - best_j = -1 - for j in range(len(boxes_b)): - if j in used_b: - continue - xx1 = max(boxes_a[i, 0], boxes_b[j, 0]) - yy1 = max(boxes_a[i, 1], boxes_b[j, 1]) - xx2 = min(boxes_a[i, 2], boxes_b[j, 2]) - yy2 = min(boxes_a[i, 3], boxes_b[j, 3]) - inter = max(0.0, xx2 - xx1) * max(0.0, yy2 - yy1) - a1 = (boxes_a[i, 2] - boxes_a[i, 0]) * (boxes_a[i, 3] - boxes_a[i, 1]) - a2 = (boxes_b[j, 2] - boxes_b[j, 0]) * (boxes_b[j, 3] - boxes_b[j, 1]) - iou = inter / (a1 + a2 - inter + 1e-9) - if iou > best_iou: - best_iou = iou - best_j = j - if best_iou >= iou_thr: - matched_pairs.append((i, best_j, best_iou)) - used_b.add(best_j) - - matched_a = {p[0] for p in matched_pairs} - unmatched_a = [i for i in range(len(boxes_a)) if i not in matched_a] - unmatched_b = [j for j in range(len(boxes_b)) if j not in used_b] + if not dets: + return [] - return matched_pairs, unmatched_a, unmatched_b + boxes = np.asarray([[d[0], d[1], d[2], d[3]] for d in dets], dtype=np.float32) + scores = np.asarray([d[4] for d in dets], dtype=np.float32) + cls_ids = [int(d[5]) for d in dets] + n = len(dets) - def _infer_person(self, image_bgr): - """Person detection with TTA consensus merging. + keep_idx: list[int] = [] + keep_scores: list[float] = [] + active = np.ones(n, dtype=bool) - Pipeline (v3.23 — replaces concatenate+soft-NMS with consensus merging): - 1. Original pass at native 960px - 2. Flip TTA pass - 3. Match boxes across views (IoU >= PER_TTA_MATCH_IOU) - 4. Graduated confidence thresholds: - - Confirmed by both views: keep at PER_TTA_CONF_BOTH (0.50) - - Original-only: keep at PER_TTA_CONF_ORIG (0.60) - - Flip-only: keep at PER_TTA_CONF_FLIP (0.75) - 5. Hard NMS on merged result - 6. Sanity filters + safety ceiling - 7. 
Pose FP filter + box refinement (if time allows) - """ - oh, ow = image_bgr.shape[:2] - t_start = time.monotonic() + while True: + valid_mask = active & (scores >= self.score_threshold) + if not valid_mask.any(): + break + valid_idx = np.where(valid_mask)[0] + m_local = valid_idx[int(np.argmax(scores[valid_idx]))] - # Frame quality gating - blur_score = self._frame_blur_score(image_bgr) - is_blurry = blur_score < PER_BLUR_THRESHOLD + keep_idx.append(int(m_local)) + keep_scores.append(float(scores[m_local])) + active[m_local] = False - # Pass 1: original image - boxes_orig, confs_orig = self._per_run_pass(image_bgr, PER_TTA_CONF_BOTH) + # IoU of m_local against all still-active boxes + others = np.where(active)[0] + if others.size == 0: + break + ax1 = np.maximum(boxes[m_local, 0], boxes[others, 0]) + ay1 = np.maximum(boxes[m_local, 1], boxes[others, 1]) + ax2 = np.minimum(boxes[m_local, 2], boxes[others, 2]) + ay2 = np.minimum(boxes[m_local, 3], boxes[others, 3]) + inter_w = np.clip(ax2 - ax1, a_min=0.0, a_max=None) + inter_h = np.clip(ay2 - ay1, a_min=0.0, a_max=None) + inter = inter_w * inter_h + area_m = max(0.0, (boxes[m_local, 2] - boxes[m_local, 0])) * \ + max(0.0, (boxes[m_local, 3] - boxes[m_local, 1])) + area_o = ( + np.clip(boxes[others, 2] - boxes[others, 0], a_min=0.0, a_max=None) * + np.clip(boxes[others, 3] - boxes[others, 1], a_min=0.0, a_max=None) + ) + union = area_m + area_o - inter + iou = np.where(union > 0.0, inter / union, 0.0) + + decay = np.exp(-(iou * iou) / self.soft_nms_sigma) + scores[others] = scores[others] * decay + + return [ + ( + float(boxes[i, 0]), + float(boxes[i, 1]), + float(boxes[i, 2]), + float(boxes[i, 3]), + float(s), + cls_ids[i], + ) + for i, s in zip(keep_idx, keep_scores) + ] - # Pass 2: horizontal flip - flipped = cv2.flip(image_bgr, 1) - boxes_flip, confs_flip = self._per_run_pass(flipped, PER_TTA_CONF_BOTH) - if len(boxes_flip) > 0: - boxes_flip[:, 0], boxes_flip[:, 2] = ( - ow - boxes_flip[:, 2], ow - boxes_flip[:, 0]) + # ---------------------------------------------------------------- inference + def _infer_single(self, image_bgr: ndarray) -> list[BoundingBox]: + inp, (orig_h, orig_w, scale, dx, dy) = self._preprocess(image_bgr) + out = self.session.run(None, {self.input_name: inp})[0] + pred = self._normalize_predictions(out) - if len(boxes_orig) == 0 and len(boxes_flip) == 0: + if pred.shape[1] < 5: return [] - # TTA consensus: match boxes across views - matched, unmatched_o, unmatched_f = self._match_boxes_iou( - boxes_orig, boxes_flip, PER_TTA_MATCH_IOU) - - # Build merged result with graduated thresholds - merged_b = [] - merged_s = [] - - # Confirmed by both views: keep original box, use max confidence, threshold=0.50 - for i_o, i_f, iou in matched: - conf = max(float(confs_orig[i_o]), float(confs_flip[i_f])) - if conf >= PER_TTA_CONF_BOTH: - merged_b.append(boxes_orig[i_o]) - merged_s.append(conf) - - # Original-only: need higher confidence (0.60) - for i_o in unmatched_o: - if confs_orig[i_o] >= PER_TTA_CONF_ORIG: - merged_b.append(boxes_orig[i_o]) - merged_s.append(float(confs_orig[i_o])) - - # Flip-only: strict threshold (0.75) — flip-only detections are likely FP - for i_f in unmatched_f: - if confs_flip[i_f] >= PER_TTA_CONF_FLIP: - merged_b.append(boxes_flip[i_f]) - merged_s.append(float(confs_flip[i_f])) - - if not merged_b: + boxes = pred[:, :4] + cls_scores = pred[:, 4:] + if cls_scores.shape[1] == 0: return [] - merged_b = np.array(merged_b) - merged_s = np.array(merged_s) - - # Hard NMS on merged result (no soft-NMS — no 
confidence decay) - keep = _nms_per_class_boost( - merged_b, merged_s, - np.zeros(len(merged_s), dtype=int), # single class - iou_thr=PER_NMS_IOU) - merged_b, merged_s = keep[0], keep[1] + cls_ids = np.argmax(cls_scores, axis=1) + confs = np.max(cls_scores, axis=1) + keep = confs >= self.conf_threshold - # Safety ceiling - if len(merged_s) > PER_MAX_DET: - top_idx = np.argsort(merged_s)[-PER_MAX_DET:] - merged_b = merged_b[top_idx] - merged_s = merged_s[top_idx] + boxes = boxes[keep] + confs = confs[keep] + cls_ids = cls_ids[keep] - if len(merged_b) == 0: + if boxes.shape[0] == 0: return [] - # Blur confidence penalty - if is_blurry: - merged_s = merged_s * PER_BLUR_CONF_PENALTY - - # Perspective scaling penalty - merged_s = self._perspective_penalty(merged_b, merged_s, oh) - - # Final confidence floor (catches blur/perspective decay edge cases) - keep_mask = merged_s >= PER_TTA_CONF_BOTH - merged_b = merged_b[keep_mask] - merged_s = merged_s[keep_mask] - - # Sanity filters - img_area = float(oh * ow) - out = [] - for i in range(len(merged_b)): - bw = merged_b[i, 2] - merged_b[i, 0] - bh = merged_b[i, 3] - merged_b[i, 1] - if bw < PER_MIN_WH or bh < PER_MIN_WH: - continue - area = bw * bh - if area < PER_MIN_AREA: - continue - if max(bw, bh) / max(min(bw, bh), 1e-6) > PER_MAX_ASPECT: - continue - if area / img_area > PER_MAX_AREA_RATIO: - continue - b = merged_b[i] - # Shrink box 15% toward center to tighten fit (our boxes avg 57% larger than top miners') - cx = (b[0] + b[2]) / 2.0 - cy = (b[1] + b[3]) / 2.0 - bw2 = (b[2] - b[0]) * 0.85 / 2.0 - bh2 = (b[3] - b[1]) * 0.85 / 2.0 - out.append(BoundingBox( - x1=max(0, min(ow, int(cx - bw2))), - y1=max(0, min(oh, int(cy - bh2))), - x2=max(0, min(ow, int(cx + bw2))), - y2=max(0, min(oh, int(cy + bh2))), - cls_id=0, - conf=max(0.0, min(1.0, float(merged_s[i]))), - )) - - # Pose FP filter + box refinement (only if time budget allows) - if time.monotonic() - t_start < PER_RTF_BUDGET * 0.85: - out = self._pose_filter_refine(out, image_bgr) - - return out - - # ── Element detection (stack frame inspection) ────────────────────────── - _CHALLENGE_TYPE_MAP = {2: 'person', 12: 'vehicle'} - - def _detect_element_hint(self) -> str: - """Detect whether this request is for person or vehicle. - - Reads challenge_type_id from the chute template predict() metadata - via stack frame inspection. Returns 'person', 'vehicle', or 'both'. - """ - frame = None - try: - frame = inspect.currentframe() - for _ in range(10): - frame = frame.f_back - if frame is None: - break - meta = frame.f_locals.get('metadata') - if isinstance(meta, dict) and 'challenge_type_id' in meta: - ct_id = meta['challenge_type_id'] - hint = self._CHALLENGE_TYPE_MAP.get(ct_id) - if hint: - return hint - return 'both' - except Exception: - pass - finally: - del frame - return 'both' - - # ── Unified inference ─────────────────────────────────────────────────── - - def _infer_single(self, image_bgr: ndarray, element_hint: str = 'both') -> list[BoundingBox]: - self._cached_pose_data = None # reset before each frame - - if element_hint == 'person': - return self._infer_person(image_bgr) - - if element_hint == 'vehicle': - # Run vehicle detection + parts confirmation with empty person_boxes. - # Plate/headlight/window checks fire normally; driver/rider overlap - # check finds no matches (boost=0) but doesn't suppress. 
- vehicle_boxes = self._infer_vehicle(image_bgr) - return self._vehicle_parts_confirm(vehicle_boxes, [], image_bgr) - - # Fallback: run both (original behavior) - if ENABLE_PARALLEL: - veh_future = self._executor.submit(self._infer_vehicle, image_bgr) - per_future = self._executor.submit(self._infer_person, image_bgr) - vehicle_boxes = veh_future.result() - person_boxes = per_future.result() - else: - vehicle_boxes = self._infer_vehicle(image_bgr) - person_boxes = self._infer_person(image_bgr) - - # Vehicle parts confirmation: cross-reference with person detections - vehicle_boxes = self._vehicle_parts_confirm( - vehicle_boxes, person_boxes, image_bgr) - - return vehicle_boxes + person_boxes - - - # -- Replay buffer ------------------------------------------------------- - REPLAY_DIR = Path("/home/miner/replay_buffer") - REPLAY_MAX = 100 - - def _replay_save(self, batch_images, results): - try: - ts = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S_%f") - query_dir = self.REPLAY_DIR / ts - query_dir.mkdir(parents=True, exist_ok=True) - - for i, img in enumerate(batch_images): - cv2.imwrite(str(query_dir / f"img_{i:03d}.jpg"), img, - [cv2.IMWRITE_JPEG_QUALITY, 95]) - - preds = [] - for r in results: - preds.append({ - "frame_id": r.frame_id, - "boxes": [b.model_dump() for b in r.boxes], - }) - meta = { - "timestamp": ts, - "num_images": len(batch_images), - "image_shapes": [list(img.shape) for img in batch_images], - "predictions": preds, - } - (query_dir / "meta.json").write_text(json.dumps(meta, indent=2)) - self._replay_prune() - except Exception: - pass - - def _replay_prune(self): - try: - dirs = sorted( - [d for d in self.REPLAY_DIR.iterdir() if d.is_dir()], - key=lambda d: d.name, + # Undo letterbox: model coords -> remove pad -> divide by scale -> + # original image coords + dets: list[tuple[float, float, float, float, float, int]] = [] + for i in range(boxes.shape[0]): + cx, cy, bw, bh = boxes[i].tolist() + x1 = (cx - bw / 2.0 - dx) / scale + y1 = (cy - bh / 2.0 - dy) / scale + x2 = (cx + bw / 2.0 - dx) / scale + y2 = (cy + bh / 2.0 - dy) / scale + dets.append((x1, y1, x2, y2, float(confs[i]), int(cls_ids[i]))) + + dets = self._soft_nms(dets) + + out_boxes: list[BoundingBox] = [] + for x1, y1, x2, y2, conf, cls_id in dets: + ix1 = max(0, min(orig_w, math.floor(x1))) + iy1 = max(0, min(orig_h, math.floor(y1))) + ix2 = max(0, min(orig_w, math.ceil(x2))) + iy2 = max(0, min(orig_h, math.ceil(y2))) + out_boxes.append( + BoundingBox( + x1=ix1, + y1=iy1, + x2=ix2, + y2=iy2, + cls_id=cls_id, + conf=max(0.0, min(1.0, conf)), + ) ) - if len(dirs) > self.REPLAY_MAX: - import shutil - for old in dirs[: len(dirs) - self.REPLAY_MAX]: - shutil.rmtree(old, ignore_errors=True) - except Exception: - pass + return out_boxes + # ---------------------------------------------------------------- entry def predict_batch( self, batch_images: list[ndarray], offset: int, n_keypoints: int, ) -> list[TVFrameResult]: - t_start = time.perf_counter() - - # Detect element type from caller metadata - element_hint = self._detect_element_hint() - t_setup = time.perf_counter() - dt_setup = (t_setup - t_start) * 1000 - - _lat_logger.info( - "REQUEST batch=%d hint=%s setup=%.1fms", - len(batch_images), element_hint, dt_setup, - ) - results: list[TVFrameResult] = [] for idx, image in enumerate(batch_images): - t_img = time.perf_counter() - boxes = self._infer_single(image, element_hint=element_hint) - t_post = time.perf_counter() - dt_infer = (t_post - t_img) * 1000 - + boxes = self._infer_single(image) keypoints = 
[(0, 0) for _ in range(max(0, int(n_keypoints)))] - results.append(TVFrameResult( - frame_id=offset + idx, boxes=boxes, keypoints=keypoints, - )) - dt_post = (time.perf_counter() - t_post) * 1000 - - if idx < 3 or idx == len(batch_images) - 1: - _lat_logger.info( - " IMG %d/%d boxes=%d infer=%.1fms post=%.1fms shape=%s", - idx, len(batch_images), len(boxes), dt_infer, dt_post, - image.shape, + results.append( + TVFrameResult( + frame_id=offset + idx, + boxes=boxes, + keypoints=keypoints, ) - - t_done = time.perf_counter() - dt_total = (t_done - t_start) * 1000 - total_boxes = sum(len(r.boxes) for r in results) - - _lat_logger.info( - "DONE batch=%d boxes=%d total=%.1fms setup=%.1fms hint=%s", - len(batch_images), total_boxes, dt_total, dt_setup, element_hint, - ) - logger.info(f"[miner] predict_batch: {len(batch_images)} images, " - f"{total_boxes} total boxes, {dt_total:.0f}ms (hint={element_hint})") - - threading.Thread( - target=self._replay_save, - args=(batch_images, results), - daemon=True, - ).start() - + ) return results -# Miner v3.19 — 1-pass vehicle + CLAHE pass + parts_confirm fix — element detection + per-step timing — background TRT engine build + CUDA-first fallback 20260402
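
# ---------------------------------------------------------------- sketch: letterbox round trip
# The snippets below are standalone illustrations of techniques used in this
# diff; any name, size, or value not shown above is an assumption made for
# the example, not something taken from miner.py.
#
# First, the letterbox round trip from _preprocess / _infer_single: scale by
# s = min(in_h/h, in_w/w), centre-pad with grey 114, then invert model-space
# coords with (coord - pad) / s. The 640x640 input and the 1408x1056 frame
# are assumed sizes for the demo.
import cv2
import numpy as np


def letterbox(img: np.ndarray, in_h: int = 640, in_w: int = 640):
    h, w = img.shape[:2]
    s = min(in_h / h, in_w / w)
    nh, nw = int(round(h * s)), int(round(w * s))
    canvas = np.full((in_h, in_w, 3), 114, dtype=np.uint8)
    dy, dx = (in_h - nh) // 2, (in_w - nw) // 2
    canvas[dy:dy + nh, dx:dx + nw] = cv2.resize(img, (nw, nh))
    return canvas, s, dx, dy


def unletterbox_xyxy(box, s, dx, dy):
    # Inverse mapping: model coords -> original image coords.
    x1, y1, x2, y2 = box
    return ((x1 - dx) / s, (y1 - dy) / s, (x2 - dx) / s, (y2 - dy) / s)


frame = np.zeros((1056, 1408, 3), dtype=np.uint8)
_, s, dx, dy = letterbox(frame)
orig = (700.0, 500.0, 730.0, 512.0)  # a plate-sized box in image coords
model = tuple(c * s + (dx, dy, dx, dy)[i] for i, c in enumerate(orig))
back = unletterbox_xyxy(model, s, dx, dy)
assert all(abs(a - b) < 1e-6 for a, b in zip(back, orig))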
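
# ---------------------------------------------------------------- sketch: decode shapes
# _normalize_predictions accepts both ultralytics ONNX export layouts:
# [1, 5+nc, N] (channels-first) and [1, N, 5+nc]. Transposing whenever
# dim0 < dim1 yields one row per candidate either way. The 8400 candidates
# and nc = 1 below are assumed for the demo. (The dim0 < dim1 heuristic
# relies on the candidate count exceeding 5+nc, which holds for YOLO-style
# grids.)
import numpy as np


def normalize(raw: np.ndarray) -> np.ndarray:
    pred = raw[0]
    if pred.ndim != 2:
        raise ValueError(f"Unexpected prediction shape: {raw.shape}")
    if pred.shape[0] < pred.shape[1]:
        pred = pred.transpose(1, 0)
    return pred


assert normalize(np.zeros((1, 5, 8400), np.float32)).shape == (8400, 5)
assert normalize(np.zeros((1, 8400, 5), np.float32)).shape == (8400, 5)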
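
# ---------------------------------------------------------------- sketch: confidence filter
# The pre-NMS decode step of _infer_single: per-row class scores are reduced
# with argmax/max, then a boolean mask drops rows under the confidence
# threshold. With a single plate class the argmax is always class 0. The
# rows and the 0.15 threshold are invented for the demo.
import numpy as np

pred = np.array([
    #  cx     cy     w     h   p(plate)
    [320.0, 240.0, 30.0, 12.0, 0.80],
    [500.0, 260.0, 28.0, 11.0, 0.10],   # under threshold -> dropped
    [ 90.0, 400.0, 26.0, 10.0, 0.22],
], dtype=np.float32)

boxes, cls_scores = pred[:, :4], pred[:, 4:]
cls_ids = np.argmax(cls_scores, axis=1)
confs = np.max(cls_scores, axis=1)
keep = confs >= 0.15
assert boxes[keep].shape[0] == 2 and set(cls_ids[keep]) == {0}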
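
# ---------------------------------------------------------------- sketch: soft-NMS decay
# The Gaussian decay at the heart of _soft_nms, worked numerically. Two
# adjacent plates at IoU ~0.67: the weaker keeps exp(-0.67^2 / 0.5) ~41% of
# its score (0.40 -> ~0.16) and survives an assumed 0.15 floor, where hard
# NMS at an IoU threshold of 0.5 would delete it outright. The boxes and
# scores are invented for the demo.
import math


def iou_xyxy(a, b):
    ix1, iy1 = max(a[0], b[0]), max(a[1], b[1])
    ix2, iy2 = min(a[2], b[2]), min(a[3], b[3])
    inter = max(0.0, ix2 - ix1) * max(0.0, iy2 - iy1)
    area = lambda r: (r[2] - r[0]) * (r[3] - r[1])
    union = area(a) + area(b) - inter
    return inter / union if union > 0 else 0.0


picked = (100.0, 100.0, 160.0, 120.0)     # conf 0.90, picked first
neighbour = (112.0, 100.0, 172.0, 120.0)  # conf 0.40, overlapping plate
ov = iou_xyxy(picked, neighbour)
decayed = 0.40 * math.exp(-(ov * ov) / 0.5)
assert ov > 0.5 and decayed >= 0.15       # hard NMS suppresses; soft-NMS keeps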
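
# ---------------------------------------------------------------- sketch: TTA consensus (replaced)
# For contrast, the removed person pipeline merged original and flipped
# passes with graduated confidence floors (0.50 when confirmed by both
# views, 0.60 original-only, 0.75 flip-only). A minimal sketch of that rule
# with invented detections; the caller supplies IoU-matched index pairs,
# standing in for what _match_boxes_iou computed.
def graduated_merge(orig, flip, pairs, both=0.50, orig_only=0.60, flip_only=0.75):
    # orig/flip: lists of (box, conf); pairs: [(i_orig, i_flip), ...]
    merged = []
    matched_o = {i for i, _ in pairs}
    matched_f = {j for _, j in pairs}
    for i, j in pairs:
        conf = max(orig[i][1], flip[j][1])
        if conf >= both:
            merged.append((orig[i][0], conf))  # confirmed by both views
    merged += [d for k, d in enumerate(orig) if k not in matched_o and d[1] >= orig_only]
    merged += [d for k, d in enumerate(flip) if k not in matched_f and d[1] >= flip_only]
    return merged


box = (0.0, 0.0, 10.0, 10.0)
out = graduated_merge([(box, 0.55), (box, 0.62)], [(box, 0.58)], pairs=[(0, 0)])
assert len(out) == 2  # matched pair kept at max-conf 0.58; 0.62 orig-only kept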
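
# ---------------------------------------------------------------- sketch: entry point
# How a caller exercises predict_batch: one TVFrameResult per frame, boxes
# from _infer_single, and n_keypoints dummy (0, 0) keypoints. `miner` stands
# in for the miner instance constructed earlier in the file; the frame size
# is an assumption.
import numpy as np


def describe(miner, frames):
    for r in miner.predict_batch(frames, offset=0, n_keypoints=0):
        for b in r.boxes:
            print(f"frame {r.frame_id}: plate ({b.x1},{b.y1})-({b.x2},{b.y2}) "
                  f"conf={b.conf:.2f}")


# describe(miner, [np.zeros((1056, 1408, 3), dtype=np.uint8)])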