| """ | |
| Advanced Cybersecurity Model Trainer | |
| Comprehensive training module for security ML models | |
| """ | |
| import numpy as np | |
| import pandas as pd | |
| import torch | |
| import torch.nn as nn | |
| import torch.optim as optim | |
| from torch.utils.data import DataLoader, TensorDataset | |
| from sklearn.model_selection import train_test_split, cross_val_score, StratifiedKFold | |
| from sklearn.ensemble import ( | |
| RandomForestClassifier, | |
| GradientBoostingClassifier, | |
| AdaBoostClassifier, | |
| ExtraTreesClassifier, | |
| VotingClassifier, | |
| StackingClassifier | |
| ) | |
| from sklearn.linear_model import LogisticRegression | |
| from sklearn.svm import SVC | |
| from sklearn.neural_network import MLPClassifier | |
| from sklearn.preprocessing import StandardScaler, LabelEncoder, MinMaxScaler | |
| from sklearn.metrics import ( | |
| classification_report, | |
| confusion_matrix, | |
| roc_auc_score, | |
| precision_recall_curve, | |
| f1_score, | |
| accuracy_score | |
| ) | |
| from sklearn.feature_selection import SelectKBest, f_classif, mutual_info_classif | |
| from sklearn.decomposition import PCA | |
| import joblib | |
| import json | |
| from datetime import datetime | |
| from pathlib import Path | |
| import logging | |
| from typing import Dict, List, Any, Optional, Tuple | |
| import warnings | |
| warnings.filterwarnings('ignore') | |
| logger = logging.getLogger(__name__) | |


class CyberSecurityNeuralNet(nn.Module):
    """Deep Neural Network for Cybersecurity Classification"""

    def __init__(self, input_size: int, hidden_sizes: List[int], num_classes: int, dropout: float = 0.3):
        super().__init__()
        layers = []
        prev_size = input_size
        # Stack Linear -> BatchNorm -> ReLU -> Dropout blocks for each hidden layer
        for hidden_size in hidden_sizes:
            layers.extend([
                nn.Linear(prev_size, hidden_size),
                nn.BatchNorm1d(hidden_size),
                nn.ReLU(),
                nn.Dropout(dropout)
            ])
            prev_size = hidden_size
        # Final layer outputs raw logits; CrossEntropyLoss applies softmax internally
        layers.append(nn.Linear(prev_size, num_classes))
        self.network = nn.Sequential(*layers)

    def forward(self, x):
        return self.network(x)
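

# Illustrative sketch (an addition, not part of the original module): a quick shape check for
# CyberSecurityNeuralNet on random data. The batch size, feature count, and layer sizes below
# are arbitrary example values.
def _demo_network_forward() -> torch.Tensor:
    dummy_batch = torch.randn(8, 40)  # 8 samples with 40 features (hypothetical)
    net = CyberSecurityNeuralNet(input_size=40, hidden_sizes=[64, 32], num_classes=2)
    net.eval()  # eval mode so BatchNorm uses its running statistics
    with torch.no_grad():
        logits = net(dummy_batch)               # raw logits, shape (8, 2)
        probs = torch.softmax(logits, dim=1)    # convert logits to class probabilities
    return probs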


class AdvancedSecurityTrainer:
    """Advanced trainer for cybersecurity models with multiple algorithms"""

    def __init__(self, models_dir: str = "./trained_models"):
        self.models_dir = Path(models_dir)
        self.models_dir.mkdir(parents=True, exist_ok=True)
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.trained_models = {}
        self.training_history = []
        self.feature_selector = None

    def preprocess_security_data(
        self,
        df: pd.DataFrame,
        target_col: str,
        feature_selection: bool = True,
        n_features: int = 50
    ) -> Tuple[np.ndarray, np.ndarray, StandardScaler, LabelEncoder, List[str]]:
        """Preprocess security data with advanced feature engineering"""
        # Separate features and target
        X = df.drop(columns=[target_col])
        y = df[target_col]

        # Store original feature names
        feature_names = list(X.columns)

        # Handle categorical features
        categorical_cols = X.select_dtypes(include=['object', 'category']).columns
        for col in categorical_cols:
            le = LabelEncoder()
            X[col] = le.fit_transform(X[col].astype(str))

        # Handle missing values
        X = X.fillna(X.median())

        # Encode target if categorical
        label_encoder = LabelEncoder()
        if y.dtype == 'object' or y.dtype.name == 'category':
            y = label_encoder.fit_transform(y)
        else:
            y = y.values

        # Scale features
        scaler = StandardScaler()
        X_scaled = scaler.fit_transform(X)

        # Feature selection (keep the fitted selector so it can be persisted for inference)
        self.feature_selector = None
        if feature_selection and X_scaled.shape[1] > n_features:
            selector = SelectKBest(mutual_info_classif, k=min(n_features, X_scaled.shape[1]))
            X_scaled = selector.fit_transform(X_scaled, y)
            selected_indices = selector.get_support(indices=True)
            feature_names = [feature_names[i] for i in selected_indices]
            self.feature_selector = selector

        return X_scaled, y, scaler, label_encoder, feature_names

    def train_ensemble_model(
        self,
        X_train: np.ndarray,
        y_train: np.ndarray,
        X_test: np.ndarray,
        y_test: np.ndarray,
        model_name: str = "ensemble"
    ) -> Tuple[Any, Dict[str, float]]:
        """Train an ensemble of classifiers"""
        logger.info("Training ensemble model...")

        # Base estimators
        estimators = [
            ('rf', RandomForestClassifier(n_estimators=100, random_state=42, n_jobs=-1)),
            ('gb', GradientBoostingClassifier(n_estimators=100, random_state=42)),
            ('et', ExtraTreesClassifier(n_estimators=100, random_state=42, n_jobs=-1)),
        ]

        # Voting classifier (soft voting averages predicted class probabilities)
        voting_clf = VotingClassifier(estimators=estimators, voting='soft')
        voting_clf.fit(X_train, y_train)

        # Evaluate
        y_pred = voting_clf.predict(X_test)
        y_proba = voting_clf.predict_proba(X_test)
        metrics = self._calculate_metrics(y_test, y_pred, y_proba)

        # Save model
        model_path = self.models_dir / f"{model_name}_ensemble.pkl"
        joblib.dump(voting_clf, model_path)

        logger.info(f"Ensemble model trained with accuracy: {metrics['accuracy']:.4f}")
        return voting_clf, metrics

    def train_stacking_model(
        self,
        X_train: np.ndarray,
        y_train: np.ndarray,
        X_test: np.ndarray,
        y_test: np.ndarray,
        model_name: str = "stacking"
    ) -> Tuple[Any, Dict[str, float]]:
        """Train a stacking classifier"""
        logger.info("Training stacking model...")

        # Base estimators
        estimators = [
            ('rf', RandomForestClassifier(n_estimators=50, random_state=42)),
            ('gb', GradientBoostingClassifier(n_estimators=50, random_state=42)),
            ('svm', SVC(probability=True, random_state=42)),
        ]

        # Stacking classifier with logistic regression meta-learner
        stacking_clf = StackingClassifier(
            estimators=estimators,
            final_estimator=LogisticRegression(random_state=42),
            cv=3
        )
        stacking_clf.fit(X_train, y_train)

        # Evaluate
        y_pred = stacking_clf.predict(X_test)
        y_proba = stacking_clf.predict_proba(X_test)
        metrics = self._calculate_metrics(y_test, y_pred, y_proba)

        # Save model
        model_path = self.models_dir / f"{model_name}_stacking.pkl"
        joblib.dump(stacking_clf, model_path)

        logger.info(f"Stacking model trained with accuracy: {metrics['accuracy']:.4f}")
        return stacking_clf, metrics

    def train_neural_network(
        self,
        X_train: np.ndarray,
        y_train: np.ndarray,
        X_test: np.ndarray,
        y_test: np.ndarray,
        hidden_sizes: List[int] = [256, 128, 64],
        epochs: int = 100,
        batch_size: int = 32,
        learning_rate: float = 0.001,
        model_name: str = "neural_net"
    ) -> Tuple[nn.Module, Dict[str, float]]:
        """Train a deep neural network"""
        logger.info(f"Training neural network on {self.device}...")

        # Convert to tensors
        X_train_tensor = torch.FloatTensor(X_train).to(self.device)
        y_train_tensor = torch.LongTensor(y_train).to(self.device)
        X_test_tensor = torch.FloatTensor(X_test).to(self.device)
        y_test_tensor = torch.LongTensor(y_test).to(self.device)

        # Create data loader
        train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

        # Initialize model
        num_classes = len(np.unique(y_train))
        model = CyberSecurityNeuralNet(
            input_size=X_train.shape[1],
            hidden_sizes=hidden_sizes,
            num_classes=num_classes
        ).to(self.device)

        # Loss and optimizer
        criterion = nn.CrossEntropyLoss()
        optimizer = optim.Adam(model.parameters(), lr=learning_rate)
        scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, patience=10)

        # Training loop
        best_accuracy = 0
        best_model_path = self.models_dir / f"{model_name}_nn_best.pt"
        for epoch in range(epochs):
            model.train()
            total_loss = 0
            for batch_X, batch_y in train_loader:
                optimizer.zero_grad()
                outputs = model(batch_X)
                loss = criterion(outputs, batch_y)
                loss.backward()
                optimizer.step()
                total_loss += loss.item()

            # Validation
            model.eval()
            with torch.no_grad():
                test_outputs = model(X_test_tensor)
                test_loss = criterion(test_outputs, y_test_tensor)
                _, predicted = torch.max(test_outputs, 1)
                accuracy = (predicted == y_test_tensor).float().mean().item()

            scheduler.step(test_loss)

            # Checkpoint the best-performing weights
            if accuracy > best_accuracy:
                best_accuracy = accuracy
                torch.save(model.state_dict(), best_model_path)

            if (epoch + 1) % 20 == 0:
                logger.info(
                    f"Epoch [{epoch+1}/{epochs}], "
                    f"Loss: {total_loss/len(train_loader):.4f}, Accuracy: {accuracy:.4f}"
                )

        # Final evaluation with the best checkpoint rather than the last epoch's weights
        if best_model_path.exists():
            model.load_state_dict(torch.load(best_model_path, map_location=self.device))
        model.eval()
        with torch.no_grad():
            outputs = model(X_test_tensor)
            _, y_pred = torch.max(outputs, 1)
            y_pred = y_pred.cpu().numpy()
            y_proba = torch.softmax(outputs, dim=1).cpu().numpy()

        metrics = self._calculate_metrics(y_test, y_pred, y_proba)
        logger.info(f"Neural network trained with accuracy: {metrics['accuracy']:.4f}")
        return model, metrics

    def train_all_models(
        self,
        df: pd.DataFrame,
        target_col: str,
        model_name: str,
        test_size: float = 0.2
    ) -> Dict[str, Any]:
        """Train all available model types and return the best performing one"""
        logger.info(f"Starting comprehensive training for {model_name}...")

        # Preprocess data
        X, y, scaler, label_encoder, feature_names = self.preprocess_security_data(df, target_col)

        # Split data
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=test_size, random_state=42, stratify=y
        )

        results = {}

        # Train individual models
        models_to_train = [
            ("random_forest", RandomForestClassifier(n_estimators=100, random_state=42, n_jobs=-1)),
            ("gradient_boosting", GradientBoostingClassifier(n_estimators=100, random_state=42)),
            ("extra_trees", ExtraTreesClassifier(n_estimators=100, random_state=42, n_jobs=-1)),
            ("logistic_regression", LogisticRegression(random_state=42, max_iter=1000)),
            ("mlp", MLPClassifier(hidden_layer_sizes=(128, 64), random_state=42, max_iter=500)),
        ]
        for name, model in models_to_train:
            try:
                logger.info(f"Training {name}...")
                model.fit(X_train, y_train)
                y_pred = model.predict(X_test)
                y_proba = model.predict_proba(X_test) if hasattr(model, 'predict_proba') else None
                metrics = self._calculate_metrics(y_test, y_pred, y_proba)
                results[name] = {
                    "model": model,
                    "metrics": metrics
                }
                # Save model
                model_path = self.models_dir / f"{model_name}_{name}.pkl"
                joblib.dump(model, model_path)
            except Exception as e:
                logger.error(f"Failed to train {name}: {e}")
                results[name] = {"error": str(e)}

        # Train ensemble
        try:
            ensemble_model, ensemble_metrics = self.train_ensemble_model(
                X_train, y_train, X_test, y_test, model_name
            )
            results["ensemble"] = {
                "model": ensemble_model,
                "metrics": ensemble_metrics
            }
        except Exception as e:
            logger.error(f"Failed to train ensemble: {e}")

        # Train stacking
        try:
            stacking_model, stacking_metrics = self.train_stacking_model(
                X_train, y_train, X_test, y_test, model_name
            )
            results["stacking"] = {
                "model": stacking_model,
                "metrics": stacking_metrics
            }
        except Exception as e:
            logger.error(f"Failed to train stacking: {e}")

        # Find best model by test accuracy
        best_model_name = None
        best_accuracy = 0
        for name, result in results.items():
            if "metrics" in result and result["metrics"]["accuracy"] > best_accuracy:
                best_accuracy = result["metrics"]["accuracy"]
                best_model_name = name

        # Save preprocessing artifacts so the pipeline can be reproduced at inference time
        joblib.dump(scaler, self.models_dir / f"{model_name}_scaler.pkl")
        joblib.dump(label_encoder, self.models_dir / f"{model_name}_label_encoder.pkl")
        if self.feature_selector is not None:
            joblib.dump(self.feature_selector, self.models_dir / f"{model_name}_feature_selector.pkl")

        # Save metadata
        metadata = {
            "model_name": model_name,
            "target_column": target_col,
            "feature_names": feature_names,
            "num_features": len(feature_names),
            "num_samples": len(df),
            "num_classes": len(np.unique(y)),
            "best_model": best_model_name,
            "best_accuracy": best_accuracy,
            "all_results": {
                name: result.get("metrics", {"error": result.get("error")})
                for name, result in results.items()
            },
            "created_at": datetime.now().isoformat()
        }
        with open(self.models_dir / f"{model_name}_metadata.json", 'w') as f:
            json.dump(metadata, f, indent=2)

        logger.info(f"Training complete. Best model: {best_model_name} with accuracy: {best_accuracy:.4f}")
        return {
            "results": results,
            "metadata": metadata,
            "scaler": scaler,
            "label_encoder": label_encoder,
            "feature_names": feature_names
        }

    def _calculate_metrics(
        self,
        y_true: np.ndarray,
        y_pred: np.ndarray,
        y_proba: Optional[np.ndarray] = None
    ) -> Dict[str, float]:
        """Calculate comprehensive metrics"""
        metrics = {
            "accuracy": float(accuracy_score(y_true, y_pred)),
            "f1_weighted": float(f1_score(y_true, y_pred, average='weighted')),
            "f1_macro": float(f1_score(y_true, y_pred, average='macro')),
        }
        # ROC AUC for binary or multi-class problems
        if y_proba is not None:
            try:
                if len(np.unique(y_true)) == 2:
                    metrics["roc_auc"] = float(roc_auc_score(y_true, y_proba[:, 1]))
                else:
                    metrics["roc_auc"] = float(roc_auc_score(y_true, y_proba, multi_class='ovr'))
            except Exception as e:
                logger.warning(f"Could not compute ROC AUC: {e}")
        return metrics
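

# Illustrative inference sketch (an addition, not part of the original module): load the
# artifacts that train_all_models persists and score new samples with the best saved model.
# It assumes df_new already contains the original training feature columns, numerically
# encoded and without missing values (the per-column LabelEncoders are not persisted here).
# The function name load_and_predict is hypothetical.
def load_and_predict(models_dir: str, model_name: str, df_new: pd.DataFrame) -> np.ndarray:
    model_dir = Path(models_dir)
    with open(model_dir / f"{model_name}_metadata.json") as f:
        metadata = json.load(f)
    scaler = joblib.load(model_dir / f"{model_name}_scaler.pkl")
    best_model = joblib.load(model_dir / f"{model_name}_{metadata['best_model']}.pkl")
    # Apply the same preprocessing order as training: scale first, then feature selection
    X = scaler.transform(df_new)
    selector_path = model_dir / f"{model_name}_feature_selector.pkl"
    if selector_path.exists():
        X = joblib.load(selector_path).transform(X)
    return best_model.predict(X)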


# Convenience function for Gradio interface
def train_comprehensive_model(
    file_path: str,
    target_column: str,
    model_name: str,
    test_size: float = 0.2
) -> Dict[str, Any]:
    """Train comprehensive models from a file path"""
    # Load dataset
    if file_path.endswith('.csv'):
        df = pd.read_csv(file_path)
    elif file_path.endswith('.json'):
        df = pd.read_json(file_path)
    elif file_path.endswith('.parquet'):
        df = pd.read_parquet(file_path)
    else:
        raise ValueError(f"Unsupported file format: {file_path}")

    # Initialize trainer
    trainer = AdvancedSecurityTrainer()

    # Train all models
    results = trainer.train_all_models(df, target_column, model_name, test_size)
    return results
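

if __name__ == "__main__":
    # Minimal CLI sketch (an addition for illustration, not part of the original module):
    # run the full pipeline on a dataset supplied on the command line, e.g.
    #   python <this_file>.py flows.csv label threat_clf
    # where the dataset path, target column, and model name are user-provided examples.
    import sys

    logging.basicConfig(level=logging.INFO)
    if len(sys.argv) != 4:
        print("Usage: python <this_file>.py <dataset_path> <target_column> <model_name>")
        sys.exit(1)
    output = train_comprehensive_model(sys.argv[1], sys.argv[2], sys.argv[3])
    meta = output["metadata"]
    print(f"Best model: {meta['best_model']} (accuracy {meta['best_accuracy']:.4f})")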