# swara-api/app/services/gesture_detection.py
"""
Gesture Detection Service
Refactored from Colab notebook for production use.
Detects body gestures and movements using MediaPipe Pose.
"""
import cv2
import numpy as np
import mediapipe as mp
from typing import Dict, Any, List, Optional, Tuple, Callable
from loguru import logger
from scipy.signal import savgol_filter
from collections import Counter
class GestureConfig:
"""Configuration untuk gesture detection thresholds"""
# Movement thresholds (dalam pixel)
EXCESSIVE_MOVEMENT_THRESHOLD = 50 # pixel/frame
MINIMAL_MOVEMENT_THRESHOLD = 5 # pixel/frame
# Frequency thresholds (gestures per second)
HIGH_FREQUENCY = 3.0
LOW_FREQUENCY = 0.5
# Stability thresholds
JITTER_THRESHOLD = 15 # pixel variance
# Hand position zones (relative to body)
FRONT_ZONE_THRESHOLD = 0.15 # 15cm di depan bahu
# Landmark indices
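    # (numbering follows MediaPipe Pose's 33-landmark topology)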
SHOULDER_LEFT = 11
SHOULDER_RIGHT = 12
ELBOW_LEFT = 13
ELBOW_RIGHT = 14
WRIST_LEFT = 15
WRIST_RIGHT = 16
HIP_LEFT = 23
HIP_RIGHT = 24
NOSE = 0
class GestureDetectionService:
"""
Gesture Detection Service for SWARA API
Analyzes hand movements, body stability, and gesture patterns
using MediaPipe Pose landmarks.
"""
_instance = None
_pose = None
_use_gpu = False
    @staticmethod
    def _create_pose():
        """Create a MediaPipe Pose instance with the service's standard settings."""
        return mp.solutions.pose.Pose(
            static_image_mode=False,
            model_complexity=1,
            smooth_landmarks=True,
            min_detection_confidence=0.5,
            min_tracking_confidence=0.5
        )

    def __new__(cls):
        """Singleton pattern to avoid reloading MediaPipe multiple times"""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            # ============================================================
            # FLEXIBLE GPU/CPU RUNTIME FOR MEDIAPIPE
            # ============================================================
            # Detect whether a CUDA-capable GPU is present. Note that the
            # legacy MediaPipe Solutions API used here runs its inference
            # on CPU in Python either way, so the Pose configuration is
            # identical on both paths; the check is informational and is
            # guarded in case torch is absent from the environment.
            try:
                import torch
                cls._use_gpu = torch.cuda.is_available()
                gpu_name = torch.cuda.get_device_name(0) if cls._use_gpu else None
            except ImportError:
                cls._use_gpu = False
                gpu_name = None
            if cls._use_gpu:
                logger.info(f"✓ GPU detected for Gesture Detection: {gpu_name}")
            else:
                logger.info("ℹ No GPU detected, using CPU for Gesture Detection")
            cls._pose = cls._create_pose()
            logger.info("GestureDetectionService initialized with MediaPipe Pose")
        return cls._instance
def __init__(self):
"""Initialize service (called after __new__)"""
self.config = GestureConfig()
def analyze_video(
self,
video_path: str,
        progress_callback: Optional[Callable] = None
) -> Dict[str, Any]:
"""
Analyze gestures in a video file
Args:
video_path: Path to video file
progress_callback: Optional callback function(current, total, message)
Returns:
Dictionary containing gesture analysis results
"""
        # Create a fresh MediaPipe Pose instance per video to avoid
        # timeout/state issues with the long-lived singleton instance;
        # the configuration is identical on GPU and CPU hosts (see __new__).
        logger.debug(f"Creating fresh Pose instance for video: {video_path}")
        pose = self._create_pose()
try:
logger.info(f"Starting gesture analysis for: {video_path}")
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
raise ValueError(f"Cannot open video file: {video_path}")
            fps = cap.get(cv2.CAP_PROP_FPS)
            if fps <= 0:
                fps = 30.0  # some containers report 0 FPS; fall back so timestamps stay finite
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
logger.info(f"Video Info: {width}x{height} @ {fps}FPS, Total frames: {total_frames}")
# Data storage
frame_data = []
frame_count = 0
prev_landmarks = None
while True:
ret, frame = cap.read()
if not ret:
break
frame_count += 1
# Progress callback
                if progress_callback and frame_count % 30 == 0 and total_frames > 0:
                    progress = int((frame_count / total_frames) * 100)
                    progress_callback(frame_count, total_frames, f"Processing gestures: {progress}%")
# Convert to RGB for MediaPipe
rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
# MediaPipe processing with error handling
try:
results = pose.process(rgb_frame) # Use local pose instance
except Exception as pose_error:
logger.warning(f"MediaPipe processing failed at frame {frame_count}: {pose_error}")
results = None
# Initialize frame metrics
frame_metrics = {
'frame_number': frame_count,
'timestamp_start': (frame_count - 1) / fps,
'timestamp_end': frame_count / fps,
'pose_detected': False,
'left_hand_movement': 0.0,
'right_hand_movement': 0.0,
'body_movement': 0.0,
'left_hand_position': 'unknown',
'right_hand_position': 'unknown'
}
if results and results.pose_landmarks:
frame_metrics['pose_detected'] = True
landmarks = results.pose_landmarks.landmark
# Get key landmarks
l_wrist = self._get_landmark_coords(landmarks, self.config.WRIST_LEFT, width, height)
r_wrist = self._get_landmark_coords(landmarks, self.config.WRIST_RIGHT, width, height)
l_shoulder = self._get_landmark_coords(landmarks, self.config.SHOULDER_LEFT, width, height)
r_shoulder = self._get_landmark_coords(landmarks, self.config.SHOULDER_RIGHT, width, height)
# Calculate movements if previous frame exists
if prev_landmarks is not None:
if l_wrist and prev_landmarks.get('l_wrist'):
frame_metrics['left_hand_movement'] = self._calculate_movement_speed(
prev_landmarks['l_wrist'], l_wrist
)
if r_wrist and prev_landmarks.get('r_wrist'):
frame_metrics['right_hand_movement'] = self._calculate_movement_speed(
prev_landmarks['r_wrist'], r_wrist
)
# Body movement (center of shoulders)
if l_shoulder and r_shoulder and prev_landmarks.get('shoulder_center'):
shoulder_center = (
(l_shoulder[0] + r_shoulder[0]) / 2,
(l_shoulder[1] + r_shoulder[1]) / 2
)
frame_metrics['body_movement'] = self._calculate_movement_speed(
prev_landmarks['shoulder_center'], shoulder_center
)
# Determine hand positions (front/side/back)
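                    # Heuristic (2D only): "front" means the wrist has crossed
                    # inside the shoulder line toward the body midline, i.e. the
                    # hand is gesturing in front of the torso; "back" means it is
                    # pulled outside/behind the shoulder. A 5%-of-frame-width dead
                    # zone around the shoulder x-coordinate counts as "side".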
if l_wrist and l_shoulder:
if l_wrist[0] < l_shoulder[0] - width * 0.05:
frame_metrics['left_hand_position'] = 'front'
elif l_wrist[0] > l_shoulder[0] + width * 0.05:
frame_metrics['left_hand_position'] = 'back'
else:
frame_metrics['left_hand_position'] = 'side'
if r_wrist and r_shoulder:
if r_wrist[0] > r_shoulder[0] + width * 0.05:
frame_metrics['right_hand_position'] = 'front'
elif r_wrist[0] < r_shoulder[0] - width * 0.05:
frame_metrics['right_hand_position'] = 'back'
else:
frame_metrics['right_hand_position'] = 'side'
# Store current landmarks for next frame
prev_landmarks = {
'l_wrist': l_wrist,
'r_wrist': r_wrist,
'l_shoulder': l_shoulder,
'r_shoulder': r_shoulder,
'shoulder_center': (
(l_shoulder[0] + r_shoulder[0]) / 2,
(l_shoulder[1] + r_shoulder[1]) / 2
) if l_shoulder and r_shoulder else None
}
else:
prev_landmarks = None
frame_data.append(frame_metrics)
cap.release()
logger.info(f"✓ Processed {frame_count} frames")
if not frame_data:
logger.warning("No frames processed")
return self._create_empty_result("No frames processed")
# Filter frames with detected pose
pose_frames = [f for f in frame_data if f['pose_detected']]
if len(pose_frames) < 10:
logger.warning(f"Insufficient pose landmarks detected: {len(pose_frames)} frames")
return self._create_empty_result("Insufficient pose data")
logger.info(f"Frames with pose detected: {len(pose_frames)} / {len(frame_data)} ({len(pose_frames)/len(frame_data)*100:.1f}%)")
# Analyze gestures with error protection
try:
analysis_result = self._analyze_gestures(pose_frames, fps, total_frames)
            except Exception as analyze_error:
                logger.error(f"Gesture analysis failed: {analyze_error}")
                # Return a structurally complete empty result instead of crashing
                result = self._create_empty_result(str(analyze_error))
                result['success'] = False
                return result
logger.info(f"Gesture analysis complete: Score {analysis_result['gesture_analysis']['movement_score']:.1f}/10")
return analysis_result
except Exception as e:
logger.error(f"Error in gesture analysis: {str(e)}")
raise
finally:
# Clean up MediaPipe resources
pose.close()
logger.debug("MediaPipe Pose resources cleaned up")
def _get_landmark_coords(
self,
landmarks: Any,
idx: int,
width: int,
height: int
) -> Optional[Tuple[int, int, float]]:
"""Get landmark coordinates in pixel space with visibility"""
if landmarks:
lm = landmarks[idx]
return (int(lm.x * width), int(lm.y * height), lm.visibility)
return None
def _calculate_movement_speed(
self,
prev_point: Tuple,
curr_point: Tuple
) -> float:
"""Calculate movement speed between frames"""
if prev_point is None or curr_point is None:
return 0.0
return np.sqrt(
(curr_point[0] - prev_point[0])**2 +
(curr_point[1] - prev_point[1])**2
)
def _smooth_data(self, data: List[float], window_size: int = 5) -> np.ndarray:
"""Smooth data using Savitzky-Golay filter"""
if len(data) < window_size:
return np.array(data)
try:
# Ensure window_size is odd and valid
if window_size % 2 == 0:
window_size += 1
if len(data) <= window_size:
window_size = len(data) - 1 if len(data) % 2 == 0 else len(data)
if window_size < 3:
return np.array(data)
return savgol_filter(data, window_size, 2)
except Exception as e:
logger.warning(f"Smoothing failed: {e}, returning raw data")
return np.array(data)
def _analyze_gestures(
self,
pose_frames: List[Dict],
fps: float,
total_frames: int
) -> Dict[str, Any]:
"""Analyze gesture patterns and calculate scores"""
# Extract movement data
left_hand_movements = [f['left_hand_movement'] for f in pose_frames]
right_hand_movements = [f['right_hand_movement'] for f in pose_frames]
body_movements = [f['body_movement'] for f in pose_frames]
# Calculate statistics
avg_left_hand_speed = np.mean(left_hand_movements)
avg_right_hand_speed = np.mean(right_hand_movements)
avg_hand_speed = (avg_left_hand_speed + avg_right_hand_speed) / 2
max_left_hand_speed = np.max(left_hand_movements)
max_right_hand_speed = np.max(right_hand_movements)
max_hand_speed = max(max_left_hand_speed, max_right_hand_speed)
avg_body_movement = np.mean(body_movements)
max_body_movement = np.max(body_movements)
# Hand activity percentage
active_frames = [
f for f in pose_frames
if f['left_hand_movement'] > self.config.MINIMAL_MOVEMENT_THRESHOLD or
f['right_hand_movement'] > self.config.MINIMAL_MOVEMENT_THRESHOLD
]
hand_activity_percentage = (len(active_frames) / len(pose_frames)) * 100
# Gesture frequency (peak detection)
combined_movement = [
left_hand_movements[i] + right_hand_movements[i]
for i in range(len(left_hand_movements))
]
smooth_movement = self._smooth_data(combined_movement)
peaks = 0
threshold = self.config.MINIMAL_MOVEMENT_THRESHOLD * 2
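        # A "peak" is a local maximum of the smoothed combined hand movement
        # above twice the minimal-movement threshold; each peak is counted as
        # one distinct gesture when estimating gesture frequency.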
for i in range(1, len(smooth_movement) - 1):
if (smooth_movement[i] > threshold and
smooth_movement[i] > smooth_movement[i-1] and
smooth_movement[i] > smooth_movement[i+1]):
peaks += 1
video_duration = total_frames / fps
gesture_frequency = peaks / video_duration if video_duration > 0 else 0
# Body stability
body_movement_variance = np.var(body_movements)
if body_movement_variance < self.config.JITTER_THRESHOLD:
jitter_level = 'low'
elif body_movement_variance < self.config.JITTER_THRESHOLD * 2:
jitter_level = 'medium'
else:
jitter_level = 'high'
# Hand position distribution
hand_positions = []
for f in pose_frames:
if f['left_hand_position'] != 'unknown':
hand_positions.append(f['left_hand_position'])
if f['right_hand_position'] != 'unknown':
hand_positions.append(f['right_hand_position'])
if hand_positions:
pos_counts = Counter(hand_positions)
total_pos = len(hand_positions)
hand_position_dist = {
'front': (pos_counts.get('front', 0) / total_pos) * 100,
'side': (pos_counts.get('side', 0) / total_pos) * 100,
'back': (pos_counts.get('back', 0) / total_pos) * 100
}
else:
hand_position_dist = {'front': 0.0, 'side': 0.0, 'back': 0.0}
# Calculate movement score
movement_score = self._calculate_movement_score(
avg_hand_speed, max_hand_speed, gesture_frequency,
body_movement_variance, jitter_level, hand_activity_percentage,
hand_position_dist
)
# Movement category
if (avg_hand_speed > self.config.EXCESSIVE_MOVEMENT_THRESHOLD or
gesture_frequency > self.config.HIGH_FREQUENCY or
hand_activity_percentage > 80):
movement_category = 'excessive'
elif (avg_hand_speed < self.config.MINIMAL_MOVEMENT_THRESHOLD or
gesture_frequency < self.config.LOW_FREQUENCY or
hand_activity_percentage < 35):
movement_category = 'minimal'
else:
movement_category = 'balanced'
# Body stability score
if jitter_level == 'low':
body_stability_score = 9.0
elif jitter_level == 'medium':
body_stability_score = 6.0
else:
body_stability_score = 3.0
if avg_body_movement > 20:
body_stability_score -= 2.0
body_stability_score = max(0, min(10, body_stability_score))
# Detect nervous gestures
nervous_gestures_detected = (
gesture_frequency > self.config.HIGH_FREQUENCY or
jitter_level == 'high' or
hand_activity_percentage > 85 or
max_hand_speed > 300
)
# Generate recommendations
recommendations = self._generate_recommendations(
gesture_frequency, hand_position_dist, max_hand_speed,
hand_activity_percentage, jitter_level, avg_hand_speed,
movement_score
)
# Log analysis
logger.info(f"Movement Metrics - Avg Speed: {avg_hand_speed:.2f}px, "
f"Frequency: {gesture_frequency:.2f}/s, "
f"Activity: {hand_activity_percentage:.1f}%, "
f"Stability: {jitter_level}")
return {
'gesture_analysis': {
'movement_score': round(movement_score, 1),
'movement_category': movement_category,
'gesture_frequency': round(gesture_frequency, 2),
'hand_activity_percentage': round(hand_activity_percentage, 1),
'body_stability_score': round(body_stability_score, 1),
'nervous_gestures_detected': nervous_gestures_detected,
'recommendations': recommendations,
'detailed_metrics': {
'avg_hand_movement_speed': round(avg_hand_speed, 2),
'max_hand_movement_speed': round(max_hand_speed, 2),
'avg_body_movement': round(avg_body_movement, 2),
'max_body_movement': round(max_body_movement, 2),
'body_sway_intensity': jitter_level,
'hand_position_distribution': {
'front': round(hand_position_dist['front'], 1),
'side': round(hand_position_dist['side'], 1),
'back': round(hand_position_dist['back'], 1)
},
'gesture_peaks_detected': peaks
},
'total_frames_analyzed': len(pose_frames),
'video_duration': round(video_duration, 2)
}
}
def _calculate_movement_score(
self,
avg_hand_speed: float,
max_hand_speed: float,
gesture_frequency: float,
body_variance: float,
jitter_level: str,
hand_activity: float,
hand_position_dist: Dict[str, float]
) -> float:
"""Calculate movement score (0-10) based on multiple factors"""
score = 10.0
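        # Scoring sketch: start from a perfect 10 and apply the additive
        # penalties/bonuses below; the result is clamped to [0, 10] at the end.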
# Penalty #1: Average Movement Speed
if avg_hand_speed > self.config.EXCESSIVE_MOVEMENT_THRESHOLD:
score -= 3.0
elif avg_hand_speed < self.config.MINIMAL_MOVEMENT_THRESHOLD:
score -= 2.5
# Penalty #2: Max Speed Spikes
if max_hand_speed > 300:
score -= 2.0
elif max_hand_speed > 200:
score -= 1.0
# Penalty #3: Gesture Frequency
if gesture_frequency > 4.0:
score -= 3.5
elif gesture_frequency > self.config.HIGH_FREQUENCY:
score -= 2.5
elif gesture_frequency < self.config.LOW_FREQUENCY:
score -= 2.0
# Penalty #4: Body Instability
if jitter_level == 'high':
score -= 2.0
elif jitter_level == 'medium':
score -= 1.0
else:
score += 0.5 # Bonus for stability
# Penalty #5: Hand Position - Back
if hand_position_dist['back'] > 35:
score -= 2.5
elif hand_position_dist['back'] > 25:
score -= 1.5
elif hand_position_dist['back'] > 15:
score -= 0.5
# Penalty #6: Hand Position - Front
if hand_position_dist['front'] < 40:
score -= 2.0
elif hand_position_dist['front'] < 50:
score -= 1.0
elif hand_position_dist['front'] > 60:
score += 1.0 # Bonus
# Penalty #7: Hand Activity
if hand_activity > 85:
score -= 1.5
elif hand_activity > 75:
score -= 0.5
elif hand_activity < 30:
score -= 1.5
return max(0, min(10, score))
def _generate_recommendations(
self,
gesture_frequency: float,
hand_position_dist: Dict[str, float],
max_hand_speed: float,
hand_activity: float,
jitter_level: str,
avg_hand_speed: float,
movement_score: float
) -> List[str]:
"""Generate actionable recommendations"""
recommendations = []
if gesture_frequency > 4.0:
recommendations.append("Reduce gesture frequency significantly (currently very high)")
elif gesture_frequency > 3.0:
recommendations.append("Reduce gesture frequency slightly")
elif gesture_frequency < 0.5:
recommendations.append("Increase gesture frequency for more expressiveness")
if hand_position_dist['back'] > 30:
recommendations.append("Keep hands visible in front - avoid hiding behind body")
elif hand_position_dist['back'] > 20:
recommendations.append("Try to position hands more in front for better engagement")
if hand_position_dist['front'] < 45:
recommendations.append("Bring hands forward more often - increases audience connection")
if max_hand_speed > 300:
recommendations.append("Avoid sudden explosive movements - use smooth gestures")
if hand_activity > 80:
recommendations.append("Add strategic pauses - let hands rest between key points")
elif hand_activity < 35:
recommendations.append("Increase hand activity - use more gestures to emphasize points")
if jitter_level == 'high':
recommendations.append("Work on body stability - reduce nervous movements and sway")
if avg_hand_speed > 50:
recommendations.append("Slow down hand movements - make gestures more deliberate")
elif avg_hand_speed < 5:
recommendations.append("Make gestures more dynamic - increase movement speed slightly")
if movement_score >= 8.0:
recommendations.append("Excellent gesture control! Very natural and professional.")
if not recommendations:
recommendations.append("Keep up the great work!")
return recommendations
def _create_empty_result(self, reason: str) -> Dict[str, Any]:
"""Create empty result when analysis fails"""
return {
'gesture_analysis': {
'movement_score': 0.0,
'movement_category': 'unknown',
'gesture_frequency': 0.0,
'hand_activity_percentage': 0.0,
'body_stability_score': 0.0,
'nervous_gestures_detected': False,
'recommendations': [f"Analysis failed: {reason}"],
'detailed_metrics': {
'avg_hand_movement_speed': 0.0,
'max_hand_movement_speed': 0.0,
'avg_body_movement': 0.0,
'max_body_movement': 0.0,
'body_sway_intensity': 'unknown',
'hand_position_distribution': {
'front': 0.0,
'side': 0.0,
'back': 0.0
},
'gesture_peaks_detected': 0
},
'total_frames_analyzed': 0,
'video_duration': 0.0
}
}
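

if __name__ == "__main__":
    # Minimal usage sketch for local testing. The "sample.mp4" path and the
    # print-based progress callback are illustrative, not part of the API.
    import sys

    video = sys.argv[1] if len(sys.argv) > 1 else "sample.mp4"
    service = GestureDetectionService()
    report = service.analyze_video(
        video,
        progress_callback=lambda current, total, message: print(message)
    )
    gesture = report["gesture_analysis"]
    print(f"movement_score={gesture['movement_score']} "
          f"category={gesture['movement_category']}")
    for tip in gesture["recommendations"]:
        print(f"- {tip}")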