# temporal-twins-code / models/xgboost_model.py
# Anonymous Temporal Twins code release (commit a3682cf, verified upload).
"""
models/xgboost_model.py
=======================
Leakage-free XGBoost baseline trained on causal node-prefix features.
The baseline intentionally uses the real `xgboost.XGBClassifier` only.
It does not rely on multiprocessing or sklearn substitutes.
"""
from __future__ import annotations
from typing import List
import numpy as np
import pandas as pd
from xgboost import XGBClassifier
from models.base import TemporalModel
# Columns that must never reach a learned baseline
_BLOCKED_COLS = frozenset({
"motif_hit_count", "motif_source", "trigger_event_idx", "label_event_idx",
"label_delay", "is_fallback_label", "fraud_source",
"twin_role", "twin_label", "twin_pair_id", "template_id",
"dynamic_fraud_state", "motif_chain_state", "motif_strength",
})
class XGBoostWrapper(TemporalModel):
    """XGBoost baseline trained on causal, node-level prefix aggregates.

    Features are computed per `sender_id` from a time-sorted transaction
    prefix only, and any oracle/label column listed in `_BLOCKED_COLS` is
    rejected outright, so the baseline cannot leak label information.
    """

    def __init__(self, n_estimators: int = 200, max_depth: int = 6):
        self.n_estimators = n_estimators
        self.max_depth = max_depth
        self._model: XGBClassifier | None = None
        # Fallback probability used when the training labels are degenerate
        # (only one class present); predict() then returns this constant.
        self._constant_prob: float | None = None
        self._feature_names: List[str] = []

    @property
    def name(self) -> str:
        return "XGBoost"

    @property
    def is_temporal(self) -> bool:
        # Static baseline: carries no recurrent/temporal state across calls.
        return False

    @staticmethod
    def _extract_features(df: pd.DataFrame) -> pd.DataFrame:
        """Causal node-level aggregation from a sorted prefix only.

        Parameters
        ----------
        df:
            Transaction rows. Required columns: ``sender_id``,
            ``receiver_id``, ``timestamp``, ``amount``. Optional columns
            used if present: ``failed``, ``is_retry``, ``pair_freq``.

        Returns
        -------
        pd.DataFrame
            One row per ``sender_id`` (index), NaNs filled with 0.0.

        Raises
        ------
        ValueError
            If any oracle/label column from ``_BLOCKED_COLS`` is present.
        """
        # Hard anti-leakage guard. NOTE: a bare `assert` here would be
        # stripped under `python -O`, silently disabling the whole check,
        # so raise explicitly instead.
        leaked = _BLOCKED_COLS & set(df.columns)
        if leaked:
            raise ValueError(f"Oracle columns leaked into XGBoost: {leaked}")
        df = df.sort_values("timestamp").reset_index(drop=True).copy()
        # Inter-arrival time per sender; the first transaction gets 0.
        df["_td"] = df.groupby("sender_id")["timestamp"].diff().fillna(0.0)
        # Count of transactions inside each sender's trailing 10-row window
        # (a simple burstiness signal).
        df["_rc10"] = (
            df.groupby("sender_id")["timestamp"]
            .transform(lambda x: x.rolling(10, min_periods=1).count())
        )
        grp = df.groupby("sender_id")
        feats = pd.DataFrame({
            "txn_count": grp["sender_id"].count(),
            "txn_cnt10_last": grp["_rc10"].last(),
            "amount_mean": grp["amount"].mean(),
            # std() is NaN for single-transaction senders; treat as 0.
            "amount_std": grp["amount"].std().fillna(0.0),
            "amount_max": grp["amount"].max(),
            "td_mean": grp["_td"].mean(),
            "td_std": grp["_td"].std().fillna(0.0),
            # Optional columns fall back to a constant 0.0 feature.
            "fail_rate": grp["failed"].mean() if "failed" in df.columns else 0.0,
            "retry_rate": grp["is_retry"].mean() if "is_retry" in df.columns else 0.0,
        })
        # Shannon entropy (bits) of each sender's receiver distribution;
        # the 1e-9 inside log2 guards against log(0).
        pair_counts = (
            df.groupby(["sender_id", "receiver_id"])
            .size()
            .reset_index(name="_n")
        )
        pair_counts["_tot"] = pair_counts.groupby("sender_id")["_n"].transform("sum")
        pair_counts["_p"] = pair_counts["_n"] / pair_counts["_tot"]
        pair_counts["_h"] = -pair_counts["_p"] * np.log2(pair_counts["_p"] + 1e-9)
        feats["recv_entropy"] = pair_counts.groupby("sender_id")["_h"].sum()
        if "pair_freq" in df.columns:
            feats["pair_freq_mean"] = grp["pair_freq"].mean()
        else:
            feats["pair_freq_mean"] = 0.0
        return feats.fillna(0.0)

    def fit(self, df_train: pd.DataFrame, num_epochs: int = 3) -> None:
        """No-op backbone step; actual supervised fit happens on a training prefix."""
        self._model = None
        self._constant_prob = None
        self._feature_names = []

    def train_node_classifier_on_prefix(
        self,
        df_prefix: pd.DataFrame,
        eval_nodes: List[int],
        y_labels: np.ndarray,
        num_epochs: int = 150,
    ) -> None:
        """Fit the node classifier on features extracted from `df_prefix`.

        Parameters
        ----------
        df_prefix:
            Causal transaction prefix (no future rows).
        eval_nodes:
            Sender ids to build the training rows for; nodes absent from
            the prefix get all-zero features via reindex+fillna.
        y_labels:
            Binary labels aligned with `eval_nodes`.
        num_epochs:
            Unused; kept for interface compatibility with other models.
        """
        X = self._extract_features(df_prefix).reindex(eval_nodes).fillna(0.0)
        y = np.asarray(y_labels, dtype=np.int64)
        self._feature_names = list(X.columns)
        # Degenerate labels (single class): fall back to a constant
        # probability equal to the base rate.
        if len(np.unique(y)) < 2:
            self._model = None
            self._constant_prob = float(y.mean()) if len(y) else 0.0
            return
        # Re-weight positives by the class imbalance (never below 1.0).
        scale_pos_weight = max(1.0, float((y == 0).sum()) / max(float((y == 1).sum()), 1.0))
        self._model = XGBClassifier(
            n_estimators=self.n_estimators,
            max_depth=self.max_depth,
            learning_rate=0.05,
            objective="binary:logistic",
            eval_metric="logloss",
            scale_pos_weight=scale_pos_weight,
            random_state=42,
            verbosity=0,
            n_jobs=1,           # single-threaded for deterministic runs
            tree_method="exact",
        )
        self._model.fit(X.values.astype(np.float32), y)
        self._constant_prob = None
        # Print top-5 feature importances for static shortcut audit
        importances = self._model.feature_importances_
        ranked = np.argsort(importances)[::-1]
        feat_names = list(X.columns)
        print("  [XGBoost] Top-5 feature importances:")
        for i in ranked[:5]:
            print(f"    {feat_names[i]:<20}: {importances[i]:.4f}")

    def predict(self, df_eval: pd.DataFrame, eval_nodes: List[int]) -> np.ndarray:
        """Return P(fraud) per node in `eval_nodes` (float32 array).

        Raises
        ------
        RuntimeError
            If called before `train_node_classifier_on_prefix()`.
        """
        X_eval = self._extract_features(df_eval).reindex(eval_nodes).fillna(0.0)
        if self._constant_prob is not None:
            return np.full(len(eval_nodes), self._constant_prob, dtype=np.float32)
        # Explicit raise instead of `assert`: the guard must survive
        # `python -O`, and RuntimeError names the misuse clearly.
        if self._model is None:
            raise RuntimeError("Call train_node_classifier_on_prefix() first.")
        probs = self._model.predict_proba(X_eval.values.astype(np.float32))[:, 1]
        return np.asarray(probs, dtype=np.float32)

    def reset_memory(self) -> None:
        """No-op: XGBoost has no temporal memory."""
        pass