from typing import Optional, Dict
import logging
import os
from .optuna_utils import get_majority_vote_metrics, get_dataframe_stats
from .protac_dataset import get_datasets
import optuna
import xgboost as xgb
import pandas as pd
import numpy as np
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import accuracy_score, roc_auc_score, precision_score, recall_score, f1_score
import joblib
from optuna.samplers import TPESampler
import torch
xgb.set_config(verbosity=0)
def get_confidence_scores(y, y_pred, threshold=0.5):
    """ Get the mean predicted score over the misclassified samples.

    The mean prediction over the false positives and over the false negatives
    is reported as a simple confidence estimate of the model's errors.
    """
    # Boolean masks of the false positives and false negatives
    false_positives = (y == 0) & (y_pred > threshold)
    false_negatives = (y == 1) & (y_pred <= threshold)
    # Mean prediction over each group of misclassified samples (NaN if a
    # group is empty)
    false_positives_mean = y_pred[false_positives].mean()
    false_negatives_mean = y_pred[false_negatives].mean()
    return false_positives_mean, false_negatives_mean
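# Worked example (hypothetical values): with y = np.array([0, 1, 0, 1]) and
# y_pred = np.array([0.7, 0.2, 0.1, 0.9]), sample 0 is a false positive and
# sample 1 a false negative, so the function returns (0.7, 0.2).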
def train_and_evaluate_xgboost(
protein2embedding: Dict,
cell2embedding: Dict,
smiles2fp: Dict,
train_df: pd.DataFrame,
val_df: pd.DataFrame,
params: dict,
test_df: Optional[pd.DataFrame] = None,
active_label: str = 'Active',
num_boost_round: int = 100,
shuffle_train_data: bool = False,
) -> tuple:
"""
Train and evaluate an XGBoost model with the given parameters.
Args:
train_df (pd.DataFrame): The training and validation data.
test_df (pd.DataFrame): The test data.
params (dict): Hyperparameters for the XGBoost model.
active_label (str): The active label column.
num_boost_round (int): Maximum number of epochs.
Returns:
tuple: The trained model, test predictions, and metrics.
"""
# Get datasets and their numpy arrays
train_ds, val_ds, test_ds = get_datasets(
protein2embedding=protein2embedding,
cell2embedding=cell2embedding,
smiles2fp=smiles2fp,
train_df=train_df,
val_df=val_df,
test_df=test_df,
disabled_embeddings=[],
active_label=active_label,
apply_scaling=False,
)
X_train, y_train = train_ds.get_numpy_arrays()
X_val, y_val = val_ds.get_numpy_arrays()
# Shuffle the training data
if shuffle_train_data:
idx = np.random.permutation(len(X_train))
X_train, y_train = X_train[idx], y_train[idx]
# Setup training and validation data in XGBoost data format
dtrain = xgb.DMatrix(X_train, label=y_train)
dval = xgb.DMatrix(X_val, label=y_val)
    # Note: xgb.train() uses the last entry in evals for early stopping, so
    # keep the validation set last
    evallist = [(dtrain, 'train'), (dval, 'eval')]
# Setup test data
if test_df is not None:
X_test, y_test = test_ds.get_numpy_arrays()
dtest = xgb.DMatrix(X_test, label=y_test)
        # Prepend so that early stopping keeps monitoring the validation set
        evallist.insert(0, (dtest, 'test'))
model = xgb.train(
params,
dtrain,
num_boost_round=num_boost_round,
evals=evallist,
early_stopping_rounds=10,
verbose_eval=False,
)
# Evaluate model
val_pred = model.predict(dval)
val_pred_binary = (val_pred > 0.5).astype(int)
fp_mean, fn_mean = get_confidence_scores(y_val, val_pred)
metrics = {
'val_acc': accuracy_score(y_val, val_pred_binary),
'val_roc_auc': roc_auc_score(y_val, val_pred),
'val_precision': precision_score(y_val, val_pred_binary),
'val_recall': recall_score(y_val, val_pred_binary),
'val_f1_score': f1_score(y_val, val_pred_binary),
'val_false_positives_mean': fp_mean,
'val_false_negatives_mean': fn_mean,
}
preds = {'val_pred': val_pred}
if test_df is not None:
test_pred = model.predict(dtest)
test_pred_binary = (test_pred > 0.5).astype(int)
fp_mean, fn_mean = get_confidence_scores(y_test, test_pred)
metrics.update({
'test_acc': accuracy_score(y_test, test_pred_binary),
'test_roc_auc': roc_auc_score(y_test, test_pred),
'test_precision': precision_score(y_test, test_pred_binary),
'test_recall': recall_score(y_test, test_pred_binary),
'test_f1_score': f1_score(y_test, test_pred_binary),
'test_false_positives_mean': fp_mean,
'test_false_negatives_mean': fn_mean,
})
preds.update({'test_pred': test_pred})
return model, preds, metrics
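# Example call (a minimal sketch; assumes the embedding/fingerprint lookups and
# dataframes with an 'Active' column are already loaded):
# model, preds, metrics = train_and_evaluate_xgboost(
#     protein2embedding, cell2embedding, smiles2fp,
#     train_df=train_df, val_df=val_df,
#     params={'objective': 'binary:logistic', 'eval_metric': 'auc'},
# )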
def xgboost_model_objective(
trial: optuna.Trial,
protein2embedding: Dict,
cell2embedding: Dict,
smiles2fp: Dict,
train_val_df: pd.DataFrame,
kf: StratifiedKFold,
    groups: Optional[np.ndarray] = None,
active_label: str = 'Active',
num_boost_round: int = 100,
model_name: Optional[str] = None,
) -> float:
""" Objective function for hyperparameter optimization with XGBoost.
Args:
trial (optuna.Trial): The Optuna trial object.
train_val_df (pd.DataFrame): The training and validation data.
kf (StratifiedKFold): Stratified K-Folds cross-validator.
test_df (Optional[pd.DataFrame]): The test data.
active_label (str): The active label column.
num_boost_round (int): Maximum number of epochs.
model_name (Optional[str]): The prefix name of the CV models to save, if supplied. Used as: `f"{model_name}_fold_{k}.json"`
"""
# Suggest hyperparameters to be used across the CV folds
params = {
'booster': 'gbtree',
        'tree_method': 'hist',  # Alternatively: 'gpu_hist' if torch.cuda.is_available() else 'hist'
'objective': 'binary:logistic',
'eval_metric': 'auc',
'eta': trial.suggest_float('eta', 1e-4, 1e-1, log=True),
'max_depth': trial.suggest_int('max_depth', 3, 10),
'min_child_weight': trial.suggest_float('min_child_weight', 1e-3, 10.0, log=True),
'gamma': trial.suggest_float('gamma', 1e-4, 1e-1, log=True),
'subsample': trial.suggest_float('subsample', 0.5, 1.0),
'colsample_bytree': trial.suggest_float('colsample_bytree', 0.5, 1.0),
}
X = train_val_df.copy().drop(columns=active_label)
y = train_val_df[active_label].tolist()
report = []
val_preds = []
for k, (train_index, val_index) in enumerate(kf.split(X, y, groups)):
logging.info(f'Fold {k + 1}/{kf.get_n_splits()}')
train_df = train_val_df.iloc[train_index]
val_df = train_val_df.iloc[val_index]
# Get some statistics from the dataframes
stats = {
'model_type': 'XGBoost',
'fold': k,
'train_len': len(train_df),
'val_len': len(val_df),
'train_perc': len(train_df) / len(train_val_df),
'val_perc': len(val_df) / len(train_val_df),
}
stats.update(get_dataframe_stats(train_df, val_df, active_label=active_label))
if groups is not None:
stats['train_unique_groups'] = len(np.unique(groups[train_index]))
stats['val_unique_groups'] = len(np.unique(groups[val_index]))
bst, preds, metrics = train_and_evaluate_xgboost(
protein2embedding=protein2embedding,
cell2embedding=cell2embedding,
smiles2fp=smiles2fp,
train_df=train_df,
val_df=val_df,
params=params,
active_label=active_label,
num_boost_round=num_boost_round,
)
stats.update(metrics)
report.append(stats.copy())
val_preds.append(preds['val_pred'])
if model_name:
model_filename = f'{model_name}_fold{k}.json'
bst.save_model(model_filename)
logging.info(f'CV XGBoost model saved to: {model_filename}')
# Save the report in the trial
trial.set_user_attr('report', report)
trial.set_user_attr('val_preds', val_preds)
trial.set_user_attr('params', params)
# Get the average validation metrics across the folds
mean_val_roc_auc = np.mean([r['val_roc_auc'] for r in report])
logging.info(f'\tMean val ROC AUC: {mean_val_roc_auc:.4f}')
    # The study is created with direction='minimize', so return the negative ROC AUC
return -mean_val_roc_auc
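# Usage note (sketch): after optimization, the per-fold report stored in the
# trial's user attributes can be recovered with, e.g.:
# cv_report = pd.DataFrame(study.best_trial.user_attrs['report'])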
def xgboost_hyperparameter_tuning_and_training(
protein2embedding: Dict,
cell2embedding: Dict,
smiles2fp: Dict,
train_val_df: pd.DataFrame,
test_df: pd.DataFrame,
kf: StratifiedKFold,
    groups: Optional[np.ndarray] = None,
split_type: str = 'random',
n_models_for_test: int = 3,
n_trials: int = 50,
active_label: str = 'Active',
num_boost_round: int = 100,
study_filename: Optional[str] = None,
force_study: bool = False,
model_name: Optional[str] = None,
) -> dict:
""" Hyperparameter tuning and training of an XGBoost model.
Args:
train_val_df (pd.DataFrame): The training and validation data.
test_df (pd.DataFrame): The test data.
kf (StratifiedKFold): Stratified K-Folds cross-validator.
groups (Optional[np.array]): Group labels for the samples used while splitting the dataset into train/test set.
split_type (str): Type of the data split. Used for reporting information.
n_models_for_test (int): Number of models to train for testing.
fast_dev_run (bool): Whether to run a fast development run.
n_trials (int): Number of trials for hyperparameter optimization.
logger_save_dir (str): Directory to save logs.
logger_name (str): Name of the logger.
active_label (str): The active label column.
num_boost_round (int): Maximum number of epochs.
study_filename (Optional[str]): File name to save/load the Optuna study.
force_study (bool): Whether to force the study optimization even if the study file exists.
Returns:
dict: A dictionary containing reports from the CV and test.
"""
# Set the verbosity of Optuna
optuna.logging.set_verbosity(optuna.logging.WARNING)
# Create an Optuna study object
sampler = TPESampler(seed=42)
study = optuna.create_study(direction='minimize', sampler=sampler)
study_loaded = False
if study_filename and not force_study:
if os.path.exists(study_filename):
study = joblib.load(study_filename)
study_loaded = True
logging.info(f'Loaded study from {study_filename}')
if not study_loaded or force_study:
study.optimize(
lambda trial: xgboost_model_objective(
trial=trial,
protein2embedding=protein2embedding,
cell2embedding=cell2embedding,
smiles2fp=smiles2fp,
train_val_df=train_val_df,
kf=kf,
groups=groups,
active_label=active_label,
num_boost_round=num_boost_round,
),
n_trials=n_trials,
)
if study_filename:
joblib.dump(study, study_filename)
cv_report = pd.DataFrame(study.best_trial.user_attrs['report'])
hparam_report = pd.DataFrame([study.best_params])
# Train the best CV models and store their models by running the objective
if model_name:
xgboost_model_objective(
trial=study.best_trial,
protein2embedding=protein2embedding,
cell2embedding=cell2embedding,
smiles2fp=smiles2fp,
train_val_df=train_val_df,
kf=kf,
groups=groups,
active_label=active_label,
num_boost_round=num_boost_round,
model_name=f'{model_name}_cv_model_{split_type}',
)
# Retrain N models with the best hyperparameters (measure model uncertainty)
best_models = []
test_report = []
test_preds = []
for i in range(n_models_for_test):
logging.info(f'Training best model {i + 1}/{n_models_for_test}')
model, preds, metrics = train_and_evaluate_xgboost(
protein2embedding=protein2embedding,
cell2embedding=cell2embedding,
smiles2fp=smiles2fp,
train_df=train_val_df,
        val_df=test_df,  # Note: the test set is used as the evaluation set here
params=study.best_trial.user_attrs['params'],
active_label=active_label,
num_boost_round=num_boost_round,
shuffle_train_data=True,
)
metrics = {k.replace('val_', 'test_'): v for k, v in metrics.items()}
metrics['model_type'] = 'XGBoost'
metrics['test_model_id'] = i
metrics.update(get_dataframe_stats(
train_val_df,
test_df=test_df,
active_label=active_label,
))
test_report.append(metrics.copy())
test_preds.append(torch.tensor(preds['val_pred']))
best_models.append(model)
# Save the trained model
if model_name:
model_filename = f'{model_name}_best_model_{split_type}_n{i}-test_acc={metrics["test_acc"]:.2f}-test_roc_auc={metrics["test_roc_auc"]:.3f}.json'
model.save_model(model_filename)
logging.info(f'Best XGBoost model saved to: {model_filename}')
test_report = pd.DataFrame(test_report)
# Get the majority vote for the test predictions
majority_vote_metrics = get_majority_vote_metrics(test_preds, test_df, active_label)
majority_vote_metrics.update(get_dataframe_stats(train_val_df, test_df=test_df, active_label=active_label))
majority_vote_report = pd.DataFrame([majority_vote_metrics])
majority_vote_report['model_type'] = 'XGBoost'
# Add a column with the split_type to all reports
for report in [cv_report, hparam_report, test_report, majority_vote_report]:
report['split_type'] = split_type
# Return the reports
return {
'cv_report': cv_report,
'hparam_report': hparam_report,
'test_report': test_report,
        'majority_vote_report': majority_vote_report,
}
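# Example end-to-end usage (a minimal sketch; the embedding/fingerprint lookups
# and dataframes are assumed to be loaded elsewhere, and the file names below
# are hypothetical):
# kf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
# reports = xgboost_hyperparameter_tuning_and_training(
#     protein2embedding, cell2embedding, smiles2fp,
#     train_val_df=train_val_df, test_df=test_df, kf=kf,
#     n_trials=50, study_filename='xgboost_study.pkl',
#     model_name='models/xgboost',
# )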