marketing-campaign / videostream_utils.py
analytics-jiten's picture
Upload 5 files
8f76218
raw
history blame contribute delete
No virus
9.47 kB
"""The VideoStreamInterface provides an easy way to apply transforms to a video stream"""
import base64
import io
import time
import numpy as np
import panel as pn
import param
import PIL
from PIL import Image
# Default height of the converted/displayed image; used as the default of
# `VideoStreamInterface.height` and of the `height` argument of the transforms.
HEIGHT = 400
# Default width of the converted/displayed image; used as the default of
# `VideoStreamInterface.width` and of the `width` argument of the transforms.
WIDTH = 400
# Default `timeout` forwarded to `pn.widgets.VideoStream`
# (presumably milliseconds between snapshots - confirm against the Panel docs).
TIMEOUT = 250
# Plot color of the `Trend` indicators created by `Timer`.
ACCENT = "#fef3c7"
def to_instance(value, **params):
    """Return a param.Parameterized instance for `value`

    Args:
        value: A param.Parameterized class or instance
        **params: Parameter values. They are applied to `value` when it is already
            an instance and passed to the constructor when it is a class.

    Returns:
        An instance of the param.Parameterized class
    """
    if not isinstance(value, param.Parameterized):
        # Not an instance yet: treat `value` as a class and construct it
        return value(**params)

    # Already an instance: update it in place and hand the same object back
    value.param.update(**params)
    return value
class Timer(pn.viewable.Viewer):
    """Helper Component used to show duration trends

    Every distinct `name` reported via `time_it` or `inc_it` gets its own
    `pn.indicators.Trend`; all trends are shown side by side in a `pn.Row`.
    """

    # Maps a reported name to its Trend indicator. Triggered manually whenever a
    # new name is added so that the layout is rebuilt.
    _trends = param.Dict()

    def __init__(self, **params):
        """Creates the Timer

        Args:
            **params: Forwarded to the `pn.Row` holding the trends (not to the
                Viewer itself).
        """
        super().__init__()

        # Timestamp (`time.time()`) of the latest `inc_it` call per name
        self.last_updates = {}
        self._trends = {}

        self._layout = pn.Row(**params)

    def time_it(self, name, func, *args, **kwargs):
        """Measures the duration of the execution of the func function and reports it under the
        name specified

        Args:
            name: The name to report the duration under
            func: The callable to execute and time
            *args: Positional arguments passed to `func`
            **kwargs: Keyword arguments passed to `func`

        Returns:
            Whatever `func` returns. Nothing is reported if `func` raises.
        """
        # perf_counter is monotonic, so the measured duration cannot be skewed
        # (or become negative) by system clock adjustments.
        start = time.perf_counter()
        result = func(*args, **kwargs)
        duration = round(time.perf_counter() - start, 2)
        self._report(name=name, duration=duration)
        return result

    def inc_it(self, name):
        """Measures the duration since the last time `inc_it` was called and reports it under the
        specified name

        The first call for a given name reports a duration of (approximately) 0.
        """
        # `last_updates` is a public attribute holding wall-clock timestamps, so
        # `time.time()` is kept here to not change what readers of it observe.
        start = self.last_updates.get(name, time.time())
        end = time.time()
        duration = round(end - start, 2)
        self._report(name=name, duration=duration)
        self.last_updates[name] = end

    def _report(self, name, duration):
        """Adds `duration` to the trend of `name`, creating the trend on first use"""
        if name not in self._trends:
            self._trends[name] = pn.indicators.Trend(
                title=name,
                data={"x": [1], "y": [duration]},
                plot_color=ACCENT,
                height=100,
                width=150,
                sizing_mode="fixed",
            )
            # The dict was mutated in place, so dependents must be notified manually
            self.param.trigger("_trends")
        else:
            trend = self._trends[name]
            next_x = max(trend.data["x"]) + 1
            # Keep only the 10 most recent measurements
            trend.stream({"x": [next_x], "y": [duration]}, rollover=10)

    @pn.depends("_trends")
    def _panel(self):
        """Rebuilds the layout from the current trends and returns it"""
        self._layout[:] = list(self._trends.values())
        return self._layout

    def __panel__(self):
        return self._panel
class ImageTransform(pn.viewable.Viewer):
    """Base class for image transforms.

    Subclasses implement `transform` (the actual image operation) and `run`
    (decoding the incoming image, calling `transform` and encoding the result).
    """

    def __init__(self, **params):
        super().__init__(**params)

        # `name` is constant after construction; temporarily lift that to strip
        # the "Transform" suffix from the default (class) name.
        with param.edit_constant(self):
            self.name = self.__class__.name.replace("Transform", "")
        self.view = self.create_view()

    def __panel__(self):
        return self.view

    def run(self, image: str, height: int = HEIGHT, width: int = WIDTH) -> io.BytesIO:
        """Transforms the base64 encoded jpg image to a jpg BytesIO object

        Args:
            image: The base64 encoded jpg image string
            height: The requested height of the image
            width: The requested width of the image

        Returns:
            A BytesIO object holding the transformed jpg image, as returned by
            the `run` implementations of the subclasses in this file.

        Raises:
            NotImplementedError: Always; subclasses must override this method.
        """
        raise NotImplementedError()

    def create_view(self):
        """Creates a view of the parameters of the transform to enable the user to configure them"""
        return pn.Param(self, name=self.name)

    def transform(self, image):
        """Transforms the image

        Raises:
            NotImplementedError: Always; subclasses must override this method.
        """
        raise NotImplementedError()
class PILImageTransform(ImageTransform):
    """Base class for PIL image transforms"""

    @staticmethod
    def to_pil_img(value: str, height=HEIGHT, width=WIDTH) -> Image.Image:
        """Converts a base64 jpeg image string to a PIL.Image

        Args:
            value: The image as a string of the form `<prefix>,<base64 data>`;
                only the part after the first comma is decoded.
            height: The requested height of the image
            width: The requested width of the image

        Returns:
            The opened PIL image
        """
        encoded_data = value.split(",")[1]
        base64_decoded = base64.b64decode(encoded_data)
        image = Image.open(io.BytesIO(base64_decoded))
        # PIL sizes are (width, height) - not (height, width)
        image.draft("RGB", (width, height))
        return image

    @staticmethod
    def from_pil_img(image: Image.Image) -> io.BytesIO:
        """Converts a PIL.Image to a JPG BytesIO object"""
        buff = io.BytesIO()
        image.save(buff, format="JPEG")
        return buff

    def run(self, image: str, height: int = HEIGHT, width: int = WIDTH) -> io.BytesIO:
        """Decodes the image string, applies `transform` and returns the result as a JPG BytesIO"""
        pil_img = self.to_pil_img(image, height=height, width=width)

        transformed_image = self.transform(pil_img)

        return self.from_pil_img(transformed_image)

    def transform(self, image: Image.Image) -> Image.Image:
        """Transforms the PIL.Image image

        Raises:
            NotImplementedError: Always; subclasses must override this method.
        """
        raise NotImplementedError()
class NumpyImageTransform(ImageTransform):
    """Base class for np.ndarray image transforms"""

    @staticmethod
    def to_np_ndarray(image: str, height=HEIGHT, width=WIDTH) -> np.ndarray:
        """Converts a base64 encoded jpeg string to a np.ndarray"""
        pil_img = PILImageTransform.to_pil_img(image, height=height, width=width)
        return np.array(pil_img)

    @staticmethod
    def from_np_ndarray(image: np.ndarray) -> io.BytesIO:
        """Converts np.ndarray jpeg image to a jpeg BytesIO instance

        Arrays of any floating dtype are treated as having values in the range
        0-1 and are rescaled to uint8 (0-255). Values outside that range are
        clipped instead of wrapping around. Arrays of other dtypes are passed
        on unchanged.
        """
        if np.issubdtype(image.dtype, np.floating):
            image = (np.clip(image, 0.0, 1.0) * 255).astype(np.uint8)
        pil_img = PIL.Image.fromarray(image)
        return PILImageTransform.from_pil_img(pil_img)

    def run(self, image: str, height: int = HEIGHT, width: int = WIDTH) -> io.BytesIO:
        """Decodes the image string, applies `transform` and returns the result as a jpeg BytesIO"""
        np_array = self.to_np_ndarray(image, height=height, width=width)

        transformed_image = self.transform(np_array)

        return self.from_np_ndarray(transformed_image)

    def transform(self, image: np.ndarray) -> np.ndarray:
        """Transforms the np.ndarray image

        Raises:
            NotImplementedError: Always; subclasses must override this method.
        """
        raise NotImplementedError()
class VideoStreamInterface(pn.viewable.Viewer):
    """An easy to use interface for a VideoStream and a set of transforms

    Every new value of the video stream is run through the currently selected
    transform and the result is shown in a JPG pane. The `settings` attribute
    holds the widgets to configure the stream, the image size and the transform.
    """

    video_stream = param.ClassSelector(
        class_=pn.widgets.VideoStream, constant=True, doc="The source VideoStream"
    )

    height = param.Integer(
        HEIGHT,
        bounds=(10, 2000),
        step=10,
        doc="""The height of the image converted and shown""",
    )
    width = param.Integer(
        WIDTH,
        bounds=(10, 2000),
        step=10,
        doc="""The width of the image converted and shown""",
    )

    transform = param.Selector(doc="The currently selected transform")

    def __init__(
        self,
        transforms,
        timeout=TIMEOUT,
        paused=False,
        **params,
    ):
        """Creates the interface

        Args:
            transforms: The transforms to choose between, as classes or instances.
                The first one is selected initially.
            timeout: The `timeout` of the created VideoStream
            paused: Whether the created VideoStream starts paused
            **params: Further parameter values of the VideoStreamInterface
        """
        super().__init__(
            # The stream widget itself is hidden; only the transformed image is shown
            video_stream=pn.widgets.VideoStream(
                name="Video Stream",
                timeout=timeout,
                paused=paused,
                height=0,
                width=0,
                visible=False,
                format="jpeg",
            ),
            **params,
        )
        self.image = pn.pane.JPG(
            height=self.height, width=self.width, sizing_mode="fixed"
        )
        # Guards `_handle_stream` against being re-entered while a frame is processed
        self._updating = False
        transforms = [to_instance(transform) for transform in transforms]
        self.param.transform.objects = transforms
        self.transform = transforms[0]
        self.timer = Timer(sizing_mode="stretch_width")
        self.settings = self._create_settings()
        self._panel = self._create_panel()

    def _create_settings(self):
        """Creates the column of widgets used to configure the interface"""
        return pn.Column(
            pn.Param(
                self.video_stream,
                parameters=["timeout", "paused"],
                widgets={
                    "timeout": {
                        "widget_type": pn.widgets.IntSlider,
                        "start": 10,
                        "end": 2000,
                        "step": 10,
                    }
                },
            ),
            self.timer,
            pn.Param(self, parameters=["height", "width"], name="Image"),
            pn.Param(
                self,
                parameters=["transform"],
                expand_button=False,
                expand=False,
                widgets={
                    "transform": {
                        "widget_type": pn.widgets.RadioButtonGroup,
                        "orientation": "vertical",
                        "button_type": "success",
                    }
                },
                name="Transform",
            ),
            self._get_transform,
        )

    def _create_panel(self):
        """Creates the main layout: the (hidden) video stream and the centered image"""
        return pn.Row(
            self.video_stream,
            pn.layout.HSpacer(),
            self.image,
            pn.layout.HSpacer(),
            sizing_mode="stretch_width",
            align="center",
        )

    @pn.depends("height", "width", watch=True)
    def _update_height_width(self):
        """Keeps the size of the image pane in sync with `height` and `width`"""
        self.image.height = self.height
        self.image.width = self.width

    @pn.depends("transform")
    def _get_transform(self):
        """Returns the view of the currently selected transform"""
        # Hack: returning self.transform stops working after browsing the transforms for a while
        return self.transform.view

    def __panel__(self):
        return self._panel

    @pn.depends("video_stream.value", watch=True)
    def _handle_stream(self):
        """Transforms the latest video stream value and shows the result

        Values arriving while a previous one is still being processed are skipped.
        """
        if self._updating:
            return

        self._updating = True
        try:
            if self.transform and self.video_stream.value:
                value = self.video_stream.value
                try:
                    image = self.timer.time_it(
                        name="Transform",
                        func=self.transform.run,
                        image=value,
                        height=self.height,
                        width=self.width,
                    )
                    self.image.object = image
                except PIL.UnidentifiedImageError:
                    print("unidentified image")

                self.timer.inc_it("last update")
        finally:
            # Always release the guard; otherwise a single unexpected exception
            # in a transform would make every later frame be skipped.
            self._updating = False