|
|
"""Fault classification training utilities for PMU and PV datasets. |
|
|
|
|
|
This module trains deep learning models on high-frequency PMU measurements and |
|
|
supports classical machine learning baselines so the resulting artefacts can be |
|
|
served via the Gradio app in this repository or on Hugging Face Spaces. It |
|
|
implements a full training pipeline including preprocessing, sequence |
|
|
generation, model definition (CNN-LSTM, Temporal Convolutional Network, or |
|
|
Support Vector Machine), evaluation, and export of deployment metadata. |
|
|
|
|
|
Example |
|
|
------- |
|
|
python fault_classification_pmu.py \ |
|
|
--data-path data/Fault_Classification_PMU_Data.csv \ |
|
|
--label-column FaultType \ |
|
|
--model-type tcn \ |
|
|
--model-out pmu_tcn_model.keras \ |
|
|
--scaler-out pmu_feature_scaler.pkl \ |
|
|
--metadata-out pmu_metadata.json |
|
|
|
|
|
The script accepts CSV input where each row contains a timestamped PMU |
|
|
measurement and a categorical fault label. Features default to the PMU
|
|
channels used in the project documentation, but any subset can be provided |
|
|
via the ``--feature-columns`` argument. Data is automatically standardised |
|
|
and windowed to create temporal sequences that feed into the neural network. |
|
|
|
|
|
The exported metadata JSON file contains the feature ordering, label names, |
|
|
sequence length, stride, and chosen architecture. The Gradio front-end |
|
|
consumes this file to replicate the same preprocessing steps during inference. |
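
An abbreviated, illustrative sketch of the exported metadata follows; the class
names and values are made up, and the real file also records model and scaler
paths, the training history, a classification report, and a confusion matrix:

    {
      "feature_columns": ["[325] UPMU_SUB22:FREQ", "..."],
      "label_classes": ["LG", "LL", "NONE"],
      "label_column": "Fault",
      "sequence_length": 32,
      "stride": 4,
      "model_type": "tcn",
      "model_format": "keras"
    }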
|
|
""" |
|
|
from __future__ import annotations |
|
|
|
|
|
import argparse |
|
|
import json |
|
|
import os |
|
|
import shutil |
|
|
from datetime import datetime |
|
|
from pathlib import Path |
|
|
from typing import Dict, List, Optional, Sequence, Tuple |
|
|
|
|
|
import math
import time
|
|
|
|
|
os.environ.setdefault("CUDA_VISIBLE_DEVICES", "-1") |
|
|
os.environ.setdefault("TF_CPP_MIN_LOG_LEVEL", "2") |
|
|
os.environ.setdefault("TF_ENABLE_ONEDNN_OPTS", "0") |
|
|
|
|
|
import joblib |
|
|
import numpy as np |
|
|
import pandas as pd |
|
|
from pandas.api.types import is_numeric_dtype |
|
|
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix |
|
|
from sklearn.model_selection import train_test_split |
|
|
from sklearn.preprocessing import LabelEncoder, StandardScaler |
|
|
from sklearn.svm import SVC |
|
|
from tensorflow.keras import callbacks, layers, models, optimizers |
|
|
|
|
|
|
|
|
|
|
|
class ProgressCallback(callbacks.Callback): |
|
|
"""Custom callback to provide training progress updates.""" |
|
|
|
|
|
def __init__( |
|
|
self, |
|
|
total_epochs, |
|
|
status_file_path=None, |
|
|
*, |
|
|
status_update_interval: float = 10.0, |
|
|
batch_log_frequency: int = 10, |
|
|
): |
|
|
super().__init__() |
|
|
self.total_epochs = total_epochs |
|
|
self.status_file_path = status_file_path |
|
|
self.status_update_interval = max(1.0, float(status_update_interval)) |
|
|
self.batch_log_frequency = max(1, int(batch_log_frequency)) |
|
|
self.current_epoch = 0 |
|
|
self.train_start_time: Optional[float] = None |
|
|
self.last_status_report: Optional[float] = None |
|
|
self.total_batches_per_epoch = 0 |
|
|
self.batches_seen = 0 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
    def _now(self) -> float:
        return time.perf_counter()
|
|
|
|
|
def _training_elapsed(self, now: Optional[float] = None) -> float: |
|
|
if self.train_start_time is None: |
|
|
return 0.0 |
|
|
if now is None: |
|
|
now = self._now() |
|
|
return max(0.0, now - self.train_start_time) |
|
|
|
|
|
def _report_status(self, message: str, *, force: bool = False) -> None: |
|
|
now = self._now() |
|
|
if not force and self.last_status_report is not None: |
|
|
if now - self.last_status_report < self.status_update_interval: |
|
|
return |
|
|
|
|
|
print(message, flush=True) |
|
|
|
|
|
if self.status_file_path: |
|
|
try: |
|
|
with open(self.status_file_path, "w") as f: |
|
|
f.write(message) |
|
|
except Exception: |
|
|
|
|
|
pass |
|
|
|
|
|
self.last_status_report = now |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def on_train_begin(self, logs=None): |
|
|
params = self.params or {} |
|
|
steps = params.get("steps") or params.get("steps_per_epoch") |
|
|
if steps: |
|
|
self.total_batches_per_epoch = int(steps) |
|
|
else: |
|
|
samples = params.get("samples") |
|
|
batch_size = params.get("batch_size") or 0 |
|
|
if samples and batch_size: |
|
|
self.total_batches_per_epoch = math.ceil(samples / batch_size) |
|
|
else: |
|
|
self.total_batches_per_epoch = 0 |
|
|
|
|
|
self.batches_seen = 0 |
|
|
self.last_status_report = None |
|
|
self.train_start_time = self._now() |
|
|
|
|
|
def on_epoch_begin(self, epoch, logs=None): |
|
|
|
|
|
|
|
now = self._now() |
|
|
if self.train_start_time is None: |
|
|
self.train_start_time = now |
|
|
|
|
|
self.current_epoch = epoch + 1 |
|
|
self.batches_seen = 0 |
|
|
|
|
|
progress_pct = (self.current_epoch / self.total_epochs) * 100 |
|
|
elapsed_time = self._training_elapsed(now) |
|
|
status_msg = ( |
|
|
f"Training epoch {self.current_epoch}/{self.total_epochs} " |
|
|
f"({progress_pct:.1f}%) - {elapsed_time:.1f}s elapsed" |
|
|
) |
|
|
self._report_status(status_msg, force=True) |
|
|
|
|
|
if self.current_epoch == 1: |
|
|
wall_clock = time.strftime("%H:%M:%S") |
|
|
print(f"Starting first epoch at {wall_clock}", flush=True) |
|
|
|
|
|
def on_batch_begin(self, batch, logs=None): |
|
|
if self.current_epoch == 1 and batch % self.batch_log_frequency == 0: |
|
|
elapsed = self._training_elapsed() |
|
|
print(f"Epoch {self.current_epoch}, Batch {batch} started - {elapsed:.1f}s elapsed", flush=True) |
|
|
|
|
|
def on_batch_end(self, batch, logs=None): |
|
|
self.batches_seen = batch + 1 |
|
|
|
|
|
if self.current_epoch == 1 and batch % self.batch_log_frequency == 0: |
|
|
logs = logs or {} |
|
|
loss = logs.get("loss", 0) |
|
|
elapsed = self._training_elapsed() |
|
|
print( |
|
|
f"Epoch {self.current_epoch}, Batch {batch} completed - Loss: {loss:.4f}, {elapsed:.1f}s elapsed", |
|
|
flush=True, |
|
|
) |
|
|
|
|
|
total_batches = self.total_batches_per_epoch or 0 |
|
|
if not total_batches: |
|
|
params = self.params or {} |
|
|
total_batches = ( |
|
|
params.get("steps") |
|
|
or params.get("steps_per_epoch") |
|
|
or 0 |
|
|
) |
|
|
|
|
|
if total_batches: |
|
|
epoch_fraction = min(1.0, (batch + 1) / total_batches) |
|
|
else: |
|
|
epoch_fraction = 0.0 |
|
|
|
|
|
overall_progress = ( |
|
|
(self.current_epoch - 1 + epoch_fraction) / self.total_epochs * 100 |
|
|
) |
|
|
elapsed_time = self._training_elapsed() |
|
|
status_msg = ( |
|
|
f"Epoch {self.current_epoch}/{self.total_epochs} - Batch {batch + 1}/{total_batches or '?'} " |
|
|
f"({overall_progress:.1f}%) - {elapsed_time:.1f}s elapsed" |
|
|
) |
|
|
self._report_status(status_msg) |
|
|
|
|
|
def on_epoch_end(self, epoch, logs=None): |
|
|
logs = logs or {} |
|
|
loss = logs.get("loss", 0) |
|
|
val_loss = logs.get("val_loss", 0) |
|
|
accuracy = logs.get("accuracy", logs.get("acc", 0)) |
|
|
val_accuracy = logs.get("val_accuracy", logs.get("val_acc", 0)) |
|
|
_ = epoch |
|
|
|
|
|
elapsed_time = self._training_elapsed() |
|
|
status_msg = ( |
|
|
f"Epoch {self.current_epoch}/{self.total_epochs} completed - " |
|
|
f"Loss: {loss:.4f}, Val Loss: {val_loss:.4f}, " |
|
|
f"Acc: {accuracy:.4f}, Val Acc: {val_accuracy:.4f} - {elapsed_time:.1f}s total" |
|
|
) |
|
|
self._report_status(status_msg, force=True) |
|
|
|
|
|
def on_train_end(self, logs=None): |
|
|
total_elapsed = self._training_elapsed() |
|
|
final_message = ( |
|
|
f"Training finished after {self.total_epochs} epoch(s) - " |
|
|
f"{total_elapsed:.1f}s total elapsed" |
|
|
) |
|
|
self._report_status(final_message, force=True) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
DEFAULT_FEATURE_COLUMNS: List[str] = [ |
|
|
"[325] UPMU_SUB22:FREQ", |
|
|
"[326] UPMU_SUB22:DFDT", |
|
|
"[327] UPMU_SUB22:FLAG", |
|
|
"[328] UPMU_SUB22-L1:MAG", |
|
|
"[329] UPMU_SUB22-L1:ANG", |
|
|
"[330] UPMU_SUB22-L2:MAG", |
|
|
"[331] UPMU_SUB22-L2:ANG", |
|
|
"[332] UPMU_SUB22-L3:MAG", |
|
|
"[333] UPMU_SUB22-L3:ANG", |
|
|
"[334] UPMU_SUB22-C1:MAG", |
|
|
"[335] UPMU_SUB22-C1:ANG", |
|
|
"[336] UPMU_SUB22-C2:MAG", |
|
|
"[337] UPMU_SUB22-C2:ANG", |
|
|
"[338] UPMU_SUB22-C3:MAG", |
|
|
"[339] UPMU_SUB22-C3:ANG", |
|
|
] |
|
|
|
|
|
LABEL_GUESS_CANDIDATES: Tuple[str, ...] = ("Fault", "FaultType", "Label", "Target", "Class") |
|
|
|
|
|
|
|
|
def _normalise_column_name(name: str) -> str: |
|
|
return str(name).strip().lower() |
|
|
|
|
|
|
|
|
def _resolve_label_column(df: pd.DataFrame, requested: str) -> str:
    """Resolve the label column: try the requested name, then common candidates, then the last non-numeric column."""
    columns = [str(col) for col in df.columns]
|
|
if not columns: |
|
|
raise ValueError("Provided dataframe does not contain any columns.") |
|
|
|
|
|
requested = str(requested or "").strip() |
|
|
if requested and requested in df.columns: |
|
|
return requested |
|
|
|
|
|
    lowered_map = {_normalise_column_name(col): str(col) for col in df.columns}

    if requested:
        for col in df.columns:
            if str(col).strip() == requested:
                return str(col)
        if requested.lower() in lowered_map:
            return lowered_map[requested.lower()]

    for guess in LABEL_GUESS_CANDIDATES:
        key = guess.lower()
        if key in lowered_map:
            return lowered_map[key]
|
|
|
|
|
for col in reversed(df.columns): |
|
|
if not is_numeric_dtype(df[col]): |
|
|
return str(col) |
|
|
|
|
|
available = ", ".join(columns) |
|
|
raise ValueError( |
|
|
f"Label column '{requested or ' '}' not found in provided dataframe. " |
|
|
f"Available columns: {available}" |
|
|
) |
|
|
|
|
|
|
|
|
def _resolve_features(df: pd.DataFrame, feature_columns: Sequence[str] | None, label_column: str) -> List[str]:
    """Return the feature ordering, preferring the documented PMU channels and then any remaining non-label columns."""
    if feature_columns:
|
|
missing = [c for c in feature_columns if c not in df.columns] |
|
|
if missing: |
|
|
raise ValueError(f"Feature columns not present in CSV: {missing}") |
|
|
return list(feature_columns) |
|
|
|
|
|
|
|
|
|
|
|
preferred = [c for c in DEFAULT_FEATURE_COLUMNS if c in df.columns] |
|
|
|
|
|
excluded = {label_column, label_column.lower(), "timestamp", "Timestamp"} |
|
|
remainder = [c for c in df.columns if c not in preferred and c not in excluded] |
|
|
ordered = preferred + remainder |
|
|
if not ordered: |
|
|
raise ValueError("No feature columns detected. Specify --feature-columns explicitly.") |
|
|
return ordered |
|
|
|
|
|
|
|
|
def load_dataset( |
|
|
csv_path: Path, |
|
|
*, |
|
|
feature_columns: Sequence[str] | None, |
|
|
label_column: str, |
|
|
) -> Tuple[np.ndarray, np.ndarray, List[str], str]: |
|
|
"""Load the dataset from CSV. |
|
|
|
|
|
Parameters |
|
|
---------- |
|
|
csv_path: |
|
|
Path to the CSV file containing PMU measurements. |
|
|
feature_columns: |
|
|
Optional explicit ordering of feature columns. |
|
|
label_column: |
|
|
Name of the column containing the categorical fault label. |
|
|
|
|
|
Returns |
|
|
------- |
|
|
features: np.ndarray |
|
|
2-D array of shape (n_samples, n_features). |
|
|
labels: np.ndarray |
|
|
1-D array of label strings. |
|
|
columns: list[str] |
|
|
Actual feature ordering used. |
|
|
resolved_label: str |
|
|
The column name that supplied the labels. |
|
|
""" |
|
|
df = pd.read_csv(csv_path, sep=None, engine="python") |
|
|
resolved_label = _resolve_label_column(df, label_column) |
|
|
|
|
|
columns = _resolve_features(df, feature_columns, resolved_label) |
|
|
features = df[columns].astype(np.float32).values |
|
|
labels = df[resolved_label].astype(str).values |
|
|
return features, labels, columns, resolved_label |
|
|
|
|
|
|
|
|
def load_dataset_from_dataframe( |
|
|
df: pd.DataFrame, |
|
|
*, |
|
|
feature_columns: Sequence[str] | None, |
|
|
label_column: str, |
|
|
) -> Tuple[np.ndarray, np.ndarray, List[str], str]: |
|
|
"""Load dataset arrays directly from a DataFrame.""" |
|
|
|
|
|
resolved_label = _resolve_label_column(df, label_column) |
|
|
|
|
|
columns = _resolve_features(df, feature_columns, resolved_label) |
|
|
features = df[columns].astype(np.float32).values |
|
|
labels = df[resolved_label].astype(str).values |
|
|
return features, labels, columns, resolved_label |
|
|
|
|
|
|
|
|
def create_sequences( |
|
|
features: np.ndarray, |
|
|
labels: np.ndarray, |
|
|
*, |
|
|
sequence_length: int, |
|
|
stride: int, |
|
|
) -> Tuple[np.ndarray, np.ndarray]: |
|
|
"""Create overlapping sequences suitable for sequence models. |
|
|
|
|
|
The label assigned to a sequence corresponds to the label of the final |
|
|
timestep in the window. This choice aligns with fault detection use cases |
|
|
where the most recent measurement dictates the state of the system. |
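
    Example
    -------
    A minimal sketch with synthetic data (shapes chosen purely for illustration)::

        >>> import numpy as np
        >>> feats = np.arange(20, dtype=np.float32).reshape(10, 2)
        >>> labs = np.array(["ok"] * 9 + ["fault"])
        >>> seqs, seq_labs = create_sequences(feats, labs, sequence_length=4, stride=2)
        >>> seqs.shape
        (4, 4, 2)
        >>> seq_labs.tolist()[-1]
        'fault'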
|
|
""" |
|
|
if sequence_length <= 0: |
|
|
raise ValueError("sequence_length must be > 0") |
|
|
if stride <= 0: |
|
|
raise ValueError("stride must be > 0") |
|
|
if features.shape[0] != labels.shape[0]: |
|
|
raise ValueError("Features and labels must contain the same number of rows") |
|
|
if features.shape[0] < sequence_length: |
|
|
raise ValueError("Not enough samples to create a single sequence") |
|
|
|
|
|
sequences: List[np.ndarray] = [] |
|
|
seq_labels: List[str] = [] |
|
|
for start in range(0, features.shape[0] - sequence_length + 1, stride): |
|
|
end = start + sequence_length |
|
|
sequences.append(features[start:end]) |
|
|
seq_labels.append(labels[end - 1]) |
|
|
return np.stack(sequences), np.array(seq_labels) |
|
|
|
|
|
|
|
|
def build_cnn_lstm( |
|
|
input_shape: Tuple[int, int], |
|
|
num_classes: int, |
|
|
*, |
|
|
conv_filters: int = 128, |
|
|
kernel_size: int = 3, |
|
|
lstm_units: int = 128, |
|
|
dropout: float = 0.3, |
|
|
) -> models.Model: |
|
|
"""Construct a compact yet expressive CNN-LSTM architecture.""" |
|
|
inputs = layers.Input(shape=input_shape) |
|
|
x = layers.Conv1D(conv_filters, kernel_size, padding="same", activation="relu")(inputs) |
|
|
x = layers.BatchNormalization()(x) |
|
|
x = layers.Conv1D(conv_filters, kernel_size, dilation_rate=2, padding="same", activation="relu")(x) |
|
|
x = layers.BatchNormalization()(x) |
|
|
x = layers.Dropout(dropout)(x) |
|
|
x = layers.LSTM(lstm_units, return_sequences=False)(x) |
|
|
x = layers.Dropout(dropout)(x) |
|
|
outputs = layers.Dense(num_classes, activation="softmax")(x) |
|
|
model = models.Model(inputs, outputs) |
|
|
model.compile( |
|
|
optimizer=optimizers.Adam(learning_rate=1e-3), |
|
|
loss="sparse_categorical_crossentropy", |
|
|
metrics=["accuracy"], |
|
|
) |
|
|
return model |
|
|
|
|
|
|
|
|
def build_tcn( |
|
|
input_shape: Tuple[int, int], |
|
|
num_classes: int, |
|
|
*, |
|
|
filters: int = 64, |
|
|
kernel_size: int = 3, |
|
|
dilations: Sequence[int] = (1, 2, 4, 8), |
|
|
dropout: float = 0.2, |
|
|
) -> models.Model: |
|
|
"""Construct a lightweight Temporal Convolutional Network.""" |
|
|
|
|
|
inputs = layers.Input(shape=input_shape) |
|
|
x = inputs |
|
|
for dilation in dilations: |
|
|
residual = x |
|
|
x = layers.Conv1D( |
|
|
filters, |
|
|
kernel_size, |
|
|
padding="causal", |
|
|
activation="relu", |
|
|
dilation_rate=dilation, |
|
|
)(x) |
|
|
x = layers.BatchNormalization()(x) |
|
|
x = layers.Dropout(dropout)(x) |
|
|
x = layers.Conv1D( |
|
|
filters, |
|
|
kernel_size, |
|
|
padding="causal", |
|
|
activation="relu", |
|
|
dilation_rate=dilation, |
|
|
)(x) |
|
|
x = layers.BatchNormalization()(x) |
|
|
if residual.shape[-1] != filters: |
|
|
residual = layers.Conv1D(filters, 1, padding="same")(residual) |
|
|
x = layers.Add()([x, residual]) |
|
|
x = layers.Activation("relu")(x) |
|
|
|
|
|
x = layers.GlobalAveragePooling1D()(x) |
|
|
x = layers.Dropout(dropout)(x) |
|
|
outputs = layers.Dense(num_classes, activation="softmax")(x) |
|
|
|
|
|
model = models.Model(inputs, outputs) |
|
|
model.compile( |
|
|
optimizer=optimizers.Adam(learning_rate=1e-3), |
|
|
loss="sparse_categorical_crossentropy", |
|
|
metrics=["accuracy"], |
|
|
) |
|
|
return model |
|
|
|
|
|
|
|
|
def train_model( |
|
|
sequences: np.ndarray, |
|
|
labels: np.ndarray, |
|
|
*, |
|
|
validation_split: float, |
|
|
batch_size: int, |
|
|
epochs: int, |
|
|
model_type: str = "cnn_lstm", |
|
|
tensorboard_log_dir: Optional[Path] = None, |
|
|
status_file_path: Optional[Path] = None, |
|
|
) -> Tuple[object, LabelEncoder, Dict[str, object]]: |
|
|
"""Train a sequence model and return training history and validation outputs.""" |
|
|
|
|
|
model_type = model_type.lower().strip() |
|
|
if model_type not in {"cnn_lstm", "tcn", "svm"}: |
|
|
raise ValueError("model_type must be either 'cnn_lstm', 'tcn', or 'svm'") |
|
|
|
|
|
|
|
|
status_file = status_file_path if status_file_path else None |
|
|
|
|
|
label_encoder = LabelEncoder() |
|
|
y = label_encoder.fit_transform(labels) |
|
|
|
|
|
if model_type == "svm": |
|
|
features = sequences.reshape(sequences.shape[0], -1) |
|
|
else: |
|
|
features = sequences |
|
|
|
|
|
tb_dir: Optional[str] = None |
|
|
if model_type != "svm" and tensorboard_log_dir is not None: |
|
|
tensorboard_log_dir.mkdir(parents=True, exist_ok=True) |
|
|
tb_dir = str(tensorboard_log_dir.resolve()) |
|
|
else: |
|
|
tensorboard_log_dir = None |
|
|
|
|
|
|
|
|
unique_labels, label_counts = np.unique(y, return_counts=True) |
|
|
min_samples_per_class = np.min(label_counts) |
|
|
|
|
|
print(f"Label distribution: {dict(zip(unique_labels, label_counts))}") |
|
|
print(f"Minimum samples per class: {min_samples_per_class}") |
|
|
print(f"Total sequences: {len(sequences)}, Features per sequence: {sequences.shape[1:]}") |
|
|
|
|
|
|
|
|
|
|
data_size_mb = sequences.nbytes / (1024 * 1024) |
|
|
print(f"Data size: {data_size_mb:.2f} MB") |
|
|
if data_size_mb > 1000: |
|
|
print("Warning: Large dataset detected. Consider reducing batch size or sequence length.") |
|
|
|
|
|
|
|
|
    # Clean non-finite values in the array that is actually split and trained on.
    if np.any(np.isnan(features)) or np.any(np.isinf(features)):
        print("Warning: NaN or Inf values detected in input sequences")
        features = np.nan_to_num(features, nan=0.0, posinf=1e6, neginf=-1e6)
|
|
|
|
|
|
|
|
if min_samples_per_class >= 2: |
|
|
X_train, X_val, y_train, y_val = train_test_split( |
|
|
features, y, test_size=validation_split, stratify=y, random_state=42 |
|
|
) |
|
|
else: |
|
|
print(f"Warning: Some classes have only {min_samples_per_class} sample(s). Using simple random split instead of stratified split.") |
|
|
|
|
|
|
|
|
|
|
|
total_samples = len(y) |
|
|
if validation_split * total_samples < len(unique_labels): |
|
|
|
|
|
adjusted_split = max(0.1, len(unique_labels) / total_samples) |
|
|
adjusted_split = min(adjusted_split, 0.3) |
|
|
print(f"Adjusting validation split from {validation_split} to {adjusted_split}") |
|
|
validation_split = adjusted_split |
|
|
|
|
|
X_train, X_val, y_train, y_val = train_test_split( |
|
|
features, y, test_size=validation_split, random_state=42 |
|
|
) |
|
|
|
|
|
if model_type == "cnn_lstm": |
|
|
print("Building CNN-LSTM model...") |
|
|
|
|
|
|
|
|
if len(sequences) > 100000: |
|
|
print("Using lightweight CNN-LSTM for large dataset") |
|
|
model = build_cnn_lstm( |
|
|
input_shape=sequences.shape[1:], |
|
|
num_classes=len(label_encoder.classes_), |
|
|
conv_filters=64, |
|
|
lstm_units=64, |
|
|
dropout=0.2 |
|
|
) |
|
|
else: |
|
|
model = build_cnn_lstm( |
|
|
input_shape=sequences.shape[1:], num_classes=len(label_encoder.classes_) |
|
|
) |
|
|
print(f"CNN-LSTM model built. Input shape: {sequences.shape[1:]}, Classes: {len(label_encoder.classes_)}") |
|
|
print(f"Model parameters: {model.count_params():,}") |
|
|
|
|
|
|
|
|
if len(sequences) > 100000: |
|
|
callbacks_list = [ |
|
|
ProgressCallback(total_epochs=epochs, status_file_path=str(status_file) if status_file else None), |
|
|
callbacks.ReduceLROnPlateau(monitor="val_loss", factor=0.5, patience=2, min_lr=1e-5), |
|
|
callbacks.EarlyStopping(monitor="val_loss", patience=3, restore_best_weights=True), |
|
|
] |
|
|
print("Using aggressive callbacks for large dataset") |
|
|
else: |
|
|
callbacks_list = [ |
|
|
ProgressCallback(total_epochs=epochs, status_file_path=str(status_file) if status_file else None), |
|
|
callbacks.ReduceLROnPlateau(monitor="val_loss", factor=0.5, patience=3, min_lr=1e-5), |
|
|
callbacks.EarlyStopping(monitor="val_loss", patience=6, restore_best_weights=True), |
|
|
] |
|
|
if tensorboard_log_dir is not None: |
|
|
callbacks_list.insert(-2, callbacks.TensorBoard(log_dir=tb_dir, histogram_freq=0, write_graph=False)) |
|
|
|
|
|
print(f"Starting CNN-LSTM training with {len(X_train)} training samples, {len(X_val)} validation samples") |
|
|
print(f"Batch size: {batch_size}, Epochs: {epochs}") |
|
|
|
|
|
if status_file: |
|
|
with open(status_file, 'w') as f: |
|
|
f.write(f"CNN-LSTM training started - {len(X_train)} train, {len(X_val)} val samples, batch_size={batch_size}") |
|
|
|
|
|
history = model.fit( |
|
|
X_train, |
|
|
y_train, |
|
|
validation_data=(X_val, y_val), |
|
|
epochs=epochs, |
|
|
batch_size=batch_size, |
|
|
callbacks=callbacks_list, |
|
|
verbose=2, |
|
|
) |
|
|
|
|
|
print("CNN-LSTM training completed, starting prediction...") |
|
|
if status_file: |
|
|
with open(status_file, 'w') as f: |
|
|
f.write("CNN-LSTM training completed, evaluating model...") |
|
|
|
|
|
print(f"Making predictions on {len(X_val)} validation samples...") |
|
|
if status_file: |
|
|
with open(status_file, 'w') as f: |
|
|
f.write(f"Making predictions on {len(X_val)} validation samples...") |
|
|
y_pred = model.predict(X_val, verbose=0).argmax(axis=1) |
|
|
print("Predictions completed") |
|
|
training_history: Dict[str, object] = history.history |
|
|
elif model_type == "tcn": |
|
|
print("Building TCN model...") |
|
|
model = build_tcn(input_shape=sequences.shape[1:], num_classes=len(label_encoder.classes_)) |
|
|
print(f"TCN model built. Input shape: {sequences.shape[1:]}, Classes: {len(label_encoder.classes_)}") |
|
|
|
|
|
callbacks_list = [ |
|
|
ProgressCallback(total_epochs=epochs, status_file_path=str(status_file) if status_file else None), |
|
|
callbacks.ReduceLROnPlateau(monitor="val_loss", factor=0.5, patience=3, min_lr=1e-5), |
|
|
callbacks.EarlyStopping(monitor="val_loss", patience=6, restore_best_weights=True), |
|
|
] |
|
|
if tensorboard_log_dir is not None: |
|
|
callbacks_list.insert(-2, callbacks.TensorBoard(log_dir=tb_dir, histogram_freq=0, write_graph=False)) |
|
|
|
|
|
print(f"Starting TCN training with {len(X_train)} training samples, {len(X_val)} validation samples") |
|
|
print(f"Batch size: {batch_size}, Epochs: {epochs}") |
|
|
|
|
|
if status_file: |
|
|
with open(status_file, 'w') as f: |
|
|
f.write(f"TCN training started - {len(X_train)} train, {len(X_val)} val samples, batch_size={batch_size}") |
|
|
|
|
|
history = model.fit( |
|
|
X_train, |
|
|
y_train, |
|
|
validation_data=(X_val, y_val), |
|
|
epochs=epochs, |
|
|
batch_size=batch_size, |
|
|
callbacks=callbacks_list, |
|
|
verbose=2, |
|
|
) |
|
|
|
|
|
print("TCN training completed, starting prediction...") |
|
|
if status_file: |
|
|
with open(status_file, 'w') as f: |
|
|
f.write("TCN training completed, evaluating model...") |
|
|
|
|
|
print(f"Making TCN predictions on {len(X_val)} validation samples...") |
|
|
if status_file: |
|
|
with open(status_file, 'w') as f: |
|
|
f.write(f"Making TCN predictions on {len(X_val)} validation samples...") |
|
|
y_pred = model.predict(X_val, verbose=0).argmax(axis=1) |
|
|
print("TCN predictions completed") |
|
|
training_history = history.history |
|
|
else: |
|
|
print("Training SVM model...", flush=True) |
|
|
if status_file: |
|
|
with open(status_file, 'w') as f: |
|
|
f.write("Training SVM model...") |
|
|
|
|
|
model = SVC(kernel="rbf", probability=True, class_weight="balanced") |
|
|
model.fit(X_train, y_train) |
|
|
|
|
|
print("SVM training completed. Evaluating...", flush=True) |
|
|
if status_file: |
|
|
with open(status_file, 'w') as f: |
|
|
f.write("SVM training completed. Evaluating...") |
|
|
|
|
|
y_pred = model.predict(X_val) |
|
|
training_history = { |
|
|
"train_accuracy": float(model.score(X_train, y_train)), |
|
|
"val_accuracy": float(accuracy_score(y_val, y_pred)), |
|
|
} |
|
|
|
|
|
cm = confusion_matrix(y_val, y_pred) |
|
|
metrics: Dict[str, object] = { |
|
|
"history": training_history, |
|
|
"validation": { |
|
|
"y_true": y_val, |
|
|
"y_pred": y_pred, |
|
|
"class_names": label_encoder.classes_.tolist(), |
|
|
"confusion_matrix": cm, |
|
|
}, |
|
|
"model_type": model_type, |
|
|
"input_shape": list(sequences.shape[1:]), |
|
|
"tensorboard_log_dir": tb_dir, |
|
|
} |
|
|
return model, label_encoder, metrics |
|
|
|
|
|
|
|
|
def standardise_sequences(sequences: np.ndarray) -> Tuple[np.ndarray, StandardScaler]: |
|
|
"""Apply standard scaling per feature across all timesteps.""" |
|
|
scaler = StandardScaler() |
|
|
flattened = sequences.reshape(-1, sequences.shape[-1]) |
|
|
scaled = scaler.fit_transform(flattened) |
|
|
return scaled.reshape(sequences.shape), scaler |
|
|
|
|
|
|
|
|
def export_artifacts( |
|
|
*, |
|
|
model: object, |
|
|
scaler: StandardScaler, |
|
|
label_encoder: LabelEncoder, |
|
|
feature_columns: Sequence[str], |
|
|
label_column: str, |
|
|
sequence_length: int, |
|
|
stride: int, |
|
|
model_path: Path, |
|
|
scaler_path: Path, |
|
|
metadata_path: Path, |
|
|
metrics: dict, |
|
|
) -> None: |
|
|
"""Persist trained assets to disk for deployment.""" |
|
|
model_path.parent.mkdir(parents=True, exist_ok=True) |
|
|
scaler_path.parent.mkdir(parents=True, exist_ok=True) |
|
|
metadata_path.parent.mkdir(parents=True, exist_ok=True) |
|
|
model_type = str(metrics.get("model_type", "cnn_lstm")) |
|
|
if model_type == "svm": |
|
|
joblib.dump(model, model_path) |
|
|
else: |
|
|
model.save(model_path) |
|
|
joblib.dump(scaler, scaler_path) |
|
|
|
|
|
validation = metrics["validation"] |
|
|
report_dict = classification_report( |
|
|
validation["y_true"], |
|
|
validation["y_pred"], |
|
|
target_names=label_encoder.classes_, |
|
|
output_dict=True, |
|
|
) |
|
|
|
|
|
metadata = { |
|
|
"feature_columns": list(feature_columns), |
|
|
"label_classes": label_encoder.classes_.tolist(), |
|
|
"label_column": label_column, |
|
|
"sequence_length": sequence_length, |
|
|
"stride": stride, |
|
|
"model_path": str(model_path), |
|
|
"scaler_path": str(scaler_path), |
|
|
"training_history": metrics["history"], |
|
|
"classification_report": report_dict, |
|
|
"model_type": model_type, |
|
|
"model_format": "joblib" if model_type == "svm" else "keras", |
|
|
"input_shape": metrics.get("input_shape"), |
|
|
"tensorboard_log_dir": metrics.get("tensorboard_log_dir"), |
|
|
} |
|
|
confusion = validation.get("confusion_matrix") |
|
|
if confusion is None: |
|
|
confusion = confusion_matrix(validation["y_true"], validation["y_pred"]) |
|
|
metadata["confusion_matrix"] = np.asarray(confusion).tolist() |
|
|
|
|
|
metadata_path.write_text(json.dumps(metadata, indent=2)) |
|
|
|
|
|
|
|
|
def train_from_dataframe( |
|
|
df: pd.DataFrame, |
|
|
*, |
|
|
label_column: str, |
|
|
feature_columns: Sequence[str] | None = None, |
|
|
sequence_length: int = 32, |
|
|
stride: int = 4, |
|
|
validation_split: float = 0.2, |
|
|
batch_size: int = 128, |
|
|
epochs: int = 50, |
|
|
model_type: str = "cnn_lstm", |
|
|
model_path: Path | str = "pmu_cnn_lstm_model.keras", |
|
|
scaler_path: Path | str = "pmu_feature_scaler.pkl", |
|
|
metadata_path: Path | str = "pmu_metadata.json", |
|
|
enable_tensorboard: bool = True, |
|
|
tensorboard_root: Path | str | None = None, |
|
|
) -> dict: |
|
|
"""Train a PMU fault classification model using an in-memory dataframe.""" |
|
|
|
|
|
model_path = Path(model_path) |
|
|
scaler_path = Path(scaler_path) |
|
|
metadata_path = Path(metadata_path) |
|
|
|
|
|
|
|
|
status_file = model_path.parent / "training_status.txt" |
|
|
print(f"Training progress will be written to: {status_file}") |
|
|
|
|
|
tensorboard_log_dir: Optional[Path] = None |
|
|
if enable_tensorboard and model_type.lower() != "svm": |
|
|
base_dir = Path(tensorboard_root) if tensorboard_root is not None else Path("tensorboard_runs") |
|
|
timestamp = datetime.utcnow().strftime("%Y%m%d-%H%M%S") |
|
|
tensorboard_log_dir = base_dir / f"run-{timestamp}" |
|
|
|
|
|
features, labels, used_columns, resolved_label = load_dataset_from_dataframe( |
|
|
df, feature_columns=feature_columns, label_column=label_column |
|
|
) |
|
|
|
|
|
print(f"Input data: {len(features)} samples") |
|
|
print(f"Creating sequences with length={sequence_length}, stride={stride}") |
|
|
|
|
|
sequences, seq_labels = create_sequences( |
|
|
features, |
|
|
labels, |
|
|
sequence_length=sequence_length, |
|
|
stride=stride, |
|
|
) |
|
|
|
|
|
print(f"Generated {len(sequences)} sequences") |
|
|
|
|
|
|
|
|
if len(sequences) < 10: |
|
|
raise ValueError( |
|
|
f"Only {len(sequences)} sequences generated. Need at least 10 for training. " |
|
|
f"Try reducing sequence_length (currently {sequence_length}) or stride (currently {stride}), " |
|
|
"or provide more data." |
|
|
) |
|
|
|
|
|
|
|
|
if len(sequences) < 100 and model_type in ['cnn_lstm', 'tcn']: |
|
|
print(f"Warning: Only {len(sequences)} sequences available. Consider using SVM for small datasets.") |
|
|
|
|
|
sequences, scaler = standardise_sequences(sequences) |
|
|
|
|
|
|
|
|
original_batch_size = batch_size |
|
|
original_epochs = epochs |
|
|
original_validation_split = validation_split |
|
|
|
|
|
|
|
|
if len(sequences) > 100000: |
|
|
print(f"Large dataset detected ({len(sequences)} sequences). Optimizing parameters...") |
|
|
batch_size = min(batch_size * 2, 512) |
|
|
epochs = min(epochs, 30) |
|
|
print(f"Adjusted parameters for large dataset:") |
|
|
print(f" Batch size: {original_batch_size} -> {batch_size}") |
|
|
print(f" Epochs: {original_epochs} -> {epochs}") |
|
|
|
|
|
|
|
|
import gc |
|
|
gc.collect() |
|
|
|
|
|
elif len(sequences) < 100: |
|
|
|
|
|
batch_size = max(min(batch_size, len(sequences) // 4), 4) |
|
|
epochs = min(epochs, 20) |
|
|
validation_split = min(validation_split, 0.3) |
|
|
print(f"Adjusted parameters for small dataset:") |
|
|
print(f" Batch size: {original_batch_size} -> {batch_size}") |
|
|
print(f" Epochs: {original_epochs} -> {epochs}") |
|
|
print(f" Validation split: {original_validation_split} -> {validation_split}") |
|
|
|
|
|
model, label_encoder, metrics = train_model( |
|
|
sequences, |
|
|
seq_labels, |
|
|
validation_split=validation_split, |
|
|
batch_size=batch_size, |
|
|
epochs=epochs, |
|
|
model_type=model_type, |
|
|
tensorboard_log_dir=tensorboard_log_dir, |
|
|
status_file_path=status_file, |
|
|
) |
|
|
|
|
|
export_artifacts( |
|
|
model=model, |
|
|
scaler=scaler, |
|
|
label_encoder=label_encoder, |
|
|
feature_columns=used_columns, |
|
|
label_column=resolved_label, |
|
|
sequence_length=sequence_length, |
|
|
stride=stride, |
|
|
model_path=model_path, |
|
|
scaler_path=scaler_path, |
|
|
metadata_path=metadata_path, |
|
|
metrics=metrics, |
|
|
) |
|
|
|
|
|
tensorboard_zip_path: Optional[str] = None |
|
|
if tensorboard_log_dir and tensorboard_log_dir.exists(): |
|
|
try: |
|
|
tensorboard_zip_path = shutil.make_archive( |
|
|
base_name=str(tensorboard_log_dir.parent / tensorboard_log_dir.name), |
|
|
format="zip", |
|
|
root_dir=str(tensorboard_log_dir.parent), |
|
|
base_dir=tensorboard_log_dir.name, |
|
|
) |
|
|
tensorboard_zip_path = str(Path(tensorboard_zip_path).resolve()) |
|
|
except Exception: |
|
|
tensorboard_zip_path = None |
|
|
|
|
|
report_dict = classification_report( |
|
|
metrics["validation"]["y_true"], |
|
|
metrics["validation"]["y_pred"], |
|
|
target_names=metrics["validation"]["class_names"], |
|
|
output_dict=True, |
|
|
) |
|
|
confusion = metrics["validation"].get("confusion_matrix") |
|
|
if confusion is None: |
|
|
confusion = confusion_matrix(metrics["validation"]["y_true"], metrics["validation"]["y_pred"]) |
|
|
|
|
|
return { |
|
|
"num_samples": int(df.shape[0]), |
|
|
"num_sequences": int(sequences.shape[0]), |
|
|
"feature_columns": used_columns, |
|
|
"class_names": label_encoder.classes_.tolist(), |
|
|
"model_path": str(model_path.resolve()), |
|
|
"scaler_path": str(scaler_path.resolve()), |
|
|
"metadata_path": str(metadata_path.resolve()), |
|
|
"history": metrics["history"], |
|
|
"model_type": metrics.get("model_type", model_type), |
|
|
"classification_report": report_dict, |
|
|
"confusion_matrix": np.asarray(confusion).tolist(), |
|
|
"tensorboard_log_dir": metrics.get("tensorboard_log_dir"), |
|
|
"tensorboard_zip_path": tensorboard_zip_path, |
|
|
"label_column": resolved_label, |
|
|
} |
|
|
|
|
|
|
|
|
def run_training(args: argparse.Namespace) -> None: |
|
|
csv_path = Path(args.data_path) |
|
|
model_out = Path(args.model_out) |
|
|
scaler_out = Path(args.scaler_out) |
|
|
metadata_out = Path(args.metadata_out) |
|
|
|
|
|
features, labels, feature_columns, resolved_label = load_dataset( |
|
|
csv_path, feature_columns=args.feature_columns, label_column=args.label_column |
|
|
) |
|
|
|
|
|
sequences, seq_labels = create_sequences( |
|
|
features, |
|
|
labels, |
|
|
sequence_length=args.sequence_length, |
|
|
stride=args.stride, |
|
|
) |
|
|
|
|
|
sequences, scaler = standardise_sequences(sequences) |
|
|
tensorboard_log_dir: Optional[Path] = None |
|
|
if args.tensorboard and args.model_type != "svm": |
|
|
if args.tensorboard_log_dir: |
|
|
tensorboard_log_dir = Path(args.tensorboard_log_dir) |
|
|
else: |
|
|
tensorboard_log_dir = Path("tensorboard_runs") / datetime.utcnow().strftime("%Y%m%d-%H%M%S") |
|
|
model, label_encoder, metrics = train_model( |
|
|
sequences, |
|
|
seq_labels, |
|
|
validation_split=args.validation_split, |
|
|
batch_size=args.batch_size, |
|
|
epochs=args.epochs, |
|
|
model_type=args.model_type, |
|
|
tensorboard_log_dir=tensorboard_log_dir, |
|
|
status_file_path=None, |
|
|
) |
|
|
|
|
|
export_artifacts( |
|
|
model=model, |
|
|
scaler=scaler, |
|
|
label_encoder=label_encoder, |
|
|
feature_columns=feature_columns, |
|
|
label_column=resolved_label, |
|
|
sequence_length=args.sequence_length, |
|
|
stride=args.stride, |
|
|
model_path=model_out, |
|
|
scaler_path=scaler_out, |
|
|
metadata_path=metadata_out, |
|
|
metrics=metrics, |
|
|
) |
|
|
|
|
|
print("Training complete") |
|
|
print(f"Model architecture : {args.model_type}") |
|
|
print(f"Model saved to : {model_out}") |
|
|
print(f"Scaler saved to : {scaler_out}") |
|
|
print(f"Metadata saved to : {metadata_out}") |
|
|
print("Validation metrics:") |
|
|
report = classification_report( |
|
|
metrics["validation"]["y_true"], metrics["validation"]["y_pred"], target_names=metrics["validation"]["class_names"] |
|
|
) |
|
|
print(report) |
|
|
if metrics.get("tensorboard_log_dir"): |
|
|
tb_dir = metrics["tensorboard_log_dir"] |
|
|
print(f"TensorBoard logs written to: {tb_dir}") |
|
|
print(f"Launch TensorBoard with: tensorboard --logdir \"{tb_dir}\"") |
|
|
|
|
|
|
|
|
def parse_args(argv: Sequence[str] | None = None) -> argparse.Namespace: |
|
|
parser = argparse.ArgumentParser(description="Train a sequence model for PMU fault classification") |
|
|
parser.add_argument("--data-path", required=True, help="Path to Fault_Classification_PMU_Data CSV") |
|
|
parser.add_argument( |
|
|
"--label-column", |
|
|
default="Fault", |
|
|
help="Name of the target label column (default: Fault)", |
|
|
) |
|
|
parser.add_argument( |
|
|
"--feature-columns", |
|
|
nargs="*", |
|
|
default=None, |
|
|
help="Optional explicit list of feature columns. Defaults to all non-label columns", |
|
|
) |
|
|
parser.add_argument("--sequence-length", type=int, default=32, help="Number of timesteps per training window") |
|
|
parser.add_argument("--stride", type=int, default=4, help="Step size between consecutive windows") |
|
|
parser.add_argument("--validation-split", type=float, default=0.2, help="Validation set fraction") |
|
|
parser.add_argument("--batch-size", type=int, default=128, help="Training batch size") |
|
|
parser.add_argument("--epochs", type=int, default=50, help="Maximum number of training epochs") |
|
|
parser.add_argument( |
|
|
"--model-type", |
|
|
choices=["cnn_lstm", "tcn", "svm"], |
|
|
default="cnn_lstm", |
|
|
help="Model architecture to train (choices: cnn_lstm, tcn, svm)", |
|
|
) |
|
|
parser.add_argument("--model-out", default="pmu_cnn_lstm_model.keras", help="Path to save trained Keras model") |
|
|
parser.add_argument("--scaler-out", default="pmu_feature_scaler.pkl", help="Path to save fitted StandardScaler") |
|
|
parser.add_argument("--metadata-out", default="pmu_metadata.json", help="Path to save metadata JSON") |
|
|
parser.add_argument( |
|
|
"--tensorboard-log-dir", |
|
|
default=None, |
|
|
help="Optional directory to write TensorBoard logs (defaults to tensorboard_runs/<timestamp>)", |
|
|
) |
|
|
parser.add_argument( |
|
|
"--no-tensorboard", |
|
|
dest="tensorboard", |
|
|
action="store_false", |
|
|
help="Disable TensorBoard logging for neural network models", |
|
|
) |
|
|
parser.set_defaults(tensorboard=True) |
|
|
return parser.parse_args(argv) |
|
|
|
|
|
|
|
|
def main(argv: Sequence[str] | None = None) -> None: |
|
|
args = parse_args(argv) |
|
|
run_training(args) |
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
main() |
|
|
|