import math

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import gradio as gr

from datasets import load_dataset
from sklearn.datasets import fetch_openml
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report

LOGS_DATASET_URI = 'pgurazada1/machine-failure-mlops-demo-logs'

# Load and cache training data
dataset = fetch_openml(data_id=42890, as_frame=True, parser="auto")
data_df = dataset.data

target = 'Machine failure'
numeric_features = [
    'Air temperature [K]',
    'Process temperature [K]',
    'Rotational speed [rpm]',
    'Torque [Nm]',
    'Tool wear [min]'
]
categorical_features = ['Type']

X = data_df[numeric_features + categorical_features]
y = data_df[target]

Xtrain, Xtest, ytrain, ytest = train_test_split(
    X, y,
    test_size=0.2,
    random_state=42
)


def get_data():
    """
    Connect to the HuggingFace dataset where the logs are stored
    and pull the data into a dataframe.
    """
    data = load_dataset(LOGS_DATASET_URI)
    sample_df = data['train'].to_pandas().sample(100)

    return sample_df


def check_model_drift():
    """
    Compare the proportion of predicted machine failures in the logs with
    its proportion in the training data. If the deviation exceeds
    2 standard deviations, flag model drift.
    """
    sample_df = get_data()

    p_pos_label_training_data = 0.03475
    training_data_size = 8000

    n_0 = sample_df.prediction.value_counts()[0]

    try:
        n_1 = sample_df.prediction.value_counts()[1]
    except Exception as e:
        # No positive predictions in this sample of the logs
        n_1 = 0

    p_pos_label_sample_logs = n_1 / (n_0 + n_1)

    variance = (p_pos_label_training_data * (1 - p_pos_label_training_data)) / training_data_size

    p_diff = abs(p_pos_label_training_data - p_pos_label_sample_logs)

    if p_diff > 2 * math.sqrt(variance):
        return "Model Drift Detected! Check Logs!"
    else:
        return "No Model Drift!"


def plot_target_distributions():
    """
    Plot the target distribution in the training data alongside the
    distribution of predictions from the deployed model.
    """
    sample_df = get_data()

    figure, axes = plt.subplots(2, 1, figsize=(9, 7))

    sns.countplot(x=ytrain, stat='proportion', ax=axes[0])
    axes[0].set_title("Distribution of targets in training data")
    axes[0].set_xlabel('')

    sns.countplot(x=sample_df.prediction, stat='proportion', ax=axes[1])
    axes[1].set_title("Distribution of predicted targets from the deployed model")
    axes[1].set_xlabel('')

    plt.close()

    return figure


def psi(actual_proportions, expected_proportions):
    """Population Stability Index between two vectors of category proportions."""
    psi_values = (actual_proportions - expected_proportions) * \
                 np.log(actual_proportions / expected_proportions)

    return sum(psi_values)


def check_data_drift():
    """
    Compare training data features with live features. Numeric features and
    categorical features are dealt with separately: a numeric feature is
    flagged if its live mean deviates from the training mean by more than
    2 training standard deviations; the categorical feature is flagged if
    its PSI exceeds 0.1.
    """
    sample_df = get_data()

    data_drift_status = {}

    numeric_features = [
        'Air temperature [K]',
        'Process temperature [K]',
        'Rotational speed [rpm]',
        'Torque [Nm]',
        'Tool wear [min]'
    ]
    categorical_features = ['Type']

    # Numeric features: 2-standard-deviation check on the mean
    for feature in numeric_features:
        mean_feature_training_data = Xtrain[feature].mean()
        std_feature_training_data = Xtrain[feature].std()
        mean_feature_sample_logs = sample_df[feature].mean()

        mean_diff = abs(mean_feature_training_data - mean_feature_sample_logs)

        if mean_diff > 2 * std_feature_training_data:
            data_drift_status[feature] = ["Data Drift Detected! Check Logs!"]
        else:
            data_drift_status[feature] = ["No Data Drift!"]

    # Categorical feature 'Type': PSI check against the training proportions
    live_proportions = sample_df['Type'].value_counts(normalize=True).values
    training_proportions = Xtrain['Type'].value_counts(normalize=True).values

    psi_value = psi(live_proportions, training_proportions)

    if psi_value > 0.1:
        data_drift_status['Type'] = ["Data Drift Detected! Check Logs!"]
    else:
        data_drift_status['Type'] = ["No Data Drift!"]

    return pd.DataFrame.from_dict(data_drift_status)


with gr.Blocks(theme=gr.themes.Base()) as demo:
    gr.Markdown("# Real-time Monitoring Dashboard")

    gr.Markdown("## Model drift detection (every 5 seconds)")
    with gr.Row():
        with gr.Column():
            gr.Textbox(check_model_drift, every=5, label="Model Drift Status")

    gr.Markdown("## Distribution of Training Targets")
    with gr.Row():
        with gr.Column():
            gr.Plot(plot_target_distributions, every=86400, label="Target Data Distributions")

    gr.Markdown("## Data drift detection (every 5 seconds)")
    with gr.Row():
        with gr.Column():
            gr.DataFrame(check_data_drift, every=5, min_width=240, label="Data Drift Status")

demo.queue().launch()
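
# ---------------------------------------------------------------------------
# Illustrative sanity check for psi() (hypothetical proportions, not taken
# from the dataset or the logs). With two categories whose training
# ("expected") proportions are [0.6, 0.4] and live ("actual") proportions
# are [0.5, 0.5]:
#
#   psi(np.array([0.5, 0.5]), np.array([0.6, 0.4]))
#     = (0.5 - 0.6) * ln(0.5 / 0.6) + (0.5 - 0.4) * ln(0.5 / 0.4)
#     ~ 0.018 + 0.022
#     ~ 0.041
#
# which is below the 0.1 threshold used in check_data_drift(), so no drift
# would be flagged for this hypothetical sample.
# ---------------------------------------------------------------------------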