| | """ |
| | Gradio app for NSL-KDD binary intrusion detection demo (MVP) |
| | Expecting these files in the same repo/root of the Space: |
| | - nsl_kdd_tf_model.h5 (optional; if present will be used) |
| | - scaler.pkl (optional; sklearn StandardScaler, must match model training) |
| | - columns.json (optional; list of feature column names used by the model) |
| | |
| | If artifacts are missing, the app will instruct you how to add them and offers a quick fallback |
| | where you can upload a CSV and the app will train a lightweight sklearn model for demo purposes. |
| | """ |
| |
|
| | import os |
| | import json |
| | import tempfile |
| | import traceback |
| | from typing import Tuple, List |
| |
|
| | import numpy as np |
| | import pandas as pd |
| |
|
| | import gradio as gr |
| |
|
| | |
| | TF_AVAILABLE = True |
| | try: |
| | import tensorflow as tf |
| | except Exception: |
| | TF_AVAILABLE = False |
| |
|
| | from sklearn.preprocessing import StandardScaler |
| | from sklearn.linear_model import LogisticRegression |
| | import joblib |
| |
|
| | |
| | MODEL_FILE = "nsl_kdd_tf_model.h5" |
| | SCALER_FILE = "scaler.pkl" |
| | COLUMNS_FILE = "columns.json" |
| |
|
| | |
| | def load_artifacts(): |
| | model = None |
| | scaler = None |
| | columns = None |
| | model_type = None |
| |
|
| | |
| | if os.path.exists(COLUMNS_FILE): |
| | with open(COLUMNS_FILE, "r", encoding="utf-8") as f: |
| | columns = json.load(f) |
| |
|
| | |
| | if os.path.exists(SCALER_FILE): |
| | try: |
| | scaler = joblib.load(SCALER_FILE) |
| | except Exception: |
| | try: |
| | scaler = joblib.load(open(SCALER_FILE, "rb")) |
| | except Exception: |
| | scaler = None |
| |
|
| | |
| | if os.path.exists(MODEL_FILE) and TF_AVAILABLE: |
| | try: |
| | model = tf.keras.models.load_model(MODEL_FILE) |
| | model_type = "tensorflow" |
| | except Exception: |
| | model = None |
| |
|
| | return model, scaler, columns, model_type |
| |
|
| | MODEL, SCALER, COLUMNS, MODEL_TYPE = load_artifacts() |
| |
|
| | def model_available_message() -> str: |
| | if MODEL is not None and SCALER is not None and COLUMNS is not None: |
| | return "✅ Pretrained TensorFlow model and artifacts loaded. Ready to predict." |
| | pieces = [] |
| | if MODEL is None: |
| | pieces.append(f"Missing `{MODEL_FILE}`") |
| | if SCALER is None: |
| | pieces.append(f"Missing `{SCALER_FILE}`") |
| | if COLUMNS is None: |
| | pieces.append(f"Missing `{COLUMNS_FILE}`") |
| | msg = "⚠️ Artifacts missing: " + ", ".join(pieces) + ".\n\n" |
| | msg += "To run the TF model, add those files to the Space repository (same folder as app.py).\n" |
| | msg += "Alternatively, upload a CSV of NSL-KDD records (the app will train a quick sklearn model for demo).\n\n" |
| | msg += "columns.json should be a JSON array of feature names that match the model input (same as X_train.columns).\n" |
| | return msg |
| |
|
| | |
| | def prepare_X_from_df(df: pd.DataFrame, expected_columns: List[str], scaler_obj) -> np.ndarray: |
| | |
| | X = df.reindex(columns=expected_columns, fill_value=0) |
| | |
| | X = X.apply(pd.to_numeric, errors="coerce").fillna(0.0) |
| | if scaler_obj is not None: |
| | Xs = scaler_obj.transform(X) |
| | else: |
| | |
| | Xs = X.values.astype(np.float32) |
| | return Xs |
| |
|
| | def predict_batch_from_df(df: pd.DataFrame) -> Tuple[pd.DataFrame, str]: |
| | """ |
| | returns (result_df, status_message) |
| | result_df contains prob and predicted class per row |
| | """ |
| | try: |
| | if MODEL is not None and SCALER is not None and COLUMNS is not None and MODEL_TYPE == "tensorflow": |
| | Xs = prepare_X_from_df(df, COLUMNS, SCALER) |
| | probs = MODEL.predict(Xs).ravel() |
| | preds = (probs >= 0.5).astype(int) |
| | out = df.copy() |
| | out["_pred_prob"] = probs |
| | out["_pred_class"] = preds |
| | return out, "Predictions from TensorFlow model" |
| | else: |
| | |
| | if 'label' in df.columns or 'label_bin' in df.columns: |
| | |
| | |
| | cats = ['protocol_type', 'service', 'flag'] |
| | col_names = df.columns.tolist() |
| | |
| | num_cols = [c for c in col_names if c not in cats + ['label','label_bin']] |
| | X_num = df[num_cols].apply(pd.to_numeric, errors='coerce').fillna(0.0) |
| | X_cat = pd.get_dummies(df[cats], drop_first=True) |
| | X = pd.concat([X_num, X_cat], axis=1) |
| | y = df['label_bin'] if 'label_bin' in df.columns else df['label'].apply(lambda s: 0 if str(s).strip().lower()=="normal" else 1) |
| | |
| | scaler_local = StandardScaler() |
| | Xs = scaler_local.fit_transform(X) |
| | clf = LogisticRegression(max_iter=200) |
| | clf.fit(Xs, y) |
| | probs = clf.predict_proba(Xs)[:,1] |
| | preds = (probs >= 0.5).astype(int) |
| | out = df.copy() |
| | out["_pred_prob"] = probs |
| | out["_pred_class"] = preds |
| | return out, "Trained temporary LogisticRegression on uploaded CSV (used 'label' or 'label_bin' for training)." |
| | else: |
| | return pd.DataFrame(), "Cannot fallback: artifacts missing and uploaded CSV does not contain 'label' or 'label_bin' to train a temporary model." |
| | except Exception as e: |
| | tb = traceback.format_exc() |
| | return pd.DataFrame(), f"Prediction error: {e}\n\n{tb}" |
| |
|
| | def predict_single(sample_text: str) -> str: |
| | """ |
| | sample_text: CSV row or JSON dict representing one row with same columns as columns.json |
| | returns a readable string with probability and class |
| | """ |
| | try: |
| | if not sample_text: |
| | return "No input provided." |
| | |
| | try: |
| | d = json.loads(sample_text) |
| | if isinstance(d, dict): |
| | df = pd.DataFrame([d]) |
| | else: |
| | return "JSON must represent an object/dict for single sample." |
| | except Exception: |
| | |
| | try: |
| | df = pd.read_csv(pd.compat.StringIO(sample_text), header=None) |
| | |
| | if COLUMNS is not None and df.shape[1] == len(COLUMNS): |
| | df.columns = COLUMNS |
| | else: |
| | return "CSV input detected but header/column count mismatch. Prefer JSON object keyed by column names." |
| | except Exception: |
| | return "Could not parse input. Paste a JSON object like {\"duration\":0, \"protocol_type\":\"tcp\", ...} or upload a CSV row with header." |
| |
|
| | |
| | if MODEL is not None and SCALER is not None and COLUMNS is not None and MODEL_TYPE == "tensorflow": |
| | Xs = prepare_X_from_df(df, COLUMNS, SCALER) |
| | prob = float(MODEL.predict(Xs)[0,0]) |
| | pred = int(prob >= 0.5) |
| | return f"Pred prob: {prob:.4f} — predicted class: {pred} (0=normal, 1=attack)" |
| | else: |
| | return "Model artifacts not present in Space. Upload `nsl_kdd_tf_model.h5`, `scaler.pkl`, and `columns.json` to use the TensorFlow model. Alternatively upload a labelled CSV to train a quick demo model." |
| | except Exception as e: |
| | tb = traceback.format_exc() |
| | return f"Error: {e}\n\n{tb}" |
| |
|
| | |
| | with gr.Blocks(title="NSL-KDD Intrusion Detection — Demo MVP") as demo: |
| | gr.Markdown("# NSL-KDD Intrusion Detection — Demo (MVP)\n" |
| | "Upload your artifacts (`nsl_kdd_tf_model.h5`, `scaler.pkl`, `columns.json`) to the Space to use the TensorFlow model.\n" |
| | "Or upload a labelled CSV (contains `label` or `label_bin`) and the app will train a quick logistic regression for demo.\n\n" |
| | "Columns expected: the original notebook used 41 numeric features with one-hot for `protocol_type`, `service`, `flag`.\n" |
| | ) |
| | status = gr.Textbox(label="Status / Artifact check", value=model_available_message(), interactive=False) |
| | with gr.Row(): |
| | with gr.Column(scale=2): |
| | file_input = gr.File(label="Upload CSV for batch prediction or for training fallback", file_types=['.csv']) |
| | sample_input = gr.Textbox(label="Single-sample input (JSON object)", placeholder='{"duration":0, "protocol_type":"tcp", ...}', lines=6) |
| | predict_button = gr.Button("Predict single sample") |
| | batch_button = gr.Button("Run batch (on uploaded CSV)") |
| |
|
| | with gr.Column(scale=1): |
| | out_table = gr.Dataframe(label="Batch predictions (if any)") |
| |
|
| | single_out = gr.Textbox(label="Single sample result", interactive=False) |
| |
|
| | |
| | example_text = json.dumps({ |
| | "duration": 0, |
| | "protocol_type": "tcp", |
| | "service": "http", |
| | "flag": "SF", |
| | "src_bytes": 181, |
| | "dst_bytes": 5450 |
| | }, indent=2) |
| | gr.Markdown("**Example single-sample JSON (fill in more NSL-KDD fields if you have them):**") |
| | gr.Code(example_text, language="json") |
| |
|
| | |
| | def on_predict_single(sample_text): |
| | return predict_single(sample_text) |
| |
|
| | def on_batch_predict(file_obj): |
| | if file_obj is None: |
| | return pd.DataFrame(), "No file uploaded." |
| | try: |
| | |
| | df = pd.read_csv(file_obj.name) |
| | except Exception: |
| | try: |
| | |
| | df = pd.read_csv(file_obj) |
| | except Exception as e: |
| | return pd.DataFrame(), f"Could not read CSV: {e}" |
| |
|
| | out_df, msg = predict_batch_from_df(df) |
| | if out_df.empty: |
| | return pd.DataFrame(), msg |
| | |
| | display_df = out_df.copy() |
| | |
| | for c in ["_pred_prob", "_pred_class"]: |
| | if c in display_df.columns: |
| | cols = [c] + [x for x in display_df.columns if x != c] |
| | display_df = display_df[cols] |
| | return display_df, msg |
| |
|
| | predict_button.click(on_predict_single, inputs=[sample_input], outputs=[single_out]) |
| | batch_button.click(on_batch_predict, inputs=[file_input], outputs=[out_table, status]) |
| |
|
| | if __name__ == "__main__": |
| | demo.launch(server_name="0.0.0.0", server_port=int(os.environ.get("PORT", 7860))) |
| |
|