|
import cv2 |
|
import numpy as np |
|
from typing import Optional, Tuple, Callable |
|
import platform |
|
import threading |
|
|
|
|
|
if platform.system() == "Windows": |
|
from pygrabber.dshow_graph import FilterGraph |
|
|
|
|
|
class VideoCapturer:
    """Thin wrapper around ``cv2.VideoCapture`` for a single camera device.

    Typical use: construct with a device index, call :meth:`start`, poll
    :meth:`read` for frames, and finish with :meth:`release`. An optional
    callback registered via :meth:`set_frame_callback` is invoked with every
    successfully read frame.
    """

    def __init__(self, device_index: int):
        """Remember the device index and, on Windows, validate it.

        Args:
            device_index: Index of the camera to open.

        Raises:
            ValueError: On Windows, when ``device_index`` is not smaller than
                the number of input devices reported by ``FilterGraph``.
        """
        self.device_index = device_index
        self.frame_callback: Optional[Callable[[np.ndarray], None]] = None
        self._current_frame: Optional[np.ndarray] = None
        self._frame_ready = threading.Event()
        self.is_running = False
        self.cap = None

        if platform.system() == "Windows":
            # FilterGraph is only imported on Windows (see module top level).
            self.graph = FilterGraph()

            devices = self.graph.get_input_devices()
            if self.device_index >= len(devices):
                raise ValueError(
                    f"Invalid device index {device_index}. Available devices: {len(devices)}"
                )

    def _open_capture(self):
        """Return an opened ``cv2.VideoCapture``, or ``None`` if none opens.

        On Windows several (device, backend) combinations are tried in order;
        elsewhere only ``self.device_index`` with the default backend is used.
        Candidates that fail to open are released before moving on.
        """
        if platform.system() != "Windows":
            cap = cv2.VideoCapture(self.device_index)
            if cap.isOpened():
                return cap
            cap.release()
            return None

        candidates = (
            (self.device_index, cv2.CAP_DSHOW),
            (self.device_index, cv2.CAP_ANY),
            (-1, cv2.CAP_ANY),
            (0, cv2.CAP_ANY),
        )
        for dev_id, backend in candidates:
            try:
                cap = cv2.VideoCapture(dev_id, backend)
                if cap.isOpened():
                    return cap
                cap.release()
            except Exception:
                # Best effort: a failing candidate just means "try the next".
                continue
        return None

    def start(self, width: int = 960, height: int = 540, fps: int = 60) -> bool:
        """Open the camera and request the given frame size and rate.

        Args:
            width: Requested frame width (``cv2.CAP_PROP_FRAME_WIDTH``).
            height: Requested frame height (``cv2.CAP_PROP_FRAME_HEIGHT``).
            fps: Requested frame rate (``cv2.CAP_PROP_FPS``).

        Returns:
            ``True`` when a capture was opened, ``False`` otherwise. Failures
            are printed rather than raised.
        """
        # Release any capture left over from an earlier start() so its device
        # handle is not leaked when self.cap is replaced below.
        self.release()

        try:
            cap = self._open_capture()
            if cap is None:
                raise RuntimeError("Failed to open camera")
            self.cap = cap

            cap.set(cv2.CAP_PROP_FRAME_WIDTH, width)
            cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
            cap.set(cv2.CAP_PROP_FPS, fps)

            self.is_running = True
            return True

        except Exception as e:
            print(f"Failed to start capture: {str(e)}")
            # Leave the object in a clean "not started" state.
            self.release()
            return False

    def read(self) -> Tuple[bool, Optional[np.ndarray]]:
        """Read one frame from the camera.

        Returns:
            ``(True, frame)`` on success, ``(False, None)`` when capture is
            not running or the underlying read fails. On success the frame is
            also stored in ``_current_frame`` and passed to the registered
            frame callback, if any.
        """
        if not self.is_running or self.cap is None:
            return False, None

        ret, frame = self.cap.read()
        if not ret:
            return False, None

        self._current_frame = frame
        if self.frame_callback:
            self.frame_callback(frame)
        return True, frame

    def release(self) -> None:
        """Stop capture and release the underlying device, if any.

        Safe to call repeatedly and regardless of ``is_running``: the capture
        is released whenever one is held, so clearing ``is_running`` elsewhere
        cannot cause the handle to be dropped without being released.
        """
        cap, self.cap = self.cap, None
        self.is_running = False
        if cap is not None:
            cap.release()

    def set_frame_callback(self, callback: Callable[[np.ndarray], None]) -> None:
        """Register ``callback`` to be called with each frame read by :meth:`read`."""
        self.frame_callback = callback
|
|