File size: 1,339 Bytes
3afff35 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
from collections import deque
from threading import Thread
from cv2 import VideoCapture
from typing_extensions import Self
class Camera:
def __init__(self, capture_id: int=0, buffer_size: int=1):
self.capture_id = capture_id
self.buffer_size = buffer_size
self.capture: VideoCapture
self.capture_thread: Thread
self.buffer = deque([], buffer_size)
self.stop_capture = False
def __enter__(self) -> Self:
self.capture = VideoCapture(self.capture_id)
if not self.capture.isOpened():
raise IOError("Unable to open device.")
self.capture_thread = Thread(target=self.start_capture)
self.capture_thread.start()
return self
def __exit__(self, *_):
self.stop_capture = True
self.capture_thread.join()
self.capture.release()
def start_capture(self):
while True:
if self.stop_capture:
break
_, frame = self.capture.read()
if len(self.buffer) == self.buffer_size:
self.buffer.popleft()
self.buffer.append(frame)
def is_capturing(self) -> bool:
print(f"Filling buffer: {len(self.buffer)}/{self.buffer_size}")
return len(self.buffer) == self.buffer_size
|