# text_classificators/src/model_evaluation.py
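"""Model evaluation and hyperparameter tuning utilities for text classification.

Covers cross-validation splitters, grid/random search, optional Optuna and
hyperopt tuning loops, classification metrics, Keras early-stopping callbacks,
and optional Weights & Biases logging.
"""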
from typing import Dict, Any, Union, Callable, Optional, Tuple, List
import numpy as np
import pandas as pd
from collections import defaultdict
import torch
from sklearn.model_selection import (
StratifiedKFold, GroupKFold, TimeSeriesSplit,
GridSearchCV, RandomizedSearchCV
)
from sklearn.metrics import (
accuracy_score, precision_score, recall_score, f1_score,
roc_auc_score, average_precision_score, log_loss,
confusion_matrix, classification_report
)
from sklearn.preprocessing import label_binarize
from sklearn.base import BaseEstimator
import warnings
warnings.filterwarnings("ignore")
OPTUNA_AVAILABLE = False
HYPEROPT_AVAILABLE = False
try:
import optuna
from optuna.samplers import TPESampler
OPTUNA_AVAILABLE = True
except ImportError:
pass
try:
from hyperopt import fmin, tpe, hp, Trials, STATUS_OK
HYPEROPT_AVAILABLE = True
except ImportError:
pass
WANDB_AVAILABLE = False
try:
import wandb
WANDB_AVAILABLE = True
except ImportError:
pass
def get_cv_splitter(
cv_type: str = "stratified",
n_splits: int = 5,
groups: Optional[np.ndarray] = None,
random_state: int = 42
):
if cv_type == "stratified":
return StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=random_state)
elif cv_type == "group":
if groups is None:
raise ValueError("groups must be provided for GroupKFold")
return GroupKFold(n_splits=n_splits)
elif cv_type == "time":
return TimeSeriesSplit(n_splits=n_splits)
else:
raise ValueError("cv_type must be 'stratified', 'group', or 'time'")
def grid_search_cv(
model: BaseEstimator,
X: np.ndarray,
y: np.ndarray,
param_grid: Dict[str, List],
cv_type: str = "stratified",
n_splits: int = 5,
scoring: str = "f1_macro",
groups: Optional[np.ndarray] = None,
verbose: int = 1
) -> GridSearchCV:
cv = get_cv_splitter(cv_type, n_splits, groups)
search = GridSearchCV(
model, param_grid, cv=cv, scoring=scoring, verbose=verbose, n_jobs=-1
)
search.fit(X, y)
return search
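# Example usage (sketch, assuming scikit-learn's LogisticRegression and
# illustrative X, y arrays):
#
#   from sklearn.linear_model import LogisticRegression
#   search = grid_search_cv(
#       LogisticRegression(max_iter=1000), X, y,
#       param_grid={"C": [0.1, 1.0, 10.0], "penalty": ["l2"]},
#   )
#   print(search.best_params_, search.best_score_)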
def random_search_cv(
model: BaseEstimator,
X: np.ndarray,
y: np.ndarray,
param_distributions: Dict[str, Any],
n_iter: int = 20,
cv_type: str = "stratified",
n_splits: int = 5,
scoring: str = "f1_macro",
groups: Optional[np.ndarray] = None,
verbose: int = 1
) -> RandomizedSearchCV:
cv = get_cv_splitter(cv_type, n_splits, groups)
search = RandomizedSearchCV(
model, param_distributions, n_iter=n_iter, cv=cv,
scoring=scoring, verbose=verbose, n_jobs=-1, random_state=42
)
search.fit(X, y)
return search
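# Example usage (sketch): random search over scipy.stats distributions, assuming
# a RandomForestClassifier and illustrative X, y arrays:
#
#   from scipy.stats import randint
#   from sklearn.ensemble import RandomForestClassifier
#   search = random_search_cv(
#       RandomForestClassifier(), X, y,
#       param_distributions={"n_estimators": randint(50, 300), "max_depth": randint(3, 20)},
#       n_iter=20,
#   )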
def _optuna_objective(
trial,
model_fn: Callable,
X: np.ndarray,
y: np.ndarray,
cv,
scoring: str = "f1_macro"
) -> float:
if "logistic" in model_fn.__name__.lower():
C = trial.suggest_float("C", 1e-4, 1e2, log=True)
penalty = trial.suggest_categorical("penalty", ["l1", "l2"])
solver = "liblinear" if penalty == "l1" else "lbfgs"
model = model_fn(C=C, penalty=penalty, solver=solver)
elif "random_forest" in model_fn.__name__.lower():
n_estimators = trial.suggest_int("n_estimators", 50, 300)
max_depth = trial.suggest_int("max_depth", 3, 20)
model = model_fn(n_estimators=n_estimators, max_depth=max_depth)
else:
model = model_fn(trial)
scores = []
for train_idx, val_idx in cv.split(X, y):
X_train, X_val = X[train_idx], X[val_idx]
y_train, y_val = y[train_idx], y[val_idx]
model.fit(X_train, y_train)
y_pred = model.predict(X_val)
if scoring == "f1_macro":
score = f1_score(y_val, y_pred, average="macro")
elif scoring == "roc_auc":
y_proba = model.predict_proba(X_val)[:, 1]
score = roc_auc_score(y_val, y_proba)
else:
raise ValueError(f"Scoring {scoring} not implemented in custom Optuna loop")
scores.append(score)
return np.mean(scores)
def optuna_tuning(
model_fn: Callable,
X: np.ndarray,
y: np.ndarray,
n_trials: int = 50,
cv_type: str = "stratified",
n_splits: int = 5,
scoring: str = "f1_macro",
groups: Optional[np.ndarray] = None,
direction: str = "maximize"
) -> "optuna.Study":
    if not OPTUNA_AVAILABLE:
        raise ImportError("optuna is required for optuna_tuning; install it with `pip install optuna`")
    cv = get_cv_splitter(cv_type, n_splits, groups)
study = optuna.create_study(direction=direction, sampler=TPESampler(seed=42))
study.optimize(
lambda trial: _optuna_objective(trial, model_fn, X, y, cv, scoring),
n_trials=n_trials
)
return study
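# Example usage (sketch): the factory's name must contain "logistic" or
# "random_forest" to hit the built-in search spaces in _optuna_objective;
# otherwise it is called with the raw trial object. `make_logistic` is illustrative.
#
#   from sklearn.linear_model import LogisticRegression
#   def make_logistic(**kwargs):
#       return LogisticRegression(max_iter=1000, **kwargs)
#   study = optuna_tuning(make_logistic, X, y, n_trials=30)
#   print(study.best_params, study.best_value)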
def hyperopt_tuning(
model_fn: Callable,
X: np.ndarray,
y: np.ndarray,
space: Dict,
max_evals: int = 50,
cv_type: str = "stratified",
n_splits: int = 5,
scoring: str = "f1_macro",
groups: Optional[np.ndarray] = None
):
    if not HYPEROPT_AVAILABLE:
        raise ImportError("hyperopt is required for hyperopt_tuning; install it with `pip install hyperopt`")
    cv = get_cv_splitter(cv_type, n_splits, groups)
def objective(params):
model = model_fn(**params)
scores = []
for train_idx, val_idx in cv.split(X, y):
X_train, X_val = X[train_idx], X[val_idx]
y_train, y_val = y[train_idx], y[val_idx]
model.fit(X_train, y_train)
y_pred = model.predict(X_val)
if scoring == "f1_macro":
score = f1_score(y_val, y_pred, average="macro")
elif scoring == "roc_auc":
y_proba = model.predict_proba(X_val)[:, 1]
score = roc_auc_score(y_val, y_proba)
            else:
                raise ValueError(f"Scoring {scoring} not implemented in custom hyperopt loop")
            scores.append(score)
        # hyperopt minimizes the loss, so return the negated mean CV score
        return {"loss": -np.mean(scores), "status": STATUS_OK}
trials = Trials()
best = fmin(fn=objective, space=space, algo=tpe.suggest, max_evals=max_evals, trials=trials)
return best, trials
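# Example usage (sketch): hyperopt space keys must match the model factory's
# keyword arguments; `make_rf` is illustrative, and hp.quniform returns floats,
# hence the int() casts.
#
#   from hyperopt import hp
#   from sklearn.ensemble import RandomForestClassifier
#   def make_rf(n_estimators, max_depth):
#       return RandomForestClassifier(n_estimators=int(n_estimators), max_depth=int(max_depth))
#   space = {
#       "n_estimators": hp.quniform("n_estimators", 50, 300, 10),
#       "max_depth": hp.quniform("max_depth", 3, 20, 1),
#   }
#   best, trials = hyperopt_tuning(make_rf, X, y, space=space, max_evals=30)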
def compute_classification_metrics(
y_true: np.ndarray,
y_pred: np.ndarray,
y_proba: Optional[np.ndarray] = None,
average: str = "macro"
) -> Dict[str, float]:
metrics = {
"accuracy": accuracy_score(y_true, y_pred),
"precision": precision_score(y_true, y_pred, average=average, zero_division=0),
"recall": recall_score(y_true, y_pred, average=average, zero_division=0),
"f1": f1_score(y_true, y_pred, average=average, zero_division=0),
}
if y_proba is not None:
if len(np.unique(y_true)) == 2:
metrics["roc_auc"] = roc_auc_score(y_true, y_proba[:, 1])
metrics["pr_auc"] = average_precision_score(y_true, y_proba[:, 1])
metrics["log_loss"] = log_loss(y_true, y_proba)
        else:
            try:
                metrics["roc_auc"] = roc_auc_score(y_true, y_proba, multi_class="ovr", average=average)
            except ValueError:
                metrics["roc_auc"] = np.nan
            try:
                # average_precision_score expects one-hot labels in the multiclass case
                y_true_bin = label_binarize(y_true, classes=np.unique(y_true))
                metrics["pr_auc"] = average_precision_score(y_true_bin, y_proba, average=average)
            except ValueError:
                metrics["pr_auc"] = np.nan
            try:
                metrics["log_loss"] = log_loss(y_true, y_proba)
            except ValueError:
                metrics["log_loss"] = np.nan
return metrics
def evaluate_model(
model: BaseEstimator,
X_test: np.ndarray,
y_test: np.ndarray,
average: str = "macro",
return_pred: bool = False
) -> Union[Dict[str, float], Tuple[Dict[str, float], np.ndarray, Optional[np.ndarray]]]:
y_pred = model.predict(X_test)
y_proba = None
if hasattr(model, "predict_proba"):
y_proba = model.predict_proba(X_test)
metrics = compute_classification_metrics(y_test, y_pred, y_proba, average=average)
if return_pred:
return metrics, y_pred, y_proba
return metrics
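# Example usage (sketch): evaluating a fitted estimator on a held-out split,
# assuming illustrative clf, X_test, y_test:
#
#   metrics, y_pred, y_proba = evaluate_model(clf, X_test, y_test, return_pred=True)
#   print(metrics["f1"], metrics.get("roc_auc"))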
def get_early_stopping(
monitor: str = "val_loss",
patience: int = 5,
mode: str = "min",
framework: str = "keras"
):
if framework == "keras":
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
es = EarlyStopping(monitor=monitor, patience=patience, restore_best_weights=True, mode=mode)
reduce_lr = ReduceLROnPlateau(monitor=monitor, factor=0.5, patience=3, min_lr=1e-7, mode=mode)
return [es, reduce_lr]
elif framework == "pytorch":
raise NotImplementedError("PyTorch callbacks require custom training loop")
else:
raise ValueError("framework must be 'keras' or 'pytorch'")
def init_wandb(
project_name: str = "text-classification",
run_name: Optional[str] = None,
config: Optional[Dict] = None
):
if not WANDB_AVAILABLE:
return None
wandb.init(project=project_name, name=run_name, config=config)
return wandb
def log_metrics_to_wandb(metrics: Dict[str, float]):
if WANDB_AVAILABLE and wandb.run:
wandb.log(metrics)
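# Example usage (sketch), assuming wandb is installed and the user is logged in;
# `metrics` is e.g. the dict returned by evaluate_model:
#
#   init_wandb(project_name="text-classification", run_name="logreg-baseline",
#              config={"model": "logistic_regression"})
#   log_metrics_to_wandb(metrics)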
def suggest_transformer_hparams(trial) -> Dict[str, Any]:
return {
"learning_rate": trial.suggest_float("learning_rate", 1e-6, 1e-4, log=True),
"per_device_train_batch_size": trial.suggest_categorical("batch_size", [8, 16, 32]),
"num_train_epochs": trial.suggest_int("num_train_epochs", 2, 6),
"weight_decay": trial.suggest_float("weight_decay", 0.0, 0.3),
"warmup_ratio": trial.suggest_float("warmup_ratio", 0.0, 0.2),
}
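# Example usage (sketch): this matches the hp_space signature expected by
# transformers.Trainer.hyperparameter_search, assuming a configured `trainer`
# instance (not defined in this module):
#
#   best_run = trainer.hyperparameter_search(
#       hp_space=suggest_transformer_hparams,
#       direction="maximize",
#       backend="optuna",
#       n_trials=10,
#   )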
def evaluate_transformer_outputs(
y_true: List[int],
y_pred: List[int],
y_logits: Optional[np.ndarray] = None
) -> Dict[str, float]:
y_true = np.array(y_true)
y_pred = np.array(y_pred)
if y_logits is not None:
y_proba = torch.softmax(torch.tensor(y_logits), dim=-1).numpy()
else:
y_proba = None
return compute_classification_metrics(y_true, y_pred, y_proba, average="macro")
def confusion_matrix_df(y_true: np.ndarray, y_pred: np.ndarray, labels: Optional[List] = None) -> pd.DataFrame:
    if labels is None:
        # Default to the sorted union of observed labels so the index matches the matrix shape
        labels = sorted(np.unique(np.concatenate([np.asarray(y_true), np.asarray(y_pred)])))
    cm = confusion_matrix(y_true, y_pred, labels=labels)
    return pd.DataFrame(cm, index=[f"True_{l}" for l in labels], columns=[f"Pred_{l}" for l in labels])
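# Example usage (sketch): inspecting per-class errors with illustrative
# y_test / y_pred arrays:
#
#   cm_df = confusion_matrix_df(y_test, y_pred)
#   print(cm_df)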