# swara-api/app/services/video_processor.py
"""
Video Processor Orchestrator
This module coordinates all AI models and creates the final analysis result.
"""
import cv2 as cv
import os
import time
from typing import Dict, Any, Optional, Callable
from loguru import logger
from app.services.eye_tracking import EyeTrackingService
from app.services.facial_expression import FacialExpressionService
from app.services.gesture_detection import GestureDetectionService
from app.models import VideoMetadata
class VideoProcessor:
"""
Main video processor that orchestrates all AI models
"""
def __init__(self):
"""Initialize video processor with all services"""
self.eye_tracking_service = None
self.facial_expression_service = None
self.gesture_service = None
logger.info("VideoProcessor initialized")
def _load_models(self):
"""Lazy load models"""
if self.eye_tracking_service is None:
logger.info("Loading Eye Tracking model...")
self.eye_tracking_service = EyeTrackingService()
if self.facial_expression_service is None:
logger.info("Loading Facial Expression model...")
self.facial_expression_service = FacialExpressionService()
if self.gesture_service is None:
logger.info("Loading Gesture Detection model...")
self.gesture_service = GestureDetectionService()
logger.info("✓ All models loaded")
def process_video(
self,
video_path: str,
level: int,
progress_callback: Optional[Callable] = None
) -> Dict[str, Any]:
"""
Process video and return analysis results
Args:
video_path: Path to video file
level: Public speaking level (1-5)
progress_callback: Optional callback for progress updates
Signature: callback(step: str, percentage: float, message: str)
Returns:
Dict containing analysis results
"""
start_time = time.time()
try:
# Load models
if progress_callback:
progress_callback("loading_models", 10, "Loading AI models...")
self._load_models()
# Get video metadata
if progress_callback:
progress_callback("reading_video", 15, "Reading video metadata...")
metadata = self._get_video_metadata(video_path)
# Determine which indicators to process based on level
indicators_config = self._get_indicators_for_level(level)
# Process all models in parallel
if progress_callback:
progress_callback("processing", 20, "Processing video with AI models...")
results = self._process_models_parallel(
video_path,
indicators_config,
progress_callback
)
# Build final result
if progress_callback:
progress_callback("finalizing", 90, "Building final analysis...")
analysis_result = self._build_analysis_result(
level=level,
metadata=metadata,
results=results
)
processing_time = time.time() - start_time
if progress_callback:
progress_callback("completed", 100, f"Analysis completed in {processing_time:.2f}s")
logger.info(f"✓ Video processed successfully in {processing_time:.2f}s")
return analysis_result
except Exception as e:
logger.error(f"✗ Video processing failed: {e}")
raise
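# A minimal, illustrative caller sketch (the "speech.mp4" path and the level
# are hypothetical); the callback simply logs the documented
# (step, percentage, message) triple:
#
#     def on_progress(step: str, percentage: float, message: str) -> None:
#         logger.info(f"[{step}] {percentage:.0f}% - {message}")
#
#     processor = get_video_processor()
#     result = processor.process_video("speech.mp4", level=3, progress_callback=on_progress)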
def _get_video_metadata(self, video_path: str) -> VideoMetadata:
"""Extract video metadata"""
try:
cap = cv.VideoCapture(video_path)
if not cap.isOpened():
raise ValueError(f"Cannot open video: {video_path}")
raw_fps = cap.get(cv.CAP_PROP_FPS)
fps = round(raw_fps)  # round rather than truncate (e.g. 29.97 -> 30, not 29)
width = int(cap.get(cv.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv.CAP_PROP_FRAME_HEIGHT))
frame_count = int(cap.get(cv.CAP_PROP_FRAME_COUNT))
duration = frame_count / raw_fps if raw_fps > 0 else 0
cap.release()
# Get file size
file_size = os.path.getsize(video_path)
return VideoMetadata(
duration=round(duration, 2),
fps=fps,
resolution=f"{width}x{height}",
file_size=file_size
)
except Exception as e:
logger.error(f"Failed to get video metadata: {e}")
raise
def _get_indicators_for_level(self, level: int) -> Dict[str, bool]:
"""
Determine which indicators to process based on level
Returns:
Dict with indicator names and whether to process them
"""
config = {
# Main indicators (always processed if in level)
"kontak_mata": level >= 1, # FIX: Level 1 juga butuh kontak mata!
"kesesuaian_topik": level >= 3,
"struktur_kalimat": level >= 5,
# Bonus indicators (always processed for all levels)
"face_expression": True,
"gesture": True,
"first_impression": True,
# Audio indicators (placeholder - not implemented yet)
"tempo": False,
"artikulasi": False,
"jeda": False,
"kata_pengisi": False,
"kata_tidak_senonoh": False
}
return config
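# For example, level 3 enables kontak_mata and kesesuaian_topik but not
# struktur_kalimat; the bonus indicators stay on and the audio placeholders
# stay off regardless of level.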
def _process_models_parallel(
self,
video_path: str,
indicators_config: Dict[str, bool],
progress_callback: Optional[Callable] = None
) -> Dict[str, Any]:
"""
Process all required models SEQUENTIALLY to avoid OOM
(Renamed but kept for compatibility - actually sequential now)
Returns:
Dict with results from each model
"""
results = {}
# Define tasks to run
tasks = []
# Eye tracking (for kontak_mata)
if indicators_config.get("kontak_mata", False):
tasks.append(("eye_tracking", self.eye_tracking_service.analyze_video))
# Facial expression (always run for first_impression and face_expression)
if indicators_config.get("face_expression", False):
tasks.append(("facial_expression", self.facial_expression_service.analyze_video))
# Gesture detection (always run)
if indicators_config.get("gesture", False):
tasks.append(("gesture", self.gesture_service.analyze_video))
# Process tasks SEQUENTIALLY to avoid memory overflow
total = len(tasks)
for idx, (task_name, func) in enumerate(tasks, 1):
try:
logger.info(f"⏳ Processing {task_name} ({idx}/{total})...")
if progress_callback:
pct = 20 + ((idx - 1) / total) * 60 # 20% to 80%
progress_callback(
"processing",
pct,
f"Processing {task_name} ({idx}/{total})..."
)
# Run analysis
result = func(video_path)
results[task_name] = result
if progress_callback:
pct = 20 + (idx / total) * 60
progress_callback(
"processing",
pct,
f"Completed {task_name} ({idx}/{total})"
)
logger.info(f"✓ {task_name} completed")
except Exception as e:
logger.error(f"✗ {task_name} failed: {e}")
results[task_name] = {"error": str(e)}
return results
def _build_analysis_result(
self,
level: int,
metadata: VideoMetadata,
results: Dict[str, Any]
) -> Dict[str, Any]:
"""
Build final analysis result in clean, focused format
Returns:
Dict with structure: video_info, analysis_results (eye_contact, facial_expression, gesture), overall
"""
# Extract results from each service
eye_data = results.get("eye_tracking", {})
face_data = results.get("facial_expression", {})
gesture_data = results.get("gesture", {})
# Build video_info
video_info = {
"duration": round(metadata.duration, 2),
"fps": metadata.fps,
"resolution": metadata.resolution,
"file_size": metadata.file_size
}
# Build analysis_results
analysis_results = {
"eye_contact": self._format_eye_contact(eye_data),
"facial_expression": self._format_facial_expression(face_data, metadata.fps),
"gesture": self._format_gesture(gesture_data)
}
# Build overall summary
overall = self._build_overall_summary(analysis_results)
# Final response structure
result = {
"video_info": video_info,
"analysis_results": analysis_results,
"overall": overall
}
return result
def _format_eye_contact(self, eye_data: Dict[str, Any]) -> Dict[str, Any]:
"""Format eye contact analysis to clean structure"""
if not eye_data or "eye_contact_analysis" not in eye_data:
return self._empty_eye_contact()
eye_analysis = eye_data["eye_contact_analysis"]
return {
"score": eye_analysis.get("score", 0),
"rating": eye_analysis.get("rating", "Unknown"),
"summary": {
"gaze_away_time": round(eye_analysis.get("total_gaze_away_time", 0), 2),
"gaze_away_percentage": round(eye_analysis.get("gaze_away_percentage", 0), 1),
"center_percentage": round(
eye_analysis.get("position_distribution", {}).get("CENTER", {}).get("percentage", 0), 1
),
"blinks_per_minute": round(eye_data.get("blink_analysis", {}).get("blinks_per_minute", 0), 1)
},
"details": {
"total_blinks": eye_data.get("blink_analysis", {}).get("total_blinks", 0),
"gaze_distribution": {
"center": eye_analysis.get("position_distribution", {}).get("CENTER", {}).get("frames", 0),
"left": eye_analysis.get("position_distribution", {}).get("LEFT", {}).get("frames", 0),
"right": eye_analysis.get("position_distribution", {}).get("RIGHT", {}).get("frames", 0),
"unknown": eye_analysis.get("position_distribution", {}).get("UNKNOWN", {}).get("frames", 0)
}
}
}
def _format_facial_expression(self, face_data: Dict[str, Any], fps: int) -> Dict[str, Any]:
"""Format facial expression analysis with first impression"""
if not face_data or "summary" not in face_data:
return self._empty_facial_expression()
summary = face_data["summary"]
expressions = summary.get("expression_distribution", {})
# Normalize expression keys to lowercase for consistent access
expressions_lower = {k.lower(): v for k, v in expressions.items()}
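# e.g. a model emitting {"Happy": 45.0, "Neutral": 40.0} and one emitting
# lowercase keys both resolve through expressions_lower below.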
# Calculate overall percentages (expression_distribution already contains percentages)
happy_pct = round(expressions_lower.get("happy", 0), 1)
neutral_pct = round(expressions_lower.get("neutral", 0), 1)
negative_pct = round(
expressions_lower.get("sad", 0) +
expressions_lower.get("angry", 0) +
expressions_lower.get("fear", 0), 1
)
# Get dominant expression from service
dominant_display = summary.get("dominant_expression", "Unknown").capitalize()
# Calculate score (0-10) based on positive expressions
positive_pct = happy_pct + expressions_lower.get("surprise", 0)
score = min(10, round(positive_pct / 10, 1))
# Calculate average confidence from frame data
frame_data = face_data.get("statistics_df", [])
avg_confidence = 0.0
if frame_data:
valid_frames = [f for f in frame_data if f.get("expression") not in ["no_face", "background"]]
if valid_frames:
total_conf = sum(f.get("confidence", 0) for f in valid_frames)
avg_confidence = round(total_conf / len(valid_frames), 2)
# Build first impression (first 3 seconds)
first_impression = self._analyze_first_impression(face_data, fps)
return {
"score": score,
"dominant_expression": dominant_display,
"first_impression": first_impression,
"overall_summary": {
"happy_percentage": happy_pct,
"neutral_percentage": neutral_pct,
"negative_percentage": negative_pct
},
"details": {
"expressions": {k: round(v, 1) for k, v in expressions.items()}, # Keep original keys for display
"confidence_avg": avg_confidence
}
}
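# Worked example of the score formula above (illustrative numbers): with
# happy = 45.0% and surprise = 5.0%, positive_pct = 50.0 and
# score = min(10, round(50.0 / 10, 1)) = 5.0.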
def _analyze_first_impression(self, face_data: Dict[str, Any], fps: int) -> Dict[str, Any]:
"""
Analyze first 3 seconds of video for first impression
Args:
face_data: Facial expression data from service
fps: Frames per second
Returns:
First impression analysis dict
"""
# Default values
first_duration = 3.0 # 3 seconds
frames_to_analyze = int(first_duration * fps)
# Get frame-by-frame data (stored in 'statistics_df' key)
frame_data = face_data.get("statistics_df", [])
if not frame_data:
# Fallback: use overall distribution
return self._first_impression_from_overall(face_data, first_duration, frames_to_analyze)
# Filter frames in first 3 seconds and with valid faces
first_frames = [
f for f in frame_data
if f.get("timestamp_start", float("inf")) < first_duration
and f.get("expression") not in ["no_face", "background"]
]
actual_frames = len(first_frames)
if actual_frames == 0:
return self._first_impression_from_overall(face_data, first_duration, frames_to_analyze)
# Count expressions in first frames (normalize to lowercase for consistency)
expression_counts = {}
confidence_sum = 0
for frame in first_frames:
expr = frame.get("expression", "neutral").lower() # Normalize to lowercase
conf = frame.get("confidence", 0)
expression_counts[expr] = expression_counts.get(expr, 0) + 1
confidence_sum += conf
# Get dominant expression in first impression
if expression_counts:
dominant_key = max(expression_counts.items(), key=lambda x: x[1])[0]
dominant_expr = dominant_key.capitalize() # Display format
dominant_count = expression_counts[dominant_key] # Use same key
else:
dominant_expr = "Neutral"
dominant_count = 0
dominant_pct = round((dominant_count / actual_frames) * 100, 1) if actual_frames > 0 else 0
# Calculate average confidence
avg_confidence = round(confidence_sum / actual_frames, 2) if actual_frames > 0 else 0
# Generate rating and description
rating, description = self._rate_first_impression(dominant_expr, dominant_pct)
return {
"expression": dominant_expr,
"confidence": avg_confidence,
"percentage": dominant_pct,
"duration_analyzed": round(first_duration, 1),
"frames_analyzed": actual_frames,
"rating": rating,
"description": description
}
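# Note that the window above is time-based: with fps = 30 the nominal budget is
# 90 frames, but only segments whose timestamp_start falls within the first
# 3.0 s (and that contain a detected face) count toward frames_analyzed.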
def _first_impression_from_overall(self, face_data: Dict[str, Any], duration: float, frames: int) -> Dict[str, Any]:
"""Fallback first impression from overall data"""
summary = face_data.get("summary", {})
expressions = summary.get("expression_distribution", {})
if not expressions:
return {
"expression": "Unknown",
"confidence": 0.0,
"percentage": 0.0,
"duration_analyzed": duration,
"frames_analyzed": 0,
"rating": "Tidak Dapat Dianalisis",
"description": "Data ekspresi tidak tersedia"
}
# Use overall dominant expression as approximation
# expression_distribution already contains percentages from service
dominant_key = max(expressions.items(), key=lambda x: x[1])[0]
dominant_display = dominant_key.capitalize()
percentage = round(expressions[dominant_key], 1) # Already percentage
# Calculate average confidence from frame data if available
frame_data = face_data.get("statistics_df", [])
avg_confidence = 0.0
if frame_data:
valid_frames = [f for f in frame_data if f.get("expression") not in ["no_face", "background"]]
if valid_frames:
total_conf = sum(f.get("confidence", 0) for f in valid_frames)
avg_confidence = round(total_conf / len(valid_frames), 2)
rating, description = self._rate_first_impression(dominant_display, percentage)
return {
"expression": dominant_display,
"confidence": avg_confidence,
"percentage": percentage,
"duration_analyzed": duration,
"frames_analyzed": frames,
"rating": rating,
"description": description
}
def _rate_first_impression(self, expression: str, percentage: float) -> tuple[str, str]:
"""
Generate rating and description for first impression
Args:
expression: Dominant expression
percentage: Percentage of dominant expression
Returns:
(rating, description) tuple
"""
# Positive expressions
if expression.lower() in ["happy", "surprise"]:
if percentage >= 70:
    return ("Sangat Baik", "Sangat positif dan energik - kesan pertama yang sangat kuat")
elif percentage >= 50:
    return ("Baik", "Positif dan menyambut - kesan pertama yang baik")
else:
    return ("Cukup Baik", "Cukup positif - kesan pertama yang dapat ditingkatkan")
# Neutral expression
elif expression.lower() == "neutral":
if percentage >= 60:
return ("Baik", "Tenang dan profesional - kesan pertama yang stabil")
else:
return ("Cukup Baik", "Netral - kesan pertama yang cukup baik")
# Negative expressions
else:
if percentage >= 50:
return ("Buruk", f"Tampak {expression.lower()} - kesan pertama yang perlu diperbaiki")
else:
return ("Cukup Baik", f"Sedikit terlihat {expression.lower()} - perlu lebih percaya diri")
def _format_gesture(self, gesture_data: Dict[str, Any]) -> Dict[str, Any]:
"""Format gesture analysis to clean structure"""
if not gesture_data or "gesture_analysis" not in gesture_data:
return self._empty_gesture()
gesture_analysis = gesture_data["gesture_analysis"]
detailed = gesture_analysis.get("detailed_metrics", {})
return {
"score": round(gesture_analysis.get("movement_score", 0), 1),
"movement_category": gesture_analysis.get("movement_category", "unknown"),
"summary": {
"hand_activity_percentage": round(gesture_analysis.get("hand_activity_percentage", 0), 1),
"gesture_frequency": round(gesture_analysis.get("gesture_frequency", 0), 1),
"body_stability_score": round(gesture_analysis.get("body_stability_score", 0), 1)
},
"details": {
"avg_hand_movement_speed": round(detailed.get("avg_hand_movement_speed", 0), 1),
"hand_position_distribution": {
"front": round(detailed.get("hand_position_distribution", {}).get("front", 0), 1),
"side": round(detailed.get("hand_position_distribution", {}).get("side", 0), 1),
"back": round(detailed.get("hand_position_distribution", {}).get("back", 0), 1)
},
"nervous_gestures_detected": gesture_analysis.get("nervous_gestures_detected", False)
},
"recommendations": gesture_analysis.get("recommendations", [])
}
def _build_overall_summary(self, analysis_results: Dict[str, Any]) -> Dict[str, Any]:
"""
Build overall summary with strengths and areas for improvement
Args:
analysis_results: Dict containing eye_contact, facial_expression, gesture
Returns:
Overall summary dict
"""
eye = analysis_results.get("eye_contact", {})
face = analysis_results.get("facial_expression", {})
gesture = analysis_results.get("gesture", {})
# Calculate scores
eye_score = eye.get("score", 0) # out of 5
face_score = face.get("score", 0) # out of 10
gesture_score = gesture.get("score", 0) # out of 10
# Normalize to same scale (out of 10)
eye_score_normalized = eye_score * 2 # 5 -> 10
# Total and average
total_score = round(eye_score_normalized + face_score + gesture_score, 1) # out of 30
average_score = round(total_score / 3, 1) # out of 10
# Overall rating
rating = self._get_overall_rating(average_score)
# Build strengths
strengths = self._identify_strengths(eye, face, gesture)
# Build areas for improvement
improvements = self._identify_improvements(eye, face, gesture)
return {
"total_score": total_score,
"average_score": average_score,
"rating": rating,
"strengths": strengths,
"areas_for_improvement": improvements
}
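# Worked example (illustrative numbers): eye 4/5 normalizes to 8.0; with
# face 6.5 and gesture 7.0, total_score = 21.5/30 and
# average_score = round(21.5 / 3, 1) = 7.2, which maps to "Baik" below.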
def _get_overall_rating(self, average_score: float) -> str:
"""Get overall rating from average score"""
if average_score >= 8.5:
return "Sangat Baik"
elif average_score >= 7.0:
return "Baik"
elif average_score >= 5.5:
return "Cukup Baik"
elif average_score >= 4.0:
return "Buruk"
else:
return "Perlu Ditingkatkan"
def _identify_strengths(self, eye: Dict, face: Dict, gesture: Dict) -> list[str]:
"""Identify strengths based on high scores"""
strengths = []
# Eye contact
eye_score = eye.get("score", 0)
if eye_score >= 4:
center_pct = eye.get("summary", {}).get("center_percentage", 0)
strengths.append(f"Excellent eye contact (Score {eye_score}/5, {center_pct}% center gaze)")
# First impression
first_imp = face.get("first_impression", {})
if first_imp.get("rating") in ["Sangat Baik", "Baik"]:
expr = first_imp.get("expression", "")
pct = first_imp.get("percentage", 0)
strengths.append(f"Strong first impression ({pct}% {expr.lower()} in first 3 seconds)")
# Facial expression overall
happy_pct = face.get("overall_summary", {}).get("happy_percentage", 0)
if happy_pct >= 50:
strengths.append(f"Positive facial expressions ({happy_pct}% happy overall)")
# Gesture
gesture_score = gesture.get("score", 0)
movement_cat = gesture.get("movement_category", "")
if gesture_score >= 7 and movement_cat == "balanced":
strengths.append("Balanced and natural body language")
# Body stability
stability = gesture.get("summary", {}).get("body_stability_score", 0)
if stability >= 8:
strengths.append(f"Excellent body stability (Score {stability}/10)")
return strengths if strengths else ["Analysis completed successfully"]
def _identify_improvements(self, eye: Dict, face: Dict, gesture: Dict) -> list[str]:
"""Identify areas for improvement based on low scores"""
improvements = []
# Eye contact
eye_score = eye.get("score", 0)
gaze_away_pct = eye.get("summary", {}).get("gaze_away_percentage", 0)
if eye_score < 3 or gaze_away_pct > 20:
improvements.append(f"Improve eye contact - reduce gaze away time (currently {gaze_away_pct}%)")
# First impression
first_imp = face.get("first_impression", {})
if first_imp.get("rating") in ["Buruk", "Cukup Baik"]:
improvements.append(f"Work on first impression - start with more positive energy")
# Facial expression
happy_pct = face.get("overall_summary", {}).get("happy_percentage", 0)
negative_pct = face.get("overall_summary", {}).get("negative_percentage", 0)
if happy_pct < 40:
improvements.append("Increase positive facial expressions - smile more naturally")
if negative_pct > 15:
improvements.append(f"Reduce negative expressions (currently {negative_pct}%)")
# Gesture - hand position
hand_pos = gesture.get("details", {}).get("hand_position_distribution", {})
back_pct = hand_pos.get("back", 0)
if back_pct > 10:
improvements.append(f"Keep hands more visible - reduce movements behind body (currently {back_pct}%)")
# Gesture - frequency
freq = gesture.get("summary", {}).get("gesture_frequency", 0)
if freq > 3.5:
improvements.append("Reduce gesture frequency - too many rapid movements")
elif freq < 0.8:
improvements.append("Increase gesture usage - add more expressiveness")
# Nervous gestures
if gesture.get("details", {}).get("nervous_gestures_detected", False):
improvements.append("Work on reducing nervous gestures")
return improvements if improvements else ["Keep up the great work!"]
def _empty_eye_contact(self) -> Dict[str, Any]:
"""Return empty eye contact structure"""
return {
"score": 0,
"rating": "No Data",
"summary": {
"gaze_away_time": 0,
"gaze_away_percentage": 0,
"center_percentage": 0,
"blinks_per_minute": 0
},
"details": {
"total_blinks": 0,
"gaze_distribution": {"center": 0, "left": 0, "right": 0, "unknown": 0}
}
}
def _empty_facial_expression(self) -> Dict[str, Any]:
"""Return empty facial expression structure"""
return {
"score": 0,
"dominant_expression": "Unknown",
"first_impression": {
"expression": "Unknown",
"confidence": 0,
"percentage": 0,
"duration_analyzed": 3.0,
"frames_analyzed": 0,
"rating": "No Data",
"description": "No facial expression data available"
},
"overall_summary": {
"happy_percentage": 0,
"neutral_percentage": 0,
"negative_percentage": 0
},
"details": {
"expressions": {},
"confidence_avg": 0
}
}
def _empty_gesture(self) -> Dict[str, Any]:
"""Return empty gesture structure"""
return {
"score": 0,
"movement_category": "unknown",
"summary": {
"hand_activity_percentage": 0,
"gesture_frequency": 0,
"body_stability_score": 0
},
"details": {
"avg_hand_movement_speed": 0,
"hand_position_distribution": {"front": 0, "side": 0, "back": 0},
"nervous_gestures_detected": False
},
"recommendations": []
}
# Singleton instance
_processor_instance = None
def get_video_processor() -> VideoProcessor:
"""Get global video processor instance"""
global _processor_instance
if _processor_instance is None:
_processor_instance = VideoProcessor()
return _processor_instance
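# A minimal smoke-test sketch, assuming a local sample video (the "sample.mp4"
# path is hypothetical) and that all model weights are installed:
if __name__ == "__main__":
    import json

    def on_progress(step: str, percentage: float, message: str) -> None:
        logger.info(f"[{step}] {percentage:.0f}% - {message}")

    processor = get_video_processor()
    analysis = processor.process_video("sample.mp4", level=1, progress_callback=on_progress)
    print(json.dumps(analysis, indent=2, default=str))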