fix: keyframe images, video clips, evidence images, live stream webcam+URL, remove demo mode
fd50325 verified | """ | |
| Event Aggregation and Deduplication Module | |
| This module handles: | |
| - Event detection and clustering | |
| - Temporal aggregation of related events | |
| - Duplicate frame removal using similarity detection | |
| - Canonical event generation | |
| """ | |
| import numpy as np | |
| import cv2 | |
| import json | |
| import os | |
| from typing import List, Dict, Tuple, Set, Any, Optional | |
| from dataclasses import dataclass, asdict | |
| import imagehash | |
| from PIL import Image | |
| from collections import defaultdict | |
| import logging | |
| from datetime import datetime | |
| logger = logging.getLogger(__name__) | |
| class Event: | |
| """Represents a detected event""" | |
| event_id: str | |
| start_timestamp: float | |
| end_timestamp: float | |
| event_type: str | |
| confidence: float | |
| keyframes: List[str] # Frame paths | |
| importance_score: float | |
| motion_intensity: float | |
| description: str = "" | |
| # Object detection specific fields | |
| object_class: str = "" # For object-based events (fire, knife, gun) | |
| detection_count: int = 0 # Number of detections in this event | |
| max_confidence: float = 0.0 # Highest confidence detection | |
| is_object_event: bool = False # Flag to identify object-based events | |
| detection_details: List = None # Raw detection data | |
| class CanonicalEvent: | |
| """Canonical representation of aggregated events""" | |
| canonical_id: str | |
| event_type: str | |
| representative_frame: str | |
| start_time: float | |
| end_time: float | |
| duration: float | |
| confidence: float | |
| frame_count: int | |
| aggregated_events: List[str] # Event IDs | |
| description: str | |
| similarity_cluster: int | |
| # Enhanced object detection fields | |
| contains_objects: bool = False # Whether this canonical event has object detections | |
| detected_object_classes: List[str] = None # List of detected object classes | |
| object_detection_summary: Dict = None # Summary of object detections | |
| threat_level: str = "low" # Threat assessment: low, medium, high, critical | |
| class SimilarityCalculator: | |
| """Calculate similarity between frames using multiple methods""" | |
| def __init__(self, similarity_threshold: float = 0.85): | |
| self.similarity_threshold = similarity_threshold | |
| def calculate_histogram_similarity(self, frame1: np.ndarray, frame2: np.ndarray) -> float: | |
| """Calculate histogram-based similarity""" | |
| try: | |
| # Convert to HSV for better color comparison | |
| hsv1 = cv2.cvtColor(frame1, cv2.COLOR_BGR2HSV) | |
| hsv2 = cv2.cvtColor(frame2, cv2.COLOR_BGR2HSV) | |
| # Calculate histograms | |
| hist1 = cv2.calcHist([hsv1], [0, 1, 2], None, [50, 60, 60], [0, 180, 0, 256, 0, 256]) | |
| hist2 = cv2.calcHist([hsv2], [0, 1, 2], None, [50, 60, 60], [0, 180, 0, 256, 0, 256]) | |
| # Calculate correlation | |
| correlation = cv2.compareHist(hist1, hist2, cv2.HISTCMP_CORREL) | |
| return max(0.0, correlation) | |
| except Exception as e: | |
| logger.error(f"Histogram similarity calculation failed: {e}") | |
| return 0.0 | |
| def calculate_perceptual_hash_similarity(self, frame1_path: str, frame2_path: str) -> float: | |
| """Calculate perceptual hash similarity""" | |
| try: | |
| # Load images with PIL for imagehash | |
| img1 = Image.open(frame1_path) | |
| img2 = Image.open(frame2_path) | |
| # Calculate perceptual hashes | |
| hash1 = imagehash.phash(img1) | |
| hash2 = imagehash.phash(img2) | |
| # Calculate similarity (lower hash difference = higher similarity) | |
| hash_diff = hash1 - hash2 | |
| similarity = 1.0 - (hash_diff / 64.0) # Normalize to 0-1 | |
| return max(0.0, similarity) | |
| except Exception as e: | |
| logger.error(f"Perceptual hash similarity calculation failed: {e}") | |
| return 0.0 | |
| def calculate_structural_similarity(self, frame1: np.ndarray, frame2: np.ndarray) -> float: | |
| """Calculate structural similarity using template matching""" | |
| try: | |
| # Convert to grayscale | |
| gray1 = cv2.cvtColor(frame1, cv2.COLOR_BGR2GRAY) | |
| gray2 = cv2.cvtColor(frame2, cv2.COLOR_BGR2GRAY) | |
| # Resize to same dimensions if needed | |
| if gray1.shape != gray2.shape: | |
| h, w = min(gray1.shape[0], gray2.shape[0]), min(gray1.shape[1], gray2.shape[1]) | |
| gray1 = cv2.resize(gray1, (w, h)) | |
| gray2 = cv2.resize(gray2, (w, h)) | |
| # Calculate normalized cross-correlation | |
| result = cv2.matchTemplate(gray1, gray2, cv2.TM_CCOEFF_NORMED) | |
| similarity = result[0, 0] | |
| return max(0.0, similarity) | |
| except Exception as e: | |
| logger.error(f"Structural similarity calculation failed: {e}") | |
| return 0.0 | |
| def calculate_combined_similarity(self, frame1_path: str, frame2_path: str) -> float: | |
| """Calculate combined similarity score using multiple methods""" | |
| try: | |
| # Load frames | |
| frame1 = cv2.imread(frame1_path) | |
| frame2 = cv2.imread(frame2_path) | |
| if frame1 is None or frame2 is None: | |
| return 0.0 | |
| # Calculate different similarity metrics | |
| hist_sim = self.calculate_histogram_similarity(frame1, frame2) | |
| hash_sim = self.calculate_perceptual_hash_similarity(frame1_path, frame2_path) | |
| struct_sim = self.calculate_structural_similarity(frame1, frame2) | |
| # Weighted combination | |
| combined_similarity = ( | |
| hist_sim * 0.4 + # Histogram similarity | |
| hash_sim * 0.4 + # Perceptual hash similarity | |
| struct_sim * 0.2 # Structural similarity | |
| ) | |
| return min(1.0, combined_similarity) | |
| except Exception as e: | |
| logger.error(f"Combined similarity calculation failed: {e}") | |
| return 0.0 | |
| class EventDetector: | |
| """Detect events from keyframes""" | |
| def __init__(self, config): | |
| self.config = config | |
| self.event_types = { | |
| 'high_motion': {'motion_threshold': config.motion_threshold * 2}, | |
| 'burst_activity': {'requires_burst': True}, | |
| 'scene_change': {'change_threshold': config.scene_change_threshold}, | |
| 'quality_peak': {'quality_threshold': config.base_quality_threshold * 1.5} | |
| } | |
| def detect_events(self, keyframes: List) -> List[Event]: | |
| """Detect events from keyframes""" | |
| logger.info(f"Detecting events from {len(keyframes)} keyframes") | |
| events = [] | |
| event_id_counter = 1 | |
| # Temporal clustering for event detection | |
| clusters = self._create_temporal_clusters(keyframes) | |
| for cluster in clusters: | |
| if len(cluster) == 0: | |
| continue | |
| # Analyze cluster for event types | |
| cluster_events = self._analyze_cluster_for_events(cluster, event_id_counter) | |
| events.extend(cluster_events) | |
| event_id_counter += len(cluster_events) | |
| logger.info(f"Detected {len(events)} events") | |
| return events | |
| def _create_temporal_clusters(self, keyframes: List) -> List[List]: | |
| """Create temporal clusters of keyframes""" | |
| if not keyframes: | |
| return [] | |
| # Sort keyframes by timestamp | |
| sorted_keyframes = sorted(keyframes, key=lambda x: x.frame_data.timestamp) | |
| clusters = [] | |
| current_cluster = [sorted_keyframes[0]] | |
| for i in range(1, len(sorted_keyframes)): | |
| current_kf = sorted_keyframes[i] | |
| last_kf = current_cluster[-1] | |
| time_gap = current_kf.frame_data.timestamp - last_kf.frame_data.timestamp | |
| # If gap is within clustering window, add to current cluster | |
| if time_gap <= self.config.temporal_clustering_window: | |
| current_cluster.append(current_kf) | |
| else: | |
| # Start new cluster | |
| if len(current_cluster) > 0: | |
| clusters.append(current_cluster) | |
| current_cluster = [current_kf] | |
| # Don't forget the last cluster | |
| if len(current_cluster) > 0: | |
| clusters.append(current_cluster) | |
| return clusters | |
| def _analyze_cluster_for_events(self, cluster: List, start_event_id: int) -> List[Event]: | |
| """Analyze a temporal cluster for different event types""" | |
| events = [] | |
| if not cluster: | |
| return events | |
| # Calculate cluster metrics | |
| motion_scores = [kf.frame_data.motion_score for kf in cluster] | |
| quality_scores = [kf.frame_data.quality_score for kf in cluster] | |
| burst_frames = [kf for kf in cluster if kf.frame_data.burst_active] | |
| start_time = min(kf.frame_data.timestamp for kf in cluster) | |
| end_time = max(kf.frame_data.timestamp for kf in cluster) | |
| max_motion = max(motion_scores) if motion_scores else 0 | |
| avg_motion = sum(motion_scores) / len(motion_scores) if motion_scores else 0 | |
| max_quality = max(quality_scores) if quality_scores else 0 | |
| # High motion event | |
| if max_motion > self.config.motion_threshold * 2: | |
| event = Event( | |
| event_id=f"event_{start_event_id:04d}", | |
| start_timestamp=start_time, | |
| end_timestamp=end_time, | |
| event_type="high_motion", | |
| confidence=min(max_motion * 2, 1.0), | |
| keyframes=[kf.frame_data.frame_path for kf in cluster], | |
| importance_score=max_motion + (avg_motion * 0.5), | |
| motion_intensity=max_motion, | |
| description=f"High motion event with peak intensity {max_motion:.3f}" | |
| ) | |
| events.append(event) | |
| start_event_id += 1 | |
| # Burst activity event | |
| if len(burst_frames) >= 2: | |
| event = Event( | |
| event_id=f"event_{start_event_id:04d}", | |
| start_timestamp=start_time, | |
| end_timestamp=end_time, | |
| event_type="burst_activity", | |
| confidence=min(len(burst_frames) / len(cluster), 1.0), | |
| keyframes=[kf.frame_data.frame_path for kf in burst_frames], | |
| importance_score=len(burst_frames) * 0.3 + avg_motion, | |
| motion_intensity=max_motion, | |
| description=f"Burst activity with {len(burst_frames)} active frames" | |
| ) | |
| events.append(event) | |
| start_event_id += 1 | |
| # Quality peak event | |
| if max_quality > self.config.base_quality_threshold * 1.5: | |
| high_quality_frames = [kf for kf in cluster if kf.frame_data.quality_score > self.config.base_quality_threshold * 1.3] | |
| if high_quality_frames: | |
| event = Event( | |
| event_id=f"event_{start_event_id:04d}", | |
| start_timestamp=start_time, | |
| end_timestamp=end_time, | |
| event_type="quality_peak", | |
| confidence=max_quality, | |
| keyframes=[kf.frame_data.frame_path for kf in high_quality_frames], | |
| importance_score=max_quality + (len(high_quality_frames) * 0.1), | |
| motion_intensity=max_motion, | |
| description=f"High quality event with peak score {max_quality:.3f}" | |
| ) | |
| events.append(event) | |
| return events | |
| def convert_object_events_to_standard_format(self, object_events: List[Dict]) -> List[Event]: | |
| """Convert object events from object detection module to standard Event format""" | |
| standard_events = [] | |
| for obj_event in object_events: | |
| # Convert object event dict to Event dataclass | |
| event = Event( | |
| event_id=obj_event['event_id'], | |
| start_timestamp=obj_event['start_timestamp'], | |
| end_timestamp=obj_event['end_timestamp'], | |
| event_type=obj_event['event_type'], | |
| confidence=obj_event['confidence'], | |
| keyframes=obj_event['keyframes'], | |
| importance_score=obj_event['importance_score'], | |
| motion_intensity=obj_event.get('motion_intensity', 0.0), | |
| description=obj_event['description'], | |
| # Object-specific fields | |
| object_class=obj_event.get('object_class', ''), | |
| detection_count=obj_event.get('detection_count', 0), | |
| max_confidence=obj_event.get('max_confidence', obj_event['confidence']), | |
| is_object_event=True, | |
| detection_details=obj_event.get('detection_details', []) | |
| ) | |
| standard_events.append(event) | |
| return standard_events | |
| def convert_behavior_events_to_standard_format(self, behavior_events: List) -> List[Event]: | |
| """Convert behavior events from behavior analysis module to standard Event format""" | |
| standard_events = [] | |
| for behavior_event in behavior_events: | |
| # Handle both dataclass and dict formats | |
| if hasattr(behavior_event, 'behavior_type'): | |
| # Dataclass format (from BehaviorEvent) | |
| event = Event( | |
| event_id=behavior_event.event_id, | |
| start_timestamp=behavior_event.start_timestamp, | |
| end_timestamp=behavior_event.end_timestamp, | |
| event_type=f"behavior_{behavior_event.behavior_type}", | |
| confidence=behavior_event.confidence, | |
| keyframes=behavior_event.keyframes, | |
| importance_score=behavior_event.importance_score, | |
| motion_intensity=0.0, # Behavior events don't have motion intensity | |
| description=f"{behavior_event.behavior_type.capitalize()} detected (confidence: {behavior_event.confidence:.2f})", | |
| # Use object_class field to store behavior type for consistency | |
| object_class=behavior_event.behavior_type, | |
| detection_count=len(behavior_event.frame_indices), | |
| max_confidence=behavior_event.confidence, | |
| is_object_event=False, # Behavior events are separate from object events | |
| detection_details=[{ | |
| 'model_used': behavior_event.model_used, | |
| 'frame_indices': behavior_event.frame_indices | |
| }] | |
| ) | |
| else: | |
| # Dict format (fallback) | |
| event = Event( | |
| event_id=behavior_event.get('event_id', f"behavior_{len(standard_events)}"), | |
| start_timestamp=behavior_event.get('start_timestamp', 0.0), | |
| end_timestamp=behavior_event.get('end_timestamp', 0.0), | |
| event_type=f"behavior_{behavior_event.get('behavior_type', 'unknown')}", | |
| confidence=behavior_event.get('confidence', 0.0), | |
| keyframes=behavior_event.get('keyframes', []), | |
| importance_score=behavior_event.get('importance_score', 0.0), | |
| motion_intensity=0.0, | |
| description=behavior_event.get('description', 'Behavior detected'), | |
| object_class=behavior_event.get('behavior_type', ''), | |
| detection_count=len(behavior_event.get('frame_indices', [])), | |
| max_confidence=behavior_event.get('confidence', 0.0), | |
| is_object_event=False, | |
| detection_details=[{ | |
| 'model_used': behavior_event.get('model_used', 'unknown'), | |
| 'frame_indices': behavior_event.get('frame_indices', []) | |
| }] | |
| ) | |
| standard_events.append(event) | |
| return standard_events | |
| def assess_threat_level(self, event: Event) -> str: | |
| """Assess threat level for events, particularly object-based events""" | |
| if not event.is_object_event: | |
| # For motion events, use motion intensity and burst activity | |
| if event.event_type == "high_motion" and event.motion_intensity > 0.015: | |
| return "medium" | |
| elif event.event_type == "burst_activity": | |
| return "medium" | |
| else: | |
| return "low" | |
| # Object-based threat assessment | |
| threat_map = { | |
| 'fire': { | |
| 'low': 0.3, # Confidence thresholds | |
| 'medium': 0.5, | |
| 'high': 0.7, | |
| 'critical': 0.85 | |
| }, | |
| 'gun': { | |
| 'low': 0.4, | |
| 'medium': 0.6, | |
| 'high': 0.8, | |
| 'critical': 0.9 | |
| }, | |
| 'knife': { | |
| 'low': 0.4, | |
| 'medium': 0.6, | |
| 'high': 0.75, | |
| 'critical': 0.85 | |
| } | |
| } | |
| obj_class = event.object_class.lower() | |
| confidence = event.max_confidence | |
| if obj_class in threat_map: | |
| thresholds = threat_map[obj_class] | |
| if confidence >= thresholds['critical']: | |
| return "critical" | |
| elif confidence >= thresholds['high']: | |
| return "high" | |
| elif confidence >= thresholds['medium']: | |
| return "medium" | |
| else: | |
| return "low" | |
| return "medium" # Default for unknown object types | |
| class EventDeduplicationEngine: | |
| """Remove duplicate events and create canonical representations""" | |
| def __init__(self, config): | |
| self.config = config | |
| self.similarity_calculator = SimilarityCalculator(config.similarity_threshold) | |
| def deduplicate_events(self, events: List[Event]) -> Tuple[List[CanonicalEvent], Dict[str, Any]]: | |
| """ | |
| Deduplicate events and create canonical representations | |
| Returns: | |
| Tuple of (canonical_events, deduplication_stats) | |
| """ | |
| logger.info(f"Deduplicating {len(events)} events") | |
| if not events: | |
| return [], {} | |
| # Group events by type first | |
| events_by_type = defaultdict(list) | |
| for event in events: | |
| events_by_type[event.event_type].append(event) | |
| canonical_events = [] | |
| dedup_stats = { | |
| 'original_events': len(events), | |
| 'canonical_events': 0, | |
| 'duplicates_removed': 0, | |
| 'similarity_clusters': 0 | |
| } | |
| canonical_id_counter = 1 | |
| # Process each event type separately | |
| for event_type, type_events in events_by_type.items(): | |
| type_canonical = self._deduplicate_events_by_type( | |
| type_events, event_type, canonical_id_counter | |
| ) | |
| canonical_events.extend(type_canonical) | |
| canonical_id_counter += len(type_canonical) | |
| # Update stats | |
| dedup_stats['canonical_events'] = len(canonical_events) | |
| dedup_stats['duplicates_removed'] = dedup_stats['original_events'] - dedup_stats['canonical_events'] | |
| dedup_stats['similarity_clusters'] = len(canonical_events) | |
| logger.info(f"Deduplication complete: {len(canonical_events)} canonical events created") | |
| return canonical_events, dedup_stats | |
| def _deduplicate_events_by_type(self, events: List[Event], event_type: str, | |
| start_canonical_id: int) -> List[CanonicalEvent]: | |
| """Deduplicate events of the same type""" | |
| if not events: | |
| return [] | |
| # Create similarity matrix | |
| similarity_matrix = self._create_similarity_matrix(events) | |
| # Cluster similar events | |
| clusters = self._cluster_similar_events(events, similarity_matrix) | |
| # Create canonical events from clusters | |
| canonical_events = [] | |
| for i, cluster in enumerate(clusters): | |
| canonical_event = self._create_canonical_event( | |
| cluster, event_type, start_canonical_id + i, i | |
| ) | |
| canonical_events.append(canonical_event) | |
| return canonical_events | |
| def _create_similarity_matrix(self, events: List[Event]) -> np.ndarray: | |
| """Create similarity matrix between events""" | |
| n = len(events) | |
| similarity_matrix = np.zeros((n, n)) | |
| for i in range(n): | |
| for j in range(i, n): | |
| if i == j: | |
| similarity_matrix[i, j] = 1.0 | |
| else: | |
| # Calculate similarity between representative frames | |
| sim_score = self._calculate_event_similarity(events[i], events[j]) | |
| similarity_matrix[i, j] = sim_score | |
| similarity_matrix[j, i] = sim_score | |
| return similarity_matrix | |
| def _calculate_event_similarity(self, event1: Event, event2: Event) -> float: | |
| """Calculate similarity between two events (enhanced for object events)""" | |
| try: | |
| # Object events similarity | |
| if event1.is_object_event and event2.is_object_event: | |
| return self._calculate_object_event_similarity(event1, event2) | |
| elif event1.is_object_event != event2.is_object_event: | |
| # Different event types (object vs motion) - lower similarity | |
| return 0.1 | |
| # Motion events similarity (original logic) | |
| # Time overlap similarity | |
| time_overlap = self._calculate_time_overlap(event1, event2) | |
| # Frame content similarity (use representative frames) | |
| frame1 = event1.keyframes[0] if event1.keyframes else None | |
| frame2 = event2.keyframes[0] if event2.keyframes else None | |
| content_similarity = 0.0 | |
| if frame1 and frame2 and os.path.exists(frame1) and os.path.exists(frame2): | |
| content_similarity = self.similarity_calculator.calculate_combined_similarity(frame1, frame2) | |
| # Motion intensity similarity | |
| motion_sim = 1.0 - abs(event1.motion_intensity - event2.motion_intensity) | |
| # Combined similarity | |
| combined_similarity = ( | |
| time_overlap * 0.3 + | |
| content_similarity * 0.5 + | |
| motion_sim * 0.2 | |
| ) | |
| return combined_similarity | |
| except Exception as e: | |
| logger.error(f"Event similarity calculation failed: {e}") | |
| return 0.0 | |
| def _calculate_object_event_similarity(self, event1: Event, event2: Event) -> float: | |
| """Calculate similarity between two object events""" | |
| try: | |
| # Object class similarity (must be same class) | |
| if event1.object_class != event2.object_class: | |
| return 0.0 # Different object types are not similar | |
| # Time proximity | |
| time_gap = abs(event1.start_timestamp - event2.start_timestamp) | |
| time_similarity = max(0.0, 1.0 - (time_gap / self.config.object_event_temporal_window)) | |
| # Confidence similarity | |
| conf_diff = abs(event1.confidence - event2.confidence) | |
| conf_similarity = max(0.0, 1.0 - conf_diff) | |
| # Detection count similarity | |
| count_diff = abs(event1.detection_count - event2.detection_count) | |
| count_similarity = max(0.0, 1.0 - (count_diff / max(event1.detection_count, event2.detection_count, 1))) | |
| # Frame content similarity | |
| frame1 = event1.keyframes[0] if event1.keyframes else None | |
| frame2 = event2.keyframes[0] if event2.keyframes else None | |
| content_similarity = 0.0 | |
| if frame1 and frame2 and os.path.exists(frame1) and os.path.exists(frame2): | |
| content_similarity = self.similarity_calculator.calculate_combined_similarity(frame1, frame2) | |
| # Combined similarity for object events | |
| combined_similarity = ( | |
| time_similarity * 0.4 + # Time proximity is important | |
| content_similarity * 0.3 + # Visual similarity | |
| conf_similarity * 0.2 + # Confidence similarity | |
| count_similarity * 0.1 # Detection count similarity | |
| ) | |
| return combined_similarity | |
| except Exception as e: | |
| logger.error(f"Object event similarity calculation failed: {e}") | |
| return 0.0 | |
| def _calculate_time_overlap(self, event1: Event, event2: Event) -> float: | |
| """Calculate temporal overlap between events""" | |
| start1, end1 = event1.start_timestamp, event1.end_timestamp | |
| start2, end2 = event2.start_timestamp, event2.end_timestamp | |
| # Calculate overlap | |
| overlap_start = max(start1, start2) | |
| overlap_end = min(end1, end2) | |
| if overlap_start >= overlap_end: | |
| return 0.0 | |
| overlap_duration = overlap_end - overlap_start | |
| total_duration = max(end1, end2) - min(start1, start2) | |
| return overlap_duration / total_duration if total_duration > 0 else 0.0 | |
| def _cluster_similar_events(self, events: List[Event], similarity_matrix: np.ndarray) -> List[List[Event]]: | |
| """Cluster similar events using similarity threshold""" | |
| n = len(events) | |
| visited = [False] * n | |
| clusters = [] | |
| for i in range(n): | |
| if visited[i]: | |
| continue | |
| # Start new cluster | |
| cluster = [events[i]] | |
| visited[i] = True | |
| # Find similar events | |
| for j in range(i + 1, n): | |
| if not visited[j] and similarity_matrix[i, j] >= self.config.similarity_threshold: | |
| cluster.append(events[j]) | |
| visited[j] = True | |
| clusters.append(cluster) | |
| return clusters | |
| def _create_canonical_event(self, cluster: List[Event], event_type: str, | |
| canonical_id: int, cluster_id: int) -> CanonicalEvent: | |
| """Create canonical event from cluster of similar events""" | |
| if not cluster: | |
| raise ValueError("Cannot create canonical event from empty cluster") | |
| # Find representative event (highest importance score) | |
| representative = max(cluster, key=lambda e: e.importance_score) | |
| # Aggregate properties | |
| start_time = min(e.start_timestamp for e in cluster) | |
| end_time = max(e.end_timestamp for e in cluster) | |
| duration = end_time - start_time | |
| avg_confidence = sum(e.confidence for e in cluster) / len(cluster) | |
| # Collect all keyframes | |
| all_keyframes = [] | |
| for event in cluster: | |
| all_keyframes.extend(event.keyframes) | |
| # Remove duplicate frame paths | |
| unique_keyframes = list(set(all_keyframes)) | |
| # Check if this cluster contains object events | |
| object_events = [e for e in cluster if e.is_object_event] | |
| contains_objects = len(object_events) > 0 | |
| # Object detection summary | |
| detected_classes = [] | |
| object_summary = None | |
| threat_level = "low" | |
| if contains_objects: | |
| # Collect detected object classes | |
| detected_classes = list(set(e.object_class for e in object_events if e.object_class)) | |
| # Calculate object detection summary | |
| total_detections = sum(e.detection_count for e in object_events) | |
| max_confidence = max(e.max_confidence for e in object_events) | |
| avg_obj_confidence = sum(e.confidence for e in object_events) / len(object_events) | |
| object_summary = { | |
| 'total_detections': total_detections, | |
| 'max_confidence': max_confidence, | |
| 'average_confidence': avg_obj_confidence, | |
| 'detected_classes': detected_classes, | |
| 'object_events_count': len(object_events) | |
| } | |
| # Assess threat level based on object classes and confidence | |
| threat_level = self._assess_canonical_threat_level(object_events) | |
| # Create enhanced description | |
| if contains_objects: | |
| objects_str = ", ".join(detected_classes) | |
| description = f"{event_type.replace('_', ' ').title()} with {objects_str} detected - {len(cluster)} events aggregated" | |
| else: | |
| description = f"{event_type.replace('_', ' ').title()} event aggregated from {len(cluster)} similar events" | |
| canonical_event = CanonicalEvent( | |
| canonical_id=f"canonical_{canonical_id:04d}", | |
| event_type=event_type, | |
| representative_frame=representative.keyframes[0] if representative.keyframes else "", | |
| start_time=start_time, | |
| end_time=end_time, | |
| duration=duration, | |
| confidence=avg_confidence, | |
| frame_count=len(unique_keyframes), | |
| aggregated_events=[e.event_id for e in cluster], | |
| description=description, | |
| similarity_cluster=cluster_id, | |
| # Enhanced object detection fields | |
| contains_objects=contains_objects, | |
| detected_object_classes=detected_classes, | |
| object_detection_summary=object_summary, | |
| threat_level=threat_level | |
| ) | |
| return canonical_event | |
| def _assess_canonical_threat_level(self, object_events: List[Event]) -> str: | |
| """Assess threat level for canonical event containing object events""" | |
| if not object_events: | |
| return "low" | |
| # Get highest threat level from individual events | |
| threat_levels = ["low", "medium", "high", "critical"] | |
| max_threat_index = 0 | |
| for event in object_events: | |
| event_threat = self._assess_individual_threat_level(event) | |
| threat_index = threat_levels.index(event_threat) if event_threat in threat_levels else 0 | |
| max_threat_index = max(max_threat_index, threat_index) | |
| # Additional factors for canonical events | |
| max_confidence = max(e.max_confidence for e in object_events) | |
| total_detections = sum(e.detection_count for e in object_events) | |
| unique_classes = len(set(e.object_class for e in object_events)) | |
| # Escalate threat if multiple factors present | |
| if unique_classes > 1: # Multiple types of objects detected | |
| max_threat_index = min(max_threat_index + 1, len(threat_levels) - 1) | |
| if total_detections > 10: # Many detections | |
| max_threat_index = min(max_threat_index + 1, len(threat_levels) - 1) | |
| if max_confidence > 0.9: # Very high confidence | |
| max_threat_index = min(max_threat_index + 1, len(threat_levels) - 1) | |
| return threat_levels[max_threat_index] | |
| def _assess_individual_threat_level(self, event: Event) -> str: | |
| """Assess threat level for individual event (duplicate of EventDetector method)""" | |
| if not event.is_object_event: | |
| # For motion events, use motion intensity and burst activity | |
| if event.event_type == "high_motion" and event.motion_intensity > 0.015: | |
| return "medium" | |
| elif event.event_type == "burst_activity": | |
| return "medium" | |
| else: | |
| return "low" | |
| # Object-based threat assessment | |
| threat_map = { | |
| 'fire': { | |
| 'low': 0.3, # Confidence thresholds | |
| 'medium': 0.5, | |
| 'high': 0.7, | |
| 'critical': 0.85 | |
| }, | |
| 'gun': { | |
| 'low': 0.4, | |
| 'medium': 0.6, | |
| 'high': 0.8, | |
| 'critical': 0.9 | |
| }, | |
| 'knife': { | |
| 'low': 0.4, | |
| 'medium': 0.6, | |
| 'high': 0.75, | |
| 'critical': 0.85 | |
| } | |
| } | |
| obj_class = event.object_class.lower() | |
| confidence = event.max_confidence | |
| if obj_class in threat_map: | |
| thresholds = threat_map[obj_class] | |
| if confidence >= thresholds['critical']: | |
| return "critical" | |
| elif confidence >= thresholds['high']: | |
| return "high" | |
| elif confidence >= thresholds['medium']: | |
| return "medium" | |
| else: | |
| return "low" | |
| return "medium" # Default for unknown object types | |
| def save_canonical_events(self, canonical_events: List[CanonicalEvent], | |
| output_path: str) -> bool: | |
| """Save canonical events to JSON file""" | |
| try: | |
| # Convert to serializable format | |
| events_data = { | |
| 'metadata': { | |
| 'total_canonical_events': len(canonical_events), | |
| 'generation_timestamp': datetime.now().isoformat(), | |
| 'deduplication_threshold': self.config.similarity_threshold | |
| }, | |
| 'canonical_events': [asdict(event) for event in canonical_events] | |
| } | |
| with open(output_path, 'w') as f: | |
| json.dump(events_data, f, indent=2) | |
| logger.info(f"Canonical events saved to: {output_path}") | |
| return True | |
| except Exception as e: | |
| logger.error(f"Failed to save canonical events: {e}") | |
| return False |