"""Training entrypoint for fraud detection models with MLflow tracking."""
from __future__ import annotations

import argparse
import json
import numbers
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

import joblib
import mlflow
import pandas as pd
import yaml
from sklearn.linear_model import LogisticRegression

from src.data_ingestion import load_data, run_data_validation
from src.evaluate import calculate_metrics, rank_models, select_best_threshold
from src.preprocessing import preprocess_for_training

try:
    from xgboost import XGBClassifier
except Exception:  # pragma: no cover - handled at runtime
    XGBClassifier = None
# Default file locations (relative paths, so they resolve against the current
# working directory). Callers and the CLI fall back to these when no explicit
# path is given.

# Input: training configuration and raw dataset.
DEFAULT_CONFIG_PATH = Path("configs", "train.yaml")
DEFAULT_DATA_PATH = Path("data", "raw", "creditcard.csv")

# Output: persisted model and preprocessor artifacts.
DEFAULT_MODEL_PATH = Path("models", "model.pkl")
DEFAULT_PREPROCESSOR_PATH = Path("models", "preprocessor.pkl")

# Output: JSON reports produced by the pipeline.
DEFAULT_REPORT_PATH = Path("artifacts", "model_training_report.json")
DEFAULT_MODEL_REPORT_PATH = Path("artifacts", "model_report.json")
DEFAULT_VALIDATION_REPORT_PATH = Path("artifacts", "data_validation.json")
def load_training_config(config_path: str | Path = DEFAULT_CONFIG_PATH) -> dict[str, Any]:
    """Load the YAML training configuration.

    The file is parsed with ``yaml.safe_load``; an empty (or falsy) document
    yields an empty config. The ``experiment``, ``training`` and ``mlflow``
    sections are always present in the result as mappings: a missing section,
    or one written as ``key: null``, becomes ``{}``.

    Args:
        config_path: Path to the YAML configuration file.

    Returns:
        The parsed configuration dictionary.

    Raises:
        OSError: if the file cannot be read (e.g. ``FileNotFoundError``).
        ValueError: if the document's top level, or one of the sections
            listed above, is not a mapping.
    """
    config = yaml.safe_load(Path(config_path).read_text(encoding="utf-8")) or {}
    if not isinstance(config, dict):
        raise ValueError(
            f"Training config {config_path} must be a YAML mapping, "
            f"got {type(config).__name__}."
        )
    for section in ("experiment", "training", "mlflow"):
        # dict.setdefault() would keep an explicit ``section: null`` as None,
        # which breaks every later ``config[section].get(...)`` call.
        value = config.get(section)
        if value is None:
            config[section] = {}
        elif not isinstance(value, dict):
            raise ValueError(
                f"Config section '{section}' in {config_path} must be a mapping, "
                f"got {type(value).__name__}."
            )
    return config
def create_model(model_name: str, random_state: int) -> Any:
    """Instantiate an untrained estimator for ``model_name``.

    Supported names are ``"logistic_regression"`` and ``"xgboost"``. Both
    receive ``random_state`` so repeated runs are reproducible.

    Raises:
        ValueError: if ``model_name`` is not one of the supported names.
        RuntimeError: if ``"xgboost"`` is requested but the optional
            ``xgboost`` import at module load failed.
    """
    if model_name not in ("logistic_regression", "xgboost"):
        raise ValueError(f"Unsupported model: {model_name}")

    if model_name == "logistic_regression":
        # class_weight="balanced" is this estimator's imbalance handling; the
        # xgboost branch below configures no equivalent.
        lr_settings = dict(
            max_iter=500,
            solver="lbfgs",
            class_weight="balanced",
            random_state=random_state,
        )
        return LogisticRegression(**lr_settings)

    # Only "xgboost" can reach this point.
    if XGBClassifier is None:
        raise RuntimeError("xgboost is not available in the environment")
    xgb_settings = dict(
        n_estimators=300,
        max_depth=5,
        learning_rate=0.05,
        subsample=0.9,
        colsample_bytree=0.9,
        eval_metric="logloss",
        random_state=random_state,
        n_jobs=2,
    )
    return XGBClassifier(**xgb_settings)
def train_single_model(
    model_name: str,
    X_train: pd.DataFrame,
    y_train: pd.Series,
    X_test: pd.DataFrame,
    y_test: pd.Series,
    *,
    random_state: int,
) -> tuple[Any, dict[str, Any]]:
    """Fit a freshly created estimator and score it on the held-out split.

    Args:
        model_name: Name understood by ``create_model``.
        X_train, y_train: Training features and labels.
        X_test, y_test: Held-out features and labels used for scoring.
        random_state: Seed forwarded to ``create_model``.

    Returns:
        ``(fitted_estimator, metrics)``, where ``metrics`` is the result of
        ``calculate_metrics`` on the estimator's hard predictions and its
        column-1 ``predict_proba`` scores.
    """
    estimator = create_model(model_name, random_state=random_state)
    estimator.fit(X_train, y_train)

    hard_labels = estimator.predict(X_test)
    # Column 1 is taken as the positive-class score (presumably "fraud";
    # assumes binary labels ordered [0, 1] — confirm against the data).
    positive_scores = estimator.predict_proba(X_test)[:, 1]

    return estimator, calculate_metrics(y_test, hard_labels, positive_scores)
def log_run_to_mlflow(
    *,
    experiment_name: str,
    model_name: str,
    params: dict[str, Any],
    metrics: dict[str, Any],
    preprocessor_path: Path,
    model_temp_path: Path,
    artifact_dir: Path,
) -> str:
    """Log one training run to MLflow and return its run id.

    Within a run named ``model_name`` (under experiment ``experiment_name``)
    this logs:

    * ``params`` as MLflow params;
    * every real-valued entry of ``metrics`` as an MLflow metric (entries
      that are not plain numbers, such as strings or nested structures,
      appear only in the JSON artifact);
    * three artifacts: the preprocessor file, the pickled model file, and a
      JSON dump of the full ``metrics`` dict, which is written to
      ``artifact_dir / f"metrics_{model_name}.json"``.

    Returns:
        The id of the MLflow run that was created.
    """
    mlflow.set_experiment(experiment_name)
    with mlflow.start_run(run_name=model_name) as run:
        mlflow.log_params(params)

        # Accept any real number (Python int/float and NumPy int/float scalars)
        # instead of only ``float``, so integer-valued metrics are not silently
        # dropped. ``bool`` is a subclass of ``int`` and is excluded explicitly.
        metric_values = {
            key: float(value)
            for key, value in metrics.items()
            if isinstance(value, numbers.Real) and not isinstance(value, bool)
        }
        mlflow.log_metrics(metric_values)

        # Structured artifacts for debugging and reproducibility.
        metrics_path = artifact_dir / f"metrics_{model_name}.json"
        metrics_path.parent.mkdir(parents=True, exist_ok=True)
        metrics_path.write_text(json.dumps(metrics, indent=2), encoding="utf-8")

        mlflow.log_artifact(str(preprocessor_path), artifact_path="preprocessor")
        mlflow.log_artifact(str(model_temp_path), artifact_path="model")
        mlflow.log_artifact(str(metrics_path), artifact_path="metrics")
    return run.info.run_id
def save_model(model: Any, output_path: str | Path = DEFAULT_MODEL_PATH) -> Path:
    """Serialize ``model`` with ``joblib.dump`` and return where it was written.

    Missing parent directories of ``output_path`` are created first.
    """
    target = Path(output_path)
    parent_dir = target.parent
    parent_dir.mkdir(exist_ok=True, parents=True)
    joblib.dump(model, target)
    return target
def run_training_pipeline(
    *,
    config_path: str | Path = DEFAULT_CONFIG_PATH,
    data_path: str | Path = DEFAULT_DATA_PATH,
    model_path: str | Path = DEFAULT_MODEL_PATH,
    preprocessor_path: str | Path = DEFAULT_PREPROCESSOR_PATH,
    report_path: str | Path = DEFAULT_REPORT_PATH,
    model_report_path: str | Path = DEFAULT_MODEL_REPORT_PATH,
    validation_report_path: str | Path = DEFAULT_VALIDATION_REPORT_PATH,
) -> dict[str, Any]:
    """Execute the end-to-end training and experiment-tracking pipeline.

    Steps, in order:

    1. Load the YAML config and point MLflow at the configured tracking URI.
    2. Run data validation (given ``validation_report_path``), load the raw
       data and preprocess it into train/test splits.
    3. Train each configured model, pickle it next to ``model_path`` as
       ``<model_name>.pkl`` and log it as its own MLflow run. A model whose
       construction or training raises ``RuntimeError`` (e.g. xgboost not
       installed) is recorded in ``skipped_models`` instead of aborting.
    4. Rank the trained models, select a decision threshold for the best one
       and write the model report to ``model_report_path``.
    5. Save the best model to ``model_path`` and write the training report
       to ``report_path``.

    Returns:
        The training report dictionary (also written to ``report_path``).

    Raises:
        RuntimeError: if none of the configured models could be trained.
    """
    config = load_training_config(config_path)
    # A section written as ``key: null`` loads as None; treat it as empty.
    experiment_cfg = config.get("experiment") or {}
    mlflow_cfg = config.get("mlflow") or {}
    training_cfg = config.get("training") or {}
    threshold_cfg = config.get("threshold") or {}

    experiment_name = experiment_cfg.get("name", "fraud-detection-baseline")
    tracking_uri = mlflow_cfg.get("tracking_uri", "file:./mlruns")
    mlflow.set_tracking_uri(tracking_uri)

    random_state = int(training_cfg.get("random_state", 42))
    test_size = float(training_cfg.get("test_size", 0.2))
    imbalance_method = str(training_cfg.get("imbalance_method", "class_weight"))

    models = training_cfg.get("models") or [training_cfg.get("model", "logistic_regression")]
    if isinstance(models, str):
        # ``models: xgboost`` would otherwise be iterated character by character.
        models = [models]
    # Keep the configured order but train each model once: a duplicate would
    # log a redundant MLflow run and overwrite the same per-model pickle.
    models = list(dict.fromkeys(models))

    min_recall_target = float(threshold_cfg.get("min_recall_target", 0.90))
    threshold_grid_size = int(threshold_cfg.get("grid_size", 99))
    threshold_min = float(threshold_cfg.get("min_threshold", 0.01))
    threshold_max = float(threshold_cfg.get("max_threshold", 0.99))

    run_data_validation(file_path=data_path, report_path=validation_report_path)
    raw_df = load_data(data_path)
    prep = preprocess_for_training(
        raw_df,
        test_size=test_size,
        random_state=random_state,
        imbalance_method=imbalance_method,
        preprocessor_path=preprocessor_path,
    )

    results: list[dict[str, Any]] = []
    skipped_models: list[dict[str, str]] = []
    report_path_obj = Path(report_path)
    # Per-model metric JSON files are written next to the training report.
    artifact_dir = report_path_obj.parent
    artifact_dir.mkdir(parents=True, exist_ok=True)
    preprocessor_path_obj = Path(preprocessor_path)
    model_dir = Path(model_path).parent

    for model_name in models:
        try:
            model, metrics = train_single_model(
                model_name=model_name,
                X_train=prep["X_train"],
                y_train=prep["y_train"],
                X_test=prep["X_test"],
                y_test=prep["y_test"],
                random_state=random_state,
            )
        except RuntimeError as exc:
            # create_model raises RuntimeError when xgboost is unavailable;
            # skip that model rather than failing the whole pipeline.
            skipped_models.append({"model_name": model_name, "reason": str(exc)})
            continue

        temp_model_path = model_dir / f"{model_name}.pkl"
        save_model(model, temp_model_path)
        run_id = log_run_to_mlflow(
            experiment_name=experiment_name,
            model_name=model_name,
            params={
                "model_name": model_name,
                "test_size": test_size,
                "random_state": random_state,
                "imbalance_method": imbalance_method,
            },
            metrics=metrics,
            preprocessor_path=preprocessor_path_obj,
            model_temp_path=temp_model_path,
            artifact_dir=artifact_dir,
        )
        results.append({"model_name": model_name, "model": model, "metrics": metrics, "run_id": run_id})

    if not results:
        raise RuntimeError("No models were successfully trained.")

    # Assumes rank_models returns the best model first.
    ranked = rank_models(results)
    best = ranked[0]

    # NOTE(review): the threshold is tuned on the same test split that the
    # reported metrics come from, so ``threshold_metrics`` are optimistic; a
    # separate validation split would give an unbiased estimate.
    y_test_proba_best = best["model"].predict_proba(prep["X_test"])[:, 1]
    threshold_selection = select_best_threshold(
        prep["y_test"],
        y_test_proba_best,
        min_recall=min_recall_target,
        min_threshold=threshold_min,
        max_threshold=threshold_max,
        grid_size=threshold_grid_size,
    )

    # One timestamp for both reports so they can be matched exactly.
    timestamp_utc = datetime.now(timezone.utc).isoformat()

    model_report = {
        "timestamp_utc": timestamp_utc,
        "best_model_name": best["model_name"],
        "default_threshold_metrics": best["metrics"],
        "threshold_selection": threshold_selection,
        "evaluation_summary": {
            "test_rows": int(len(prep["y_test"])),
            "min_recall_target": min_recall_target,
            "selection_reason": threshold_selection["selection_reason"],
        },
    }
    model_report_path_obj = Path(model_report_path)
    model_report_path_obj.parent.mkdir(parents=True, exist_ok=True)
    model_report_path_obj.write_text(json.dumps(model_report, indent=2), encoding="utf-8")

    final_model_path = save_model(best["model"], model_path)

    report = {
        "timestamp_utc": timestamp_utc,
        "experiment_name": experiment_name,
        "tracking_uri": tracking_uri,
        "data_path": str(data_path),
        "preprocessor_path": str(preprocessor_path),
        "model_path": str(final_model_path),
        "model_report_path": str(model_report_path_obj),
        "best_model": {
            "model_name": best["model_name"],
            "run_id": best["run_id"],
            "metrics": best["metrics"],
            "selected_threshold": threshold_selection["selected_threshold"],
            "threshold_metrics": threshold_selection["selected_metrics"],
        },
        "all_results": [
            {"model_name": entry["model_name"], "run_id": entry["run_id"], "metrics": entry["metrics"]}
            for entry in ranked
        ],
        "skipped_models": skipped_models,
    }
    report_path_obj.write_text(json.dumps(report, indent=2), encoding="utf-8")
    return report
def _build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser for the training entrypoint.

    Each option mirrors a keyword argument of ``run_training_pipeline`` and
    defaults to the matching module-level ``DEFAULT_*`` path.
    """
    parser = argparse.ArgumentParser(description="Train fraud model and log to MLflow.")
    parser.add_argument("--config", default=str(DEFAULT_CONFIG_PATH), help="Training config YAML path.")
    parser.add_argument("--data-path", default=str(DEFAULT_DATA_PATH), help="Dataset CSV path.")
    parser.add_argument("--model-path", default=str(DEFAULT_MODEL_PATH), help="Output model artifact path.")
    parser.add_argument(
        "--preprocessor-path",
        default=str(DEFAULT_PREPROCESSOR_PATH),
        help="Output preprocessor artifact path.",
    )
    parser.add_argument("--report-path", default=str(DEFAULT_REPORT_PATH), help="Training report JSON path.")
    parser.add_argument(
        "--model-report-path",
        default=str(DEFAULT_MODEL_REPORT_PATH),
        help="Model evaluation report JSON path.",
    )
    # run_training_pipeline accepts validation_report_path; expose it on the CLI
    # so every pipeline output location can be overridden.
    parser.add_argument(
        "--validation-report-path",
        default=str(DEFAULT_VALIDATION_REPORT_PATH),
        help="Data validation report JSON path.",
    )
    return parser


def main() -> None:
    """Run the pipeline from CLI arguments and print a summary of the best model."""
    args = _build_parser().parse_args()
    report = run_training_pipeline(
        config_path=args.config,
        data_path=args.data_path,
        model_path=args.model_path,
        preprocessor_path=args.preprocessor_path,
        report_path=args.report_path,
        model_report_path=args.model_report_path,
        # Wrapped in Path to match the type the pipeline received before this
        # option existed (DEFAULT_VALIDATION_REPORT_PATH is a Path).
        validation_report_path=Path(args.validation_report_path),
    )
    best = report["best_model"]
    print("Training completed.")
    print(f"Best model: {best['model_name']}")
    print(f"Selected threshold: {best['selected_threshold']:.4f}")
    print(json.dumps(best["threshold_metrics"], indent=2))


if __name__ == "__main__":
    main()