"""SN44 crime detection miner — ALFRED ONLY, no TTA, no RF-DETR.

v5 (2026-05-04): drops the RF-DETR branch entirely. Component benchmarks showed
RF-DETR was ~10× slower than alfred (8.2s vs 0.8s on CPU) and contributed zero
observed scoring credit on cid 61709 (alfred alone returned the same 3 correct
boxes that the alfred-competitor used to earn 0.8). Goal: get under the 5s
validator gate with comfortable margin (target p95 < 2000ms e2e).

Single ONNX file expected in path_hf_repo:
    weights.onnx — alfred yolo26n e2e [1,300,6] in input-pixel coords (1280)

Conf threshold 0.52, NMS IoU 0.4, min_box_area 196 — unchanged from v3/v4.
All 6 classes routed through alfred (identity remap).
"""
| import math |
| from pathlib import Path |
|
|
| import cv2 |
| import numpy as np |
| import onnxruntime as ort |
| from numpy import ndarray |
| from pydantic import BaseModel |
|
|
|
|
class BoundingBox(BaseModel):
    """A single detection: an axis-aligned box plus its class id and score."""

    # Corner coordinates as integers; the miner emits them with x2 > x1 and
    # y2 > y1, clamped to the original frame's width and height.
    x1: int
    y1: int
    x2: int
    y2: int
    # Class index after the miner's class remap (identity over six classes).
    cls_id: int
    # Detection confidence; the miner clamps it to [0.0, 1.0] before emitting.
    conf: float
|
|
|
|
class TVFrameResult(BaseModel):
    """Prediction payload for one frame of a batch."""

    # Batch offset plus the frame's position within the batch.
    frame_id: int
    # Detections that survived thresholding, NMS and the geometry filters.
    boxes: list[BoundingBox]
    # (x, y) pairs; this miner only ever fills these with (0, 0) placeholders.
    keypoints: list[tuple[int, int]]
|
|
|
|
class Miner:
    """Public miner — the chute calls ``predict_batch(...)``.

    v5 is alfred-only: one ONNX forward pass per frame, no TTA, no RF-DETR.

    Per-frame pipeline: letterbox to the network input size -> run the model ->
    confidence filter -> map boxes back to original-image pixels -> per-class
    hard NMS -> cap at ``max_det`` -> cross-class de-duplication -> geometric
    sanity filters (minimum area / side, maximum aspect ratio).
    """

    def __init__(self, path_hf_repo) -> None:
        """Load ``weights.onnx`` from *path_hf_repo* and warm the session up.

        Args:
            path_hf_repo: Directory that contains ``weights.onnx``.

        Raises:
            Exception: Whatever onnxruntime raises if the model cannot be
                loaded even with the CPU-only provider list.
        """
        self.path_hf_repo = Path(path_hf_repo)
        self.class_names = ["balaclava", "hoodie", "glove", "bat", "spray paint", "graffiti"]
        # Identity remap: model class i is reported as class i.
        self.cls_remap = np.arange(6, dtype=np.int32)

        # Best-effort: a failure here (including the function being absent)
        # must not prevent session creation.
        try:
            ort.preload_dlls()
        except Exception:
            pass

        weights_path = str(self.path_hf_repo / "weights.onnx")
        sess_options = ort.SessionOptions()
        sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
        try:
            self.session = ort.InferenceSession(
                weights_path,
                sess_options=sess_options,
                providers=["CUDAExecutionProvider", "CPUExecutionProvider"],
            )
        except Exception:
            # Retry with the CPU provider only; a genuine load error (e.g. a
            # missing file) is raised again by this second attempt.
            self.session = ort.InferenceSession(
                weights_path,
                sess_options=sess_options,
                providers=["CPUExecutionProvider"],
            )
        self.input_name = self.session.get_inputs()[0].name
        self.output_names = [o.name for o in self.session.get_outputs()]

        self.input_h = 1280
        self.input_w = 1280
        self.conf_threshold = 0.52
        self.iou_thresh = 0.4
        self.cross_iou_thresh = 0.7
        self.max_det = 150
        self.min_box_area = 196
        self.min_side = 8
        self.max_aspect_ratio = 8.0

        # Warm-up passes on a blank frame of the configured input size so the
        # first real request does not pay one-off initialisation cost.
        # Failures are ignored: warm-up is an optimisation, not a requirement.
        warm = np.zeros((self.input_h, self.input_w, 3), dtype=np.uint8)
        for _ in range(2):
            try:
                self._infer_single(warm)
            except Exception:
                break

    def __repr__(self):
        # The confidence threshold is read from the instance so the text
        # cannot drift from the value actually used for filtering.
        return (f"CrimeMiner v5 alfred-only(yolo26n@1280, NO TTA) "
                f"conf>={self.conf_threshold} iou={self.iou_thresh} min_area={self.min_box_area}")

    def _letterbox(self, image):
        """Resize *image* to fit the network input, padding with value 114.

        Returns:
            ``(canvas, ratio, (dx, dy))`` where ``canvas`` is the padded
            ``input_h x input_w x 3`` uint8 image, ``ratio`` the scale that was
            applied and ``(dx, dy)`` the left/top padding in pixels (floats).
        """
        h, w = image.shape[:2]
        ratio = min(self.input_w / w, self.input_h / h)
        # Clamp to >= 1 px: for extreme aspect ratios the short side would
        # otherwise round to 0 and make the resize fail.
        nw = max(1, int(round(w * ratio)))
        nh = max(1, int(round(h * ratio)))
        if (nw, nh) != (w, h):
            # Cubic when enlarging, linear when shrinking.
            interp = cv2.INTER_CUBIC if ratio > 1.0 else cv2.INTER_LINEAR
            resized = cv2.resize(image, (nw, nh), interpolation=interp)
        else:
            resized = image
        canvas = np.full((self.input_h, self.input_w, 3), 114, dtype=np.uint8)
        dy = (self.input_h - nh) // 2
        dx = (self.input_w - nw) // 2
        canvas[dy:dy + nh, dx:dx + nw] = resized
        return canvas, ratio, (float(dx), float(dy))

    def _preprocess(self, image_bgr):
        """Letterbox, convert BGR->RGB, scale to [0, 1] and lay out as NCHW.

        Returns:
            ``(tensor, ratio, (dx, dy))``: a contiguous float32 array of shape
            ``(1, 3, input_h, input_w)`` plus the letterbox parameters needed
            to map boxes back to the original image.
        """
        canvas, ratio, pad = self._letterbox(image_bgr)
        rgb = cv2.cvtColor(canvas, cv2.COLOR_BGR2RGB)
        x = (rgb.astype(np.float32) / 255.0).transpose(2, 0, 1)[None, ...]
        return np.ascontiguousarray(x, dtype=np.float32), ratio, pad

    @staticmethod
    def _hard_nms(boxes, scores, iou_thresh):
        """Greedy hard NMS over ``x1, y1, x2, y2`` boxes.

        Boxes are visited in descending score order; each kept box suppresses
        every lower-ranked box whose IoU with it exceeds *iou_thresh*.

        Returns:
            Indices into *boxes* of the survivors, in descending score order.
        """
        n = len(boxes)
        if n == 0:
            return np.array([], dtype=np.intp)
        boxes = np.asarray(boxes)
        order = np.argsort(scores)[::-1]
        areas = (boxes[:, 2] - boxes[:, 0]) * (boxes[:, 3] - boxes[:, 1])
        suppressed = np.zeros(n, dtype=bool)
        keep = []
        for pos, idx in enumerate(order):
            if suppressed[idx]:
                continue
            keep.append(int(idx))
            rest = order[pos + 1:]
            if rest.size == 0:
                break
            # IoU of the kept box against all lower-ranked boxes at once.
            bi = boxes[idx]
            xx1 = np.maximum(bi[0], boxes[rest, 0])
            yy1 = np.maximum(bi[1], boxes[rest, 1])
            xx2 = np.minimum(bi[2], boxes[rest, 2])
            yy2 = np.minimum(bi[3], boxes[rest, 3])
            inter = np.maximum(0.0, xx2 - xx1) * np.maximum(0.0, yy2 - yy1)
            iou = inter / (areas[idx] + areas[rest] - inter + 1e-7)
            suppressed[rest[iou > iou_thresh]] = True
        return np.array(keep, dtype=np.intp)

    def _per_class_hard_nms(self, boxes, scores, cls_ids, iou_thresh):
        """Run ``_hard_nms`` independently per class.

        Returns:
            Sorted (ascending) indices into the input arrays of all survivors.
        """
        if len(boxes) == 0:
            return np.array([], dtype=np.intp)
        all_keep = []
        for c in np.unique(cls_ids):
            members = np.where(cls_ids == c)[0]
            kept = self._hard_nms(boxes[members], scores[members], iou_thresh)
            all_keep.extend(members[kept].tolist())
        all_keep.sort()
        return np.array(all_keep, dtype=np.intp)

    @staticmethod
    def _cap_detections(keep_idx, scores, max_det):
        """Limit *keep_idx* to the *max_det* highest-scoring entries.

        ``keep_idx`` is ordered by index, not by score, so a plain slice could
        drop strong detections. When the cap applies, the top scorers are
        selected and returned in ascending index order; otherwise *keep_idx*
        is returned unchanged.
        """
        if len(keep_idx) <= max_det:
            return keep_idx
        top = np.argsort(-scores[keep_idx], kind="stable")[:max_det]
        return np.sort(keep_idx[top])

    @staticmethod
    def _cross_class_dedup(boxes, scores, cls_ids, iou_thresh):
        """Drop boxes that overlap an already-kept box by more than *iou_thresh*,
        regardless of class.

        Returns:
            ``(boxes, scores, cls_ids)`` restricted to the survivors, in the
            order they were visited.
        """
        n = len(boxes)
        if n <= 1:
            return boxes, scores, cls_ids
        areas = (np.maximum(0.0, boxes[:, 2] - boxes[:, 0])
                 * np.maximum(0.0, boxes[:, 3] - boxes[:, 1]))
        # np.lexsort treats the LAST key as primary: boxes are visited
        # largest-area first, with confidence only breaking area ties.
        # NOTE(review): confirm area-first (not score-first) is intended.
        order = np.lexsort((-scores, -areas))
        suppressed = np.zeros(n, dtype=bool)
        keep = []
        for i in order:
            if suppressed[i]:
                continue
            keep.append(int(i))
            bi = boxes[i]
            xx1 = np.maximum(bi[0], boxes[:, 0])
            yy1 = np.maximum(bi[1], boxes[:, 1])
            xx2 = np.minimum(bi[2], boxes[:, 2])
            yy2 = np.minimum(bi[3], boxes[:, 3])
            inter = np.maximum(0.0, xx2 - xx1) * np.maximum(0.0, yy2 - yy1)
            ai = max(1e-7, float((bi[2] - bi[0]) * (bi[3] - bi[1])))
            iou = inter / (ai + areas - inter + 1e-7)
            dup = iou > iou_thresh
            dup[i] = False  # a box never suppresses itself
            suppressed |= dup
        kept = np.array(keep, dtype=np.intp)
        return boxes[kept], scores[kept], cls_ids[kept]

    def _infer_single(self, image_bgr):
        """Detect objects in one BGR frame.

        Returns:
            A list of ``BoundingBox`` in original-image pixel coordinates;
            empty when the frame is ``None``/empty or nothing passes the
            filters.
        """
        if image_bgr is None or image_bgr.size == 0:
            return []
        inp, ratio, (dx, dy) = self._preprocess(image_bgr)
        out = self.session.run(self.output_names, {self.input_name: inp})[0]
        oh, ow = image_bgr.shape[:2]
        return self._postprocess(out, ratio, dx, dy, ow, oh)

    def _postprocess(self, out, ratio, dx, dy, ow, oh):
        """Turn raw model output into filtered ``BoundingBox`` objects.

        Args:
            out: Model output of shape ``(N, 6)`` or ``(1, N, 6)``. Columns
                0-3 are read as ``x1, y1, x2, y2`` in letterboxed input
                pixels, column 4 as confidence, column 5 as class id.
            ratio: Letterbox scale factor.
            dx, dy: Letterbox left/top padding.
            ow, oh: Original image width and height.
        """
        if out.ndim == 3:
            out = out[0]
        confs = out[:, 4].astype(np.float32)
        # Rows with NaN/inf values would crash the integer conversion later.
        valid = (confs >= self.conf_threshold) & np.isfinite(out[:, :6]).all(axis=1)
        if not valid.any():
            return []
        out = out[valid]
        raw_cls = out[:, 5].astype(np.int32)
        # Class ids outside the remap table would either wrap around
        # (negative) or raise IndexError, so such rows are dropped.
        in_range = (raw_cls >= 0) & (raw_cls < len(self.cls_remap))
        if not in_range.all():
            out, raw_cls = out[in_range], raw_cls[in_range]
            if len(out) == 0:
                return []
        boxes = out[:, :4].astype(np.float32)
        confs = out[:, 4].astype(np.float32)
        cls_ids = self.cls_remap[raw_cls]
        # Undo the letterbox: remove padding, then rescale to original pixels.
        boxes[:, [0, 2]] = (boxes[:, [0, 2]] - dx) / ratio
        boxes[:, [1, 3]] = (boxes[:, [1, 3]] - dy) / ratio
        boxes[:, [0, 2]] = np.clip(boxes[:, [0, 2]], 0, ow - 1)
        boxes[:, [1, 3]] = np.clip(boxes[:, [1, 3]], 0, oh - 1)
        if len(boxes) > 1:
            keep_idx = self._per_class_hard_nms(boxes, confs, cls_ids, self.iou_thresh)
            keep_idx = self._cap_detections(keep_idx, confs, self.max_det)
            boxes, confs, cls_ids = boxes[keep_idx], confs[keep_idx], cls_ids[keep_idx]
            boxes, confs, cls_ids = self._cross_class_dedup(
                boxes, confs, cls_ids, self.cross_iou_thresh)
        return self._to_boundingboxes(boxes, confs, cls_ids, ow, oh)

    def _to_boundingboxes(self, boxes, confs, cls_ids, orig_w, orig_h):
        """Convert float boxes to integer ``BoundingBox`` objects.

        Corners are rounded outward (floor for x1/y1, ceil for x2/y2) and
        clamped to the image. Boxes that are degenerate, smaller than
        ``min_box_area``, thinner than ``min_side`` or more elongated than
        ``max_aspect_ratio`` are skipped.
        """
        out = []
        for (x1, y1, x2, y2), conf, cls_id in zip(boxes, confs, cls_ids):
            ix1 = max(0, min(orig_w, math.floor(x1)))
            iy1 = max(0, min(orig_h, math.floor(y1)))
            ix2 = max(0, min(orig_w, math.ceil(x2)))
            iy2 = max(0, min(orig_h, math.ceil(y2)))
            if ix2 <= ix1 or iy2 <= iy1:
                continue
            bw, bh = ix2 - ix1, iy2 - iy1
            if bw * bh < self.min_box_area:
                continue
            if min(bw, bh) < self.min_side:
                continue
            # bw and bh are both >= 1 here, so the divisions are safe.
            if max(bw / bh, bh / bw) > self.max_aspect_ratio:
                continue
            out.append(BoundingBox(
                x1=ix1, y1=iy1, x2=ix2, y2=iy2,
                cls_id=int(cls_id),
                conf=max(0.0, min(1.0, float(conf))),
            ))
        return out

    def predict_batch(self, batch_images, offset, n_keypoints):
        """Run detection on every frame of *batch_images*.

        Args:
            batch_images: Iterable of BGR frames.
            offset: Frame id assigned to the first frame of the batch.
            n_keypoints: Number of ``(0, 0)`` placeholder keypoints to attach
                to each result; ``None`` or negative values yield none.

        Returns:
            One ``TVFrameResult`` per input frame, in input order.
        """
        n_kp = max(0, int(n_keypoints or 0))
        return [
            TVFrameResult(
                frame_id=offset + idx,
                boxes=self._infer_single(image),
                keypoints=[(0, 0)] * n_kp,
            )
            for idx, image in enumerate(batch_images)
        ]
|
|