Spaces:
Sleeping
Sleeping
| """The VideoStreamInterface provides an easy way to apply transforms to a video stream""" | |
| import base64 | |
| import io | |
| import time | |
| import numpy as np | |
| import panel as pn | |
| import param | |
| import PIL | |
| from PIL import Image | |
# Default height of the converted image; used as the default of the `height`
# parameter and of the `height` argument of the transform helpers.
| HEIGHT = 400 | |
# Default width of the converted image; used the same way as HEIGHT.
| WIDTH = 400 | |
# Default `timeout` handed to pn.widgets.VideoStream.
# NOTE(review): presumably milliseconds between snapshots - confirm against the Panel docs.
| TIMEOUT = 250 | |
# Plot color of the duration Trend indicators.
| ACCENT = "#fef3c7" | |
def to_instance(value, **params):
    """Return a ``param.Parameterized`` instance for ``value``.

    Args:
        value: Either a ``param.Parameterized`` instance or something callable
            with ``**params`` (typically a ``param.Parameterized`` class).
        **params: Parameter values to apply to the instance.

    Returns:
        ``value`` itself, updated in place with ``params``, when it already is an
        instance; otherwise the result of calling ``value(**params)``.
    """
    if not isinstance(value, param.Parameterized):
        # Not an instance yet: instantiate it with the given parameter values.
        return value(**params)

    value.param.update(**params)
    return value
class Timer(pn.viewable.Viewer):
    """Records named durations and shows each of them as a small Trend indicator."""

    # Maps the reported name to the Trend indicator displaying its durations.
    _trends = param.Dict()

    def __init__(self, **params):
        # The keyword arguments configure the Row layout, not the Viewer itself.
        super().__init__()

        self.last_updates = {}
        self._trends = {}
        self._layout = pn.Row(**params)

    def time_it(self, name, func, *args, **kwargs):
        """Call ``func(*args, **kwargs)``, report how long it took under ``name``
        and return the result of the call."""
        started = time.time()
        outcome = func(*args, **kwargs)
        elapsed = round(time.time() - started, 2)

        self._report(name=name, duration=elapsed)
        return outcome

    def inc_it(self, name):
        """Report, under ``name``, the time elapsed since the previous ``inc_it`` call
        made with the same name."""
        # On the very first call there is no previous timestamp, so "now" is used.
        previous = self.last_updates.get(name, time.time())
        now = time.time()

        self._report(name=name, duration=round(now - previous, 2))
        self.last_updates[name] = now

    def _report(self, name, duration):
        # Known name: append the new duration to the existing indicator.
        if name in self._trends:
            indicator = self._trends[name]
            following_x = max(indicator.data["x"]) + 1
            indicator.stream({"x": [following_x], "y": [duration]}, rollover=10)
            return

        # New name: create an indicator and announce the in-place dict change.
        self._trends[name] = pn.indicators.Trend(
            title=name,
            data={"x": [1], "y": [duration]},
            plot_color=ACCENT,
            height=100,
            width=150,
            sizing_mode="fixed",
        )
        self.param.trigger("_trends")

    def _panel(self):
        # Refill the layout with the current set of indicators.
        self._layout[:] = list(self._trends.values())
        return self._layout

    def __panel__(self):
        return self._panel
class ImageTransform(pn.viewable.Viewer):
    """Base class for image transforms.

    Subclasses implement ``run`` (decode the incoming image, transform it, encode
    the result) and ``transform`` (the actual image operation).
    """

    def __init__(self, **params):
        super().__init__(**params)

        # `name` is a constant parameter, so it has to be unlocked to be changed.
        with param.edit_constant(self):
            self.name = self.__class__.name.replace("Transform", "")
        self.view = self.create_view()

    def __panel__(self):
        return self.view

    def run(self, image: str, height: int = HEIGHT, width: int = WIDTH) -> io.BytesIO:
        """Transforms the base64 encoded jpg image string into a BytesIO holding a jpg.

        Args:
            image: The base64 encoded jpg image.
            height: The height requested for the converted image.
            width: The width requested for the converted image.

        Returns:
            A BytesIO object holding the transformed jpg image.

        Raises:
            NotImplementedError: Always; subclasses must override this method.
        """
        raise NotImplementedError()

    def create_view(self):
        """Creates a view of the parameters of the transform to enable the user to configure them"""
        return pn.Param(self, name=self.name)

    def transform(self, image):
        """Transforms the image

        Raises:
            NotImplementedError: Always; subclasses must override this method.
        """
        raise NotImplementedError()
class PILImageTransform(ImageTransform):
    """Base class for PIL image transforms"""

    @staticmethod
    def to_pil_img(value: str, height=HEIGHT, width=WIDTH):
        """Converts a base64 jpeg image string to a PIL.Image

        Args:
            value: The base64 encoded jpeg, normally a data URL of the form
                ``<prefix>,<base64 payload>``. A bare payload without a prefix is
                accepted as well.
            height: The requested height of the decoded image.
            width: The requested width of the decoded image.

        Returns:
            The opened PIL image.
        """
        # Everything after the first comma is the payload; without a comma the
        # whole string is taken as the payload.
        _, separator, payload = value.partition(",")
        encoded_data = payload if separator else value

        base64_decoded = base64.b64decode(encoded_data)
        image = Image.open(io.BytesIO(base64_decoded))
        # Image.draft expects the requested size as (width, height).
        image.draft("RGB", (width, height))
        return image

    @staticmethod
    def from_pil_img(image: Image.Image):
        """Converts a PIL.Image to a BytesIO object holding the JPEG encoded image"""
        buff = io.BytesIO()
        image.save(buff, format="JPEG")
        # Rewind so that a plain `.read()` on the result yields the image bytes.
        buff.seek(0)
        return buff

    def run(self, image: str, height: int = HEIGHT, width: int = WIDTH) -> io.BytesIO:
        """Decodes the image string, applies ``transform`` and returns the JPEG result"""
        pil_img = self.to_pil_img(image, height=height, width=width)
        transformed_image = self.transform(pil_img)
        return self.from_pil_img(transformed_image)

    def transform(self, image: Image.Image) -> Image.Image:
        """Transforms the PIL.Image image

        Raises:
            NotImplementedError: Always; subclasses must override this method.
        """
        raise NotImplementedError()
class NumpyImageTransform(ImageTransform):
    """Base class for np.ndarray image transforms"""

    @staticmethod
    def to_np_ndarray(image: str, height=HEIGHT, width=WIDTH) -> np.ndarray:
        """Converts a base64 encoded jpeg string to a np.ndarray"""
        pil_img = PILImageTransform.to_pil_img(image, height=height, width=width)
        return np.array(pil_img)

    @staticmethod
    def from_np_ndarray(image: np.ndarray) -> io.BytesIO:
        """Converts a np.ndarray image to a jpeg BytesIO instance

        Floating point arrays (any float dtype) are scaled by 255 and converted to
        uint8 before encoding; all other arrays are encoded as they are.
        """
        if np.issubdtype(image.dtype, np.floating):
            # The scaling by 255 implies values in [0, 1]; clip first so that
            # out-of-range values saturate instead of wrapping around in uint8.
            image = (np.clip(image, 0.0, 1.0) * 255).astype(np.uint8)
        pil_img = PIL.Image.fromarray(image)
        return PILImageTransform.from_pil_img(pil_img)

    def run(self, image: str, height: int = HEIGHT, width: int = WIDTH) -> io.BytesIO:
        """Decodes the image string, applies ``transform`` and returns the JPEG result"""
        np_array = self.to_np_ndarray(image, height=height, width=width)
        transformed_image = self.transform(np_array)
        return self.from_np_ndarray(transformed_image)

    def transform(self, image: np.ndarray) -> np.ndarray:
        """Transforms the np.ndarray image

        Raises:
            NotImplementedError: Always; subclasses must override this method.
        """
        raise NotImplementedError()
class VideoStreamInterface(pn.viewable.Viewer):
    """An easy to use interface for a VideoStream and a set of transforms

    Every new value of the (hidden) video stream is run through the currently
    selected transform and the result is shown in a JPG pane. The ``settings``
    attribute holds the widgets used to configure the stream, the image size and
    the transform.

    Args:
        transforms: The available transforms, given as ImageTransform classes or
            instances. The first one is selected initially.
        timeout: The ``timeout`` value given to the VideoStream.
        paused: Whether the VideoStream starts paused.
        **params: Further parameter values of this class, e.g. ``height``.
    """

    video_stream = param.ClassSelector(
        class_=pn.widgets.VideoStream, constant=True, doc="The source VideoStream"
    )
    height = param.Integer(
        HEIGHT,
        bounds=(10, 2000),
        step=10,
        doc="""The height of the image converted and shown""",
    )
    width = param.Integer(
        WIDTH,
        bounds=(10, 2000),
        step=10,
        doc="""The width of the image converted and shown""",
    )
    transform = param.Selector(doc="The currently selected transform")

    def __init__(
        self,
        transforms,
        timeout=TIMEOUT,
        paused=False,
        **params,
    ):
        super().__init__(
            video_stream=pn.widgets.VideoStream(
                name="Video Stream",
                timeout=timeout,
                paused=paused,
                height=0,
                width=0,
                visible=False,
                format="jpeg",
            ),
            **params,
        )
        self.image = pn.pane.JPG(
            height=self.height, width=self.width, sizing_mode="fixed"
        )
        # Guards against handling a new frame while the previous one is in progress.
        self._updating = False
        transforms = [to_instance(transform) for transform in transforms]
        self.param.transform.objects = transforms
        self.transform = transforms[0]
        self.timer = Timer(sizing_mode="stretch_width")
        self.settings = self._create_settings()
        self._panel = self._create_panel()

    def _create_settings(self):
        # Stream settings, timing trends, image size, transform selection and the
        # view of the selected transform, stacked in one column.
        return pn.Column(
            pn.Param(
                self.video_stream,
                parameters=["timeout", "paused"],
                widgets={
                    "timeout": {
                        "widget_type": pn.widgets.IntSlider,
                        "start": 10,
                        "end": 2000,
                        "step": 10,
                    }
                },
            ),
            self.timer,
            pn.Param(self, parameters=["height", "width"], name="Image"),
            pn.Param(
                self,
                parameters=["transform"],
                expand_button=False,
                expand=False,
                widgets={
                    "transform": {
                        "widget_type": pn.widgets.RadioButtonGroup,
                        "orientation": "vertical",
                        "button_type": "success",
                    }
                },
                name="Transform",
            ),
            self._get_transform,
        )

    def _create_panel(self):
        # The video stream is part of the layout although it is created hidden.
        return pn.Row(
            self.video_stream,
            pn.layout.HSpacer(),
            self.image,
            pn.layout.HSpacer(),
            sizing_mode="stretch_width",
            align="center",
        )

    # Without this registration nothing ever calls the method, and the image
    # pane would keep its initial size.
    @param.depends("height", "width", watch=True)
    def _update_height_width(self):
        self.image.height = self.height
        self.image.width = self.width

    @param.depends("transform")
    def _get_transform(self):
        # Hack: returning self.transform stops working after browsing the transforms for a while
        return self.transform.view

    def __panel__(self):
        return self._panel

    # Without this registration nothing ever calls the method, and no frame
    # would be transformed.
    @param.depends("video_stream.value", watch=True)
    def _handle_stream(self):
        # Drop the frame if the previous one is still being processed.
        if self._updating:
            return

        self._updating = True
        try:
            if self.transform and self.video_stream.value:
                value = self.video_stream.value
                try:
                    image = self.timer.time_it(
                        name="Transform",
                        func=self.transform.run,
                        image=value,
                        height=self.height,
                        width=self.width,
                    )
                    self.image.object = image
                except PIL.UnidentifiedImageError:
                    print("unidentified image")
                self.timer.inc_it("last update")
        finally:
            # Always release the guard; otherwise one failing transform would
            # block every following frame.
            self._updating = False