Spaces:
Sleeping
Sleeping
"""The VideoStreamInterface provides an easy way to apply transforms to a video stream""" | |
import base64 | |
import io | |
import time | |
import numpy as np | |
import panel as pn | |
import param | |
import PIL | |
from PIL import Image | |
# Default height and width used for the converted and displayed image.
HEIGHT: int = 400
WIDTH: int = 400
# Default `timeout` value handed to the VideoStream widget.
TIMEOUT: int = 250
# Color used for the line of the duration trend plots.
ACCENT: str = "#fef3c7"
def to_instance(value, **params):
    """Return `value` as an instance configured with `params`.

    Args:
        value: A param.Parameterized class or instance
        **params: Parameter values. They are applied to an existing instance
            or passed to the constructor when `value` is not an instance.

    Returns:
        An instance of the param.Parameterized class
    """
    if not isinstance(value, param.Parameterized):
        # Not an instance yet: call it to build one from the given params.
        return value(**params)

    # Already an instance: update it in place and hand the same object back.
    value.param.update(**params)
    return value
class Timer(pn.viewable.Viewer):
    """Helper Component used to show duration trends"""

    # Maps each reported name to the Trend indicator showing its durations.
    _trends = param.Dict()

    def __init__(self, **params):
        """Create an empty timer.

        Args:
            **params: Keyword arguments forwarded to the `pn.Row` that holds
                the trend indicators (they are not passed to the Viewer).
        """
        super().__init__()

        # Time of the most recent `inc_it` call, per name.
        self.last_updates = {}
        self._trends = {}
        self._layout = pn.Row(**params)

    def time_it(self, name, func, *args, **kwargs):
        """Measures the duration of the execution of the func function and reports it under the
        name specified

        Args:
            name: The name the duration is reported under
            func: The callable to execute
            *args: Positional arguments passed to `func`
            **kwargs: Keyword arguments passed to `func`

        Returns:
            Whatever `func` returns
        """
        # perf_counter is monotonic, so the measured interval cannot be
        # distorted by adjustments of the system clock.
        start = time.perf_counter()
        result = func(*args, **kwargs)
        end = time.perf_counter()
        duration = round(end - start, 2)
        self._report(name=name, duration=duration)
        return result

    def inc_it(self, name):
        """Measures the duration since the last time `inc_it` was called and reports it under the
        specified name"""
        # On the first call for `name` there is no previous timestamp, so the
        # reported duration is (close to) zero.
        start = self.last_updates.get(name, time.time())
        end = time.time()
        duration = round(end - start, 2)
        self._report(name=name, duration=duration)
        self.last_updates[name] = end

    def _report(self, name, duration):
        """Adds `duration` to the trend named `name`, creating the trend on first use"""
        if name not in self._trends:
            self._trends[name] = pn.indicators.Trend(
                title=name,
                data={"x": [1], "y": [duration]},
                plot_color=ACCENT,
                height=100,
                width=150,
                sizing_mode="fixed",
            )
            # The dict was mutated in place, so the change has to be
            # announced explicitly for the layout to be rebuilt.
            self.param.trigger("_trends")
        else:
            trend = self._trends[name]
            next_x = max(trend.data["x"]) + 1
            # Keep only the 10 most recent measurements.
            trend.stream({"x": [next_x], "y": [duration]}, rollover=10)

    @pn.depends("_trends")
    def _panel(self):
        """Rebuilds the layout from the current trends whenever `_trends` is triggered"""
        self._layout[:] = list(self._trends.values())
        return self._layout

    def __panel__(self):
        # Return the dependent method itself (not its result) so Panel can
        # re-render it when its declared dependency changes.
        return self._panel
class ImageTransform(pn.viewable.Viewer):
    """Base class for image transforms.

    Subclasses implement `transform` (and `run`, unless they derive from a
    base class that already provides it).
    """

    def __init__(self, **params):
        super().__init__(**params)

        # The name is set inside `edit_constant`, which temporarily allows
        # constant parameters to be changed. The displayed name is the class
        # name with "Transform" removed.
        with param.edit_constant(self):
            self.name = self.__class__.name.replace("Transform", "")
        self.view = self.create_view()

    def __panel__(self):
        return self.view

    def run(self, image: str, height: int = HEIGHT, width: int = WIDTH) -> io.BytesIO:
        """Transforms the base64 encoded jpg image string to a jpg image in a BytesIO object

        Args:
            image: The base64 encoded jpg image string
            height: The requested height of the image
            width: The requested width of the image

        Raises:
            NotImplementedError: Always; subclasses must override this method
        """
        raise NotImplementedError()

    def create_view(self):
        """Creates a view of the parameters of the transform to enable the user to configure them"""
        return pn.Param(self, name=self.name)

    def transform(self, image):
        """Transforms the image

        Raises:
            NotImplementedError: Always; subclasses must override this method
        """
        raise NotImplementedError()
class PILImageTransform(ImageTransform):
    """Base class for PIL image transforms"""

    @staticmethod
    def to_pil_img(value: str, height=HEIGHT, width=WIDTH):
        """Converts a base64 jpeg image string to a PIL.Image

        Args:
            value: The base64 encoded image, optionally prefixed with a
                data-URL header that ends in a comma.
            height: The requested height of the image
            width: The requested width of the image

        Returns:
            The opened PIL image, configured via `draft` for the requested size
        """
        # Drop the header before the comma when there is one. The base64
        # alphabet contains no comma, so a missing separator means the
        # string is already the bare payload.
        _, separator, payload = value.partition(",")
        encoded_data = payload if separator else value
        base64_decoded = base64.b64decode(encoded_data)
        image = Image.open(io.BytesIO(base64_decoded))
        # PIL sizes are (width, height).
        image.draft("RGB", (width, height))
        return image

    @staticmethod
    def from_pil_img(image: Image.Image) -> io.BytesIO:
        """Converts a PIL.Image to a BytesIO object holding the JPEG encoded image"""
        buff = io.BytesIO()
        image.save(buff, format="JPEG")
        return buff

    def run(self, image: str, height: int = HEIGHT, width: int = WIDTH) -> io.BytesIO:
        """Decodes the image string, applies `transform` and returns the result as JPEG bytes

        Args:
            image: The base64 encoded jpg image string
            height: The requested height of the image
            width: The requested width of the image

        Returns:
            A BytesIO object holding the transformed JPEG image
        """
        pil_img = self.to_pil_img(image, height=height, width=width)
        transformed_image = self.transform(pil_img)
        return self.from_pil_img(transformed_image)

    def transform(self, image: Image.Image) -> Image.Image:
        """Transforms the PIL.Image image

        Raises:
            NotImplementedError: Always; subclasses must override this method
        """
        raise NotImplementedError()
class NumpyImageTransform(ImageTransform):
    """Base class for np.ndarray image transforms"""

    @staticmethod
    def to_np_ndarray(image: str, height=HEIGHT, width=WIDTH) -> np.ndarray:
        """Converts a base64 encoded jpeg string to a np.ndarray

        Args:
            image: The base64 encoded jpg image string
            height: The requested height of the image
            width: The requested width of the image
        """
        pil_img = PILImageTransform.to_pil_img(image, height=height, width=width)
        return np.array(pil_img)

    @staticmethod
    def from_np_ndarray(image: np.ndarray) -> io.BytesIO:
        """Converts np.ndarray jpeg image to a jpeg BytesIO instance

        Floating point arrays are scaled by 255 and converted to uint8
        before encoding; other dtypes are passed to PIL unchanged.
        """
        if np.issubdtype(image.dtype, np.floating):
            # Clip first so that values outside [0, 1] saturate instead of
            # wrapping around in the uint8 cast.
            image = (np.clip(image, 0.0, 1.0) * 255).astype(np.uint8)
        pil_img = PIL.Image.fromarray(image)
        return PILImageTransform.from_pil_img(pil_img)

    def run(self, image: str, height: int = HEIGHT, width: int = WIDTH) -> io.BytesIO:
        """Decodes the image string, applies `transform` and returns the result as JPEG bytes

        Args:
            image: The base64 encoded jpg image string
            height: The requested height of the image
            width: The requested width of the image

        Returns:
            A BytesIO object holding the transformed JPEG image
        """
        np_array = self.to_np_ndarray(image, height=height, width=width)
        transformed_image = self.transform(np_array)
        return self.from_np_ndarray(transformed_image)

    def transform(self, image: np.ndarray) -> np.ndarray:
        """Transforms the nd.array image

        Raises:
            NotImplementedError: Always; subclasses must override this method
        """
        raise NotImplementedError()
class VideoStreamInterface(pn.viewable.Viewer):
    """An easy to use interface for a VideoStream and a set of transforms"""

    video_stream = param.ClassSelector(
        class_=pn.widgets.VideoStream, constant=True, doc="The source VideoStream"
    )

    height = param.Integer(
        HEIGHT,
        bounds=(10, 2000),
        step=10,
        doc="""The height of the image converted and shown""",
    )
    width = param.Integer(
        WIDTH,
        bounds=(10, 2000),
        step=10,
        doc="""The width of the image converted and shown""",
    )

    transform = param.Selector(doc="The currently selected transform")

    def __init__(
        self,
        transforms,
        timeout=TIMEOUT,
        paused=False,
        **params,
    ):
        """Create the interface.

        Args:
            transforms: The transforms to choose from, given as classes or
                instances. The first one is selected initially.
            timeout: The `timeout` of the VideoStream widget
            paused: Whether the VideoStream widget starts paused
            **params: Further parameter values, e.g. `height` and `width`
        """
        super().__init__(
            # The stream widget itself is hidden; only the transformed image
            # is displayed.
            video_stream=pn.widgets.VideoStream(
                name="Video Stream",
                timeout=timeout,
                paused=paused,
                height=0,
                width=0,
                visible=False,
                format="jpeg",
            ),
            **params,
        )
        self.image = pn.pane.JPG(
            height=self.height, width=self.width, sizing_mode="fixed"
        )
        # Guards `_handle_stream` against being entered while a frame is
        # still being processed.
        self._updating = False
        transforms = [to_instance(transform) for transform in transforms]
        self.param.transform.objects = transforms
        self.transform = transforms[0]
        self.timer = Timer(sizing_mode="stretch_width")
        self.settings = self._create_settings()
        self._panel = self._create_panel()

    def _create_settings(self):
        """Creates the column of widgets used to configure the stream, image and transform"""
        return pn.Column(
            pn.Param(
                self.video_stream,
                parameters=["timeout", "paused"],
                widgets={
                    "timeout": {
                        "widget_type": pn.widgets.IntSlider,
                        "start": 10,
                        "end": 2000,
                        "step": 10,
                    }
                },
            ),
            self.timer,
            pn.Param(self, parameters=["height", "width"], name="Image"),
            pn.Param(
                self,
                parameters=["transform"],
                expand_button=False,
                expand=False,
                widgets={
                    "transform": {
                        "widget_type": pn.widgets.RadioButtonGroup,
                        "orientation": "vertical",
                        "button_type": "success",
                    }
                },
                name="Transform",
            ),
            self._get_transform,
        )

    def _create_panel(self):
        """Creates the main layout: the hidden stream widget and the centered image"""
        return pn.Row(
            self.video_stream,
            pn.layout.HSpacer(),
            self.image,
            pn.layout.HSpacer(),
            sizing_mode="stretch_width",
            align="center",
        )

    @pn.depends("height", "width", watch=True)
    def _update_height_width(self):
        """Keeps the size of the image pane in sync with `height` and `width`"""
        self.image.height = self.height
        self.image.width = self.width

    @pn.depends("transform")
    def _get_transform(self):
        """Returns the settings view of the currently selected transform"""
        # Hack: returning self.transform stops working after browsing the transforms for a while
        return self.transform.view

    def __panel__(self):
        return self._panel

    @pn.depends("video_stream.value", watch=True)
    def _handle_stream(self):
        """Transforms the latest frame of the video stream and displays the result

        Frames that arrive while a previous frame is still being processed
        are skipped.
        """
        if self._updating:
            return

        self._updating = True
        try:
            value = self.video_stream.value
            if self.transform and value:
                try:
                    image = self.timer.time_it(
                        name="Transform",
                        func=self.transform.run,
                        image=value,
                        height=self.height,
                        width=self.width,
                    )
                    self.image.object = image
                except PIL.UnidentifiedImageError:
                    print("unidentified image")

                self.timer.inc_it("last update")
        finally:
            # Always release the guard; otherwise a single failing transform
            # would block every subsequent frame.
            self._updating = False