| """Инференс по полному фото детали кузова. |
| |
| Алгоритм: |
| 1) Вырезаем панель из фона. |
| 2) Скользящим окном (PATCH_SIZE с шагом PATCH_STRIDE) собираем патчи. |
| 3) Прогоняем батчем через сеть -> вероятность "defect" для каждого патча. |
| 4) Аккумулируем вероятности в карту дефектов того же размера, что панель. |
| 5) Возвращаем: вердикт по детали, маску, координаты bounding box'ов дефектов, |
| визуализацию (наложение тепловой карты). |
| |
| Запуск: |
| python -m src.infer --image путь/к/фото.jpg --out runs/result.jpg |
| """ |
| from __future__ import annotations |
| import argparse |
| import json |
| from pathlib import Path |
| from typing import Any |
|
|
| import cv2 |
| import numpy as np |
| import torch |
| import albumentations as A |
| from albumentations.pytorch import ToTensorV2 |
|
|
| from . import config as C |
| from .model import build_model |
| from .prepare_data import crop_panel, imread_unicode, imwrite_unicode |
|
|
|
|
# Per-patch preprocessing: resize to the model input size, then normalize with
# ImageNet mean/std (presumably the same normalization used at training time —
# verify against the training pipeline), and convert HWC uint8 -> CHW tensor.
_TRANSFORM = A.Compose([
    A.Resize(C.IMG_SIZE, C.IMG_SIZE),
    A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
    ToTensorV2(),
])
|
|
|
|
def load_model(checkpoint: Path | str | None = None, device: torch.device | str = "cpu"):
    """Load the defect classifier from a checkpoint and switch it to eval mode.

    Args:
        checkpoint: Path to a ``.pt`` checkpoint. When ``None`` (or any falsy
            value), falls back to ``C.CHECKPOINTS / "best.pt"``.
        device: Device the weights are mapped onto and the model is moved to.

    Returns:
        The ``DefectClassifier`` on *device*, in ``eval()`` mode.
    """
    ckpt_path = Path(checkpoint) if checkpoint else C.CHECKPOINTS / "best.pt"
    # NOTE(review): weights_only=False unpickles arbitrary objects; only load
    # checkpoints from trusted sources.
    state = torch.load(ckpt_path, map_location=device, weights_only=False)
    from .model import DefectClassifier  # local import, keeps module import light
    # Older checkpoints may lack the "backbone" key; fall back to the config value.
    backbone = state.get("backbone", C.BACKBONE)
    model = DefectClassifier(backbone=backbone, pretrained=False).to(device)
    model.load_state_dict(state["model"])
    model.eval()
    return model
|
|
|
|
| def _slide_coords(h: int, w: int, size: int, stride: int) -> list[tuple[int, int]]: |
| if h < size or w < size: |
| return [(0, 0)] |
| ys = list(range(0, h - size + 1, stride)) |
| xs = list(range(0, w - size + 1, stride)) |
| if ys[-1] != h - size: ys.append(h - size) |
| if xs[-1] != w - size: xs.append(w - size) |
| return [(y, x) for y in ys for x in xs] |
|
|
|
|
def _to_batch(patches: list[np.ndarray]) -> torch.Tensor:
    """Convert a list of BGR patches into one normalized NCHW tensor batch."""
    stacked = []
    for patch in patches:
        rgb = cv2.cvtColor(patch, cv2.COLOR_BGR2RGB)
        stacked.append(_TRANSFORM(image=rgb)["image"])
    return torch.stack(stacked, dim=0)
|
|
|
|
def predict_image(image_bgr: np.ndarray, model, device,
                  threshold: float = C.DEFECT_THRESHOLD,
                  panel_defect_ratio: float = C.PANEL_DEFECT_RATIO,
                  *, batch_size: int = 32,
                  min_box_area: float = 200.0) -> dict[str, Any]:
    """Analyze a full photo and return a per-panel defect verdict.

    Pipeline: crop the panel (if ``C.PANEL_CROP``), collect sliding-window
    patches, classify them in batches, average patch probabilities into a
    per-pixel heatmap, threshold it into a mask and extract defect boxes.

    Args:
        image_bgr: Full photo in BGR (OpenCV) channel order.
        model: Classifier returning 2-class logits; index 1 is "defect".
        device: Torch device the batches are sent to.
        threshold: Per-pixel probability cut-off for the defect mask.
        panel_defect_ratio: Minimum defect-pixel fraction for a DEFECT verdict.
        batch_size: Number of patches per forward pass (was hard-coded 32).
        min_box_area: Minimum contour area (px^2) kept as a defect box
            (was hard-coded 200).

    Returns:
        Dict with ``is_defect``, ``defect_ratio``, ``max_prob``, ``boxes``,
        ``panel_size``, plus the float32 ``heatmap`` and the ``panel`` image
        for visualization.
    """
    panel = crop_panel(image_bgr) if C.PANEL_CROP else image_bgr
    H, W = panel.shape[:2]
    coords = _slide_coords(H, W, C.PATCH_SIZE, C.PATCH_STRIDE)
    patches = [panel[y:y + C.PATCH_SIZE, x:x + C.PATCH_SIZE] for y, x in coords]
    if not patches:
        # Defensive fallback: classify the whole (resized) panel as one patch.
        patches = [cv2.resize(panel, (C.PATCH_SIZE, C.PATCH_SIZE))]
        coords = [(0, 0)]

    # Batched inference: probability of class 1 ("defect") per patch.
    probs: list[float] = []
    with torch.no_grad():
        for i in range(0, len(patches), batch_size):
            batch = _to_batch(patches[i:i + batch_size]).to(device)
            logits = model(batch)
            p = torch.softmax(logits, dim=1)[:, 1].cpu().numpy()
            probs.extend(p.tolist())

    # Accumulate overlapping patch probabilities into a per-pixel average.
    heatmap = np.zeros((H, W), dtype=np.float32)
    weights = np.zeros((H, W), dtype=np.float32)
    for (y, x), p in zip(coords, probs):
        ye = min(y + C.PATCH_SIZE, H); xe = min(x + C.PATCH_SIZE, W)
        heatmap[y:ye, x:xe] += p
        weights[y:ye, x:xe] += 1.0
    heatmap = heatmap / np.maximum(weights, 1e-6)  # avoid div-by-zero on uncovered px

    mask = (heatmap >= threshold).astype(np.uint8) * 255
    defect_pixels = int(mask.sum() / 255)
    defect_ratio = defect_pixels / max(H * W, 1)

    # Group thresholded pixels into bounding boxes, dropping tiny speckles.
    contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    boxes = []
    for c in contours:
        if cv2.contourArea(c) < min_box_area:
            continue
        x, y, w, h = cv2.boundingRect(c)
        roi = heatmap[y:y + h, x:x + w]
        boxes.append({
            "x": int(x), "y": int(y), "w": int(w), "h": int(h),
            "confidence": float(roi.max()),
            "mean_prob": float(roi.mean()),
        })

    # Verdict requires both enough defect area and at least one surviving box.
    is_defect = bool(defect_ratio >= panel_defect_ratio and len(boxes) > 0)

    return {
        "is_defect": is_defect,
        "defect_ratio": float(defect_ratio),
        "max_prob": float(heatmap.max()),
        "boxes": boxes,
        "panel_size": {"h": int(H), "w": int(W)},
        "heatmap": heatmap,
        "panel": panel,
    }
|
|
|
|
def render_visualization(result: dict) -> np.ndarray:
    """Overlay the probability heatmap, defect boxes and verdict banner on the panel."""
    base = result["panel"].copy()
    heat = np.clip(result["heatmap"], 0.0, 1.0)
    heat_bgr = cv2.applyColorMap((heat * 255).astype(np.uint8), cv2.COLORMAP_JET)
    canvas = cv2.addWeighted(base, 0.6, heat_bgr, 0.4, 0)

    for box in result["boxes"]:
        x, y, w, h = box["x"], box["y"], box["w"], box["h"]
        cv2.rectangle(canvas, (x, y), (x + w, y + h), (0, 0, 255), 3)
        label = f"{box['confidence']:.2f}"
        cv2.putText(canvas, label, (x, max(20, y - 8)),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2)

    defective = result["is_defect"]
    verdict = "DEFECT" if defective else "OK"
    banner_color = (0, 0, 255) if defective else (0, 200, 0)
    cv2.rectangle(canvas, (0, 0), (320, 60), (0, 0, 0), -1)
    cv2.putText(canvas, verdict, (12, 44), cv2.FONT_HERSHEY_SIMPLEX, 1.4, banner_color, 3)
    return canvas
|
|
|
|
def main() -> None:
    """CLI entry point: run inference on one photo and save the visualization."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--image", required=True, type=Path)
    parser.add_argument("--checkpoint", type=Path, default=None)
    parser.add_argument("--out", type=Path, default=C.RUNS / "result.jpg")
    parser.add_argument("--threshold", type=float, default=C.DEFECT_THRESHOLD)
    args = parser.parse_args()

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = load_model(args.checkpoint, device)
    bgr = imread_unicode(args.image)
    if bgr is None:
        raise SystemExit(f"Не удалось прочитать {args.image}")
    result = predict_image(bgr, model, device, threshold=args.threshold)

    args.out.parent.mkdir(parents=True, exist_ok=True)
    imwrite_unicode(args.out, render_visualization(result), [cv2.IMWRITE_JPEG_QUALITY, 90])

    # Drop the large array fields before printing the JSON report.
    report = {key: val for key, val in result.items() if key not in {"heatmap", "panel"}}
    print(json.dumps(report, indent=2, ensure_ascii=False))
    print(f"\nВизуализация: {args.out}")
|
|
|
|
| if __name__ == "__main__": |
| main() |
|
|