import pandas as pd |
|
import numpy as np |
|
import joblib |
|
import json |
|
import re |
|
from datetime import datetime |
|
from sklearn.metrics.pairwise import cosine_similarity |
|
import warnings |
|
from typing import Union, List, Dict, Any |
|
import time |
|
|
|
warnings.filterwarnings('ignore') |
|
|
|
class EnhancedAnomalyIntelligence: |
|
""" |
|
Enhanced Production-ready Anomaly Intelligence System v2.0 |
|
Features: Equipment Intelligence + Safety Override Rules + Conservative Prediction |
|
""" |
|
|
|
def __init__(self): |
|
self.models = {} |
|
self.model_metadata = None |
|
self.safety_rules = None |
|
self.embeddings = None |
|
self.embedding_metadata = None |
|
self.sentence_model = None |
|
self._models_loaded = False |
|
|
|
|
|
self.equipment_type_scores = {} |
|
self.section_risk_multipliers = {} |
|
|
|
def _load_models(self): |
|
"""Load all enhanced models and metadata (called once)""" |
|
if self._models_loaded: |
|
return |
|
|
|
print("Loading enhanced models and metadata...") |
|
|
|
try: |
|
|
|
self.model_metadata = joblib.load('enhanced_model_metadata_v2.joblib') |
|
target_columns = self.model_metadata['target_columns'] |
|
|
|
|
|
for target in target_columns: |
|
model_filename = f"enhanced_model_{target.replace(' ', '_').replace('é', 'e')}_v2.joblib" |
|
self.models[target] = joblib.load(model_filename) |
|
print(f"✓ Loaded {target} model") |
|
|
|
|
|
try: |
|
with open('safety_override_rules_v2.json', 'r') as f: |
|
self.safety_rules = json.load(f) |
|
print("✓ Loaded safety override rules") |
|
except FileNotFoundError: |
|
print("⚠️ Warning: safety_override_rules_v2.json not found - safety rules disabled") |
|
self.safety_rules = {} |
|
|
|
|
|
try: |
|
self.embeddings = np.load('anomaly_embeddings.npy') |
|
self.embedding_metadata = joblib.load('embedding_metadata.joblib') |
|
print("✓ Loaded similarity search embeddings") |
|
except FileNotFoundError: |
|
print("⚠️ Warning: Embedding files not found - similarity search disabled") |
|
self.embeddings = None |
|
self.embedding_metadata = None |
|
|
|
|
|
try: |
|
from sentence_transformers import SentenceTransformer |
|
try: |
|
self.sentence_model = SentenceTransformer('dangvantuan/sentence-camembert-large') |
|
print("✓ Loaded French CamemBERT model") |
|
                except Exception:
                    try:
                        self.sentence_model = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2')
                        print("✓ Loaded multilingual model")
                    except Exception:
                        self.sentence_model = SentenceTransformer('distiluse-base-multilingual-cased')
                        print("✓ Loaded basic multilingual model")
|
except Exception as e: |
|
print(f"⚠️ Warning: Could not load sentence transformer: {e}") |
|
self.sentence_model = None |
|
|
|
|
|
if 'training_config' in self.model_metadata: |
|
training_config = self.model_metadata['training_config'] |
|
print("✓ Loaded training configuration") |
|
|
|
self._models_loaded = True |
|
print("✓ All enhanced models loaded successfully") |
|
|
|
except Exception as e: |
|
            raise RuntimeError(f"Failed to load enhanced models: {e}") from e
|
|
|
def predict_single(self, anomaly_data: Dict, |
|
confidence_threshold: float = 0.7, |
|
include_similar: bool = True, |
|
format_type: str = 'rich', |
|
apply_safety_rules: bool = True) -> Dict: |
|
""" |
|
Enhanced single anomaly prediction with equipment intelligence and safety rules |
|
|
|
Args: |
|
anomaly_data: Dictionary with anomaly information |
|
confidence_threshold: Threshold for flagging manual review |
|
include_similar: Whether to include similar anomalies |
|
format_type: 'rich' for UI, 'simple' for database |
|
apply_safety_rules: Whether to apply safety override rules |
|
""" |
|
self._load_models() |
|
|
|
try: |
|
|
|
enhanced_features = self._extract_enhanced_features_single(anomaly_data) |
|
|
|
|
|
predictions, confidences, probabilities = self._predict_criticality(enhanced_features) |
|
|
|
|
|
if apply_safety_rules and self.safety_rules: |
|
predictions = self._apply_safety_override_rules(enhanced_features, predictions) |
|
|
|
|
|
total_criticality = sum(predictions.values()) |
|
overall_confidence = np.mean(list(confidences.values())) |
|
|
|
|
|
needs_review = self._determine_manual_review_need( |
|
enhanced_features, predictions, overall_confidence, confidence_threshold |
|
) |
|
|
|
|
|
equipment_risk_assessment = self._assess_equipment_risk(enhanced_features, predictions) |
|
|
|
|
|
similar_anomalies = [] |
|
if include_similar and self.sentence_model is not None: |
|
similar_anomalies = self._find_similar_anomalies( |
|
anomaly_data.get('Description', ''), top_k=3 |
|
) |
|
|
|
|
|
if format_type == 'simple': |
|
return self._format_simple_response( |
|
anomaly_data, predictions, total_criticality, |
|
overall_confidence, needs_review, equipment_risk_assessment |
|
) |
|
else: |
|
return self._format_rich_response( |
|
anomaly_data, predictions, confidences, |
|
total_criticality, overall_confidence, |
|
similar_anomalies, needs_review, confidence_threshold, |
|
equipment_risk_assessment, enhanced_features |
|
) |
|
|
|
except Exception as e: |
|
return { |
|
'error': f'Enhanced prediction failed: {str(e)}', |
|
'timestamp': datetime.now().isoformat(), |
|
'input_description': anomaly_data.get('Description', 'N/A') |
|
} |
|
|
|
def predict_batch(self, anomaly_list: List[Dict], |
|
confidence_threshold: float = 0.7, |
|
include_similar: bool = False, |
|
format_type: str = 'simple', |
|
apply_safety_rules: bool = True) -> List[Dict]: |
|
""" |
|
Enhanced batch prediction with equipment intelligence |
|
|
|
Args: |
|
anomaly_list: List of anomaly dictionaries |
|
confidence_threshold: Threshold for flagging manual review |
|
include_similar: Whether to include similar anomalies (slower for batch) |
|
format_type: 'rich' for UI, 'simple' for database |
|
apply_safety_rules: Whether to apply safety override rules |
|
""" |
|
self._load_models() |
|
|
|
print(f"Processing enhanced batch of {len(anomaly_list)} anomalies...") |
|
start_time = time.time() |
|
|
|
results = [] |
|
|
|
try: |
|
|
|
all_features = [] |
|
for anomaly_data in anomaly_list: |
|
enhanced_features = self._extract_enhanced_features_single(anomaly_data) |
|
all_features.append(enhanced_features) |
|
|
|
|
|
batch_df = pd.DataFrame(all_features) |
|
|
|
|
|
batch_predictions = {} |
|
batch_confidences = {} |
|
|
|
target_columns = self.model_metadata['target_columns'] |
|
for target in target_columns: |
|
model = self.models[target] |
|
preds = model.predict(batch_df) |
|
probas = model.predict_proba(batch_df) |
|
confs = np.max(probas, axis=1) |
|
|
|
batch_predictions[target] = preds |
|
batch_confidences[target] = confs |
|
|
|
|
|
for i, anomaly_data in enumerate(anomaly_list): |
|
|
|
predictions = {target: int(batch_predictions[target][i]) |
|
for target in target_columns} |
|
confidences = {target: float(batch_confidences[target][i]) |
|
for target in target_columns} |
|
|
|
enhanced_features = all_features[i] |
|
|
|
|
|
if apply_safety_rules and self.safety_rules: |
|
predictions = self._apply_safety_override_rules(enhanced_features, predictions) |
|
|
|
total_criticality = sum(predictions.values()) |
|
overall_confidence = np.mean(list(confidences.values())) |
|
|
|
|
|
needs_review = self._determine_manual_review_need( |
|
enhanced_features, predictions, overall_confidence, confidence_threshold |
|
) |
|
|
|
equipment_risk_assessment = self._assess_equipment_risk(enhanced_features, predictions) |
|
|
|
|
|
similar_anomalies = [] |
|
if include_similar and self.sentence_model is not None: |
|
similar_anomalies = self._find_similar_anomalies( |
|
anomaly_data.get('Description', ''), top_k=2 |
|
) |
|
|
|
|
|
if format_type == 'simple': |
|
result = self._format_simple_response( |
|
anomaly_data, predictions, total_criticality, |
|
overall_confidence, needs_review, equipment_risk_assessment |
|
) |
|
else: |
|
result = self._format_rich_response( |
|
anomaly_data, predictions, confidences, |
|
total_criticality, overall_confidence, |
|
similar_anomalies, needs_review, confidence_threshold, |
|
equipment_risk_assessment, enhanced_features |
|
) |
|
|
|
results.append(result) |
|
|
|
processing_time = time.time() - start_time |
|
print(f"✓ Enhanced batch processing completed in {processing_time:.2f}s") |
|
print(f" Average time per anomaly: {processing_time/len(anomaly_list):.3f}s") |
|
|
|
flagged_count = sum(1 for r in results if r.get('needs_manual_review', False)) |
|
safety_overrides = sum(1 for r in results if r.get('safety_override_applied', False)) |
|
|
|
print(f" Flagged for manual review: {flagged_count}/{len(anomaly_list)} ({flagged_count/len(anomaly_list)*100:.1f}%)") |
|
print(f" Safety overrides applied: {safety_overrides}/{len(anomaly_list)} ({safety_overrides/len(anomaly_list)*100:.1f}%)") |
|
|
|
return results |
|
|
|
except Exception as e: |
|
|
|
error_result = { |
|
'error': f'Enhanced batch prediction failed: {str(e)}', |
|
'timestamp': datetime.now().isoformat() |
|
} |
|
return [error_result] * len(anomaly_list) |
|
|
|
def _extract_enhanced_features_single(self, anomaly_data: Dict) -> Dict: |
|
"""Extract enhanced features including equipment intelligence""" |
|
|
|
|
|
temp_df = pd.DataFrame([anomaly_data]) |
|
|
|
|
|
enhanced_features = self._extract_enhanced_features(temp_df) |
|
|
|
|
|
feature_columns = self.model_metadata.get('all_feature_columns', []) |
|
|
|
input_data = {} |
|
|
|
|
|
input_data['Description'] = anomaly_data.get('Description', '') |
|
|
|
|
|
numerical_features = self.model_metadata.get('numerical_features', []) |
|
for feat in numerical_features: |
|
if feat in enhanced_features.columns: |
|
value = enhanced_features[feat].iloc[0] |
|
|
|
                input_data[feat] = 0.0 if pd.isna(value) else float(value)
            else:
                input_data[feat] = 0.0
|
|
|
|
|
categorical_features = self.model_metadata.get('categorical_features', []) |
|
for feat in categorical_features: |
|
input_data[feat] = anomaly_data.get(feat, 'Unknown') |
|
|
|
return input_data |
|
|
|
def _extract_enhanced_features(self, df): |
|
"""Extract enhanced features (matching training pipeline logic)""" |
|
|
|
|
features_df = df.copy() |
|
|
|
|
|
        # Guard: ensure the text/section columns exist so the .get()/.str calls below always receive Series
        for required_col in ['Description', 'Description de l\'équipement', 'Section propriétaire']:
            if required_col not in features_df.columns:
                features_df[required_col] = ''
        features_df['combined_text'] = features_df['Description'].fillna('') + ' ' + features_df['Description de l\'équipement'].fillna('')
|
features_df['combined_text_lower'] = features_df['combined_text'].str.lower() |
|
|
|
|
|
features_df['description_length'] = features_df['Description'].str.len() |
|
features_df['description_word_count'] = features_df['Description'].str.split().str.len() |
|
features_df['equipment_desc_length'] = features_df.get('Description de l\'équipement', '').str.len() |
|
features_df['equipment_desc_word_count'] = features_df.get('Description de l\'équipement', '').str.split().str.len() |
|
features_df['combined_length'] = features_df['combined_text'].str.len() |
|
features_df['combined_word_count'] = features_df['combined_text'].str.split().str.len() |
|
|
|
|
|
def classify_equipment_type(equipment_desc): |
|
"""Classify equipment based on training analysis""" |
|
equipment_upper = str(equipment_desc).upper() |
|
|
|
|
|
if any(keyword in equipment_upper for keyword in ['ALTERNATEUR', 'TRANSFO PRINCIPAL', 'PROTECTION ALTERNATEUR']): |
|
return 'ELECTRICAL_CRITICAL', 8.0 |
|
elif any(keyword in equipment_upper for keyword in ['VENTILATEUR DE REFROIDISSEMENT', 'REFROIDISSEMENT TP', 'MOTEUR VENTILATEUR DE REFROIDISSEMENT']): |
|
return 'COOLING_CRITICAL', 7.5 |
|
elif any(keyword in equipment_upper for keyword in ['TURBINE', 'SOUPAPE REGULATRICE', 'REFRIGERANT HUILE', 'POMPE DE SOULÈVEMENT']): |
|
return 'TURBINE_SYSTEMS', 7.5 |
|
elif any(keyword in equipment_upper for keyword in ['DISJONCTEUR', 'TRANSFORMATEUR', 'MOTEUR', 'ARMOIRE', 'GROUPE']): |
|
return 'ELECTRICAL_STANDARD', 6.5 |
|
elif any(keyword in equipment_upper for keyword in ['RECHAUFFEUR', 'RÉCHAUFFEUR', 'CHAUDIERE', 'CHAUDIÈRE']): |
|
return 'HEATING_SYSTEMS', 6.5 |
|
elif any(keyword in equipment_upper for keyword in ['VENTILATEUR', 'TIRAGE', 'SOUFFLAGE', 'AIR PRIMAIRE', 'AIR SECONDAIRE']): |
|
return 'VENTILATION_SYSTEMS', 6.0 |
|
elif any(keyword in equipment_upper for keyword in ['POMPE', 'SOUPAPE', 'VANNE', 'CONVOYEUR', 'BROYEUR', 'COAL FEEDER']): |
|
return 'PROCESS_SYSTEMS', 5.5 |
|
elif any(keyword in equipment_upper for keyword in ['DECRASSEUR', 'DÉGRILLEUR', 'FILTRE', 'CAPTEUR', 'TRANSMETTEUR']): |
|
return 'AUXILIARY_SYSTEMS', 5.0 |
|
else: |
|
return 'UNKNOWN', 4.5 |
|
|
|
def detect_equipment_redundancy(equipment_desc): |
|
"""Detect equipment redundancy based on naming patterns""" |
|
equipment_upper = str(equipment_desc).upper() |
|
|
|
if any(pattern in equipment_upper for pattern in ['PRINCIPAL', 'UNIQUE']): |
|
return 'SINGLE_CRITICAL', 1.3 |
|
elif any(re.search(pattern, equipment_upper) for pattern in [r'\b[AB]$', r'N°[12]$', r'PRIMAIRE$', r'SECONDAIRE$']): |
|
return 'DUAL_SYSTEM', 1.0 |
|
elif any(re.search(pattern, equipment_upper) for pattern in [r'N°[3-9]$', r'N°[0-9][0-9]$']): |
|
return 'MULTIPLE_SYSTEM', 0.8 |
|
else: |
|
return 'UNKNOWN_REDUNDANCY', 1.0 |
|
|
|
|
|
if 'Description de l\'équipement' in features_df.columns: |
|
equipment_classifications = features_df['Description de l\'équipement'].apply(classify_equipment_type) |
|
features_df['equipment_type_class'] = [x[0] for x in equipment_classifications] |
|
features_df['equipment_base_criticality'] = [x[1] for x in equipment_classifications] |
|
|
|
redundancy_classifications = features_df['Description de l\'équipement'].apply(detect_equipment_redundancy) |
|
features_df['equipment_redundancy_class'] = [x[0] for x in redundancy_classifications] |
|
features_df['equipment_redundancy_multiplier'] = [x[1] for x in redundancy_classifications] |
|
else: |
|
features_df['equipment_type_class'] = 'UNKNOWN' |
|
features_df['equipment_base_criticality'] = 4.5 |
|
features_df['equipment_redundancy_class'] = 'UNKNOWN_REDUNDANCY' |
|
features_df['equipment_redundancy_multiplier'] = 1.0 |
|
|
|
|
|
section_risk_multipliers = {'34EL': 1.2, '34MM': 1.1, '34MD': 1.1, '34MC': 1.0, '34CT': 1.0} |
|
features_df['section_risk_multiplier'] = features_df.get('Section propriétaire', '').map(section_risk_multipliers).fillna(1.0) |
|
|
|
|
|
features_df['equipment_risk_score'] = (features_df['equipment_base_criticality'] * |
|
features_df['equipment_redundancy_multiplier'] * |
|
features_df['section_risk_multiplier']) |
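        # Illustrative example of the composite score just computed: 'TRANSFO PRINCIPAL' in section
        # 34EL classifies as ELECTRICAL_CRITICAL (base 8.0) and SINGLE_CRITICAL (x1.3), so with the
        # 34EL multiplier (x1.2) its equipment_risk_score works out to roughly 12.5.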
|
|
|
|
|
def extract_keywords_dual_field(description, equipment_desc, keyword_dict): |
|
"""Extract keywords from both description and equipment description""" |
|
combined_text = (str(description) + ' ' + str(equipment_desc)).lower() |
|
found_keywords = [] |
|
|
|
for category, keywords in keyword_dict.items(): |
|
for keyword in keywords: |
|
if keyword in combined_text: |
|
found_keywords.append(category) |
|
break |
|
|
|
return found_keywords |
|
|
|
|
|
equipment_keywords = { |
|
'pompe': ['pompe', 'pompes'], |
|
'vanne': ['vanne', 'vannes'], |
|
'ventilateur': ['ventilateur', 'ventilateurs', 'ventilo'], |
|
'moteur': ['moteur', 'moteurs', 'moto'], |
|
'alternateur': ['alternateur', 'alternateurs'], |
|
'transformateur': ['transformateur', 'transformateurs', 'transfo'], |
|
'turbine': ['turbine', 'turbines'], |
|
'principal': ['principal', 'principale'], |
|
'groupe': ['groupe', 'groupes'] |
|
} |
|
|
|
problem_keywords = { |
|
'fuite': ['fuite', 'fuites', 'fuit', 'fuyant'], |
|
'vibration': ['vibration', 'vibrations', 'vibre'], |
|
'bruit_anormal': ['bruit anormal', 'bruit anormale'], |
|
'percement': ['percement', 'percé', 'percée'], |
|
'éclatement': ['éclatement', 'eclatement'], |
|
'fissure': ['fissure', 'fissuré', 'fissures'], |
|
'aggravation': ['aggravation'], |
|
'sifflement': ['sifflement', 'siffler'], |
|
'défaillance': ['défaillance', 'défaillant'], |
|
'dysfonctionnement': ['dysfonctionnement', 'dysfonctionnel'], |
|
'sens_inverse': ['sens inverse', 'sens contraire'], |
|
'surchauffe': ['surchauffe', 'surchauffé', 'température élevée', 'temp elevee'] |
|
} |
|
|
|
action_keywords = { |
|
'maintenance': ['maintenance', 'entretien'], |
|
'prévision': ['prévoir', 'prévoire', 'prevoir'], |
|
'remplacement': ['remplacement', 'remplacer', 'remplacé'] |
|
} |
|
|
|
urgency_keywords = { |
|
'safety': ['safety', 'sécurité'], |
|
'urgent': ['urgent', 'urgence'], |
|
'critique': ['critique', 'critiques'], |
|
'important': ['important', 'importante'] |
|
} |
|
|
|
|
|
description_col = features_df['Description'] |
|
equipment_col = features_df.get('Description de l\'équipement', '') |
|
|
|
features_df['equipment_mentioned'] = features_df.apply( |
|
lambda row: extract_keywords_dual_field(row['Description'], row.get('Description de l\'équipement', ''), equipment_keywords), |
|
axis=1 |
|
) |
|
features_df['equipment_count'] = features_df['equipment_mentioned'].str.len() |
|
|
|
features_df['problem_types'] = features_df.apply( |
|
lambda row: extract_keywords_dual_field(row['Description'], row.get('Description de l\'équipement', ''), problem_keywords), |
|
axis=1 |
|
) |
|
features_df['problem_count'] = features_df['problem_types'].str.len() |
|
|
|
features_df['actions_mentioned'] = features_df.apply( |
|
lambda row: extract_keywords_dual_field(row['Description'], row.get('Description de l\'équipement', ''), action_keywords), |
|
axis=1 |
|
) |
|
features_df['action_count'] = features_df['actions_mentioned'].str.len() |
|
|
|
features_df['urgency_indicators'] = features_df.apply( |
|
lambda row: extract_keywords_dual_field(row['Description'], row.get('Description de l\'équipement', ''), urgency_keywords), |
|
axis=1 |
|
) |
|
features_df['has_urgency'] = (features_df['urgency_indicators'].str.len() > 0).astype(int) |
|
|
|
|
|
features_df['has_structural_failure'] = features_df['combined_text_lower'].str.contains( |
|
'percement|éclatement|eclatement|fissure|rupture', regex=True, na=False |
|
).astype(int) |
|
|
|
features_df['has_equipment_malfunction'] = features_df['combined_text_lower'].str.contains( |
|
'sens inverse|dysfonctionnement|défaillance|défaut|panne', regex=True, na=False |
|
).astype(int) |
|
|
|
features_df['has_escalation'] = features_df['combined_text_lower'].str.contains( |
|
'aggravation|empiré|empire', regex=True, na=False |
|
).astype(int) |
|
|
|
features_df['has_safety_mention'] = features_df['Description'].str.contains('SAFETY', case=False, na=False).astype(int) |
|
|
|
|
|
features_df['electrical_cooling_issue'] = ( |
|
(features_df['equipment_type_class'].isin(['ELECTRICAL_CRITICAL', 'ELECTRICAL_STANDARD'])) & |
|
(features_df['combined_text_lower'].str.contains('refroidissement|ventilateur|température', regex=True, na=False)) |
|
).astype(int) |
|
|
|
features_df['turbine_oil_issue'] = ( |
|
(features_df['equipment_type_class'] == 'TURBINE_SYSTEMS') & |
|
(features_df['combined_text_lower'].str.contains('huile|fuite|graissage', regex=True, na=False)) |
|
).astype(int) |
|
|
|
features_df['main_equipment_failure'] = ( |
|
(features_df['equipment_redundancy_class'] == 'SINGLE_CRITICAL') & |
|
(features_df['has_structural_failure'] == 1) |
|
).astype(int) |
|
|
|
|
|
features_df['fuite_vapeur'] = features_df['combined_text_lower'].str.contains('fuite.*vapeur|vapeur.*fuite', regex=True, na=False).astype(int) |
|
features_df['fuite_huile'] = features_df['combined_text_lower'].str.contains('fuite.*huile|huile.*fuite', regex=True, na=False).astype(int) |
|
features_df['fuite_eau'] = features_df['combined_text_lower'].str.contains('fuite.*eau|eau.*fuite', regex=True, na=False).astype(int) |
|
features_df['bruit_anormal'] = features_df['combined_text_lower'].str.contains('bruit anormal', regex=True, na=False).astype(int) |
|
features_df['vibration_excessive'] = features_df['combined_text_lower'].str.contains('vibration.*excessive|vibration.*élevée', regex=True, na=False).astype(int) |
|
features_df['temperature_elevee'] = features_df['combined_text_lower'].str.contains('température élevée|temp élevée|temp elevee', regex=True, na=False).astype(int) |
|
features_df['maintenance_planning'] = features_df['combined_text_lower'].str.contains('prévoir|prévoire|planifier', regex=True, na=False).astype(int) |
|
features_df['is_recurring'] = features_df['combined_text_lower'].str.contains('fréquent|répétitif|souvent', regex=True, na=False).astype(int) |
|
|
|
|
|
features_df['has_measurements'] = features_df['combined_text_lower'].str.contains(r'\d+\s*°c|\d+\s*bar|\d+\s*%', regex=True, na=False).astype(int) |
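        # Note: combined_text_lower is lowercased, so the [A-Z0-9]{5,} pattern below can in practice
        # only match runs of 5+ digits; it is left unchanged on the assumption that the training
        # pipeline computed this feature the same way.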
|
features_df['has_equipment_codes'] = features_df['combined_text_lower'].str.contains(r'[A-Z0-9]{5,}', regex=True, na=False).astype(int) |
|
features_df['has_location_details'] = features_df['combined_text_lower'].str.contains('niveau|angle|côté|palier', regex=True, na=False).astype(int) |
|
|
|
|
|
severity_words = { |
|
'critique': 4, 'grave': 4, 'majeur': 4, 'important': 3, |
|
'total': 5, 'complet': 5, 'rupture': 5, 'éclatement': 5, |
|
'percement': 5, 'fissure': 4, 'aggravation': 4, 'urgent': 3 |
|
} |
|
|
|
def calculate_enhanced_severity_score(text): |
|
text = str(text).lower() |
|
max_score = 0 |
|
for word, weight in severity_words.items(): |
|
if word in text: |
|
max_score = max(max_score, weight) |
|
return max_score |
|
|
|
features_df['enhanced_severity_score'] = features_df['combined_text_lower'].apply(calculate_enhanced_severity_score) |
|
|
|
|
|
def calculate_equipment_problem_risk(equipment_type, problem_types, has_structural): |
|
base_risk = 1.0 |
|
|
|
if equipment_type in ['ELECTRICAL_CRITICAL', 'TURBINE_SYSTEMS', 'COOLING_CRITICAL']: |
|
base_risk = 1.5 |
|
elif equipment_type in ['ELECTRICAL_STANDARD', 'HEATING_SYSTEMS']: |
|
base_risk = 1.2 |
|
|
|
if has_structural: |
|
base_risk *= 2.0 |
|
|
|
if 'vibration' in problem_types: |
|
base_risk *= 1.3 |
|
if 'fuite' in problem_types: |
|
base_risk *= 1.2 |
|
|
|
return min(base_risk, 3.0) |
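        # Example: a TURBINE_SYSTEMS item with a structural failure starts at 1.5 and doubles to 3.0,
        # which already hits the cap, so any additional vibration/fuite multipliers have no further effect.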
|
|
|
features_df['equipment_problem_risk'] = features_df.apply( |
|
lambda row: calculate_equipment_problem_risk( |
|
row['equipment_type_class'], |
|
row['problem_types'], |
|
row['has_structural_failure'] |
|
), axis=1 |
|
) |
|
|
|
|
|
features_df['technical_complexity'] = ( |
|
features_df['combined_word_count'] / 15 + |
|
features_df['equipment_count'] + |
|
features_df['problem_count'] + |
|
features_df['has_measurements'] + |
|
features_df['has_equipment_codes'] + |
|
features_df['has_location_details'] |
|
) |
|
|
|
|
|
numeric_columns = features_df.select_dtypes(include=[np.number]).columns |
|
features_df[numeric_columns] = features_df[numeric_columns].fillna(0) |
|
|
|
for col in features_df.select_dtypes(include=[np.integer, np.floating, bool]).columns: |
|
features_df[col] = pd.to_numeric(features_df[col], errors='coerce').fillna(0) |
|
|
|
return features_df |
|
|
|
def _predict_criticality(self, input_data: Dict) -> tuple: |
|
"""Make criticality predictions using enhanced models""" |
|
|
|
|
|
input_df = pd.DataFrame([input_data]) |
|
|
|
target_columns = self.model_metadata['target_columns'] |
|
predictions = {} |
|
confidences = {} |
|
probabilities = {} |
|
|
|
for target in target_columns: |
|
model = self.models[target] |
|
pred = model.predict(input_df)[0] |
|
pred_proba = model.predict_proba(input_df)[0] |
|
confidence = np.max(pred_proba) |
|
|
|
predictions[target] = int(pred) |
|
confidences[target] = float(confidence) |
|
probabilities[target] = [float(x) for x in pred_proba] |
|
|
|
return predictions, confidences, probabilities |
|
|
|
def _apply_safety_override_rules(self, enhanced_features: Dict, predictions: Dict) -> Dict: |
|
"""Apply safety override rules to predictions""" |
|
|
|
|
if not self.safety_rules: |
|
return predictions |
|
|
|
modified_predictions = predictions.copy() |
|
safety_override_applied = False |
|
|
|
|
|
if enhanced_features.get('has_structural_failure', 0) == 1: |
|
|
|
total_current = sum(modified_predictions.values()) |
|
if total_current < 9: |
|
|
|
if modified_predictions['Process Safety'] < 5: |
|
modified_predictions['Process Safety'] = 5 |
|
safety_override_applied = True |
|
|
|
|
|
total_after_safety = sum(modified_predictions.values()) |
|
if total_after_safety < 9: |
|
needed_boost = 9 - total_after_safety |
|
new_fiabilite = min(5, modified_predictions['Fiabilité Intégrité'] + needed_boost) |
|
modified_predictions['Fiabilité Intégrité'] = new_fiabilite |
|
safety_override_applied = True |
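        # Rough worked example of the structural-failure rule (illustrative numbers): predictions of
        # Fiabilité=2, Disponibilté=2, Process Safety=2 (total 6) with a percement/fissure pattern first
        # get Process Safety raised to 5 (total 9); only if the total were still below 9 would
        # Fiabilité Intégrité be boosted next, capped at 5.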
|
|
|
|
|
if enhanced_features.get('equipment_type_class', '') == 'COOLING_CRITICAL': |
|
|
|
total_current = sum(modified_predictions.values()) |
|
if total_current < 10: |
|
|
|
needed_boost = 10 - total_current |
|
for component in modified_predictions: |
|
if modified_predictions[component] < 5: |
|
boost = min(2, needed_boost // 3 + 1) |
|
modified_predictions[component] = min(5, modified_predictions[component] + boost) |
|
needed_boost -= boost |
|
safety_override_applied = True |
|
if needed_boost <= 0: |
|
break |
|
|
|
|
|
if enhanced_features.get('has_safety_mention', 0) == 1: |
|
|
|
if modified_predictions['Process Safety'] < 5: |
|
boost = min(2, 5 - modified_predictions['Process Safety']) |
|
modified_predictions['Process Safety'] += boost |
|
safety_override_applied = True |
|
|
|
|
|
if enhanced_features.get('turbine_oil_issue', 0) == 1: |
|
|
|
total_current = sum(modified_predictions.values()) |
|
if total_current < 8: |
|
|
|
needed_boost = 8 - total_current |
|
for component in ['Fiabilité Intégrité', 'Disponibilté']: |
|
if needed_boost > 0 and modified_predictions[component] < 4: |
|
boost = min(2, needed_boost) |
|
modified_predictions[component] = min(5, modified_predictions[component] + boost) |
|
needed_boost -= boost |
|
safety_override_applied = True |
|
|
|
|
|
if enhanced_features.get('equipment_type_class', '') == 'ELECTRICAL_CRITICAL': |
|
|
|
for component in modified_predictions: |
|
if modified_predictions[component] >= 3: |
|
boost = min(1, 5 - modified_predictions[component]) |
|
if boost > 0: |
|
modified_predictions[component] += boost |
|
safety_override_applied = True |
|
|
|
return modified_predictions |
|
|
|
def _determine_manual_review_need(self, enhanced_features: Dict, predictions: Dict, |
|
overall_confidence: float, confidence_threshold: float) -> bool: |
|
"""Enhanced logic to determine if manual review is needed""" |
|
|
|
|
|
if overall_confidence < confidence_threshold: |
|
return True |
|
|
|
|
|
if enhanced_features.get('equipment_type_class', '') in ['ELECTRICAL_CRITICAL', 'COOLING_CRITICAL', 'TURBINE_SYSTEMS']: |
|
if sum(predictions.values()) >= 8: |
|
return True |
|
|
|
|
|
if enhanced_features.get('has_structural_failure', 0) == 1: |
|
return True |
|
|
|
|
|
if enhanced_features.get('has_safety_mention', 0) == 1: |
|
return True |
|
|
|
|
|
if sum(predictions.values()) >= 10: |
|
return True |
|
|
|
|
|
if (enhanced_features.get('has_equipment_malfunction', 0) == 1 and |
|
enhanced_features.get('equipment_type_class', '') in ['ELECTRICAL_CRITICAL', 'TURBINE_SYSTEMS']): |
|
return True |
|
|
|
return False |
|
|
|
def _assess_equipment_risk(self, enhanced_features: Dict, predictions: Dict) -> Dict: |
|
"""Assess equipment-specific risk factors""" |
|
|
|
equipment_type = enhanced_features.get('equipment_type_class', 'UNKNOWN') |
|
total_criticality = sum(predictions.values()) |
|
|
|
risk_assessment = { |
|
'equipment_type': equipment_type, |
|
'redundancy_class': enhanced_features.get('equipment_redundancy_class', 'UNKNOWN'), |
|
'base_risk_score': enhanced_features.get('equipment_risk_score', 4.5), |
|
'risk_level': 'LOW', |
|
'risk_factors': [], |
|
'business_impact': 'MINOR' |
|
} |
|
|
|
|
|
if equipment_type == 'COOLING_CRITICAL': |
|
risk_assessment['risk_level'] = 'CRITICAL' |
|
risk_assessment['business_impact'] = 'SEVERE' |
|
risk_assessment['risk_factors'].append('Critical cooling system failure') |
|
elif equipment_type == 'ELECTRICAL_CRITICAL': |
|
if total_criticality >= 8: |
|
risk_assessment['risk_level'] = 'HIGH' |
|
risk_assessment['business_impact'] = 'MAJOR' |
|
else: |
|
risk_assessment['risk_level'] = 'MEDIUM' |
|
risk_assessment['business_impact'] = 'MODERATE' |
|
risk_assessment['risk_factors'].append('Electrical critical infrastructure') |
|
elif equipment_type == 'TURBINE_SYSTEMS': |
|
if total_criticality >= 8: |
|
risk_assessment['risk_level'] = 'HIGH' |
|
risk_assessment['business_impact'] = 'MAJOR' |
|
else: |
|
risk_assessment['risk_level'] = 'MEDIUM' |
|
risk_assessment['business_impact'] = 'MODERATE' |
|
risk_assessment['risk_factors'].append('Turbine system component') |
|
|
|
|
|
if enhanced_features.get('has_structural_failure', 0) == 1: |
|
risk_assessment['risk_factors'].append('Structural integrity compromise') |
|
risk_assessment['risk_level'] = 'HIGH' |
|
|
|
if enhanced_features.get('has_safety_mention', 0) == 1: |
|
risk_assessment['risk_factors'].append('Safety concern flagged') |
|
|
|
if enhanced_features.get('equipment_redundancy_class', '') == 'SINGLE_CRITICAL': |
|
risk_assessment['risk_factors'].append('Single point of failure') |
|
|
|
if enhanced_features.get('turbine_oil_issue', 0) == 1: |
|
risk_assessment['risk_factors'].append('Turbine lubrication system issue') |
|
|
|
if enhanced_features.get('electrical_cooling_issue', 0) == 1: |
|
risk_assessment['risk_factors'].append('Electrical equipment cooling problem') |
|
|
|
|
|
if total_criticality >= 12: |
|
risk_assessment['business_impact'] = 'SEVERE' |
|
elif total_criticality >= 10: |
|
risk_assessment['business_impact'] = 'MAJOR' |
|
elif total_criticality >= 8: |
|
risk_assessment['business_impact'] = 'MODERATE' |
|
|
|
return risk_assessment |
|
|
|
def _find_similar_anomalies(self, description: str, top_k: int = 3) -> List[Dict]: |
|
"""Find similar historical anomalies""" |
|
|
|
if not description or self.sentence_model is None or self.embeddings is None: |
|
return [] |
|
|
|
try: |
|
|
|
new_embedding = self.sentence_model.encode([description]) |
|
|
|
|
|
similarities = cosine_similarity(new_embedding, self.embeddings)[0] |
|
|
|
|
|
top_indices = np.argsort(similarities)[::-1] |
|
|
|
similar_anomalies = [] |
|
for idx in top_indices[:top_k*2]: |
|
similarity_score = float(similarities[idx]) |
|
|
|
|
|
if similarity_score > 0.99 or similarity_score < 0.15: |
|
continue |
|
|
|
if len(similar_anomalies) >= top_k: |
|
break |
|
|
|
                sections = self.embedding_metadata.get('sections')
                equipment_lists = self.embedding_metadata.get('equipment_mentioned')
                similar_anomalies.append({
                    'description': self.embedding_metadata['descriptions'][idx],
                    'criticality': int(self.embedding_metadata['criticality_scores'][idx]),
                    'similarity_score': round(similarity_score, 3),
                    'section': sections[idx] if sections is not None else 'Unknown',
                    'equipment_mentioned': equipment_lists[idx] if equipment_lists is not None else []
                })
|
|
|
return similar_anomalies |
|
|
|
except Exception as e: |
|
print(f"Warning: Similarity search failed: {e}") |
|
return [] |
|
|
|
def _format_simple_response(self, anomaly_data: Dict, predictions: Dict, |
|
total_criticality: int, overall_confidence: float, |
|
needs_review: bool, equipment_risk_assessment: Dict) -> Dict: |
|
"""Format simple response for database insertion""" |
|
|
|
return { |
|
'timestamp': datetime.now().isoformat(), |
|
'input_description': anomaly_data.get('Description', ''), |
|
'input_section': anomaly_data.get('Section propriétaire', ''), |
|
'input_equipment': anomaly_data.get('Description de l\'équipement', ''), |
|
|
|
|
|
'predicted_criticite': total_criticality, |
|
'predicted_fiabilite': predictions['Fiabilité Intégrité'], |
|
'predicted_disponibilite': predictions['Disponibilté'], |
|
'predicted_safety': predictions['Process Safety'], |
|
|
|
|
|
'ai_confidence': round(overall_confidence, 3), |
|
'needs_manual_review': bool(needs_review), |
|
|
|
|
|
'equipment_type': equipment_risk_assessment['equipment_type'], |
|
'equipment_risk_level': equipment_risk_assessment['risk_level'], |
|
'business_impact': equipment_risk_assessment['business_impact'], |
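            # Heuristic proxy for the override flag: true whenever any component exceeds 3; it is not
            # an exact record of whether _apply_safety_override_rules actually modified the prediction.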
|
'safety_override_applied': any(pred > 3 for pred in predictions.values()), |
|
|
|
|
|
'model_version': '2.0_enhanced', |
|
'processing_timestamp': datetime.now().isoformat() |
|
} |
|
|
|
def _format_rich_response(self, anomaly_data: Dict, predictions: Dict, |
|
confidences: Dict, total_criticality: int, |
|
overall_confidence: float, similar_anomalies: List, |
|
needs_review: bool, confidence_threshold: float, |
|
equipment_risk_assessment: Dict, enhanced_features: Dict) -> Dict: |
|
"""Format rich response for UI display""" |
|
|
|
|
|
reliability_score = self._calculate_reliability_score( |
|
confidences, enhanced_features, equipment_risk_assessment |
|
) |
|
|
|
return { |
|
'timestamp': datetime.now().isoformat(), |
|
'input_description': anomaly_data.get('Description', ''), |
|
'input_section': anomaly_data.get('Section propriétaire', ''), |
|
'input_equipment': anomaly_data.get('Description de l\'équipement', ''), |
|
|
|
'predictions': { |
|
'criticite_totale': total_criticality, |
|
'components': { |
|
'fiabilite_integrite': predictions['Fiabilité Intégrité'], |
|
'disponibilite': predictions['Disponibilté'], |
|
'process_safety': predictions['Process Safety'] |
|
} |
|
}, |
|
|
|
'confidence': { |
|
'overall_confidence': round(overall_confidence, 3), |
|
'reliability_score': round(reliability_score, 3), |
|
'component_confidence': { |
|
'fiabilite_integrite': round(confidences['Fiabilité Intégrité'], 3), |
|
'disponibilite': round(confidences['Disponibilté'], 3), |
|
'process_safety': round(confidences['Process Safety'], 3) |
|
}, |
|
'needs_manual_review': bool(needs_review), |
|
'confidence_threshold': confidence_threshold, |
|
'recommendation': self._get_confidence_recommendation(reliability_score) |
|
}, |
|
|
|
'equipment_intelligence': { |
|
'equipment_type': equipment_risk_assessment['equipment_type'], |
|
'redundancy_class': equipment_risk_assessment['redundancy_class'], |
|
'risk_level': equipment_risk_assessment['risk_level'], |
|
'business_impact': equipment_risk_assessment['business_impact'], |
|
'risk_factors': equipment_risk_assessment['risk_factors'], |
|
'base_risk_score': round(equipment_risk_assessment['base_risk_score'], 2) |
|
}, |
|
|
|
'safety_analysis': { |
|
'structural_failure_detected': bool(enhanced_features.get('has_structural_failure', 0)), |
|
'safety_mention_present': bool(enhanced_features.get('has_safety_mention', 0)), |
|
'equipment_malfunction_detected': bool(enhanced_features.get('has_equipment_malfunction', 0)), |
|
'escalation_detected': bool(enhanced_features.get('has_escalation', 0)), |
|
'safety_override_applied': any(pred > 3 for pred in predictions.values()), |
|
'urgency_level': self._determine_urgency_level(total_criticality, reliability_score, equipment_risk_assessment) |
|
}, |
|
|
|
'similar_anomalies': similar_anomalies, |
|
|
|
'analysis': { |
|
'problem_types_detected': enhanced_features.get('problem_types', []), |
|
'equipment_mentioned': enhanced_features.get('equipment_mentioned', []), |
|
'severity_score': enhanced_features.get('enhanced_severity_score', 0), |
|
'technical_complexity': round(enhanced_features.get('technical_complexity', 0), 2), |
|
'pattern_indicators': self._identify_critical_patterns(enhanced_features) |
|
}, |
|
|
|
'model_metadata': { |
|
'version': '2.0_enhanced', |
|
'features_used': len([k for k in enhanced_features.keys() if k != 'Description']), |
|
'equipment_intelligence_enabled': True, |
|
'safety_rules_enabled': bool(self.safety_rules) |
|
} |
|
} |
|
|
|
def _calculate_reliability_score(self, confidences: Dict, enhanced_features: Dict, |
|
equipment_risk_assessment: Dict) -> float: |
|
"""Calculate enhanced reliability score""" |
|
|
|
|
|
prediction_confidence = np.mean(list(confidences.values())) |
|
|
|
|
|
model_agreement = 1.0 - (np.std(list(confidences.values())) / max(np.mean(list(confidences.values())), 0.1)) |
|
|
|
|
|
has_description = len(enhanced_features.get('Description', '')) > 10 |
|
has_equipment = enhanced_features.get('equipment_type_class', 'UNKNOWN') != 'UNKNOWN' |
|
has_section = enhanced_features.get('Section propriétaire', 'Unknown') != 'Unknown' |
|
feature_completeness = (has_description + has_equipment + has_section) / 3 |
|
|
|
|
|
equipment_confidence_boost = 0.0 |
|
if equipment_risk_assessment['equipment_type'] != 'UNKNOWN': |
|
equipment_confidence_boost = 0.1 |
|
|
|
|
|
pattern_confidence = 0.0 |
|
if enhanced_features.get('has_safety_mention', 0) == 1: |
|
pattern_confidence += 0.1 |
|
if enhanced_features.get('has_structural_failure', 0) == 1: |
|
pattern_confidence += 0.15 |
|
if enhanced_features.get('equipment_problem_risk', 0) > 1.5: |
|
pattern_confidence += 0.1 |
|
|
|
|
|
reliability_score = ( |
|
prediction_confidence * 0.4 + |
|
model_agreement * 0.25 + |
|
feature_completeness * 0.2 + |
|
equipment_confidence_boost + |
|
pattern_confidence |
|
) |
|
|
|
return min(reliability_score, 1.0) |
|
|
|
def _get_confidence_recommendation(self, reliability_score: float) -> str: |
|
"""Get confidence-based recommendation""" |
|
if reliability_score >= 0.85: |
|
return "Very high confidence - Prediction highly reliable" |
|
elif reliability_score >= 0.75: |
|
return "High confidence - Prediction can be trusted" |
|
elif reliability_score >= 0.65: |
|
return "Medium confidence - Consider expert review for critical decisions" |
|
elif reliability_score >= 0.5: |
|
return "Low confidence - Manual review recommended" |
|
else: |
|
return "Very low confidence - Expert assessment required" |
|
|
|
def _determine_urgency_level(self, total_criticality: int, reliability_score: float, |
|
equipment_risk_assessment: Dict) -> str: |
|
"""Determine enhanced urgency level""" |
|
|
|
|
|
adjusted_criticality = total_criticality * reliability_score |
|
|
|
|
|
equipment_urgency_multiplier = 1.0 |
|
if equipment_risk_assessment['equipment_type'] in ['COOLING_CRITICAL', 'ELECTRICAL_CRITICAL']: |
|
equipment_urgency_multiplier = 1.3 |
|
elif equipment_risk_assessment['equipment_type'] in ['TURBINE_SYSTEMS']: |
|
equipment_urgency_multiplier = 1.2 |
|
|
|
final_urgency_score = adjusted_criticality * equipment_urgency_multiplier |
|
|
|
if final_urgency_score >= 14: |
|
return "EMERGENCY - Immediate shutdown may be required" |
|
elif final_urgency_score >= 12: |
|
return "CRITICAL - Immediate action required (within 1 hour)" |
|
elif final_urgency_score >= 9: |
|
return "HIGH - Action required within 24 hours" |
|
elif final_urgency_score >= 6: |
|
return "MEDIUM - Action required within 1 week" |
|
else: |
|
return "LOW - Routine maintenance scheduling" |
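        # Worked example: total criticality 12 with reliability 0.8 on COOLING_CRITICAL equipment gives
        # 12 * 0.8 * 1.3 ≈ 12.5, which falls in the CRITICAL band (>= 12 but < 14).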
|
|
|
def _identify_critical_patterns(self, enhanced_features: Dict) -> List[str]: |
|
"""Identify critical patterns in the anomaly""" |
|
|
|
patterns = [] |
|
|
|
if enhanced_features.get('has_structural_failure', 0) == 1: |
|
patterns.append('Structural failure detected') |
|
|
|
if enhanced_features.get('has_safety_mention', 0) == 1: |
|
patterns.append('Safety concern explicitly mentioned') |
|
|
|
if enhanced_features.get('electrical_cooling_issue', 0) == 1: |
|
patterns.append('Electrical equipment cooling issue') |
|
|
|
if enhanced_features.get('turbine_oil_issue', 0) == 1: |
|
patterns.append('Turbine lubrication system problem') |
|
|
|
if enhanced_features.get('main_equipment_failure', 0) == 1: |
|
patterns.append('Critical single-point equipment failure') |
|
|
|
if enhanced_features.get('has_escalation', 0) == 1: |
|
patterns.append('Problem escalation indicated') |
|
|
|
if enhanced_features.get('vibration_excessive', 0) == 1: |
|
patterns.append('Excessive vibration detected') |
|
|
|
if enhanced_features.get('temperature_elevee', 0) == 1: |
|
patterns.append('High temperature condition') |
|
|
|
if enhanced_features.get('enhanced_severity_score', 0) >= 4: |
|
patterns.append('High severity language detected') |
|
|
|
return patterns |
|
|
|
|
|
|
|
|
|
|
|
_enhanced_ai_instance = None |
|
|
|
def get_enhanced_ai_instance(): |
|
"""Get singleton enhanced AI instance""" |
|
global _enhanced_ai_instance |
|
if _enhanced_ai_instance is None: |
|
_enhanced_ai_instance = EnhancedAnomalyIntelligence() |
|
return _enhanced_ai_instance |
|
|
|
def predict_anomaly_single_enhanced(anomaly_data: Dict, **kwargs) -> Dict: |
|
"""Convenience function for enhanced single prediction""" |
|
ai = get_enhanced_ai_instance() |
|
return ai.predict_single(anomaly_data, **kwargs) |
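# Minimal usage sketch (assumes the v2 joblib/json artifacts loaded in _load_models are present in
# the working directory; the example values below are illustrative only):
#
#     result = predict_anomaly_single_enhanced(
#         {
#             'Description': 'fuite huile sur POMPE A',
#             'Section propriétaire': '34MM',
#             'Description de l\'équipement': 'POMPE A',
#         },
#         format_type='simple',
#     )
#     print(result['predicted_criticite'], result['needs_manual_review'])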
|
|
|
def predict_anomaly_batch_enhanced(anomaly_list: List[Dict], **kwargs) -> List[Dict]: |
|
"""Convenience function for enhanced batch prediction""" |
|
ai = get_enhanced_ai_instance() |
|
return ai.predict_batch(anomaly_list, **kwargs) |
|
|
|
def process_excel_upload_enhanced(excel_data: pd.DataFrame, |
|
confidence_threshold: float = 0.7) -> pd.DataFrame: |
|
""" |
|
Process Excel upload with enhanced AI predictions |
|
|
|
Args: |
|
excel_data: DataFrame from uploaded Excel |
|
confidence_threshold: Confidence threshold for manual review |
|
|
|
Returns: |
|
DataFrame with enhanced AI prediction columns |
|
""" |
|
|
|
|
|
anomaly_list = excel_data.to_dict('records') |
|
|
|
|
|
predictions = predict_anomaly_batch_enhanced( |
|
anomaly_list, |
|
confidence_threshold=confidence_threshold, |
|
include_similar=False, |
|
format_type='simple', |
|
apply_safety_rules=True |
|
) |
|
|
|
|
|
result_df = excel_data.copy() |
|
|
|
|
|
result_df['AI_Predicted_Criticite'] = [p.get('predicted_criticite', 0) for p in predictions] |
|
result_df['AI_Predicted_Fiabilite'] = [p.get('predicted_fiabilite', 0) for p in predictions] |
|
result_df['AI_Predicted_Disponibilite'] = [p.get('predicted_disponibilite', 0) for p in predictions] |
|
result_df['AI_Predicted_Safety'] = [p.get('predicted_safety', 0) for p in predictions] |
|
result_df['AI_Confidence'] = [p.get('ai_confidence', 0.0) for p in predictions] |
|
result_df['AI_Needs_Review'] = [bool(p.get('needs_manual_review', True)) for p in predictions] |
|
|
|
|
|
result_df['AI_Equipment_Type'] = [p.get('equipment_type', 'UNKNOWN') for p in predictions] |
|
result_df['AI_Risk_Level'] = [p.get('equipment_risk_level', 'LOW') for p in predictions] |
|
result_df['AI_Business_Impact'] = [p.get('business_impact', 'MINOR') for p in predictions] |
|
result_df['AI_Safety_Override'] = [bool(p.get('safety_override_applied', False)) for p in predictions] |
|
|
|
|
|
result_df['Human_Verified'] = False |
|
result_df['Human_Criticite'] = None |
|
result_df['Human_Fiabilite'] = None |
|
result_df['Human_Disponibilite'] = None |
|
result_df['Human_Safety'] = None |
|
result_df['Correction_Reason'] = '' |
|
result_df['Verified_At'] = None |
|
result_df['Verified_By'] = '' |
|
result_df['Expert_Notes'] = '' |
|
|
|
return result_df |
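# Rough end-to-end sketch for an Excel workflow (file names are hypothetical):
#
#     raw_df = pd.read_excel('anomalies_upload.xlsx')
#     scored_df = process_excel_upload_enhanced(raw_df, confidence_threshold=0.7)
#     scored_df.to_excel('anomalies_scored.xlsx', index=False)
#
# The returned frame keeps the original columns and appends the AI_* prediction columns plus empty
# Human_* verification columns for expert review.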
|
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
|
|
|
print("="*70) |
|
print("TESTING ENHANCED SINGLE ANOMALY PREDICTION") |
|
print("="*70) |
|
|
|
single_anomaly = { |
|
'Description': 'SAFETY : fuite vapeur importante sur TRANSFO PRINCIPAL, température élevée detectée, vibration excessive', |
|
'Section propriétaire': '34EL', |
|
'Description de l\'équipement': 'TRANSFO PRINCIPAL' |
|
} |
|
|
|
result = predict_anomaly_single_enhanced( |
|
single_anomaly, |
|
format_type='rich', |
|
apply_safety_rules=True, |
|
include_similar=True |
|
) |
|
|
|
print("Enhanced rich format result:") |
|
print(f"Predicted Criticality: {result['predictions']['criticite_totale']}") |
|
print(f"Equipment Type: {result['equipment_intelligence']['equipment_type']}") |
|
print(f"Risk Level: {result['equipment_intelligence']['risk_level']}") |
|
print(f"Business Impact: {result['equipment_intelligence']['business_impact']}") |
|
print(f"Safety Override Applied: {result['safety_analysis']['safety_override_applied']}") |
|
print(f"Urgency Level: {result['safety_analysis']['urgency_level']}") |
|
print(f"Risk Factors: {result['equipment_intelligence']['risk_factors']}") |
|
|
|
|
|
print("\n" + "="*70) |
|
print("TESTING ENHANCED BATCH PREDICTION") |
|
print("="*70) |
|
|
|
batch_anomalies = [ |
|
{ |
|
'Description': 'vibration excessive ALTERNATEUR, bruit anormal détecté', |
|
'Section propriétaire': '34EL', |
|
'Description de l\'équipement': 'ALTERNATEUR' |
|
}, |
|
{ |
|
'Description': 'fuite huile système hydraulique TURBINE, pression basse', |
|
'Section propriétaire': '34MM', |
|
'Description de l\'équipement': 'TURBINE' |
|
}, |
|
{ |
|
'Description': 'maintenance préventive DECRASSEUR à prévoir', |
|
'Section propriétaire': '34MC', |
|
'Description de l\'équipement': 'DECRASSEUR' |
|
}, |
|
{ |
|
'Description': 'percement conduite vapeur VENTILATEUR DE REFROIDISSEMENT TP', |
|
'Section propriétaire': '34EL', |
|
'Description de l\'équipement': 'VENTILATEUR DE REFROIDISSEMENT TP' |
|
} |
|
] |
|
|
|
batch_results = predict_anomaly_batch_enhanced( |
|
batch_anomalies, |
|
confidence_threshold=0.7, |
|
format_type='simple', |
|
apply_safety_rules=True |
|
) |
|
|
|
print("Enhanced batch results:") |
|
for i, result in enumerate(batch_results): |
|
print(f"\nAnomaly {i+1}:") |
|
print(f" Equipment Type: {result.get('equipment_type', 'N/A')}") |
|
print(f" Criticité: {result.get('predicted_criticite', 'N/A')}") |
|
print(f" Risk Level: {result.get('equipment_risk_level', 'N/A')}") |
|
print(f" Business Impact: {result.get('business_impact', 'N/A')}") |
|
print(f" Confidence: {result.get('ai_confidence', 'N/A')}") |
|
print(f" Safety Override: {result.get('safety_override_applied', 'N/A')}") |
|
print(f" Needs Review: {result.get('needs_manual_review', 'N/A')}") |
|
|
|
|
|
print("\n" + "="*70) |
|
print("TESTING ENHANCED EXCEL PROCESSING") |
|
print("="*70) |
|
|
|
|
|
excel_df = pd.DataFrame([ |
|
{ |
|
'Description': 'problème refroidissement TRANSFO PRINCIPAL', |
|
'Section propriétaire': '34EL', |
|
'Description de l\'équipement': 'TRANSFO PRINCIPAL', |
|
'Date de détéction de l\'anomalie': '2025-01-15' |
|
}, |
|
{ |
|
'Description': 'SAFETY : éclatement tube chaudière, fissure détectée', |
|
'Section propriétaire': '34MD', |
|
'Description de l\'équipement': 'CHAUDIERE', |
|
'Date de détéction de l\'anomalie': '2025-01-16' |
|
}, |
|
{ |
|
'Description': 'maintenance POMPE A prévoir', |
|
'Section propriétaire': '34MC', |
|
'Description de l\'équipement': 'POMPE', |
|
'Date de détéction de l\'anomalie': '2025-01-17' |
|
} |
|
]) |
|
|
|
processed_df = process_excel_upload_enhanced(excel_df, confidence_threshold=0.7) |
|
|
|
print("Enhanced processed Excel columns:") |
|
enhanced_columns = [col for col in processed_df.columns if col.startswith('AI_')] |
|
print(enhanced_columns) |
|
|
|
print("\nSample of enhanced processed data:") |
|
display_cols = ['Description', 'AI_Predicted_Criticite', 'AI_Equipment_Type', |
|
'AI_Risk_Level', 'AI_Business_Impact', 'AI_Safety_Override', 'AI_Needs_Review'] |
|
print(processed_df[display_cols].to_string(index=False)) |
|
|
|
print("\n" + "🎯" + "="*68) |
|
print("ENHANCED ANOMALY INTELLIGENCE v2.0 TESTS COMPLETED SUCCESSFULLY!") |
|
print("="*70) |
|
print("✓ Equipment Intelligence Integration") |
|
print("✓ Safety Override Rules") |
|
print("✓ Enhanced Risk Assessment") |
|
print("✓ Conservative Prediction Bias") |
|
print("✓ Business Impact Analysis") |
|
print("✓ Production-Ready Performance") |
|
print("="*70) |