|
import os |
|
import platform |
|
import sys |
|
import threading |
|
import time |
|
import urllib.parse |
|
|
|
from os import PathLike |
|
from pathlib import Path |
|
from typing import List, NamedTuple, Optional, Tuple |
|
from tqdm import tqdm |
|
|
|
import numpy as np |
|
from openvino.runtime import Core, Type, get_version |
|
from IPython.display import HTML, Image, display |
|
|
|
import openvino as ov |
|
from openvino.runtime.passes import Manager, MatcherPass, WrapType, Matcher |
|
from openvino.runtime import opset10 as ops |
|
|
|
|
|
|
|
|
|
|
|
|
|
def device_widget(default="AUTO", exclude=None, added=None):
    """
    Build a dropdown widget for choosing an OpenVINO device.

    The option list starts with the devices reported by ``ov.Core().available_devices``
    followed by "AUTO". Names listed in `exclude` are then removed and names listed in
    `added` are appended when they are not already present.

    :param default: Device name preselected in the dropdown.
    :param exclude: Optional collection of device names to remove from the options.
    :param added: Optional collection of extra device names to offer.
    :return: The configured ``ipywidgets.Dropdown``.
    """
    import ipywidgets as widgets
    import openvino as ov

    options = ov.Core().available_devices + ["AUTO"]

    for unwanted in exclude or []:
        if unwanted in options:
            options.remove(unwanted)

    for extra in added or []:
        if extra not in options:
            options.append(extra)

    return widgets.Dropdown(options=options, value=default, description="Device:", disabled=False)
|
|
|
|
|
def quantization_widget(default=True):
    """
    Build a checkbox widget labelled "Quantization".

    :param default: Initial checked state of the checkbox.
    :return: The configured ``ipywidgets.Checkbox``.
    """
    from ipywidgets import Checkbox

    return Checkbox(value=default, description="Quantization", disabled=False)
|
|
|
|
|
def pip_install(*args):
    """
    Run ``pip install`` with the interpreter that executes this code.

    Every positional argument is converted to ``str`` and split on whitespace, so a
    caller may pass individual tokens or a whole option string such as
    ``"-q --upgrade some-package"``.

    :param args: Package specifiers and/or pip command-line options.
    :raises subprocess.CalledProcessError: If pip exits with a non-zero status.
    """
    import subprocess

    cli_args = []
    for arg in args:
        # split() without a separator collapses runs of whitespace and drops
        # leading/trailing blanks; split(" ") would produce empty-string
        # arguments that are then handed to pip.
        cli_args.extend(str(arg).split())

    # NOTE(review): shell=True on Windows is kept from the original. With a shell,
    # characters such as ">" in a version specifier may be interpreted by cmd.exe;
    # confirm whether the shell is really required there.
    subprocess.run(
        [sys.executable, "-m", "pip", "install", *cli_args],
        shell=(platform.system() == "Windows"),
        check=True,
    )
|
|
|
|
|
def load_image(path: str, timeout: int = 10) -> np.ndarray:
    """
    Loads an image from `path` and returns it as a numpy array. `path` should point
    to an image file, either a local filename or a URL. The image is not stored to
    the filesystem. Use the `download_file` function to download and store an image.

    :param path: Local path name (``str`` or path-like) or URL to the image.
    :param timeout: Number of seconds to wait for the server when `path` is a URL.
    :return: The decoded image. Local files are read with ``cv2.imread`` defaults;
             URL content is decoded with flag ``-1`` (kept from the original), so
             the channel layout follows the file itself.
    :raises requests.exceptions.HTTPError: If the server answers with an error status.
    """
    import cv2
    import requests

    # Accept pathlib.Path objects as well as plain strings.
    path = os.fspath(path)

    if path.startswith("http"):
        # A browser-like User-Agent is sent; presumably some hosts reject the
        # default one used by `requests`.
        response = requests.get(path, headers={"User-Agent": "Mozilla/5.0"}, timeout=timeout)
        # Fail loudly on 4xx/5xx instead of trying to decode an error page.
        response.raise_for_status()
        array = np.asarray(bytearray(response.content), dtype="uint8")
        image = cv2.imdecode(array, -1)
    else:
        image = cv2.imread(path)
    return image
|
|
|
|
|
def download_file(
    url: PathLike,
    filename: PathLike = None,
    directory: PathLike = None,
    show_progress: bool = True,
    silent: bool = False,
    timeout: int = 10,
) -> PathLike:
    """
    Download a file from a url and save it to the local filesystem. The file is saved to the
    current directory by default, or to `directory` if specified. If a filename is not given,
    the filename of the URL will be used.

    :param url: URL that points to the file to download
    :param filename: Name of the local file to save. Should point to the name of the file only,
                     not the full path. If None the filename from the url will be used
    :param directory: Directory to save the file to. Will be created if it doesn't exist
                      If None the file will be saved to the current working directory
    :param show_progress: If True, show a TQDM progress bar
    :param silent: If True, do not print a message if the file already exists
    :param timeout: Number of seconds before cancelling the connection attempt
    :return: path to downloaded file
    :raises ValueError: If `filename` contains a directory part, or no file name can be
                        derived from `url`
    :raises Exception: If the HTTP request fails (error status, timeout, connection error)
    """
    filename = filename or Path(urllib.parse.urlparse(url).path).name
    if not filename:
        # e.g. "https://host/" has no final path component to use as a file name
        raise ValueError("Could not derive a file name from `url`. Use the `filename` parameter to specify one.")
    # Fairly large chunks keep the number of progress-bar updates low.
    chunk_size = 16384

    filename = Path(filename)
    if len(filename.parts) > 1:
        raise ValueError(
            "`filename` should refer to the name of the file, excluding the directory. "
            "Use the `directory` parameter to specify a target directory for the downloaded file."
        )

    # Create the target directory if needed and prepend it to the file name.
    if directory is not None:
        directory = Path(directory)
        directory.mkdir(parents=True, exist_ok=True)
        filename = directory / Path(filename)

    # Imported only once the arguments have been validated.
    import requests

    try:
        # `timeout` is forwarded so that an unresponsive server cannot block forever.
        response = requests.get(
            url=url,
            headers={"User-agent": "Mozilla/5.0"},
            stream=True,
            timeout=timeout,
        )
        response.raise_for_status()
    except requests.exceptions.HTTPError as error:
        raise Exception(error) from None
    except requests.exceptions.Timeout:
        raise Exception(
            "Connection timed out. If you access the internet through a proxy server, please "
            "make sure the proxy is set in the shell from where you launched Jupyter."
        ) from None
    except requests.exceptions.RequestException as error:
        raise Exception(f"File downloading failed with error: {error}") from None

    try:
        # Download when the file is missing or its size differs from Content-length.
        filesize = int(response.headers.get("Content-length", 0))
        if not filename.exists() or (os.stat(filename).st_size != filesize):
            with tqdm(
                total=filesize,
                unit="B",
                unit_scale=True,
                unit_divisor=1024,
                desc=str(filename),
                disable=not show_progress,
            ) as progress_bar:
                with open(filename, "wb") as file_object:
                    for chunk in response.iter_content(chunk_size):
                        file_object.write(chunk)
                        progress_bar.update(len(chunk))
                        progress_bar.refresh()
        elif not silent:
            print(f"'{filename}' already exists.")
    finally:
        # Release the connection even when writing the file fails.
        response.close()

    return filename.resolve()
|
|
|
|
|
def download_ir_model(model_xml_url: str, destination_folder: PathLike = None) -> PathLike:
    """
    Fetch both files of an IR model: the xml at `model_xml_url` and its weights.

    The weights URL is derived by replacing the last four characters of
    `model_xml_url` with ".bin".

    :param model_xml_url: URL of the model xml file
    :param destination_folder: Directory that receives both files. If None, the files
                               are saved to the current directory
    :return: path to the downloaded xml model file
    """
    weights_url = f"{model_xml_url[:-4]}.bin"
    xml_path = download_file(model_xml_url, directory=destination_folder, show_progress=False)
    download_file(weights_url, directory=destination_folder)
    return xml_path
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def normalize_minmax(data):
    """
    Normalizes the values in `data` between 0 and 1.

    :param data: Object exposing ``min()`` and ``max()`` and supporting arithmetic
                 with their results (e.g. a numpy array).
    :return: `data` rescaled so that its minimum maps to 0 and its maximum to 1.
    :raises ValueError: If all elements of `data` have the same value, because the
                        value range would be zero.
    """
    # Compute the extremes once instead of re-scanning the data for every use.
    low, high = data.min(), data.max()
    if high == low:
        raise ValueError(f"Normalization is not possible because all elements of `data` have the same value: {high}.")
    return (data - low) / (high - low)
|
|
|
|
|
def to_rgb(image_data: np.ndarray) -> np.ndarray:
    """
    Return `image_data` with its channel order converted from BGR to RGB.
    """
    from cv2 import COLOR_BGR2RGB, cvtColor

    return cvtColor(image_data, COLOR_BGR2RGB)
|
|
|
|
|
def to_bgr(image_data: np.ndarray) -> np.ndarray:
    """
    Return `image_data` with its channel order converted from RGB to BGR.
    """
    from cv2 import COLOR_RGB2BGR, cvtColor

    return cvtColor(image_data, COLOR_RGB2BGR)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class VideoPlayer:
    """
    Custom video player to fulfill FPS requirements. You can set target FPS and output size,
    flip the video horizontally or skip first N frames.

    :param source: Video source. It could be either camera device or video file.
    :param size: Output frame size.
    :param flip: Flip source horizontally.
    :param fps: Target FPS.
    :param skip_first_frames: Skip first N frames.
    :param width: Frame width requested from the capture device.
    :param height: Frame height requested from the capture device.
    """

    def __init__(self, source, size=None, flip=False, fps=None, skip_first_frames=0, width=1280, height=720):
        import cv2

        # Keep a handle to the module so other methods do not need to import it again.
        self.cv2 = cv2
        self.__cap = cv2.VideoCapture(source)

        self.__cap.set(cv2.CAP_PROP_FRAME_WIDTH, width)
        self.__cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height)

        if not self.__cap.isOpened():
            raise RuntimeError(f"Cannot open {'camera' if isinstance(source, int) else ''} {source}")

        # Position the capture so that the first N frames are skipped.
        self.__cap.set(cv2.CAP_PROP_POS_FRAMES, skip_first_frames)

        self.__input_fps = self.__cap.get(cv2.CAP_PROP_FPS)
        if self.__input_fps <= 0:
            # The source did not report a usable rate; fall back to 60.
            self.__input_fps = 60

        self.__output_fps = fps if fps is not None else self.__input_fps
        self.__flip = flip
        self.__size = None
        self.__interpolation = None
        if size is not None:
            self.__size = size
            # INTER_AREA when the target is narrower than the source, INTER_LINEAR otherwise.
            self.__interpolation = cv2.INTER_AREA if size[0] < self.__cap.get(cv2.CAP_PROP_FRAME_WIDTH) else cv2.INTER_LINEAR

        # Read the first frame so that `next` has something to return before `start`.
        _, self.__frame = self.__cap.read()
        self.__lock = threading.Lock()
        self.__thread = None
        self.__stop = False

    def start(self):
        """
        Start playing: launch the daemon thread that reads frames.
        """
        self.__stop = False
        self.__thread = threading.Thread(target=self.__run, daemon=True)
        self.__thread.start()

    def stop(self):
        """
        Stop playing and release resources.
        """
        self.__stop = True
        if self.__thread is not None:
            self.__thread.join()
        self.__cap.release()

    def __run(self):
        """
        Worker loop: read frames at the input rate and publish them at the target rate.
        """
        prev_time = 0
        while not self.__stop:
            t1 = time.time()
            ret, frame = self.__cap.read()
            if not ret:
                break

            # Publish a new frame only when the target-FPS interval has elapsed.
            if 1 / self.__output_fps < time.time() - prev_time:
                prev_time = time.time()
                with self.__lock:
                    self.__frame = frame

            t2 = time.time()
            # Time left, in seconds, until the next input frame is due.
            wait_time = 1 / self.__input_fps - (t2 - t1)
            time.sleep(max(0, wait_time))

        # The frame is shared with `next`, so it is cleared under the same lock.
        with self.__lock:
            self.__frame = None

    def next(self):
        """
        Get current frame.

        :return: A copy of the current frame, resized and flipped as configured, or
                 None when no frame is available.
        """
        with self.__lock:
            if self.__frame is None:
                return None
            # Copy, because the same frame may be handed out again if the FPS is low.
            frame = self.__frame.copy()
        if self.__size is not None:
            frame = self.cv2.resize(frame, self.__size, interpolation=self.__interpolation)
        if self.__flip:
            frame = self.cv2.flip(frame, 1)
        return frame
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Label(NamedTuple):
    """Description of one segmentation class: index, color and optional name."""

    # Numeric index of the class.
    index: int
    # Color of the class; the label tables in this file use 3-tuples of ints.
    color: Tuple
    # Optional human-readable class name.
    name: Optional[str] = None
|
|
|
|
|
|
|
|
|
|
|
class SegmentationMap(NamedTuple):
    """Collection of label objects with helpers to extract their colors and names."""

    labels: List

    def get_colormap(self):
        """Return a numpy array holding the `color` of every label, in order."""
        colors = []
        for label in self.labels:
            colors.append(label.color)
        return np.array(colors)

    def get_labels(self):
        """Return the list of label names, or None when no label has a truthy name."""
        names = [label.name for label in self.labels]
        return names if any(names) else None
|
|
|
|
|
|
|
|
|
|
|
# Cityscapes-style label table: the position of each (color, name) pair below
# becomes the label's index.
cityscape_labels = [
    Label(index=index, color=color, name=name)
    for index, (color, name) in enumerate(
        [
            ((128, 64, 128), "road"),
            ((244, 35, 232), "sidewalk"),
            ((70, 70, 70), "building"),
            ((102, 102, 156), "wall"),
            ((190, 153, 153), "fence"),
            ((153, 153, 153), "pole"),
            ((250, 170, 30), "traffic light"),
            ((220, 220, 0), "traffic sign"),
            ((107, 142, 35), "vegetation"),
            ((152, 251, 152), "terrain"),
            ((70, 130, 180), "sky"),
            ((220, 20, 60), "person"),
            ((255, 0, 0), "rider"),
            ((0, 0, 142), "car"),
            ((0, 0, 70), "truck"),
            ((0, 60, 100), "bus"),
            ((0, 80, 100), "train"),
            ((0, 0, 230), "motorcycle"),
            ((119, 11, 32), "bicycle"),
            ((255, 255, 255), "background"),
        ]
    )
]

CityScapesSegmentation = SegmentationMap(labels=cityscape_labels)
|
|
|
# Two-class label table: white background (index 0), black foreground (index 1).
binary_labels = [
    Label(0, (255, 255, 255), "background"),
    Label(1, (0, 0, 0), "foreground"),
]

BinarySegmentation = SegmentationMap(labels=binary_labels)
|
|
|
|
|
|
|
|
|
|
|
def segmentation_map_to_image(result: np.ndarray, colormap: np.ndarray, remove_holes: bool = False) -> np.ndarray:
    """
    Convert network result of floating point numbers to an RGB image with
    integer values from 0-255 by applying a colormap.

    :param result: A single network result after converting to pixel values in H,W or 1,H,W shape.
    :param colormap: A numpy array of shape (num_classes, 3) with an RGB value per class.
    :param remove_holes: If True, remove holes in the segmentation result.
    :return: An RGB image of shape (H, W, 3) where each pixel is a uint8 value according to colormap.
    :raises ValueError: If `result` is neither (H,W) nor (1,H,W), or if it contains more
                        distinct values than `colormap` has classes.
    """
    # Only a leading singleton axis of a 3-D result is dropped; a 2-D result whose
    # height happens to be 1 must stay 2-D.
    if result.ndim == 3 and result.shape[0] == 1:
        result = result.squeeze(0)
    elif result.ndim != 2:
        raise ValueError(f"Expected result with shape (H,W) or (1,H,W), got result with shape {result.shape}")

    num_values = len(np.unique(result))
    if num_values > colormap.shape[0]:
        raise ValueError(
            f"Expected max {colormap.shape[0]} classes in result, got {num_values} "
            "different output values. Please make sure to convert the network output to "
            "pixel values before calling this function."
        )

    # Imported only after the inputs have been validated.
    import cv2

    result = result.astype(np.uint8)

    # RETR_EXTERNAL keeps only outer contours, so filling them closes inner holes.
    contour_mode = cv2.RETR_EXTERNAL if remove_holes else cv2.RETR_TREE
    mask = np.zeros((result.shape[0], result.shape[1], 3), dtype=np.uint8)
    for label_index, color in enumerate(colormap):
        # Binary image (0 or 255) of the pixels that belong to this class.
        label_index_map = (result == label_index).astype(np.uint8) * 255
        contours, _ = cv2.findContours(label_index_map, contour_mode, cv2.CHAIN_APPROX_SIMPLE)
        cv2.drawContours(
            mask,
            contours,
            contourIdx=-1,
            color=color.tolist(),
            thickness=cv2.FILLED,
        )

    return mask
|
|
|
|
|
def segmentation_map_to_overlay(image, result, alpha, colormap, remove_holes=False) -> np.ndarray:
    """
    Blend a colored segmentation mask over the source image and return the result.

    :param image: Source image. A two-dimensional image is expanded to three
                  identical channels first.
    :param result: A single network result after converting to pixel values in H,W or 1,H,W shape.
    :param alpha: Weight of the mask in the blend; the image gets ``1 - alpha``.
    :param colormap: A numpy array of shape (num_classes, 3) with an RGB value per class.
    :param remove_holes: If True, remove holes in the segmentation result.
    :return: The source image with the segmentation mask, resized to the image size,
             overlaid on it.
    """
    import cv2

    if image.ndim == 2:
        image = np.repeat(image[..., np.newaxis], 3, axis=2)

    height, width = image.shape[:2]
    colored_mask = segmentation_map_to_image(result, colormap, remove_holes)
    colored_mask = cv2.resize(src=colored_mask, dsize=(width, height))
    return cv2.addWeighted(colored_mask, alpha, image, 1 - alpha, 0)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def viz_result_image(
    result_image: np.ndarray,
    source_image: np.ndarray = None,
    source_title: str = None,
    result_title: str = None,
    labels: Optional["SegmentationMap"] = None,
    resize: bool = False,
    bgr_to_rgb: bool = False,
    hide_axes: bool = False,
):
    """
    Show result image, optionally together with source images, and a legend with labels.

    :param result_image: Numpy array of RGB result image.
    :param source_image: Numpy array of source image. If provided this image will be shown
                         next to the result image. source_image is expected to be in RGB format.
                         Set bgr_to_rgb to True if source_image is in BGR format.
    :param source_title: Title to display for the source image.
    :param result_title: Title to display for the result image.
    :param labels: Object providing ``get_colormap()`` and ``get_labels()`` (such as a
                   `SegmentationMap`). If provided, a legend will be shown with its labels.
    :param resize: If true, resize the result image to the same shape as the source image.
                   Ignored when no source image is given.
    :param bgr_to_rgb: If true, convert the source image from BGR to RGB. Use this option if
                       source_image is a BGR image. Ignored when no source image is given.
    :param hide_axes: If true, do not show matplotlib axes.
    :return: Matplotlib figure with result image
    """
    import cv2
    import matplotlib.pyplot as plt
    from matplotlib.lines import Line2D

    # Both options act on, or depend on, the source image; without one they are no-ops.
    if source_image is not None:
        if bgr_to_rgb:
            source_image = to_rgb(source_image)
        if resize:
            result_image = cv2.resize(result_image, (source_image.shape[1], source_image.shape[0]))

    num_images = 1 if source_image is None else 2

    # squeeze=False keeps `ax` two-dimensional even for a single subplot.
    fig, ax = plt.subplots(1, num_images, figsize=(16, 8), squeeze=False)
    if source_image is not None:
        ax[0, 0].imshow(source_image)
        ax[0, 0].set_title(source_title)

    ax[0, num_images - 1].imshow(result_image)
    ax[0, num_images - 1].set_title(result_title)

    if hide_axes:
        for a in ax.ravel():
            a.axis("off")
    if labels:
        colors = labels.get_colormap()
        # One legend line per class, with the 0-255 color scaled to matplotlib's 0-1 range.
        lines = [
            Line2D(
                [0],
                [0],
                color=[item / 255 for item in c.tolist()],
                linewidth=3,
                linestyle="-",
            )
            for c in colors
        ]
        plt.legend(
            lines,
            labels.get_labels(),
            bbox_to_anchor=(1, 1),
            loc="upper left",
            prop={"size": 12},
        )
    # Close the figure in pyplot before handing the figure object to the caller.
    plt.close(fig)
    return fig
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def show_array(frame: np.ndarray, display_handle=None):
    """
    Show `frame` as a JPEG image in the notebook output.

    `frame` is expected to have data in BGR order. When `display_handle` is given, its
    content is replaced by the new image; otherwise a new display is created.

    Create a display_handle with: `display_handle = display(display_id=True)`

    :param frame: Image data to encode and display.
    :param display_handle: Existing display handle to update, or None to create one.
    :return: The display handle that now shows the image.
    """
    import cv2

    _, encoded = cv2.imencode(ext=".jpeg", img=frame)
    jpeg = Image(data=encoded.tobytes())
    if display_handle is not None:
        display_handle.update(jpeg)
        return display_handle
    return display(jpeg, display_id=True)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class NotebookAlert(Exception):
    """Exception type that renders its message as an HTML alert box when created."""

    def __init__(self, message: str, alert_class: str):
        """
        Show an alert box with the given message.

        :param message: The message to display. It is inserted into the HTML as-is, so it
                        may contain markup.
        :param alert_class: The class for styling the message. Options: info, warning, success, danger.
        """
        self.message = message
        self.alert_class = alert_class
        self.show_message()

    def show_message(self):
        """Render the alert; the wrapping <div> is closed so later output is not nested in it."""
        display(HTML(f"""<div class="alert alert-{self.alert_class}">{self.message}</div>"""))
|
|
|
|
|
class DeviceNotFoundAlert(NotebookAlert):
    def __init__(self, device: str):
        """
        Show a warning that `device` is unavailable, followed by the devices that are.

        This class does not itself check whether the device is available; use the
        `check_device` function for that. `check_device` also shows this warning when
        the device is not found.

        :param device: The unavailable device.
        """
        ie = Core()
        supported_devices = ie.available_devices
        text = f"Running this cell requires a {device} device, which is not available on this system. "
        if len(supported_devices) == 1:
            text += f"The following device is available: {ie.available_devices[0]}"
        else:
            text += f"The following devices are available: {', '.join(ie.available_devices)}"
        # The parent stores message and alert class, then displays the alert.
        super().__init__(text, "warning")
|
|
|
|
|
def check_device(device: str) -> bool:
    """
    Report whether `device` is among the devices available on this system.

    :param device: Device to check. e.g. CPU, GPU
    :return: True if the device is available, False if not. When it is not available,
             a DeviceNotFoundAlert is shown.
    """
    available = device in Core().available_devices
    if not available:
        DeviceNotFoundAlert(device)
    return available
|
|
|
|
|
def check_openvino_version(version: str) -> bool:
    """
    Report whether `version` occurs in the installed OpenVINO version string.

    :param version: the OpenVINO version to check. Example: 2021.4
    :return: True if the version is installed, False if not. When it is not installed,
             an alert message is shown.
    """
    installed_version = get_version()
    if version in installed_version:
        return True

    message = (
        f"This notebook requires OpenVINO {version}. "
        f"The version on your system is: <i>{installed_version}</i>.<br>"
        "Please run <span style='font-family:monospace'>pip install --upgrade -r requirements.txt</span> "
        "in the openvino_env environment to install this version. "
        "See the <a href='https://github.com/openvinotoolkit/openvino_notebooks'>"
        "OpenVINO Notebooks README</a> for detailed instructions"
    )
    NotebookAlert(message, alert_class="danger")
    return False
|
|
|
|
|
# Friendly-name fragments of the Multiply nodes that ReplaceTensor rewrites.
packed_layername_tensor_dict_list = [dict(name="aten::mul/Multiply")]
|
|
|
|
|
class ReplaceTensor(MatcherPass):
    """
    Matcher pass that replaces the second input of selected Multiply nodes.

    A Multiply node is selected when its friendly name contains the "name" value of one
    of the given entries. Its second input becomes an f32 constant holding the negated
    largest finite float16 value. Each entry is applied at most once per pass instance.
    """

    def __init__(self, packed_layername_tensor_dict_list):
        """
        :param packed_layername_tensor_dict_list: List of dicts with a "name" key holding
                                                  the friendly-name fragment to look for.
                                                  The list itself is not modified.
        """
        MatcherPass.__init__(self)
        self.model_changed = False

        # Work on a private copy: entries are consumed as they are applied, and
        # the caller's (possibly module-level) list must stay intact for later runs.
        pending = list(packed_layername_tensor_dict_list)

        param = WrapType("opset10.Multiply")

        def callback(matcher: Matcher) -> bool:
            root = matcher.get_match_root()
            if root is None:
                return False

            root_name = root.get_friendly_name()
            replaced = False
            # Iterate over a snapshot so that removing consumed entries is safe.
            for entry in list(pending):
                if root_name.find(entry["name"]) != -1:
                    max_fp16 = np.array([[[[-np.finfo(np.float16).max]]]]).astype(np.float32)
                    new_tensor = ops.constant(max_fp16, Type.f32, name="Constant_4431")
                    root.set_arguments([root.input_value(0).node, new_tensor])
                    pending.remove(entry)
                    replaced = True

            if replaced:
                self.model_changed = True
            # Report a change only when this node was actually rewritten.
            return replaced

        self.register_matcher(Matcher(param, "ReplaceTensor"), callback)
|
|
|
|
|
def optimize_bge_embedding(model_path, output_model_path):
    """
    optimize_bge_embedding used to optimize BGE model for NPU device

    Reads the IR model, runs the ReplaceTensor pass configured with
    `packed_layername_tensor_dict_list` on it, and saves the result without
    FP16 compression.

    Arguments:
        model_path {str} -- original BGE IR model path
        output_model_path {str} -- Converted BGE IR model path
    """
    core = Core()
    ov_model = core.read_model(model_path)
    manager = Manager()
    # Hand the pass its own copy of the list: the pass consumes entries while it
    # runs, and the module-level list must remain intact for subsequent calls.
    manager.register_pass(ReplaceTensor(list(packed_layername_tensor_dict_list)))
    manager.run_passes(ov_model)
    ov.save_model(ov_model, output_model_path, compress_to_fp16=False)
|
|