"""The VideoStreamInterface provides an easy way to apply transforms to a video stream""" import base64 import io import time import numpy as np import panel as pn import param import PIL from PIL import Image HEIGHT = 400 WIDTH = 400 TIMEOUT = 250 ACCENT = "#fef3c7" def to_instance(value, **params): """Converts the value to an instance Args: value: A param.Parameterized class or instance Returns: An instance of the param.Parameterized class """ if isinstance(value, param.Parameterized): value.param.update(**params) return value return value(**params) class Timer(pn.viewable.Viewer): """Helper Component used to show duration trends""" _trends = param.Dict() def __init__(self, **params): super().__init__() self.last_updates = {} self._trends = {} self._layout = pn.Row(**params) def time_it(self, name, func, *args, **kwargs): """Measures the duration of the execution of the func function and reports it under the name specified""" start = time.time() result = func(*args, **kwargs) end = time.time() duration = round(end - start, 2) self._report(name=name, duration=duration) return result def inc_it(self, name): """Measures the duration since the last time `inc_it` was called and reports it under the specified name""" start = self.last_updates.get(name, time.time()) end = time.time() duration = round(end - start, 2) self._report(name=name, duration=duration) self.last_updates[name] = end def _report(self, name, duration): if not name in self._trends: self._trends[name] = pn.indicators.Trend( title=name, data={"x": [1], "y": [duration]}, plot_color=ACCENT, height=100, width=150, sizing_mode="fixed", ) self.param.trigger("_trends") else: trend = self._trends[name] next_x = max(trend.data["x"]) + 1 trend.stream({"x": [next_x], "y": [duration]}, rollover=10) @pn.depends("_trends") def _panel(self): self._layout[:] = list(self._trends.values()) return self._layout def __panel__(self): return self._panel class ImageTransform(pn.viewable.Viewer): """Base class for image transforms.""" def __init__(self, **params): super().__init__(**params) with param.edit_constant(self): self.name = self.__class__.name.replace("Transform", "") self.view = self.create_view() def __panel__(self): return self.view def run(self, image: str, height: int = HEIGHT, width: int = WIDTH) -> str: """Transforms the base64 encoded jpg image to a base64 encoded jpg BytesIO object""" raise NotImplementedError() def create_view(self): """Creates a view of the parameters of the transform to enable the user to configure them""" return pn.Param(self, name=self.name) def transform(self, image): """Transforms the image""" raise NotImplementedError() class PILImageTransform(ImageTransform): """Base class for PIL image transforms""" @staticmethod def to_pil_img(value: str, height=HEIGHT, width=WIDTH): """Converts a base64 jpeg image string to a PIL.Image""" encoded_data = value.split(",")[1] base64_decoded = base64.b64decode(encoded_data) image = Image.open(io.BytesIO(base64_decoded)) image.draft("RGB", (height, width)) return image @staticmethod def from_pil_img(image: Image): """Converts a PIL.Image to a base64 encoded JPG BytesIO object""" buff = io.BytesIO() image.save(buff, format="JPEG") return buff def run(self, image: str, height: int = HEIGHT, width: int = WIDTH) -> io.BytesIO: pil_img = self.to_pil_img(image, height=height, width=width) transformed_image = self.transform(pil_img) return self.from_pil_img(transformed_image) def transform(self, image: PIL.Image) -> PIL.Image: """Transforms the PIL.Image image""" raise NotImplementedError() class NumpyImageTransform(ImageTransform): """Base class for np.ndarray image transforms""" @staticmethod def to_np_ndarray(image: str, height=HEIGHT, width=WIDTH) -> np.ndarray: """Converts a base64 encoded jpeg string to a np.ndarray""" pil_img = PILImageTransform.to_pil_img(image, height=height, width=width) return np.array(pil_img) @staticmethod def from_np_ndarray(image: np.ndarray) -> io.BytesIO: """Converts np.ndarray jpeg image to a jpeg BytesIO instance""" if image.dtype == np.dtype("float64"): image = (image * 255).astype(np.uint8) pil_img = PIL.Image.fromarray(image) return PILImageTransform.from_pil_img(pil_img) def run(self, image: str, height: int = HEIGHT, width: int = WIDTH) -> io.BytesIO: np_array = self.to_np_ndarray(image, height=height, width=width) transformed_image = self.transform(np_array) return self.from_np_ndarray(transformed_image) def transform(self, image: np.ndarray) -> np.ndarray: """Transforms the nd.array image""" raise NotImplementedError() class VideoStreamInterface(pn.viewable.Viewer): """An easy to use interface for a VideoStream and a set of transforms""" video_stream = param.ClassSelector( class_=pn.widgets.VideoStream, constant=True, doc="The source VideoStream" ) height = param.Integer( HEIGHT, bounds=(10, 2000), step=10, doc="""The height of the image converted and shown""", ) width = param.Integer( WIDTH, bounds=(10, 2000), step=10, doc="""The width of the image converted and shown""", ) transform = param.Selector(doc="The currently selected transform") def __init__( self, transforms, timeout=TIMEOUT, paused=False, **params, ): super().__init__( video_stream=pn.widgets.VideoStream( name="Video Stream", timeout=timeout, paused=paused, height=0, width=0, visible=False, format="jpeg", ), **params, ) self.image = pn.pane.JPG( height=self.height, width=self.width, sizing_mode="fixed" ) self._updating = False transforms = [to_instance(transform) for transform in transforms] self.param.transform.objects = transforms self.transform = transforms[0] self.timer = Timer(sizing_mode="stretch_width") self.settings = self._create_settings() self._panel = self._create_panel() def _create_settings(self): return pn.Column( pn.Param( self.video_stream, parameters=["timeout", "paused"], widgets={ "timeout": { "widget_type": pn.widgets.IntSlider, "start": 10, "end": 2000, "step": 10, } }, ), self.timer, pn.Param(self, parameters=["height", "width"], name="Image"), pn.Param( self, parameters=["transform"], expand_button=False, expand=False, widgets={ "transform": { "widget_type": pn.widgets.RadioButtonGroup, "orientation": "vertical", "button_type": "success", } }, name="Transform", ), self._get_transform, ) def _create_panel(self): return pn.Row( self.video_stream, pn.layout.HSpacer(), self.image, pn.layout.HSpacer(), sizing_mode="stretch_width", align="center", ) @pn.depends("height", "width", watch=True) def _update_height_width(self): self.image.height = self.height self.image.width = self.width @pn.depends("transform") def _get_transform(self): # Hack: returning self.transform stops working after browsing the transforms for a while return self.transform.view def __panel__(self): return self._panel @pn.depends("video_stream.value", watch=True) def _handle_stream(self): if self._updating: return self._updating = True if self.transform and self.video_stream.value: value = self.video_stream.value try: image = self.timer.time_it( name="Transform", func=self.transform.run, image=value, height=self.height, width=self.width, ) self.image.object = image except PIL.UnidentifiedImageError: print("unidentified image") self.timer.inc_it("last update") self._updating = False