| """ |
| Logistic regression on TF-IDF(clean_text) + scaled metadata features. |
| """ |
|
|
| from __future__ import annotations |
|
|
| from pathlib import Path |
|
|
| import numpy as np |
| import pandas as pd |
| from scipy.sparse import hstack |
| from sklearn.feature_extraction.text import TfidfVectorizer |
| from sklearn.linear_model import LogisticRegression |
| from sklearn.metrics import f1_score |
| from sklearn.preprocessing import StandardScaler |
|
|
| from src.features.metadata_features import DEFAULT_METADATA_COLUMNS |
| from src.utils.logger import get_logger |
|
|
| logger = get_logger(__name__) |
|
|
|
|
class MetadataLRModel:
    """TF-IDF on clean text + numeric metadata → logistic regression.

    The text column is vectorised with ``TfidfVectorizer``, the metadata
    columns are standardised with ``StandardScaler``, and both parts are
    stacked column-wise and fed to one ``LogisticRegression``.
    """

    def __init__(
        self,
        lr_cfg: dict,
        tfidf_cfg: dict,
        *,
        metadata_columns: list[str] | None = None,
        C: float | None = None,
    ):
        """Build the (still unfitted) vectoriser, scaler and classifier.

        Args:
            lr_cfg: Classifier settings; the keys ``C``, ``max_iter``,
                ``class_weight`` and ``solver`` are read, each with a default.
            tfidf_cfg: Vectoriser settings; the keys ``ngram_range``,
                ``max_features``, ``sublinear_tf`` and ``min_df`` are read,
                each with a default.
            metadata_columns: Names of the metadata columns to use. ``None``
                or an empty list falls back to ``DEFAULT_METADATA_COLUMNS``.
            C: Inverse regularisation strength; overrides ``lr_cfg["C"]``
                when given.
        """
        # Copy so later mutation of the caller's list cannot change the model.
        self.metadata_columns = list(metadata_columns or DEFAULT_METADATA_COLUMNS)

        ngram = tuple(tfidf_cfg.get("ngram_range", [1, 2]))
        self.tfidf = TfidfVectorizer(
            max_features=int(tfidf_cfg.get("max_features", 5000)),
            ngram_range=ngram,
            sublinear_tf=bool(tfidf_cfg.get("sublinear_tf", True)),
            min_df=int(tfidf_cfg.get("min_df", 3)),
            analyzer="word",
            strip_accents="unicode",
        )
        self.scaler = StandardScaler()
        self.clf = LogisticRegression(
            C=float(C if C is not None else lr_cfg.get("C", 0.05)),
            max_iter=int(lr_cfg.get("max_iter", 2000)),
            class_weight=lr_cfg.get("class_weight", "balanced"),
            solver=lr_cfg.get("solver", "lbfgs"),
            random_state=42,
        )
        self.is_fitted = False

    @property
    def C(self) -> float:
        """Inverse regularisation strength of the underlying classifier."""
        return float(self.clf.C)

    def _meta_matrix(self, meta: pd.DataFrame) -> np.ndarray:
        """Return the configured metadata columns of *meta* as a float array.

        Configured columns that are absent from *meta* are skipped (the
        original best-effort behaviour) but are now reported, because a
        column set that differs between fit and predict only surfaces later
        as a feature-count mismatch.
        """
        present = [c for c in self.metadata_columns if c in meta.columns]
        missing = [c for c in self.metadata_columns if c not in meta.columns]
        if missing:
            logger.warning(
                f"Metadata LR: metadata columns missing from input, skipped: {missing}"
            )
        return meta[present].astype(float).values

    def _features(self, X_clean: pd.Series, meta: pd.DataFrame, *, fit: bool):
        """Build the stacked ``[tfidf | scaled metadata]`` design matrix.

        Args:
            X_clean: Cleaned text, one document per row; cast to ``str``.
            meta: Metadata frame aligned row-wise with *X_clean*.
            fit: When true the vectoriser and scaler are (re)fitted on this
                data; otherwise the already fitted transformers are applied.

        Returns:
            A SciPy sparse matrix (CSR) — not a dense ``ndarray``.
        """
        docs = X_clean.astype(str)
        if fit:
            X_t = self.tfidf.fit_transform(docs)
            X_m = self.scaler.fit_transform(self._meta_matrix(meta))
        else:
            X_t = self.tfidf.transform(docs)
            X_m = self.scaler.transform(self._meta_matrix(meta))
        # CSR is the layout the linear model consumes; ask for it directly
        # instead of the COO matrix hstack produces by default.
        return hstack([X_t, X_m], format="csr")

    def fit(
        self,
        X_clean: pd.Series,
        meta: pd.DataFrame,
        y,
    ) -> "MetadataLRModel":
        """Fit vectoriser, scaler and classifier; return ``self``."""
        X = self._features(X_clean, meta, fit=True)
        self.clf.fit(X, y)
        self.is_fitted = True

        tfidf_dim = len(self.tfidf.vocabulary_)
        # Report the width actually used: configured columns that were
        # missing from `meta` are not part of the design matrix.
        meta_dim = X.shape[1] - tfidf_dim
        logger.info(
            f"Metadata LR trained — C={self.C} | "
            f"tfidf_dim={tfidf_dim} | meta_dim={meta_dim}"
        )
        return self

    def predict_proba(self, X_clean: pd.Series, meta: pd.DataFrame) -> np.ndarray:
        """Return the classifier's class probabilities, one row per sample."""
        X = self._features(X_clean, meta, fit=False)
        return self.clf.predict_proba(X)

    def predict(self, X_clean: pd.Series, meta: pd.DataFrame) -> np.ndarray:
        """Return the most probable class label for every sample.

        The argmax over ``predict_proba`` is a column index; it is mapped
        through ``clf.classes_`` so the result is a real label even when the
        training labels are not exactly ``0..k-1``. For ``0..k-1`` labels the
        output is unchanged.
        """
        winners = self.predict_proba(X_clean, meta).argmax(axis=1)
        return np.asarray(self.clf.classes_)[winners]

    def train_test_gap(
        self,
        X_train_clean,
        meta_train,
        y_train,
        X_test_clean,
        meta_test,
        y_test,
    ) -> tuple[float, float, float]:
        """Score the fitted model on two splits and measure their distance.

        Returns:
            ``(f1_train, f1_test, abs(f1_train - f1_test))`` using the
            weighted F1 score.
        """
        # Targets and predictions are both cast to int so the two sides of
        # f1_score always share a label type.
        preds_train = np.asarray(self.predict(X_train_clean, meta_train)).astype(int)
        preds_test = np.asarray(self.predict(X_test_clean, meta_test)).astype(int)
        y_tr = np.asarray(y_train).astype(int)
        y_te = np.asarray(y_test).astype(int)
        f1_train = float(f1_score(y_tr, preds_train, average="weighted", zero_division=0))
        f1_test = float(f1_score(y_te, preds_test, average="weighted", zero_division=0))
        return f1_train, f1_test, abs(f1_train - f1_test)

    def save(self, path: str | Path) -> None:
        """Persist vectoriser, scaler, classifier and column list with joblib.

        Missing parent directories of *path* are created.
        """
        import joblib

        path = Path(path)
        path.parent.mkdir(parents=True, exist_ok=True)
        joblib.dump(
            {
                "tfidf": self.tfidf,
                "scaler": self.scaler,
                "clf": self.clf,
                "metadata_columns": self.metadata_columns,
            },
            path,
        )
        logger.info(f"Metadata LR saved: {path}")

    @classmethod
    def load(cls, path: str | Path) -> "MetadataLRModel":
        """Restore a model written by :meth:`save`.

        ``__init__`` is bypassed; the stored components are attached directly
        and the instance is marked as fitted.
        """
        import joblib

        # NOTE(security): joblib files are pickle-based; only load artefacts
        # from a trusted location.
        blob = joblib.load(path)
        inst = cls.__new__(cls)
        inst.tfidf = blob["tfidf"]
        inst.scaler = blob["scaler"]
        inst.clf = blob["clf"]
        inst.metadata_columns = blob["metadata_columns"]
        inst.is_fitted = True
        return inst
|
|
|
|
def fit_metadata_lr_with_gap_control(
    X_train_clean,
    meta_train,
    y_train,
    X_test_clean,
    meta_test,
    y_test,
    lr_cfg: dict,
    tfidf_cfg: dict,
    *,
    max_gap: float = 0.05,
    X_train_gap_clean=None,
    meta_train_gap=None,
    y_train_gap=None,
) -> tuple[MetadataLRModel, dict]:
    """Fit candidate models until one has a small enough train/test F1 gap.

    Candidates come from ``lr_cfg["gap_search"]["param_grid"]`` and are tried
    in order. Each one is fitted on the training data and scored with
    ``MetadataLRModel.train_test_gap``. The search stops at the first
    candidate whose gap is below *max_gap*; if none qualifies, the candidate
    with the smallest gap is returned.

    Args:
        X_train_clean, meta_train, y_train: Data every candidate is fitted on.
        X_test_clean, meta_test, y_test: Data used for the "test" side of the
            gap.
        lr_cfg: Classifier config; ``gap_search`` (``enabled``,
            ``param_grid``) and ``C`` are read here, the rest by the model.
        tfidf_cfg: Base vectoriser config; every non-``C`` key of a grid
            entry overrides it.
        max_gap: Gap threshold below which a candidate is accepted.
        X_train_gap_clean, meta_train_gap, y_train_gap: Optional alternative
            split for the "train" side of the gap; each falls back to the
            corresponding training argument when ``None``.

    Returns:
        ``(model, summary)`` where *summary* holds the chosen ``C``,
        ``max_features``, ``min_df``, the rounded F1 scores, the gap (absolute
        and in percentage points) and a ``gap_ok`` flag.
    """
    # NOTE(review): candidates are selected on the split passed as the test
    # data; if that is the final hold-out set, the reported f1_test is
    # optimistic — confirm with the caller.
    gap_cfg = lr_cfg.get("gap_search") or {}  # tolerate an explicit null section
    X_gap = X_train_gap_clean if X_train_gap_clean is not None else X_train_clean
    meta_gap = meta_train_gap if meta_train_gap is not None else meta_train
    y_gap = y_train_gap if y_train_gap is not None else y_train

    grid = gap_cfg.get("param_grid") if gap_cfg.get("enabled", True) else None
    if not grid:
        # Search disabled, or enabled without a usable grid: evaluate the
        # single base configuration instead of failing on an empty or None
        # grid.
        grid = [{"C": float(lr_cfg.get("C", 0.05)), **tfidf_cfg}]

    best: MetadataLRModel | None = None
    best_meta: dict = {}
    best_gap = float("inf")

    for params in grid:
        overrides = {k: v for k, v in params.items() if k != "C"}
        merged = {**tfidf_cfg, **overrides}
        c = float(params.get("C", lr_cfg.get("C", 0.05)))

        model = MetadataLRModel(lr_cfg, merged, C=c)
        model.fit(X_train_clean, meta_train, y_train)
        f1_train, f1_test, gap = model.train_test_gap(
            X_gap, meta_gap, y_gap, X_test_clean, meta_test, y_test
        )
        logger.info(
            f"Metadata LR gap — C={c} max_features={merged.get('max_features')} "
            f"train_f1={f1_train:.4f} test_f1={f1_test:.4f} gap={gap:.4f}"
        )

        summary = {
            "C": c,
            "max_features": int(merged.get("max_features", 5000)),
            "min_df": int(merged.get("min_df", 3)),
            "f1_train": round(f1_train, 4),
            "f1_test": round(f1_test, 4),
            "train_test_gap": round(gap, 4),
            "train_test_gap_pp": round(gap * 100, 2),
            "gap_ok": gap < max_gap,
        }

        # Always keep the first candidate so a model is returned even when
        # the gap comparison can never succeed (e.g. a NaN gap).
        if best is None or gap < best_gap:
            best, best_meta = model, summary
            best_gap = gap
        if gap < max_gap:
            break

    return best, best_meta
|
|