Fucius's picture
Upload 422 files
df6c67d verified
raw
history blame
No virus
5.52 kB
import os
import time
from threading import Thread
import cv2
from PIL import Image
from inference.core.logger import logger
class WebcamStream:
"""Class to handle webcam streaming using a separate thread.
Attributes:
stream_id (int): The ID of the webcam stream.
frame_id (int): A counter for the current frame.
vcap (VideoCapture): OpenCV video capture object.
width (int): The width of the video frame.
height (int): The height of the video frame.
fps_input_stream (int): Frames per second of the input stream.
grabbed (bool): A flag indicating if a frame was successfully grabbed.
frame (array): The current frame as a NumPy array.
pil_image (Image): The current frame as a PIL image.
stopped (bool): A flag indicating if the stream is stopped.
t (Thread): The thread used to update the stream.
"""
def __init__(self, stream_id=0, enforce_fps=False):
"""Initialize the webcam stream.
Args:
stream_id (int, optional): The ID of the webcam stream. Defaults to 0.
"""
self.stream_id = stream_id
self.enforce_fps = enforce_fps
self.frame_id = 0
self.vcap = cv2.VideoCapture(self.stream_id)
for key in os.environ:
if key.startswith("CV2_CAP_PROP"):
opencv_prop = key[4:]
opencv_constant = getattr(cv2, opencv_prop, None)
if opencv_constant is not None:
value = int(os.getenv(key))
self.vcap.set(opencv_constant, value)
logger.info(f"set {opencv_prop} to {value}")
else:
logger.warn(f"Property {opencv_prop} not found in cv2")
self.width = int(self.vcap.get(cv2.CAP_PROP_FRAME_WIDTH))
self.height = int(self.vcap.get(cv2.CAP_PROP_FRAME_HEIGHT))
self.file_mode = self.vcap.get(cv2.CAP_PROP_FRAME_COUNT) > 0
if self.enforce_fps and not self.file_mode:
logger.warn(
"Ignoring enforce_fps flag for this stream. It is not compatible with streams and will cause the process to crash"
)
self.enforce_fps = False
self.max_fps = None
if self.vcap.isOpened() is False:
logger.debug("[Exiting]: Error accessing webcam stream.")
exit(0)
self.fps_input_stream = int(self.vcap.get(cv2.CAP_PROP_FPS))
logger.debug(
"FPS of webcam hardware/input stream: {}".format(self.fps_input_stream)
)
self.grabbed, self.frame = self.vcap.read()
self.pil_image = Image.fromarray(cv2.cvtColor(self.frame, cv2.COLOR_BGR2RGB))
if self.grabbed is False:
logger.debug("[Exiting] No more frames to read")
exit(0)
self.stopped = True
self.t = Thread(target=self.update, args=())
self.t.daemon = True
def start(self):
"""Start the thread for reading frames."""
self.stopped = False
self.t.start()
def update(self):
"""Update the frame by reading from the webcam."""
frame_id = 0
next_frame_time = 0
t0 = time.perf_counter()
while True:
t1 = time.perf_counter()
if self.stopped is True:
break
self.grabbed = self.vcap.grab()
if self.grabbed is False:
logger.debug("[Exiting] No more frames to read")
self.stopped = True
break
frame_id += 1
# We can't retrieve each frame on nano and other lower powered devices quickly enough to keep up with the stream.
# By default, we will only retrieve frames when we'll be ready process them (determined by self.max_fps).
if t1 > next_frame_time:
ret, frame = self.vcap.retrieve()
if frame is None:
logger.debug("[Exiting] Frame not available for read")
self.stopped = True
break
logger.debug(
f"retrieved frame {frame_id}, effective FPS: {frame_id / (t1 - t0):.2f}"
)
self.frame_id = frame_id
self.frame = frame
while self.file_mode and self.enforce_fps and self.max_fps is None:
# sleep until we have processed the first frame and we know what our FPS should be
time.sleep(0.01)
if self.max_fps is None:
self.max_fps = 30
next_frame_time = t1 + (1 / self.max_fps) + 0.02
if self.file_mode:
t2 = time.perf_counter()
if self.enforce_fps:
# when enforce_fps is true, grab video frames 1:1 with inference speed
time_to_sleep = next_frame_time - t2
else:
# otherwise, grab at native FPS of the video file
time_to_sleep = (1 / self.fps_input_stream) - (t2 - t1)
if time_to_sleep > 0:
time.sleep(time_to_sleep)
self.vcap.release()
def read_opencv(self):
"""Read the current frame using OpenCV.
Returns:
array, int: The current frame as a NumPy array, and the frame ID.
"""
return self.frame, self.frame_id
def stop(self):
"""Stop the webcam stream."""
self.stopped = True