""" Gesture Detection Service Refactored from Colab notebook for production use. Detects body gestures and movements using MediaPipe Pose. """ import cv2 import numpy as np import mediapipe as mp from typing import Dict, Any, List, Optional, Tuple from loguru import logger from scipy.signal import savgol_filter from collections import Counter class GestureConfig: """Configuration untuk gesture detection thresholds""" # Movement thresholds (dalam pixel) EXCESSIVE_MOVEMENT_THRESHOLD = 50 # pixel/frame MINIMAL_MOVEMENT_THRESHOLD = 5 # pixel/frame # Frequency thresholds (gestures per second) HIGH_FREQUENCY = 3.0 LOW_FREQUENCY = 0.5 # Stability thresholds JITTER_THRESHOLD = 15 # pixel variance # Hand position zones (relative to body) FRONT_ZONE_THRESHOLD = 0.15 # 15cm di depan bahu # Landmark indices SHOULDER_LEFT = 11 SHOULDER_RIGHT = 12 ELBOW_LEFT = 13 ELBOW_RIGHT = 14 WRIST_LEFT = 15 WRIST_RIGHT = 16 HIP_LEFT = 23 HIP_RIGHT = 24 NOSE = 0 class GestureDetectionService: """ Gesture Detection Service for SWARA API Analyzes hand movements, body stability, and gesture patterns using MediaPipe Pose landmarks. """ _instance = None _pose = None _use_gpu = False def __new__(cls): """Singleton pattern to avoid reloading MediaPipe multiple times""" if cls._instance is None: cls._instance = super().__new__(cls) # ============================================================ # FLEXIBLE GPU/CPU RUNTIME FOR MEDIAPIPE # ============================================================ # Check if GPU is available and configure MediaPipe accordingly import torch cls._use_gpu = torch.cuda.is_available() if cls._use_gpu: # GPU detected - Use GPU delegate for acceleration logger.info(f"✓ GPU detected for Gesture Detection: {torch.cuda.get_device_name(0)}") logger.info("✓ Configuring MediaPipe Pose with GPU delegate") try: cls._pose = mp.solutions.pose.Pose( static_image_mode=False, model_complexity=1, smooth_landmarks=True, min_detection_confidence=0.5, min_tracking_confidence=0.5 ) logger.info("✓ MediaPipe Pose initialized with GPU acceleration") except Exception as e: logger.warning(f"⚠ GPU delegate failed, falling back to CPU: {e}") cls._pose = mp.solutions.pose.Pose( static_image_mode=False, model_complexity=1, smooth_landmarks=True, min_detection_confidence=0.5, min_tracking_confidence=0.5 ) else: # CPU - Standard configuration logger.info("ℹ No GPU detected, using CPU for Gesture Detection") cls._pose = mp.solutions.pose.Pose( static_image_mode=False, model_complexity=1, smooth_landmarks=True, min_detection_confidence=0.5, min_tracking_confidence=0.5 ) logger.info("GestureDetectionService initialized with MediaPipe Pose") return cls._instance def __init__(self): """Initialize service (called after __new__)""" self.config = GestureConfig() def analyze_video( self, video_path: str, progress_callback: Optional[callable] = None ) -> Dict[str, Any]: """ Analyze gestures in a video file Args: video_path: Path to video file progress_callback: Optional callback function(current, total, message) Returns: Dictionary containing gesture analysis results """ # Create fresh MediaPipe Pose instance to avoid timeout issues # Use same GPU/CPU configuration as singleton import torch use_gpu = torch.cuda.is_available() if use_gpu: logger.debug(f"Creating fresh Pose instance with GPU support for video: {video_path}") try: pose = mp.solutions.pose.Pose( static_image_mode=False, model_complexity=1, smooth_landmarks=True, min_detection_confidence=0.5, min_tracking_confidence=0.5 ) except Exception as e: logger.warning(f"⚠ Fresh GPU instance failed, using CPU: {e}") pose = mp.solutions.pose.Pose( static_image_mode=False, model_complexity=1, smooth_landmarks=True, min_detection_confidence=0.5, min_tracking_confidence=0.5 ) else: pose = mp.solutions.pose.Pose( static_image_mode=False, model_complexity=1, smooth_landmarks=True, min_detection_confidence=0.5, min_tracking_confidence=0.5 ) try: logger.info(f"Starting gesture analysis for: {video_path}") cap = cv2.VideoCapture(video_path) if not cap.isOpened(): raise ValueError(f"Cannot open video file: {video_path}") fps = cap.get(cv2.CAP_PROP_FPS) width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) logger.info(f"Video Info: {width}x{height} @ {fps}FPS, Total frames: {total_frames}") # Data storage frame_data = [] frame_count = 0 prev_landmarks = None while True: ret, frame = cap.read() if not ret: break frame_count += 1 # Progress callback if progress_callback and frame_count % 30 == 0: progress = int((frame_count / total_frames) * 100) progress_callback(frame_count, total_frames, f"Processing gestures: {progress}%") # Convert to RGB for MediaPipe rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) # MediaPipe processing with error handling try: results = pose.process(rgb_frame) # Use local pose instance except Exception as pose_error: logger.warning(f"MediaPipe processing failed at frame {frame_count}: {pose_error}") results = None # Initialize frame metrics frame_metrics = { 'frame_number': frame_count, 'timestamp_start': (frame_count - 1) / fps, 'timestamp_end': frame_count / fps, 'pose_detected': False, 'left_hand_movement': 0.0, 'right_hand_movement': 0.0, 'body_movement': 0.0, 'left_hand_position': 'unknown', 'right_hand_position': 'unknown' } if results and results.pose_landmarks: frame_metrics['pose_detected'] = True landmarks = results.pose_landmarks.landmark # Get key landmarks l_wrist = self._get_landmark_coords(landmarks, self.config.WRIST_LEFT, width, height) r_wrist = self._get_landmark_coords(landmarks, self.config.WRIST_RIGHT, width, height) l_shoulder = self._get_landmark_coords(landmarks, self.config.SHOULDER_LEFT, width, height) r_shoulder = self._get_landmark_coords(landmarks, self.config.SHOULDER_RIGHT, width, height) # Calculate movements if previous frame exists if prev_landmarks is not None: if l_wrist and prev_landmarks.get('l_wrist'): frame_metrics['left_hand_movement'] = self._calculate_movement_speed( prev_landmarks['l_wrist'], l_wrist ) if r_wrist and prev_landmarks.get('r_wrist'): frame_metrics['right_hand_movement'] = self._calculate_movement_speed( prev_landmarks['r_wrist'], r_wrist ) # Body movement (center of shoulders) if l_shoulder and r_shoulder and prev_landmarks.get('shoulder_center'): shoulder_center = ( (l_shoulder[0] + r_shoulder[0]) / 2, (l_shoulder[1] + r_shoulder[1]) / 2 ) frame_metrics['body_movement'] = self._calculate_movement_speed( prev_landmarks['shoulder_center'], shoulder_center ) # Determine hand positions (front/side/back) if l_wrist and l_shoulder: if l_wrist[0] < l_shoulder[0] - width * 0.05: frame_metrics['left_hand_position'] = 'front' elif l_wrist[0] > l_shoulder[0] + width * 0.05: frame_metrics['left_hand_position'] = 'back' else: frame_metrics['left_hand_position'] = 'side' if r_wrist and r_shoulder: if r_wrist[0] > r_shoulder[0] + width * 0.05: frame_metrics['right_hand_position'] = 'front' elif r_wrist[0] < r_shoulder[0] - width * 0.05: frame_metrics['right_hand_position'] = 'back' else: frame_metrics['right_hand_position'] = 'side' # Store current landmarks for next frame prev_landmarks = { 'l_wrist': l_wrist, 'r_wrist': r_wrist, 'l_shoulder': l_shoulder, 'r_shoulder': r_shoulder, 'shoulder_center': ( (l_shoulder[0] + r_shoulder[0]) / 2, (l_shoulder[1] + r_shoulder[1]) / 2 ) if l_shoulder and r_shoulder else None } else: prev_landmarks = None frame_data.append(frame_metrics) cap.release() logger.info(f"✓ Processed {frame_count} frames") if not frame_data: logger.warning("No frames processed") return self._create_empty_result("No frames processed") # Filter frames with detected pose pose_frames = [f for f in frame_data if f['pose_detected']] if len(pose_frames) < 10: logger.warning(f"Insufficient pose landmarks detected: {len(pose_frames)} frames") return self._create_empty_result("Insufficient pose data") logger.info(f"Frames with pose detected: {len(pose_frames)} / {len(frame_data)} ({len(pose_frames)/len(frame_data)*100:.1f}%)") # Analyze gestures with error protection try: analysis_result = self._analyze_gestures(pose_frames, fps, total_frames) except Exception as analyze_error: logger.error(f"Gesture analysis failed: {analyze_error}") # Return partial result instead of crashing return { 'success': False, 'gesture_analysis': { 'movement_score': 0, 'movement_category': 'unknown', 'error': str(analyze_error) } } logger.info(f"Gesture analysis complete: Score {analysis_result['gesture_analysis']['movement_score']:.1f}/10") return analysis_result except Exception as e: logger.error(f"Error in gesture analysis: {str(e)}") raise finally: # Clean up MediaPipe resources pose.close() logger.debug("MediaPipe Pose resources cleaned up") def _get_landmark_coords( self, landmarks: Any, idx: int, width: int, height: int ) -> Optional[Tuple[int, int, float]]: """Get landmark coordinates in pixel space with visibility""" if landmarks: lm = landmarks[idx] return (int(lm.x * width), int(lm.y * height), lm.visibility) return None def _calculate_movement_speed( self, prev_point: Tuple, curr_point: Tuple ) -> float: """Calculate movement speed between frames""" if prev_point is None or curr_point is None: return 0.0 return np.sqrt( (curr_point[0] - prev_point[0])**2 + (curr_point[1] - prev_point[1])**2 ) def _smooth_data(self, data: List[float], window_size: int = 5) -> np.ndarray: """Smooth data using Savitzky-Golay filter""" if len(data) < window_size: return np.array(data) try: # Ensure window_size is odd and valid if window_size % 2 == 0: window_size += 1 if len(data) <= window_size: window_size = len(data) - 1 if len(data) % 2 == 0 else len(data) if window_size < 3: return np.array(data) return savgol_filter(data, window_size, 2) except Exception as e: logger.warning(f"Smoothing failed: {e}, returning raw data") return np.array(data) def _analyze_gestures( self, pose_frames: List[Dict], fps: float, total_frames: int ) -> Dict[str, Any]: """Analyze gesture patterns and calculate scores""" # Extract movement data left_hand_movements = [f['left_hand_movement'] for f in pose_frames] right_hand_movements = [f['right_hand_movement'] for f in pose_frames] body_movements = [f['body_movement'] for f in pose_frames] # Calculate statistics avg_left_hand_speed = np.mean(left_hand_movements) avg_right_hand_speed = np.mean(right_hand_movements) avg_hand_speed = (avg_left_hand_speed + avg_right_hand_speed) / 2 max_left_hand_speed = np.max(left_hand_movements) max_right_hand_speed = np.max(right_hand_movements) max_hand_speed = max(max_left_hand_speed, max_right_hand_speed) avg_body_movement = np.mean(body_movements) max_body_movement = np.max(body_movements) # Hand activity percentage active_frames = [ f for f in pose_frames if f['left_hand_movement'] > self.config.MINIMAL_MOVEMENT_THRESHOLD or f['right_hand_movement'] > self.config.MINIMAL_MOVEMENT_THRESHOLD ] hand_activity_percentage = (len(active_frames) / len(pose_frames)) * 100 # Gesture frequency (peak detection) combined_movement = [ left_hand_movements[i] + right_hand_movements[i] for i in range(len(left_hand_movements)) ] smooth_movement = self._smooth_data(combined_movement) peaks = 0 threshold = self.config.MINIMAL_MOVEMENT_THRESHOLD * 2 for i in range(1, len(smooth_movement) - 1): if (smooth_movement[i] > threshold and smooth_movement[i] > smooth_movement[i-1] and smooth_movement[i] > smooth_movement[i+1]): peaks += 1 video_duration = total_frames / fps gesture_frequency = peaks / video_duration if video_duration > 0 else 0 # Body stability body_movement_variance = np.var(body_movements) if body_movement_variance < self.config.JITTER_THRESHOLD: jitter_level = 'low' elif body_movement_variance < self.config.JITTER_THRESHOLD * 2: jitter_level = 'medium' else: jitter_level = 'high' # Hand position distribution hand_positions = [] for f in pose_frames: if f['left_hand_position'] != 'unknown': hand_positions.append(f['left_hand_position']) if f['right_hand_position'] != 'unknown': hand_positions.append(f['right_hand_position']) if hand_positions: pos_counts = Counter(hand_positions) total_pos = len(hand_positions) hand_position_dist = { 'front': (pos_counts.get('front', 0) / total_pos) * 100, 'side': (pos_counts.get('side', 0) / total_pos) * 100, 'back': (pos_counts.get('back', 0) / total_pos) * 100 } else: hand_position_dist = {'front': 0.0, 'side': 0.0, 'back': 0.0} # Calculate movement score movement_score = self._calculate_movement_score( avg_hand_speed, max_hand_speed, gesture_frequency, body_movement_variance, jitter_level, hand_activity_percentage, hand_position_dist ) # Movement category if (avg_hand_speed > self.config.EXCESSIVE_MOVEMENT_THRESHOLD or gesture_frequency > self.config.HIGH_FREQUENCY or hand_activity_percentage > 80): movement_category = 'excessive' elif (avg_hand_speed < self.config.MINIMAL_MOVEMENT_THRESHOLD or gesture_frequency < self.config.LOW_FREQUENCY or hand_activity_percentage < 35): movement_category = 'minimal' else: movement_category = 'balanced' # Body stability score if jitter_level == 'low': body_stability_score = 9.0 elif jitter_level == 'medium': body_stability_score = 6.0 else: body_stability_score = 3.0 if avg_body_movement > 20: body_stability_score -= 2.0 body_stability_score = max(0, min(10, body_stability_score)) # Detect nervous gestures nervous_gestures_detected = ( gesture_frequency > self.config.HIGH_FREQUENCY or jitter_level == 'high' or hand_activity_percentage > 85 or max_hand_speed > 300 ) # Generate recommendations recommendations = self._generate_recommendations( gesture_frequency, hand_position_dist, max_hand_speed, hand_activity_percentage, jitter_level, avg_hand_speed, movement_score ) # Log analysis logger.info(f"Movement Metrics - Avg Speed: {avg_hand_speed:.2f}px, " f"Frequency: {gesture_frequency:.2f}/s, " f"Activity: {hand_activity_percentage:.1f}%, " f"Stability: {jitter_level}") return { 'gesture_analysis': { 'movement_score': round(movement_score, 1), 'movement_category': movement_category, 'gesture_frequency': round(gesture_frequency, 2), 'hand_activity_percentage': round(hand_activity_percentage, 1), 'body_stability_score': round(body_stability_score, 1), 'nervous_gestures_detected': nervous_gestures_detected, 'recommendations': recommendations, 'detailed_metrics': { 'avg_hand_movement_speed': round(avg_hand_speed, 2), 'max_hand_movement_speed': round(max_hand_speed, 2), 'avg_body_movement': round(avg_body_movement, 2), 'max_body_movement': round(max_body_movement, 2), 'body_sway_intensity': jitter_level, 'hand_position_distribution': { 'front': round(hand_position_dist['front'], 1), 'side': round(hand_position_dist['side'], 1), 'back': round(hand_position_dist['back'], 1) }, 'gesture_peaks_detected': peaks }, 'total_frames_analyzed': len(pose_frames), 'video_duration': round(video_duration, 2) } } def _calculate_movement_score( self, avg_hand_speed: float, max_hand_speed: float, gesture_frequency: float, body_variance: float, jitter_level: str, hand_activity: float, hand_position_dist: Dict[str, float] ) -> float: """Calculate movement score (0-10) based on multiple factors""" score = 10.0 # Penalty #1: Average Movement Speed if avg_hand_speed > self.config.EXCESSIVE_MOVEMENT_THRESHOLD: score -= 3.0 elif avg_hand_speed < self.config.MINIMAL_MOVEMENT_THRESHOLD: score -= 2.5 # Penalty #2: Max Speed Spikes if max_hand_speed > 300: score -= 2.0 elif max_hand_speed > 200: score -= 1.0 # Penalty #3: Gesture Frequency if gesture_frequency > 4.0: score -= 3.5 elif gesture_frequency > self.config.HIGH_FREQUENCY: score -= 2.5 elif gesture_frequency < self.config.LOW_FREQUENCY: score -= 2.0 # Penalty #4: Body Instability if jitter_level == 'high': score -= 2.0 elif jitter_level == 'medium': score -= 1.0 else: score += 0.5 # Bonus for stability # Penalty #5: Hand Position - Back if hand_position_dist['back'] > 35: score -= 2.5 elif hand_position_dist['back'] > 25: score -= 1.5 elif hand_position_dist['back'] > 15: score -= 0.5 # Penalty #6: Hand Position - Front if hand_position_dist['front'] < 40: score -= 2.0 elif hand_position_dist['front'] < 50: score -= 1.0 elif hand_position_dist['front'] > 60: score += 1.0 # Bonus # Penalty #7: Hand Activity if hand_activity > 85: score -= 1.5 elif hand_activity > 75: score -= 0.5 elif hand_activity < 30: score -= 1.5 return max(0, min(10, score)) def _generate_recommendations( self, gesture_frequency: float, hand_position_dist: Dict[str, float], max_hand_speed: float, hand_activity: float, jitter_level: str, avg_hand_speed: float, movement_score: float ) -> List[str]: """Generate actionable recommendations""" recommendations = [] if gesture_frequency > 4.0: recommendations.append("Reduce gesture frequency significantly (currently very high)") elif gesture_frequency > 3.0: recommendations.append("Reduce gesture frequency slightly") elif gesture_frequency < 0.5: recommendations.append("Increase gesture frequency for more expressiveness") if hand_position_dist['back'] > 30: recommendations.append("Keep hands visible in front - avoid hiding behind body") elif hand_position_dist['back'] > 20: recommendations.append("Try to position hands more in front for better engagement") if hand_position_dist['front'] < 45: recommendations.append("Bring hands forward more often - increases audience connection") if max_hand_speed > 300: recommendations.append("Avoid sudden explosive movements - use smooth gestures") if hand_activity > 80: recommendations.append("Add strategic pauses - let hands rest between key points") elif hand_activity < 35: recommendations.append("Increase hand activity - use more gestures to emphasize points") if jitter_level == 'high': recommendations.append("Work on body stability - reduce nervous movements and sway") if avg_hand_speed > 50: recommendations.append("Slow down hand movements - make gestures more deliberate") elif avg_hand_speed < 5: recommendations.append("Make gestures more dynamic - increase movement speed slightly") if movement_score >= 8.0: recommendations.append("Excellent gesture control! Very natural and professional.") if not recommendations: recommendations.append("Keep up the great work!") return recommendations def _create_empty_result(self, reason: str) -> Dict[str, Any]: """Create empty result when analysis fails""" return { 'gesture_analysis': { 'movement_score': 0.0, 'movement_category': 'unknown', 'gesture_frequency': 0.0, 'hand_activity_percentage': 0.0, 'body_stability_score': 0.0, 'nervous_gestures_detected': False, 'recommendations': [f"Analysis failed: {reason}"], 'detailed_metrics': { 'avg_hand_movement_speed': 0.0, 'max_hand_movement_speed': 0.0, 'avg_body_movement': 0.0, 'max_body_movement': 0.0, 'body_sway_intensity': 'unknown', 'hand_position_distribution': { 'front': 0.0, 'side': 0.0, 'back': 0.0 }, 'gesture_peaks_detected': 0 }, 'total_frames_analyzed': 0, 'video_duration': 0.0 } }