# File: shelf_demo/processor.py
# (Repository page header: "OverMind0's picture", "Upload 3 files", commit 11306a6, verified)
# -*- coding: utf-8 -*-
"""Shelf detection and grouping utilities."""
from __future__ import annotations
from dataclasses import dataclass
from typing import List, Tuple, Dict, Any
import numpy as np
from PIL import Image, ImageDraw
@dataclass
class ShelfMetadata:
    """Summary record describing one detected shelf."""

    # 1-based index of the shelf in the order shelves were produced.
    shelf_id: int
    # Number of detection boxes assigned to the shelf.
    num_items: int
    # (min_y, max_y) over all boxes of the shelf, truncated to int pixels.
    y_range: Tuple[int, int]
    # Shelf item count divided by the mean item count per shelf (2 decimals).
    confidence: float
    # "stable" when confidence >= 0.5, otherwise "unstable".
    status: str
class ShelfInventoryProcessor:
    """Detect items in an image and group the detections into shelves.

    The pipeline (see :meth:`run`) is: run the detection model, group boxes
    into shelves by vertical overlap, merge under-populated shelves that
    overlap each other, then draw annotations and build metadata.

    Args:
        model: Detector exposing ``predict(image, verbose=False)``. The first
            element of its result must offer ``.boxes`` with
            ``.boxes.xyxy.cpu().numpy()`` (presumably an Ultralytics YOLO
            model -- confirm against the caller).
        overlap_threshold: Minimum fraction of a box's height that must
            overlap a shelf band for the box to join that shelf.
        min_box_height: Boxes shorter than this (in pixels) are ignored.
        min_items_per_shelf: Shelves with fewer boxes than this are
            candidates for merging.
        merge_overlap_threshold: Minimum vertical overlap between two shelf
            ranges required before they may be merged.
    """

    def __init__(
        self,
        model,
        overlap_threshold: float = 0.5,
        min_box_height: int = 20,
        min_items_per_shelf: int = 8,
        merge_overlap_threshold: float = 0.3,
    ) -> None:
        self.model = model
        self.overlap_threshold = overlap_threshold
        self.min_box_height = min_box_height
        self.min_items_per_shelf = min_items_per_shelf
        self.merge_overlap_threshold = merge_overlap_threshold

    # ---------- Geometry Utilities ----------
    @staticmethod
    def vertical_overlap(range1: Tuple[float, float], range2: Tuple[float, float]) -> float:
        """Return the fraction of ``range1`` that is covered by ``range2``.

        Both ranges are ``(y_min, y_max)``. The result is normalised by the
        height of ``range1`` only, so the measure is not symmetric. Returns
        ``0.0`` when the ranges do not overlap or ``range1`` has no height.
        """
        inter = min(range1[1], range2[1]) - max(range1[0], range2[0])
        if inter <= 0:
            return 0.0
        h1 = range1[1] - range1[0]
        return inter / h1 if h1 > 0 else 0.0

    # ---------- Inference ----------
    def run_inference(
        self, image: Image.Image
    ) -> Tuple[np.ndarray | None, Image.Image, ImageDraw.ImageDraw]:
        """Run the model on ``image`` and return its boxes sorted by top edge.

        Returns:
            ``(boxes, img, draw)`` where ``img`` is an RGB conversion of the
            input, ``draw`` is an ``ImageDraw`` bound to ``img``, and
            ``boxes`` holds the ``xyxy`` rows ordered by ascending ``y1``, or
            ``None`` when the model reported no boxes.
        """
        results = self.model.predict(image, verbose=False)[0]
        img = image.convert("RGB")
        draw = ImageDraw.Draw(img)

        if not results.boxes:
            return None, img, draw

        boxes = results.boxes.xyxy.cpu().numpy()
        boxes = boxes[np.argsort(boxes[:, 1])]  # top → bottom
        return boxes, img, draw

    # ---------- Initial Shelf Grouping ----------
    def group_boxes_into_shelves(self, boxes: np.ndarray) -> List[List[np.ndarray]]:
        """Greedily assign each box to the first shelf it overlaps enough.

        A shelf's band is the median top and median bottom of the boxes
        already in it. A box joins the first shelf whose band covers more
        than ``overlap_threshold`` of the box's height; otherwise it starts a
        new shelf. Boxes shorter than ``min_box_height`` are dropped.

        Args:
            boxes: Iterable of ``(x1, y1, x2, y2)`` rows, processed in order.

        Returns:
            One list of boxes per shelf, in order of shelf creation.
        """
        shelves: List[List[np.ndarray]] = []
        for box in boxes:
            _, y1, _, y2 = box
            if y2 - y1 < self.min_box_height:
                continue

            for shelf in shelves:
                band = (
                    np.median([b[1] for b in shelf]),
                    np.median([b[3] for b in shelf]),
                )
                # Shared helper: overlap normalised by this box's height.
                if self.vertical_overlap((y1, y2), band) > self.overlap_threshold:
                    shelf.append(box)
                    break
            else:
                # No existing shelf matched.
                shelves.append([box])
        return shelves

    # ---------- Shelf Object Builder ----------
    def build_shelf_objects(self, shelves: List[List[np.ndarray]]) -> List[Dict[str, Any]]:
        """Wrap each shelf in a dict with its boxes and overall vertical extent.

        Each returned dict has ``"boxes"`` (the same list object passed in)
        and ``"y_range"`` as ``(min_y, max_y)`` over all box tops and bottoms.
        Every shelf must contain at least one box.
        """
        shelf_objs: List[Dict[str, Any]] = []
        for shelf in shelves:
            ys = [b[1] for b in shelf] + [b[3] for b in shelf]
            shelf_objs.append({"boxes": shelf, "y_range": (min(ys), max(ys))})
        return shelf_objs

    # ---------- Post-processing Merge ----------
    def merge_weak_shelves(self, shelf_objs: List[Dict[str, Any]]) -> List[List[np.ndarray]]:
        """Merge overlapping shelves when at least one of the pair is weak.

        Shelves are visited in order. Each not-yet-consumed shelf becomes a
        seed and absorbs every later unconsumed shelf whose ``y_range``
        overlaps the seed's range by more than ``merge_overlap_threshold``
        (measured relative to the seed's height), provided the seed's current
        box count or the other shelf's box count is below
        ``min_items_per_shelf``.

        The input dicts and their ``"boxes"`` lists are not modified.

        Returns:
            The merged shelves as lists of boxes.
        """
        merged: List[List[np.ndarray]] = []
        used = [False] * len(shelf_objs)

        for i, seed in enumerate(shelf_objs):
            if used[i]:
                continue
            used[i] = True

            # Work on a copy so the caller's shelf lists are left untouched.
            cur_boxes = list(seed["boxes"])
            # NOTE: the seed's original range is used for every comparison;
            # it is not widened as other shelves are absorbed.
            cur_range = seed["y_range"]

            for j in range(i + 1, len(shelf_objs)):
                if used[j]:
                    continue
                other = shelf_objs[j]
                overlap = self.vertical_overlap(cur_range, other["y_range"])
                either_weak = (
                    len(cur_boxes) < self.min_items_per_shelf
                    or len(other["boxes"]) < self.min_items_per_shelf
                )
                if overlap > self.merge_overlap_threshold and either_weak:
                    cur_boxes.extend(other["boxes"])
                    used[j] = True

            merged.append(cur_boxes)
        return merged

    # ---------- Annotation & Metadata ----------
    def annotate_and_build_metadata(
        self,
        shelves: List[List[np.ndarray]],
        draw: ImageDraw.ImageDraw,
    ) -> Tuple[List[np.ndarray], List[ShelfMetadata], List[Dict[str, Any]]]:
        """Draw every box with its shelf label and collect metadata.

        Shelf ids start at 1 and follow the order of ``shelves``; box ids
        start at 0 and follow the flattened order of the boxes. A shelf's
        confidence is its item count divided by the mean item count across
        shelves, rounded to two decimals. Every shelf must be non-empty.

        Returns:
            ``(final_boxes, shelf_metadata, object_metadata)`` where
            ``object_metadata`` entries carry ``"box_id"``, ``"shelf_id"``
            and ``"box"`` (coordinates truncated to int).
        """
        final_boxes: List[np.ndarray] = []
        shelf_metadata: List[ShelfMetadata] = []
        object_metadata: List[Dict[str, Any]] = []

        avg_items = float(np.mean([len(s) for s in shelves])) if shelves else 1.0
        box_counter = 0

        for shelf_id, shelf in enumerate(shelves, start=1):
            ys = [b[1] for b in shelf] + [b[3] for b in shelf]
            min_y, max_y = min(ys), max(ys)
            num_items = len(shelf)
            confidence = round(num_items / avg_items, 2)

            shelf_metadata.append(
                ShelfMetadata(
                    shelf_id=shelf_id,
                    num_items=num_items,
                    y_range=(int(min_y), int(max_y)),
                    confidence=confidence,
                    status="stable" if confidence >= 0.5 else "unstable",
                )
            )

            for b in shelf:
                draw.rectangle([b[0], b[1], b[2], b[3]], outline="red", width=2)
                # Keep the label on the canvas for boxes touching the top edge.
                label_y = max(0, b[1] - 10)
                draw.text((b[0], label_y), f"S{shelf_id}", fill="red")

                final_boxes.append(b)
                object_metadata.append(
                    {
                        "box_id": box_counter,
                        "shelf_id": shelf_id,
                        "box": [int(v) for v in b],
                    }
                )
                box_counter += 1

        return final_boxes, shelf_metadata, object_metadata

    # ---------- Crop Utilities ----------
    def crop_annotated_image_by_object(
        self,
        annotated_img: Image.Image,
        boxes: List[np.ndarray],
        box_id: int | None = None,
        padding: int = 5,
    ) -> Image.Image | Dict[int, Image.Image]:
        """Crop one box, or every box, out of the annotated image.

        Each crop is grown by ``padding`` pixels on every side and clamped to
        the image bounds.

        Args:
            annotated_img: Image to crop from.
            boxes: ``(x1, y1, x2, y2)`` boxes, indexed by box id.
            box_id: Index of the single box to crop; ``None`` crops them all.
            padding: Extra pixels added around each box.

        Returns:
            A single image when ``box_id`` is given, otherwise a dict mapping
            each box index to its crop.

        Raises:
            IndexError: If ``box_id`` is outside ``[0, len(boxes))``.
        """
        width, height = annotated_img.size

        def _safe_crop(x1, y1, x2, y2):
            left = max(0, int(x1 - padding))
            top = max(0, int(y1 - padding))
            right = min(width, int(x2 + padding))
            bottom = min(height, int(y2 + padding))
            return annotated_img.crop((left, top, right, bottom))

        if box_id is not None:
            if box_id < 0 or box_id >= len(boxes):
                raise IndexError(f"Box ID {box_id} out of range")
            x1, y1, x2, y2 = boxes[box_id]
            return _safe_crop(x1, y1, x2, y2)

        return {
            i: _safe_crop(x1, y1, x2, y2)
            for i, (x1, y1, x2, y2) in enumerate(boxes)
        }

    # ---------- Run Full Pipeline ----------
    def run(self, image: Image.Image):
        """Run detection, grouping, merging and annotation on ``image``.

        Returns:
            ``(final_boxes, shelf_metadata, object_metadata, num_shelves,
            annotated_img)``. When the model reports no boxes the three lists
            are empty, ``num_shelves`` is 0 and the image is the unannotated
            RGB conversion of the input.
        """
        boxes, img, draw = self.run_inference(image)
        if boxes is None:
            return [], [], [], 0, img

        shelves = self.group_boxes_into_shelves(boxes)
        shelf_objs = self.build_shelf_objects(shelves)
        merged_shelves = self.merge_weak_shelves(shelf_objs)
        final_boxes, shelf_metadata, object_metadata = self.annotate_and_build_metadata(
            merged_shelves, draw
        )
        return final_boxes, shelf_metadata, object_metadata, len(merged_shelves), img