Spaces:
Running
Running
from typing import * | |
from pathlib import Path | |
import importlib | |
import albumentations as A | |
import cv2 | |
from omegaconf import DictConfig, OmegaConf | |
import numpy as np | |
from .utils import read_image, standardize, normalize_cdf, normalize_min_max, get_boxes | |
if importlib.util.find_spec("openvino") is not None: | |
import openvino as ov | |
else: | |
raise ImportError("OpenVINO is not installed.") | |
class OpenVINOInferencer:
    """Run inference with an exported OpenVINO model.

    Args:
        path (str | Path): Folder path to the exported OpenVINO model. Must
            contain model.xml, model.bin, and metadata.json.
        device (str): Device to run inference on. Defaults to "AUTO".
        cache_dir (str | Path | None): Cache directory for OpenVINO. When
            None, a folder named "cache" is used.
    """

    def __init__(self,
                 path: Union[str, Path],
                 device: str = "AUTO",
                 cache_dir: Union[str, Path, None] = None
                 ) -> None:
        if isinstance(path, str):
            path = Path(path)

        self.model = self._load_model(path, device, cache_dir)
        self.metadata = self._load_metadata(path)

        # Note: the transformation requires the Albumentations package
        self.transform = A.from_dict(self.metadata["transform"])
        self.metadata["expand_offset"] = self._get_expand_offset(self.transform)

        # Record input & output blob (key)
        self.metadata["input_blob"] = self.model.input(0).get_names().pop()
        self.metadata["output_blob"] = self.model.output(0).get_names().pop()

    def _load_model(self,
                    path: Path,
                    device: str,
                    cache_dir: Union[str, Path, None]) -> "ov.CompiledModel":
        """Read model.xml / model.bin from ``path`` and compile for ``device``.

        The cache directory is created if it does not exist and registered
        with the OpenVINO core through the ``CACHE_DIR`` property before the
        model is compiled.
        """
        xml_path = path / "model.xml"
        bin_path = path / "model.bin"

        ov_core = ov.Core()
        model = ov_core.read_model(xml_path, bin_path)

        # Create cache directory
        if cache_dir is None:
            cache_dir = "cache"
        if isinstance(cache_dir, str):
            cache_dir = Path(cache_dir)
        cache_dir.mkdir(parents=True, exist_ok=True)
        ov_core.set_property({"CACHE_DIR": cache_dir})

        model = ov_core.compile_model(model=model, device_name=device.upper())
        return model

    def _load_metadata(self, path: Path) -> "DictConfig":
        """Load ``metadata.json`` from the model folder via OmegaConf."""
        metadata = path / "metadata.json"
        metadata = OmegaConf.load(metadata)
        metadata = cast("DictConfig", metadata)
        return metadata

    def _get_expand_offset(self, transform) -> Optional[Tuple[int, int]]:
        """Return the per-side (height, width) margin removed by a center crop.

        The transforms are walked from last to first. Once a ``CenterCrop``
        has been seen, the next ``Resize`` encountered yields half of the
        size difference per axis. Returns None when no such pair is found.
        """
        is_center_cropped = False
        cropped_h = cropped_w = 0
        for t in reversed(transform.transforms):
            if isinstance(t, A.CenterCrop):
                is_center_cropped = True
                cropped_h = t.height
                cropped_w = t.width
            elif isinstance(t, A.Resize) and is_center_cropped:
                return (t.height - cropped_h) // 2, (t.width - cropped_w) // 2
        return None

    def predict(self, image: Union[str, Path, np.ndarray]) -> Dict[str, Any]:
        """Run the full pipeline on one image.

        Args:
            image: Image array, or a path that is loaded with ``read_image``.

        Returns:
            The post-processed outputs plus the input image under ``"image"``.
        """
        if isinstance(image, (str, Path)):
            image = read_image(image)

        # Record input image size so post-processing can resize back to it
        self.metadata["image_shape"] = image.shape[:2]

        inputs = self._pre_process(image, self.transform)
        predictions = self.model(inputs)
        outputs = self._post_processs(predictions, self.metadata)
        outputs["image"] = image
        return outputs

    def __call__(self, image: Union[str, Path, np.ndarray]) -> Dict[str, Any]:
        """Alias for :meth:`predict`."""
        return self.predict(image)

    def _pre_process(self, image: np.ndarray, transform=None) -> np.ndarray:
        """Apply ``transform`` (if any) and convert the image to [b, c, h, w]."""
        if transform is not None:
            image = transform(image=image)["image"]

        if len(image.shape) == 3:
            # Add batch_size axis
            image = np.expand_dims(image, axis=0)

        if image.shape[3] == 3:
            # Transpose the color_channel axis
            # Expected shape: [b, c, h, w]
            image = image.transpose(0, 3, 1, 2)

        return image

    def _post_processs(self,
                       predictions: np.ndarray,
                       metadata: "DictConfig") -> Dict[str, Any]:
        """Turn raw model outputs into scores, labels, masks and boxes.

        Labels and masks are thresholded on the raw values; normalization is
        applied afterwards. For non-classification tasks the anomaly map (and
        mask) are padded by ``expand_offset`` and resized to ``image_shape``
        when their shape differs from the input image.

        Returns:
            Dict with keys ``anomaly_map``, ``pred_label``, ``pred_score``,
            ``pred_mask``, ``pred_boxes`` and ``box_labels``. Entries that do
            not apply to the task or metadata are None.
        """
        predictions = predictions[metadata["output_blob"]]
        is_classification = metadata["task"] == "classification"

        anomaly_map: Optional[np.ndarray] = None
        pred_label = None
        pred_mask: Optional[np.ndarray] = None

        if is_classification:
            pred_score = predictions
        else:
            anomaly_map = predictions.squeeze()
            pred_score = anomaly_map.reshape(-1).max()

        if "image_threshold" in metadata:
            # Assign anomalous label to predictions with score >= threshold
            pred_label = pred_score >= metadata["image_threshold"]

        if is_classification:
            _, pred_score = self._normalize(pred_scores=pred_score, metadata=metadata)
        else:
            if "pixel_threshold" in metadata:
                pred_mask = (anomaly_map >= metadata["pixel_threshold"]).astype(np.uint8)

            anomaly_map, pred_score = self._normalize(
                pred_scores=pred_score,
                metadata=metadata,
                anomaly_map=anomaly_map
            )

            # Compare as plain tuples so the container type stored in the
            # metadata (tuple, list, config list) does not affect the result.
            if ("image_shape" in metadata
                    and tuple(anomaly_map.shape) != tuple(metadata["image_shape"])):
                offset = metadata["expand_offset"] if "expand_offset" in metadata else None
                if offset is not None:
                    # _expand passes None through, so a missing mask is safe.
                    anomaly_map = self._expand(anomaly_map, offset[0], offset[1])
                    pred_mask = self._expand(pred_mask, offset[0], offset[1])

                h, w = metadata["image_shape"]  # cv2.resize takes (w, h)
                anomaly_map = cv2.resize(anomaly_map, (w, h))
                if pred_mask is not None:
                    pred_mask = cv2.resize(pred_mask, (w, h))

        pred_boxes: Optional[np.ndarray] = None
        box_labels: Optional[np.ndarray] = None
        if metadata["task"] == "detection":
            pred_boxes = get_boxes(pred_mask)
            box_labels = np.ones(pred_boxes.shape[0])

        return {
            "anomaly_map": anomaly_map,
            "pred_label": pred_label,
            "pred_score": pred_score,
            "pred_mask": pred_mask,
            "pred_boxes": pred_boxes,
            "box_labels": box_labels
        }

    @staticmethod
    def _expand(map, offset_h, offset_w):
        """Zero-pad a 2-D map by ``offset_h`` rows and ``offset_w`` columns per side.

        Returns None when ``map`` is None so callers can pass an optional mask.
        """
        if map is None:
            return None

        h, w = map.shape
        expanded_map = np.zeros((h + offset_h * 2, w + offset_w * 2), dtype=map.dtype)
        expanded_map[offset_h:offset_h + h, offset_w:offset_w + w] = map
        return expanded_map

    @staticmethod
    def _normalize(
        pred_scores: np.float32,
        metadata: "DictConfig",
        anomaly_map: Optional[np.ndarray] = None
    ) -> Tuple[Union[np.ndarray, None], float]:
        """Normalize the image score and, when given, the anomaly map.

        Each step runs only if its statistics are present in ``metadata``:
        min-max normalization (``min``/``max``), pixel standardization
        (``pixel_mean``/``pixel_std``) and image standardization
        (``image_mean``/``image_std``).

        Returns:
            ``(anomaly_map, pred_score)`` with the score as a Python float.
        """
        # Min-max normalization
        if "min" in metadata and "max" in metadata:
            if anomaly_map is not None:
                anomaly_map = normalize_min_max(
                    anomaly_map,
                    metadata["pixel_threshold"],
                    metadata["min"],
                    metadata["max"]
                )
            pred_scores = normalize_min_max(
                pred_scores,
                metadata["image_threshold"],
                metadata["min"],
                metadata["max"]
            )

        # Standardize pixel scores
        if "pixel_mean" in metadata and "pixel_std" in metadata:
            if anomaly_map is not None:
                anomaly_map = standardize(
                    anomaly_map,
                    metadata["pixel_mean"],
                    metadata["pixel_std"],
                    center_at=metadata["image_mean"]
                )
                anomaly_map = normalize_cdf(
                    anomaly_map,
                    metadata["pixel_threshold"]
                )

        # Standardize image scores
        if "image_mean" in metadata and "image_std" in metadata:
            pred_scores = standardize(
                pred_scores,
                metadata["image_mean"],
                metadata["image_std"]
            )
            pred_scores = normalize_cdf(
                pred_scores,
                metadata["image_threshold"]
            )

        # .item() extracts the single value from size-1 arrays of any rank;
        # float() on arrays with ndim > 0 is deprecated in NumPy.
        return anomaly_map, float(np.asarray(pred_scores).item())