|
|
""" |
|
|
Vehicle Detection, Tracking, Counting, and Speed Estimation System |
|
|
=================================================================== |
|
|
|
|
|
A comprehensive computer vision pipeline for analyzing traffic videos, |
|
|
detecting vehicles, tracking their movement, counting them, and estimating |
|
|
their speeds using YOLO object detection and perspective transformation. |
|
|
|
|
|
Authors: |
|
|
- Abhay Gupta (0205CC221005) |
|
|
- Aditi Lakhera (0205CC221011) |
|
|
- Balraj Patel (0205CC221049) |
|
|
- Bhumika Patel (0205CC221050) |
|
|
|
|
|
Technical Approach: |
|
|
- YOLO for real-time object detection |
|
|
- ByteTrack for multi-object tracking |
|
|
- Perspective transformation for speed calculation |
|
|
- Line zones for vehicle counting |
|
|
""" |
|
|
|
|
|
import sys |
|
|
import logging |
|
|
from pathlib import Path |
|
|
from typing import Dict, Optional, Callable |
|
|
from time import time |
|
|
|
|
|
import cv2 |
|
|
import numpy as np |
|
|
import supervision as sv |
|
|
from ultralytics import YOLO |
|
|
|
|
|
from src import FrameAnnotator, VehicleSpeedEstimator, PerspectiveTransformer |
|
|
from src.exceptions import ( |
|
|
VideoProcessingError, |
|
|
ModelLoadError, |
|
|
ConfigurationError |
|
|
) |
|
|
from config import VehicleDetectionConfig |
|
|
|
|
|
|
|
|
# Configure root logging once at import time so every module in the pipeline
# emits timestamped, uniformly formatted records.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Module-level logger named after this module, per the stdlib logging convention.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
class VehicleDetectionPipeline:
    """
    Main pipeline for vehicle detection, tracking, counting, and speed estimation.

    This class orchestrates the entire processing workflow, from loading the model
    to processing each frame and generating the output video.

    Lifecycle:
        1. ``__init__`` loads the YOLO model.
        2. ``process_video`` lazily builds the video-dependent components
           (tracker, line zone, speed estimator, annotator) once the input
           video's resolution and frame rate are known, then processes frames.
    """

    def __init__(self, config: VehicleDetectionConfig):
        """
        Initialize the detection pipeline.

        Args:
            config: Configuration object with all parameters

        Raises:
            ModelLoadError: If model cannot be loaded
            ConfigurationError: If configuration is invalid
        """
        self.config = config
        # Video-dependent components stay None until _setup_video_components()
        # runs; only the model is loaded eagerly.
        self.model = None
        self.tracker = None
        self.line_zone = None
        self.speed_estimator = None
        self.annotator = None
        self.video_info = None

        logger.info(f"Initializing pipeline with config: {config}")
        self._initialize_components()

    def _initialize_components(self) -> None:
        """Load the YOLO model.

        Raises:
            ModelLoadError: If the model file cannot be loaded.
        """
        try:
            logger.info(f"Loading YOLO model: {self.config.model_path}")
            self.model = YOLO(self.config.model_path)
            # NOTE: assigning `.conf` / `.iou` attributes on an ultralytics
            # YOLO object has no effect on inference in current releases, so
            # the configured thresholds are instead passed per call in
            # _process_single_frame().
            logger.info("Model loaded successfully")
        except Exception as e:
            logger.error(f"Failed to load model: {e}")
            raise ModelLoadError(
                f"Could not load YOLO model from {self.config.model_path}: {e}"
            ) from e

    def _setup_video_components(self, video_path: str) -> None:
        """
        Set up video-specific components.

        Reads the video metadata and builds the tracker, counting line zone,
        perspective transformer, speed estimator, and frame annotator, all of
        which depend on the video's resolution and frame rate.

        Args:
            video_path: Path to input video

        Raises:
            VideoProcessingError: If video cannot be opened
        """
        try:
            # Resolution and fps drive every component below.
            self.video_info = sv.VideoInfo.from_video_path(video_path)
            logger.info(
                f"Video info: {self.video_info.width}x{self.video_info.height} @ {self.video_info.fps}fps"
            )

            # ByteTrack needs the true frame rate to age out lost tracks.
            self.tracker = sv.ByteTrack(
                frame_rate=self.video_info.fps,
                track_activation_threshold=self.config.confidence_threshold
            )
            logger.info("Tracker initialized")

            # Horizontal counting line, inset by `line_offset` px on each side.
            line_start = sv.Point(
                x=self.config.line_offset,
                y=self.config.line_y
            )
            line_end = sv.Point(
                x=self.video_info.width - self.config.line_offset,
                y=self.config.line_y
            )

            # A vehicle is counted when its bottom-center anchor crosses the line.
            self.line_zone = sv.LineZone(
                start=line_start,
                end=line_end,
                triggering_anchors=(sv.Position.BOTTOM_CENTER,)
            )
            logger.info(f"Line zone created at y={self.config.line_y}")

            # Maps image-plane points into a metric top-down view so that
            # pixel displacements translate into real-world distances.
            source_pts = np.array(self.config.source_points, dtype=np.float32)
            target_pts = np.array(self.config.target_points, dtype=np.float32)

            transformer = PerspectiveTransformer(
                source_points=source_pts,
                target_points=target_pts
            )
            logger.info("Perspective transformer initialized")

            self.speed_estimator = VehicleSpeedEstimator(
                fps=self.video_info.fps,
                transformer=transformer,
                history_duration=self.config.speed_history_seconds,
                speed_unit=self.config.speed_unit
            )
            logger.info("Speed estimator initialized")

            self.annotator = FrameAnnotator(
                video_resolution=(self.video_info.width, self.video_info.height),
                show_boxes=self.config.enable_boxes,
                show_labels=self.config.enable_labels,
                show_traces=self.config.enable_traces,
                show_line_zones=self.config.enable_line_zones,
                trace_length=self.config.trace_length,
                zone_polygon=source_pts
            )
            logger.info("Frame annotator initialized")

        except Exception as e:
            logger.error(f"Failed to setup video components: {e}")
            raise VideoProcessingError(f"Error setting up video processing: {e}") from e

    def _process_single_frame(self, frame: np.ndarray) -> tuple:
        """
        Process a single video frame.

        Runs detection, tracking, line-zone counting, and speed estimation,
        then draws all annotations.

        Args:
            frame: Input video frame

        Returns:
            Tuple of (annotated_frame, detections)
        """
        # FIX: conf/iou must be passed per call — setting them as attributes on
        # the YOLO object (as the previous version did) is ignored by ultralytics.
        results = self.model(
            frame,
            verbose=False,
            conf=self.config.confidence_threshold,
            iou=self.config.iou_threshold,
        )[0]
        detections = sv.Detections.from_ultralytics(results)

        # Associate detections across frames; assigns tracker_id values.
        detections = self.tracker.update_with_detections(detections)

        # Update in/out counts for tracks crossing the counting line.
        self.line_zone.trigger(detections)

        # Annotates detections.data["speed"] using the perspective transform.
        detections = self.speed_estimator.estimate(detections)

        labels = self._create_labels(detections)

        annotated_frame = self.annotator.draw_annotations(
            frame=frame,
            detections=detections,
            labels=labels,
            line_zones=[self.line_zone]
        )

        return annotated_frame, detections

    def _create_labels(self, detections: sv.Detections) -> list:
        """
        Create display labels for detected vehicles.

        Each label has the form ``"<class> #<tracker_id> <speed><unit>"``; the
        speed suffix is omitted while the estimator has no reading (speed <= 0).

        Args:
            detections: Detection results

        Returns:
            List of label strings (empty when no tracker IDs are available)
        """
        labels = []

        # Untracked detections (e.g. before the tracker warms up) get no labels.
        if not hasattr(detections, 'tracker_id') or detections.tracker_id is None:
            return labels

        for idx, tracker_id in enumerate(detections.tracker_id):
            class_name = "Vehicle"
            if "class_name" in detections.data:
                class_name = detections.data["class_name"][idx]

            speed_text = ""
            if "speed" in detections.data:
                speed = detections.data["speed"][idx]
                if speed > 0:
                    speed_text = f" {speed:.0f}{self.config.speed_unit}"

            label = f"{class_name} #{tracker_id}{speed_text}"
            labels.append(label)

        return labels

    def process_video(
        self,
        progress_callback: Optional[Callable[[float], None]] = None
    ) -> Dict:
        """
        Process the entire video.

        Args:
            progress_callback: Optional callback for progress updates,
                invoked with a completion fraction in [0, 1]

        Returns:
            Dictionary with processing statistics: total/in/out counts,
            avg/max/min speed, frames processed, elapsed time, and throughput fps

        Raises:
            VideoProcessingError: If video processing fails
        """
        start_time = time()

        try:
            if not Path(self.config.input_video).exists():
                raise VideoProcessingError(f"Input video not found: {self.config.input_video}")

            self._setup_video_components(self.config.input_video)

            # Make sure the output directory exists before the sink opens it.
            output_path = Path(self.config.output_video)
            output_path.parent.mkdir(parents=True, exist_ok=True)

            frame_count = 0
            total_frames = self.video_info.total_frames or 0
            all_speeds = []

            # Optional live preview; silently disabled in headless environments.
            if self.config.display_enabled:
                try:
                    cv2.namedWindow(self.config.window_name, cv2.WINDOW_NORMAL)
                    cv2.resizeWindow(
                        self.config.window_name,
                        self.video_info.width,
                        self.video_info.height
                    )
                except Exception as e:
                    logger.warning(f"Could not create display window (headless environment?): {e}")
                    self.config.display_enabled = False

            logger.info("Starting video processing...")
            frame_generator = sv.get_video_frames_generator(self.config.input_video)

            try:
                with sv.VideoSink(self.config.output_video, self.video_info) as sink:
                    for frame in frame_generator:
                        try:
                            annotated_frame, detections = self._process_single_frame(frame)

                            # Accumulate only valid (> 0) readings for the stats.
                            if "speed" in detections.data:
                                speeds = detections.data["speed"]
                                all_speeds.extend([s for s in speeds if s > 0])

                            sink.write_frame(annotated_frame)

                            if self.config.display_enabled:
                                cv2.imshow(self.config.window_name, annotated_frame)

                                # 'q' key aborts processing early.
                                if cv2.waitKey(1) & 0xFF == ord('q'):
                                    logger.info("Processing interrupted by user")
                                    break

                                # Stop if the user closed the preview window.
                                if cv2.getWindowProperty(
                                    self.config.window_name,
                                    cv2.WND_PROP_VISIBLE
                                ) < 1:
                                    logger.info("Window closed by user")
                                    break

                            frame_count += 1
                            if progress_callback and total_frames > 0:
                                progress = frame_count / total_frames
                                progress_callback(progress)

                        except Exception as e:
                            # Best-effort: a single bad frame must not abort the run.
                            logger.warning(f"Error processing frame {frame_count}: {e}")
                            continue
            finally:
                # FIX: run cleanup even when the sink or loop raises, so a
                # failed run does not leave an orphaned preview window.
                if self.config.display_enabled:
                    cv2.destroyAllWindows()

            processing_time = time() - start_time
            # FIX: cast NumPy scalars to plain floats so the stats dict is
            # JSON-serializable and type-stable for callers.
            stats = {
                'total_count': self.line_zone.in_count + self.line_zone.out_count,
                'in_count': self.line_zone.in_count,
                'out_count': self.line_zone.out_count,
                'avg_speed': float(np.mean(all_speeds)) if all_speeds else 0.0,
                'max_speed': float(np.max(all_speeds)) if all_speeds else 0.0,
                'min_speed': float(np.min(all_speeds)) if all_speeds else 0.0,
                'frames_processed': frame_count,
                'processing_time': processing_time,
                'fps': frame_count / processing_time if processing_time > 0 else 0
            }

            logger.info(f"Processing complete: {frame_count} frames in {processing_time:.2f}s")
            logger.info(f"Vehicles counted: {stats['total_count']} (In: {stats['in_count']}, Out: {stats['out_count']})")

            return stats

        except Exception as e:
            logger.error(f"Video processing failed: {e}", exc_info=True)
            raise VideoProcessingError(f"Failed to process video: {e}") from e
|
|
|
|
|
|
|
|
def process_video(
    config: VehicleDetectionConfig,
    progress_callback: Optional[Callable[[float], None]] = None
) -> Dict:
    """
    Convenience function to process a video with given configuration.

    Builds a :class:`VehicleDetectionPipeline` from *config* and runs it to
    completion in a single call.

    Args:
        config: Configuration object
        progress_callback: Optional progress callback

    Returns:
        Processing statistics dictionary
    """
    return VehicleDetectionPipeline(config).process_video(progress_callback)
|
|
|
|
|
|
|
|
def main():
    """Main entry point for CLI usage.

    Builds the default configuration, runs the full pipeline, and prints a
    human-readable summary of counts, speeds, and throughput.

    Returns:
        int: Process exit code — 0 on success, 1 on interruption or error.
    """
    try:
        logger.info("=" * 60)
        logger.info("Vehicle Speed Estimation & Counting System")
        logger.info("=" * 60)

        # Default configuration; customize via config.VehicleDetectionConfig.
        config = VehicleDetectionConfig()
        logger.info(f"Configuration: {config}")

        stats = process_video(config)

        # Human-readable results summary on stdout.
        print("\n" + "=" * 60)
        print("PROCESSING RESULTS")
        print("=" * 60)
        print(f"Output saved to: {config.output_video}")
        # FIX: dropped pointless f-prefixes on strings with no placeholders (F541).
        print("\nVehicle Count:")
        print(f" Total: {stats['total_count']}")
        print(f" In: {stats['in_count']}")
        print(f" Out: {stats['out_count']}")
        print(f"\nSpeed Statistics ({config.speed_unit}):")
        print(f" Average: {stats['avg_speed']:.1f}")
        print(f" Maximum: {stats['max_speed']:.1f}")
        print(f" Minimum: {stats['min_speed']:.1f}")
        print("\nProcessing Info:")
        print(f" Frames: {stats['frames_processed']}")
        print(f" Time: {stats['processing_time']:.2f}s")
        print(f" FPS: {stats['fps']:.1f}")
        print("=" * 60)

        return 0

    except KeyboardInterrupt:
        # Ctrl-C: exit quietly with a non-zero status.
        logger.info("Processing interrupted by user")
        return 1
    except Exception as e:
        # Top-level boundary: log with traceback, surface a short message.
        logger.error(f"Fatal error: {e}", exc_info=True)
        print(f"\n❌ Error: {e}", file=sys.stderr)
        return 1
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
sys.exit(main()) |
|
|
|