# IntegrationTest/ui/pipeline.py
# Author: Abdelrahman Almatrooshi
# Integrate L2CS-Net gaze estimation (commit 2eba0cc)
import collections
import glob
import json
import math
import os
import pathlib
import sys
import numpy as np
import joblib
# Make the repository root importable (for the models.* imports below) even
# when this module is run directly from the ui/ directory.
_PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if _PROJECT_ROOT not in sys.path:
    sys.path.insert(0, _PROJECT_ROOT)
from models.face_mesh import FaceMeshDetector
from models.head_pose import HeadPoseEstimator
from models.eye_scorer import EyeBehaviourScorer, compute_mar, MAR_YAWN_THRESHOLD
from models.eye_crop import extract_eye_crops
from models.eye_classifier import load_eye_classifier, GeometricOnlyClassifier
from models.collect_features import FEATURE_NAMES, TemporalTracker, extract_features
_FEAT_IDX = {name: i for i, name in enumerate(FEATURE_NAMES)}
def _clip_features(vec):
    """Clip raw features to the same ranges used during training."""
    # (feature name, lower bound, upper bound) — mirrors the training-time ranges.
    bounds = [
        ("yaw", -45, 45),
        ("pitch", -30, 30),
        ("roll", -30, 30),
        ("ear_left", 0, 0.85),
        ("ear_right", 0, 0.85),
        ("ear_avg", 0, 0.85),
        ("mar", 0, 1.0),
        ("gaze_offset", 0, 0.50),
        ("perclos", 0, 0.80),
        ("blink_rate", 0, 30.0),
        ("closure_duration", 0, 10.0),
        ("yawn_duration", 0, 10.0),
    ]
    clipped = vec.copy()
    for name, lo, hi in bounds:
        pos = _FEAT_IDX[name]
        clipped[pos] = np.clip(clipped[pos], lo, hi)
    # head_deviation is derived from yaw/pitch, so recompute it from the
    # clipped angles instead of clipping it independently.
    clipped[_FEAT_IDX["head_deviation"]] = math.sqrt(
        float(clipped[_FEAT_IDX["yaw"]]) ** 2 + float(clipped[_FEAT_IDX["pitch"]]) ** 2
    )
    return clipped
class _OutputSmoother:
# Asymmetric EMA: rises fast (recognise focus), falls slower (avoid flicker).
# Grace period holds score steady for a few frames when face is lost.
def __init__(self, alpha_up=0.55, alpha_down=0.45, grace_frames=10):
self._alpha_up = alpha_up
self._alpha_down = alpha_down
self._grace = grace_frames
self._score = 0.5
self._no_face = 0
def reset(self):
self._score = 0.5
self._no_face = 0
def update(self, raw_score, face_detected):
if face_detected:
self._no_face = 0
alpha = self._alpha_up if raw_score > self._score else self._alpha_down
self._score += alpha * (raw_score - self._score)
else:
self._no_face += 1
if self._no_face > self._grace:
self._score *= 0.80
return self._score
# Fallback fusion parameters used when no hybrid_focus_config.json is found.
DEFAULT_HYBRID_CONFIG = {
    "w_mlp": 0.7,            # weight of the MLP probability in the fused score
    "w_geo": 0.3,            # weight of the geometric score in the fused score
    "threshold": 0.55,       # fused score >= threshold => focused
    "use_yawn_veto": True,   # force the geometric score to 0 while yawning
    "geo_face_weight": 0.4,  # head-pose share inside the geometric score
    "geo_eye_weight": 0.6,   # eye-behaviour share inside the geometric score
    "mar_yawn_threshold": float(MAR_YAWN_THRESHOLD),  # MAR above this counts as a yawn
}
class _RuntimeFeatureEngine:
"""Runtime feature engineering (magnitudes, velocities, variances) with EMA baselines."""
_MAG_FEATURES = ["pitch", "yaw", "head_deviation", "gaze_offset", "v_gaze", "h_gaze"]
_VEL_FEATURES = ["pitch", "yaw", "h_gaze", "v_gaze", "head_deviation", "gaze_offset"]
_VAR_FEATURES = ["h_gaze", "v_gaze", "pitch"]
_VAR_WINDOW = 30
_WARMUP = 15
def __init__(self, base_feature_names, norm_features=None):
self._base_names = list(base_feature_names)
self._norm_features = list(norm_features) if norm_features else []
tracked = set(self._MAG_FEATURES) | set(self._norm_features)
self._ema_mean = {f: 0.0 for f in tracked}
self._ema_var = {f: 1.0 for f in tracked}
self._n = 0
self._prev = None
self._var_bufs = {
f: collections.deque(maxlen=self._VAR_WINDOW) for f in self._VAR_FEATURES
}
self._ext_names = (
list(self._base_names)
+ [f"{f}_mag" for f in self._MAG_FEATURES]
+ [f"{f}_vel" for f in self._VEL_FEATURES]
+ [f"{f}_var" for f in self._VAR_FEATURES]
)
@property
def extended_names(self):
return list(self._ext_names)
def transform(self, base_vec):
self._n += 1
raw = {name: float(base_vec[i]) for i, name in enumerate(self._base_names)}
alpha = 2.0 / (min(self._n, 120) + 1)
for feat in self._ema_mean:
if feat not in raw:
continue
v = raw[feat]
if self._n == 1:
self._ema_mean[feat] = v
self._ema_var[feat] = 0.0
else:
self._ema_mean[feat] += alpha * (v - self._ema_mean[feat])
self._ema_var[feat] += alpha * (
(v - self._ema_mean[feat]) ** 2 - self._ema_var[feat]
)
out = base_vec.copy().astype(np.float32)
if self._n > self._WARMUP:
for feat in self._norm_features:
if feat in raw:
idx = self._base_names.index(feat)
std = max(math.sqrt(self._ema_var[feat]), 1e-6)
out[idx] = (raw[feat] - self._ema_mean[feat]) / std
mag = np.zeros(len(self._MAG_FEATURES), dtype=np.float32)
for i, feat in enumerate(self._MAG_FEATURES):
if feat in raw:
mag[i] = abs(raw[feat] - self._ema_mean.get(feat, raw[feat]))
vel = np.zeros(len(self._VEL_FEATURES), dtype=np.float32)
if self._prev is not None:
for i, feat in enumerate(self._VEL_FEATURES):
if feat in raw and feat in self._prev:
vel[i] = abs(raw[feat] - self._prev[feat])
self._prev = dict(raw)
for feat in self._VAR_FEATURES:
if feat in raw:
self._var_bufs[feat].append(raw[feat])
var = np.zeros(len(self._VAR_FEATURES), dtype=np.float32)
for i, feat in enumerate(self._VAR_FEATURES):
buf = self._var_bufs[feat]
if len(buf) >= 2:
arr = np.array(buf)
var[i] = float(arr.var())
return np.concatenate([out, mag, vel, var])
class FaceMeshPipeline:
    """Purely geometric focus pipeline (head pose + eye behaviour).

    The raw per-frame score is ``alpha * s_face + beta * s_eye``; an active
    yawn vetoes it to zero.  The raw score is then passed through an
    asymmetric EMA smoother before being thresholded into ``is_focused``.
    An optional learned eye classifier is blended with the geometric eye
    score via ``eye_blend``.
    """

    def __init__(
        self,
        max_angle: float = 22.0,
        alpha: float = 0.4,
        beta: float = 0.6,
        threshold: float = 0.55,
        eye_model_path: str | None = None,
        eye_backend: str = "yolo",
        eye_blend: float = 0.5,
        detector=None,
    ):
        self.detector = detector or FaceMeshDetector()
        # Only close the detector in close() if we created it ourselves.
        self._owns_detector = detector is None
        self.head_pose = HeadPoseEstimator(max_angle=max_angle)
        self.eye_scorer = EyeBehaviourScorer()
        self.alpha = alpha
        self.beta = beta
        self.threshold = threshold
        self.eye_blend = eye_blend
        model_path = None
        if eye_model_path and os.path.exists(eye_model_path):
            model_path = eye_model_path
        self.eye_classifier = load_eye_classifier(
            path=model_path,
            backend=eye_backend,
            device="cpu",
        )
        self._has_eye_model = not isinstance(self.eye_classifier, GeometricOnlyClassifier)
        if self._has_eye_model:
            print(f"[PIPELINE] Eye model: {self.eye_classifier.name}")
        self._smoother = _OutputSmoother()

    def process_frame(self, bgr_frame: np.ndarray) -> dict:
        """Run one BGR frame through the pipeline; returns a result dict."""
        landmarks = self.detector.process(bgr_frame)
        frame_h, frame_w = bgr_frame.shape[:2]
        result = {
            "landmarks": landmarks,
            "s_face": 0.0,
            "s_eye": 0.0,
            "raw_score": 0.0,
            "is_focused": False,
            "yaw": None,
            "pitch": None,
            "roll": None,
            "mar": None,
            "is_yawning": False,
            "left_bbox": None,
            "right_bbox": None,
        }
        if landmarks is None:
            # No face this frame: let the smoother hold / decay the score.
            score = self._smoother.update(0.0, False)
            result["raw_score"] = score
            result["is_focused"] = score >= self.threshold
            return result

        pose = self.head_pose.estimate(landmarks, frame_w, frame_h)
        if pose is not None:
            result["yaw"], result["pitch"], result["roll"] = pose
        result["s_face"] = self.head_pose.score(landmarks, frame_w, frame_h)

        geometric_eye = self.eye_scorer.score(landmarks)
        if not self._has_eye_model:
            result["s_eye"] = geometric_eye
        else:
            crops = extract_eye_crops(bgr_frame, landmarks)
            left_crop, right_crop, result["left_bbox"], result["right_bbox"] = crops
            model_eye = self.eye_classifier.predict_score([left_crop, right_crop])
            result["s_eye"] = (1.0 - self.eye_blend) * geometric_eye + self.eye_blend * model_eye

        result["mar"] = compute_mar(landmarks)
        result["is_yawning"] = result["mar"] > MAR_YAWN_THRESHOLD

        # Yawning vetoes the geometric score entirely.
        if result["is_yawning"]:
            raw = 0.0
        else:
            raw = self.alpha * result["s_face"] + self.beta * result["s_eye"]
        result["raw_score"] = self._smoother.update(raw, True)
        result["is_focused"] = result["raw_score"] >= self.threshold
        return result

    @property
    def has_eye_model(self) -> bool:
        """True when a learned eye classifier (not the geometric fallback) is active."""
        return self._has_eye_model

    def reset_session(self):
        """Clear temporal smoothing state for a fresh session."""
        self._smoother.reset()

    def close(self):
        """Release the face-mesh detector if this pipeline created it."""
        if self._owns_detector:
            self.detector.close()

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()
def _latest_model_artifacts(model_dir):
model_files = sorted(glob.glob(os.path.join(model_dir, "model_*.joblib")))
if not model_files:
model_files = sorted(glob.glob(os.path.join(model_dir, "mlp_*.joblib")))
if not model_files:
return None, None, None
basename = os.path.basename(model_files[-1])
tag = ""
for prefix in ("model_", "mlp_"):
if basename.startswith(prefix):
tag = basename[len(prefix) :].replace(".joblib", "")
break
scaler_path = os.path.join(model_dir, f"scaler_{tag}.joblib")
meta_path = os.path.join(model_dir, f"meta_{tag}.npz")
if not os.path.isfile(scaler_path) or not os.path.isfile(meta_path):
return None, None, None
return model_files[-1], scaler_path, meta_path
def _load_hybrid_config(model_dir: str, config_path: str | None = None):
    """Load hybrid fusion parameters, falling back to DEFAULT_HYBRID_CONFIG.

    Returns ``(config_dict, resolved_path_or_None)``.  The MLP/geometric
    weights are renormalised to sum to 1 so the fused score stays in [0, 1].

    Raises ValueError when the configured weights sum to <= 0.
    """
    cfg = dict(DEFAULT_HYBRID_CONFIG)
    resolved = config_path if config_path else os.path.join(model_dir, "hybrid_focus_config.json")
    if not os.path.isfile(resolved):
        print(f"[HYBRID] No config found at {resolved}; using defaults")
        return cfg, None

    with open(resolved, "r", encoding="utf-8") as handle:
        loaded = json.load(handle)
    # Only accept known keys; anything else in the file is ignored.
    for key in DEFAULT_HYBRID_CONFIG:
        if key in loaded:
            cfg[key] = loaded[key]

    cfg["w_mlp"] = float(cfg["w_mlp"])
    cfg["w_geo"] = float(cfg["w_geo"])
    total = cfg["w_mlp"] + cfg["w_geo"]
    if total <= 0:
        raise ValueError("[HYBRID] Invalid config: w_mlp + w_geo must be > 0")
    cfg["w_mlp"] /= total
    cfg["w_geo"] /= total

    for key in ("threshold", "geo_face_weight", "geo_eye_weight", "mar_yawn_threshold"):
        cfg[key] = float(cfg[key])
    cfg["use_yawn_veto"] = bool(cfg["use_yawn_veto"])
    print(f"[HYBRID] Loaded config: {resolved}")
    return cfg, resolved
class MLPPipeline:
    """Focus pipeline driven by a scikit-learn MLP over engineered features.

    Raw per-frame features are clipped to training ranges, extended with
    runtime _mag/_vel/_var channels, scaled, and fed to the MLP; the
    resulting probability is temporally smoothed and thresholded.
    """

    def __init__(self, model_dir=None, detector=None, threshold=0.5):
        if model_dir is None:
            # Primary location, with a legacy fallback.
            model_dir = os.path.join(_PROJECT_ROOT, "MLP", "models")
            if not os.path.exists(model_dir):
                model_dir = os.path.join(_PROJECT_ROOT, "checkpoints")
        mlp_path, scaler_path, meta_path = _latest_model_artifacts(model_dir)
        if mlp_path is None:
            raise FileNotFoundError(f"No MLP artifacts in {model_dir}")
        self._mlp = joblib.load(mlp_path)
        self._scaler = joblib.load(scaler_path)
        meta = np.load(meta_path, allow_pickle=True)
        self._feature_names = list(meta["feature_names"])
        norm_feats = list(meta["norm_features"]) if "norm_features" in meta else []
        self._engine = _RuntimeFeatureEngine(FEATURE_NAMES, norm_features=norm_feats)
        # Map the trained feature order onto the runtime extended vector.
        extended = self._engine.extended_names
        self._indices = [extended.index(name) for name in self._feature_names]
        self._detector = detector or FaceMeshDetector()
        self._owns_detector = detector is None
        self._head_pose = HeadPoseEstimator()
        self.head_pose = self._head_pose  # public alias used by callers
        self._eye_scorer = EyeBehaviourScorer()
        self._temporal = TemporalTracker()
        self._smoother = _OutputSmoother()
        self._threshold = threshold
        print(f"[MLP] Loaded {mlp_path} | {len(self._feature_names)} features | threshold={threshold}")

    def process_frame(self, bgr_frame):
        """Classify one BGR frame; returns a dict of scores and pose angles."""
        landmarks = self._detector.process(bgr_frame)
        frame_h, frame_w = bgr_frame.shape[:2]
        result = {
            "landmarks": landmarks,
            "is_focused": False,
            "s_face": 0.0,
            "s_eye": 0.0,
            "raw_score": 0.0,
            "mlp_prob": 0.0,
            "mar": None,
            "yaw": None,
            "pitch": None,
            "roll": None,
        }
        if landmarks is None:
            score = self._smoother.update(0.0, False)
            result["raw_score"] = score
            result["is_focused"] = score >= self._threshold
            return result

        vec = _clip_features(
            extract_features(landmarks, frame_w, frame_h, self._head_pose, self._eye_scorer, self._temporal)
        )
        for key in ("yaw", "pitch", "roll", "s_face", "s_eye", "mar"):
            result[key] = float(vec[_FEAT_IDX[key]])

        extended = self._engine.transform(vec)
        features = extended[self._indices].reshape(1, -1).astype(np.float64)
        scaled = self._scaler.transform(features)
        if hasattr(self._mlp, "predict_proba"):
            prob = float(self._mlp.predict_proba(scaled)[0, 1])
        else:
            # Hard-classifier fallback: treat class 1 as probability 1.0.
            prob = float(self._mlp.predict(scaled)[0] == 1)
        result["mlp_prob"] = float(np.clip(prob, 0.0, 1.0))
        result["raw_score"] = self._smoother.update(result["mlp_prob"], True)
        result["is_focused"] = result["raw_score"] >= self._threshold
        return result

    def reset_session(self):
        """Reset temporal feature tracking and smoothing for a new session."""
        self._temporal = TemporalTracker()
        self._smoother.reset()

    def close(self):
        """Release the face-mesh detector if this pipeline created it."""
        if self._owns_detector:
            self._detector.close()

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()
class HybridFocusPipeline:
    """Fuses an MLP probability with a geometric head/eye score.

    focus_score = w_mlp * mlp_prob + w_geo * geo_score, where geo_score is
    a weighted blend of head-pose and eye scores (optionally vetoed to 0
    while yawning).  Weights and threshold come from
    hybrid_focus_config.json via _load_hybrid_config; the fused score is
    smoothed before thresholding.
    """

    def __init__(
        self,
        model_dir=None,
        config_path: str | None = None,
        eye_model_path: str | None = None,
        eye_backend: str = "yolo",
        eye_blend: float = 0.5,
        max_angle: float = 22.0,
        detector=None,
    ):
        """Load MLP artifacts, fusion config, and optional eye classifier.

        Raises FileNotFoundError when no MLP artifacts exist in model_dir.
        """
        if model_dir is None:
            model_dir = os.path.join(_PROJECT_ROOT, "checkpoints")
        mlp_path, scaler_path, meta_path = _latest_model_artifacts(model_dir)
        if mlp_path is None:
            raise FileNotFoundError(f"No MLP artifacts in {model_dir}")
        self._mlp = joblib.load(mlp_path)
        self._scaler = joblib.load(scaler_path)
        meta = np.load(meta_path, allow_pickle=True)
        self._feature_names = list(meta["feature_names"])
        norm_feats = list(meta["norm_features"]) if "norm_features" in meta else []
        self._engine = _RuntimeFeatureEngine(FEATURE_NAMES, norm_features=norm_feats)
        ext_names = self._engine.extended_names
        # Map the trained feature order onto the runtime extended vector.
        self._indices = [ext_names.index(n) for n in self._feature_names]
        self._cfg, self._cfg_path = _load_hybrid_config(model_dir=model_dir, config_path=config_path)
        self._detector = detector or FaceMeshDetector()
        # Only close the detector in close() if we created it ourselves.
        self._owns_detector = detector is None
        self._head_pose = HeadPoseEstimator(max_angle=max_angle)
        self._eye_scorer = EyeBehaviourScorer()
        self._temporal = TemporalTracker()
        self._eye_blend = eye_blend
        self.eye_classifier = load_eye_classifier(
            path=eye_model_path if eye_model_path and os.path.exists(eye_model_path) else None,
            backend=eye_backend,
            device="cpu",
        )
        self._has_eye_model = not isinstance(self.eye_classifier, GeometricOnlyClassifier)
        if self._has_eye_model:
            print(f"[HYBRID] Eye model: {self.eye_classifier.name}")
        self.head_pose = self._head_pose  # public alias used by callers
        self._smoother = _OutputSmoother()
        print(
            f"[HYBRID] Loaded {mlp_path} | {len(self._feature_names)} features | "
            f"w_mlp={self._cfg['w_mlp']:.2f}, w_geo={self._cfg['w_geo']:.2f}, "
            f"threshold={self._cfg['threshold']:.2f}"
        )

    @property
    def has_eye_model(self) -> bool:
        """True when a learned eye classifier (not the geometric fallback) is active."""
        return self._has_eye_model

    @property
    def config(self) -> dict:
        """A copy of the active fusion configuration."""
        return dict(self._cfg)

    def process_frame(self, bgr_frame: np.ndarray) -> dict:
        """Run one BGR frame through the hybrid pipeline; returns a result dict."""
        landmarks = self._detector.process(bgr_frame)
        h, w = bgr_frame.shape[:2]
        out = {
            "landmarks": landmarks,
            "is_focused": False,
            "focus_score": 0.0,
            "mlp_prob": 0.0,
            "geo_score": 0.0,
            "raw_score": 0.0,
            "s_face": 0.0,
            "s_eye": 0.0,
            "mar": None,
            "is_yawning": False,
            "yaw": None,
            "pitch": None,
            "roll": None,
            "left_bbox": None,
            "right_bbox": None,
        }
        if landmarks is None:
            # No face: let the smoother hold / decay the score.
            smoothed = self._smoother.update(0.0, False)
            out["focus_score"] = smoothed
            out["raw_score"] = smoothed
            out["is_focused"] = smoothed >= self._cfg["threshold"]
            return out
        angles = self._head_pose.estimate(landmarks, w, h)
        if angles is not None:
            out["yaw"], out["pitch"], out["roll"] = angles
        out["s_face"] = self._head_pose.score(landmarks, w, h)
        s_eye_geo = self._eye_scorer.score(landmarks)
        if self._has_eye_model:
            left_crop, right_crop, left_bbox, right_bbox = extract_eye_crops(bgr_frame, landmarks)
            out["left_bbox"] = left_bbox
            out["right_bbox"] = right_bbox
            s_eye_model = self.eye_classifier.predict_score([left_crop, right_crop])
            out["s_eye"] = (1.0 - self._eye_blend) * s_eye_geo + self._eye_blend * s_eye_model
        else:
            out["s_eye"] = s_eye_geo
        # Geometric branch: weighted head/eye blend, clipped to [0, 1].
        geo_score = (
            self._cfg["geo_face_weight"] * out["s_face"] +
            self._cfg["geo_eye_weight"] * out["s_eye"]
        )
        geo_score = float(np.clip(geo_score, 0.0, 1.0))
        out["mar"] = compute_mar(landmarks)
        out["is_yawning"] = out["mar"] > self._cfg["mar_yawn_threshold"]
        if self._cfg["use_yawn_veto"] and out["is_yawning"]:
            geo_score = 0.0
        out["geo_score"] = geo_score
        # Reuse already-computed values so extract_features doesn't redo them.
        pre = {
            "angles": angles,
            "s_face": out["s_face"],
            "s_eye": s_eye_geo,
            "mar": out["mar"],
        }
        vec = extract_features(landmarks, w, h, self._head_pose, self._eye_scorer, self._temporal, _pre=pre)
        vec = _clip_features(vec)
        ext_vec = self._engine.transform(vec)
        X = ext_vec[self._indices].reshape(1, -1).astype(np.float64)
        X_sc = self._scaler.transform(X)
        if hasattr(self._mlp, "predict_proba"):
            mlp_prob = float(self._mlp.predict_proba(X_sc)[0, 1])
        else:
            # Hard-classifier fallback: treat class 1 as probability 1.0.
            mlp_prob = float(self._mlp.predict(X_sc)[0] == 1)
        out["mlp_prob"] = float(np.clip(mlp_prob, 0.0, 1.0))
        # Fuse the two branches, smooth, then threshold.
        focus_score = self._cfg["w_mlp"] * out["mlp_prob"] + self._cfg["w_geo"] * out["geo_score"]
        out["focus_score"] = self._smoother.update(float(np.clip(focus_score, 0.0, 1.0)), True)
        out["raw_score"] = out["focus_score"]
        out["is_focused"] = out["focus_score"] >= self._cfg["threshold"]
        return out

    def reset_session(self):
        """Reset temporal feature tracking and smoothing for a new session."""
        self._temporal = TemporalTracker()
        self._smoother.reset()

    def close(self):
        """Release the face-mesh detector if this pipeline created it."""
        if self._owns_detector:
            self._detector.close()

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()
class XGBoostPipeline:
    """Real-time XGBoost inference pipeline using the same feature extraction as MLPPipeline."""

    # Same 10 features used during training (data_preparation.prepare_dataset.SELECTED_FEATURES)
    SELECTED = [
        'head_deviation', 's_face', 's_eye', 'h_gaze', 'pitch',
        'ear_left', 'ear_avg', 'ear_right', 'gaze_offset', 'perclos',
    ]

    def __init__(self, model_path=None, threshold=0.5, detector=None):
        """Load the XGBoost checkpoint and set up feature extraction helpers.

        model_path: explicit checkpoint path; when None, the default and then
            the legacy location are tried.
        threshold: smoothed score at/above which the frame counts as focused.
        detector: optional externally-owned FaceMeshDetector.  Matches the
            other pipeline classes; a shared detector is NOT closed by close().

        Raises FileNotFoundError when no checkpoint file exists.
        """
        from xgboost import XGBClassifier
        if model_path is None:
            model_path = os.path.join(_PROJECT_ROOT, "models", "xgboost", "checkpoints", "face_orientation_best.json")
            if not os.path.isfile(model_path):
                # Fallback to legacy path
                model_path = os.path.join(_PROJECT_ROOT, "checkpoints", "xgboost_face_orientation_best.json")
        # Also catches an explicitly-passed path that doesn't exist.
        if not os.path.isfile(model_path):
            raise FileNotFoundError(f"No XGBoost checkpoint at {model_path}")
        self._model = XGBClassifier()
        self._model.load_model(model_path)
        self._threshold = threshold
        # Consistent with the other pipelines: accept an injected detector and
        # only close it in close() if we created it ourselves.
        self._detector = detector or FaceMeshDetector()
        self._owns_detector = detector is None
        self._head_pose = HeadPoseEstimator()
        self.head_pose = self._head_pose  # public alias used by callers
        self._eye_scorer = EyeBehaviourScorer()
        self._temporal = TemporalTracker()
        self._smoother = _OutputSmoother()
        # Map the trained feature subset onto the raw feature vector.
        self._indices = [FEATURE_NAMES.index(n) for n in self.SELECTED]
        print(f"[XGB] Loaded {model_path} | {len(self.SELECTED)} features, threshold={threshold}")

    def process_frame(self, bgr_frame):
        """Classify one BGR frame; returns a dict of scores and pose angles."""
        landmarks = self._detector.process(bgr_frame)
        h, w = bgr_frame.shape[:2]
        out = {
            "landmarks": landmarks,
            "is_focused": False,
            "s_face": 0.0,
            "s_eye": 0.0,
            "raw_score": 0.0,
            "mar": None,
            "yaw": None,
            "pitch": None,
            "roll": None,
        }
        if landmarks is None:
            # No face: let the smoother hold / decay the score.
            smoothed = self._smoother.update(0.0, False)
            out["raw_score"] = smoothed
            out["is_focused"] = smoothed >= self._threshold
            return out
        vec = extract_features(landmarks, w, h, self._head_pose, self._eye_scorer, self._temporal)
        vec = _clip_features(vec)
        out["yaw"] = float(vec[_FEAT_IDX["yaw"]])
        out["pitch"] = float(vec[_FEAT_IDX["pitch"]])
        out["roll"] = float(vec[_FEAT_IDX["roll"]])
        out["s_face"] = float(vec[_FEAT_IDX["s_face"]])
        out["s_eye"] = float(vec[_FEAT_IDX["s_eye"]])
        out["mar"] = float(vec[_FEAT_IDX["mar"]])
        X = vec[self._indices].reshape(1, -1).astype(np.float32)
        prob = self._model.predict_proba(X)[0]  # [prob_unfocused, prob_focused]
        out["raw_score"] = self._smoother.update(float(prob[1]), True)
        out["is_focused"] = out["raw_score"] >= self._threshold
        return out

    def reset_session(self):
        """Reset temporal feature tracking and smoothing for a new session."""
        self._temporal = TemporalTracker()
        self._smoother.reset()

    def close(self):
        """Release the face-mesh detector if this pipeline created it."""
        if self._owns_detector:
            self._detector.close()

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()
def _resolve_l2cs_weights():
    """Return the first existing L2CS checkpoint path, or None if absent."""
    candidates = (
        os.path.join(_PROJECT_ROOT, "models", "L2CS-Net", "models", "L2CSNet_gaze360.pkl"),
        os.path.join(_PROJECT_ROOT, "models", "L2CSNet_gaze360.pkl"),
        os.path.join(_PROJECT_ROOT, "checkpoints", "L2CSNet_gaze360.pkl"),
    )
    return next((path for path in candidates if os.path.isfile(path)), None)
def is_l2cs_weights_available():
    """True when an L2CS checkpoint can be located on disk."""
    resolved = _resolve_l2cs_weights()
    return resolved is not None
class L2CSPipeline:
    """Gaze-driven focus pipeline.

    Uses the in-tree l2cs.Pipeline (RetinaFace + ResNet50) for gaze
    estimation and MediaPipe for head pose, EAR, MAR, and roll de-rotation.
    Focus is a cosine falloff of the de-rotated gaze angles, vetoed while
    yawning, then temporally smoothed.
    """

    # Gaze magnitude (degrees) at which the cosine score reaches 0.0.
    YAW_THRESHOLD = 22.0
    PITCH_THRESHOLD = 20.0

    def __init__(self, weights_path=None, arch="ResNet50", device="cpu",
                 threshold=0.52, detector=None):
        """Load L2CS weights and set up the MediaPipe helpers.

        Raises FileNotFoundError when no checkpoint can be resolved.
        """
        resolved = weights_path or _resolve_l2cs_weights()
        if resolved is None or not os.path.isfile(resolved):
            raise FileNotFoundError(
                "L2CS weights not found. Place L2CSNet_gaze360.pkl in "
                "models/L2CS-Net/models/ or checkpoints/"
            )
        # add in-tree L2CS-Net to import path
        l2cs_root = os.path.join(_PROJECT_ROOT, "models", "L2CS-Net")
        if l2cs_root not in sys.path:
            sys.path.insert(0, l2cs_root)
        from l2cs import Pipeline as _L2CSPipeline
        import torch
        # bypass upstream select_device bug by constructing torch.device directly
        self._pipeline = _L2CSPipeline(
            weights=pathlib.Path(resolved), arch=arch, device=torch.device(device),
        )
        self._detector = detector or FaceMeshDetector()
        # Only close the detector in close() if we created it ourselves.
        self._owns_detector = detector is None
        self._head_pose = HeadPoseEstimator()
        self.head_pose = self._head_pose  # public alias used by callers
        self._eye_scorer = EyeBehaviourScorer()
        self._threshold = threshold
        self._smoother = _OutputSmoother()
        print(
            f"[L2CS] Loaded {resolved} | arch={arch} device={device} "
            f"yaw_thresh={self.YAW_THRESHOLD} pitch_thresh={self.PITCH_THRESHOLD} "
            f"threshold={threshold}"
        )

    @staticmethod
    def _derotate_gaze(pitch_rad, yaw_rad, roll_deg):
        """Rotate gaze by -roll so tilted-but-looking-at-screen reads as (0, 0).

        Returns (pitch_rad, yaw_rad) in the de-rotated frame.
        """
        roll_rad = -math.radians(roll_deg)
        cos_r, sin_r = math.cos(roll_rad), math.sin(roll_rad)
        return (yaw_rad * sin_r + pitch_rad * cos_r,
                yaw_rad * cos_r - pitch_rad * sin_r)

    def process_frame(self, bgr_frame):
        """Score one BGR frame; returns a dict with gaze, pose, and focus."""
        landmarks = self._detector.process(bgr_frame)
        h, w = bgr_frame.shape[:2]
        out = {
            "landmarks": landmarks, "is_focused": False, "raw_score": 0.0,
            "s_face": 0.0, "s_eye": 0.0, "gaze_pitch": None, "gaze_yaw": None,
            "yaw": None, "pitch": None, "roll": None, "mar": None, "is_yawning": False,
        }
        # MediaPipe: head pose, eye/mouth scores
        roll_deg = 0.0
        if landmarks is not None:
            angles = self._head_pose.estimate(landmarks, w, h)
            if angles is not None:
                out["yaw"], out["pitch"], out["roll"] = angles
                roll_deg = angles[2]
            out["s_face"] = self._head_pose.score(landmarks, w, h)
            out["s_eye"] = self._eye_scorer.score(landmarks)
            out["mar"] = compute_mar(landmarks)
            out["is_yawning"] = out["mar"] > MAR_YAWN_THRESHOLD
        # L2CS gaze (uses its own RetinaFace detector internally)
        results = self._pipeline.step(bgr_frame)
        if results is None or results.pitch.shape[0] == 0:
            smoothed = self._smoother.update(0.0, landmarks is not None)
            out["raw_score"] = smoothed
            out["is_focused"] = smoothed >= self._threshold
            return out
        pitch_rad = float(results.pitch[0])
        yaw_rad = float(results.yaw[0])
        pitch_rad, yaw_rad = self._derotate_gaze(pitch_rad, yaw_rad, roll_deg)
        out["gaze_pitch"] = pitch_rad
        out["gaze_yaw"] = yaw_rad
        yaw_deg = abs(math.degrees(yaw_rad))
        pitch_deg = abs(math.degrees(pitch_rad))
        # Fall back to L2CS angles if MediaPipe didn't produce head pose.
        # BUGFIX: explicit None checks — the previous `out["yaw"] or ...` form
        # discarded a legitimate 0.0-degree MediaPipe pose (0.0 is falsy).
        if out["yaw"] is None:
            out["yaw"] = math.degrees(yaw_rad)
        if out["pitch"] is None:
            out["pitch"] = math.degrees(pitch_rad)
        # cosine scoring: 1.0 at centre, 0.0 at threshold
        yaw_t = min(yaw_deg / self.YAW_THRESHOLD, 1.0)
        pitch_t = min(pitch_deg / self.PITCH_THRESHOLD, 1.0)
        yaw_score = 0.5 * (1.0 + math.cos(math.pi * yaw_t))
        pitch_score = 0.5 * (1.0 + math.cos(math.pi * pitch_t))
        gaze_score = 0.55 * yaw_score + 0.45 * pitch_score
        if out["is_yawning"]:
            gaze_score = 0.0
        out["raw_score"] = self._smoother.update(float(gaze_score), True)
        out["is_focused"] = out["raw_score"] >= self._threshold
        return out

    def reset_session(self):
        """Reset temporal smoothing for a new session."""
        self._smoother.reset()

    def close(self):
        """Release the MediaPipe detector if this pipeline created it."""
        if self._owns_detector:
            self._detector.close()

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()