Spaces:
Sleeping
Sleeping
| """ | |
| Feature engineering and preprocessing pipeline. | |
| Key design decisions | |
| -------------------- | |
| * CUSTOMERS.AFFECTED and OUTAGE.DURATION are **removed** from the feature | |
| matrix because they directly define the target (data leakage). | |
| * Weather is proxied via ANOMALY.LEVEL (Oceanic Niño Index) which is already | |
| in the dataset — no external API needed. | |
| * Engineered features are derived solely from existing columns. | |
| * The sklearn ColumnTransformer is serialised so inference uses the exact | |
| same transformations. | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import logging | |
| from typing import Tuple | |
| import numpy as np | |
| import pandas as pd | |
| from sklearn.compose import ColumnTransformer | |
| from sklearn.impute import SimpleImputer | |
| from sklearn.model_selection import train_test_split | |
| from sklearn.pipeline import Pipeline | |
| from sklearn.preprocessing import OneHotEncoder, StandardScaler | |
| from src.config import ( | |
| ARTIFACTS_DIR, | |
| CATEGORICAL_FEATURES, | |
| FEATURE_NAMES_FILE, | |
| LEAK_COLUMNS, | |
| NUMERIC_FEATURES, | |
| PREPROCESSOR_FILE, | |
| RANDOM_STATE, | |
| TARGET_COL, | |
| TEST_SIZE, | |
| ) | |
# Module-level logger; handler/level configuration is left to the application entry point.
logger = logging.getLogger(__name__)
def engineer_features(df: pd.DataFrame) -> pd.DataFrame:
    """Derive additional columns from raw data — no external sources.

    Each feature is added only when its source columns are present, so the
    function is safe to call on partial schemas.  Division-by-zero results
    (``inf``) are normalised to ``NaN`` so downstream imputers handle them.

    Parameters
    ----------
    df : pd.DataFrame
        Raw outage dataframe; not mutated (a copy is returned).

    Returns
    -------
    pd.DataFrame
        Copy of ``df`` with up to five engineered columns added.
    """
    df = df.copy()

    # Demand-to-customer ratio: MW lost per thousand customers
    if "DEMAND.LOSS.MW" in df.columns and "TOTAL.CUSTOMERS" in df.columns:
        df["demand_per_1k_cust"] = (
            df["DEMAND.LOSS.MW"] / (df["TOTAL.CUSTOMERS"] / 1_000)
        ).replace([np.inf, -np.inf], np.nan)

    # Residential share of total sales (economic structure proxy)
    if "RES.SALES" in df.columns and "TOTAL.SALES" in df.columns:
        df["res_sales_share"] = (
            df["RES.SALES"] / df["TOTAL.SALES"]
        ).replace([np.inf, -np.inf], np.nan)

    # Price spread: industrial vs residential (rate design signal)
    if "RES.PRICE" in df.columns and "IND.PRICE" in df.columns:
        df["price_spread_res_ind"] = df["RES.PRICE"] - df["IND.PRICE"]

    # Population density contrast (urban vs rural stress indicator)
    if "POPDEN_URBAN" in df.columns and "POPDEN_RURAL" in df.columns:
        df["urban_rural_density_ratio"] = (
            df["POPDEN_URBAN"] / df["POPDEN_RURAL"].replace(0, np.nan)
        ).replace([np.inf, -np.inf], np.nan)

    # Season bucket from MONTH (1-3 winter, 4-6 spring, 7-9 summer, 10-12 fall)
    if "MONTH" in df.columns:
        # errors="coerce" tolerates non-numeric month values instead of raising.
        month = pd.to_numeric(df["MONTH"], errors="coerce")
        season = pd.cut(
            month,
            bins=[0, 3, 6, 9, 12],
            labels=["winter", "spring", "summer", "fall"],
            include_lowest=True,
        )
        # BUG FIX: the previous .astype(str) turned missing/out-of-range months
        # into the literal string "nan", creating a bogus one-hot category and
        # bypassing the categorical imputer's "MISSING" fill.  astype(object)
        # keeps the labels as plain strings while preserving real NaN.
        df["season"] = season.astype(object)
    return df
# Additional engineered numeric & categorical columns ----------------------
# Names produced by engineer_features(); list order fixes their position in
# the final feature matrix (consumed by _resolve_columns).
_ENGINEERED_NUMERIC = [
    "demand_per_1k_cust",
    "res_sales_share",
    "price_spread_res_ind",
    "urban_rural_density_ratio",
]
# Single engineered categorical: coarse season bucket derived from MONTH.
_ENGINEERED_CATEGORICAL = ["season"]
def _resolve_columns(df: pd.DataFrame) -> Tuple[list[str], list[str]]:
    """Return (numeric_cols, categorical_cols) present in df, minus leakage."""
    present = set(df.columns)
    banned = set(LEAK_COLUMNS)
    # Preserve declaration order: config features first, engineered ones after.
    numeric: list[str] = []
    for col in NUMERIC_FEATURES + _ENGINEERED_NUMERIC:
        if col in present and col not in banned:
            numeric.append(col)
    categorical = [
        col
        for col in CATEGORICAL_FEATURES + _ENGINEERED_CATEGORICAL
        if col in present
    ]
    return numeric, categorical
def build_preprocessor(num_cols: list[str], cat_cols: list[str]) -> ColumnTransformer:
    """Construct a ColumnTransformer that handles imputation + encoding.

    Numeric columns: median imputation, then standard scaling.
    Categorical columns: constant "MISSING" fill, then dense one-hot
    encoding that ignores categories unseen at fit time.
    """
    num_steps = [
        ("imputer", SimpleImputer(strategy="median")),
        ("scaler", StandardScaler()),
    ]
    cat_steps = [
        ("imputer", SimpleImputer(strategy="constant", fill_value="MISSING")),
        ("encoder", OneHotEncoder(handle_unknown="ignore", sparse_output=False)),
    ]
    return ColumnTransformer(
        transformers=[
            ("num", Pipeline(num_steps), num_cols),
            ("cat", Pipeline(cat_steps), cat_cols),
        ],
        remainder="drop",  # any column not listed is excluded from the matrix
    )
def prepare_splits(
    df: pd.DataFrame,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, ColumnTransformer, list[str]]:
    """
    Full feature-engineering → split → fit-transform workflow.

    The preprocessor is fitted on the training split only, so test-set
    statistics never leak into imputation or scaling.  The fitted
    preprocessor and resolved feature names are persisted under
    ARTIFACTS_DIR so inference applies the exact same transformations.

    Returns
    -------
    X_train, X_test, y_train, y_test, fitted_preprocessor, feature_names
    """
    import joblib

    df = engineer_features(df)

    # BUG FIX: rows with a missing target would make
    # train_test_split(stratify=y) fail (NaN labels cannot be stratified)
    # and are unusable for supervised training anyway — drop them up front.
    df = df.dropna(subset=[TARGET_COL])

    num_cols, cat_cols = _resolve_columns(df)
    logger.info("Feature columns — numeric: %d, categorical: %d", len(num_cols), len(cat_cols))

    X = df[num_cols + cat_cols]
    y = df[TARGET_COL].values
    X_train_raw, X_test_raw, y_train, y_test = train_test_split(
        X, y, test_size=TEST_SIZE, random_state=RANDOM_STATE, stratify=y
    )

    # Fit on train only; transform test with the fitted statistics.
    preprocessor = build_preprocessor(num_cols, cat_cols)
    X_train = preprocessor.fit_transform(X_train_raw)
    X_test = preprocessor.transform(X_test_raw)

    # Recover human-readable feature names: numeric columns pass through 1:1,
    # the encoder expands each categorical into one column per seen category.
    ohe = preprocessor.named_transformers_["cat"].named_steps["encoder"]
    cat_feature_names = list(ohe.get_feature_names_out(cat_cols))
    feature_names = num_cols + cat_feature_names

    # Persist preprocessor + feature names.
    # BUG FIX: create the artifacts directory first so a fresh checkout
    # does not crash with FileNotFoundError on the first run.
    ARTIFACTS_DIR.mkdir(parents=True, exist_ok=True)
    joblib.dump(preprocessor, ARTIFACTS_DIR / PREPROCESSOR_FILE)
    with open(ARTIFACTS_DIR / FEATURE_NAMES_FILE, "w") as f:
        json.dump(feature_names, f)

    logger.info("Train: %d samples | Test: %d samples", X_train.shape[0], X_test.shape[0])
    return X_train, X_test, y_train, y_test, preprocessor, feature_names