"""
Hugging Face Inference Endpoint Custom Handler

Handles inference for multiple models:
- business/finishing: YOLO classification models
- rdd: YOLO road damage detection (object detection with bounding boxes)
- surfaceai: EfficientNetV2 models for surface type, road type, and quality classification
"""
|
|
|
|
|
import base64
import io
import logging
from typing import Any, Dict, List, Tuple

import torch
import torch.nn as nn
import torch.nn.functional as F
from PIL import Image
from torchvision import transforms
from torchvision.models import efficientnet_v2_s
from ultralytics import YOLO
|
|
|
|
|
|
|
|
class EfficientNetClassifier:
    """Wrapper for EfficientNetV2 classification models.

    The checkpoint at ``model_path`` is read as a dict with these keys:

    - ``class_to_idx``: mapping of class name -> integer index.
    - ``model_state_dict``: weights for ``efficientnet_v2_s`` whose classifier
      head has been replaced by ``Dropout`` + ``Linear``.
    - ``is_regression`` (optional, default ``False``): when true the head has a
      single output that is interpreted as a position on the class-index scale
      instead of per-class logits.
    """

    # How much pseudo-probability a class loses per unit of distance between
    # its index and the regressed score (regression checkpoints only).
    _REGRESSION_PROB_SLOPE = 0.25

    def __init__(self, model_path: str, device: str = "cpu"):
        """Load the checkpoint, rebuild the network and prepare preprocessing.

        Args:
            model_path: Path to the ``.pt`` checkpoint file.
            device: Torch device string the model and inputs are moved to.
        """
        # NOTE(security): weights_only=False unpickles arbitrary Python objects.
        # Only load checkpoints that ship with this repository / come from a
        # trusted source; never point this at user-supplied files.
        checkpoint = torch.load(model_path, map_location=device, weights_only=False)

        self.class_to_idx = checkpoint["class_to_idx"]
        self.idx_to_class = {idx: name for name, idx in self.class_to_idx.items()}
        self.num_classes = len(self.class_to_idx)
        self.is_regression = checkpoint.get("is_regression", False)
        self.device = device

        # Regression checkpoints have a single scalar output.
        output_size = 1 if self.is_regression else self.num_classes

        self.model = efficientnet_v2_s(weights=None)
        # Read the stock head's input width before replacing the head.
        in_features = self.model.classifier[1].in_features
        self.model.classifier = nn.Sequential(
            nn.Dropout(p=0.2, inplace=True),
            nn.Linear(in_features, output_size),
        )
        self.model.load_state_dict(checkpoint["model_state_dict"])
        self.model.to(device)
        self.model.eval()

        self.transform = transforms.Compose([
            transforms.Resize((384, 384)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ])

    def predict(self, image: Image.Image) -> Tuple[str, int, float, Dict[str, float]]:
        """Run inference and return class, id, confidence, and all probabilities.

        Args:
            image: Input image; it is converted to RGB before preprocessing.

        Returns:
            ``(class_name, class_id, confidence, all_probs)``. For regression
            checkpoints ``confidence`` is the clamped regression score (not a
            probability) and ``all_probs`` holds distance-based pseudo
            probabilities; otherwise both come from a softmax over the logits.
        """
        image = image.convert("RGB")
        input_tensor = self.transform(image).unsqueeze(0).to(self.device)

        with torch.no_grad():
            outputs = self.model(input_tensor)

        if self.is_regression:
            return self._decode_regression(float(outputs[0, 0]))
        return self._decode_classification(outputs)

    def _decode_regression(self, raw_score: float) -> Tuple[str, int, float, Dict[str, float]]:
        """Map a scalar regression output onto the nearest known class index."""
        min_idx = min(self.idx_to_class)
        max_idx = max(self.idx_to_class)
        # Clamp to the known index range. float() keeps the declared return
        # type even when the clamp picks one of the (integer) bounds.
        score = float(max(min_idx, min(max_idx, raw_score)))

        class_id = int(round(score))
        if class_id not in self.idx_to_class:
            # Indices may have gaps; fall back to the closest existing one.
            class_id = min(self.idx_to_class, key=lambda idx: abs(idx - score))
        class_name = self.idx_to_class[class_id]

        slope = self._REGRESSION_PROB_SLOPE
        all_probs = {
            name: max(0.0, 1.0 - abs(idx - score) * slope)
            for idx, name in self.idx_to_class.items()
        }
        return class_name, class_id, score, all_probs

    def _decode_classification(self, outputs: torch.Tensor) -> Tuple[str, int, float, Dict[str, float]]:
        """Turn a ``(1, num_classes)`` logit tensor into the top class and probabilities."""
        probs = F.softmax(outputs, dim=1)[0]
        top_prob, top_idx = torch.max(probs, 0)
        top_class_id = int(top_idx)

        # Iterate the indices that actually exist (in index order) instead of
        # assuming they are exactly 0..num_classes-1.
        all_probs = {
            name: float(probs[idx])
            for idx, name in sorted(self.idx_to_class.items())
        }
        return self.idx_to_class[top_class_id], top_class_id, float(top_prob), all_probs
|
|
|
|
|
|
|
|
class EndpointHandler:
    """Hugging Face Inference Endpoint handler that routes an image to a model.

    Supported ``model`` choices: ``"business"``, ``"finishing"``, ``"both"``
    (YOLO classifiers), ``"rdd"`` (YOLO road damage detection) and
    ``"surfaceai"`` (EfficientNetV2 surface type / road type / quality).
    """

    # Surface types that have a dedicated quality model on disk.
    _QUALITY_SURFACES = ("asphalt", "concrete", "paving_stones", "sett", "unpaved")
    # Quality model used when the detected surface type has no dedicated model.
    _FALLBACK_QUALITY_SURFACE = "asphalt"
    _DEFAULT_CONF_THRESHOLD = 0.25

    def __init__(self, path: str = ""):
        """
        Initialize the handler by loading all models.

        Args:
            path: Path to the model directory (provided by HF). An empty
                string means the current working directory.
        """
        self.device = "cuda" if torch.cuda.is_available() else "cpu"

        # With an empty path a plain f"{path}/models/..." would resolve to the
        # filesystem root ("/models/..."), so treat "" as ".".
        models_dir = f"{path or '.'}/models"
        surfaceai_dir = f"{models_dir}/surfaceai"

        self.models = {
            "business": YOLO(f"{models_dir}/business_best.pt"),
            "finishing": YOLO(f"{models_dir}/finishing_best.pt"),
        }

        self.rdd_model = YOLO(f"{models_dir}/rdd/yolo12s_RDD2022_best.pt")

        self.surfaceai_models = {
            "surface_type": EfficientNetClassifier(f"{surfaceai_dir}/surface_type_v1.pt", self.device),
            "road_type": EfficientNetClassifier(f"{surfaceai_dir}/road_type_v1.pt", self.device),
            "quality": {
                surface: EfficientNetClassifier(
                    f"{surfaceai_dir}/quality/surface_quality_{surface}_v1.pt", self.device
                )
                for surface in self._QUALITY_SURFACES
            },
        }

    def _decode_image(self, image_input: Any) -> Image.Image:
        """
        Decode image from various input formats.

        Args:
            image_input: Base64 string (optionally a ``data:...;base64,`` URI),
                http(s) URL, raw bytes-like object, or an already-decoded
                PIL image.

        Returns:
            PIL Image object

        Raises:
            ValueError: If the input type is not supported (also raised by
                ``base64`` for malformed payloads).
        """
        # NOTE(review): the HF inference toolkit appears to pass a decoded PIL
        # image when the request uses an image content type; accept it as-is.
        if isinstance(image_input, Image.Image):
            return image_input

        if isinstance(image_input, (bytes, bytearray, memoryview)):
            return Image.open(io.BytesIO(image_input))

        if isinstance(image_input, str):
            if image_input.startswith(("http://", "https://")):
                # NOTE(security): this fetches a caller-supplied URL from inside
                # the endpoint (SSRF surface). Restrict egress or allow-list
                # hosts if the endpoint is reachable by untrusted clients.
                import requests

                response = requests.get(image_input, timeout=30)
                response.raise_for_status()
                return Image.open(io.BytesIO(response.content))

            # Strip a "data:image/...;base64," prefix when present.
            _, marker, payload = image_input.partition("base64,")
            encoded = payload if marker else image_input
            return Image.open(io.BytesIO(base64.b64decode(encoded)))

        raise ValueError(f"Unsupported image input type: {type(image_input)}")

    @staticmethod
    def _format_prediction(
        class_name: str, class_id: int, confidence: float, all_probs: Dict[str, float]
    ) -> Dict[str, Any]:
        """Build the common classification payload, rounding scores to 4 decimals."""
        return {
            "class": class_name,
            "class_id": class_id,
            "confidence": round(confidence, 4),
            "all_probs": {name: round(prob, 4) for name, prob in all_probs.items()},
        }

    def _run_classification(self, model: YOLO, image: Image.Image) -> Dict[str, Any]:
        """Run classification inference and return formatted results."""
        prediction = model.predict(image, verbose=False)[0]
        probs = prediction.probs
        top_class_id = int(probs.top1)

        all_probs = {
            prediction.names[class_id]: float(prob)
            for class_id, prob in enumerate(probs.data)
        }
        return self._format_prediction(
            prediction.names[top_class_id], top_class_id, float(probs.top1conf), all_probs
        )

    def _run_rdd(self, image: Image.Image, conf_threshold: float = 0.25) -> Dict[str, Any]:
        """
        Run Road Damage Detection and return detections with bounding boxes.

        Args:
            image: Image to analyse.
            conf_threshold: Confidence threshold forwarded to the detector.

        Returns:
            {
                "detections": [
                    {
                        "class": "D00",
                        "class_id": 0,
                        "confidence": 0.85,
                        "bbox": [x1, y1, x2, y2]
                    },
                    ...
                ],
                "count": 2
            }
        """
        prediction = self.rdd_model.predict(image, verbose=False, conf=conf_threshold)[0]
        boxes = prediction.boxes if prediction.boxes is not None else []

        detections = []
        for box in boxes:
            class_id = int(box.cls[0])
            detections.append({
                "class": prediction.names[class_id],
                "class_id": class_id,
                "confidence": round(float(box.conf[0]), 4),
                "bbox": [round(coord, 2) for coord in box.xyxy[0].tolist()],
            })

        return {"detections": detections, "count": len(detections)}

    def _run_efficientnet(self, model: EfficientNetClassifier, image: Image.Image) -> Dict[str, Any]:
        """Run EfficientNet classification and return formatted results."""
        return self._format_prediction(*model.predict(image))

    def _run_surfaceai(self, image: Image.Image) -> Dict[str, Any]:
        """
        Run SurfaceAI models for surface type, road type, and quality assessment.

        The quality model is chosen from the detected surface type; when that
        type has no dedicated quality model the asphalt model is used and a
        ``"note"`` entry explains the fallback.

        Returns:
            {
                "surface_type": {
                    "class": "asphalt",
                    "class_id": 0,
                    "confidence": 0.92,
                    "all_probs": {...}
                },
                "road_type": {
                    "class": "primary",
                    "class_id": 0,
                    "confidence": 0.88,
                    "all_probs": {...}
                },
                "surface_quality": {
                    "class": "good",
                    "class_id": 2,
                    "confidence": 0.75,
                    "all_probs": {...},
                    "model_used": "asphalt"
                }
            }
        """
        surface_result = self._run_efficientnet(self.surfaceai_models["surface_type"], image)
        road_result = self._run_efficientnet(self.surfaceai_models["road_type"], image)

        quality_models = self.surfaceai_models["quality"]
        detected_surface = surface_result["class"].lower()
        recognized = detected_surface in quality_models
        model_used = detected_surface if recognized else self._FALLBACK_QUALITY_SURFACE

        quality_result = self._run_efficientnet(quality_models[model_used], image)
        quality_result["model_used"] = model_used
        if not recognized:
            quality_result["note"] = (
                f"Surface type '{detected_surface}' not recognized, using {model_used} model"
            )

        return {
            "surface_type": surface_result,
            "road_type": road_result,
            "surface_quality": quality_result,
        }

    def __call__(self, data: Dict[str, Any]) -> List[Dict[str, Any]]:
        """
        Process inference request.

        The model choice is validated before the image is decoded, so an
        invalid choice is rejected without downloading or decoding anything.
        Any exception raised while handling the request is logged and returned
        as ``[{"error": "<message>"}]``.

        Expected input format:
            {
                "inputs": "<base64_string or URL>",
                "parameters": {
                    "model": "business" | "finishing" | "both" | "rdd" | "surfaceai",
                    "conf_threshold": 0.25  # optional, for RDD only
                }
            }

        ``parameters`` may be omitted or null; ``model`` defaults to "both".

        Returns for business/finishing/both:
            [
                {
                    "business": {"class": "...", "class_id": 0, "confidence": 0.95, "all_probs": {...}},
                    "finishing": {"class": "...", "class_id": 0, "confidence": 0.92, "all_probs": {...}}
                }
            ]

        Returns for rdd:
            [
                {
                    "detections": [
                        {"class": "D00", "class_id": 0, "confidence": 0.85, "bbox": [x1, y1, x2, y2]},
                        ...
                    ],
                    "count": 2
                }
            ]

        Returns for surfaceai:
            [
                {
                    "surface_type": {"class": "asphalt", "class_id": 0, "confidence": 0.92, "all_probs": {...}},
                    "road_type": {"class": "primary", "class_id": 0, "confidence": 0.88, "all_probs": {...}},
                    "surface_quality": {"class": "good", "class_id": 2, "confidence": 0.75, "all_probs": {...}, "model_used": "asphalt"}
                }
            ]
        """
        image_input = data.get("inputs")
        if not image_input:
            return [{"error": "Missing required field: inputs"}]

        try:
            # `"parameters": null` must behave like a missing key.
            parameters = data.get("parameters") or {}
            model_choice = parameters.get("model", "both")

            if model_choice == "both":
                classifier_names = ["business", "finishing"]
            elif model_choice in self.models:
                classifier_names = [model_choice]
            elif model_choice in ("rdd", "surfaceai"):
                classifier_names = []
            else:
                return [{"error": f"Invalid model choice: {model_choice}. Use 'business', 'finishing', 'both', 'rdd', or 'surfaceai'"}]

            image = self._decode_image(image_input)

            if model_choice == "rdd":
                raw_threshold = parameters.get("conf_threshold")
                conf_threshold = (
                    self._DEFAULT_CONF_THRESHOLD if raw_threshold is None else float(raw_threshold)
                )
                return [self._run_rdd(image, conf_threshold)]

            if model_choice == "surfaceai":
                return [self._run_surfaceai(image)]

            return [{
                model_name: self._run_classification(self.models[model_name], image)
                for model_name in classifier_names
            }]

        except Exception as exc:
            # Top-level boundary: keep the traceback in the server log while the
            # client only receives the message.
            logging.getLogger(__name__).exception("Inference request failed")
            return [{"error": str(exc)}]
|
|
|