Portfolio-Risk-Platform / backend /services /explainability_engine.py
sheikhkmmtahmid's picture
Fix mixed date format crash (pd.to_datetime format='mixed')
5f9784b
"""
Phase 8 – Explainability Engine (SHAP + ElasticNet Coefficients)
Consumes outputs from Phase 6 (trained models + preprocessors + test data).
No model retraining. No feature engineering.
Produces:
data/explainability/
shap_global_xgb_{asset}.csv - mean |SHAP| per feature (XGBoost)
shap_global_en_{asset}.csv - mean |SHAP| per feature (ElasticNet)
shap_values_xgb_{asset}.csv - full SHAP matrix, test set (XGBoost)
shap_values_en_{asset}.csv - full SHAP matrix, test set (ElasticNet)
elastic_net_coefficients_{asset}.csv - actual EN coefficients from pkl model
feature_importance_comparison.csv - unified table: XGB gain | SHAP | EN coeff
latest_prediction_explanation.json - per-asset local explanation, most recent row
explainability_report.json - full summary
"""
from __future__ import annotations
import json
import logging
import time
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import joblib
import numpy as np
import pandas as pd
import shap
logging.basicConfig(level=logging.INFO, format="%(asctime)s | %(levelname)s | %(message)s")
logger = logging.getLogger(__name__)
ASSETS = ["spx", "ndx", "gold", "btc"]
MONTHS_PER_YEAR = 12
class _Timer:
def __init__(self) -> None:
self._t0 = time.time()
def log(self, tag: str, msg: str) -> None:
elapsed = time.time() - self._t0
m, s = divmod(int(elapsed), 60)
h, m = divmod(m, 60)
logger.info(f"[{h:02d}:{m:02d}:{s:02d}] [{tag}] {msg}")
class ExplainabilityEngine:
"""
Phase 8: SHAP-based explainability + ElasticNet coefficient analysis.
Loads Phase 6 pkl models + preprocessors, computes SHAP values on the
out-of-sample test set, and extracts EN coefficients from the model objects.
"""
def __init__(self, backend_root: str | Path) -> None:
self.root = Path(backend_root)
self.timer = _Timer()
self.models_dir = self.root / "models" / "phase6"
self.cache_dir = self.models_dir / "training_cache"
self.output_dir = self.root / "data" / "explainability"
self.output_dir.mkdir(parents=True, exist_ok=True)
# ── Public API ────────────────────────────────────────────────────────────
def run(self) -> dict:
self.timer.log("START", "Phase 8 Explainability started")
report: dict = {"assets": {}, "cross_asset_summary": {}}
comparison_rows: List[dict] = []
for asset in ASSETS:
self.timer.log(asset.upper(), f"Processing {asset.upper()}")
result = self._process_asset(asset)
report["assets"][asset] = result
comparison_rows.extend(result.pop("_comparison_rows"))
# Cross-asset feature importance comparison
comparison_df = pd.DataFrame(comparison_rows)
comparison_df.to_csv(self.output_dir / "feature_importance_comparison.csv", index=False)
self.timer.log("COMPARE", f"Feature importance comparison saved ({len(comparison_df)} rows)")
# Cross-asset summary: which features matter most on average
report["cross_asset_summary"] = self._cross_asset_summary(comparison_df)
# Write full report
with open(self.output_dir / "explainability_report.json", "w", encoding="utf-8") as f:
json.dump(report, f, indent=2, default=str)
self.timer.log("END", "Phase 8 completed successfully")
return report
# ── Per-asset pipeline ────────────────────────────────────────────────────
def _process_asset(self, asset: str) -> dict:
# Load artifacts
feature_cols = self._load_feature_columns(asset)
test_df = self._load_test_snapshot(asset)
X_test_raw, y_test = self._split_xy(test_df, asset)
# ── ElasticNet ───────────────────────────────────────────────────────
en_model, en_pre = self._load_model(asset, "elastic_net")
X_test_en = en_pre.transform(X_test_raw)
transformed_cols = self._get_transformed_columns(asset)
en_coeffs_df = self._extract_en_coefficients(en_model, transformed_cols, asset)
shap_en_df, shap_en_global = self._compute_shap_linear(
en_model, en_pre, X_test_raw, transformed_cols, asset
)
# ── XGBoost ──────────────────────────────────────────────────────────
xgb_model, xgb_pre = self._load_model(asset, "xgboost")
X_test_xgb = xgb_pre.transform(X_test_raw)
shap_xgb_df, shap_xgb_global = self._compute_shap_tree(
xgb_model, X_test_xgb, transformed_cols, asset
)
xgb_gain_df = self._load_xgb_gain_importance(asset)
# ── Latest-prediction local explanation ──────────────────────────────
local_explanation = self._explain_latest_prediction(
asset, X_test_raw, y_test,
en_model, en_pre, shap_en_df,
xgb_model, xgb_pre, shap_xgb_df,
transformed_cols,
)
# ── Comparison rows for cross-asset table ────────────────────────────
comparison_rows = self._build_comparison_rows(
asset, transformed_cols, shap_xgb_global, shap_en_global, en_coeffs_df, xgb_gain_df
)
self.timer.log(asset.upper(), "Done")
return {
"en_active_features": int((en_coeffs_df["coefficient"] != 0).sum()),
"en_total_features": len(en_coeffs_df),
"xgb_top_feature": shap_xgb_global.iloc[0]["feature"] if not shap_xgb_global.empty else None,
"en_top_feature": shap_en_global.iloc[0]["feature"] if not shap_en_global.empty and shap_en_global.iloc[0]["mean_abs_shap"] > 0 else "all_zero",
"latest_explanation": local_explanation,
"_comparison_rows": comparison_rows,
}
# ── SHAP: XGBoost (TreeExplainer) ────────────────────────────────────────
def _compute_shap_tree(
self,
model,
X_transformed: np.ndarray,
feature_names: List[str],
asset: str,
) -> Tuple[pd.DataFrame, pd.DataFrame]:
explainer = shap.TreeExplainer(model)
shap_values = explainer.shap_values(X_transformed) # (n_test, n_features)
shap_df = pd.DataFrame(shap_values, columns=feature_names)
shap_df.to_csv(self.output_dir / f"shap_values_xgb_{asset}.csv", index=False)
global_df = pd.DataFrame({
"feature": feature_names,
"mean_abs_shap": np.abs(shap_values).mean(axis=0),
}).sort_values("mean_abs_shap", ascending=False).reset_index(drop=True)
global_df.to_csv(self.output_dir / f"shap_global_xgb_{asset}.csv", index=False)
self.timer.log(asset.upper(), f"XGBoost SHAP computed β€” top feature: {global_df.iloc[0]['feature']} ({global_df.iloc[0]['mean_abs_shap']:.5f})")
return shap_df, global_df
# ── SHAP: ElasticNet (LinearExplainer) ───────────────────────────────────
def _compute_shap_linear(
self,
model,
preprocessor,
X_raw: pd.DataFrame,
feature_names: List[str],
asset: str,
) -> Tuple[pd.DataFrame, pd.DataFrame]:
X_transformed = preprocessor.transform(X_raw)
# Background = training data mean (masker for linear models)
explainer = shap.LinearExplainer(model, X_transformed, feature_perturbation="interventional")
shap_values = explainer.shap_values(X_transformed) # (n_test, n_features)
shap_df = pd.DataFrame(shap_values, columns=feature_names)
shap_df.to_csv(self.output_dir / f"shap_values_en_{asset}.csv", index=False)
global_df = pd.DataFrame({
"feature": feature_names,
"mean_abs_shap": np.abs(shap_values).mean(axis=0),
}).sort_values("mean_abs_shap", ascending=False).reset_index(drop=True)
global_df.to_csv(self.output_dir / f"shap_global_en_{asset}.csv", index=False)
top = global_df.iloc[0]
self.timer.log(asset.upper(), f"ElasticNet SHAP computed β€” top feature: {top['feature']} ({top['mean_abs_shap']:.5f})")
return shap_df, global_df
# ── ElasticNet coefficients ───────────────────────────────────────────────
def _extract_en_coefficients(
self,
model,
feature_names: List[str],
asset: str,
) -> pd.DataFrame:
"""
Extract actual coefficients from the ElasticNetCV pkl object.
The saved CSV has all-zeros due to heavy regularization β€” the pkl has the truth.
"""
coef = np.asarray(model.coef_).flatten()
intercept = float(model.intercept_) if hasattr(model, "intercept_") else 0.0
df = pd.DataFrame({
"feature": feature_names,
"coefficient": coef,
"abs_coefficient": np.abs(coef),
"direction": np.where(coef > 0, "positive", np.where(coef < 0, "negative", "zero")),
}).sort_values("abs_coefficient", ascending=False).reset_index(drop=True)
df.attrs["intercept"] = intercept
# Add intercept row at end
intercept_row = pd.DataFrame([{
"feature": "__intercept__",
"coefficient": intercept,
"abs_coefficient": abs(intercept),
"direction": "positive" if intercept >= 0 else "negative",
}])
df = pd.concat([df, intercept_row], ignore_index=True)
df.to_csv(self.output_dir / f"elastic_net_coefficients_{asset}.csv", index=False)
n_active = int((np.abs(coef) > 0).sum())
self.timer.log(asset.upper(), f"EN coefficients extracted β€” {n_active}/{len(coef)} active, intercept={intercept:.6f}")
return df
# ── Latest-prediction local explanation ──────────────────────────────────
def _explain_latest_prediction(
self,
asset: str,
X_test_raw: pd.DataFrame,
y_test: pd.Series,
en_model, en_pre,
shap_en_df: pd.DataFrame,
xgb_model, xgb_pre,
shap_xgb_df: pd.DataFrame,
feature_names: List[str],
) -> dict:
"""
Explain the most recent (last row of test set) prediction for each model.
Returns top-5 drivers from each model.
"""
last_idx = -1 # most recent test observation
# Raw feature values
raw_values = X_test_raw.iloc[last_idx].to_dict()
# EN prediction + SHAP
X_last_en = en_pre.transform(X_test_raw.iloc[[last_idx]])
en_pred = float(en_model.predict(X_last_en)[0])
en_shap_last = shap_en_df.iloc[last_idx].to_dict()
en_top5 = sorted(en_shap_last.items(), key=lambda x: abs(x[1]), reverse=True)[:5]
# XGB prediction + SHAP
X_last_xgb = xgb_pre.transform(X_test_raw.iloc[[last_idx]])
xgb_pred = float(xgb_model.predict(X_last_xgb)[0])
xgb_shap_last = shap_xgb_df.iloc[last_idx].to_dict()
xgb_top5 = sorted(xgb_shap_last.items(), key=lambda x: abs(x[1]), reverse=True)[:5]
actual = float(y_test.iloc[last_idx])
explanation = {
"asset": asset,
"actual_return": round(actual, 6),
"elastic_net": {
"prediction": round(en_pred, 6),
"error": round(en_pred - actual, 6),
"top_drivers": [
{"feature": f, "shap_value": round(v, 6), "direction": "bullish" if v > 0 else "bearish"}
for f, v in en_top5
],
},
"xgboost": {
"prediction": round(xgb_pred, 6),
"error": round(xgb_pred - actual, 6),
"top_drivers": [
{"feature": f, "shap_value": round(v, 6), "direction": "bullish" if v > 0 else "bearish"}
for f, v in xgb_top5
],
},
"raw_feature_snapshot": {k: round(float(v), 6) if isinstance(v, (int, float, np.floating)) else str(v)
for k, v in raw_values.items()},
}
return explanation
# ── Cross-asset summary ───────────────────────────────────────────────────
def _cross_asset_summary(self, comparison_df: pd.DataFrame) -> dict:
"""Average XGB SHAP importance across all assets β€” universal drivers."""
if "xgb_mean_abs_shap" not in comparison_df.columns:
return {}
avg = (
comparison_df.groupby("feature")["xgb_mean_abs_shap"]
.mean()
.sort_values(ascending=False)
.reset_index()
)
avg.columns = ["feature", "avg_xgb_shap_across_assets"]
avg.to_csv(self.output_dir / "cross_asset_shap_importance.csv", index=False)
return {
"top5_universal_drivers": avg.head(5).to_dict(orient="records"),
"note": "Average XGBoost SHAP importance across SPX, NDX, Gold",
}
# ── Comparison table builder ──────────────────────────────────────────────
def _build_comparison_rows(
self,
asset: str,
feature_names: List[str],
shap_xgb_global: pd.DataFrame,
shap_en_global: pd.DataFrame,
en_coeffs_df: pd.DataFrame,
xgb_gain_df: pd.DataFrame,
) -> List[dict]:
# Build lookup dicts
xgb_shap_map = dict(zip(shap_xgb_global["feature"], shap_xgb_global["mean_abs_shap"]))
en_shap_map = dict(zip(shap_en_global["feature"], shap_en_global["mean_abs_shap"]))
en_coeff_map = {}
for _, row in en_coeffs_df.iterrows():
if row["feature"] != "__intercept__":
en_coeff_map[row["feature"]] = row["coefficient"]
xgb_gain_map = {}
if xgb_gain_df is not None:
xgb_gain_map = dict(zip(xgb_gain_df["feature"], xgb_gain_df["importance"]))
rows = []
for feat in feature_names:
rows.append({
"asset": asset,
"feature": feat,
"xgb_gain": round(xgb_gain_map.get(feat, 0.0), 6),
"xgb_mean_abs_shap": round(xgb_shap_map.get(feat, 0.0), 6),
"en_mean_abs_shap": round(en_shap_map.get(feat, 0.0), 6),
"en_coefficient": round(en_coeff_map.get(feat, 0.0), 6),
"en_direction": ("positive" if en_coeff_map.get(feat, 0.0) > 0
else "negative" if en_coeff_map.get(feat, 0.0) < 0
else "zero"),
})
return rows
# ── Helpers ───────────────────────────────────────────────────────────────
def _load_model(self, asset: str, model_type: str):
if model_type == "elastic_net":
model_path = self.models_dir / f"elastic_net_{asset}.pkl"
pre_path = self.models_dir / f"elastic_net_{asset}_preprocessor.pkl"
else:
model_path = self.models_dir / f"xgb_{asset}.pkl"
pre_path = self.models_dir / f"xgb_{asset}_preprocessor.pkl"
model = joblib.load(model_path)
pre = joblib.load(pre_path)
return model, pre
def _load_test_snapshot(self, asset: str) -> pd.DataFrame:
path = self.cache_dir / f"test_snapshot_{asset}.csv"
df = pd.read_csv(path)
df["date"] = pd.to_datetime(df["date"], format="mixed")
return df.sort_values("date").reset_index(drop=True)
def _load_feature_columns(self, asset: str) -> List[str]:
path = self.models_dir / f"elastic_net_{asset}_feature_columns.json"
with open(path, encoding="utf-8") as f:
data = json.load(f)
return data["raw_feature_columns"]
def _get_transformed_columns(self, asset: str) -> List[str]:
path = self.models_dir / f"elastic_net_{asset}_feature_columns.json"
with open(path, encoding="utf-8") as f:
data = json.load(f)
return data["transformed_feature_columns"]
def _split_xy(self, df: pd.DataFrame, asset: str) -> Tuple[pd.DataFrame, pd.Series]:
target_col = f"{asset}_target"
raw_cols = self._load_feature_columns(asset)
# Drop non-feature columns
drop_cols = [c for c in df.columns if c not in raw_cols and c != target_col]
X = df[raw_cols].copy()
y = df[target_col].copy()
return X, y
def _load_xgb_gain_importance(self, asset: str) -> Optional[pd.DataFrame]:
path = self.models_dir / "feature_importance" / f"xgb_{asset}_importance.csv"
if path.exists():
return pd.read_csv(path)
return None