Spaces:
Sleeping
Sleeping
| import cv2 | |
| import numpy as np | |
| from ultralytics import YOLO | |
| import supervision as sv | |
| from typing import List, Dict, Optional, Tuple | |
| import time | |
| import logging | |
| import os | |
# Module-level logger shared by everything in this file.
logger = logging.getLogger(__name__)

# Configure the root logger so INFO-level messages are emitted by default.
# basicConfig() is a no-op when the root logger already has handlers.
logging.basicConfig(level=logging.INFO)
class QueueMonitor:
    """Count tracked detections inside polygon zones and time how long they stay.

    Each frame is run through a YOLO model, the detections are filtered to
    class id 0, tracked with ByteTrack, and then tested against every
    configured polygon zone. For every zone the monitor keeps a per-tracker
    dwell-time record and reports the current count plus time statistics.

    NOTE(review): class id 0 is "person" for COCO-trained weights such as the
    default ``yolov8s.pt``; confirm the class mapping when using custom weights.
    """

    def __init__(
        self,
        weights: str = "yolov8s.pt",
        confidence: float = 0.3,
        fps: float = 30.0,
        hf_token: Optional[str] = None,
    ):
        """Load the model and create the tracker and annotators.

        Args:
            weights: Path or name of the YOLO weights passed to ``YOLO()``.
            confidence: Confidence threshold forwarded to the model on each call.
            fps: Frame rate used to convert frame counts into seconds. When it
                is not positive, one frame is counted as one time unit.
            hf_token: Optional token; falls back to the ``HF_TOKEN`` environment
                variable. It is only stored on the instance here.

        Raises:
            Exception: Any error raised while building the model, tracker or
                annotators is logged and re-raised unchanged.
        """
        try:
            if hf_token is None:
                hf_token = os.getenv("HF_TOKEN")
            self.hf_token = hf_token
            self.model = YOLO(weights)
            self.tracker = sv.ByteTrack()
            self.confidence = confidence
            self.fps = fps
            self.frame_count = 0
            self.colors = sv.ColorPalette.from_hex(["#E6194B", "#3CB44B", "#FFE119", "#3C76D1"])
            self.color_annotator = sv.ColorAnnotator(color=self.colors)
            self.label_annotator = sv.LabelAnnotator(
                color=self.colors, text_color=sv.Color.from_hex("#000000")
            )
            self.zones = []
            self.zone_annotators = []
            # Per-zone dwell-time records: {zone_idx: {tracker_id: record}}.
            # Initialised here so the attribute always exists, not only after
            # setup_zones() has run.
            self.time_tracking = {}
            # Legacy attribute: never read by this class, kept so external code
            # that references it keeps working.
            self.time_in_zone_trackers = {}
            logger.info(f"QueueMonitor initialized with model: {weights}, confidence: {confidence}")
        except Exception as e:
            logger.error(f"Failed to initialize QueueMonitor: {e}")
            raise

    def setup_zones(self, polygons: List[np.ndarray]):
        """Replace the configured zones with one zone per polygon.

        All polygons are validated and all zone objects are built before any
        instance state is touched, so a failure leaves the previous
        configuration intact. Dwell-time records are reset on success.

        Args:
            polygons: Sequence of polygons, each an array-like of at least
                three points.

        Raises:
            ValueError: If ``polygons`` is empty/None or a polygon has fewer
                than three points.
            Exception: Errors from the zone/annotator constructors are logged
                and re-raised.
        """
        try:
            if polygons is None or len(polygons) == 0:
                raise ValueError("At least one zone polygon is required")

            # Validate everything first so a bad polygon cannot leave the
            # monitor half-configured.
            prepared = []
            for idx, polygon in enumerate(polygons):
                polygon = np.asarray(polygon)  # also accept plain lists of points
                if polygon.ndim == 0 or polygon.shape[0] < 3:
                    raise ValueError(f"Zone {idx} polygon must have at least 3 points")
                prepared.append(polygon)

            zones = []
            zone_annotators = []
            for idx, polygon in enumerate(prepared):
                zone = sv.PolygonZone(
                    polygon=polygon,
                    triggering_anchors=(sv.Position.CENTER,),
                )
                zones.append(zone)
                zone_annotators.append(
                    sv.PolygonZoneAnnotator(
                        zone=zone,
                        color=self.colors.by_idx(idx),
                        thickness=2,
                        text_thickness=2,
                        text_scale=0.5,
                    )
                )

            # Commit only after every zone was built successfully.
            self.zones = zones
            self.zone_annotators = zone_annotators
            self.time_tracking = {idx: {} for idx in range(len(zones))}
            logger.info(f"Setup {len(self.zones)} zones successfully")
        except Exception as e:
            logger.error(f"Failed to setup zones: {e}")
            raise

    @staticmethod
    def _update_dwell_times(zone_times: Dict, tracker_ids: List, current_time: float,
                            frame_time: float) -> Dict[str, float]:
        """Advance one zone's dwell-time records by one frame, in place.

        A tracker seen for the first time gets a record with zero accumulated
        time; a tracker already known gains ``frame_time``. Records of trackers
        that are absent and never accumulated any time are dropped.

        NOTE: records with accumulated time are kept after the tracker leaves,
        so they keep contributing to the zone statistics and the mapping grows
        with the number of distinct trackers seen.

        Returns:
            Mapping of ``str(tracker_id)`` to rounded seconds for the trackers
            currently inside the zone.
        """
        present = set(tracker_ids)
        for tid in present:
            record = zone_times.get(tid)
            if record is None:
                zone_times[tid] = {"start_time": current_time, "total_time": 0.0, "visits": 1}
            else:
                record["total_time"] += frame_time

        stale = [
            tid for tid, record in zone_times.items()
            if tid not in present and record["total_time"] <= 0
        ]
        for tid in stale:
            del zone_times[tid]

        return {
            str(tid): round(zone_times[tid]["total_time"], 2)
            for tid in tracker_ids
            if tid in zone_times
        }

    def _annotate_detections(self, scene: np.ndarray, detections_in_zone, zone_idx: int,
                             time_data: Dict[str, float]) -> np.ndarray:
        """Draw zone-coloured overlays and "#id (time)" labels for one zone."""
        if len(detections_in_zone) == 0:
            return scene

        # Colour every detection with the palette entry of its zone.
        custom_color_lookup = np.full(detections_in_zone.class_id.shape, zone_idx)
        scene = self.color_annotator.annotate(
            scene=scene,
            detections=detections_in_zone,
            custom_color_lookup=custom_color_lookup,
        )
        if detections_in_zone.tracker_id is None:
            return scene

        labels = []
        for tid in detections_in_zone.tracker_id:
            key = str(tid)
            if key in time_data:
                labels.append(f"#{tid} ({time_data[key]:.1f}s)")
            else:
                # No timing available: show the id once instead of "#id (#id)".
                labels.append(f"#{tid}")
        return self.label_annotator.annotate(
            scene=scene,
            detections=detections_in_zone,
            labels=labels,
            custom_color_lookup=custom_color_lookup,
        )

    def process_frame(self, frame: np.ndarray) -> Tuple[np.ndarray, List[Dict]]:
        """Detect, track and annotate one frame and compute per-zone statistics.

        Args:
            frame: Image array to process; it is copied before annotation.

        Returns:
            ``(annotated_frame, zone_stats)`` where ``zone_stats`` is a list
            with one dict per zone containing ``zone_id``, ``count``,
            ``tracker_ids``, ``time_in_zone_seconds``, ``avg_time_seconds``,
            ``max_time_seconds`` and ``total_visits``. A zone whose processing
            failed gets zeroed values plus an ``error`` message.

        Raises:
            ValueError: If the frame is None/empty or no zones are configured.
            Exception: Detection or tracking errors are logged and re-raised.
        """
        try:
            if frame is None or frame.size == 0:
                raise ValueError("Invalid frame: frame is None or empty")
            if len(self.zones) == 0:
                raise ValueError("No zones configured. Please setup zones first.")

            self.frame_count += 1
            # Without a usable frame rate, fall back to counting in frames.
            has_fps = self.fps > 0
            current_time = self.frame_count / self.fps if has_fps else self.frame_count
            frame_time = 1.0 / self.fps if has_fps else 1.0

            results = self.model(frame, verbose=False, conf=self.confidence)[0]
            detections = sv.Detections.from_ultralytics(results)
            detections = detections[detections.class_id == 0]
            # The tracker is updated on every frame, including frames with no
            # detections.
            detections = self.tracker.update_with_detections(detections)

            annotated_frame = frame.copy()
            zone_stats = []
            for idx, (zone, zone_annotator) in enumerate(zip(self.zones, self.zone_annotators)):
                # Best effort per zone: one failing zone must not abort the
                # whole frame, so its error is reported in the stats instead.
                try:
                    annotated_frame = zone_annotator.annotate(scene=annotated_frame)
                    detections_in_zone = detections[zone.trigger(detections)]
                    if detections_in_zone.tracker_id is not None:
                        tracker_ids = detections_in_zone.tracker_id.tolist()
                    else:
                        tracker_ids = []

                    zone_times = self.time_tracking.setdefault(idx, {})
                    time_data = self._update_dwell_times(
                        zone_times, tracker_ids, current_time, frame_time
                    )

                    totals = [record["total_time"] for record in zone_times.values()]
                    # float() keeps NumPy scalar types out of the returned stats.
                    avg_time = float(np.mean(totals)) if totals else 0.0
                    max_time = max(totals) if totals else 0.0
                    total_visits = sum(record.get("visits", 1) for record in zone_times.values())

                    zone_stats.append({
                        "zone_id": idx,
                        "count": len(detections_in_zone),
                        "tracker_ids": tracker_ids,
                        "time_in_zone_seconds": time_data,
                        "avg_time_seconds": round(avg_time, 2),
                        "max_time_seconds": round(max_time, 2),
                        "total_visits": total_visits,
                    })

                    annotated_frame = self._annotate_detections(
                        annotated_frame, detections_in_zone, idx, time_data
                    )
                except Exception as e:
                    logger.warning(f"Error processing zone {idx}: {e}")
                    zone_stats.append({
                        "zone_id": idx,
                        "count": 0,
                        "tracker_ids": [],
                        "time_in_zone_seconds": {},
                        "avg_time_seconds": 0.0,
                        "max_time_seconds": 0.0,
                        "total_visits": 0,
                        "error": str(e),
                    })
            return annotated_frame, zone_stats
        except Exception as e:
            logger.error(f"Error processing frame: {e}")
            raise
if __name__ == "__main__":
    # Smoke run: push a single all-black 1280x720 frame through a monitor
    # configured with one rectangular zone and print the resulting stats.
    monitor = QueueMonitor()

    zone_corners = [[100, 100], [600, 100], [600, 600], [100, 600]]
    monitor.setup_zones([np.array(zone_corners)])

    blank_frame = np.zeros((720, 1280, 3), dtype=np.uint8)
    processed, stats = monitor.process_frame(blank_frame)
    print(f"Stats: {stats}")