"""
Hybrid ensemble: regularized DistilBERT probabilities + TF-IDF logistic regression.
"""
|
|
from __future__ import annotations

import json
from pathlib import Path
from typing import Any

import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import f1_score, roc_auc_score
from sklearn.pipeline import Pipeline

from src.evaluation.threshold_tuning import predict_with_threshold, search_best_threshold
from src.utils.logger import get_logger
|
|
# Module-level logger, obtained from the project's get_logger helper and used
# by every class and function in this file.
logger = get_logger(__name__)
|
|
|
|
class StableLRModel:
    """Regularized logistic regression on TF-IDF features (stable_training.yaml).

    Wraps a two-step scikit-learn ``Pipeline`` (``"tfidf"`` -> ``"clf"``).

    Attributes:
        pipeline: The underlying ``Pipeline`` instance.
        is_fitted: ``True`` once :meth:`fit` has run or a pipeline was loaded
            from disk via :meth:`load`.
    """

    def __init__(self, lr_cfg: dict, tfidf_cfg: dict, *, C: float | None = None):
        """Build the (unfitted) TF-IDF + logistic-regression pipeline.

        Args:
            lr_cfg: Classifier settings. Keys read: ``C`` (default 0.05),
                ``max_iter`` (2000), ``class_weight`` ("balanced"),
                ``solver`` ("lbfgs").
            tfidf_cfg: Vectorizer settings. Keys read: ``ngram_range``
                ([1, 2]), ``max_features`` (5000), ``sublinear_tf`` (True),
                ``min_df`` (3).
            C: Optional inverse regularization strength that takes precedence
                over ``lr_cfg["C"]``.
        """
        # Config files typically yield a list; TfidfVectorizer expects a tuple.
        ngram = tuple(tfidf_cfg.get("ngram_range", [1, 2]))
        self.pipeline = Pipeline(
            [
                (
                    "tfidf",
                    TfidfVectorizer(
                        max_features=int(tfidf_cfg.get("max_features", 5000)),
                        ngram_range=ngram,
                        sublinear_tf=bool(tfidf_cfg.get("sublinear_tf", True)),
                        min_df=int(tfidf_cfg.get("min_df", 3)),
                        analyzer="word",
                        strip_accents="unicode",
                    ),
                ),
                (
                    "clf",
                    LogisticRegression(
                        C=float(C if C is not None else lr_cfg.get("C", 0.05)),
                        max_iter=int(lr_cfg.get("max_iter", 2000)),
                        class_weight=lr_cfg.get("class_weight", "balanced"),
                        solver=lr_cfg.get("solver", "lbfgs"),
                        random_state=42,
                    ),
                ),
            ]
        )
        self.is_fitted = False

    def fit(self, X_train, y_train):
        """Fit the whole pipeline on the training data and return ``self``."""
        logger.info(f"Training stable LR — C={self.pipeline.named_steps['clf'].C}")
        self.pipeline.fit(X_train, y_train)
        self.is_fitted = True
        return self

    @property
    def C(self) -> float:
        """Inverse regularization strength currently set on the classifier."""
        return float(self.pipeline.named_steps["clf"].C)

    def set_C(self, c: float) -> None:
        """Set the classifier's inverse regularization strength.

        The new value only affects the learned weights after the next
        :meth:`fit`; an already-fitted classifier keeps its old coefficients.

        Args:
            c: New value for ``C``; must be a positive number.

        Raises:
            ValueError: If ``c`` is zero, negative, NaN, or not convertible
                to ``float``.
        """
        value = float(c)
        # `not value > 0` also rejects NaN, which would otherwise slip through
        # and only fail much later inside scikit-learn's fit().
        if not value > 0:
            raise ValueError(f"C must be a positive number, got {c!r}")
        self.pipeline.named_steps["clf"].C = value

    def train_test_gap(self, X_train, y_train, X_test, y_test) -> tuple[float, float, float]:
        """Return (f1_train, f1_test, gap) using weighted F1.

        ``gap`` is the absolute difference between the two F1 scores. Labels
        are cast to ``int`` before scoring.
        """
        preds_train = self.predict(X_train)
        preds_test = self.predict(X_test)
        y_tr = np.asarray(y_train).astype(int)
        y_te = np.asarray(y_test).astype(int)
        f1_train = float(f1_score(y_tr, preds_train, average="weighted", zero_division=0))
        f1_test = float(f1_score(y_te, preds_test, average="weighted", zero_division=0))
        return f1_train, f1_test, abs(f1_train - f1_test)

    def predict(self, X):
        """Return the pipeline's hard class predictions for ``X``."""
        return self.pipeline.predict(X)

    def predict_proba(self, X) -> np.ndarray:
        """Return the pipeline's class-probability estimates for ``X``."""
        return self.pipeline.predict_proba(X)

    def save(self, path: str | Path) -> None:
        """Serialize the pipeline to ``path`` with joblib, creating parent dirs."""
        import joblib

        path = Path(path)
        path.parent.mkdir(parents=True, exist_ok=True)
        joblib.dump(self.pipeline, path)
        logger.info(f"Stable LR saved: {path}")

    @classmethod
    def load(cls, path: str | Path) -> "StableLRModel":
        """Restore a model from a pipeline previously written by :meth:`save`.

        ``__init__`` is bypassed, so no config dictionaries are needed; the
        instance is marked as fitted.

        NOTE(security): joblib uses pickle under the hood, which can execute
        arbitrary code while loading. Only load files from trusted sources.
        """
        import joblib

        inst = cls.__new__(cls)
        inst.pipeline = joblib.load(path)
        inst.is_fitted = True
        return inst
|
|
|
|
def fit_lr_with_gap_control(
    X_train,
    y_train,
    X_test,
    y_test,
    lr_cfg: dict,
    tfidf_cfg: dict,
    *,
    max_gap: float = 0.05,
    X_train_gap=None,
    y_train_gap=None,
) -> tuple[StableLRModel, dict]:
    """
    Fit LR on augmented train; tune regularization until |train F1 - test F1| < max_gap.

    Candidates are tried in order and the search stops at the first one whose
    weighted-F1 gap is below ``max_gap``. If none qualifies, the candidate
    with the smallest gap is returned and a warning is logged.

    Args:
        X_train, y_train: Data every candidate model is fitted on.
        X_test, y_test: Data used for the "test" side of the gap.
        lr_cfg: Classifier config. ``lr_cfg["gap_search"]`` may hold
            ``enabled`` (default True), ``param_grid`` (list of dicts with an
            optional ``C`` plus TF-IDF overrides) or ``C_candidates``.
        tfidf_cfg: Base TF-IDF config; grid entries override its keys.
        max_gap: Largest acceptable absolute F1 gap (exclusive).
        X_train_gap, y_train_gap: Optional alternative data for the "train"
            side of the gap; defaults to ``X_train`` / ``y_train``. Must be
            given together.

    Returns:
        ``(model, meta)`` where ``meta`` records the chosen ``C``, the
        vectorizer's ``max_features`` / ``min_df``, the rounded F1 scores,
        the gap (also in percentage points) and ``gap_ok``.

    Raises:
        ValueError: If only one of ``X_train_gap`` / ``y_train_gap`` is given,
            or if no candidate could be evaluated.
    """
    if (X_train_gap is None) != (y_train_gap is None):
        # Pairing gap features with the wrong labels would silently corrupt
        # the train-side F1, so refuse half-specified input.
        raise ValueError("X_train_gap and y_train_gap must be provided together")
    X_gap = X_train if X_train_gap is None else X_train_gap
    y_gap = y_train if y_train_gap is None else y_train_gap

    # `or {}` also covers an explicit null in the config file.
    gap_cfg = lr_cfg.get("gap_search") or {}
    default_c = lr_cfg.get("C", 0.05)

    if not gap_cfg.get("enabled", True):
        grid = [{"C": float(default_c), **tfidf_cfg}]
    else:
        # An empty or null candidate list falls back to the configured C, the
        # same way an empty param_grid falls back to C_candidates.
        c_candidates = gap_cfg.get("C_candidates") or [default_c]
        grid = gap_cfg.get("param_grid") or [{"C": c, **tfidf_cfg} for c in c_candidates]

    best: StableLRModel | None = None
    best_meta: dict = {}
    best_gap = float("inf")

    for params in grid:
        tfidf_overrides = {k: v for k, v in params.items() if k != "C"}
        merged_tfidf = {**tfidf_cfg, **tfidf_overrides}
        c = float(params.get("C", default_c))
        model = StableLRModel(lr_cfg, merged_tfidf, C=c)
        model.fit(X_train, y_train)
        f1_train, f1_test, gap = model.train_test_gap(X_gap, y_gap, X_test, y_test)

        # Report the hyperparameters the vectorizer was actually built with,
        # so defaults applied by StableLRModel are recorded correctly.
        vectorizer = model.pipeline.named_steps["tfidf"]
        logger.info(
            f"LR gap search — C={c} max_features={vectorizer.max_features} "
            f"min_df={vectorizer.min_df} train_f1={f1_train:.4f} "
            f"test_f1={f1_test:.4f} gap={gap:.4f}"
        )
        meta = {
            "C": c,
            "max_features": int(vectorizer.max_features),
            "min_df": int(vectorizer.min_df),
            "f1_train": round(f1_train, 4),
            "f1_test": round(f1_test, 4),
            "train_test_gap": round(gap, 4),
            "train_test_gap_pp": round(gap * 100, 2),
            "gap_ok": gap < max_gap,
        }
        if gap < best_gap:
            best, best_meta = model, meta
            best_gap = gap
        if gap < max_gap:
            logger.info(f"LR gap OK at C={c}")
            break

    if best is None:
        raise ValueError("LR gap search evaluated no candidates; check 'gap_search' config")

    if not best_meta.get("gap_ok"):
        logger.warning(
            f"LR gap still {best_meta['train_test_gap']:.4f} after grid search; "
            f"using best gap C={best_meta['C']}"
        )

    return best, best_meta
|
|
|
|
def soft_vote_probs(
    prob_a: np.ndarray,
    prob_b: np.ndarray,
    weight_a: float = 0.5,
    weight_b: float = 0.5,
) -> np.ndarray:
    """Return the weighted average of two probability arrays (soft voting).

    The weights are normalized by their sum, so they need not add up to 1.
    Inputs are converted with ``np.asarray`` and combined element-wise under
    NumPy's usual broadcasting rules.

    Args:
        prob_a: Probabilities from the first model (array-like).
        prob_b: Probabilities from the second model (array-like).
        weight_a: Non-negative weight of ``prob_a``.
        weight_b: Non-negative weight of ``prob_b``.

    Returns:
        ``(weight_a * prob_a + weight_b * prob_b) / (weight_a + weight_b)``.

    Raises:
        ValueError: If a weight is negative, or the weights do not sum to a
            positive finite number.
    """
    if weight_a < 0 or weight_b < 0:
        raise ValueError(
            f"Soft-vote weights must be non-negative, got {weight_a!r} and {weight_b!r}"
        )
    total = weight_a + weight_b
    # Guards the division below: a zero, NaN or infinite total would silently
    # fill the result with NaN/inf instead of probabilities.
    if not (np.isfinite(total) and total > 0):
        raise ValueError(
            f"Soft-vote weights must sum to a positive finite value, got {total!r}"
        )
    a = np.asarray(prob_a)
    b = np.asarray(prob_b)
    return (weight_a * a + weight_b * b) / total
|
|
|
|
def evaluate_ensemble(
    bert_probs: np.ndarray,
    lr_probs: np.ndarray,
    y_true: np.ndarray,
    *,
    bert_weight: float = 0.5,
    lr_weight: float = 0.5,
    model_name: str = "Hybrid-ensemble",
    threshold: float = 0.5,
) -> dict:
    """Combine probabilities and compute binary metrics.

    Args:
        bert_probs: Positive-class probabilities from the DistilBERT branch
            (assumed 1-D, one value per sample; confirm against caller).
        lr_probs: Positive-class probabilities from the LR branch, same shape.
        y_true: Ground-truth labels; cast to ``int`` before scoring.
        bert_weight: Soft-vote weight of ``bert_probs``.
        lr_weight: Soft-vote weight of ``lr_probs``.
        model_name: Label stored under ``"model"`` in the result.
        threshold: Decision threshold handed to ``predict_with_threshold``.

    Returns:
        Dict with ``model``, ``threshold``, ``f1_weighted``, ``f1_toxic``
        (F1 of label 1), ``roc_auc``, false-positive / false-negative counts
        (``fp`` / ``fn``), plus the raw ``ensemble_probs`` and
        ``ensemble_preds`` arrays. ``roc_auc`` is NaN when ``y_true`` holds
        fewer than two classes.
    """
    combined = soft_vote_probs(bert_probs, lr_probs, bert_weight, lr_weight)
    # Coerce to an array so the element-wise FP/FN masks below work even if
    # the helper hands back a plain sequence.
    preds = np.asarray(predict_with_threshold(combined, threshold))
    y = np.asarray(y_true).astype(int)

    f1_test = float(f1_score(y, preds, average="weighted", zero_division=0))
    f1_toxic = float(f1_score(y, preds, pos_label=1, zero_division=0))

    # ROC AUC is undefined for a single-class target; report NaN explicitly
    # instead of depending on how the installed scikit-learn version reacts.
    if np.unique(y).size < 2:
        logger.warning("ROC AUC undefined: y_true contains fewer than two classes")
        roc_auc = float("nan")
    else:
        roc_auc = round(float(roc_auc_score(y, combined)), 4)

    return {
        "model": model_name,
        "threshold": round(float(threshold), 4),
        "f1_weighted": round(f1_test, 4),
        "f1_toxic": round(f1_toxic, 4),
        "roc_auc": roc_auc,
        "fp": int(((y == 0) & (preds == 1)).sum()),
        "fn": int(((y == 1) & (preds == 0)).sum()),
        "ensemble_probs": combined,
        "ensemble_preds": preds,
    }
|
|
|
|
def tune_ensemble_threshold(
    bert_probs: np.ndarray,
    lr_probs: np.ndarray,
    y_val: np.ndarray,
    *,
    bert_weight: float = 0.5,
    lr_weight: float = 0.5,
    metric: str = "f1_toxic",
    min_threshold: float = 0.05,
    max_threshold: float = 0.95,
    step: float = 0.01,
) -> tuple[float, float]:
    """Pick the decision threshold for the soft-voted validation probabilities.

    The two branches are blended with ``soft_vote_probs`` and the blend is
    passed, together with the search settings, to ``search_best_threshold``;
    its result is returned unchanged (presumably
    ``(best_threshold, best_score)`` — confirm against that helper).
    """
    search_settings = {
        "metric": metric,
        "min_threshold": min_threshold,
        "max_threshold": max_threshold,
        "step": step,
    }
    blended_val_probs = soft_vote_probs(bert_probs, lr_probs, bert_weight, lr_weight)
    return search_best_threshold(y_val, blended_val_probs, **search_settings)
|
|
|
|
def compute_performance_weights(
    bert_probs_val: np.ndarray,
    lr_probs_val: np.ndarray,
    y_val: np.ndarray,
    *,
    bert_threshold: float = 0.33,
    lr_threshold: float = 0.5,
    metric: str = "f1_weighted",
    min_lr_weight: float = 0.15,
    max_lr_weight: float = 0.45,
) -> tuple[float, float, dict]:
    """
    Set soft-vote weights proportional to validation F1 (per branch threshold).

    Each branch is thresholded on its own and scored against ``y_val``. The
    LR weight is the LR share of the two scores, clipped to
    ``[min_lr_weight, max_lr_weight]``; the BERT weight is the remainder.
    When both scores are zero the LR weight starts at 0.3 and goes through
    the same clipping.

    Args:
        bert_probs_val: Validation probabilities of the DistilBERT branch.
        lr_probs_val: Validation probabilities of the LR branch.
        y_val: Validation labels; cast to ``int``.
        bert_threshold: Decision threshold for the BERT branch.
        lr_threshold: Decision threshold for the LR branch.
        metric: ``"f1_toxic"`` scores F1 of label 1; ``"f1_weighted"`` scores
            weighted F1. Any other value logs a warning and is treated as
            ``"f1_weighted"``.
        min_lr_weight: Lower bound on the LR weight.
        max_lr_weight: Upper bound on the LR weight.

    Returns:
        ``(bert_weight, lr_weight, meta)``; ``meta`` holds the rounded scores
        and weights plus ``weight_metric`` (the metric actually used).

    Raises:
        ValueError: If ``min_lr_weight`` exceeds ``max_lr_weight``.
    """
    if min_lr_weight > max_lr_weight:
        raise ValueError(
            f"min_lr_weight ({min_lr_weight}) must not exceed max_lr_weight ({max_lr_weight})"
        )

    if metric == "f1_toxic":
        effective_metric = "f1_toxic"
        f1_kwargs = {"pos_label": 1}
    else:
        if metric != "f1_weighted":
            logger.warning(f"Unknown weight metric {metric!r}; falling back to f1_weighted")
        effective_metric = "f1_weighted"
        f1_kwargs = {"average": "weighted"}

    y = np.asarray(y_val).astype(int)
    bert_preds = predict_with_threshold(bert_probs_val, bert_threshold)
    lr_preds = predict_with_threshold(lr_probs_val, lr_threshold)

    bert_score = float(f1_score(y, bert_preds, zero_division=0, **f1_kwargs))
    lr_score = float(f1_score(y, lr_preds, zero_division=0, **f1_kwargs))

    total = bert_score + lr_score
    # With no signal from either branch, start from a fixed 0.7 / 0.3 split.
    raw_lr_weight = lr_score / total if total > 0 else 0.3
    # Clip on both paths so the configured bounds are always honoured.
    lw = float(np.clip(raw_lr_weight, min_lr_weight, max_lr_weight))
    bw = 1.0 - lw

    return bw, lw, {
        "bert_val_score": round(bert_score, 4),
        "lr_val_score": round(lr_score, 4),
        "bert_weight": round(bw, 4),
        "lr_weight": round(lw, 4),
        "weight_metric": effective_metric,
    }
|
|
|
|
def save_ensemble_meta(path: str | Path, meta: dict[str, Any]) -> None:
    """Write ensemble metadata to ``path`` as indented UTF-8 JSON.

    Missing parent directories are created. NumPy scalars and arrays (for
    example the probability arrays returned by ``evaluate_ensemble``) and
    ``Path`` objects are converted to plain JSON values.

    Args:
        path: Destination file, as a string or ``Path``.
        meta: Metadata mapping to serialize.

    Raises:
        TypeError: If ``meta`` contains a value that cannot be represented
            in JSON.
    """
    path = Path(path)
    path.parent.mkdir(parents=True, exist_ok=True)
    with open(path, "w", encoding="utf-8") as f:
        json.dump(meta, f, indent=2, default=_json_default)


def _json_default(obj: Any) -> Any:
    """Convert NumPy values and paths into JSON-serializable built-ins."""
    if isinstance(obj, np.ndarray):
        return obj.tolist()
    if isinstance(obj, np.generic):
        # Covers NumPy scalars such as np.float32, np.int64 and np.bool_.
        return obj.item()
    if isinstance(obj, Path):
        return str(obj)
    raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")
|
|