Fucius's picture
Upload 422 files
df6c67d verified
raw
history blame
4 kB
import time
from enum import Enum
from typing import Generator, Iterable, Optional, Tuple, Union
import numpy as np
from inference.core.interfaces.camera.entities import (
FrameID,
FrameTimestamp,
VideoFrame,
)
from inference.core.interfaces.camera.video_source import SourceProperties, VideoSource
# Floor applied by RateLimiter to the requested FPS: lower values (including
# zero and negatives) are raised to this one, which keeps the 1 / fps interval
# computation free of division by zero.
MINIMAL_FPS = 0.01
class FPSLimiterStrategy(Enum):
    """
    Policy applied by the FPS limiter to a frame that arrives before the
    configured rate allows another frame to be emitted.
    """

    # The early frame is skipped and never emitted.
    DROP = "drop"
    # The limiter sleeps for the remaining delay and then emits the frame.
    WAIT = "wait"
def get_video_frames_generator(
    video: Union[VideoSource, str, int],
    max_fps: Optional[Union[float, int]] = None,
    limiter_strategy: Optional[FPSLimiterStrategy] = None,
) -> Generator[VideoFrame, None, None]:
    """
    Util function to create a frames generator from `VideoSource` with possibility to
    limit FPS of consumed frames and dictate what to do if frames are produced too fast.

    Args:
        video (Union[VideoSource, str, int]): Either instance of VideoSource or video reference accepted
            by VideoSource.init(...). When a reference is given, the source is created and started
            by this function.
        max_fps (Optional[Union[float, int]]): value of maximum FPS rate of generated frames - can be used to limit
            generation frequency. When `None`, frames are forwarded without any limiting.
        limiter_strategy (Optional[FPSLimiterStrategy]): strategy used to deal with frames decoding exceeding
            limit of `max_fps`. By default - for files, in the interest of processing all frames -
            generation will be awaited, for streams - frames will be dropped on the floor.

    Returns: generator of `VideoFrame`

    Example:
        ```python
        for frame in get_video_frames_generator(
            video="./some.mp4",
            max_fps=50,
        ):
            pass
        ```
    """
    if isinstance(video, (str, int)):
        # Only a reference was supplied: build the source here and start it.
        # A VideoSource instance handed in by the caller is consumed as given.
        video = VideoSource.init(
            video_reference=video,
        )
        video.start()
    if max_fps is None:
        # No rate limit requested - pass frames through untouched.
        yield from video
        return
    limiter_strategy = resolve_limiter_strategy(
        explicitly_defined_strategy=limiter_strategy,
        source_properties=video.describe_source().source_properties,
    )
    yield from limit_frame_rate(
        frames_generator=video, max_fps=max_fps, strategy=limiter_strategy
    )
def resolve_limiter_strategy(
    explicitly_defined_strategy: Optional[FPSLimiterStrategy],
    source_properties: Optional[SourceProperties],
) -> FPSLimiterStrategy:
    """
    Decide which FPS limiting strategy should be used.

    Args:
        explicitly_defined_strategy (Optional[FPSLimiterStrategy]): strategy requested by the caller;
            when not `None` it is returned as is.
        source_properties (Optional[SourceProperties]): properties of the video source, consulted only
            when no explicit strategy was given.

    Returns: the explicit strategy if provided; otherwise `WAIT` when source properties are
        available and report a file, `DROP` in every other case (including missing properties).
    """
    if explicitly_defined_strategy is not None:
        return explicitly_defined_strategy
    if source_properties is None:
        # Nothing known about the source - fall back to dropping frames.
        return FPSLimiterStrategy.DROP
    if source_properties.is_file:
        return FPSLimiterStrategy.WAIT
    return FPSLimiterStrategy.DROP
def limit_frame_rate(
    frames_generator: Iterable[Tuple[FrameTimestamp, FrameID, np.ndarray]],
    max_fps: Union[float, int],
    strategy: FPSLimiterStrategy,
) -> Generator[Tuple[FrameTimestamp, FrameID, np.ndarray], None, None]:
    """
    Re-emit items of `frames_generator` at a pace not exceeding `max_fps`.

    Args:
        frames_generator: iterable of frames; items are yielded unchanged.
        max_fps (Union[float, int]): upper bound for the emission rate, handed to `RateLimiter`.
        strategy (FPSLimiterStrategy): with `WAIT` an early frame is held back (via `time.sleep`)
            for the remaining delay and then emitted; with any other value the early frame is skipped.

    Returns: generator over the frames that were let through.
    """
    limiter = RateLimiter(desired_fps=max_fps)
    wait_for_slot = strategy is FPSLimiterStrategy.WAIT
    for item in frames_generator:
        remaining = limiter.estimate_next_action_delay()
        slot_is_free = remaining <= 0.0
        if not slot_is_free:
            if not wait_for_slot:
                # Frame came too early and we are not allowed to wait - skip it.
                continue
            time.sleep(remaining)
        # Register the emission time right before handing the frame over.
        limiter.tick()
        yield item
class RateLimiter:
    """
    Upper-bound rate limiter. `estimate_next_action_delay()` tells the client how many
    seconds are still missing until 1 / desired_fps has passed since the last `tick()`.
    A client that waits that long before acting (and calls `tick()` on each action)
    does not exceed the assumed rate.
    """

    def __init__(self, desired_fps: Union[float, int]):
        # Rates below MINIMAL_FPS are raised to it, so the interval division
        # in estimate_next_action_delay() never hits zero.
        self._desired_fps = max(desired_fps, MINIMAL_FPS)
        # Monotonic timestamp of the most recent tick; None until the first one.
        self._last_tick: Optional[float] = None

    def tick(self) -> None:
        """Record the current monotonic time as the moment of the latest action."""
        self._last_tick = time.monotonic()

    def estimate_next_action_delay(self) -> float:
        """
        Return the number of seconds left before the next action is allowed.

        Returns: 0.0 when no tick was registered yet or the minimal interval has
            already elapsed, otherwise the positive remainder of that interval.
        """
        previous_tick = self._last_tick
        if previous_tick is None:
            return 0.0
        min_interval = 1 / self._desired_fps
        elapsed = time.monotonic() - previous_tick
        return max(min_interval - elapsed, 0.0)