Spaces:
Sleeping
Sleeping
import numpy as np | |
from tensorflow.keras.models import load_model | |
import joblib | |
class RTUAnomalizer:
    """Combine a prediction model with k-means models to score residuals.

    The prediction model is loaded with ``tensorflow.keras``'s ``load_model``
    and the clustering models with ``joblib.load``.  ``pipeline`` chains the
    individual steps: predict, compute residuals against row 30 of the
    transformed window, inverse-scale, and measure the distance of residual
    slices from each clustering model's first cluster centre.
    """

    # Class-level defaults only.  Every instance receives its own ``model`` and
    # its own ``kmeans_models`` list in ``__init__`` / ``load_models`` so that
    # loaded models are never shared between instances.
    model = None
    kmeans_models = []

    def __init__(
        self,
        prediction_model_path=None,
        clustering_model_paths=None,
        num_inputs=None,
        num_outputs=None,
    ):
        """Store the dimensions and optionally load the models.

        Args:
            prediction_model_path: Path passed to ``load_model``; may be None.
            clustering_model_paths: Iterable of paths passed to
                ``joblib.load``; may be None.
            num_inputs: Stored on the instance; not read by any method of
                this class.
            num_outputs: Number of leading columns of row 30 of ``df_trans``
                that are treated as the actual values to compare with the
                prediction.

        Models are loaded only when BOTH paths arguments are given.
        """
        self.num_inputs = num_inputs
        self.num_outputs = num_outputs
        # Per-instance state: a list on the class would be shared (and grown)
        # by every instance that loads models.
        self.model = None
        self.kmeans_models = []
        if prediction_model_path is not None and clustering_model_paths is not None:
            self.load_models(prediction_model_path, clustering_model_paths)

    def initialize_lists(self, size=30):
        """Return three independent zero-filled lists of length ``size``."""
        return [0] * size, [0] * size, [0] * size

    def load_models(self, prediction_model_path, clustering_model_paths):
        """Load the prediction model and the clustering models.

        Any previously loaded clustering models on this instance are replaced
        rather than appended to, so calling this method again does not
        accumulate stale models.
        """
        self.model = load_model(prediction_model_path)
        self.kmeans_models = [joblib.load(path) for path in clustering_model_paths]

    def predict(self, df_new):
        """Return ``self.model.predict(df_new)``."""
        return self.model.predict(df_new)

    def calculate_residuals(self, df_trans, pred):
        """Return the actual values and their difference from ``pred``.

        The actual values are the first ``num_outputs`` entries of row 30 of
        ``df_trans``; the residual is ``actual - pred`` (NumPy broadcasting
        applies when ``pred`` has more than one row).

        Returns:
            Tuple ``(actual, resid)``.
        """
        actual = df_trans[30, : self.num_outputs]
        return actual, actual - pred

    def resize_prediction(self, pred, df_trans):
        """Widen ``pred`` with the non-output columns of row 30 of ``df_trans``.

        Every row of the result consists of the corresponding row of ``pred``
        followed by ``df_trans[30, num_outputs:]``.  The result keeps the
        dtype of ``pred`` and is a new array; ``pred`` is not modified.

        Args:
            pred: 2-D array of predictions, one row per sample.
            df_trans: 2-D array with at least 31 rows.

        Returns:
            Array of shape ``(pred.shape[0], pred.shape[1] + len(tail))``.
        """
        pred = np.asarray(pred)
        tail = df_trans[30, self.num_outputs :]
        n_rows, n_pred = pred.shape[0], pred.shape[1]
        # Build the widened array explicitly.  ``np.resize`` refills from the
        # flattened data, which shifts every row after the first, and slicing
        # with ``-len(tail):`` selects the whole array when the tail is empty.
        widened = np.empty((n_rows, n_pred + len(tail)), dtype=pred.dtype)
        widened[:, :n_pred] = pred
        widened[:, n_pred:] = tail
        return widened

    def inverse_transform(self, scaler, pred, df_trans):
        """Apply ``scaler.inverse_transform`` to the prediction and to row 30.

        Args:
            scaler: Object exposing ``inverse_transform``.
            pred: Prediction rows (typically the output of
                ``resize_prediction``).
            df_trans: 2-D array; row 30 is wrapped into a single-row array
                before being passed to the scaler.

        Returns:
            Tuple ``(actual, pred)`` of inverse-transformed values.
        """
        pred = scaler.inverse_transform(np.array(pred))
        actual = scaler.inverse_transform(np.array([df_trans[30, :]]))
        return actual, pred

    def update_lists(self, actual_list, pred_list, resid_list, actual, pred, resid):
        """Drop the oldest entry of each list and append the newest value.

        The appended value is element ``[0, 1]`` of ``actual``, ``pred`` and
        ``resid`` respectively.  The three lists are modified in place and
        also returned.
        """
        for history, latest in (
            (actual_list, actual),
            (pred_list, pred),
            (resid_list, resid),
        ):
            history.pop(0)
        # Append after all pops so the statement order matches element-wise
        # access on the inputs: actual, then pred, then resid.
        actual_list.append(actual[0, 1])
        pred_list.append(pred[0, 1])
        resid_list.append(resid[0, 1])
        return actual_list, pred_list, resid_list

    def calculate_distances(self, resid):
        """Return the L2 distance of residual slices from the first centres.

        For the ``i``-th clustering model, columns ``7*i + 1`` through
        ``7*i + 7`` of ``resid`` are compared with
        ``model.cluster_centers_[0]``; the norm is taken along axis 1.

        NOTE(review): the 7-column stride, the skipped column 0, and the use
        of only the first cluster centre are hard-coded here — confirm they
        match how the clustering models were fitted.

        Returns:
            ``np.ndarray`` with one row per clustering model.
        """
        distances = []
        for i, model in enumerate(self.kmeans_models):
            start = i * 7 + 1
            offsets = resid[:, start : start + 7] - model.cluster_centers_[0]
            distances.append(np.linalg.norm(offsets, ord=2, axis=1))
        return np.array(distances)

    def pipeline(self, df_new, df_trans, scaler):
        """Run prediction, residual computation, rescaling and distance scoring.

        NOTE(review): the three history lists are re-created (zero-filled) on
        every call, so only their last element carries data from this call —
        confirm whether history was meant to persist between calls.

        Returns:
            Tuple ``(actual_list, pred_list, resid_list, dist)``.
        """
        actual_list, pred_list, resid_list = self.initialize_lists()
        pred = self.predict(df_new)
        # Residuals are computed on the scaled values, before inverse scaling.
        actual, resid = self.calculate_residuals(df_trans, pred)
        pred = self.resize_prediction(pred, df_trans)
        actual, pred = self.inverse_transform(scaler, pred, df_trans)
        actual_list, pred_list, resid_list = self.update_lists(
            actual_list, pred_list, resid_list, actual, pred, resid
        )
        dist = self.calculate_distances(resid)
        return actual_list, pred_list, resid_list, dist