final_v2 / evaluation /feature_importance.py
k22056537
feat: sync integration updates across app and ML pipeline
eb4abb8
"""
Feature importance and leave-one-feature-out ablation for the 10 face_orientation features.
Run: python -m evaluation.feature_importance
Outputs:
- XGBoost gain-based importance (from trained checkpoint)
- Leave-one-feature-out LOPO F1 (ablation): drop each feature in turn, report mean LOPO F1.
- Channel ablation: mean LOPO F1 when training on only the head-pose, eye-state, or gaze features.
- Writes evaluation/feature_selection_justification.md
"""
import os
import sys
import argparse
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import f1_score
from xgboost import XGBClassifier
_PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
if _PROJECT_ROOT not in sys.path:
sys.path.insert(0, _PROJECT_ROOT)
from data_preparation.prepare_dataset import get_default_split_config, load_per_person, SELECTED_FEATURES
from models.xgboost.config import XGB_BASE_PARAMS, build_xgb_classifier, get_xgb_params
# Only the second value returned by the default split config is kept; this module
# passes it as the seed to every evaluation model it builds.
_, SEED = get_default_split_config()
# Feature names of the "face_orientation" selection. The ablation code uses the
# position of each name in this list as the column index into the feature matrices.
FEATURES = SELECTED_FEATURES["face_orientation"]
def _resolve_xgb_path():
    """Return the path where the trained face_orientation XGBoost checkpoint is expected."""
    checkpoint_dir = os.path.join(_PROJECT_ROOT, "checkpoints")
    return os.path.join(checkpoint_dir, "xgboost_face_orientation_best.json")
def xgb_feature_importance():
    """Load the trained XGBoost checkpoint and return gain-based importance per feature.

    Returns:
        A dict mapping every name in ``FEATURES`` (in that order) to its gain,
        with 0.0 for features the booster reports no score for, or ``None``
        when the checkpoint file does not exist.
    """
    path = _resolve_xgb_path()
    if not os.path.isfile(path):
        print(f"[WARN] No XGBoost checkpoint at {path}; skip importance.")
        return None

    model = XGBClassifier()
    model.load_model(path)
    imp = model.get_booster().get_score(importance_type="gain")

    # Start from zeros so features absent from the booster's score dict still
    # appear in the result.
    gains = dict.fromkeys(FEATURES, 0.0)
    for key, value in imp.items():
        if key in gains:
            # Booster was saved with real feature names: map directly.
            gains[key] = value
        elif key.startswith("f") and key[1:].isdecimal():
            # Default positional names f0, f1, ...: index follows FEATURES
            # (training order). Out-of-range indices are ignored.
            idx = int(key[1:])
            if idx < len(FEATURES):
                gains[FEATURES[idx]] = value
        # Any other key does not correspond to a known feature and is skipped
        # instead of crashing the int() conversion.
    return gains
def _make_eval_model(seed: int, quick: bool):
    """Build the XGBoost classifier used for one evaluation fold.

    In quick mode the parameters from ``get_xgb_params()`` are overridden to
    200 estimators, the given seed and silent verbosity; otherwise the model
    comes from ``build_xgb_classifier(seed, verbosity=0)``.
    """
    if quick:
        params = get_xgb_params()
        overrides = (
            ("n_estimators", 200),
            ("random_state", seed),
            ("verbosity", 0),
        )
        for key, value in overrides:
            params[key] = value
        return XGBClassifier(**params)
    return build_xgb_classifier(seed, verbosity=0)
def _lopo_mean_f1(by_person, persons, quick: bool, idx_keep=None):
    """Return the mean weighted F1 over leave-one-person-out (LOPO) folds.

    Args:
        by_person: Mapping person -> ``(X, y)``; ``X`` is indexed as a 2-D
            array whose columns are selected by position.
        persons: Sequence of keys of ``by_person``; each one is held out once.
        quick: Forwarded to ``_make_eval_model`` to pick the lighter model.
        idx_keep: Column indices to train/evaluate on, or ``None`` for all columns.

    Raises:
        ValueError: If fewer than two persons are given, since a fold would
            then have no training data (or there would be no folds at all).
    """
    if len(persons) < 2:
        raise ValueError(
            f"LOPO evaluation needs at least 2 persons, got {len(persons)}."
        )

    f1s = []
    for held_out in persons:
        train_parts = [by_person[p] for p in persons if p != held_out]
        X_tr = np.concatenate([part[0] for part in train_parts])
        y_tr = np.concatenate([part[1] for part in train_parts])
        X_te, y_te = by_person[held_out]
        if idx_keep is not None:
            X_tr = X_tr[:, idx_keep]
            X_te = X_te[:, idx_keep]
        # Fit the scaler on the training fold only so no statistics of the
        # held-out person leak into training.
        scaler = StandardScaler().fit(X_tr)
        model = _make_eval_model(SEED, quick)
        model.fit(scaler.transform(X_tr), y_tr)
        pred = model.predict(scaler.transform(X_te))
        f1s.append(f1_score(y_te, pred, average="weighted"))
    return np.mean(f1s)


def run_ablation_lopo(by_person, persons, quick: bool):
    """Leave-one-feature-out: for each feature, train on the remaining ones with LOPO.

    Returns:
        Dict mapping each dropped feature name to the mean LOPO F1 obtained without it.
    """
    results = {}
    for drop_feat in FEATURES:
        print(f" -> dropping {drop_feat} ({len(results)+1}/{len(FEATURES)})")
        idx_keep = [i for i, f in enumerate(FEATURES) if f != drop_feat]
        results[drop_feat] = _lopo_mean_f1(by_person, persons, quick, idx_keep)
    return results


def run_baseline_lopo_f1(by_person, persons, quick: bool):
    """Return the mean LOPO F1 using every feature column, for reference."""
    return _lopo_mean_f1(by_person, persons, quick)


# Channel subsets for ablation (subset name -> list of feature names)
CHANNEL_SUBSETS = {
    "head_pose": ["head_deviation", "s_face", "pitch"],
    "eye_state": ["ear_left", "ear_avg", "ear_right", "perclos"],
    "gaze": ["h_gaze", "gaze_offset", "s_eye"],
}


def run_channel_ablation(by_person, persons, quick: bool, baseline: float):
    """LOPO XGBoost trained on each channel subset alone.

    Args:
        baseline: Mean LOPO F1 of the full feature set; it is not recomputed
            here, only stored in the result under ``"all_10"``.

    Returns:
        Dict subset_name -> mean LOPO F1, plus ``"all_10"`` -> ``baseline``.

    Raises:
        ValueError: If a name in ``CHANNEL_SUBSETS`` is not present in ``FEATURES``.
    """
    results = {}
    for subset_name, feat_list in CHANNEL_SUBSETS.items():
        print(f" -> channel {subset_name}")
        idx_keep = [FEATURES.index(f) for f in feat_list]
        results[subset_name] = _lopo_mean_f1(by_person, persons, quick, idx_keep)
    results["all_10"] = baseline
    return results
def _parse_args():
    """Parse the command-line switches that select which analyses to run."""
    # Every option is a plain boolean switch, so they are registered from a table.
    switches = (
        ("--quick", "Use fewer trees (200) for faster iteration."),
        ("--skip-lofo", "Skip leave-one-feature-out ablation."),
        ("--skip-channel", "Skip channel ablation."),
    )
    cli = argparse.ArgumentParser(description="Feature importance + LOPO ablation")
    for flag, help_text in switches:
        cli.add_argument(flag, action="store_true", help=help_text)
    return cli.parse_args()
def _build_report_lines(quick, imp, baseline, ablation, worst_drop, channel_f1):
    """Assemble the markdown justification report as a list of lines.

    Args:
        quick: Whether the run used quick mode (reported in section 2).
        imp: Feature -> gain dict, or a falsy value when no checkpoint was loaded.
        baseline: Mean LOPO F1 with all features.
        ablation: Feature -> mean LOPO F1 without it, or ``None`` if skipped.
        worst_drop: ``(feature, f1)`` with the lowest ablation F1; only read
            when ``ablation`` is not ``None``.
        channel_f1: Subset name -> mean LOPO F1, or ``None`` if skipped.
    """
    lines = [
        "# Feature selection justification",
        "",
        "The face_orientation model uses 10 of 17 extracted features. This document summarises empirical support.",
        "",
        "## 1. Domain rationale",
        "",
        "The 10 features were chosen to cover three channels:",
        "- **Head pose:** head_deviation, s_face, pitch",
        "- **Eye state:** ear_left, ear_right, ear_avg, perclos",
        "- **Gaze:** h_gaze, gaze_offset, s_eye",
        "",
        "Excluded: v_gaze (noisy), mar (rare events), yaw/roll (redundant with head_deviation/s_face), blink_rate/closure_duration/yawn_duration (temporal overlap with perclos).",
        "",
        "## 2. XGBoost feature importance (gain)",
        "",
        f"Config used: `{XGB_BASE_PARAMS}`.",
        "Quick mode: " + ("yes (200 trees)" if quick else "no (full config)"),
        "",
        "From the trained XGBoost checkpoint (gain on the 10 features):",
        "",
    ]
    if imp:
        # The table header is only written when there are rows: a text line placed
        # right after the delimiter row would be rendered as a table row.
        lines.append("| Feature | Gain |")
        lines.append("|---------|------|")
        for name in FEATURES:
            lines.append(f"| {name} | {imp.get(name, 0):.2f} |")
        order = sorted(imp.items(), key=lambda x: -x[1])
        lines.append("")
        lines.append(f"**Top 5 by gain:** {', '.join(x[0] for x in order[:5])}.")
    else:
        lines.append("(Run with XGBoost checkpoint to populate.)")

    lines.extend([
        "",
        "## 3. Leave-one-feature-out ablation (LOPO)",
        "",
        f"Baseline (all 10 features) mean LOPO F1: **{baseline:.4f}**.",
        "",
    ])
    if ablation is None:
        lines.append("Skipped in this run (`--skip-lofo`).")
    else:
        lines.extend([
            "| Feature dropped | Mean LOPO F1 | Δ vs baseline |",
            "|------------------|--------------|---------------|",
        ])
        for feat in FEATURES:
            # Positive delta means F1 fell when the feature was removed.
            delta = baseline - ablation[feat]
            lines.append(f"| {feat} | {ablation[feat]:.4f} | {delta:+.4f} |")
        lines.append("")
        lines.append(f"Dropping **{worst_drop[0]}** hurts most (F1={worst_drop[1]:.4f}), consistent with it being important.")
    lines.append("")

    lines.append("## 4. Channel ablation (LOPO)")
    lines.append("")
    if channel_f1 is None:
        lines.append("Skipped in this run (`--skip-channel`).")
    else:
        lines.append("| Subset | Mean LOPO F1 |")
        lines.append("|--------|--------------|")
        for name in ["head_pose", "eye_state", "gaze", "all_10"]:
            lines.append(f"| {name} | {channel_f1[name]:.4f} |")
    lines.append("")

    lines.append("## 5. Conclusion")
    lines.append("")
    # Only cite an ablation as evidence if it actually ran in this invocation.
    if ablation is not None:
        lines.append("Selection is supported by (1) domain rationale (three attention channels), (2) XGBoost gain importance, and (3) leave-one-out ablation. SHAP or correlation-based pruning can be added in future work.")
    elif channel_f1 is not None:
        lines.append("Selection is supported by (1) domain rationale (three attention channels), (2) XGBoost gain importance, and (3) channel ablation. Run without `--skip-lofo` for full leave-one-out ablation.")
    else:
        lines.append("Selection is supported by (1) domain rationale (three attention channels) and (2) XGBoost gain importance. Run without `--skip-lofo` and `--skip-channel` for the ablation results.")
    lines.append("")
    return lines


def main():
    """Run the importance and ablation analyses, print them, and write the markdown report.

    Steps: gain importance from the checkpoint (if present), baseline LOPO F1,
    leave-one-feature-out ablation and channel ablation (each skippable via a
    flag), then the report is written to
    ``evaluation/feature_selection_justification.md`` under the project root.
    """
    args = _parse_args()

    print("=== Feature importance (XGBoost gain) ===")
    if args.quick:
        print("Running in quick mode (n_estimators=200).")
    imp = xgb_feature_importance()
    if imp:
        for name in FEATURES:
            print(f" {name}: {imp.get(name, 0):.2f}")
        order = sorted(imp.items(), key=lambda x: -x[1])
        print(" Top-5 by gain:", [x[0] for x in order[:5]])

    print("\n[DATA] Loading per-person splits once...")
    by_person, _, _ = load_per_person("face_orientation")
    persons = sorted(by_person.keys())

    print("\n=== Baseline LOPO (all 10 features) ===")
    baseline = run_baseline_lopo_f1(by_person, persons, quick=args.quick)
    print(f" Baseline (all 10 features) mean LOPO F1: {baseline:.4f}")

    ablation = None
    worst_drop = None
    print("\n=== Leave-one-feature-out ablation (LOPO mean F1) ===")
    if args.skip_lofo:
        print(" skipped (--skip-lofo)")
    else:
        ablation = run_ablation_lopo(by_person, persons, quick=args.quick)
        for feat in FEATURES:
            delta = baseline - ablation[feat]
            print(f" drop {feat}: F1={ablation[feat]:.4f} (Δ={delta:+.4f})")
        # Lowest F1 after removal == the feature whose absence hurts most.
        worst_drop = min(ablation.items(), key=lambda x: x[1])
        print(f" Largest F1 drop when dropping: {worst_drop[0]} (F1={worst_drop[1]:.4f})")

    channel_f1 = None
    print("\n=== Channel ablation (LOPO mean F1) ===")
    if args.skip_channel:
        print(" skipped (--skip-channel)")
    else:
        channel_f1 = run_channel_ablation(by_person, persons, quick=args.quick, baseline=baseline)
        for name, f1 in channel_f1.items():
            print(f" {name}: {f1:.4f}")

    out_dir = os.path.join(_PROJECT_ROOT, "evaluation")
    # Make sure the target directory exists before opening the report file.
    os.makedirs(out_dir, exist_ok=True)
    out_path = os.path.join(out_dir, "feature_selection_justification.md")
    lines = _build_report_lines(
        args.quick, imp, baseline, ablation, worst_drop, channel_f1
    )
    with open(out_path, "w", encoding="utf-8") as f:
        f.write("\n".join(lines))
    print(f"\nReport written to {out_path}")
if __name__ == "__main__":
main()