beverage-1 / miner.py
fitleech's picture
Upload folder using huggingface_hub
35f07a7 verified
"""TurboVision beverage detection miner — score-beverage-v3.
YOLO11s @ 1280x1280, 3-class beverage detection (bottle/can/cup),
ONNX with end-to-end NMS baked in (output [1, 300, 6] = x1, y1, x2, y2, conf, cls).
Inference pipeline (v3):
1) Primary forward pass on the full image.
2) Hflip TTA: forward on horizontally-flipped image, transform boxes back.
3) Per-class hard-NMS to merge primary + flip outputs.
4) Cross-class IoU dedup (suppresses same physical object getting two class labels).
5) Consensus-confidence boost: when both views agree on a cluster, take the max
score so true-positives rank higher in the validator's PR curve.
6) Sanity filter (min size, aspect ratio).
"""
from pathlib import Path
import math
import cv2
import numpy as np
import onnxruntime as ort
from numpy import ndarray
from pydantic import BaseModel
class BoundingBox(BaseModel):
x1: int
y1: int
x2: int
y2: int
cls_id: int
conf: float
class TVFrameResult(BaseModel):
frame_id: int
boxes: list[BoundingBox]
keypoints: list[tuple[int, int]]
class Miner:
def __init__(self, path_hf_repo: Path) -> None:
model_path = path_hf_repo / "weights.onnx"
cn_path = model_path.with_name("class_names.txt")
if cn_path.is_file():
self.class_names = [
ln.strip()
for ln in cn_path.read_text(encoding="utf-8").splitlines()
if ln.strip() and not ln.strip().startswith("#")
]
else:
self.class_names = ["cup", "bottle", "can"]
self.cls_remap = np.arange(len(self.class_names), dtype=np.int32)
print("ORT version:", ort.__version__)
try:
ort.preload_dlls()
print("✅ onnxruntime.preload_dlls() success")
except Exception as e:
print(f"⚠️ preload_dlls failed: {e}")
print("ORT available providers BEFORE session:", ort.get_available_providers())
sess_options = ort.SessionOptions()
sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
try:
self.session = ort.InferenceSession(
str(model_path),
sess_options=sess_options,
providers=["CUDAExecutionProvider", "CPUExecutionProvider"],
)
print("✅ Created ORT session with preferred CUDA provider list")
except Exception as e:
print(f"⚠️ CUDA session creation failed, falling back to CPU: {e}")
self.session = ort.InferenceSession(
str(model_path),
sess_options=sess_options,
providers=["CPUExecutionProvider"],
)
print("ORT session providers:", self.session.get_providers())
inp = self.session.get_inputs()[0]
self.input_name = inp.name
self.output_names = [o.name for o in self.session.get_outputs()]
self.input_shape = inp.shape
self.input_dtype = np.float16 if "float16" in inp.type else np.float32
self.input_height = self._safe_dim(self.input_shape[2], default=1280)
self.input_width = self._safe_dim(self.input_shape[3], default=1280)
self.conf_thres = 0.20
self.iou_thres = 0.5
self.cross_iou_thresh = 0.7
self.max_det = 300
self.use_tta = True
# Sanity filter — reject obviously bad boxes
self.min_box_area = 6 * 6
self.min_side = 4
self.max_aspect_ratio = 8.0
self.max_box_area_ratio = 0.95
print(f"✅ ONNX loaded: {model_path}")
print(f"✅ providers: {self.session.get_providers()}")
print(f"✅ input: name={self.input_name}, shape={self.input_shape}, dtype={self.input_dtype}")
print(f"✅ classes: {self.class_names}")
print(f"✅ config: conf={self.conf_thres}, iou={self.iou_thres}, "
f"cross_iou={self.cross_iou_thresh}, TTA={self.use_tta}")
def __repr__(self) -> str:
return (
f"ONNXRuntime(session={type(self.session).__name__}, "
f"providers={self.session.get_providers()})"
)
@staticmethod
def _safe_dim(value, default: int) -> int:
return value if isinstance(value, int) and value > 0 else default
def _letterbox(
self,
image: ndarray,
new_shape: tuple[int, int],
color=(114, 114, 114),
) -> tuple[ndarray, float, tuple[float, float]]:
h, w = image.shape[:2]
new_w, new_h = new_shape
ratio = min(new_w / w, new_h / h)
resized_w = int(round(w * ratio))
resized_h = int(round(h * ratio))
if (resized_w, resized_h) != (w, h):
interp = cv2.INTER_CUBIC if ratio > 1.0 else cv2.INTER_LINEAR
image = cv2.resize(image, (resized_w, resized_h), interpolation=interp)
dw = (new_w - resized_w) / 2.0
dh = (new_h - resized_h) / 2.0
left = int(round(dw - 0.1))
right = int(round(dw + 0.1))
top = int(round(dh - 0.1))
bottom = int(round(dh + 0.1))
padded = cv2.copyMakeBorder(
image, top, bottom, left, right,
borderType=cv2.BORDER_CONSTANT, value=color,
)
return padded, ratio, (dw, dh)
def _preprocess(self, image: ndarray):
orig_h, orig_w = image.shape[:2]
img, ratio, pad = self._letterbox(image, (self.input_width, self.input_height))
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
img = img.astype(self.input_dtype) / 255.0
img = np.transpose(img, (2, 0, 1))[None, ...]
img = np.ascontiguousarray(img)
return img, ratio, pad, (orig_w, orig_h)
@staticmethod
def _clip_boxes(boxes: np.ndarray, image_size: tuple[int, int]) -> np.ndarray:
w, h = image_size
boxes[:, 0] = np.clip(boxes[:, 0], 0, w - 1)
boxes[:, 1] = np.clip(boxes[:, 1], 0, h - 1)
boxes[:, 2] = np.clip(boxes[:, 2], 0, w - 1)
boxes[:, 3] = np.clip(boxes[:, 3], 0, h - 1)
return boxes
def _filter_sane_boxes(
self,
boxes: np.ndarray,
scores: np.ndarray,
cls_ids: np.ndarray,
orig_size: tuple[int, int],
):
if len(boxes) == 0:
return boxes, scores, cls_ids
orig_w, orig_h = orig_size
image_area = float(orig_w * orig_h)
keep = []
for i, box in enumerate(boxes):
x1, y1, x2, y2 = box.tolist()
bw = x2 - x1
bh = y2 - y1
if bw <= 0 or bh <= 0:
continue
if bw < self.min_side or bh < self.min_side:
continue
area = bw * bh
if area < self.min_box_area:
continue
if area > self.max_box_area_ratio * image_area:
continue
ar = max(bw / max(bh, 1e-6), bh / max(bw, 1e-6))
if ar > self.max_aspect_ratio:
continue
keep.append(i)
if not keep:
return (
np.empty((0, 4), dtype=np.float32),
np.empty((0,), dtype=np.float32),
np.empty((0,), dtype=np.int32),
)
k = np.array(keep, dtype=np.intp)
return boxes[k], scores[k], cls_ids[k]
@staticmethod
def _hard_nms(
boxes: np.ndarray,
scores: np.ndarray,
iou_thresh: float,
) -> np.ndarray:
N = len(boxes)
if N == 0:
return np.array([], dtype=np.intp)
boxes = np.asarray(boxes, dtype=np.float32)
scores = np.asarray(scores, dtype=np.float32)
order = np.argsort(scores)[::-1]
keep: list[int] = []
suppressed = np.zeros(N, dtype=bool)
for i in range(N):
idx = order[i]
if suppressed[idx]:
continue
keep.append(int(idx))
bi = boxes[idx]
for k in range(i + 1, N):
jdx = order[k]
if suppressed[jdx]:
continue
bj = boxes[jdx]
xx1 = max(bi[0], bj[0])
yy1 = max(bi[1], bj[1])
xx2 = min(bi[2], bj[2])
yy2 = min(bi[3], bj[3])
inter = max(0.0, xx2 - xx1) * max(0.0, yy2 - yy1)
area_i = (bi[2] - bi[0]) * (bi[3] - bi[1])
area_j = (bj[2] - bj[0]) * (bj[3] - bj[1])
iou = inter / (area_i + area_j - inter + 1e-7)
if iou > iou_thresh:
suppressed[jdx] = True
return np.array(keep, dtype=np.intp)
def _per_class_hard_nms(
self,
boxes: np.ndarray,
scores: np.ndarray,
cls_ids: np.ndarray,
iou_thresh: float,
) -> np.ndarray:
if len(boxes) == 0:
return np.array([], dtype=np.intp)
all_keep: list[int] = []
for c in np.unique(cls_ids):
mask = cls_ids == c
indices = np.where(mask)[0]
keep = self._hard_nms(boxes[mask], scores[mask], iou_thresh)
all_keep.extend(indices[keep].tolist())
all_keep.sort()
return np.array(all_keep, dtype=np.intp)
@staticmethod
def _cross_class_dedup(
boxes: np.ndarray,
scores: np.ndarray,
cls_ids: np.ndarray,
iou_thresh: float,
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
n = len(boxes)
if n <= 1:
return boxes, scores, cls_ids
boxes = np.asarray(boxes, dtype=np.float32)
scores = np.asarray(scores, dtype=np.float32)
cls_ids = np.asarray(cls_ids, dtype=np.int32)
areas = np.maximum(0.0, boxes[:, 2] - boxes[:, 0]) * np.maximum(
0.0, boxes[:, 3] - boxes[:, 1]
)
# Keep larger boxes first, then higher score.
order = np.lexsort((-scores, -areas))
suppressed = np.zeros(n, dtype=bool)
keep: list[int] = []
for i in order:
if suppressed[i]:
continue
keep.append(int(i))
bi = boxes[i]
xx1 = np.maximum(bi[0], boxes[:, 0])
yy1 = np.maximum(bi[1], boxes[:, 1])
xx2 = np.minimum(bi[2], boxes[:, 2])
yy2 = np.minimum(bi[3], boxes[:, 3])
inter = np.maximum(0.0, xx2 - xx1) * np.maximum(0.0, yy2 - yy1)
area_i = max(1e-7, float((bi[2] - bi[0]) * (bi[3] - bi[1])))
union = area_i + areas - inter + 1e-7
iou = inter / union
dup = iou > iou_thresh
dup[i] = False
suppressed |= dup
keep_idx = np.array(keep, dtype=np.intp)
return boxes[keep_idx], scores[keep_idx], cls_ids[keep_idx]
@staticmethod
def _max_score_per_cluster(
coords: np.ndarray,
scores: np.ndarray,
keep_indices: np.ndarray,
iou_thresh: float,
) -> np.ndarray:
n_keep = len(keep_indices)
if n_keep == 0:
return np.array([], dtype=np.float32)
coords = np.asarray(coords, dtype=np.float32)
scores = np.asarray(scores, dtype=np.float32)
out = np.empty(n_keep, dtype=np.float32)
for i in range(n_keep):
idx = keep_indices[i]
bi = coords[idx]
xx1 = np.maximum(bi[0], coords[:, 0])
yy1 = np.maximum(bi[1], coords[:, 1])
xx2 = np.minimum(bi[2], coords[:, 2])
yy2 = np.minimum(bi[3], coords[:, 3])
inter = np.maximum(0.0, xx2 - xx1) * np.maximum(0.0, yy2 - yy1)
area_i = (bi[2] - bi[0]) * (bi[3] - bi[1])
areas_j = (coords[:, 2] - coords[:, 0]) * (coords[:, 3] - coords[:, 1])
iou = inter / (area_i + areas_j - inter + 1e-7)
in_cluster = iou >= iou_thresh
out[i] = float(np.max(scores[in_cluster]))
return out
def _decode_raw_dets(
self,
preds: np.ndarray,
ratio: float,
pad: tuple[float, float],
orig_size: tuple[int, int],
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Decode end2end NMS output and return (boxes, scores, cls_ids)
in original image coordinates, after conf-threshold + remap + letterbox-reverse + sanity."""
if preds.ndim == 3 and preds.shape[0] == 1:
preds = preds[0]
if preds.ndim != 2 or preds.shape[1] < 6:
raise ValueError(f"Unexpected ONNX output shape: {preds.shape}")
boxes = preds[:, :4].astype(np.float32)
scores = preds[:, 4].astype(np.float32)
cls_ids = preds[:, 5].astype(np.int32)
valid = (cls_ids >= 0) & (cls_ids < len(self.cls_remap))
boxes, scores, cls_ids = boxes[valid], scores[valid], cls_ids[valid]
cls_ids = self.cls_remap[cls_ids]
keep = scores >= self.conf_thres
boxes = boxes[keep]
scores = scores[keep]
cls_ids = cls_ids[keep]
if len(boxes) == 0:
return (
np.empty((0, 4), dtype=np.float32),
np.empty((0,), dtype=np.float32),
np.empty((0,), dtype=np.int32),
)
pad_w, pad_h = pad
orig_w, orig_h = orig_size
boxes[:, [0, 2]] -= pad_w
boxes[:, [1, 3]] -= pad_h
boxes /= ratio
boxes = self._clip_boxes(boxes, (orig_w, orig_h))
boxes, scores, cls_ids = self._filter_sane_boxes(boxes, scores, cls_ids, orig_size)
return boxes, scores, cls_ids
def _forward(
self, image: np.ndarray
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
x, ratio, pad, orig_size = self._preprocess(image)
out = self.session.run(self.output_names, {self.input_name: x})[0]
return self._decode_raw_dets(out, ratio, pad, orig_size)
def _predict_single(self, image: np.ndarray) -> list[BoundingBox]:
boxes, scores, cls_ids = self._forward(image)
if len(boxes) == 0:
return []
return self._build_results(boxes, scores, cls_ids)
def _predict_tta(self, image: np.ndarray) -> list[BoundingBox]:
"""Hflip TTA: merge primary + flipped via per-class hard-NMS,
then cross-class dedup, with consensus-confidence boost."""
ow = image.shape[1]
b1, s1, c1 = self._forward(image)
flipped = cv2.flip(image, 1)
b2, s2, c2 = self._forward(flipped)
if len(b2):
x1f = ow - b2[:, 2]
x2f = ow - b2[:, 0]
b2 = np.stack([x1f, b2[:, 1], x2f, b2[:, 3]], axis=1)
if len(b1) == 0 and len(b2) == 0:
return []
boxes = np.concatenate([b1, b2], axis=0) if len(b2) else b1
scores = np.concatenate([s1, s2], axis=0) if len(b2) else s1
cls_ids = np.concatenate([c1, c2], axis=0) if len(b2) else c1
keep = self._per_class_hard_nms(boxes, scores, cls_ids, self.iou_thres)
if len(keep) == 0:
return []
keep = keep[: self.max_det]
# Consensus-confidence boost: cluster by IoU and take max score.
boosted = self._max_score_per_cluster(boxes, scores, keep, self.iou_thres)
boxes = boxes[keep]
cls_ids = cls_ids[keep]
scores = boosted
boxes, scores, cls_ids = self._cross_class_dedup(
boxes, scores, cls_ids, self.cross_iou_thresh
)
if len(boxes) == 0:
return []
return self._build_results(boxes, scores, cls_ids)
def _build_results(
self, boxes: np.ndarray, scores: np.ndarray, cls_ids: np.ndarray
) -> list[BoundingBox]:
results: list[BoundingBox] = []
for box, conf, cls_id in zip(boxes, scores, cls_ids):
x1, y1, x2, y2 = box.tolist()
if x2 <= x1 or y2 <= y1:
continue
results.append(
BoundingBox(
x1=int(math.floor(x1)),
y1=int(math.floor(y1)),
x2=int(math.ceil(x2)),
y2=int(math.ceil(y2)),
cls_id=int(cls_id),
conf=float(conf),
)
)
return results
def predict_batch(
self,
batch_images: list[ndarray],
offset: int,
n_keypoints: int,
) -> list[TVFrameResult]:
results: list[TVFrameResult] = []
for frame_number_in_batch, image in enumerate(batch_images):
if image is None or not isinstance(image, np.ndarray) or image.ndim != 3:
results.append(
TVFrameResult(
frame_id=offset + frame_number_in_batch,
boxes=[],
keypoints=[(0, 0) for _ in range(max(0, int(n_keypoints)))],
)
)
continue
if image.dtype != np.uint8:
image = image.astype(np.uint8)
try:
if self.use_tta:
boxes = self._predict_tta(image)
else:
boxes = self._predict_single(image)
except Exception as e:
print(f"⚠️ Inference failed for frame {offset + frame_number_in_batch}: {e}")
boxes = []
results.append(
TVFrameResult(
frame_id=offset + frame_number_in_batch,
boxes=boxes,
keypoints=[(0, 0) for _ in range(max(0, int(n_keypoints)))],
)
)
return results