| | |
| | import os |
| | import sys |
| | import types |
| | import json |
| | import traceback |
| | from typing import Any, Dict, List, Tuple |
| |
|
| | import joblib |
| | import numpy as np |
| | import pandas as pd |
| |
|
| | |
| | |
| | |
| | class XGBWrappedModel: |
| | """ |
| | Wrapper saved in model.joblib: |
| | - preprocessor_: sklearn ColumnTransformer |
| | - model_: XGBClassifier (or similar exposing predict_proba) |
| | - explainer_: optional SHAP explainer |
| | - feature_names_out_: names after preprocessing |
| | Provides: |
| | - predict_proba(X_df) |
| | - top_contrib(X_df, k) |
| | """ |
| | def __init__(self, preprocessor=None, booster=None, explainer=None, |
| | feat_names_out=None, cat_prefix="cat__", num_prefix="num__"): |
| | self.preprocessor_ = preprocessor |
| | self.model_ = booster |
| | self.explainer_ = explainer |
| | self.feature_names_out_ = np.array(feat_names_out).astype(str) if feat_names_out is not None else None |
| | self.cat_prefix = cat_prefix |
| | self.num_prefix = num_prefix |
| |
|
| | def predict_proba(self, X_df: pd.DataFrame): |
| | Z = self.preprocessor_.transform(X_df) |
| | |
| | return self.model_.predict_proba(Z) |
| |
|
| | def top_contrib(self, X_df: pd.DataFrame, k: int = 5) -> Tuple[List[str], List[float]]: |
| | if self.explainer_ is None: |
| | return [], [] |
| | Z = self.preprocessor_.transform(X_df) |
| | try: |
| | sv = self.explainer_.shap_values(Z) |
| | if isinstance(sv, list): |
| | sv = sv[1] if len(sv) > 1 else sv[0] |
| | except Exception: |
| | res = self.explainer_(Z) |
| | sv = res.values |
| | sv_row = np.array(sv[0], dtype=float) |
| |
|
| | def to_orig(name: str) -> str: |
| | if name.startswith(self.cat_prefix): |
| | return name[len(self.cat_prefix):].split("_", 1)[0] |
| | if name.startswith(self.num_prefix): |
| | return name[len(self.num_prefix):] |
| | return name.split("_", 1)[0] |
| |
|
| | if self.feature_names_out_ is None: |
| | names_out = [f"f{i}" for i in range(len(sv_row))] |
| | else: |
| | names_out = list(self.feature_names_out_) |
| |
|
| | orig_names = [to_orig(n) for n in names_out] |
| | abs_sum: Dict[str, float] = {} |
| | signed_sum: Dict[str, float] = {} |
| | for n, v in zip(orig_names, sv_row): |
| | abs_sum[n] = abs_sum.get(n, 0.0) + abs(float(v)) |
| | signed_sum[n] = signed_sum.get(n, 0.0) + float(v) |
| |
|
| | ranked = sorted(abs_sum.items(), key=lambda kv: kv[1], reverse=True)[:k] |
| | names = [n for n, _ in ranked] |
| | values = [signed_sum[n] for n, _ in ranked] |
| | return names, values |
| |
|
| | |
| | |
| | sys.modules['__main__'].__dict__['XGBWrappedModel'] = XGBWrappedModel |
| | if 'train_export_xgb' not in sys.modules: |
| | sys.modules['train_export_xgb'] = types.ModuleType('train_export_xgb') |
| | sys.modules['train_export_xgb'].__dict__['XGBWrappedModel'] = XGBWrappedModel |
| |
|
| |
|
| | |
| | |
| | |
| | NUMERIC_COLS = ["age", "avg_glucose_level", "bmi", "hypertension", "heart_disease"] |
| | |
| | CAT_COLS = ["gender", "ever_married", "work_type", "smoking_status", "Residence_type"] |
| | ALL_CANON = NUMERIC_COLS + CAT_COLS |
| |
|
| | EXPLAIN_ORDER = [ |
| | "age", "avg_glucose_level", "bmi", "hypertension", "heart_disease", |
| | "gender", "ever_married", "work_type", "smoking_status", "Residence_type" |
| | ] |
| |
|
| |
|
| | |
| | |
| | |
| | def _to_int01(x: Any) -> int: |
| | if isinstance(x, (bool, np.bool_)): |
| | return int(bool(x)) |
| | try: |
| | if isinstance(x, str): |
| | s = x.strip().lower() |
| | if s in {"1", "true", "t", "yes", "y"}: |
| | return 1 |
| | if s in {"0", "false", "f", "no", "n"}: |
| | return 0 |
| | return int(float(x)) |
| | except Exception: |
| | return 0 |
| |
|
| |
|
| | def _coerce_dataframe(rows: List[Dict[str, Any]]) -> pd.DataFrame: |
| | """ |
| | Build a clean DataFrame: |
| | - Canonical Residence key is 'Residence_type' (capital R). |
| | - Accept 'residence_type' and map it to 'Residence_type' if needed. |
| | - Ensure numerics are float64 and 0/1 flags are ints then float64. |
| | - Ensure categoricals are plain strings (object), no NA. |
| | - Also mirror lowercase 'residence_type' for legacy models. |
| | """ |
| | norm_rows: List[Dict[str, Any]] = [] |
| | for r in rows: |
| | r = dict(r or {}) |
| | if "Residence_type" not in r and "residence_type" in r: |
| | r["Residence_type"] = r["residence_type"] |
| | entry = {k: r.get(k, None) for k in ALL_CANON} |
| | norm_rows.append(entry) |
| |
|
| | df = pd.DataFrame(norm_rows, columns=ALL_CANON) |
| |
|
| | for col in ["hypertension", "heart_disease"]: |
| | df[col] = df[col].map(_to_int01) |
| |
|
| | for col in ["age", "avg_glucose_level", "bmi"]: |
| | df[col] = pd.to_numeric(df[col], errors="coerce") |
| |
|
| | df[NUMERIC_COLS] = df[NUMERIC_COLS].astype("float64") |
| |
|
| | for col in CAT_COLS: |
| | df[col] = df[col].where(df[col].notna(), "Unknown") |
| | df[col] = df[col].map(lambda v: "Unknown" if v is None else str(v)).astype(object) |
| |
|
| | |
| | df["residence_type"] = df["Residence_type"].astype(object) |
| |
|
| | return df |
| |
|
| |
|
| | |
| | |
| | |
| | def _iter_estimators(est): |
| | yield est |
| | if hasattr(est, "named_steps"): |
| | for step in est.named_steps.values(): |
| | yield from _iter_estimators(step) |
| | if hasattr(est, "transformers"): |
| | for _, tr, _ in est.transformers: |
| | yield from _iter_estimators(tr) |
| |
|
| |
|
| | def _numeric_like(x) -> bool: |
| | if x is None: |
| | return True |
| | if isinstance(x, (int, np.integer, float, np.floating)): |
| | return True |
| | if isinstance(x, str): |
| | try: |
| | float(x) |
| | return True |
| | except Exception: |
| | return False |
| | return False |
| |
|
| |
|
| | def _sanitize_onehot_categories(model): |
| | """Coerce OneHotEncoder.categories_ to consistent dtypes to avoid np.isnan crashes.""" |
| | try: |
| | from sklearn.preprocessing import OneHotEncoder |
| | except Exception: |
| | OneHotEncoder = None |
| |
|
| | if OneHotEncoder is None: |
| | return |
| |
|
| | for node in _iter_estimators(model): |
| | if isinstance(node, OneHotEncoder) and hasattr(node, "categories_"): |
| | new_cats = [] |
| | for cats in node.categories_: |
| | arr = np.asarray(cats, dtype=object) |
| | if all(_numeric_like(v) for v in arr): |
| | vals = [] |
| | for v in arr: |
| | try: |
| | vals.append(np.nan if v is None else float(v)) |
| | except Exception: |
| | vals.append(np.nan) |
| | new_cats.append(np.asarray(vals, dtype=float)) |
| | else: |
| | strs = ["Unknown" if (v is None or (isinstance(v, float) and np.isnan(v))) else str(v) for v in arr] |
| | new_cats.append(np.asarray(strs, dtype=object)) |
| | node.categories_ = new_cats |
| | if hasattr(node, "handle_unknown"): |
| | node.handle_unknown = "ignore" |
| |
|
| |
|
| | def _patch_check_unknown(): |
| | """Patch sklearn _check_unknown to avoid np.isnan on object arrays (older builds).""" |
| | try: |
| | from sklearn.utils import _encode |
| | _orig = _encode._check_unknown |
| |
|
| | def _safe_check_unknown(values, known_values, return_mask=False): |
| | try: |
| | return _orig(values, known_values, return_mask=return_mask) |
| | except TypeError: |
| | vals = np.asarray(values, dtype=object) |
| | known = np.asarray(known_values, dtype=object) |
| | mask = np.isin(vals, known, assume_unique=False) |
| | diff = vals[~mask] |
| | if return_mask: |
| | return diff, mask |
| | return diff |
| |
|
| | _encode._check_unknown = _safe_check_unknown |
| | print("[handler] Patched sklearn.utils._encode._check_unknown", flush=True) |
| | except Exception as e: |
| | print(f"[handler] Patch for _check_unknown not applied: {e}", flush=True) |
| |
|
| |
|
| | |
| | |
| | |
| | def _introspect_model(model) -> Dict[str, Any]: |
| | info: Dict[str, Any] = {"type": str(type(model))} |
| | try: |
| | if hasattr(model, "named_steps"): |
| | info["pipeline_steps"] = list(model.named_steps.keys()) |
| | for name, step in model.named_steps.items(): |
| | if step.__class__.__name__ == "ColumnTransformer": |
| | info["column_transformer"] = str(step) |
| | try: |
| | info["transformers_"] = [(n, str(t.__class__), cols) for (n, t, cols) in step.transformers] |
| | except Exception: |
| | pass |
| | except Exception: |
| | pass |
| | try: |
| | info["feature_names_in_"] = list(getattr(model, "feature_names_in_", [])) |
| | except Exception: |
| | pass |
| | return info |
| |
|
| |
|
| | |
| | |
| | |
| | class EndpointHandler: |
| | def __init__(self, path: str = "/repository") -> None: |
| | _patch_check_unknown() |
| |
|
| | model_path = os.path.join(path, "model.joblib") |
| | self.model = joblib.load(model_path) |
| |
|
| | try: |
| | self.threshold = float(os.getenv("THRESHOLD", "0.38")) |
| | except Exception: |
| | self.threshold = 0.38 |
| |
|
| | self.explainer = getattr(self.model, "explainer_", None) |
| |
|
| | _sanitize_onehot_categories(self.model) |
| |
|
| | print("[handler] Model loaded", flush=True) |
| | print(f"[handler] Using threshold: {self.threshold}", flush=True) |
| |
|
| | def __call__(self, data: Dict[str, Any]) -> Dict[str, Any]: |
| | debug = bool(data.get("debug", False)) |
| | explain = bool(data.get("explain", False)) |
| |
|
| | rows = data.get("inputs") or [] |
| | if isinstance(rows, dict): |
| | rows = [rows] |
| | if not isinstance(rows, list) or not rows: |
| | return {"error": "inputs must be a non-empty list of records", "threshold": self.threshold} |
| |
|
| | df = _coerce_dataframe(rows) |
| |
|
| | debug_info = { |
| | "columns": list(df.columns), |
| | "dtypes": {c: str(df[c].dtype) for c in df.columns}, |
| | "threshold": self.threshold, |
| | "model": _introspect_model(self.model), |
| | "head": df.head(1).to_dict(orient="records"), |
| | } |
| |
|
| | |
| | try: |
| | if hasattr(self.model, "predict_proba"): |
| | proba = self.model.predict_proba(df)[:, 1].astype(float) |
| | else: |
| | raw = self.model.predict(df).astype(float) |
| | proba = 1.0 / (1.0 + np.exp(-raw)) |
| | except Exception as e: |
| | return { |
| | "error": f"model.predict failed: {e}", |
| | "trace": traceback.format_exc(), |
| | "debug": debug_info, |
| | "threshold": self.threshold, |
| | } |
| |
|
| | p = float(proba[0]) |
| | label = int(p >= self.threshold) |
| |
|
| | resp: Dict[str, Any] = { |
| | "risk_probability": p, |
| | "risk_label": label, |
| | "threshold": self.threshold, |
| | } |
| |
|
| | if explain: |
| | if hasattr(self.model, "top_contrib"): |
| | try: |
| | names, vals = self.model.top_contrib(df, k=5) |
| | if names: |
| | resp["shap"] = {"feature_names": names, "values": vals} |
| | except Exception as e: |
| | resp["shap_error"] = f"top_contrib failed: {e}" |
| | elif self.explainer is not None: |
| | try: |
| | shap_vals = self.explainer(df) |
| | vals = shap_vals.values[0] if hasattr(shap_vals, "values") else shap_vals[0] |
| | contrib = [] |
| | for feat in EXPLAIN_ORDER: |
| | if feat in df.columns: |
| | idx = list(df.columns).index(feat) |
| | contrib.append({"feature": feat, "effect": float(vals[idx])}) |
| | resp["shap"] = {"contrib": contrib} |
| | except Exception as e: |
| | resp["shap_error"] = f"explainer failed: {e}" |
| |
|
| | if debug: |
| | resp["debug"] = debug_info |
| |
|
| | try: |
| | print(f"[handler] prob={p:.4f} label={label}", flush=True) |
| | except Exception: |
| | pass |
| |
|
| | return resp |
| |
|