# forensic-shell/server/grader.py
"""
Deterministic graders for ForensicShell tasks.
Each grader takes a submitted ForensicReport (as dict) and the scenario ground-truth
dict and returns a float in [0.0, 1.0]. Partial credit is awarded per correct subfield
so the reward function has meaningful gradient, not just 0/1.
Design choices:
- modified_files uses F0.5 (precision-weighted) instead of Jaccard: submitting
false-positive files (claiming an unmodified file was attacked) is penalized
more than missing a file. This mirrors real forensics where false positives
waste incident response effort.
- Timeline scoring is multiplicative (phase_F1 * ordering): having all 5 phases
in the wrong order scores 0, not ~0.30. Correct phases AND correct order
required for full credit.
"""
from typing import Dict, List
def _safe_str(x) -> str:
return (x or "").strip().lower() if isinstance(x, str) else ""
def _fbeta(pred: List[str], truth: List[str], beta: float = 0.5) -> float:
"""
F-beta score over string sets. beta < 1 weighs precision more than recall.
F0.5 penalizes false positives (extra wrong files) 2x harder than false
negatives (missing files), matching real forensic triage priorities.
"""
pred_set = {s.strip() for s in pred if isinstance(s, str) and s.strip()}
truth_set = {s.strip() for s in truth if isinstance(s, str) and s.strip()}
if not pred_set and not truth_set:
return 1.0
if not pred_set or not truth_set:
return 0.0
tp = len(pred_set & truth_set)
precision = tp / len(pred_set)
recall = tp / len(truth_set)
if precision + recall == 0:
return 0.0
beta2 = beta * beta
return (1 + beta2) * precision * recall / (beta2 * precision + recall)
def _kendall_tau_normalized(pred_order: List[str], true_order: List[str]) -> float:
"""
Normalized Kendall-tau in [0, 1] where 1.0 == identical ordering restricted to the
overlap set. If fewer than 2 shared phases, returns 1.0 (nothing to order).
"""
overlap = [p for p in pred_order if p in true_order]
# Keep only first occurrence of each overlap item in prediction
seen = set()
pred_overlap: List[str] = []
for p in overlap:
if p not in seen:
pred_overlap.append(p)
seen.add(p)
true_overlap = [p for p in true_order if p in seen]
n = len(pred_overlap)
if n < 2:
return 1.0
true_rank = {p: i for i, p in enumerate(true_overlap)}
concordant = 0
discordant = 0
for i in range(n):
for j in range(i + 1, n):
a = true_rank[pred_overlap[i]]
b = true_rank[pred_overlap[j]]
if a < b:
concordant += 1
elif a > b:
discordant += 1
total = concordant + discordant
if total == 0:
return 1.0
tau = (concordant - discordant) / total # in [-1, 1]
return (tau + 1.0) / 2.0 # normalize to [0, 1]
def _grade_t1_login(report: Dict, truth: Dict) -> float:
user_ok = 1.0 if _safe_str(report.get("compromised_user")) == _safe_str(truth.get("compromised_user")) else 0.0
ip_ok = 1.0 if _safe_str(report.get("initial_ip")) == _safe_str(truth.get("initial_ip")) else 0.0
return 0.5 * user_ok + 0.5 * ip_ok
def _grade_t2_modified(report: Dict, truth: Dict) -> float:
    """Task 2: user and IP (0.2 each) + modified-files F0.5 (0.3) + backdoor hash (0.3)."""
    def field_match(key: str) -> float:
        # Exact match after _safe_str normalization -> full credit for the field.
        return 1.0 if _safe_str(report.get(key)) == _safe_str(truth.get(key)) else 0.0

    # F0.5 is precision-weighted: naming a clean file as modified costs more
    # than missing a genuinely modified one.
    files_component = _fbeta(
        report.get("modified_files") or [],
        truth.get("modified_files") or [],
        beta=0.5,
    )
    return (
        0.2 * field_match("compromised_user")
        + 0.2 * field_match("initial_ip")
        + 0.3 * files_component
        + 0.3 * field_match("backdoor_sha256")
    )
def _grade_t3_timeline(report: Dict, truth: Dict) -> float:
    """Task 3: the t2 fields (0.15 each) plus a 0.40 timeline component.

    The timeline component is phase-set F1 multiplied by normalized Kendall-tau
    ordering agreement, so full credit requires both the right phases AND the
    right order — all phases in fully reversed order scores 0, not ~0.30.
    """
    def field_match(key: str) -> float:
        return 1.0 if _safe_str(report.get(key)) == _safe_str(truth.get(key)) else 0.0

    # Same precision-weighted file scoring as task 2.
    files_component = _fbeta(
        report.get("modified_files") or [],
        truth.get("modified_files") or [],
        beta=0.5,
    )

    submitted = report.get("timeline") or []
    expected = truth.get("timeline") or []
    # Submitted entries may be dicts or objects; extract the phase label either way.
    raw_labels = [
        entry.get("phase") if isinstance(entry, dict) else getattr(entry, "phase", None)
        for entry in submitted
    ]
    submitted_phases = [label for label in raw_labels if isinstance(label, str)]
    expected_phases = [entry["phase"] for entry in expected]

    # Standard F1 over the phase sets (phases are not precision-weighted).
    got = set(submitted_phases)
    want = set(expected_phases)
    if not got and not want:
        phase_f1 = 1.0
    elif not got or not want:
        phase_f1 = 0.0
    else:
        hits = len(got & want)
        precision = hits / len(got)
        recall = hits / len(want)
        phase_f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0

    # Multiplicative combination of set correctness and ordering quality.
    timeline_component = phase_f1 * _kendall_tau_normalized(submitted_phases, expected_phases)

    return (
        0.15 * field_match("compromised_user")
        + 0.15 * field_match("initial_ip")
        + 0.15 * files_component
        + 0.15 * field_match("backdoor_sha256")
        + 0.40 * timeline_component
    )
# Static task-id -> grader dispatch table; grade() falls back to this for any
# task id that does not carry the "gen_" (procedurally generated) prefix.
GRADERS = {
    "t1_login": _grade_t1_login,
    "t2_modified": _grade_t2_modified,
    "t3_timeline": _grade_t3_timeline,
}
def _grade_generic(report: Dict, truth: Dict) -> float:
    """Dispatcher for procedurally generated scenarios.

    Chooses the sub-grader by sniffing the ground-truth schema: a timeline
    means the full t3 grader, a backdoor hash means t2, otherwise t1.
    """
    if "timeline" in truth:
        grader = _grade_t3_timeline
    elif "backdoor_sha256" in truth:
        grader = _grade_t2_modified
    else:
        grader = _grade_t1_login
    return grader(report, truth)
def grade(task_id: str, report: Dict, truth: Dict) -> float:
    """Dispatch to the grader for *task_id* and return a score clamped to [0.0, 1.0].

    Generated scenarios (ids prefixed "gen_") go through the schema-sniffing
    dispatcher; unknown static task ids score 0.0. None report/truth are
    treated as empty dicts.
    """
    is_generated = bool(task_id) and task_id.startswith("gen_")
    grader = _grade_generic if is_generated else GRADERS.get(task_id)
    if grader is None:
        return 0.0
    raw = grader(report or {}, truth or {})
    # Clamp defensively so graders can never leak a score outside [0, 1].
    return float(min(1.0, max(0.0, raw)))