KB Teo
Create space
098c98c
raw
history blame
8.22 kB
from typing import *
from pathlib import Path
import importlib
import albumentations as A
import cv2
from omegaconf import DictConfig, OmegaConf
import numpy as np
from .utils import read_image, standardize, normalize_cdf, normalize_min_max, get_boxes
if importlib.util.find_spec("openvino") is not None:
import openvino as ov
else:
raise ImportError("OpenVINO is not installed.")
class OpenVINOInferencer():
"""For perform inference with OpenVINO.
Args:
path (str | Path): Folder path to exported OpenVINO model. Must contains
model.xml, model.bin, and metadata.json.
device (str): Device to run inference on. Defaults to "AUTO".
cache_dir (str | Path): Cache directory for OpenVINO
"""
def __init__(self,
path: Union[str, Path],
device: str="AUTO",
cache_dir: Union[str, Path, None]=None
) -> None:
if isinstance(path, str):
path = Path(path)
self.model = self._load_model(path, device, cache_dir)
self.metadata = self._load_metadata(path)
# Note: Transformation require Albumentations package
self.transform = A.from_dict(self.metadata["transform"])
self.metadata["expand_offset"] = self._get_expand_offset(self.transform)
# Record input & output blob (key)
self.metadata["input_blob"] = self.model.input(0).get_names().pop()
self.metadata["output_blob"] = self.model.output(0).get_names().pop()
def _load_model(self, path: Path, device: str, cache_dir: Union[str, Path, None]) -> ov.CompiledModel:
xml_path = path / "model.xml"
bin_path = path / "model.bin"
ov_core = ov.Core()
model = ov_core.read_model(xml_path, bin_path)
# Create cache directory
if cache_dir is None:
cache_dir = "cache"
if isinstance(cache_dir, str):
cache_dir = Path(cache_dir)
cache_dir.mkdir(parents=True, exist_ok=True)
ov_core.set_property({"CACHE_DIR": cache_dir})
model = ov_core.compile_model(model=model, device_name=device.upper())
return model
def _load_metadata(self, path: Path) -> DictConfig:
metadata = path / "metadata.json"
metadata = OmegaConf.load(metadata)
metadata = cast(DictConfig, metadata)
return metadata
def _get_expand_offset(self, transform):
is_center_cropped = False
for t in reversed(transform.transforms):
if isinstance(t, A.CenterCrop):
is_center_cropped = True
cropped_h = t.height
cropped_w = t.width
elif isinstance(t, A.Resize) and is_center_cropped:
return (t.height - cropped_h) // 2, (t.width - cropped_w) // 2
def predict(self, image: Union[str, Path, np.ndarray]) -> Dict[str, np.ndarray]:
if isinstance(image, (str, Path)):
image = read_image(image)
# Record input image size
self.metadata["image_shape"] = image.shape[:2]
inputs = self._pre_process(image, self.transform)
predictions = self.model(inputs)
outputs = self._post_processs(predictions, self.metadata)
outputs.update({"image": image})
return outputs
def __call__(self, image: Union[str, Path, np.ndarray]) -> Dict[str, np.ndarray]:
return self.predict(image)
def _pre_process(self, image: np.ndarray, transform=None) -> np.ndarray:
if transform is not None:
image = transform(image=image)["image"]
if len(image.shape) == 3:
# Add batch_size axis
image = np.expand_dims(image, axis=0)
if image.shape[3] == 3:
# Transpose the color_channel axis
# Expected shape: [b, c, h, w]
image = image.transpose(0, 3, 1, 2)
return image
def _post_processs(self, predictions: np.ndarray, metadata: DictConfig) -> Dict[str, np.ndarray]:
predictions = predictions[metadata["output_blob"]]
anomaly_map: np.ndarray = None
pred_label: float = None
pred_mask: float = None
if metadata["task"] == "classification":
pred_score = predictions
else:
anomaly_map = predictions.squeeze()
pred_score = anomaly_map.reshape(-1).max()
if "image_threshold" in metadata:
# Assign anomalous label to predictions with score >= threshold
pred_label = pred_score >= metadata["image_threshold"]
if metadata["task"] == "classification":
_, pred_score = self._normalize(pred_scores=pred_score, metadata=metadata)
else:
if "pixel_threshold" in metadata:
pred_mask = (anomaly_map >= metadata["pixel_threshold"]).astype(np.uint8)
anomaly_map, pred_score = self._normalize(
pred_scores=pred_score,
metadata=metadata,
anomaly_map=anomaly_map
)
if "image_shape" in metadata and anomaly_map.shape != metadata["image_shape"]:
if "expand_offset" in metadata and metadata["expand_offset"] is not None:
anomaly_map = self._expand(anomaly_map, metadata["expand_offset"][0], metadata["expand_offset"][1])
pred_mask = self._expand(pred_mask, metadata["expand_offset"][0], metadata["expand_offset"][1])
h, w = metadata["image_shape"] # Fix: cv2.resize take (w, h) as argument
anomaly_map = cv2.resize(anomaly_map, (w, h))
if pred_mask is not None:
pred_mask = cv2.resize(pred_mask, (w, h))
if metadata["task"] == "detection":
pred_boxes = get_boxes(pred_mask)
box_labels = np.ones(pred_boxes.shape[0])
else:
pred_boxes: np.ndarray | None = None
box_labels: np.ndarray | None = None
return {
"anomaly_map": anomaly_map,
"pred_label": pred_label,
"pred_score": pred_score,
"pred_mask": pred_mask,
"pred_boxes": pred_boxes,
"box_labels": box_labels
}
@staticmethod
def _expand(map, offset_h, offset_w):
h, w = map.shape
if map is not None:
expanded_map = np.zeros((h + offset_h * 2, w + offset_w * 2), dtype=map.dtype)
expanded_map[offset_h:offset_h+h, offset_w:offset_w+w] = map
return expanded_map
@staticmethod
def _normalize(
pred_scores: np.float32,
metadata: DictConfig,
anomaly_map: np.ndarray | None = None
) -> Tuple[Union[np.ndarray, None], float]:
# Min-max normalization
if "min" in metadata and "max" in metadata:
if anomaly_map is not None:
anomaly_map = normalize_min_max(
anomaly_map,
metadata["pixel_threshold"],
metadata["min"],
metadata["max"]
)
pred_scores = normalize_min_max(
pred_scores,
metadata["image_threshold"],
metadata["min"],
metadata["max"]
)
# Standardize pixel scores
if "pixel_mean" in metadata and "pixel_std" in metadata:
if anomaly_map is not None:
anomaly_map = standardize(
anomaly_map,
metadata["pixel_mean"],
metadata["pixel_std"],
center_at=metadata["image_mean"]
)
anomaly_map = normalize_cdf(
anomaly_map,
metadata["pixel_threshold"]
)
# Standardize image scores
if "image_mean" in metadata and "image_std" in metadata:
pred_scores = standardize(
pred_scores,
metadata["image_mean"],
metadata["image_std"]
)
pred_scores = normalize_cdf(
pred_scores,
metadata["image_threshold"]
)
return anomaly_map, float(pred_scores)