Spaces:
Runtime error
Runtime error
import queue | |
import threading | |
import cv2 | |
class BufferlessVideoCapture:
    """Video capture wrapper that always hands out the most recent frame.

    A background daemon thread reads from the underlying ``cv2.VideoCapture``
    as fast as frames arrive and keeps only the newest unread one in a queue,
    so ``read()`` does not return frames that piled up while the consumer was
    busy.
    """

    def __init__(self, name, width=None, height=None):
        """Open the capture source and start the background reader thread.

        Args:
            name: Source identifier passed straight to ``cv2.VideoCapture``.
            width: Requested frame width; the property is left untouched
                when ``None``.
            height: Requested frame height; the property is left untouched
                when ``None``.
        """
        self.cap = cv2.VideoCapture(name)
        # Apply each dimension on its own so that supplying only one of them
        # is not silently ignored.
        if width is not None:
            self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, width)
        if height is not None:
            self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
        self.q = queue.Queue()
        # Set by release() to ask the reader loop to exit.
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._reader, daemon=True)
        self._thread.start()

    # read frames as soon as they are available, keeping only most recent one
    def _reader(self):
        """Pump frames from the capture into the queue until it ends.

        Runs on the background thread. Before queuing a new frame, any frame
        the consumer has not picked up yet is dropped. When the capture stops
        delivering frames (or ``release()`` was requested), a ``(False, None)``
        marker is queued so that a consumer blocked in ``read()`` wakes up
        instead of waiting forever.
        """
        while not self._stop.is_set():
            ret, frame = self.cap.read()
            if not ret:
                break
            try:
                self.q.get_nowait()  # discard previous (unprocessed) frame
            except queue.Empty:
                pass
            self.q.put((ret, frame))
        # The last real frame (if still unread) is kept; the marker follows it.
        self.q.put((False, None))

    def read(self):
        """Return the newest ``(ret, frame)`` pair, blocking until one exists.

        Returns:
            ``(True, frame)`` for a captured frame, or ``(False, None)`` once
            the capture has ended or been released. The end marker is
            returned on every subsequent call as well.
        """
        item = self.q.get()
        if not item[0]:
            # Put the end marker back so later calls do not block forever;
            # the reader has already exited, so nothing will discard it.
            self.q.put(item)
        return item

    def release(self):
        """Stop the reader thread and release the underlying capture.

        Waits briefly for the reader to finish its current ``cap.read()``
        call before releasing; if it does not finish in time the capture is
        released anyway.
        """
        self._stop.set()
        if self._thread.is_alive():
            self._thread.join(timeout=1.0)
        self.cap.release()