| """ | |
| Video Content Analyzer for GAIA Agent - Phase 5 | |
| Provides comprehensive video content analysis including scene segmentation, temporal patterns, and content summarization. | |
| Features: | |
| - Scene segmentation and analysis | |
| - Temporal pattern recognition | |
| - Object interaction analysis | |
| - Content summarization and reporting | |
| - Key frame identification and extraction | |
| - Video metadata analysis | |
| """ | |
import os
import logging
import cv2
import numpy as np
from typing import Dict, Any, List, Optional, Tuple
import json
from datetime import datetime, timedelta
from pathlib import Path
import tempfile

# Configure logging
logger = logging.getLogger(__name__)

class VideoContentAnalyzer:
    """Advanced video content analyzer for scene understanding and temporal analysis."""

    def __init__(self):
        """Initialize the video content analyzer."""
        self.available = True
        self.temp_dir = tempfile.mkdtemp()

        # Analysis parameters
        self.scene_change_threshold = 0.3
        self.keyframe_interval = 30  # Extract keyframe every 30 frames
        self.min_scene_duration = 2.0  # Minimum scene duration in seconds
        self.max_scenes = 50  # Maximum number of scenes to analyze

        # Initialize analysis components
        self._init_scene_analyzer()
        self._init_temporal_analyzer()

        logger.info(f"📹 Video Content Analyzer initialized - Available: {self.available}")
    def _init_scene_analyzer(self):
        """Initialize scene analysis components."""
        try:
            # Scene change detection parameters
            self.scene_detector_params = {
                'histogram_bins': 32,
                'color_spaces': ['HSV', 'RGB'],
                'comparison_methods': [cv2.HISTCMP_CORREL, cv2.HISTCMP_CHISQR],
                'motion_threshold': 0.1
            }
            logger.info("✅ Scene analyzer initialized")
        except Exception as e:
            logger.warning(f"⚠️ Scene analyzer initialization failed: {e}")
    def _init_temporal_analyzer(self):
        """Initialize temporal analysis components."""
        try:
            # Temporal pattern analysis parameters
            self.temporal_params = {
                'pattern_window': 10,  # Analyze patterns over 10-frame windows
                'smoothing_factor': 0.3,
                'trend_threshold': 0.1,
                'periodicity_detection': True
            }
            logger.info("✅ Temporal analyzer initialized")
        except Exception as e:
            logger.warning(f"⚠️ Temporal analyzer initialization failed: {e}")
    def analyze_video_content(self, video_path: str,
                              object_detections: List[List[Dict[str, Any]]] = None,
                              question: str = None) -> Dict[str, Any]:
        """
        Perform comprehensive video content analysis.

        Args:
            video_path: Path to video file
            object_detections: Optional pre-computed object detections per frame
            question: Optional question to guide analysis

        Returns:
            Comprehensive content analysis results
        """
        try:
            logger.info(f"📹 Starting video content analysis for: {video_path}")

            # Extract video metadata
            metadata = self._extract_video_metadata(video_path)

            # Perform scene segmentation
            scenes = self._segment_scenes(video_path)

            # Extract key frames
            keyframes = self._extract_keyframes(video_path, scenes)

            # Analyze temporal patterns
            temporal_analysis = self._analyze_temporal_patterns(
                video_path, object_detections, scenes
            )

            # Perform content summarization
            content_summary = self._summarize_content(
                scenes, keyframes, temporal_analysis, object_detections
            )

            # Generate interaction analysis
            interaction_analysis = self._analyze_object_interactions(
                object_detections, scenes
            )

            # Create comprehensive report
            analysis_report = self._create_content_report(
                metadata, scenes, keyframes, temporal_analysis,
                content_summary, interaction_analysis, question
            )

            return analysis_report

        except Exception as e:
            logger.error(f"❌ Video content analysis failed: {e}")
            return {
                'success': False,
                'error': f'Content analysis failed: {str(e)}'
            }
    def _extract_video_metadata(self, video_path: str) -> Dict[str, Any]:
        """Extract comprehensive video metadata."""
        try:
            cap = cv2.VideoCapture(video_path)
            if not cap.isOpened():
                raise Exception("Failed to open video file")

            # Basic properties
            fps = cap.get(cv2.CAP_PROP_FPS)
            frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            duration = frame_count / fps if fps > 0 else 0

            # Additional properties
            fourcc = int(cap.get(cv2.CAP_PROP_FOURCC))
            codec = "".join([chr((fourcc >> 8 * i) & 0xFF) for i in range(4)])

            cap.release()

            metadata = {
                'filename': os.path.basename(video_path),
                'duration_seconds': duration,
                'fps': fps,
                'frame_count': frame_count,
                'resolution': {'width': width, 'height': height},
                'aspect_ratio': width / height if height > 0 else 1.0,
                'codec': codec,
                'file_size': os.path.getsize(video_path) if os.path.exists(video_path) else 0,
                'analysis_timestamp': datetime.now().isoformat()
            }

            logger.info(f"📊 Video metadata extracted: {duration:.1f}s, {width}x{height}, {fps:.1f} FPS")
            return metadata

        except Exception as e:
            logger.error(f"❌ Failed to extract video metadata: {e}")
            return {}
    def _segment_scenes(self, video_path: str) -> List[Dict[str, Any]]:
        """Segment video into distinct scenes based on visual changes."""
        try:
            cap = cv2.VideoCapture(video_path)
            if not cap.isOpened():
                raise Exception("Failed to open video file")

            scenes = []
            prev_hist = None
            scene_start = 0
            frame_count = 0
            fps = cap.get(cv2.CAP_PROP_FPS)
            scene_id = 0

            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break

                # Calculate histogram for scene change detection
                hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
                hist = cv2.calcHist([hsv], [0, 1, 2], None,
                                    [self.scene_detector_params['histogram_bins']] * 3,
                                    [0, 180, 0, 256, 0, 256])

                # Detect scene change
                if prev_hist is not None:
                    correlation = cv2.compareHist(hist, prev_hist, cv2.HISTCMP_CORREL)

                    if correlation < self.scene_change_threshold:
                        # Scene change detected
                        scene_end = frame_count
                        scene_duration = (scene_end - scene_start) / fps

                        if scene_duration >= self.min_scene_duration:
                            scene = {
                                'id': scene_id,
                                'start_frame': scene_start,
                                'end_frame': scene_end,
                                'start_time': scene_start / fps,
                                'end_time': scene_end / fps,
                                'duration': scene_duration,
                                'frame_count': scene_end - scene_start
                            }
                            scenes.append(scene)
                            scene_id += 1

                            if len(scenes) >= self.max_scenes:
                                break

                        scene_start = frame_count

                prev_hist = hist
                frame_count += 1

            # Add final scene
            if scene_start < frame_count:
                scene_duration = (frame_count - scene_start) / fps
                if scene_duration >= self.min_scene_duration:
                    scene = {
                        'id': scene_id,
                        'start_frame': scene_start,
                        'end_frame': frame_count,
                        'start_time': scene_start / fps,
                        'end_time': frame_count / fps,
                        'duration': scene_duration,
                        'frame_count': frame_count - scene_start
                    }
                    scenes.append(scene)

            cap.release()

            logger.info(f"🎬 Scene segmentation complete: {len(scenes)} scenes detected")
            return scenes

        except Exception as e:
            logger.error(f"❌ Scene segmentation failed: {e}")
            return []
    def _extract_keyframes(self, video_path: str, scenes: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Extract representative keyframes from video scenes."""
        try:
            cap = cv2.VideoCapture(video_path)
            if not cap.isOpened():
                raise Exception("Failed to open video file")

            keyframes = []
            fps = cap.get(cv2.CAP_PROP_FPS)

            for scene in scenes:
                # Extract keyframes from each scene
                scene_keyframes = []

                # Extract keyframe from middle of scene
                mid_frame = (scene['start_frame'] + scene['end_frame']) // 2
                cap.set(cv2.CAP_PROP_POS_FRAMES, mid_frame)
                ret, frame = cap.read()

                if ret:
                    keyframe = {
                        'scene_id': scene['id'],
                        'frame_number': mid_frame,
                        'timestamp': mid_frame / fps,
                        'type': 'scene_representative',
                        'frame_data': frame,
                        'visual_features': self._extract_visual_features(frame)
                    }
                    scene_keyframes.append(keyframe)

                # Extract additional keyframes for longer scenes
                if scene['duration'] > 10:  # For scenes longer than 10 seconds
                    # Extract keyframes at 1/4 and 3/4 points
                    for fraction in [0.25, 0.75]:
                        frame_pos = int(scene['start_frame'] +
                                        fraction * (scene['end_frame'] - scene['start_frame']))
                        cap.set(cv2.CAP_PROP_POS_FRAMES, frame_pos)
                        ret, frame = cap.read()

                        if ret:
                            keyframe = {
                                'scene_id': scene['id'],
                                'frame_number': frame_pos,
                                'timestamp': frame_pos / fps,
                                'type': 'temporal_sample',
                                'frame_data': frame,
                                'visual_features': self._extract_visual_features(frame)
                            }
                            scene_keyframes.append(keyframe)

                keyframes.extend(scene_keyframes)

            cap.release()

            logger.info(f"🖼️ Keyframe extraction complete: {len(keyframes)} keyframes extracted")
            return keyframes

        except Exception as e:
            logger.error(f"❌ Keyframe extraction failed: {e}")
            return []
    def _extract_visual_features(self, frame: np.ndarray) -> Dict[str, Any]:
        """Extract visual features from a frame."""
        try:
            features = {}

            # Color histogram
            hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
            hist_h = cv2.calcHist([hsv], [0], None, [32], [0, 180])
            hist_s = cv2.calcHist([hsv], [1], None, [32], [0, 256])
            hist_v = cv2.calcHist([hsv], [2], None, [32], [0, 256])

            features['color_histogram'] = {
                'hue': hist_h.flatten().tolist(),
                'saturation': hist_s.flatten().tolist(),
                'value': hist_v.flatten().tolist()
            }

            # Edge density
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            edges = cv2.Canny(gray, 50, 150)
            edge_density = np.sum(edges > 0) / (edges.shape[0] * edges.shape[1])
            features['edge_density'] = float(edge_density)

            # Brightness and contrast
            features['brightness'] = float(np.mean(gray))
            features['contrast'] = float(np.std(gray))

            # Dominant colors
            features['dominant_colors'] = self._get_dominant_colors(frame)

            return features

        except Exception as e:
            logger.error(f"❌ Visual feature extraction failed: {e}")
            return {}
    def _get_dominant_colors(self, frame: np.ndarray, k: int = 3) -> List[List[int]]:
        """Extract dominant colors from frame using k-means clustering."""
        try:
            # Reshape frame to list of pixels
            pixels = frame.reshape(-1, 3)

            # Use k-means to find dominant colors
            from sklearn.cluster import KMeans
            kmeans = KMeans(n_clusters=k, random_state=42, n_init=10)
            kmeans.fit(pixels)

            # Get dominant colors
            colors = kmeans.cluster_centers_.astype(int)
            return colors.tolist()

        except ImportError:
            # Fallback without sklearn
            return [[128, 128, 128]]  # Gray as default
        except Exception as e:
            logger.error(f"❌ Dominant color extraction failed: {e}")
            return [[128, 128, 128]]
    def _analyze_temporal_patterns(self, video_path: str,
                                   object_detections: List[List[Dict[str, Any]]] = None,
                                   scenes: List[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Analyze temporal patterns in video content."""
        try:
            temporal_analysis = {
                'motion_patterns': [],
                'object_appearance_patterns': [],
                'scene_transition_patterns': [],
                'activity_levels': [],
                'periodicity': {}
            }

            if not object_detections:
                return temporal_analysis

            # Analyze motion patterns
            motion_levels = []
            for frame_detections in object_detections:
                # Calculate motion level based on number and size of objects
                motion_level = len(frame_detections)
                if frame_detections:
                    avg_area = np.mean([det.get('area', 0) for det in frame_detections])
                    motion_level += avg_area / 10000  # Normalize area contribution
                motion_levels.append(motion_level)

            temporal_analysis['motion_patterns'] = motion_levels

            # Analyze object appearance patterns
            object_counts_over_time = []
            bird_counts_over_time = []
            animal_counts_over_time = []

            for frame_detections in object_detections:
                object_count = len(frame_detections)
                bird_count = sum(1 for det in frame_detections
                                 if det.get('species_type') == 'bird')
                animal_count = sum(1 for det in frame_detections
                                   if det.get('species_type') == 'animal')

                object_counts_over_time.append(object_count)
                bird_counts_over_time.append(bird_count)
                animal_counts_over_time.append(animal_count)

            temporal_analysis['object_appearance_patterns'] = {
                'total_objects': object_counts_over_time,
                'birds': bird_counts_over_time,
                'animals': animal_counts_over_time
            }

            # Analyze activity levels
            window_size = self.temporal_params['pattern_window']
            activity_levels = []

            for i in range(0, len(motion_levels), window_size):
                window = motion_levels[i:i + window_size]
                if window:
                    activity_level = {
                        'start_frame': i,
                        'end_frame': min(i + window_size, len(motion_levels)),
                        'avg_motion': np.mean(window),
                        'max_motion': np.max(window),
                        'motion_variance': np.var(window)
                    }
                    activity_levels.append(activity_level)

            temporal_analysis['activity_levels'] = activity_levels

            # Detect periodicity in object appearances
            if len(bird_counts_over_time) > 20:  # Need sufficient data
                temporal_analysis['periodicity'] = self._detect_periodicity(
                    bird_counts_over_time, animal_counts_over_time
                )

            logger.info("📈 Temporal pattern analysis complete")
            return temporal_analysis

        except Exception as e:
            logger.error(f"❌ Temporal pattern analysis failed: {e}")
            return {}
    def _detect_periodicity(self, bird_counts: List[int],
                            animal_counts: List[int]) -> Dict[str, Any]:
        """Detect periodic patterns in object appearances."""
        try:
            periodicity = {
                'bird_patterns': {},
                'animal_patterns': {},
                'combined_patterns': {}
            }

            # Simple autocorrelation-based periodicity detection
            def autocorrelation(signal, max_lag=50):
                signal = np.array(signal)
                n = len(signal)
                signal = signal - np.mean(signal)

                autocorr = []
                for lag in range(min(max_lag, n // 2)):
                    if lag == 0:
                        # Autocorrelation at lag 0 is 1 by definition; slicing with
                        # signal[:-0] would yield an empty array and break np.corrcoef.
                        autocorr.append(1.0)
                    elif n - lag > 0:
                        corr = np.corrcoef(signal[:-lag], signal[lag:])[0, 1]
                        autocorr.append(corr if not np.isnan(corr) else 0)
                    else:
                        autocorr.append(0)
                return autocorr

            # Analyze bird count periodicity
            bird_autocorr = autocorrelation(bird_counts)
            if len(bird_autocorr) > 1:
                max_corr_idx = np.argmax(bird_autocorr[1:]) + 1  # Skip lag 0
                periodicity['bird_patterns'] = {
                    'dominant_period': int(max_corr_idx),
                    'correlation_strength': bird_autocorr[max_corr_idx],
                    'is_periodic': bird_autocorr[max_corr_idx] > 0.3
                }

            # Analyze animal count periodicity
            animal_autocorr = autocorrelation(animal_counts)
            if len(animal_autocorr) > 1:
                max_corr_idx = np.argmax(animal_autocorr[1:]) + 1
                periodicity['animal_patterns'] = {
                    'dominant_period': int(max_corr_idx),
                    'correlation_strength': animal_autocorr[max_corr_idx],
                    'is_periodic': animal_autocorr[max_corr_idx] > 0.3
                }

            return periodicity

        except Exception as e:
            logger.error(f"❌ Periodicity detection failed: {e}")
            return {}
    def _summarize_content(self, scenes: List[Dict[str, Any]],
                           keyframes: List[Dict[str, Any]],
                           temporal_analysis: Dict[str, Any],
                           object_detections: List[List[Dict[str, Any]]] = None) -> Dict[str, Any]:
        """Generate comprehensive content summary."""
        try:
            summary = {
                'overview': {},
                'scene_summary': [],
                'key_moments': [],
                'content_highlights': [],
                'statistical_summary': {}
            }

            # Overview
            total_duration = sum(scene.get('duration', 0) for scene in scenes)
            summary['overview'] = {
                'total_scenes': len(scenes),
                'total_duration': total_duration,
                'avg_scene_duration': total_duration / len(scenes) if scenes else 0,
                'keyframes_extracted': len(keyframes)
            }

            # Scene summary
            for scene in scenes:
                scene_summary = {
                    'scene_id': scene['id'],
                    'duration': scene['duration'],
                    'description': f"Scene {scene['id'] + 1}: {scene['duration']:.1f}s",
                    'activity_level': 'unknown'
                }

                # Determine activity level from temporal analysis
                if temporal_analysis.get('activity_levels'):
                    scene_start_frame = scene['start_frame']
                    scene_end_frame = scene['end_frame']

                    relevant_activities = [
                        activity for activity in temporal_analysis['activity_levels']
                        if (activity['start_frame'] <= scene_end_frame and
                            activity['end_frame'] >= scene_start_frame)
                    ]

                    if relevant_activities:
                        avg_motion = np.mean([a['avg_motion'] for a in relevant_activities])
                        if avg_motion > 2:
                            scene_summary['activity_level'] = 'high'
                        elif avg_motion > 1:
                            scene_summary['activity_level'] = 'medium'
                        else:
                            scene_summary['activity_level'] = 'low'

                summary['scene_summary'].append(scene_summary)

            # Key moments (high activity periods)
            if temporal_analysis.get('activity_levels'):
                high_activity_moments = [
                    activity for activity in temporal_analysis['activity_levels']
                    if activity['avg_motion'] > 2
                ]

                summary['key_moments'] = [
                    {
                        'timestamp': moment['start_frame'] / 30,  # Assume 30 FPS
                        'duration': (moment['end_frame'] - moment['start_frame']) / 30,
                        'activity_level': moment['avg_motion'],
                        'description': f"High activity period: {moment['avg_motion']:.1f}"
                    }
                    for moment in high_activity_moments[:5]  # Top 5 moments
                ]

            # Statistical summary
            if object_detections:
                all_detections = [det for frame_dets in object_detections for det in frame_dets]

                species_counts = {}
                for detection in all_detections:
                    species = detection.get('species_type', 'unknown')
                    species_counts[species] = species_counts.get(species, 0) + 1

                summary['statistical_summary'] = {
                    'total_detections': len(all_detections),
                    'species_distribution': species_counts,
                    'avg_detections_per_frame': len(all_detections) / len(object_detections) if object_detections else 0
                }

            logger.info("📝 Content summarization complete")
            return summary

        except Exception as e:
            logger.error(f"❌ Content summarization failed: {e}")
            return {}
    def _analyze_object_interactions(self, object_detections: List[List[Dict[str, Any]]] = None,
                                     scenes: List[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Analyze interactions between detected objects."""
        try:
            interaction_analysis = {
                'proximity_interactions': [],
                'temporal_interactions': [],
                'species_interactions': {},
                'interaction_summary': {}
            }

            if not object_detections:
                return interaction_analysis

            # Analyze proximity interactions within frames
            for frame_idx, frame_detections in enumerate(object_detections):
                if len(frame_detections) > 1:
                    # Check all pairs of objects in the frame
                    for i, obj1 in enumerate(frame_detections):
                        for j, obj2 in enumerate(frame_detections[i + 1:], i + 1):
                            distance = self._calculate_object_distance(obj1, obj2)

                            if distance < 100:  # Close proximity threshold
                                interaction = {
                                    'frame': frame_idx,
                                    'timestamp': frame_idx / 30,  # Assume 30 FPS
                                    'object1': obj1.get('class', 'unknown'),
                                    'object2': obj2.get('class', 'unknown'),
                                    'distance': distance,
                                    'interaction_type': 'proximity'
                                }
                                interaction_analysis['proximity_interactions'].append(interaction)

            # Analyze species interactions
            species_pairs = {}
            for interaction in interaction_analysis['proximity_interactions']:
                obj1_type = interaction['object1']
                obj2_type = interaction['object2']
                pair_key = tuple(sorted([obj1_type, obj2_type]))

                if pair_key not in species_pairs:
                    species_pairs[pair_key] = []
                species_pairs[pair_key].append(interaction)

            interaction_analysis['species_interactions'] = {
                f"{pair[0]}-{pair[1]}": {
                    'interaction_count': len(interactions),
                    'avg_distance': np.mean([i['distance'] for i in interactions]),
                    'duration': len(interactions) / 30  # Approximate duration
                }
                for pair, interactions in species_pairs.items()
            }

            # Interaction summary
            interaction_analysis['interaction_summary'] = {
                'total_proximity_interactions': len(interaction_analysis['proximity_interactions']),
                'unique_species_pairs': len(species_pairs),
                'most_interactive_pair': max(species_pairs.keys(),
                                             key=lambda x: len(species_pairs[x])) if species_pairs else None
            }

            logger.info("🤝 Object interaction analysis complete")
            return interaction_analysis

        except Exception as e:
            logger.error(f"❌ Object interaction analysis failed: {e}")
            return {}
    def _calculate_object_distance(self, obj1: Dict[str, Any], obj2: Dict[str, Any]) -> float:
        """Calculate distance between two objects based on their centers."""
        try:
            center1 = obj1.get('center', [0, 0])
            center2 = obj2.get('center', [0, 0])

            distance = np.sqrt((center1[0] - center2[0])**2 + (center1[1] - center2[1])**2)
            return float(distance)

        except Exception as e:
            logger.error(f"❌ Distance calculation failed: {e}")
            return float('inf')
    def _create_content_report(self, metadata: Dict[str, Any],
                               scenes: List[Dict[str, Any]],
                               keyframes: List[Dict[str, Any]],
                               temporal_analysis: Dict[str, Any],
                               content_summary: Dict[str, Any],
                               interaction_analysis: Dict[str, Any],
                               question: str = None) -> Dict[str, Any]:
        """Create comprehensive content analysis report."""
        try:
            report = {
                'success': True,
                'analysis_timestamp': datetime.now().isoformat(),
                'question': question,
                'metadata': metadata,
                'content_analysis': {
                    'scenes': scenes,
                    'keyframes': [
                        {k: v for k, v in kf.items() if k != 'frame_data'}  # Exclude frame data
                        for kf in keyframes
                    ],
                    'temporal_patterns': temporal_analysis,
                    'content_summary': content_summary,
                    'interactions': interaction_analysis
                },
                'insights': [],
                'recommendations': []
            }

            # Generate insights
            insights = []

            # Scene insights
            if scenes:
                avg_scene_duration = np.mean([s['duration'] for s in scenes])
                insights.append(f"Video contains {len(scenes)} distinct scenes with average duration of {avg_scene_duration:.1f}s")

            # Activity insights
            if temporal_analysis.get('activity_levels'):
                high_activity_count = sum(1 for a in temporal_analysis['activity_levels'] if a['avg_motion'] > 2)
                insights.append(f"Detected {high_activity_count} high-activity periods in the video")

            # Interaction insights
            if interaction_analysis.get('interaction_summary', {}).get('total_proximity_interactions', 0) > 0:
                total_interactions = interaction_analysis['interaction_summary']['total_proximity_interactions']
                insights.append(f"Found {total_interactions} object proximity interactions")

            report['insights'] = insights

            # Generate recommendations
            recommendations = []

            if question and 'bird' in question.lower():
                if temporal_analysis.get('object_appearance_patterns', {}).get('birds'):
                    max_birds = max(temporal_analysis['object_appearance_patterns']['birds'])
                    recommendations.append(f"Maximum simultaneous birds detected: {max_birds}")

            if len(scenes) > 10:
                recommendations.append("Video has many scene changes - consider analyzing key scenes only")

            report['recommendations'] = recommendations

            logger.info("📋 Content analysis report generated successfully")
            return report

        except Exception as e:
            logger.error(f"❌ Failed to create content report: {e}")
            return {
                'success': False,
                'error': f'Failed to create content report: {str(e)}'
            }
    def get_capabilities(self) -> Dict[str, Any]:
        """Get video content analyzer capabilities."""
        return {
            'available': self.available,
            'scene_change_threshold': self.scene_change_threshold,
            'keyframe_interval': self.keyframe_interval,
            'min_scene_duration': self.min_scene_duration,
            'max_scenes': self.max_scenes,
            'features': [
                'Scene segmentation',
                'Keyframe extraction',
                'Temporal pattern analysis',
                'Object interaction analysis',
                'Content summarization',
                'Visual feature extraction',
                'Activity level detection',
                'Periodicity detection'
            ]
        }

# Factory function for creating content analyzer
def create_video_content_analyzer() -> VideoContentAnalyzer:
    """Create and return a video content analyzer instance."""
    return VideoContentAnalyzer()
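
# Illustrative sketch of the per-frame detection format (an assumption for
# documentation purposes, not a formal part of the analyzer's API): each entry
# of `object_detections` holds the detections for one frame, and the analyzer
# only reads the 'class', 'species_type', 'area', and 'center' keys, so any
# upstream detector producing this shape should be compatible.
EXAMPLE_FRAME_DETECTIONS: List[List[Dict[str, Any]]] = [
    [  # frame 0: two hypothetical detections
        {'class': 'bird', 'species_type': 'bird', 'area': 5200.0, 'center': [320, 180]},
        {'class': 'dog', 'species_type': 'animal', 'area': 14500.0, 'center': [410, 295]},
    ],
    [],  # frame 1: nothing detected
]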

if __name__ == "__main__":
    # Test the content analyzer
    analyzer = VideoContentAnalyzer()
    print(f"Content analyzer available: {analyzer.available}")
    print(f"Capabilities: {json.dumps(analyzer.get_capabilities(), indent=2)}")