import math

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import gradio as gr

from datasets import load_dataset
from sklearn.datasets import fetch_openml
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report

LOGS_DATASET_URI = 'pgurazada1/machine-failure-mlops-demo-logs'
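# Note: the logs dataset referenced above is expected to expose the same
# feature columns as the training data plus a 'prediction' column written by
# the deployed model (inferred from how the columns are used below).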

# Reference training data: the AI4I 2020 predictive maintenance dataset
# pulled from OpenML (data_id=42890)
dataset = fetch_openml(data_id=42890, as_frame=True, parser="auto")
data_df = dataset.data

target = 'Machine failure'
numeric_features = [
    'Air temperature [K]',
    'Process temperature [K]',
    'Rotational speed [rpm]',
    'Torque [Nm]',
    'Tool wear [min]'
]

categorical_features = ['Type']

X = data_df[numeric_features + categorical_features]
y = data_df[target]

Xtrain, Xtest, ytrain, ytest = train_test_split(
    X, y,
    test_size=0.2,
    random_state=42
)


def get_data():
    """
    Connect to the HuggingFace dataset where the logs are stored and
    pull a random sample of the logged records into a dataframe.
    """
    data = load_dataset(LOGS_DATASET_URI)
    sample_df = data['train'].to_pandas().sample(100)

    return sample_df


def check_model_drift():
    """
    Compare the proportion of predicted machine failures in the logs with
    the proportion of failures in the training data. If the deviation is
    more than 2 standard deviations, flag model drift.
    """
    sample_df = get_data()
    p_pos_label_training_data = 0.03475  # positive-class rate in the training split
    training_data_size = 8000

    # Counts of the predicted labels in the sampled logs; a label may be absent
    prediction_counts = sample_df.prediction.value_counts()
    n_0 = prediction_counts.get(0, 0)
    n_1 = prediction_counts.get(1, 0)

    p_pos_label_sample_logs = n_1 / (n_0 + n_1)

    variance = (p_pos_label_training_data * (1 - p_pos_label_training_data)) / training_data_size
    p_diff = abs(p_pos_label_training_data - p_pos_label_sample_logs)

    if p_diff > 2 * math.sqrt(variance):
        return "Model Drift Detected! Check Logs!"
    else:
        return "No Model Drift!"


def plot_target_distributions():
    """
    Plot the distribution of targets in the training data alongside the
    distribution of predictions produced by the deployed model.
    """
    sample_df = get_data()

    figure, axes = plt.subplots(2, 1, figsize=(9, 7))

    sns.countplot(x=ytrain, stat='proportion', ax=axes[0])
    axes[0].set_title("Distribution of targets in training data")
    axes[0].set_xlabel('')

    sns.countplot(x=sample_df.prediction, stat='proportion', ax=axes[1])
    axes[1].set_title("Distribution of predicted targets from the deployed model")
    axes[1].set_xlabel('')

    figure.tight_layout()
    plt.close()

    return figure


def psi(actual_proportions, expected_proportions):
    """
    Population Stability Index between the actual (live) and expected
    (training) proportions, passed as aligned arrays.
    """
    psi_values = (actual_proportions - expected_proportions) * \
        np.log(actual_proportions / expected_proportions)

    return sum(psi_values)
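
# Example with made-up proportions: for live proportions [0.5, 0.3, 0.2] and
# training proportions [0.6, 0.3, 0.1],
# psi = (0.5 - 0.6) * ln(0.5 / 0.6) + 0 + (0.2 - 0.1) * ln(0.2 / 0.1),
# which is roughly 0.088.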


def check_data_drift():
    """
    Compare training data features against the live features in the logs.
    For numeric features, flag data drift if the live mean deviates from the
    training mean by more than 2 training standard deviations. Categorical
    features are checked with the population stability index (PSI).
    """
    sample_df = get_data()
    data_drift_status = {}

    # Numeric features: compare the live sample mean with the training mean
    for feature in numeric_features:
        mean_feature_training_data = Xtrain[feature].mean()
        std_feature_training_data = Xtrain[feature].std()

        mean_feature_sample_logs = sample_df[feature].mean()

        mean_diff = abs(mean_feature_training_data - mean_feature_sample_logs)

        if mean_diff > 2 * std_feature_training_data:
            data_drift_status[feature] = ["Data Drift Detected! Check Logs!"]
        else:
            data_drift_status[feature] = ["No Data Drift!"]

    # Categorical feature: compare category proportions with PSI.
    # sort_index() aligns the categories so the proportions are compared
    # pairwise (assumes every category appears in the sampled logs).
    live_proportions = sample_df['Type'].value_counts(normalize=True).sort_index().values
    training_proportions = Xtrain['Type'].value_counts(normalize=True).sort_index().values

    psi_value = psi(live_proportions, training_proportions)

    if psi_value > 0.1:  # 0.1 is a commonly used PSI warning threshold
        data_drift_status['Type'] = ["Data Drift Detected! Check Logs!"]
    else:
        data_drift_status['Type'] = ["No Data Drift!"]

    return pd.DataFrame.from_dict(data_drift_status)
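
# Illustration of the numeric check with hypothetical numbers: if the training
# mean of 'Torque [Nm]' were 40 with a standard deviation of 10, a live sample
# mean outside roughly [20, 60] would flag data drift for that feature.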


with gr.Blocks(theme=gr.themes.Base()) as demo:
    gr.Markdown("# Real-time Monitoring Dashboard")

    gr.Markdown("## Model drift detection (every 5 seconds)")

    with gr.Row():
        with gr.Column():
            gr.Textbox(check_model_drift, every=5, label="Model Drift Status")

    gr.Markdown("## Target distributions: training data vs. deployed model (refreshed daily)")

    with gr.Row():
        with gr.Column():
            gr.Plot(plot_target_distributions, every=86400, label="Target Data Distributions")

    gr.Markdown("## Data drift detection (every 5 seconds)")

    with gr.Row():
        with gr.Column():
            gr.DataFrame(check_data_drift, every=5, min_width=240, label="Data Drift Status")


demo.queue().launch()
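
# When this script is run (for example as app.py in a Hugging Face Space, or
# locally with `python app.py`; the filename is an assumption), the drift
# checks above poll the log dataset every 5 seconds and the distribution plot
# refreshes once a day (every=86400 seconds).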