"""
Eye Tracking Service
Refactored from eye_tracking_production.py for production use.
Production-ready eye tracking for the SWARA website
"""
import cv2 as cv
import math
import numpy as np
import mediapipe as mp
from typing import Any, Callable, Dict, List, Optional, Tuple
from loguru import logger
class EyeTrackingConfig:
"""Configuration class untuk eye tracking parameters"""
# MediaPipe landmarks indices
LEFT_EYE = [362, 382, 381, 380, 374, 373, 390, 249, 263, 466, 388, 387, 386, 385, 384, 398]
RIGHT_EYE = [33, 7, 163, 144, 145, 153, 154, 155, 133, 173, 157, 158, 159, 160, 161, 246]
# Eye size classification thresholds
SMALL_EYE_THRESHOLD = 600
MEDIUM_EYE_THRESHOLD = 1500
    # Position boundaries: ratio = pupil_x / eye_crop_width.
    # ratio < LEFT_BOUNDARY -> LEFT, ratio > RIGHT_BOUNDARY -> RIGHT,
    # otherwise CENTER. Tightened from 0.30/0.70; borderline frames are
    # handled by the smoothing bands and the minimum gaze-away duration
    # below rather than by the classification boundaries alone.
    LEFT_BOUNDARY = 0.35
    RIGHT_BOUNDARY = 0.65
    # Temporal smoothing (hysteresis) bands just inside the CENTER zone:
    # a frame landing here immediately after a LEFT/RIGHT reading keeps the
    # previous position (see EyeTracker.determine_gaze_position)
    SMOOTHING_LEFT_MIN = 0.35
    SMOOTHING_LEFT_MAX = 0.40
    SMOOTHING_RIGHT_MIN = 0.60
    SMOOTHING_RIGHT_MAX = 0.65
# Minimum duration for gaze away (in seconds)
# Only count as "gaze away" if looking away for >0.5 seconds continuously
MIN_GAZE_AWAY_DURATION = 0.5
# Blink ratio threshold
BLINK_THRESHOLD = 5.5
    # Score thresholds (in seconds of accumulated gaze-away time).
    # Ratings are user-facing Indonesian labels: "Sangat Baik" = Very Good,
    # "Baik" = Good, "Cukup Baik" = Fairly Good, "Buruk" = Poor,
    # "Perlu Ditingkatkan" = Needs Improvement.
SCORE_THRESHOLDS = {
5: (5, "Sangat Baik"),
4: (8, "Baik"),
3: (10, "Cukup Baik"),
2: (12, "Buruk"),
1: (float('inf'), "Perlu Ditingkatkan")
}
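    # Lookup semantics (see EyeTrackingService.calculate_score, which checks
    # scores from 5 down and returns the first bucket whose threshold holds):
    # 3.0 s away -> 5 "Sangat Baik"; 7.0 s -> 4 "Baik"; 11.5 s -> 2 "Buruk".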
# Adaptive parameters by eye size
ADAPTIVE_PARAMS = {
'SMALL': {
'scale_factor': 3.0,
'interpolation': cv.INTER_LANCZOS4,
'clahe_clip': 4.0,
'clahe_grid': (4, 4),
'bilateral_d': 7,
'bilateral_sigma': 75,
'thresholds': [20, 25, 30, 35, 40, 45, 50, 55],
'min_area_ratio': 0.001,
'max_area_ratio': 0.50,
'min_circularity': 0.3,
'min_solidity': 0.5,
'morph_kernel': 5,
'morph_close_iter': 3,
'morph_open_iter': 2
},
'MEDIUM': {
'scale_factor': 2.0,
'interpolation': cv.INTER_CUBIC,
'clahe_clip': 3.0,
'clahe_grid': (8, 8),
'bilateral_d': 5,
'bilateral_sigma': 50,
'thresholds': [30, 35, 40, 45, 50, 55, 60],
'min_area_ratio': 0.005,
'max_area_ratio': 0.45,
'min_circularity': 0.4,
'min_solidity': 0.6,
'morph_kernel': 3,
'morph_close_iter': 2,
'morph_open_iter': 1
},
'LARGE': {
'scale_factor': 1.5,
'interpolation': cv.INTER_CUBIC,
'clahe_clip': 2.0,
'clahe_grid': (8, 8),
'bilateral_d': 3,
'bilateral_sigma': 30,
'thresholds': [35, 40, 45, 50, 55, 60, 65],
'min_area_ratio': 0.01,
'max_area_ratio': 0.40,
'min_circularity': 0.5,
'min_solidity': 0.7,
'morph_kernel': 3,
'morph_close_iter': 2,
'morph_open_iter': 1
}
}
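    # Illustrative sizing: a 40x30 px eye crop has area 1200, which is
    # >= SMALL_EYE_THRESHOLD (600) and < MEDIUM_EYE_THRESHOLD (1500), so it
    # is classified MEDIUM: 2x cubic upscaling, CLAHE clip 3.0 on an 8x8
    # grid, and a 5-px bilateral filter.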
class EyeTracker:
"""Main class untuk eye tracking"""
    def __init__(self, config: Optional[EyeTrackingConfig] = None):
        self.config = config or EyeTrackingConfig()
        # ------------------------------------------------------------
        # HARDWARE DETECTION (LOGGING ONLY)
        # ------------------------------------------------------------
        # The MediaPipe Python Solutions API used here does not accept a
        # GPU delegate, so Face Mesh runs on CPU regardless; torch is
        # imported lazily just to detect and log the available hardware.
        try:
            import torch
            if torch.cuda.is_available():
                logger.info(f"✓ GPU detected: {torch.cuda.get_device_name(0)} "
                            "(MediaPipe Face Mesh still runs on CPU)")
            else:
                logger.info("ℹ No GPU detected, using CPU for Eye Tracking")
        except ImportError:
            logger.info("ℹ torch not installed; skipping GPU detection")
        self.face_mesh = mp.solutions.face_mesh.FaceMesh(
            min_detection_confidence=0.5,
            min_tracking_confidence=0.5
        )
self.prev_position_right = None
self.prev_position_left = None
def __del__(self):
"""Cleanup resources"""
if hasattr(self, 'face_mesh') and self.face_mesh:
self.face_mesh.close()
@staticmethod
def euclidean_distance(point1: Tuple[int, int], point2: Tuple[int, int]) -> float:
"""Calculate Euclidean distance between two points"""
return math.sqrt((point2[0] - point1[0])**2 + (point2[1] - point1[1])**2)
def detect_landmarks(self, frame: np.ndarray) -> Optional[List[Tuple[int, int]]]:
"""Detect facial landmarks"""
try:
rgb_frame = cv.cvtColor(frame, cv.COLOR_BGR2RGB)
results = self.face_mesh.process(rgb_frame)
if not results.multi_face_landmarks:
return None
img_height, img_width = frame.shape[:2]
mesh_coords = [
(int(point.x * img_width), int(point.y * img_height))
for point in results.multi_face_landmarks[0].landmark
]
return mesh_coords
except Exception as e:
logger.error(f"Error detecting landmarks: {e}")
return None
def calculate_blink_ratio(self, landmarks: List[Tuple[int, int]]) -> float:
"""Calculate blink ratio from eye landmarks"""
try:
# Right eye
rh_distance = self.euclidean_distance(
landmarks[self.config.RIGHT_EYE[0]],
landmarks[self.config.RIGHT_EYE[8]]
)
rv_distance = self.euclidean_distance(
landmarks[self.config.RIGHT_EYE[12]],
landmarks[self.config.RIGHT_EYE[4]]
)
# Left eye
lh_distance = self.euclidean_distance(
landmarks[self.config.LEFT_EYE[0]],
landmarks[self.config.LEFT_EYE[8]]
)
lv_distance = self.euclidean_distance(
landmarks[self.config.LEFT_EYE[12]],
landmarks[self.config.LEFT_EYE[4]]
)
if rv_distance == 0 or lv_distance == 0:
return 0
re_ratio = rh_distance / rv_distance
le_ratio = lh_distance / lv_distance
ratio = (re_ratio + le_ratio) / 2
return ratio
except Exception as e:
logger.error(f"Error calculating blink ratio: {e}")
return 0
def extract_eye_region(self, frame: np.ndarray, eye_coords: List[Tuple[int, int]]) -> Optional[np.ndarray]:
"""Extract and crop eye region from frame"""
try:
gray = cv.cvtColor(frame, cv.COLOR_BGR2GRAY)
mask = np.zeros(gray.shape, dtype=np.uint8)
cv.fillPoly(mask, [np.array(eye_coords, dtype=np.int32)], 255)
eye = cv.bitwise_and(gray, gray, mask=mask)
eye[mask == 0] = 155
# Get bounding box
x_coords = [coord[0] for coord in eye_coords]
y_coords = [coord[1] for coord in eye_coords]
min_x, max_x = min(x_coords), max(x_coords)
min_y, max_y = min(y_coords), max(y_coords)
cropped = eye[min_y:max_y, min_x:max_x]
return cropped if cropped.size > 0 else None
except Exception as e:
logger.error(f"Error extracting eye region: {e}")
return None
def classify_eye_size(self, eye_region: np.ndarray) -> str:
"""Classify eye size (SMALL/MEDIUM/LARGE)"""
if eye_region is None or eye_region.size == 0:
return 'UNKNOWN'
h, w = eye_region.shape
area = h * w
if area < self.config.SMALL_EYE_THRESHOLD:
return 'SMALL'
elif area < self.config.MEDIUM_EYE_THRESHOLD:
return 'MEDIUM'
else:
return 'LARGE'
    def adaptive_preprocessing(self, eye_region: np.ndarray, eye_size: str) -> Optional[np.ndarray]:
        """
        Adaptive preprocessing: upscaling + enhancement based on eye size.
        Upscaling, CLAHE, and bilateral-filter settings are read from
        ADAPTIVE_PARAMS so the config and the code cannot drift apart.
        """
        if eye_region is None or eye_region.size == 0:
            return None
        try:
            params = self.config.ADAPTIVE_PARAMS[eye_size]
            # Adaptive upscaling (Lanczos for SMALL eyes, cubic otherwise)
            upscaled = cv.resize(
                eye_region, None,
                fx=params['scale_factor'], fy=params['scale_factor'],
                interpolation=params['interpolation']
            )
            # Contrast enhancement + edge-preserving denoising
            clahe = cv.createCLAHE(
                clipLimit=params['clahe_clip'],
                tileGridSize=params['clahe_grid']
            )
            enhanced = clahe.apply(upscaled)
            enhanced = cv.bilateralFilter(
                enhanced,
                params['bilateral_d'],
                params['bilateral_sigma'],
                params['bilateral_sigma']
            )
            if eye_size == 'SMALL':
                # Unsharp mask to recover detail after heavy smoothing
                gaussian = cv.GaussianBlur(enhanced, (3, 3), 2.0)
                enhanced = cv.addWeighted(enhanced, 1.5, gaussian, -0.5, 0)
                enhanced = np.clip(enhanced, 0, 255).astype(np.uint8)
            return enhanced
        except Exception as e:
            logger.error(f"Error in adaptive preprocessing: {e}")
            return None
def aggressive_morphology(self, mask: np.ndarray, eye_size: str) -> np.ndarray:
"""
        STAGE 1: Aggressive morphology for a solid contour.
        Addresses contours that come out fragmented from thresholding.
"""
params = self.config.ADAPTIVE_PARAMS[eye_size]
kernel = cv.getStructuringElement(
cv.MORPH_ELLIPSE,
(params['morph_kernel'], params['morph_kernel'])
)
        # Close gaps first: merge fragments that belong to one pupil blob
mask = cv.morphologyEx(
mask, cv.MORPH_CLOSE, kernel,
iterations=params['morph_close_iter']
)
# Remove noise
mask = cv.morphologyEx(
mask, cv.MORPH_OPEN, kernel,
iterations=params['morph_open_iter']
)
        # Extra dilation fills remaining pinholes in SMALL eyes
if eye_size == 'SMALL':
kernel_dilate = cv.getStructuringElement(cv.MORPH_ELLIPSE, (3, 3))
mask = cv.dilate(mask, kernel_dilate, iterations=1)
return mask
def connected_components_analysis(self, mask: np.ndarray, params: Dict) -> Optional[Dict]:
"""
        STAGE 2: Connected-components analysis for more accurate blob filtering.
        Suppresses false positives caused by noise.
"""
h, w = mask.shape
min_area = (h * w) * params['min_area_ratio']
max_area = (h * w) * params['max_area_ratio']
# Connected components with stats
num_labels, labels, stats, centroids = cv.connectedComponentsWithStats(
mask, connectivity=8
)
candidates = []
for i in range(1, num_labels): # Skip background (label 0)
area = stats[i, cv.CC_STAT_AREA]
# Filter by area
if area < min_area or area > max_area:
continue
# Create component mask
component_mask = np.zeros_like(mask)
component_mask[labels == i] = 255
# Calculate properties
contours, _ = cv.findContours(
component_mask, cv.RETR_EXTERNAL, cv.CHAIN_APPROX_SIMPLE
)
if not contours:
continue
contour = contours[0]
# Circularity
perimeter = cv.arcLength(contour, True)
if perimeter == 0:
continue
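            # circularity = 4*pi*area / perimeter^2: exactly 1.0 for a
            # perfect circle, approaching 0 for elongated or ragged shapes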
circularity = 4 * np.pi * area / (perimeter ** 2)
if circularity < params['min_circularity']:
continue
# Solidity (area / convex hull area) - filter irregular shapes
hull = cv.convexHull(contour)
hull_area = cv.contourArea(hull)
if hull_area == 0:
continue
solidity = area / hull_area
if solidity < params['min_solidity']:
continue
# Distance from center (prefer pupil near center)
center_x = w / 2
cx = centroids[i][0]
distance_from_center = abs(cx - center_x) / w
center_score = 1.0 - distance_from_center
            # Aspect ratio (prefer near-circular bounding boxes)
            _, _, w_bbox, h_bbox = (stats[i, cv.CC_STAT_LEFT],
                                    stats[i, cv.CC_STAT_TOP],
                                    stats[i, cv.CC_STAT_WIDTH],
                                    stats[i, cv.CC_STAT_HEIGHT])
            if h_bbox == 0:
                continue
            aspect_ratio = w_bbox / h_bbox
            # aspect_score peaks at 1.0 for a square box and goes negative
            # for very elongated blobs, pushing the multiplicative score
            # below zero and effectively rejecting them
            aspect_score = 1.0 - abs(aspect_ratio - 1.0)
            # Combined score (Colab method: multiplicative) - favors
            # candidates where ALL metrics are good at once
score = area * circularity * solidity * center_score * aspect_score
candidates.append({
'mask': component_mask,
'contour': contour,
'centroid': centroids[i],
'area': area,
'circularity': circularity,
'solidity': solidity,
'center_score': center_score,
'aspect_ratio': aspect_ratio,
'score': score
})
if not candidates:
return None
return max(candidates, key=lambda x: x['score'])
def distance_transform_refinement(self, mask: np.ndarray) -> Tuple[int, int]:
"""
        STAGE 3: Distance transform to refine the centroid.
        Gives a more accurate position than contour moments.
"""
dist_transform = cv.distanceTransform(mask, cv.DIST_L2, 5)
_, _, _, max_loc = cv.minMaxLoc(dist_transform)
return max_loc
def detect_pupil(self, enhanced: np.ndarray, eye_size: str) -> Optional[Dict]:
"""
        Detect pupil using the multi-stage OPTIMIZED pipeline.
        Optimizations ported from the Colab notebook:
        1. Aggressive Morphology - solid contour, no fragments
        2. Connected Components Analysis - better blob detection
        3. Distance Transform - accurate centroid
        4. Solidity Filter - reject irregular shapes
"""
params = self.config.ADAPTIVE_PARAMS[eye_size]
best_candidate = None
best_score = 0
best_threshold = 0
for thresh_val in params['thresholds']:
_, binary = cv.threshold(enhanced, thresh_val, 255, cv.THRESH_BINARY_INV)
# STAGE 1: Aggressive Morphology
binary = self.aggressive_morphology(binary, eye_size)
# STAGE 2: Connected Components Analysis
candidate = self.connected_components_analysis(binary, params)
if candidate and candidate['score'] > best_score:
best_candidate = candidate
best_score = candidate['score']
best_threshold = thresh_val
if not best_candidate:
return None
# STAGE 3: Distance transform refinement
dt_center = self.distance_transform_refinement(best_candidate['mask'])
best_candidate['dt_center'] = dt_center
best_candidate['threshold'] = best_threshold
return best_candidate
def determine_gaze_position(self, centroid_x: int, width: int, prev_position: Optional[str]) -> str:
"""Determine gaze position (LEFT/CENTER/RIGHT)"""
ratio = centroid_x / width
# Base position
if ratio < self.config.LEFT_BOUNDARY:
position = "LEFT"
elif ratio > self.config.RIGHT_BOUNDARY:
position = "RIGHT"
else:
position = "CENTER"
        # Temporal smoothing (hysteresis): a frame that has only just crossed
        # into the outer margin of CENTER keeps the previous LEFT/RIGHT
        # reading, so single-frame jitter at a boundary does not flip the
        # result. (The bands sit just inside CENTER; a raw LEFT/RIGHT reading
        # can never land in them, so no smoothing applies in that direction.)
        if prev_position and prev_position != "UNKNOWN":
            if position == "CENTER" and prev_position != "CENTER":
                in_left_band = self.config.SMOOTHING_LEFT_MIN <= ratio < self.config.SMOOTHING_LEFT_MAX
                in_right_band = self.config.SMOOTHING_RIGHT_MIN < ratio <= self.config.SMOOTHING_RIGHT_MAX
                if in_left_band or in_right_band:
                    position = prev_position
        return position
def estimate_eye_position(self, eye_region: np.ndarray, prev_position: Optional[str] = None) -> Tuple[str, Dict]:
"""
        Estimate eye gaze position using the OPTIMIZED method.
        Centroid priority: Distance Transform > Ellipse > Connected Components.
"""
if eye_region is None or eye_region.size == 0:
return "UNKNOWN", {}
h, w = eye_region.shape
if h < 5 or w < 10:
return "UNKNOWN", {}
try:
eye_size = self.classify_eye_size(eye_region)
enhanced = self.adaptive_preprocessing(eye_region, eye_size)
if enhanced is None:
return "UNKNOWN", {}
pupil_data = self.detect_pupil(enhanced, eye_size)
if not pupil_data:
return "UNKNOWN", {}
# OPTIMIZED: Use Distance Transform center (most accurate)
scale_factor = self.config.ADAPTIVE_PARAMS[eye_size]['scale_factor']
cx_dt, cy_dt = pupil_data['dt_center']
# Scale back to original size
centroid_x = int(cx_dt / scale_factor)
# Determine position
position = self.determine_gaze_position(centroid_x, w, prev_position)
return position, {
'eye_size': eye_size,
'centroid': (centroid_x, int(cy_dt / scale_factor)),
'circularity': pupil_data['circularity'],
'solidity': pupil_data['solidity'],
'dt_center': pupil_data['dt_center'],
'threshold': pupil_data['threshold']
}
except Exception as e:
logger.error(f"Error estimating eye position: {e}")
return "UNKNOWN", {}
def process_frame(self, frame: np.ndarray) -> Dict:
"""Process single frame and return analysis"""
result = {
'face_detected': False,
'blink_detected': False,
'blink_ratio': 0.0,
'right_eye': {'position': 'UNKNOWN', 'data': {}},
'left_eye': {'position': 'UNKNOWN', 'data': {}},
'gaze_position': 'UNKNOWN'
}
try:
landmarks = self.detect_landmarks(frame)
if landmarks is None:
return result
result['face_detected'] = True
# Blink detection
blink_ratio = self.calculate_blink_ratio(landmarks)
result['blink_ratio'] = round(blink_ratio, 2)
result['blink_detected'] = bool(blink_ratio > self.config.BLINK_THRESHOLD)
if not result['blink_detected']:
# Right eye
right_eye_coords = [landmarks[i] for i in self.config.RIGHT_EYE]
right_eye_region = self.extract_eye_region(frame, right_eye_coords)
if right_eye_region is not None:
right_position, right_data = self.estimate_eye_position(
right_eye_region, self.prev_position_right
)
result['right_eye'] = {'position': right_position, 'data': right_data}
self.prev_position_right = right_position
# Left eye
left_eye_coords = [landmarks[i] for i in self.config.LEFT_EYE]
left_eye_region = self.extract_eye_region(frame, left_eye_coords)
if left_eye_region is not None:
left_position, left_data = self.estimate_eye_position(
left_eye_region, self.prev_position_left
)
result['left_eye'] = {'position': left_position, 'data': left_data}
self.prev_position_left = left_position
# Determine overall gaze
if result['right_eye']['position'] == result['left_eye']['position']:
result['gaze_position'] = result['right_eye']['position']
elif result['right_eye']['position'] == 'UNKNOWN':
result['gaze_position'] = result['left_eye']['position']
elif result['left_eye']['position'] == 'UNKNOWN':
result['gaze_position'] = result['right_eye']['position']
                else:
                    # Both eyes report a known but different direction;
                    # fall back to the right eye as the tie-break
                    result['gaze_position'] = result['right_eye']['position']
except Exception as e:
logger.error(f"Error processing frame: {e}")
return result
class EyeTrackingService:
"""
Eye Tracking Service for SWARA API
Analyzes eye contact and gaze patterns in videos
"""
# Class variable for singleton pattern
_tracker = None
def __init__(self):
"""Initialize service"""
if EyeTrackingService._tracker is None:
logger.info("Initializing Eye Tracking Service...")
EyeTrackingService._tracker = EyeTracker()
logger.info("✓ Eye Tracking Service initialized")
def calculate_score(self, gaze_away_time: float) -> Tuple[int, str]:
"""Calculate score based on gaze away time"""
config = EyeTrackingConfig()
for score, (threshold, rating) in sorted(
config.SCORE_THRESHOLDS.items(), reverse=True
):
if gaze_away_time <= threshold:
return score, rating
return 1, "Perlu Ditingkatkan"
def _annotate_frame(
self,
frame: np.ndarray,
result: Dict,
frame_number: int,
total_blinks: int,
gaze_position: str
) -> np.ndarray:
"""
Annotate frame with eye tracking information
Args:
frame: Original frame
result: Analysis result from process_frame
frame_number: Current frame number
total_blinks: Total blinks detected so far
gaze_position: Current gaze position
Returns:
Annotated frame
"""
annotated = frame.copy()
# Define colors
COLOR_GREEN = (0, 255, 0)
COLOR_RED = (0, 0, 255)
COLOR_YELLOW = (0, 255, 255)
COLOR_BLUE = (255, 0, 0)
COLOR_WHITE = (255, 255, 255)
# Semi-transparent overlay for info box
overlay = annotated.copy()
cv.rectangle(overlay, (10, 10), (400, 180), (0, 0, 0), -1)
cv.addWeighted(overlay, 0.6, annotated, 0.4, 0, annotated)
# Frame info
cv.putText(annotated, f"Frame: {frame_number}", (20, 35),
cv.FONT_HERSHEY_SIMPLEX, 0.6, COLOR_WHITE, 2)
# Face detection status
face_status = "DETECTED" if result['face_detected'] else "NOT DETECTED"
face_color = COLOR_GREEN if result['face_detected'] else COLOR_RED
cv.putText(annotated, f"Face: {face_status}", (20, 60),
cv.FONT_HERSHEY_SIMPLEX, 0.6, face_color, 2)
# Blink info
blink_status = "BLINKING" if result['blink_detected'] else "OPEN"
blink_color = COLOR_YELLOW if result['blink_detected'] else COLOR_GREEN
cv.putText(annotated, f"Eyes: {blink_status} | Ratio: {result['blink_ratio']:.2f}",
(20, 85), cv.FONT_HERSHEY_SIMPLEX, 0.6, blink_color, 2)
cv.putText(annotated, f"Total Blinks: {total_blinks}", (20, 110),
cv.FONT_HERSHEY_SIMPLEX, 0.6, COLOR_WHITE, 2)
# Gaze position
if gaze_position == 'CENTER':
gaze_color = COLOR_GREEN
elif gaze_position in ['LEFT', 'RIGHT']:
gaze_color = COLOR_YELLOW
else:
gaze_color = COLOR_RED
cv.putText(annotated, f"Gaze: {gaze_position}", (20, 135),
cv.FONT_HERSHEY_SIMPLEX, 0.7, gaze_color, 2)
# Eye positions
if result['face_detected'] and not result['blink_detected']:
left_pos = result['left_eye']['position']
right_pos = result['right_eye']['position']
cv.putText(annotated, f"L:{left_pos} | R:{right_pos}", (20, 160),
cv.FONT_HERSHEY_SIMPLEX, 0.5, COLOR_BLUE, 2)
# Gaze indicator (big display)
h, w = annotated.shape[:2]
indicator_y = h - 60
# Draw gaze direction indicator
if gaze_position == 'CENTER':
cv.circle(annotated, (w // 2, indicator_y), 30, COLOR_GREEN, -1)
cv.putText(annotated, "CENTER", (w // 2 - 50, indicator_y + 10),
cv.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 0), 2)
elif gaze_position == 'LEFT':
cv.arrowedLine(annotated, (w // 2, indicator_y), (w // 2 - 80, indicator_y),
COLOR_YELLOW, 5, tipLength=0.3)
cv.putText(annotated, "LEFT", (w // 2 - 150, indicator_y + 10),
cv.FONT_HERSHEY_SIMPLEX, 0.8, COLOR_YELLOW, 2)
elif gaze_position == 'RIGHT':
cv.arrowedLine(annotated, (w // 2, indicator_y), (w // 2 + 80, indicator_y),
COLOR_YELLOW, 5, tipLength=0.3)
cv.putText(annotated, "RIGHT", (w // 2 + 50, indicator_y + 10),
cv.FONT_HERSHEY_SIMPLEX, 0.8, COLOR_YELLOW, 2)
else:
cv.putText(annotated, "UNKNOWN", (w // 2 - 60, indicator_y + 10),
cv.FONT_HERSHEY_SIMPLEX, 0.8, COLOR_RED, 2)
return annotated
def analyze_video(
self,
video_path: str,
        progress_callback: Optional[Callable] = None
) -> Dict[str, Any]:
"""
Analyze video for eye contact
Args:
video_path: Path to video file
progress_callback: Optional callback for progress updates
Returns:
Dict containing eye tracking analysis results
"""
try:
logger.info(f"Analyzing video with Eye Tracking Service: {video_path}")
cap = cv.VideoCapture(video_path)
if not cap.isOpened():
raise ValueError(f"Cannot open video: {video_path}")
# Video properties
fps = int(cap.get(cv.CAP_PROP_FPS)) or 30
width = int(cap.get(cv.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv.CAP_PROP_FRAME_HEIGHT))
total_frames = int(cap.get(cv.CAP_PROP_FRAME_COUNT))
logger.info(f"Video properties: {width}x{height} @ {fps}FPS, {total_frames} frames")
# Initialize counters
frame_count = 0
blink_count = 0
position_counts = {'CENTER': 0, 'LEFT': 0, 'RIGHT': 0, 'UNKNOWN': 0}
prev_blink = False
# ============================================================
# IMPROVED GAZE AWAY TRACKING
# ============================================================
# Track continuous gaze away periods (minimum duration filter)
config = EyeTrackingConfig()
min_frames_away = int(fps * config.MIN_GAZE_AWAY_DURATION) # 0.5 seconds
current_gaze_away_start = None # Track start of gaze away period
total_gaze_away_time = 0.0 # Accumulated gaze away time in seconds
gaze_away_periods = [] # List of (start_frame, end_frame, duration) tuples
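            # Example at 30 FPS: min_frames_away = 15, so a 10-frame
            # (~0.33 s) glance is discarded as noise while a 20-frame
            # (~0.67 s) look-away adds 0.67 s to total_gaze_away_time.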
# Debug counters
debug_stats = {
'face_detected_frames': 0,
'pupil_detected_frames': 0,
'center_gaze_frames': 0,
'left_gaze_frames': 0,
'right_gaze_frames': 0,
'unknown_frames': 0,
'raw_gaze_away_frames': 0, # All LEFT/RIGHT frames
'filtered_gaze_away_frames': 0 # Only continuous periods >0.5s
}
logger.info("Starting frame processing...")
logger.info(f"Minimum gaze away duration: {config.MIN_GAZE_AWAY_DURATION}s ({min_frames_away} frames)")
# Process frames
while True:
ret, frame = cap.read()
if not ret:
break
frame_count += 1
# Progress callback
                # Progress callback (guard against streams reporting no frame count)
                if progress_callback and frame_count % 30 == 0 and total_frames > 0:
                    progress = int((frame_count / total_frames) * 100)
                    progress_callback(frame_count, total_frames, f"Eye tracking: {progress}%")
# Process frame
result = self._tracker.process_frame(frame)
# Debug stats
if result['face_detected']:
debug_stats['face_detected_frames'] += 1
# Count blinks
if result['blink_detected'] and not prev_blink:
blink_count += 1
logger.debug(f"Frame {frame_count}: Blink detected (total: {blink_count})")
prev_blink = result['blink_detected']
# Track gaze position
gaze_pos = result['gaze_position']
position_counts[gaze_pos] = position_counts.get(gaze_pos, 0) + 1
# Update debug stats
if gaze_pos == 'CENTER':
debug_stats['center_gaze_frames'] += 1
elif gaze_pos == 'LEFT':
debug_stats['left_gaze_frames'] += 1
elif gaze_pos == 'RIGHT':
debug_stats['right_gaze_frames'] += 1
else:
debug_stats['unknown_frames'] += 1
# ============================================================
# IMPROVED GAZE AWAY CALCULATION (Minimum Duration Filter)
# ============================================================
# Track continuous gaze away periods, not individual frames
is_looking_away = (gaze_pos == 'LEFT' or gaze_pos == 'RIGHT')
if is_looking_away:
debug_stats['raw_gaze_away_frames'] += 1
# Start new gaze away period
if current_gaze_away_start is None:
current_gaze_away_start = frame_count
else: # Looking at CENTER or UNKNOWN
# End of gaze away period
if current_gaze_away_start is not None:
period_frames = frame_count - current_gaze_away_start
period_duration = period_frames / fps
# Only count if duration >= minimum threshold
if period_frames >= min_frames_away:
total_gaze_away_time += period_duration
debug_stats['filtered_gaze_away_frames'] += period_frames
gaze_away_periods.append({
'start_frame': current_gaze_away_start,
'end_frame': frame_count - 1,
'duration': period_duration,
'frames': period_frames
})
logger.debug(f"Gaze away period detected: frames {current_gaze_away_start}-{frame_count-1} "
f"({period_duration:.2f}s)")
else:
# Too short - likely false positive/noise
logger.debug(f"Gaze away period too short (ignored): {period_frames} frames "
f"({period_duration:.2f}s < {config.MIN_GAZE_AWAY_DURATION}s)")
current_gaze_away_start = None
# Log every 100 frames
if frame_count % 100 == 0:
logger.info(f"Processed {frame_count}/{total_frames} frames | "
f"Gaze: C:{debug_stats['center_gaze_frames']} "
f"L:{debug_stats['left_gaze_frames']} "
f"R:{debug_stats['right_gaze_frames']} | "
f"Blinks: {blink_count}")
            cap.release()
            if frame_count == 0:
                raise ValueError(f"No frames decoded from video: {video_path}")
# ============================================================
# FINALIZE GAZE AWAY CALCULATION
# ============================================================
# Handle last period if video ends while looking away
if current_gaze_away_start is not None:
period_frames = frame_count - current_gaze_away_start
period_duration = period_frames / fps
if period_frames >= min_frames_away:
total_gaze_away_time += period_duration
debug_stats['filtered_gaze_away_frames'] += period_frames
gaze_away_periods.append({
'start_frame': current_gaze_away_start,
'end_frame': frame_count,
'duration': period_duration,
'frames': period_frames
})
logger.debug(f"Final gaze away period: frames {current_gaze_away_start}-{frame_count} "
f"({period_duration:.2f}s)")
# Calculate metrics
duration = frame_count / fps
gaze_away_time = total_gaze_away_time # Use filtered time, not raw frames
score, rating = self.calculate_score(gaze_away_time)
# Log summary statistics
logger.info("="*60)
logger.info("EYE TRACKING ANALYSIS SUMMARY")
logger.info("="*60)
logger.info(f"Total Frames Processed: {frame_count}")
logger.info(f"Face Detection Rate: {debug_stats['face_detected_frames']}/{frame_count} "
f"({debug_stats['face_detected_frames']/frame_count*100:.1f}%)")
logger.info(f"\nGaze Distribution:")
logger.info(f" CENTER: {debug_stats['center_gaze_frames']} frames "
f"({debug_stats['center_gaze_frames']/frame_count*100:.1f}%)")
logger.info(f" LEFT: {debug_stats['left_gaze_frames']} frames "
f"({debug_stats['left_gaze_frames']/frame_count*100:.1f}%)")
logger.info(f" RIGHT: {debug_stats['right_gaze_frames']} frames "
f"({debug_stats['right_gaze_frames']/frame_count*100:.1f}%)")
logger.info(f" UNKNOWN: {debug_stats['unknown_frames']} frames "
f"({debug_stats['unknown_frames']/frame_count*100:.1f}%)")
logger.info(f"\n📊 Gaze Away Analysis:")
logger.info(f" Raw LEFT/RIGHT frames: {debug_stats['raw_gaze_away_frames']} frames "
f"({debug_stats['raw_gaze_away_frames']/frame_count*100:.1f}%)")
logger.info(f" Filtered gaze away frames: {debug_stats['filtered_gaze_away_frames']} frames "
f"({debug_stats['filtered_gaze_away_frames']/frame_count*100:.1f}%)")
logger.info(f" Continuous gaze away periods: {len(gaze_away_periods)} periods")
logger.info(f" Total gaze away time: {gaze_away_time:.2f}s / {duration:.2f}s "
f"({gaze_away_time/duration*100:.1f}%)")
logger.info(f" Minimum duration threshold: {config.MIN_GAZE_AWAY_DURATION}s")
logger.info(f"Total Blinks: {blink_count} ({blink_count/duration*60:.1f} blinks/minute)")
logger.info(f"\nFinal Score: {score}/5 - {rating}")
logger.info("="*60)
# Build result
result = {
'success': True,
'video_info': {
'duration': round(duration, 2),
'fps': fps,
'total_frames': frame_count,
'resolution': f"{width}x{height}"
},
'eye_contact_analysis': {
'total_gaze_away_time': round(gaze_away_time, 2),
'gaze_away_percentage': round((gaze_away_time / duration) * 100, 2) if duration > 0 else 0,
'score': score,
'rating': rating,
'position_distribution': {
k: {
'frames': v,
'percentage': round((v / frame_count) * 100, 2) if frame_count > 0 else 0
}
for k, v in position_counts.items()
}
},
'blink_analysis': {
'total_blinks': blink_count,
'blinks_per_minute': round((blink_count / duration) * 60, 2) if duration > 0 else 0
},
'debug_stats': debug_stats,
'gaze_away_periods': gaze_away_periods # Include detailed periods for debugging
}
logger.info(f"✓ Eye Tracking analysis completed: Score {score}/5 - {rating}")
return result
except Exception as e:
logger.error(f"✗ Eye Tracking analysis failed: {e}")
raise
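

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only, not part of the service API).
# The video path and the print-based progress callback are placeholders;
# substitute a real clip and your own callback.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    def _print_progress(current: int, total: int, message: str) -> None:
        # Matches the (frame_count, total_frames, message) signature that
        # analyze_video invokes every 30 frames
        print(f"[{current}/{total}] {message}")

    service = EyeTrackingService()
    analysis = service.analyze_video("sample_interview.mp4",
                                     progress_callback=_print_progress)
    eye_contact = analysis['eye_contact_analysis']
    print(f"Score: {eye_contact['score']}/5 - {eye_contact['rating']}")
    print(f"Gaze away: {eye_contact['total_gaze_away_time']}s "
          f"({eye_contact['gaze_away_percentage']}%)")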