Spaces:
Sleeping
Sleeping
File size: 10,588 Bytes
dde0b86 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 |
import cv2
import threading
import queue
import time
from typing import Optional, Callable, Union
import numpy as np
class CameraManager:
    """
    Manages video capture from various sources including webcams, IP cameras, and video files.
    Provides threaded video capture for real-time processing.

    A background daemon thread reads frames from the OpenCV capture handle and
    pushes ``(frame, timestamp)`` tuples into a bounded queue. When the queue
    is full, the oldest entry is dropped to make room for the newest one.
    """

    # Pause (seconds) between retries after a failed read from a live source,
    # so a disconnected camera does not turn the capture loop into a busy spin.
    _READ_RETRY_DELAY = 0.01

    def __init__(self, source: Union[int, str] = 0, buffer_size: int = 10):
        """
        Initialize camera manager.

        No device is opened here; call ``connect()`` or ``start_capture()``.

        Args:
            source: Camera source (0 for default webcam, URL for IP camera, path for video file)
            buffer_size: Size of frame buffer for threading
        """
        self.source = source
        self.buffer_size = buffer_size
        self.cap = None
        self.frame_queue = queue.Queue(maxsize=buffer_size)
        self.capture_thread = None
        self.is_running = False
        # Requested values; connect() overwrites them with what the device reports.
        self.fps = 60  # Higher FPS target
        self.frame_width = 640
        self.frame_height = 480

    def _release_capture(self) -> None:
        """Release the capture handle (if any) and reset ``self.cap`` to None."""
        cap, self.cap = self.cap, None
        if cap is None:
            return
        try:
            cap.release()
        except Exception as e:
            # Best effort: a failing release must not abort the remaining cleanup.
            print(f"Error releasing camera: {e}")

    def connect(self) -> bool:
        """
        Connect to the video source.

        On any failure the handle is released and ``self.cap`` is reset to
        None, so a False return never leaves a half-configured capture behind.

        Returns:
            True if connection successful, False otherwise
        """
        try:
            self.cap = cv2.VideoCapture(self.source)
            if not self.cap.isOpened():
                print(f"Error: Could not open video source: {self.source}")
                self._release_capture()
                return False

            # Set camera properties for higher performance
            self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.frame_width)
            self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.frame_height)
            self.cap.set(cv2.CAP_PROP_FPS, self.fps)
            self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)  # Reduce buffer to minimize delay

            # Additional optimizations for higher FPS
            self.cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc('M', 'J', 'P', 'G'))  # Use MJPEG for speed
            # Intended to disable auto exposure for consistent timing.
            # NOTE(review): the meaning of 0.25 is backend-dependent - confirm for the target platform.
            self.cap.set(cv2.CAP_PROP_AUTO_EXPOSURE, 0.25)

            # Read back the actual properties instead of trusting the requested ones.
            self.frame_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            self.frame_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            self.fps = int(self.cap.get(cv2.CAP_PROP_FPS))

            print(f"Connected to camera: {self.frame_width}x{self.frame_height} @ {self.fps}fps")
            return True
        except Exception as e:
            print(f"Error connecting to camera: {e}")
            self._release_capture()
            return False

    def start_capture(self) -> bool:
        """
        Start threaded video capture.

        Connects first when no opened capture handle is available.

        Returns:
            True if capture started successfully (or was already running), False otherwise
        """
        if not self.cap or not self.cap.isOpened():
            if not self.connect():
                return False

        if self.is_running:
            print("Capture is already running")
            return True

        # Set the flag before starting the thread: the loop checks it immediately.
        self.is_running = True
        self.capture_thread = threading.Thread(target=self._capture_frames, daemon=True)
        self.capture_thread.start()
        print("Video capture started")
        return True

    def stop_capture(self):
        """Stop video capture and clean up resources."""
        self.is_running = False

        thread = self.capture_thread
        # Joining the current thread raises RuntimeError, so skip the join when
        # stop_capture() is invoked from the capture thread itself.
        if (
            thread is not None
            and thread is not threading.current_thread()
            and thread.is_alive()
        ):
            thread.join(timeout=2.0)

        self._release_capture()

        # Clear the frame queue
        while True:
            try:
                self.frame_queue.get_nowait()
            except queue.Empty:
                break

        print("Video capture stopped")

    def _capture_frames(self):
        """
        Internal method to capture frames in a separate thread.

        Runs until ``is_running`` is cleared, the capture handle is gone or
        closed, or a file-like source fails to deliver a frame. ``is_running``
        is reset to False on every exit path.
        """
        try:
            while self.is_running:
                # Snapshot the handle: stop_capture() may set self.cap to None concurrently.
                cap = self.cap
                if cap is None or not cap.isOpened():
                    break
                try:
                    ret, frame = cap.read()
                    if not ret:
                        print("Failed to capture frame")
                        if isinstance(self.source, str) and not self.source.isdigit():
                            # For video files, we might have reached the end
                            print("Reached end of video file")
                            break
                        # Live source: wait briefly before retrying instead of spinning.
                        time.sleep(self._READ_RETRY_DELAY)
                        continue

                    # Timestamp taken right after the read completes
                    timestamp = time.time()

                    # If queue is full, remove oldest frame
                    if self.frame_queue.full():
                        try:
                            self.frame_queue.get_nowait()
                        except queue.Empty:
                            # A consumer emptied the queue in between; nothing to drop.
                            pass

                    # Add new frame to queue
                    self.frame_queue.put((frame, timestamp), block=False)
                except Exception as e:
                    print(f"Error in frame capture: {e}")
                    time.sleep(0.1)
        finally:
            self.is_running = False

    def get_frame(self) -> Optional[tuple]:
        """
        Get the oldest buffered frame from the capture queue without blocking.

        Returns:
            Tuple of (frame, timestamp) or None if no frame available
        """
        try:
            return self.frame_queue.get_nowait()
        except queue.Empty:
            return None

    def get_latest_frame(self) -> Optional[tuple]:
        """
        Get the most recent frame, discarding any older frames in the queue.

        Returns:
            Tuple of (frame, timestamp) or None if no frame available
        """
        latest_frame = None
        # Drain the queue; the last item taken out is the newest one.
        while True:
            try:
                latest_frame = self.frame_queue.get_nowait()
            except queue.Empty:
                return latest_frame

    def is_connected(self) -> bool:
        """
        Check if camera is connected and capturing.

        Returns:
            True if connected and running, False otherwise
        """
        return self.is_running and self.cap is not None and self.cap.isOpened()

    def get_properties(self) -> dict:
        """
        Get camera properties.

        Returns:
            Dictionary of camera properties, or an empty dict when there is no capture handle
        """
        if not self.cap:
            return {}

        return {
            'width': self.frame_width,
            'height': self.frame_height,
            'fps': self.fps,
            'source': self.source,
            'is_running': self.is_running,
            'buffer_size': self.buffer_size,
        }

    def set_resolution(self, width: int, height: int) -> bool:
        """
        Set camera resolution.

        The stored width/height are updated to the values read back from the
        capture handle, which may differ from the requested ones; the return
        value only reports that the calls completed without raising.

        Args:
            width: Frame width
            height: Frame height

        Returns:
            True if successful, False otherwise
        """
        if not self.cap:
            return False

        try:
            self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, width)
            self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height)

            # Verify the change
            actual_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            actual_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

            self.frame_width = actual_width
            self.frame_height = actual_height

            print(f"Resolution set to: {actual_width}x{actual_height}")
            return True
        except Exception as e:
            print(f"Error setting resolution: {e}")
            return False

    def __enter__(self):
        """Context manager entry: start capture and return the manager."""
        # The result of start_capture() is not checked here; callers can use is_connected().
        self.start_capture()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit: stop capture and release resources."""
        self.stop_capture()
class MultiCameraManager:
    """
    Manages multiple camera sources simultaneously.

    Cameras are stored in ``self.cameras``, a dict mapping a caller-chosen
    camera id to its ``CameraManager``.
    """

    def __init__(self):
        """Create an empty manager with no cameras registered."""
        self.cameras = {}
        self.is_running = False

    def add_camera(self, camera_id: str, source: Union[int, str],
                   buffer_size: int = 10) -> bool:
        """
        Add a camera to the manager.

        The camera is connected but capture is not started. If ``camera_id``
        is already registered, the new camera replaces the old one and the old
        one is stopped so its capture thread and device handle are not leaked.

        Args:
            camera_id: Unique identifier for the camera
            source: Camera source
            buffer_size: Frame buffer size

        Returns:
            True if camera added successfully, False otherwise
        """
        try:
            camera = CameraManager(source, buffer_size)
            if not camera.connect():
                print(f"Failed to add camera '{camera_id}'")
                return False

            previous = self.cameras.get(camera_id)
            self.cameras[camera_id] = camera
            if previous is not None:
                # Shut down the camera being replaced; nothing else references it anymore.
                previous.stop_capture()

            print(f"Camera '{camera_id}' added successfully")
            return True
        except Exception as e:
            print(f"Error adding camera '{camera_id}': {e}")
            return False

    def remove_camera(self, camera_id: str):
        """Remove a camera from the manager, stopping its capture first. Unknown ids are ignored."""
        camera = self.cameras.get(camera_id)
        if camera is None:
            return
        camera.stop_capture()
        del self.cameras[camera_id]
        print(f"Camera '{camera_id}' removed")

    def start_all(self):
        """Start capture for all cameras and mark the manager as running."""
        for camera_id, camera in self.cameras.items():
            if camera.start_capture():
                print(f"Started capture for camera '{camera_id}'")
            else:
                print(f"Failed to start capture for camera '{camera_id}'")
        # The flag is set regardless of how many cameras actually started.
        self.is_running = True

    def stop_all(self):
        """Stop capture for all cameras and mark the manager as stopped."""
        for camera_id, camera in self.cameras.items():
            camera.stop_capture()
            print(f"Stopped capture for camera '{camera_id}'")
        self.is_running = False

    def get_frame(self, camera_id: str) -> Optional[tuple]:
        """Get frame from specific camera, or None if the id is unknown or no frame is buffered."""
        camera = self.cameras.get(camera_id)
        if camera is None:
            return None
        return camera.get_frame()

    def get_all_frames(self) -> dict:
        """Get the latest frame from every camera; cameras with no frame available are omitted."""
        frames = {}
        for camera_id, camera in self.cameras.items():
            frame_data = camera.get_latest_frame()
            if frame_data:
                frames[camera_id] = frame_data
        return frames

    def get_camera_list(self) -> list:
        """Get list of all camera IDs."""
        return list(self.cameras)