import base64
import io
import time

import numpy as np
import torch
from PIL import Image
from ultralytics import YOLO


class EndpointHandler:
    def __init__(self, path=""):
        # `path` is the model repository directory passed in by the serving
        # framework; the weights here are resolved by name instead, so
        # Ultralytics will download yolo11n-seg.pt if it is not found locally.
        model_path = "yolo11n-seg.pt"

        # Pick the best available device: Apple MPS, then CUDA, then CPU.
        if torch.backends.mps.is_available():
            self.device = "mps"
        elif torch.cuda.is_available():
            self.device = "cuda"
        else:
            self.device = "cpu"

        self.model = YOLO(model_path)
        print(f"✅ Model loaded on {self.device}")

    def __call__(self, data):
        """
        Expected input:
        {
            "inputs": <base64 image string>
        }

        Returns a dict with "detections" (box, confidence, class_id, polygon
        per object) and "model_latency" (inference time in seconds).
        """
        if "inputs" not in data:
            return {"error": "No input image provided"}

        # Decode the base64 payload into an RGB numpy array.
        img_b64 = data["inputs"]
        img_bytes = base64.b64decode(img_b64)
        image = Image.open(io.BytesIO(img_bytes)).convert("RGB")
        image = np.array(image)

        # Run segmentation inference and time it.
        t0 = time.perf_counter()
        results = self.model(image, device=self.device, verbose=False)[0]
        elapsed = time.perf_counter() - t0

        detections = []
        if results.masks is not None:
            # Masks are available: pair each box with its polygon outline
            # (pixel coordinates from results.masks.xy).
            for box, conf, cls, poly in zip(
                results.boxes.xyxy,
                results.boxes.conf,
                results.boxes.cls,
                results.masks.xy,
            ):
                x1, y1, x2, y2 = map(int, box.tolist())
                detections.append({
                    "box": [x1, y1, x2, y2],
                    "confidence": float(conf),
                    "class_id": int(cls),
                    "polygon": poly.tolist(),
                })
        else:
            # results.masks is None (e.g. nothing was detected): return
            # box-only entries with no polygon.
            for box, conf, cls in zip(
                results.boxes.xyxy,
                results.boxes.conf,
                results.boxes.cls,
            ):
                x1, y1, x2, y2 = map(int, box.tolist())
                detections.append({
                    "box": [x1, y1, x2, y2],
                    "confidence": float(conf),
                    "class_id": int(cls),
                    "polygon": None,
                })

        return {"detections": detections, "model_latency": elapsed}