Rossmann-Store-Sales / scripts /evaluate_model.py
ymlin105's picture
refactor: flatten metrics paths and polish project presentation
275e9d5
# ruff: noqa: E402
import json
import logging
from pathlib import Path
import sys
from typing import Any
import numpy as np
import pandas as pd
import xgboost as xgb
# Make the directory one level above this file's folder importable, so the
# `src.*` imports that follow resolve when the file is executed as a script.
_THIS_FILE = Path(__file__).resolve()
PROJECT_ROOT = _THIS_FILE.parents[1]
if sys.path.count(str(PROJECT_ROOT)) == 0:
    # Prepend so this checkout takes precedence over other entries.
    sys.path.insert(0, str(PROJECT_ROOT))
from src.shared.config import settings
from src.shared.mlflow_utils import start_run
from src.training.data_loader import clean_data, load_raw_data
from src.training.features import apply_feature_pipeline, build_feature_matrix
from src.training.splits import holdout_masks, rolling_date_windows
# Timestamped, level-tagged log lines for the whole script.
_LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s"
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)
logger = logging.getLogger(__name__)

# XGBoost parameters reported as the "pre_tuning_model" in the holdout
# evaluation; the tuned parameters come from settings.model_params instead.
ORIGINAL_CONFIG: dict[str, Any] = {
    # Ensemble size and shrinkage
    "n_estimators": 400,
    "learning_rate": 0.05,
    # Tree shape and row/column sampling
    "max_depth": 8,
    "subsample": 0.8,
    "colsample_bytree": 0.8,
    # Reproducibility and runtime behaviour
    "random_state": 42,
    "n_jobs": -1,
    "verbosity": 0,
    "objective": "reg:squarederror",
}
def rmspe(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    """Return the root mean squared percentage error, in percent.

    Each residual ``y_true - y_pred`` is divided by the matching true value
    clipped to a minimum of 1.0, so targets below 1 (including zero) contribute
    their absolute error instead of triggering a division by zero.

    Args:
        y_true: Observed values. Any array-like accepted by ``np.asarray``
            (ndarray, list, pandas Series); compared positionally.
        y_pred: Predicted values, broadcastable against ``y_true``.

    Returns:
        ``sqrt(mean(((y_true - y_pred) / max(y_true, 1)) ** 2)) * 100`` as a
        Python float, or ``nan`` when there are no observations.

    Raises:
        ValueError: If the two inputs cannot be broadcast together.
    """
    # Coerce once so plain sequences work and arithmetic is always float64.
    actual = np.asarray(y_true, dtype=float)
    predicted = np.asarray(y_pred, dtype=float)

    denominator = np.clip(actual, a_min=1.0, a_max=None)
    relative_error = (actual - predicted) / denominator

    if relative_error.size == 0:
        # np.mean of an empty array warns and yields nan; return it explicitly.
        return float("nan")
    return float(np.sqrt(np.mean(np.square(relative_error))) * 100)
def prepare_dataset() -> tuple[pd.DataFrame, pd.DataFrame, np.ndarray, pd.Series]:
    """Load, clean and featurize the training data used for evaluation.

    Paths, Fourier settings, feature names and the target column are all read
    from ``settings``.

    Returns:
        A 4-tuple ``(df, X, y, dates)``: the frame after cleaning and the
        feature pipeline, the feature matrix built from
        ``settings.data.features``, the target column as a NumPy array, and
        the ``Date`` column parsed to datetimes and normalized.
    """
    logger.info("Loading Rossmann data for evaluation")
    data_cfg = settings.data

    raw = load_raw_data(data_cfg.train_path, data_cfg.store_path)
    cleaned = clean_data(raw)
    featured = apply_feature_pipeline(
        cleaned,
        fourier_period=settings.pipeline.fourier_period,
        fourier_order=settings.pipeline.fourier_order,
    )

    feature_matrix = build_feature_matrix(featured, data_cfg.features)
    target = featured[data_cfg.target].to_numpy()
    # Normalizing drops the time-of-day part so rows compare on calendar date.
    calendar_dates = pd.to_datetime(featured["Date"]).dt.normalize()
    return featured, feature_matrix, target, calendar_dates
def baseline_predict(train_df: pd.DataFrame, valid_df: pd.DataFrame) -> np.ndarray:
    """Predict sales with a hierarchical historical-mean baseline.

    For each validation row the prediction is the first available value of:

    1. the training mean of ``Sales`` for the same ``(Store, DayOfWeek)``,
    2. the training mean of ``Sales`` for the same ``Store``,
    3. the overall training mean of ``Sales``.

    A level counts as unavailable when the key was never seen in training or
    when its mean is NaN, so a usable fallback is always preferred over NaN.

    Args:
        train_df: Training rows with ``Store``, ``DayOfWeek`` and ``Sales``.
        valid_df: Rows to predict, with ``Store`` and ``DayOfWeek``.

    Returns:
        Float array of predictions, one per row of ``valid_df`` in row order.
    """
    global_mean = float(train_df["Sales"].mean())
    store_dow_mean = train_df.groupby(["Store", "DayOfWeek"])["Sales"].mean()
    store_mean = train_df.groupby("Store")["Sales"].mean()

    # Vectorized lookups: reindex/map yield NaN wherever a key is missing,
    # replacing a per-row Python loop of Series.get calls.
    lookup_keys = pd.MultiIndex.from_arrays(
        [valid_df["Store"].to_numpy(), valid_df["DayOfWeek"].to_numpy()]
    )
    by_store_dow = store_dow_mean.reindex(lookup_keys).to_numpy(dtype=float)
    by_store = valid_df["Store"].map(store_mean).to_numpy(dtype=float)

    # Cascade from the most specific mean to the least specific one.
    prediction = np.where(np.isnan(by_store_dow), by_store, by_store_dow)
    return np.where(np.isnan(prediction), global_mean, prediction)
def fit_and_score(
    X_train: pd.DataFrame,
    X_valid: pd.DataFrame,
    y_train: np.ndarray,
    y_valid: np.ndarray,
    params: dict[str, Any],
) -> dict[str, float]:
    """Fit an XGBoost regressor on log-scaled targets and score both splits.

    The model is trained against ``log1p(y_train)``; predictions are mapped
    back with ``expm1`` before RMSPE is computed on the original scale.

    Args:
        X_train: Training features.
        X_valid: Validation features.
        y_train: Training targets on the original scale.
        y_valid: Validation targets on the original scale.
        params: Keyword arguments forwarded to ``xgb.XGBRegressor``.

    Returns:
        ``{"train_rmspe": ..., "valid_rmspe": ...}``, each rounded to four
        decimal places.
    """
    regressor = xgb.XGBRegressor(**params)
    regressor.fit(X_train, np.log1p(y_train))

    splits = (
        ("train_rmspe", X_train, y_train),
        ("valid_rmspe", X_valid, y_valid),
    )
    scores: dict[str, float] = {}
    for metric_name, features, actual in splits:
        # Undo the log1p transform applied to the training target.
        predicted = np.expm1(regressor.predict(features))
        scores[metric_name] = round(rmspe(actual, predicted), 4)
    return scores
def holdout_evaluation(
    df: pd.DataFrame,
    X: pd.DataFrame,
    y: np.ndarray,
    dates: pd.Series,
    validation_days: int,
) -> dict[str, Any]:
    """Compare baseline, pre-tuning and tuned models on one holdout split.

    The validation mask and its start/end dates come from ``holdout_masks``;
    every row outside that mask is used for training. Improvements are
    reported as RMSPE differences, so positive values mean the tuned model
    scored lower (better).

    Args:
        df: Processed frame, passed to the mean baseline.
        X: Feature matrix aligned with ``df``.
        y: Target values aligned with ``df``.
        dates: Per-row dates used to build the split.
        validation_days: Forwarded to ``holdout_masks``.

    Returns:
        A dict with the split description, row counts, the baseline RMSPE,
        the score dicts of both XGBoost configurations and the tuned model's
        improvement over each reference.
    """
    valid_mask, validation_start, validation_end = holdout_masks(dates, validation_days)
    train_mask = ~valid_mask

    # Slice once and reuse the same views for both model configurations.
    X_train, X_valid = X.loc[train_mask], X.loc[valid_mask]
    y_train, y_valid = y[train_mask], y[valid_mask]

    baseline_pred = baseline_predict(df.loc[train_mask], df.loc[valid_mask])
    baseline_score = rmspe(y_valid, baseline_pred)

    original_score = fit_and_score(X_train, X_valid, y_train, y_valid, ORIGINAL_CONFIG)
    tuned_score = fit_and_score(
        X_train, X_valid, y_train, y_valid, settings.model_params["xgboost"]
    )
    tuned_valid = tuned_score["valid_rmspe"]

    report: dict[str, Any] = {
        "validation_days": validation_days,
        "validation_start_date": validation_start.strftime("%Y-%m-%d"),
        "validation_end_date": validation_end.strftime("%Y-%m-%d"),
        "rows_train": int(train_mask.sum()),
        "rows_valid": int(valid_mask.sum()),
        "baseline_rmspe": round(baseline_score, 4),
        "pre_tuning_model": original_score,
        "tuned_model": tuned_score,
    }
    report["tuned_improvement_vs_pre_tuning"] = round(
        original_score["valid_rmspe"] - tuned_valid, 4
    )
    report["tuned_improvement_vs_baseline"] = round(baseline_score - tuned_valid, 4)
    return report
def rolling_backtest(
    df: pd.DataFrame,
    X: pd.DataFrame,
    y: np.ndarray,
    dates: pd.Series,
    validation_days: int,
    windows: int,
) -> list[dict[str, Any]]:
    """Score the tuned model and the mean baseline over rolling windows.

    For each window produced by ``rolling_date_windows`` the training set is
    every row dated strictly before the window's earliest date, and the
    validation set is every row whose date is in the window.

    Args:
        df: Processed frame, passed to the mean baseline.
        X: Feature matrix aligned with ``df``.
        y: Target values aligned with ``df``.
        dates: Per-row dates used to build the windows and masks.
        validation_days: Forwarded to ``rolling_date_windows``.
        windows: Forwarded to ``rolling_date_windows``.

    Returns:
        One dict per window (numbered from 1) with the window bounds, row
        counts, baseline RMSPE, tuned validation RMSPE and their difference.
    """

    def score_window(position: int, window_dates: Any) -> dict[str, Any]:
        """Evaluate a single window and return its result record."""
        window_start = window_dates.min()
        window_end = window_dates.max()
        in_train = dates < window_start
        in_valid = dates.isin(window_dates)

        actual = y[in_valid]
        baseline_score = rmspe(
            actual, baseline_predict(df.loc[in_train], df.loc[in_valid])
        )
        tuned_valid = fit_and_score(
            X.loc[in_train],
            X.loc[in_valid],
            y[in_train],
            actual,
            settings.model_params["xgboost"],
        )["valid_rmspe"]

        return {
            "window": position,
            "validation_start_date": window_start.strftime("%Y-%m-%d"),
            "validation_end_date": window_end.strftime("%Y-%m-%d"),
            "rows_train": int(in_train.sum()),
            "rows_valid": int(in_valid.sum()),
            "baseline_rmspe": round(baseline_score, 4),
            "tuned_valid_rmspe": tuned_valid,
            "improvement_vs_baseline": round(baseline_score - tuned_valid, 4),
        }

    date_windows = rolling_date_windows(dates, validation_days, windows)
    return [
        score_window(position, window_dates)
        for position, window_dates in enumerate(date_windows, start=1)
    ]
def build_summary(holdout: dict[str, Any], backtest: list[dict[str, Any]]) -> dict[str, Any]:
    """Assemble the evaluation report from holdout and backtest results.

    Args:
        holdout: Result dict of the holdout evaluation; must provide
            ``rows_train`` and ``rows_valid``.
        backtest: Per-window result dicts; each must provide
            ``tuned_valid_rmspe`` and ``baseline_rmspe``.

    Returns:
        A dict holding the total row count, both inputs unchanged, averages
        across the backtest windows (rounded to four decimals) and the tuned
        XGBoost parameters read from ``settings``.
    """
    tuned_per_window = np.array([entry["tuned_valid_rmspe"] for entry in backtest])
    baseline_per_window = np.array([entry["baseline_rmspe"] for entry in backtest])

    def averaged(values: np.ndarray) -> float:
        """Mean of ``values`` as a Python float rounded to four decimals."""
        return round(float(np.mean(values)), 4)

    rolling_summary = {
        "windows": len(backtest),
        "average_tuned_rmspe": averaged(tuned_per_window),
        "average_baseline_rmspe": averaged(baseline_per_window),
        "average_improvement_vs_baseline": averaged(
            baseline_per_window - tuned_per_window
        ),
    }
    total_rows = holdout["rows_train"] + holdout["rows_valid"]
    return {
        "dataset_rows_after_cleaning": total_rows,
        "holdout": holdout,
        "rolling_backtest": backtest,
        "rolling_backtest_summary": rolling_summary,
        "selected_model_params": settings.model_params["xgboost"],
    }
def main() -> Path:
    """Run the full evaluation, persist the summary and log it to MLflow.

    Performs a 42-day holdout evaluation and a 3-window rolling backtest,
    writes the combined summary as JSON to ``metrics/model_evaluation.json``
    (relative to the current working directory) and, when ``start_run``
    yields a run, records key parameters, metrics and the JSON artifact.

    Returns:
        Path of the JSON summary that was written.
    """
    df, X, y, dates = prepare_dataset()
    holdout = holdout_evaluation(df, X, y, dates, validation_days=42)
    backtest = rolling_backtest(df, X, y, dates, validation_days=42, windows=3)
    summary = build_summary(holdout, backtest)

    output_path = Path("metrics/model_evaluation.json")
    output_path.parent.mkdir(parents=True, exist_ok=True)
    with output_path.open("w", encoding="utf-8") as handle:
        json.dump(summary, handle, indent=2)

    rolling = summary["rolling_backtest_summary"]
    first_day = holdout["validation_start_date"]
    last_day = holdout["validation_end_date"]
    run_name = f"xgb_backtest_{first_day}_{last_day}"

    with start_run(run_name, experiment_name="rossmann-evaluation") as run:
        # start_run may yield None; in that case tracking is skipped entirely.
        if run is not None:
            import mlflow

            logged_params = {
                "validation_days": holdout["validation_days"],
                "backtest_windows": rolling["windows"],
            }
            logged_metrics = {
                "holdout_baseline_rmspe": holdout["baseline_rmspe"],
                "holdout_tuned_rmspe": holdout["tuned_model"]["valid_rmspe"],
                "average_backtest_rmspe": rolling["average_tuned_rmspe"],
                "average_backtest_improvement_vs_baseline": rolling[
                    "average_improvement_vs_baseline"
                ],
            }
            for param_name, param_value in logged_params.items():
                mlflow.log_param(param_name, param_value)
            for metric_name, metric_value in logged_metrics.items():
                mlflow.log_metric(metric_name, metric_value)
            mlflow.log_artifact(str(output_path))

    logger.info("Evaluation summary written to %s", output_path)
    return output_path


if __name__ == "__main__":
    main()