|
|
""" |
|
|
Eye Tracking Service |
|
|
|
|
|
Refactored from eye_tracking_production.py for production use. |
|
|
Production-ready eye tracking untuk website SWARA |
|
|
""" |
|
|
import math
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional, Tuple

import cv2 as cv
import mediapipe as mp
import numpy as np
from loguru import logger

from app.config import settings
|
|
class EyeTrackingConfig:
    """Configuration class for eye tracking parameters."""

    # MediaPipe Face Mesh landmark indices for the eye contours
    LEFT_EYE = [362, 382, 381, 380, 374, 373, 390, 249, 263, 466, 388, 387, 386, 385, 384, 398]
    RIGHT_EYE = [33, 7, 163, 144, 145, 153, 154, 155, 133, 173, 157, 158, 159, 160, 161, 246]

    # Eye-region area thresholds (pixels) used to classify eye size
    SMALL_EYE_THRESHOLD = 600
    MEDIUM_EYE_THRESHOLD = 1500

    # Horizontal gaze boundaries (pupil x-position as a fraction of eye width)
    LEFT_BOUNDARY = 0.35
    RIGHT_BOUNDARY = 0.65

    # Hysteresis bands around the boundaries, used to suppress position flicker
    SMOOTHING_LEFT_MIN = 0.35
    SMOOTHING_LEFT_MAX = 0.40
    SMOOTHING_RIGHT_MIN = 0.60
    SMOOTHING_RIGHT_MAX = 0.65

    # Minimum duration (seconds) before a gaze-away period is counted
    MIN_GAZE_AWAY_DURATION = 0.5

    # Blink threshold on the horizontal/vertical eye-distance ratio
    BLINK_THRESHOLD = 5.5

    # Score thresholds: score -> (max total gaze-away seconds, rating).
    # Ratings are user-facing Indonesian strings ("Sangat Baik" = very good,
    # "Baik" = good, "Cukup Baik" = fairly good, "Buruk" = poor,
    # "Perlu Ditingkatkan" = needs improvement).
    SCORE_THRESHOLDS = {
        5: (5, "Sangat Baik"),
        4: (8, "Baik"),
        3: (10, "Cukup Baik"),
        2: (12, "Buruk"),
        1: (float('inf'), "Perlu Ditingkatkan")
    }

    # Preprocessing and pupil-detection parameters per eye-size class
    ADAPTIVE_PARAMS = {
        'SMALL': {
            'scale_factor': 3.0,
            'interpolation': cv.INTER_LANCZOS4,
            'clahe_clip': 4.0,
            'clahe_grid': (4, 4),
            'bilateral_d': 7,
            'bilateral_sigma': 75,
            'thresholds': [20, 25, 30, 35, 40, 45, 50, 55],
            'min_area_ratio': 0.001,
            'max_area_ratio': 0.50,
            'min_circularity': 0.3,
            'min_solidity': 0.5,
            'morph_kernel': 5,
            'morph_close_iter': 3,
            'morph_open_iter': 2
        },
        'MEDIUM': {
            'scale_factor': 2.0,
            'interpolation': cv.INTER_CUBIC,
            'clahe_clip': 3.0,
            'clahe_grid': (8, 8),
            'bilateral_d': 5,
            'bilateral_sigma': 50,
            'thresholds': [30, 35, 40, 45, 50, 55, 60],
            'min_area_ratio': 0.005,
            'max_area_ratio': 0.45,
            'min_circularity': 0.4,
            'min_solidity': 0.6,
            'morph_kernel': 3,
            'morph_close_iter': 2,
            'morph_open_iter': 1
        },
        'LARGE': {
            'scale_factor': 1.5,
            'interpolation': cv.INTER_CUBIC,
            'clahe_clip': 2.0,
            'clahe_grid': (8, 8),
            'bilateral_d': 3,
            'bilateral_sigma': 30,
            'thresholds': [35, 40, 45, 50, 55, 60, 65],
            'min_area_ratio': 0.01,
            'max_area_ratio': 0.40,
            'min_circularity': 0.5,
            'min_solidity': 0.7,
            'morph_kernel': 3,
            'morph_close_iter': 2,
            'morph_open_iter': 1
        }
    }
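
    # Deployment-tuning sketch: thresholds can be overridden per camera setup
    # by subclassing this config. The values below are illustrative
    # assumptions, not recommended settings:
    #
    #     class LowResWebcamConfig(EyeTrackingConfig):
    #         SMALL_EYE_THRESHOLD = 400
    #         BLINK_THRESHOLD = 5.0
    #
    #     tracker = EyeTracker(config=LowResWebcamConfig())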
|
|
|
|
|
|
|
|
class EyeTracker:
    """Main class for eye tracking."""

    def __init__(self, config: EyeTrackingConfig = None):
        self.config = config or EyeTrackingConfig()

        # GPU detection is informational only: the legacy MediaPipe Python
        # Face Mesh API exposes no GPU-delegate option, so initialization is
        # identical either way. torch is used solely to probe for CUDA.
        try:
            import torch
            use_gpu = torch.cuda.is_available()
        except ImportError:
            use_gpu = False

        if use_gpu:
            logger.info(f"✓ GPU detected for Eye Tracking: {torch.cuda.get_device_name(0)}")
        else:
            logger.info("ℹ No GPU detected, using CPU for Eye Tracking")

        self.face_mesh = mp.solutions.face_mesh.FaceMesh(
            min_detection_confidence=0.5,
            min_tracking_confidence=0.5
        )
        logger.info("✓ MediaPipe Face Mesh initialized")

        # Previous gaze positions, kept for temporal smoothing
        self.prev_position_right = None
        self.prev_position_left = None
|
    def __del__(self):
        """Cleanup resources"""
        if hasattr(self, 'face_mesh') and self.face_mesh:
            self.face_mesh.close()
|
    @staticmethod
    def euclidean_distance(point1: Tuple[int, int], point2: Tuple[int, int]) -> float:
        """Calculate Euclidean distance between two points"""
        return math.sqrt((point2[0] - point1[0])**2 + (point2[1] - point1[1])**2)
|
    def detect_landmarks(self, frame: np.ndarray) -> Optional[List[Tuple[int, int]]]:
        """Detect facial landmarks"""
        try:
            rgb_frame = cv.cvtColor(frame, cv.COLOR_BGR2RGB)
            results = self.face_mesh.process(rgb_frame)

            if not results.multi_face_landmarks:
                return None

            # Convert normalized landmark coordinates to pixel coordinates
            img_height, img_width = frame.shape[:2]
            mesh_coords = [
                (int(point.x * img_width), int(point.y * img_height))
                for point in results.multi_face_landmarks[0].landmark
            ]
            return mesh_coords
        except Exception as e:
            logger.error(f"Error detecting landmarks: {e}")
            return None
|
    def calculate_blink_ratio(self, landmarks: List[Tuple[int, int]]) -> float:
        """Calculate blink ratio from eye landmarks"""
        try:
            # Right eye: horizontal (corner-to-corner) and vertical
            # (lid-to-lid) extents
            rh_distance = self.euclidean_distance(
                landmarks[self.config.RIGHT_EYE[0]],
                landmarks[self.config.RIGHT_EYE[8]]
            )
            rv_distance = self.euclidean_distance(
                landmarks[self.config.RIGHT_EYE[12]],
                landmarks[self.config.RIGHT_EYE[4]]
            )

            # Left eye: same measurements
            lh_distance = self.euclidean_distance(
                landmarks[self.config.LEFT_EYE[0]],
                landmarks[self.config.LEFT_EYE[8]]
            )
            lv_distance = self.euclidean_distance(
                landmarks[self.config.LEFT_EYE[12]],
                landmarks[self.config.LEFT_EYE[4]]
            )

            if rv_distance == 0 or lv_distance == 0:
                return 0.0

            re_ratio = rh_distance / rv_distance
            le_ratio = lh_distance / lv_distance
            return (re_ratio + le_ratio) / 2
        except Exception as e:
            logger.error(f"Error calculating blink ratio: {e}")
            return 0.0
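
    # Worked example (illustrative numbers, not measured): with an open eye
    # roughly 60 px wide and 18 px tall, the ratio is 60/18 ≈ 3.3; mid-blink
    # the vertical distance collapses toward ~8 px, pushing the ratio past
    # 60/8 = 7.5, which exceeds BLINK_THRESHOLD (5.5) and flags a blink.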
|
|
|
|
|
    def extract_eye_region(self, frame: np.ndarray, eye_coords: List[Tuple[int, int]]) -> Optional[np.ndarray]:
        """Extract and crop eye region from frame"""
        try:
            gray = cv.cvtColor(frame, cv.COLOR_BGR2GRAY)
            mask = np.zeros(gray.shape, dtype=np.uint8)

            # Keep only the polygon inside the eye contour; paint the
            # masked-out background a neutral gray so it cannot be mistaken
            # for a dark pupil later
            cv.fillPoly(mask, [np.array(eye_coords, dtype=np.int32)], 255)
            eye = cv.bitwise_and(gray, gray, mask=mask)
            eye[mask == 0] = 155

            # Crop to the bounding box of the eye landmarks
            x_coords = [coord[0] for coord in eye_coords]
            y_coords = [coord[1] for coord in eye_coords]

            min_x, max_x = min(x_coords), max(x_coords)
            min_y, max_y = min(y_coords), max(y_coords)

            cropped = eye[min_y:max_y, min_x:max_x]
            return cropped if cropped.size > 0 else None
        except Exception as e:
            logger.error(f"Error extracting eye region: {e}")
            return None
|
    def classify_eye_size(self, eye_region: np.ndarray) -> str:
        """Classify eye size (SMALL/MEDIUM/LARGE)"""
        if eye_region is None or eye_region.size == 0:
            return 'UNKNOWN'

        h, w = eye_region.shape
        area = h * w

        if area < self.config.SMALL_EYE_THRESHOLD:
            return 'SMALL'
        elif area < self.config.MEDIUM_EYE_THRESHOLD:
            return 'MEDIUM'
        else:
            return 'LARGE'
|
    def adaptive_preprocessing(self, eye_region: np.ndarray, eye_size: str) -> Optional[np.ndarray]:
        """
        Adaptive preprocessing: upscale + enhancement based on eye size.
        """
        if eye_region is None or eye_region.size == 0:
            return None

        try:
            params = self.config.ADAPTIVE_PARAMS[eye_size]

            # Upscale with the size-appropriate interpolation (Lanczos for
            # small eyes, bicubic otherwise, per ADAPTIVE_PARAMS)
            upscaled = cv.resize(
                eye_region, None,
                fx=params['scale_factor'], fy=params['scale_factor'],
                interpolation=params['interpolation']
            )

            # Contrast enhancement followed by edge-preserving smoothing,
            # using the per-size parameters rather than duplicated literals
            clahe = cv.createCLAHE(
                clipLimit=params['clahe_clip'],
                tileGridSize=params['clahe_grid']
            )
            enhanced = clahe.apply(upscaled)
            enhanced = cv.bilateralFilter(
                enhanced,
                params['bilateral_d'],
                params['bilateral_sigma'],
                params['bilateral_sigma']
            )

            # Small eyes additionally get unsharp masking to recover detail
            if eye_size == 'SMALL':
                gaussian = cv.GaussianBlur(enhanced, (3, 3), 2.0)
                enhanced = cv.addWeighted(enhanced, 1.5, gaussian, -0.5, 0)
                enhanced = np.clip(enhanced, 0, 255).astype(np.uint8)

            return enhanced

        except Exception as e:
            logger.error(f"Error in adaptive preprocessing: {e}")
            return None
|
    def aggressive_morphology(self, mask: np.ndarray, eye_size: str) -> np.ndarray:
        """
        STAGE 1: Aggressive morphology to produce a solid contour,
        countering fragmented pupil blobs.
        """
        params = self.config.ADAPTIVE_PARAMS[eye_size]
        kernel = cv.getStructuringElement(
            cv.MORPH_ELLIPSE,
            (params['morph_kernel'], params['morph_kernel'])
        )

        # Close first to fill gaps inside the blob...
        mask = cv.morphologyEx(
            mask, cv.MORPH_CLOSE, kernel,
            iterations=params['morph_close_iter']
        )

        # ...then open to remove small noise specks
        mask = cv.morphologyEx(
            mask, cv.MORPH_OPEN, kernel,
            iterations=params['morph_open_iter']
        )

        # Small eyes get one extra dilation so thin pupils survive the opening
        if eye_size == 'SMALL':
            kernel_dilate = cv.getStructuringElement(cv.MORPH_ELLIPSE, (3, 3))
            mask = cv.dilate(mask, kernel_dilate, iterations=1)

        return mask
|
    def connected_components_analysis(self, mask: np.ndarray, params: Dict) -> Optional[Dict]:
        """
        STAGE 2: Connected-components analysis for more accurate blob
        filtering, rejecting false positives caused by noise.
        """
        h, w = mask.shape
        min_area = (h * w) * params['min_area_ratio']
        max_area = (h * w) * params['max_area_ratio']

        num_labels, labels, stats, centroids = cv.connectedComponentsWithStats(
            mask, connectivity=8
        )

        candidates = []

        # Label 0 is the background, so start from 1
        for i in range(1, num_labels):
            area = stats[i, cv.CC_STAT_AREA]

            # Area filter: reject blobs too small or too large to be a pupil
            if area < min_area or area > max_area:
                continue

            # Isolate this component for shape analysis
            component_mask = np.zeros_like(mask)
            component_mask[labels == i] = 255

            contours, _ = cv.findContours(
                component_mask, cv.RETR_EXTERNAL, cv.CHAIN_APPROX_SIMPLE
            )

            if not contours:
                continue

            contour = contours[0]

            # Circularity filter: 4*pi*A / P^2 equals 1.0 for a perfect circle
            perimeter = cv.arcLength(contour, True)
            if perimeter == 0:
                continue
            circularity = 4 * np.pi * area / (perimeter ** 2)

            if circularity < params['min_circularity']:
                continue

            # Solidity filter: area / convex-hull area rejects irregular shapes
            hull = cv.convexHull(contour)
            hull_area = cv.contourArea(hull)
            if hull_area == 0:
                continue
            solidity = area / hull_area

            if solidity < params['min_solidity']:
                continue

            # Prefer blobs near the horizontal center of the eye region
            center_x = w / 2
            cx = centroids[i][0]
            distance_from_center = abs(cx - center_x) / w
            center_score = 1.0 - distance_from_center

            # Prefer roughly square bounding boxes (aspect ratio near 1)
            x, y, w_bbox, h_bbox = (stats[i, cv.CC_STAT_LEFT],
                                    stats[i, cv.CC_STAT_TOP],
                                    stats[i, cv.CC_STAT_WIDTH],
                                    stats[i, cv.CC_STAT_HEIGHT])
            if h_bbox == 0:
                continue
            aspect_ratio = w_bbox / h_bbox
            aspect_score = 1.0 - abs(aspect_ratio - 1.0)

            # Combined score: every factor must be good for a high product
            score = area * circularity * solidity * center_score * aspect_score

            candidates.append({
                'mask': component_mask,
                'contour': contour,
                'centroid': centroids[i],
                'area': area,
                'circularity': circularity,
                'solidity': solidity,
                'center_score': center_score,
                'aspect_ratio': aspect_ratio,
                'score': score
            })

        if not candidates:
            return None

        return max(candidates, key=lambda x: x['score'])
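
    # Worked example (illustrative): a blob with area 400 px², circularity
    # 0.8, solidity 0.9, near-center position (center_score 0.9) and a nearly
    # square bounding box (aspect_score 0.9) scores
    # 400 * 0.8 * 0.9 * 0.9 * 0.9 ≈ 233, beating a larger but elongated
    # eyelash shadow whose circularity and aspect_score drag its product down.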
|
|
|
|
|
    def distance_transform_refinement(self, mask: np.ndarray) -> Tuple[int, int]:
        """
        STAGE 3: Distance transform to refine the centroid, giving a more
        accurate position than raw image moments.
        """
        # The pixel farthest from any blob edge is taken as the pupil center
        dist_transform = cv.distanceTransform(mask, cv.DIST_L2, 5)
        _, _, _, max_loc = cv.minMaxLoc(dist_transform)
        return max_loc
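
    # Note: a moment-based centroid is pulled toward any attached artifact
    # (eyelash, specular highlight), while the distance-transform peak stays
    # inside the thickest part of the blob, so a pupil with a ragged edge
    # still yields a center on the pupil itself.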
|
|
|
|
|
    def detect_pupil(self, enhanced: np.ndarray, eye_size: str) -> Optional[Dict]:
        """
        Detect pupil using a multi-stage optimized pipeline.

        Optimizations (ported from the Colab prototype):
        1. Aggressive morphology - solid contour, no fragments
        2. Connected-components analysis - better blob detection
        3. Distance transform - accurate centroid
        4. Solidity filter - reject irregular shapes
        """
        params = self.config.ADAPTIVE_PARAMS[eye_size]
        h, w = enhanced.shape

        best_candidate = None
        best_score = 0
        best_threshold = 0

        # Sweep a range of binary thresholds and keep the best-scoring blob
        for thresh_val in params['thresholds']:
            _, binary = cv.threshold(enhanced, thresh_val, 255, cv.THRESH_BINARY_INV)

            # STAGE 1: consolidate the thresholded blob
            binary = self.aggressive_morphology(binary, eye_size)

            # STAGE 2: pick the most pupil-like component, if any
            candidate = self.connected_components_analysis(binary, params)

            if candidate and candidate['score'] > best_score:
                best_candidate = candidate
                best_score = candidate['score']
                best_threshold = thresh_val

        if not best_candidate:
            return None

        # STAGE 3: refine the centroid of the winning blob
        dt_center = self.distance_transform_refinement(best_candidate['mask'])
        best_candidate['dt_center'] = dt_center
        best_candidate['threshold'] = best_threshold

        return best_candidate
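
    # The fixed threshold sweep acts as a cheap substitute for adaptive
    # thresholding: a dark pupil binarizes cleanly at low thresholds, while a
    # brighter or unevenly lit pupil only emerges at higher ones, and the
    # shape-based score selects whichever binarization produced the cleanest
    # blob.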
|
|
|
|
|
    def determine_gaze_position(self, centroid_x: int, width: int, prev_position: Optional[str]) -> str:
        """Determine gaze position (LEFT/CENTER/RIGHT) from the pupil centroid"""
        ratio = centroid_x / width

        # Raw classification from the horizontal pupil position
        if ratio < self.config.LEFT_BOUNDARY:
            position = "LEFT"
        elif ratio > self.config.RIGHT_BOUNDARY:
            position = "RIGHT"
        else:
            position = "CENTER"

        # Hysteresis: near a boundary, keep the previous position to avoid
        # flicker. Note that with the current constants a LEFT classification
        # implies ratio < SMOOTHING_LEFT_MIN (and RIGHT implies
        # ratio > SMOOTHING_RIGHT_MAX), so only the CENTER branch can fire.
        if prev_position and prev_position != "UNKNOWN":
            if position == "LEFT" and self.config.SMOOTHING_LEFT_MIN < ratio < self.config.SMOOTHING_LEFT_MAX:
                position = prev_position
            elif position == "RIGHT" and self.config.SMOOTHING_RIGHT_MIN < ratio < self.config.SMOOTHING_RIGHT_MAX:
                position = prev_position
            elif position == "CENTER" and prev_position != "CENTER":
                if ratio < self.config.SMOOTHING_LEFT_MAX or ratio > self.config.SMOOTHING_RIGHT_MIN:
                    position = prev_position

        return position
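
    # Worked example: with prev_position == "LEFT" and the pupil at
    # ratio 0.37, the raw classification is CENTER (0.35 <= 0.37 <= 0.65),
    # but 0.37 < SMOOTHING_LEFT_MAX (0.40), so the CENTER branch keeps the
    # previous "LEFT" until the pupil moves decisively past 0.40.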
|
|
|
|
|
    def estimate_eye_position(self, eye_region: np.ndarray, prev_position: Optional[str] = None) -> Tuple[str, Dict]:
        """
        Estimate eye gaze position using the optimized pipeline.

        The distance-transform center is used as the pupil centroid.
        """
        if eye_region is None or eye_region.size == 0:
            return "UNKNOWN", {}

        h, w = eye_region.shape
        if h < 5 or w < 10:
            return "UNKNOWN", {}

        try:
            eye_size = self.classify_eye_size(eye_region)
            enhanced = self.adaptive_preprocessing(eye_region, eye_size)

            if enhanced is None:
                return "UNKNOWN", {}

            pupil_data = self.detect_pupil(enhanced, eye_size)

            if not pupil_data:
                return "UNKNOWN", {}

            # The pupil was found in the upscaled image; map the centroid
            # back to the original eye-region coordinates
            scale_factor = self.config.ADAPTIVE_PARAMS[eye_size]['scale_factor']
            cx_dt, cy_dt = pupil_data['dt_center']
            centroid_x = int(cx_dt / scale_factor)

            position = self.determine_gaze_position(centroid_x, w, prev_position)

            return position, {
                'eye_size': eye_size,
                'centroid': (centroid_x, int(cy_dt / scale_factor)),
                'circularity': pupil_data['circularity'],
                'solidity': pupil_data['solidity'],
                'dt_center': pupil_data['dt_center'],
                'threshold': pupil_data['threshold']
            }

        except Exception as e:
            logger.error(f"Error estimating eye position: {e}")
            return "UNKNOWN", {}
|
    def process_frame(self, frame: np.ndarray) -> Dict:
        """Process single frame and return analysis"""
        result = {
            'face_detected': False,
            'blink_detected': False,
            'blink_ratio': 0.0,
            'right_eye': {'position': 'UNKNOWN', 'data': {}},
            'left_eye': {'position': 'UNKNOWN', 'data': {}},
            'gaze_position': 'UNKNOWN'
        }

        try:
            landmarks = self.detect_landmarks(frame)

            if landmarks is None:
                return result

            result['face_detected'] = True

            # Blink detection first: gaze cannot be estimated mid-blink
            blink_ratio = self.calculate_blink_ratio(landmarks)
            result['blink_ratio'] = round(blink_ratio, 2)
            result['blink_detected'] = bool(blink_ratio > self.config.BLINK_THRESHOLD)

            if not result['blink_detected']:
                # Right eye
                right_eye_coords = [landmarks[i] for i in self.config.RIGHT_EYE]
                right_eye_region = self.extract_eye_region(frame, right_eye_coords)

                if right_eye_region is not None:
                    right_position, right_data = self.estimate_eye_position(
                        right_eye_region, self.prev_position_right
                    )
                    result['right_eye'] = {'position': right_position, 'data': right_data}
                    self.prev_position_right = right_position

                # Left eye
                left_eye_coords = [landmarks[i] for i in self.config.LEFT_EYE]
                left_eye_region = self.extract_eye_region(frame, left_eye_coords)

                if left_eye_region is not None:
                    left_position, left_data = self.estimate_eye_position(
                        left_eye_region, self.prev_position_left
                    )
                    result['left_eye'] = {'position': left_position, 'data': left_data}
                    self.prev_position_left = left_position

                # Combine the two eyes; when both are known but disagree,
                # the right eye wins
                if result['right_eye']['position'] == result['left_eye']['position']:
                    result['gaze_position'] = result['right_eye']['position']
                elif result['right_eye']['position'] == 'UNKNOWN':
                    result['gaze_position'] = result['left_eye']['position']
                elif result['left_eye']['position'] == 'UNKNOWN':
                    result['gaze_position'] = result['right_eye']['position']
                else:
                    result['gaze_position'] = result['right_eye']['position']

        except Exception as e:
            logger.error(f"Error processing frame: {e}")

        return result
|
|
class EyeTrackingService:
    """
    Eye Tracking Service for the SWARA API.

    Analyzes eye contact and gaze patterns in videos.
    """

    # Shared tracker instance so the MediaPipe model is loaded only once
    _tracker = None

    def __init__(self):
        """Initialize service"""
        if EyeTrackingService._tracker is None:
            logger.info("Initializing Eye Tracking Service...")
            EyeTrackingService._tracker = EyeTracker()
            logger.info("✓ Eye Tracking Service initialized")
|
    def calculate_score(self, gaze_away_time: float) -> Tuple[int, str]:
        """Calculate score based on gaze away time"""
        config = EyeTrackingConfig()
        # Iterate from the best score down; the first threshold that the
        # gaze-away time fits under wins
        for score, (threshold, rating) in sorted(
            config.SCORE_THRESHOLDS.items(), reverse=True
        ):
            if gaze_away_time <= threshold:
                return score, rating
        return 1, "Perlu Ditingkatkan"
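
    # Worked example: gaze_away_time = 7.0 s fails the score-5 threshold
    # (7.0 > 5) but passes the score-4 threshold (7.0 <= 8), so the method
    # returns (4, "Baik"); anything above 12 s hits the float('inf')
    # threshold and scores 1.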
|
|
|
|
|
    def _annotate_frame(
        self,
        frame: np.ndarray,
        result: Dict,
        frame_number: int,
        total_blinks: int,
        gaze_position: str
    ) -> np.ndarray:
        """
        Annotate frame with eye tracking information

        Args:
            frame: Original frame
            result: Analysis result from process_frame
            frame_number: Current frame number
            total_blinks: Total blinks detected so far
            gaze_position: Current gaze position

        Returns:
            Annotated frame
        """
        annotated = frame.copy()

        # BGR colors
        COLOR_GREEN = (0, 255, 0)
        COLOR_RED = (0, 0, 255)
        COLOR_YELLOW = (0, 255, 255)
        COLOR_BLUE = (255, 0, 0)
        COLOR_WHITE = (255, 255, 255)

        # Semi-transparent status panel in the top-left corner
        overlay = annotated.copy()
        cv.rectangle(overlay, (10, 10), (400, 180), (0, 0, 0), -1)
        cv.addWeighted(overlay, 0.6, annotated, 0.4, 0, annotated)

        # Frame counter
        cv.putText(annotated, f"Frame: {frame_number}", (20, 35),
                   cv.FONT_HERSHEY_SIMPLEX, 0.6, COLOR_WHITE, 2)

        # Face detection status
        face_status = "DETECTED" if result['face_detected'] else "NOT DETECTED"
        face_color = COLOR_GREEN if result['face_detected'] else COLOR_RED
        cv.putText(annotated, f"Face: {face_status}", (20, 60),
                   cv.FONT_HERSHEY_SIMPLEX, 0.6, face_color, 2)

        # Blink status and running blink count
        blink_status = "BLINKING" if result['blink_detected'] else "OPEN"
        blink_color = COLOR_YELLOW if result['blink_detected'] else COLOR_GREEN
        cv.putText(annotated, f"Eyes: {blink_status} | Ratio: {result['blink_ratio']:.2f}",
                   (20, 85), cv.FONT_HERSHEY_SIMPLEX, 0.6, blink_color, 2)
        cv.putText(annotated, f"Total Blinks: {total_blinks}", (20, 110),
                   cv.FONT_HERSHEY_SIMPLEX, 0.6, COLOR_WHITE, 2)

        # Gaze position, color-coded (green = on camera)
        if gaze_position == 'CENTER':
            gaze_color = COLOR_GREEN
        elif gaze_position in ['LEFT', 'RIGHT']:
            gaze_color = COLOR_YELLOW
        else:
            gaze_color = COLOR_RED

        cv.putText(annotated, f"Gaze: {gaze_position}", (20, 135),
                   cv.FONT_HERSHEY_SIMPLEX, 0.7, gaze_color, 2)

        # Per-eye positions (only meaningful while the eyes are open)
        if result['face_detected'] and not result['blink_detected']:
            left_pos = result['left_eye']['position']
            right_pos = result['right_eye']['position']
            cv.putText(annotated, f"L:{left_pos} | R:{right_pos}", (20, 160),
                       cv.FONT_HERSHEY_SIMPLEX, 0.5, COLOR_BLUE, 2)

        # Large gaze indicator near the bottom of the frame
        h, w = annotated.shape[:2]
        indicator_y = h - 60

        if gaze_position == 'CENTER':
            cv.circle(annotated, (w // 2, indicator_y), 30, COLOR_GREEN, -1)
            cv.putText(annotated, "CENTER", (w // 2 - 50, indicator_y + 10),
                       cv.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 0), 2)
        elif gaze_position == 'LEFT':
            cv.arrowedLine(annotated, (w // 2, indicator_y), (w // 2 - 80, indicator_y),
                           COLOR_YELLOW, 5, tipLength=0.3)
            cv.putText(annotated, "LEFT", (w // 2 - 150, indicator_y + 10),
                       cv.FONT_HERSHEY_SIMPLEX, 0.8, COLOR_YELLOW, 2)
        elif gaze_position == 'RIGHT':
            cv.arrowedLine(annotated, (w // 2, indicator_y), (w // 2 + 80, indicator_y),
                           COLOR_YELLOW, 5, tipLength=0.3)
            cv.putText(annotated, "RIGHT", (w // 2 + 50, indicator_y + 10),
                       cv.FONT_HERSHEY_SIMPLEX, 0.8, COLOR_YELLOW, 2)
        else:
            cv.putText(annotated, "UNKNOWN", (w // 2 - 60, indicator_y + 10),
                       cv.FONT_HERSHEY_SIMPLEX, 0.8, COLOR_RED, 2)

        return annotated
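
    # _annotate_frame is not called by analyze_video. A minimal sketch of how
    # it could be wired in to produce a debug video; the output path and
    # fourcc choice here are illustrative assumptions, not part of the
    # service:
    #
    #     writer = cv.VideoWriter("debug_annotated.mp4",
    #                             cv.VideoWriter_fourcc(*"mp4v"),
    #                             fps, (width, height))
    #     ...
    #     annotated = self._annotate_frame(frame, result, frame_count,
    #                                      blink_count, result['gaze_position'])
    #     writer.write(annotated)
    #     ...
    #     writer.release()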
|
|
|
|
|
    def analyze_video(
        self,
        video_path: str,
        progress_callback: Optional[Callable] = None
    ) -> Dict[str, Any]:
        """
        Analyze video for eye contact

        Args:
            video_path: Path to video file
            progress_callback: Optional callback for progress updates

        Returns:
            Dict containing eye tracking analysis results
        """
        try:
            logger.info(f"Analyzing video with Eye Tracking Service: {video_path}")

            cap = cv.VideoCapture(video_path)
            if not cap.isOpened():
                raise ValueError(f"Cannot open video: {video_path}")

            # Video properties (fall back to 30 FPS if the container reports 0)
            fps = int(cap.get(cv.CAP_PROP_FPS)) or 30
            width = int(cap.get(cv.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv.CAP_PROP_FRAME_HEIGHT))
            total_frames = int(cap.get(cv.CAP_PROP_FRAME_COUNT))

            logger.info(f"Video properties: {width}x{height} @ {fps}FPS, {total_frames} frames")

            # Counters
            frame_count = 0
            blink_count = 0
            position_counts = {'CENTER': 0, 'LEFT': 0, 'RIGHT': 0, 'UNKNOWN': 0}
            prev_blink = False

            # Gaze-away tracking: only continuous LEFT/RIGHT periods lasting
            # at least MIN_GAZE_AWAY_DURATION are counted, so brief saccades
            # and single-frame misclassifications do not hurt the score
            config = EyeTrackingConfig()
            min_frames_away = int(fps * config.MIN_GAZE_AWAY_DURATION)

            current_gaze_away_start = None
            total_gaze_away_time = 0.0
            gaze_away_periods = []

            debug_stats = {
                'face_detected_frames': 0,
                'pupil_detected_frames': 0,
                'center_gaze_frames': 0,
                'left_gaze_frames': 0,
                'right_gaze_frames': 0,
                'unknown_frames': 0,
                'raw_gaze_away_frames': 0,
                'filtered_gaze_away_frames': 0
            }

            logger.info("Starting frame processing...")
            logger.info(f"Minimum gaze away duration: {config.MIN_GAZE_AWAY_DURATION}s ({min_frames_away} frames)")

            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                frame_count += 1

                # Report progress roughly once per second of 30 FPS video
                if progress_callback and total_frames > 0 and frame_count % 30 == 0:
                    progress = int((frame_count / total_frames) * 100)
                    progress_callback(frame_count, total_frames, f"Eye tracking: {progress}%")

                result = self._tracker.process_frame(frame)

                if result['face_detected']:
                    debug_stats['face_detected_frames'] += 1

                # Count a blink only on the rising edge (closed after open)
                if result['blink_detected'] and not prev_blink:
                    blink_count += 1
                    logger.debug(f"Frame {frame_count}: Blink detected (total: {blink_count})")
                prev_blink = result['blink_detected']

                # Tally gaze positions
                gaze_pos = result['gaze_position']
                position_counts[gaze_pos] = position_counts.get(gaze_pos, 0) + 1

                if gaze_pos == 'CENTER':
                    debug_stats['center_gaze_frames'] += 1
                elif gaze_pos == 'LEFT':
                    debug_stats['left_gaze_frames'] += 1
                elif gaze_pos == 'RIGHT':
                    debug_stats['right_gaze_frames'] += 1
                else:
                    debug_stats['unknown_frames'] += 1

                # Track continuous gaze-away periods; CENTER or UNKNOWN
                # frames end the current period rather than extend it
                is_looking_away = (gaze_pos == 'LEFT' or gaze_pos == 'RIGHT')

                if is_looking_away:
                    debug_stats['raw_gaze_away_frames'] += 1

                    # Start a new period if not already in one
                    if current_gaze_away_start is None:
                        current_gaze_away_start = frame_count

                else:
                    # Period ended: count it only if it lasted long enough
                    if current_gaze_away_start is not None:
                        period_frames = frame_count - current_gaze_away_start
                        period_duration = period_frames / fps

                        if period_frames >= min_frames_away:
                            total_gaze_away_time += period_duration
                            debug_stats['filtered_gaze_away_frames'] += period_frames
                            gaze_away_periods.append({
                                'start_frame': current_gaze_away_start,
                                'end_frame': frame_count - 1,
                                'duration': period_duration,
                                'frames': period_frames
                            })
                            logger.debug(f"Gaze away period detected: frames {current_gaze_away_start}-{frame_count-1} "
                                         f"({period_duration:.2f}s)")
                        else:
                            logger.debug(f"Gaze away period too short (ignored): {period_frames} frames "
                                         f"({period_duration:.2f}s < {config.MIN_GAZE_AWAY_DURATION}s)")

                        current_gaze_away_start = None

                # Periodic progress log
                if frame_count % 100 == 0:
                    logger.info(f"Processed {frame_count}/{total_frames} frames | "
                                f"Gaze: C:{debug_stats['center_gaze_frames']} "
                                f"L:{debug_stats['left_gaze_frames']} "
                                f"R:{debug_stats['right_gaze_frames']} | "
                                f"Blinks: {blink_count}")

            cap.release()

            # Guard against empty or unreadable videos before any division
            if frame_count == 0:
                raise ValueError(f"No frames could be read from video: {video_path}")

            # Close out a gaze-away period still open at the end of the video
            if current_gaze_away_start is not None:
                period_frames = frame_count - current_gaze_away_start
                period_duration = period_frames / fps

                if period_frames >= min_frames_away:
                    total_gaze_away_time += period_duration
                    debug_stats['filtered_gaze_away_frames'] += period_frames
                    gaze_away_periods.append({
                        'start_frame': current_gaze_away_start,
                        'end_frame': frame_count,
                        'duration': period_duration,
                        'frames': period_frames
                    })
                    logger.debug(f"Final gaze away period: frames {current_gaze_away_start}-{frame_count} "
                                 f"({period_duration:.2f}s)")

            # Final metrics
            duration = frame_count / fps
            gaze_away_time = total_gaze_away_time
            score, rating = self.calculate_score(gaze_away_time)

            # Summary log
            logger.info("="*60)
            logger.info("EYE TRACKING ANALYSIS SUMMARY")
            logger.info("="*60)
            logger.info(f"Total Frames Processed: {frame_count}")
            logger.info(f"Face Detection Rate: {debug_stats['face_detected_frames']}/{frame_count} "
                        f"({debug_stats['face_detected_frames']/frame_count*100:.1f}%)")
            logger.info("\nGaze Distribution:")
            logger.info(f"  CENTER: {debug_stats['center_gaze_frames']} frames "
                        f"({debug_stats['center_gaze_frames']/frame_count*100:.1f}%)")
            logger.info(f"  LEFT: {debug_stats['left_gaze_frames']} frames "
                        f"({debug_stats['left_gaze_frames']/frame_count*100:.1f}%)")
            logger.info(f"  RIGHT: {debug_stats['right_gaze_frames']} frames "
                        f"({debug_stats['right_gaze_frames']/frame_count*100:.1f}%)")
            logger.info(f"  UNKNOWN: {debug_stats['unknown_frames']} frames "
                        f"({debug_stats['unknown_frames']/frame_count*100:.1f}%)")
            logger.info("\n📊 Gaze Away Analysis:")
            logger.info(f"  Raw LEFT/RIGHT frames: {debug_stats['raw_gaze_away_frames']} frames "
                        f"({debug_stats['raw_gaze_away_frames']/frame_count*100:.1f}%)")
            logger.info(f"  Filtered gaze away frames: {debug_stats['filtered_gaze_away_frames']} frames "
                        f"({debug_stats['filtered_gaze_away_frames']/frame_count*100:.1f}%)")
            logger.info(f"  Continuous gaze away periods: {len(gaze_away_periods)} periods")
            logger.info(f"  Total gaze away time: {gaze_away_time:.2f}s / {duration:.2f}s "
                        f"({gaze_away_time/duration*100:.1f}%)")
            logger.info(f"  Minimum duration threshold: {config.MIN_GAZE_AWAY_DURATION}s")
            logger.info(f"Total Blinks: {blink_count} ({blink_count/duration*60:.1f} blinks/minute)")
            logger.info(f"\nFinal Score: {score}/5 - {rating}")
            logger.info("="*60)

            result = {
                'success': True,
                'video_info': {
                    'duration': round(duration, 2),
                    'fps': fps,
                    'total_frames': frame_count,
                    'resolution': f"{width}x{height}"
                },
                'eye_contact_analysis': {
                    'total_gaze_away_time': round(gaze_away_time, 2),
                    'gaze_away_percentage': round((gaze_away_time / duration) * 100, 2) if duration > 0 else 0,
                    'score': score,
                    'rating': rating,
                    'position_distribution': {
                        k: {
                            'frames': v,
                            'percentage': round((v / frame_count) * 100, 2) if frame_count > 0 else 0
                        }
                        for k, v in position_counts.items()
                    }
                },
                'blink_analysis': {
                    'total_blinks': blink_count,
                    'blinks_per_minute': round((blink_count / duration) * 60, 2) if duration > 0 else 0
                },
                'debug_stats': debug_stats,
                'gaze_away_periods': gaze_away_periods
            }

            logger.info(f"✓ Eye Tracking analysis completed: Score {score}/5 - {rating}")
            return result

        except Exception as e:
            logger.error(f"✗ Eye Tracking analysis failed: {e}")
            raise
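

# Minimal usage sketch. The video path and the print-based progress callback
# below are illustrative assumptions, not part of the service API.
if __name__ == "__main__":
    def on_progress(done: int, total: int, msg: str) -> None:
        print(f"{msg} ({done}/{total} frames)")

    service = EyeTrackingService()
    report = service.analyze_video("sample.mp4", progress_callback=on_progress)
    analysis = report['eye_contact_analysis']
    print(f"Score: {analysis['score']}/5 - {analysis['rating']}")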
|
|
|