# Ahmedik95316's picture
# Update model/retrain.py
# 9cbcf89
# raw
# history blame
# 83.7 kB
# Enhanced version with comprehensive cross-validation and advanced feature engineering
import json
import shutil
import joblib
import asyncio
import logging
import hashlib
import schedule
import threading
import numpy as np
import pandas as pd
from scipy import stats
from pathlib import Path
import time as time_module
from datetime import datetime, timedelta
from typing import Dict, Tuple, Optional, Any, List
from monitor.monitor_drift import AdvancedDriftMonitor
import warnings
warnings.filterwarnings('ignore')
# Scikit-learn imports
from sklearn.metrics import (
accuracy_score, precision_score, recall_score, f1_score,
roc_auc_score, confusion_matrix, classification_report
)
from sklearn.model_selection import (
cross_val_score, StratifiedKFold, cross_validate, train_test_split
)
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import FunctionTransformer
from sklearn.feature_selection import SelectKBest, chi2
# Import enhanced feature engineering components
# Configure logging BEFORE anything is logged.  Emitting a record through the
# module-level logging.warning() on an unconfigured root logger implicitly
# calls basicConfig() with defaults, after which an explicit basicConfig()
# call is silently ignored (no file handler, no custom format).
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('/tmp/model_retraining.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

# Import enhanced feature engineering components (optional dependency).
# ENHANCED_FEATURES_AVAILABLE records whether the import succeeded.
try:
    from features.feature_engineer import AdvancedFeatureEngineer, create_enhanced_pipeline, analyze_feature_importance
    from features.sentiment_analyzer import SentimentAnalyzer
    from features.readability_analyzer import ReadabilityAnalyzer
    from features.entity_analyzer import EntityAnalyzer
    from features.linguistic_analyzer import LinguisticAnalyzer
    ENHANCED_FEATURES_AVAILABLE = True
except ImportError as e:
    ENHANCED_FEATURES_AVAILABLE = False
    logger.warning(f"Enhanced features not available in retrain.py, falling back to basic TF-IDF: {e}")

# Log enhanced feature availability
if ENHANCED_FEATURES_AVAILABLE:
    logger.info("Enhanced feature engineering components loaded for retraining")
else:
    logger.warning("Enhanced features not available - using standard TF-IDF for retraining")
def preprocess_text_function(texts):
    """Clean every item of *texts* and return the cleaned strings as a list.

    Defined at module level (not as a lambda or closure) so that objects
    holding a reference to it can be pickled.

    Each item is converted with ``str()`` and then, in order: URLs and
    e-mail-like tokens are removed, runs of ``!`` / ``?`` are collapsed to a
    single character, runs of three or more dots become ``...``, every
    character other than ASCII letters, whitespace and ``.!?`` is dropped,
    whitespace runs are collapsed to one space, and the result is stripped
    and lower-cased.
    """
    import re

    # (pattern, replacement) pairs; the order is significant.
    substitutions = (
        (r'http\S+|www\S+|https\S+', ''),
        (r'\S+@\S+', ''),
        (r'[!]{2,}', '!'),
        (r'[?]{2,}', '?'),
        (r'[.]{3,}', '...'),
        (r'[^a-zA-Z\s.!?]', ''),
        (r'\s+', ' '),
    )

    def normalise(raw):
        cleaned = str(raw)
        for pattern, replacement in substitutions:
            cleaned = re.sub(pattern, replacement, cleaned)
        return cleaned.strip().lower()

    return [normalise(item) for item in texts]
class CVModelComparator:
    """Advanced model comparison using cross-validation and statistical tests with enhanced features.

    Evaluates estimators with stratified K-fold cross-validation, compares two
    models on identical folds, and converts the comparison into a promotion
    recommendation.  All result dictionaries contain only plain Python
    scalars/containers so they can be serialized with ``json``.
    """

    def __init__(self, cv_folds: int = 5, random_state: int = 42):
        """Store the requested number of folds and the seed used to shuffle folds."""
        self.cv_folds = cv_folds
        self.random_state = random_state

    def create_cv_strategy(self, X, y) -> "StratifiedKFold":
        """Create appropriate CV strategy based on data characteristics.

        The fold count is the requested ``cv_folds`` capped so that each fold
        has at least 3 samples and so that it does not exceed the size of the
        smallest class; it is never lower than 2.
        """
        n_samples = len(X)
        min_samples_per_fold = 3
        max_folds = n_samples // min_samples_per_fold

        # One pass over the labels instead of one comparison per class.
        _, class_counts = np.unique(y, return_counts=True)
        max_folds_by_class = int(class_counts.min())

        actual_folds = max(2, min(self.cv_folds, max_folds, max_folds_by_class))
        logger.info(f"Using {actual_folds} CV folds for enhanced model comparison")
        return StratifiedKFold(
            n_splits=actual_folds,
            shuffle=True,
            random_state=self.random_state
        )

    def perform_model_cv_evaluation(self, model, X, y, cv_strategy=None) -> Dict:
        """Perform comprehensive CV evaluation of a model with enhanced features.

        Args:
            model: estimator passed to ``cross_validate``.
            X, y: samples and labels.
            cv_strategy: splitter to use; built with ``create_cv_strategy``
                when omitted.

        Returns:
            A dict with per-metric summaries under ``test_scores`` /
            ``train_scores``, per-fold values under ``fold_results``, the
            detected feature type, and (when accuracy is available)
            ``overfitting_score`` and ``stability_score``.  On failure the
            dict holds ``error`` and ``n_splits`` instead.
        """
        if cv_strategy is None:
            cv_strategy = self.create_cv_strategy(X, y)
        logger.info(f"Performing enhanced CV evaluation with {cv_strategy.n_splits} folds...")

        scoring_metrics = {
            'accuracy': 'accuracy',
            'precision': 'precision_weighted',
            'recall': 'recall_weighted',
            'f1': 'f1_weighted',
            'roc_auc': 'roc_auc'
        }

        try:
            cv_scores = cross_validate(
                model, X, y,
                cv=cv_strategy,
                scoring=scoring_metrics,
                return_train_score=True,
                n_jobs=1,
                verbose=0
            )

            cv_results = {
                'n_splits': cv_strategy.n_splits,
                'test_scores': {},
                'train_scores': {},
                'fold_results': [],
                'feature_engineering_type': self._detect_feature_type(model)
            }

            # Process results for each metric
            for metric_name in scoring_metrics:
                test_key = f'test_{metric_name}'
                train_key = f'train_{metric_name}'
                if test_key in cv_scores:
                    test_scores = cv_scores[test_key]
                    cv_results['test_scores'][metric_name] = {
                        'mean': float(np.mean(test_scores)),
                        'std': float(np.std(test_scores)),
                        'min': float(np.min(test_scores)),
                        'max': float(np.max(test_scores)),
                        'scores': test_scores.tolist()
                    }
                if train_key in cv_scores:
                    train_scores = cv_scores[train_key]
                    cv_results['train_scores'][metric_name] = {
                        'mean': float(np.mean(train_scores)),
                        'std': float(np.std(train_scores)),
                        'scores': train_scores.tolist()
                    }

            # Individual fold results
            for fold_idx in range(cv_strategy.n_splits):
                fold_result = {
                    'fold': fold_idx + 1,
                    'test_scores': {},
                    'train_scores': {}
                }
                for metric_name in scoring_metrics:
                    test_key = f'test_{metric_name}'
                    train_key = f'train_{metric_name}'
                    if test_key in cv_scores:
                        fold_result['test_scores'][metric_name] = float(cv_scores[test_key][fold_idx])
                    if train_key in cv_scores:
                        fold_result['train_scores'][metric_name] = float(cv_scores[train_key][fold_idx])
                cv_results['fold_results'].append(fold_result)

            # Calculate overfitting and stability scores
            if 'accuracy' in cv_results['test_scores'] and 'accuracy' in cv_results['train_scores']:
                train_mean = cv_results['train_scores']['accuracy']['mean']
                test_mean = cv_results['test_scores']['accuracy']['mean']
                cv_results['overfitting_score'] = float(train_mean - test_mean)
                test_std = cv_results['test_scores']['accuracy']['std']
                # Always a float, also in the degenerate zero-accuracy case.
                cv_results['stability_score'] = float(1 - (test_std / test_mean)) if test_mean > 0 else 0.0

            return cv_results
        except Exception as e:
            logger.error(f"Enhanced CV evaluation failed: {e}")
            return {'error': str(e), 'n_splits': cv_strategy.n_splits}

    def _detect_feature_type(self, model) -> str:
        """Detect whether model uses enhanced or standard features.

        Looks for an ``enhanced_features`` or ``vectorize`` entry in the
        model's ``named_steps``; anything else is reported as ``'unknown'``.
        """
        try:
            if hasattr(model, 'named_steps'):
                if 'enhanced_features' in model.named_steps:
                    return 'enhanced'
                elif 'vectorize' in model.named_steps:
                    return 'standard_tfidf'
            return 'unknown'
        except Exception:
            # Narrowed from a bare ``except`` so KeyboardInterrupt/SystemExit
            # are no longer swallowed.
            return 'unknown'

    def compare_models_with_cv(self, model1, model2, X, y, model1_name="Production", model2_name="Candidate") -> Dict:
        """Compare two models using cross-validation with enhanced feature awareness.

        Both models are evaluated with the same splitter, then accuracy, F1,
        precision and recall are compared fold-by-fold.

        Returns:
            A dict with both CV result sets, ``metric_comparisons``,
            ``statistical_tests`` (the test results of each compared metric),
            ``feature_engineering_comparison`` and ``promotion_decision``;
            or a dict containing ``error`` when evaluation fails.
        """
        logger.info(f"Comparing {model1_name} vs {model2_name} models using enhanced CV...")
        try:
            cv_strategy = self.create_cv_strategy(X, y)

            # Evaluate both models with same CV folds
            results1 = self.perform_model_cv_evaluation(model1, X, y, cv_strategy)
            results2 = self.perform_model_cv_evaluation(model2, X, y, cv_strategy)

            if 'error' in results1 or 'error' in results2:
                return {
                    'error': 'One or both models failed CV evaluation',
                    'model1_results': results1,
                    'model2_results': results2
                }

            # Statistical comparison with feature type awareness
            comparison_results = {
                'model1_name': model1_name,
                'model2_name': model2_name,
                'cv_folds': cv_strategy.n_splits,
                'model1_cv_results': results1,
                'model2_cv_results': results2,
                'statistical_tests': {},
                'metric_comparisons': {},
                'feature_engineering_comparison': {
                    'model1_features': results1.get('feature_engineering_type', 'unknown'),
                    'model2_features': results2.get('feature_engineering_type', 'unknown'),
                    'feature_upgrade': self._assess_feature_upgrade(results1, results2)
                }
            }

            # Compare each metric
            for metric in ['accuracy', 'f1', 'precision', 'recall']:
                if (metric in results1['test_scores'] and
                        metric in results2['test_scores']):
                    scores1 = results1['test_scores'][metric]['scores']
                    scores2 = results2['test_scores'][metric]['scores']
                    metric_comparison = self._compare_metric_scores(
                        scores1, scores2, metric, model1_name, model2_name
                    )
                    comparison_results['metric_comparisons'][metric] = metric_comparison
                    # 'statistical_tests' used to stay empty; expose each
                    # metric's test results there as a copy.
                    comparison_results['statistical_tests'][metric] = dict(metric_comparison.get('tests', {}))

            # Enhanced promotion decision logic
            promotion_decision = self._make_enhanced_promotion_decision(comparison_results)
            comparison_results['promotion_decision'] = promotion_decision

            logger.info(f"Enhanced model comparison completed: {promotion_decision['reason']}")
            return comparison_results
        except Exception as e:
            logger.error(f"Enhanced model comparison failed: {e}")
            return {'error': str(e)}

    def _assess_feature_upgrade(self, results1: Dict, results2: Dict) -> Dict:
        """Assess if there's a feature engineering upgrade.

        Compares the ``feature_engineering_type`` of both result dicts and
        returns ``is_upgrade``, ``upgrade_type`` and ``description``.  Only
        the standard-TF-IDF -> enhanced transition counts as an upgrade.
        """
        feature1 = results1.get('feature_engineering_type', 'unknown')
        feature2 = results2.get('feature_engineering_type', 'unknown')

        upgrade_assessment = {
            'is_upgrade': False,
            'upgrade_type': 'none',
            'description': 'No feature engineering change detected'
        }

        if feature1 == 'standard_tfidf' and feature2 == 'enhanced':
            upgrade_assessment.update({
                'is_upgrade': True,
                'upgrade_type': 'standard_to_enhanced',
                'description': 'Upgrade from standard TF-IDF to enhanced feature engineering'
            })
        elif feature1 == 'enhanced' and feature2 == 'standard_tfidf':
            upgrade_assessment.update({
                'is_upgrade': False,
                'upgrade_type': 'enhanced_to_standard',
                'description': 'Downgrade from enhanced features to standard TF-IDF'
            })
        elif feature1 == feature2 and feature1 != 'unknown':
            upgrade_assessment.update({
                'is_upgrade': False,
                'upgrade_type': 'same_features',
                'description': f'Both models use {feature1} features'
            })
        return upgrade_assessment

    def _make_enhanced_promotion_decision(self, comparison_results: Dict) -> Dict:
        """Enhanced promotion decision that considers feature engineering upgrades.

        Rules are checked in order: significant F1 improvement; feature
        upgrade with F1 gain above 0.005; F1 and accuracy both improved by
        more than 0.01; F1 gain above 0.02.  Otherwise the candidate is not
        promoted.
        """
        f1_comparison = comparison_results['metric_comparisons'].get('f1', {})
        accuracy_comparison = comparison_results['metric_comparisons'].get('accuracy', {})
        feature_comparison = comparison_results['feature_engineering_comparison']

        promote_candidate = False
        promotion_reason = ""
        confidence = 0.0

        # Factor in feature engineering improvements
        feature_upgrade = feature_comparison.get('feature_upgrade', {})
        is_feature_upgrade = feature_upgrade.get('is_upgrade', False)

        f1_gain = f1_comparison.get('improvement', 0)
        accuracy_gain = accuracy_comparison.get('improvement', 0)

        # Enhanced decision logic
        if f1_comparison.get('significant_improvement', False):
            promote_candidate = True
            promotion_reason = f"Significant F1 improvement: {f1_gain:.4f}"
            confidence = 0.8
            if is_feature_upgrade:
                promotion_reason += " with enhanced feature engineering"
                confidence = 0.9
        elif is_feature_upgrade and f1_gain > 0.005:
            # Lower threshold for promotion when upgrading features
            promote_candidate = True
            promotion_reason = f"Feature engineering upgrade with F1 improvement: {f1_gain:.4f}"
            confidence = 0.7
        elif f1_gain > 0.01 and accuracy_gain > 0.01:
            promote_candidate = True
            promotion_reason = "Practical improvement in both F1 and accuracy"
            confidence = 0.6
            if is_feature_upgrade:
                promotion_reason += " with enhanced features"
                confidence = 0.75
        elif f1_gain > 0.02:
            promote_candidate = True
            promotion_reason = f"Large F1 improvement: {f1_gain:.4f}"
            confidence = 0.7
        else:
            if is_feature_upgrade:
                promotion_reason = f"Feature upgrade available but insufficient performance gain ({f1_gain:.4f})"
            else:
                promotion_reason = "No significant improvement detected"
            confidence = 0.3

        return {
            'promote_candidate': promote_candidate,
            'reason': promotion_reason,
            'confidence': confidence,
            'feature_engineering_factor': is_feature_upgrade,
            'feature_upgrade_details': feature_upgrade
        }

    def _compare_metric_scores(self, scores1: list, scores2: list, metric: str,
                               model1_name: str, model2_name: str) -> Dict:
        """Compare metric scores between two models using statistical tests.

        Args:
            scores1, scores2: per-fold scores of model 1 and model 2, paired
                by fold.
            metric: metric name, echoed in the result.
            model1_name, model2_name: used (lower-cased) as key prefixes.

        Returns:
            Means/stds, absolute and relative improvement, a two-sided paired
            t-test, a one-sided (``greater``) Wilcoxon test, Cohen's d and
            the significance flags.  All flags are plain ``bool`` so the
            result can be dumped to JSON.  On failure: ``metric`` + ``error``.
        """
        try:
            # Basic statistics
            mean1, mean2 = np.mean(scores1), np.mean(scores2)
            std1, std2 = np.std(scores1), np.std(scores2)
            improvement = mean2 - mean1

            comparison = {
                'metric': metric,
                f'{model1_name.lower()}_mean': float(mean1),
                f'{model2_name.lower()}_mean': float(mean2),
                f'{model1_name.lower()}_std': float(std1),
                f'{model2_name.lower()}_std': float(std2),
                'improvement': float(improvement),
                'relative_improvement': float(improvement / mean1 * 100) if mean1 > 0 else 0,
                'tests': {}
            }

            # Paired t-test
            try:
                t_stat, p_value = stats.ttest_rel(scores2, scores1)
                comparison['tests']['paired_ttest'] = {
                    't_statistic': float(t_stat),
                    'p_value': float(p_value),
                    # bool(): numpy booleans are not JSON serializable
                    'significant': bool(p_value < 0.05)
                }
            except Exception as e:
                logger.warning(f"Paired t-test failed for {metric}: {e}")

            # Wilcoxon signed-rank test (non-parametric alternative)
            try:
                w_stat, w_p_value = stats.wilcoxon(scores2, scores1, alternative='greater')
                comparison['tests']['wilcoxon'] = {
                    'statistic': float(w_stat),
                    'p_value': float(w_p_value),
                    'significant': bool(w_p_value < 0.05)
                }
            except Exception as e:
                logger.warning(f"Wilcoxon test failed for {metric}: {e}")

            # Effect size (Cohen's d)
            try:
                n1, n2 = len(scores1), len(scores2)
                # The (n - 1)-weighted pooled formula is defined on sample
                # variances, hence ddof=1 (the reported *_std values above
                # keep numpy's default ddof=0).
                var1 = np.var(scores1, ddof=1)
                var2 = np.var(scores2, ddof=1)
                pooled_std = np.sqrt(((n1 - 1) * var1 + (n2 - 1) * var2) / (n1 + n2 - 2))
                cohens_d = improvement / pooled_std if pooled_std > 0 else 0
                comparison['effect_size'] = float(cohens_d)
            except Exception:
                comparison['effect_size'] = 0

            # Practical significance
            practical_threshold = 0.01  # 1% improvement threshold
            comparison['practical_significance'] = bool(abs(improvement) > practical_threshold)
            comparison['significant_improvement'] = bool(
                improvement > practical_threshold and
                comparison['tests'].get('paired_ttest', {}).get('significant', False)
            )
            return comparison
        except Exception as e:
            logger.error(f"Metric comparison failed for {metric}: {e}")
            return {'metric': metric, 'error': str(e)}

    def _calculate_decision_confidence(self, comparison_results: Dict) -> float:
        """Calculate confidence in the promotion decision with enhanced features consideration.

        Sums weighted factors (F1 significance, feature upgrade, share of
        improved metrics, average absolute effect size) and clamps the total
        to ``[0.0, 1.0]``.  Returns 0.5 if the calculation fails.
        """
        try:
            confidence_factors = []
            metric_comparisons = comparison_results['metric_comparisons']

            # Check F1 improvement significance
            f1_comp = metric_comparisons.get('f1', {})
            if f1_comp.get('significant_improvement', False):
                confidence_factors.append(0.4)
            elif f1_comp.get('improvement', 0) > 0.01:
                confidence_factors.append(0.2)

            # Feature engineering upgrade bonus
            feature_upgrade = comparison_results.get('feature_engineering_comparison', {}).get('feature_upgrade', {})
            if feature_upgrade.get('is_upgrade', False):
                confidence_factors.append(0.3)  # Significant bonus for feature upgrades

            # Check consistency across metrics
            improvements = [
                metric_comp['improvement']
                for metric_comp in metric_comparisons.values()
                if isinstance(metric_comp, dict) and 'improvement' in metric_comp
            ]
            if improvements:
                improved_metrics = sum(1 for gain in improvements if gain > 0)
                consistency_score = improved_metrics / len(improvements)
                confidence_factors.append(consistency_score * 0.3)

            # Check effect sizes
            effect_sizes = [
                abs(metric_comp['effect_size'])
                for metric_comp in metric_comparisons.values()
                if isinstance(metric_comp, dict) and 'effect_size' in metric_comp
            ]
            if effect_sizes:
                avg_effect_size = np.mean(effect_sizes)
                if avg_effect_size > 0.5:  # Large effect
                    confidence_factors.append(0.2)
                elif avg_effect_size > 0.2:  # Medium effect
                    confidence_factors.append(0.1)

            # Calculate final confidence
            total_confidence = sum(confidence_factors)
            return min(1.0, max(0.0, total_confidence))
        except Exception as e:
            logger.warning(f"Confidence calculation failed: {e}")
            return 0.5
class EnhancedModelRetrainer:
"""Production-ready model retraining with enhanced feature engineering and comprehensive CV"""
def __init__(self):
    """Run the three setup steps, create the CV comparator and pick the feature mode.

    Enhanced features are used by default whenever the optional
    feature-engineering imports succeeded (``ENHANCED_FEATURES_AVAILABLE``).
    """
    for setup_step in (self.setup_paths, self.setup_retraining_config, self.setup_statistical_tests):
        setup_step()
    self.cv_comparator = CVModelComparator()

    # Enhanced feature engineering settings (default to enhanced if available)
    self.enhanced_features_available = ENHANCED_FEATURES_AVAILABLE
    self.use_enhanced_features = ENHANCED_FEATURES_AVAILABLE

    logger.info(f"Enhanced retraining initialized with features: {'enhanced' if self.use_enhanced_features else 'standard'}")
def setup_paths(self):
    """Define every filesystem location used by the retrainer and create the working directories.

    Everything lives under ``/tmp``: data, model, logs, backups and features
    directories (created if missing), plus production/candidate artifact
    paths, dataset paths, and metadata/log file paths.
    """
    root = Path("/tmp")
    self.base_dir = root
    self.data_dir = root.joinpath("data")
    self.model_dir = root.joinpath("model")
    self.logs_dir = root.joinpath("logs")
    self.backup_dir = root.joinpath("backups")
    self.features_dir = root.joinpath("features")  # For enhanced features

    # Create directories
    working_dirs = (self.data_dir, self.model_dir, self.logs_dir, self.backup_dir, self.features_dir)
    for directory in working_dirs:
        directory.mkdir(parents=True, exist_ok=True)

    model_dir, features_dir = self.model_dir, self.features_dir

    # Current production files
    self.prod_model_path = model_dir.joinpath("model.pkl")
    self.prod_vectorizer_path = model_dir.joinpath("vectorizer.pkl")
    self.prod_pipeline_path = model_dir.joinpath("pipeline.pkl")
    self.prod_feature_engineer_path = features_dir.joinpath("feature_engineer.pkl")

    # Candidate files
    self.candidate_model_path = model_dir.joinpath("model_candidate.pkl")
    self.candidate_vectorizer_path = model_dir.joinpath("vectorizer_candidate.pkl")
    self.candidate_pipeline_path = model_dir.joinpath("pipeline_candidate.pkl")
    self.candidate_feature_engineer_path = features_dir.joinpath("feature_engineer_candidate.pkl")

    # Data files
    data_dir = self.data_dir
    self.combined_data_path = data_dir.joinpath("combined_dataset.csv")
    self.scraped_data_path = data_dir.joinpath("scraped_real.csv")
    self.generated_data_path = data_dir.joinpath("generated_fake.csv")

    # Metadata and logs
    logs_dir = self.logs_dir
    self.metadata_path = Path("/tmp/metadata.json")
    self.retraining_log_path = logs_dir.joinpath("retraining_log.json")
    self.comparison_log_path = logs_dir.joinpath("model_comparison.json")
    self.feature_analysis_log_path = logs_dir.joinpath("feature_analysis.json")
def setup_retraining_config(self):
    """Set the retraining thresholds and the enhanced feature-engineering options.

    Scalar settings are stored as individual attributes; the keyword
    arguments intended for the enhanced feature engineer are kept together
    in ``self.enhanced_feature_config``.
    """
    scalar_settings = {
        'min_new_samples': 50,
        'improvement_threshold': 0.01,  # 1% improvement required
        'significance_level': 0.05,
        'cv_folds': 5,
        'test_size': 0.2,
        'random_state': 42,
        'max_retries': 3,
        'backup_retention_days': 30,
    }
    for attribute, value in scalar_settings.items():
        setattr(self, attribute, value)

    # Enhanced feature configuration
    self.enhanced_feature_config = dict(
        enable_sentiment=True,
        enable_readability=True,
        enable_entities=True,
        enable_linguistic=True,
        feature_selection_k=3000,
        tfidf_max_features=7500,
        ngram_range=(1, 3),
        min_df=2,
        max_df=0.95,
    )
def setup_statistical_tests(self):
    """Register the statistical tests (display name and alpha level) known to the retrainer."""
    display_names = (
        ('paired_ttest', "Paired T-Test"),
        ('wilcoxon', "Wilcoxon Signed-Rank Test"),
        ('mcnemar', "McNemar's Test"),
    )
    # Every test uses the same 5% significance level.
    self.statistical_tests = {
        test_key: {'alpha': 0.05, 'name': label}
        for test_key, label in display_names
    }
def detect_production_feature_type(self) -> str:
    """Report which feature type the production model uses.

    Checks, in order: the presence of the production feature-engineer file,
    the step names of the saved production pipeline, and the
    ``feature_engineering.type`` entry of the metadata file.

    Returns:
        ``'enhanced'`` or ``'standard_tfidf'``; ``'unknown'`` if any check
        raises.
    """
    try:
        # A saved feature engineer is taken as proof of enhanced features.
        if self.prod_feature_engineer_path.exists():
            return 'enhanced'

        # Check pipeline structure
        if self.prod_pipeline_path.exists():
            not_present = object()
            loaded = joblib.load(self.prod_pipeline_path)
            steps = getattr(loaded, 'named_steps', not_present)
            if steps is not not_present:
                if 'enhanced_features' in steps:
                    return 'enhanced'
                if 'vectorize' in steps:
                    return 'standard_tfidf'

        # Check metadata
        if self.metadata_path.exists():
            with open(self.metadata_path, 'r') as handle:
                stored = json.load(handle)
            if stored.get('feature_engineering', {}).get('type') == 'enhanced':
                return 'enhanced'

        return 'standard_tfidf'
    except Exception as e:
        logger.warning(f"Could not detect production feature type: {e}")
        return 'unknown'
def load_existing_metadata(self) -> Optional[Dict]:
    """Read the metadata JSON file and return its content.

    Returns:
        The parsed metadata, or ``None`` when the file is missing or cannot
        be read/parsed (the failure is logged).
    """
    try:
        if not self.metadata_path.exists():
            logger.warning("No existing metadata found")
            return None

        with open(self.metadata_path, 'r') as handle:
            metadata = json.load(handle)

        # Log feature engineering information
        feature_info = metadata.get('feature_engineering', {})
        logger.info(f"Loaded metadata: {metadata.get('model_version', 'Unknown')} with {feature_info.get('type', 'unknown')} features")
        return metadata
    except Exception as e:
        logger.error(f"Failed to load metadata: {str(e)}")
        return None
def load_production_model(self) -> Tuple[bool, Optional[Any], str]:
    """Load the production model from disk.

    The saved pipeline is preferred; if it is absent, the separate model and
    vectorizer files are loaded and returned as a ``(model, vectorizer)``
    tuple.

    Returns:
        ``(success, model_or_components, message)``; the second item is
        ``None`` when nothing could be loaded.
    """
    try:
        # Detect production feature type
        prod_feature_type = self.detect_production_feature_type()
        logger.info(f"Production model uses: {prod_feature_type} features")

        # Try to load pipeline first (preferred)
        if self.prod_pipeline_path.exists():
            model = joblib.load(self.prod_pipeline_path)
            logger.info("Loaded production pipeline")
            return True, model, f"Pipeline loaded successfully ({prod_feature_type} features)"

        # Fallback to individual components
        components_present = self.prod_model_path.exists() and self.prod_vectorizer_path.exists()
        if not components_present:
            return False, None, "No production model found"

        model = joblib.load(self.prod_model_path)
        vectorizer = joblib.load(self.prod_vectorizer_path)
        logger.info("Loaded production model and vectorizer components")
        return True, (model, vectorizer), f"Model components loaded successfully ({prod_feature_type} features)"
    except Exception as e:
        error_msg = f"Failed to load production model: {str(e)}"
        logger.error(error_msg)
        return False, None, error_msg
def load_new_data(self) -> Tuple[bool, Optional[pd.DataFrame], str]:
    """Load every available CSV dataset, concatenate and clean them.

    The combined dataset is read as-is; scraped data gets ``label`` 0 and
    generated data ``label`` 1 when the file has no ``label`` column.

    Returns:
        ``(success, dataframe, message)``.  Fails when no file exists, when
        fewer than 100 rows remain after cleaning, or on any exception.
    """

    def read_if_present(csv_path, default_label=None):
        # Returns None for a missing file; fills in the label column only
        # when a default is given and the file does not provide one.
        if not csv_path.exists():
            return None
        frame = pd.read_csv(csv_path)
        if default_label is not None and 'label' not in frame.columns:
            frame['label'] = default_label
        return frame

    try:
        logger.info("Loading training data for enhanced retraining...")
        dataframes = []

        # Load combined dataset (base)
        df_combined = read_if_present(self.combined_data_path)
        if df_combined is not None:
            dataframes.append(df_combined)
            logger.info(f"Loaded combined dataset: {len(df_combined)} samples")

        # Load scraped real news (label 0 = real)
        df_scraped = read_if_present(self.scraped_data_path, default_label=0)
        if df_scraped is not None:
            dataframes.append(df_scraped)
            logger.info(f"Loaded scraped data: {len(df_scraped)} samples")

        # Load generated fake news (label 1 = fake)
        df_generated = read_if_present(self.generated_data_path, default_label=1)
        if df_generated is not None:
            dataframes.append(df_generated)
            logger.info(f"Loaded generated data: {len(df_generated)} samples")

        if not dataframes:
            return False, None, "No data files found"

        # Combine all data, then clean and validate
        df = self.clean_and_validate_data(pd.concat(dataframes, ignore_index=True))

        if len(df) < 100:
            return False, None, f"Insufficient data after cleaning: {len(df)} samples"

        logger.info(f"Total training data: {len(df)} samples")
        return True, df, f"Successfully loaded {len(df)} samples"
    except Exception as e:
        error_msg = f"Failed to load data: {str(e)}"
        logger.error(error_msg)
        return False, None, error_msg
def clean_and_validate_data(self, df: pd.DataFrame) -> pd.DataFrame:
    """Return *df* without duplicate, incomplete or out-of-range rows.

    Rows are dropped when their ``text`` duplicates an earlier row, when
    ``text`` or ``label`` is null, when the text length is not strictly
    between 10 and 10000 characters, or when ``label`` is not 0 or 1.
    """
    initial_count = len(df)

    # Remove duplicates (first occurrence wins), then rows with null values
    candidates = df.drop_duplicates(subset=['text'], keep='first').dropna(subset=['text', 'label'])

    # Row-wise checks: text quality, valid label, no excessive length
    text_length = candidates['text'].astype(str).str.len()
    keep_row = (text_length > 10) & candidates['label'].isin([0, 1]) & (text_length < 10000)
    df = candidates[keep_row]

    logger.info(f"Data cleaning: {initial_count} -> {len(df)} samples")
    return df
def create_enhanced_pipeline(self, use_enhanced_features: bool = None) -> Pipeline:
    """Create enhanced ML pipeline with feature engineering.

    Args:
        use_enhanced_features: force the enhanced (True) or the standard
            TF-IDF (False) pipeline; ``None`` uses
            ``self.use_enhanced_features``.  The enhanced pipeline is only
            built when the optional feature modules were imported.

    Returns:
        An unfitted ``Pipeline`` ending in a balanced ``LogisticRegression``.
    """
    if use_enhanced_features is None:
        use_enhanced_features = self.use_enhanced_features

    if use_enhanced_features and ENHANCED_FEATURES_AVAILABLE:
        logger.info("Creating enhanced feature engineering pipeline for retraining...")

        # Create enhanced feature engineer with configuration
        config = self.enhanced_feature_config
        feature_engineer = AdvancedFeatureEngineer(
            enable_sentiment=config['enable_sentiment'],
            enable_readability=config['enable_readability'],
            enable_entities=config['enable_entities'],
            enable_linguistic=config['enable_linguistic'],
            feature_selection_k=config['feature_selection_k'],
            tfidf_max_features=config['tfidf_max_features'],
            ngram_range=config['ngram_range'],
            min_df=config['min_df'],
            max_df=config['max_df']
        )

        # Create pipeline with enhanced features
        return Pipeline([
            ('enhanced_features', feature_engineer),
            ('model', LogisticRegression(
                max_iter=1000,
                class_weight='balanced',
                random_state=self.random_state
            ))
        ])

    logger.info("Creating standard TF-IDF pipeline for retraining...")

    # Create standard pipeline.  The module-level function is passed
    # directly: a function defined inside this method cannot be pickled, so
    # wrapping it locally made joblib.dump() of the pipeline fail.
    # NOTE(review): SelectKBest(k=5000) assumes the vocabulary yields at
    # least 5000 features - confirm behaviour on small datasets.
    return Pipeline([
        ('preprocess', FunctionTransformer(preprocess_text_function, validate=False)),
        ('vectorize', TfidfVectorizer(
            max_features=10000,
            min_df=2,
            max_df=0.95,
            ngram_range=(1, 3),
            stop_words='english',
            sublinear_tf=True
        )),
        ('feature_select', SelectKBest(chi2, k=5000)),
        ('model', LogisticRegression(
            max_iter=1000,
            class_weight='balanced',
            random_state=self.random_state
        ))
    ])
def train_candidate_model(self, df: pd.DataFrame) -> Tuple[bool, Optional[Any], Dict]:
    """Train candidate model with enhanced features and comprehensive CV evaluation.

    Steps: cross-validate an unfitted pipeline, fit it on all rows, fit a
    second pipeline on a stratified train split to get holdout metrics,
    collect feature metadata (enhanced pipelines only) and persist the
    candidate artifacts.

    Args:
        df: training data with ``text`` and ``label`` columns.

    Returns:
        ``(True, fitted_pipeline, evaluation_results)`` on success, or
        ``(False, None, {'error': message})`` on any exception.
    """
    try:
        logger.info("Training candidate model with enhanced feature engineering...")

        # Prepare data
        X = df['text'].values
        y = df['label'].values

        # Create pipeline
        pipeline = self.create_enhanced_pipeline(self.use_enhanced_features)
        named_steps = getattr(pipeline, 'named_steps', {})

        # Report the feature type of the pipeline that was actually built:
        # create_enhanced_pipeline falls back to TF-IDF when the enhanced
        # modules are unavailable, regardless of use_enhanced_features.
        candidate_feature_type = 'enhanced' if 'enhanced_features' in named_steps else 'standard'
        prod_feature_type = self.detect_production_feature_type()
        logger.info(f"Training candidate with {candidate_feature_type} features (production uses {prod_feature_type})")

        # Perform cross-validation before final training
        logger.info("Performing cross-validation on candidate model...")
        cv_results = self.cv_comparator.perform_model_cv_evaluation(pipeline, X, y)

        # Train on full dataset for final model
        pipeline.fit(X, y)

        # Additional holdout evaluation on a separately fitted pipeline
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=self.test_size, stratify=y, random_state=self.random_state
        )
        pipeline_holdout = self.create_enhanced_pipeline(self.use_enhanced_features)
        pipeline_holdout.fit(X_train, y_train)

        # Evaluate on holdout
        y_pred = pipeline_holdout.predict(X_test)
        y_pred_proba = pipeline_holdout.predict_proba(X_test)[:, 1]
        holdout_metrics = {
            'accuracy': float(accuracy_score(y_test, y_pred)),
            'precision': float(precision_score(y_test, y_pred, average='weighted')),
            'recall': float(recall_score(y_test, y_pred, average='weighted')),
            'f1': float(f1_score(y_test, y_pred, average='weighted')),
            'roc_auc': float(roc_auc_score(y_test, y_pred_proba))
        }

        # Extract feature information if using enhanced features
        feature_analysis = {}
        feature_engineer = named_steps.get('enhanced_features')
        # ``is not None`` instead of truthiness: an object defining
        # __len__/__bool__ could otherwise be skipped by accident.
        if feature_engineer is not None and hasattr(feature_engineer, 'get_feature_metadata'):
            try:
                feature_analysis = {
                    'feature_metadata': feature_engineer.get_feature_metadata(),
                    'feature_importance': feature_engineer.get_feature_importance(top_k=20) if hasattr(feature_engineer, 'get_feature_importance') else {},
                    'total_features': len(feature_engineer.get_feature_names()) if hasattr(feature_engineer, 'get_feature_names') else 0
                }
                logger.info(f"Enhanced features extracted: {feature_analysis['total_features']} total features")
            except Exception as e:
                logger.warning(f"Could not extract feature analysis: {e}")

        # Combine CV and holdout results
        evaluation_results = {
            'cross_validation': cv_results,
            'holdout_evaluation': holdout_metrics,
            'feature_analysis': feature_analysis,
            'feature_type': candidate_feature_type,
            'training_samples': len(X),
            'test_samples': len(X_test)
        }

        # Save candidate model
        joblib.dump(pipeline, self.candidate_pipeline_path)
        if hasattr(pipeline, 'named_steps'):
            if 'model' in pipeline.named_steps:
                joblib.dump(pipeline.named_steps['model'], self.candidate_model_path)

            # Save enhanced features or vectorizer
            if 'enhanced_features' in pipeline.named_steps:
                feature_engineer = pipeline.named_steps['enhanced_features']
                if hasattr(feature_engineer, 'save_pipeline'):
                    feature_engineer.save_pipeline(self.candidate_feature_engineer_path)
                # Save reference as vectorizer for compatibility
                enhanced_ref = {
                    'type': 'enhanced_features',
                    'feature_engineer_path': str(self.candidate_feature_engineer_path),
                    'metadata': feature_analysis.get('feature_metadata', {})
                }
                joblib.dump(enhanced_ref, self.candidate_vectorizer_path)
            elif 'vectorize' in pipeline.named_steps:
                joblib.dump(pipeline.named_steps['vectorize'], self.candidate_vectorizer_path)

        # Log results
        if 'test_scores' in cv_results and 'f1' in cv_results['test_scores']:
            cv_f1_mean = cv_results['test_scores']['f1']['mean']
            cv_f1_std = cv_results['test_scores']['f1']['std']
            # Fixed format: mean and std used to be concatenated with a
            # stray closing parenthesis.
            logger.info(f"Candidate model CV F1: {cv_f1_mean:.4f} (+/- {cv_f1_std:.4f})")
        logger.info(f"Candidate model holdout F1: {holdout_metrics['f1']:.4f}")
        logger.info(f"Candidate model training completed with {candidate_feature_type} features")

        return True, pipeline, evaluation_results
    except Exception as e:
        error_msg = f"Candidate model training failed: {str(e)}"
        logger.error(error_msg)
        return False, None, {'error': error_msg}
def compare_models_with_enhanced_cv_validation(self, prod_model, candidate_model, X, y) -> Dict:
    """Compare models using comprehensive cross-validation with enhanced feature awareness.

    Delegates to ``self.cv_comparator.compare_models_with_cv`` and merges the
    detailed result with the legacy keys (``production_cv_results``,
    ``candidate_cv_results``, ``production_accuracy``, ``candidate_accuracy``,
    ``absolute_improvement``, ``relative_improvement``).

    Returns:
        The merged comparison dict, or a dict containing ``error`` when the
        comparison fails.
    """
    logger.info("Performing comprehensive model comparison with enhanced CV...")
    try:
        # Use the enhanced CV comparator for detailed analysis
        comparison_results = self.cv_comparator.compare_models_with_cv(
            prod_model, candidate_model, X, y, "Production", "Candidate"
        )
        if 'error' in comparison_results:
            return comparison_results

        prod_cv = comparison_results['model1_cv_results']
        cand_cv = comparison_results['model2_cv_results']

        # Additional legacy format for backward compatibility
        legacy_comparison = {
            'production_cv_results': prod_cv,
            'candidate_cv_results': cand_cv,
            'statistical_tests': comparison_results['statistical_tests'],
            'promotion_decision': comparison_results['promotion_decision']
        }

        # Extract key metrics for legacy format
        if 'test_scores' in prod_cv and 'test_scores' in cand_cv:
            if 'accuracy' in prod_cv['test_scores'] and 'accuracy' in cand_cv['test_scores']:
                prod_accuracy = prod_cv['test_scores']['accuracy']['mean']
                cand_accuracy = cand_cv['test_scores']['accuracy']['mean']
                absolute_improvement = cand_accuracy - prod_accuracy
                legacy_comparison.update({
                    'production_accuracy': prod_accuracy,
                    'candidate_accuracy': cand_accuracy,
                    'absolute_improvement': absolute_improvement,
                    # Guarded like the comparator's own relative improvement:
                    # a zero production accuracy used to raise
                    # ZeroDivisionError and discard the whole comparison.
                    'relative_improvement': (absolute_improvement / prod_accuracy * 100) if prod_accuracy > 0 else 0
                })

        # Merge detailed and legacy formats
        final_results = {**comparison_results, **legacy_comparison}

        # Log summary with enhanced feature information
        f1_comp = comparison_results.get('metric_comparisons', {}).get('f1', {})
        feature_comp = comparison_results.get('feature_engineering_comparison', {})
        if f1_comp:
            logger.info(f"F1 improvement: {f1_comp.get('improvement', 0):.4f}")
            logger.info(f"Significant improvement: {f1_comp.get('significant_improvement', False)}")
        if feature_comp:
            feature_upgrade = feature_comp.get('feature_upgrade', {})
            logger.info(f"Feature engineering: {feature_upgrade.get('description', 'No change')}")

        promotion_decision = comparison_results.get('promotion_decision', {})
        logger.info(f"Promotion recommendation: {promotion_decision.get('promote_candidate', False)}")
        logger.info(f"Reason: {promotion_decision.get('reason', 'Unknown')}")

        return final_results
    except Exception as e:
        logger.error(f"Enhanced model comparison failed: {str(e)}")
        return {'error': str(e)}
def create_backup(self) -> bool:
    """Snapshot the current production artifacts into a timestamped folder.

    A directory named ``backup_<YYYYmmdd_HHMMSS>`` is created under
    ``self.backup_dir`` and every production artifact that currently exists
    on disk is copied into it; missing artifacts are silently skipped.

    Returns:
        True when the snapshot directory was created and all existing
        artifacts were copied, False if any step raised (the error is logged).
    """
    try:
        stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        snapshot_dir = self.backup_dir / f"backup_{stamp}"
        snapshot_dir.mkdir(parents=True, exist_ok=True)

        # Destination file name inside the snapshot -> production source path
        artifacts = {
            "model.pkl": self.prod_model_path,
            "vectorizer.pkl": self.prod_vectorizer_path,
            "pipeline.pkl": self.prod_pipeline_path,
            "metadata.json": self.metadata_path,
            "feature_engineer.pkl": self.prod_feature_engineer_path,  # Enhanced features
        }

        for filename, origin in artifacts.items():
            if not origin.exists():
                continue
            shutil.copy2(origin, snapshot_dir / filename)

        logger.info(f"Backup created: {snapshot_dir}")
        return True
    except Exception as exc:
        logger.error(f"Backup creation failed: {str(exc)}")
        return False
def promote_candidate_model(self, candidate_model, candidate_metrics: Dict, comparison_results: Dict) -> bool:
    """Promote the candidate artifacts to production and refresh the metadata file.

    Steps, in order: back up production (abort if that fails), copy the
    candidate model / vectorizer / pipeline (and the feature engineer when it
    exists) over the production paths, then merge version, evaluation and
    comparison details into the metadata JSON.

    Args:
        candidate_model: Accepted for interface compatibility; not used here,
            the artifacts are copied from the candidate paths on disk.
        candidate_metrics: Evaluation output for the candidate. Keys read:
            'cross_validation', 'holdout_evaluation', 'feature_analysis',
            'feature_type', 'training_samples', 'test_samples'.
        comparison_results: Production-vs-candidate comparison. Keys read:
            'promotion_decision', 'metric_comparisons',
            'feature_engineering_comparison'.

    Returns:
        True when promotion completed, False when the backup failed or any
        step raised (the error is logged).
    """
    try:
        logger.info("Promoting candidate model to production with enhanced features...")

        # Create backup first
        if not self.create_backup():
            logger.error("Backup creation failed, aborting promotion")
            return False

        # Copy candidate files to production
        shutil.copy2(self.candidate_model_path, self.prod_model_path)
        shutil.copy2(self.candidate_vectorizer_path, self.prod_vectorizer_path)
        shutil.copy2(self.candidate_pipeline_path, self.prod_pipeline_path)

        # Copy enhanced feature engineer if it exists
        if self.candidate_feature_engineer_path.exists():
            shutil.copy2(self.candidate_feature_engineer_path, self.prod_feature_engineer_path)
            logger.info("Enhanced feature engineer promoted to production")

        # Update metadata with comprehensive enhanced feature information
        metadata = self.load_existing_metadata() or {}

        # Increment version ("v<major>.<minor>" -> "v<major>.<minor+1>").
        # str() guards against a non-string version in hand-edited metadata,
        # which would otherwise raise after production files were replaced.
        old_version = metadata.get('model_version', 'v1.0')
        version_text = str(old_version)
        new_version = None
        if version_text.startswith('v'):
            try:
                major, minor = map(int, version_text[1:].split('.'))
                new_version = f"v{major}.{minor + 1}"
            except ValueError:
                # Non-numeric parts or not exactly two components
                new_version = None
        if new_version is None:
            # Unparseable version: fall back to a timestamp-derived minor number
            new_version = f"v1.{int(datetime.now().timestamp()) % 1000}"

        # Extract metrics from candidate evaluation
        cv_results = candidate_metrics.get('cross_validation', {})
        holdout_results = candidate_metrics.get('holdout_evaluation', {})
        feature_analysis = candidate_metrics.get('feature_analysis', {})

        # Update metadata with comprehensive information
        metadata.update({
            'model_version': new_version,
            'model_type': 'enhanced_retrained_pipeline_cv',
            'previous_version': old_version,
            'promotion_timestamp': datetime.now().isoformat(),
            'retrain_trigger': 'enhanced_cv_validated_retrain',
            'training_samples': candidate_metrics.get('training_samples', 'Unknown'),
            'test_samples': candidate_metrics.get('test_samples', 'Unknown')
        })

        # Enhanced feature engineering metadata
        feature_type = candidate_metrics.get('feature_type', 'unknown')
        metadata['feature_engineering'] = {
            'type': feature_type,
            'enhanced_features_available': ENHANCED_FEATURES_AVAILABLE,
            'enhanced_features_used': feature_type == 'enhanced',
            'feature_upgrade': comparison_results.get('feature_engineering_comparison', {}).get('feature_upgrade', {})
        }

        # Add feature analysis if available
        # NOTE(review): the source this was recovered from had lost its
        # indentation; the three steps below are placed directly under
        # `if feature_analysis:` - confirm against the upstream file.
        if feature_analysis:
            feature_metadata = feature_analysis.get('feature_metadata', {})
            if feature_metadata:
                metadata['enhanced_features'] = {
                    'total_features': feature_analysis.get('total_features', 0),
                    'feature_types': feature_metadata.get('feature_types', {}),
                    'configuration': feature_metadata.get('configuration', {})
                }

            # Add top features
            top_features = feature_analysis.get('feature_importance', {})
            if top_features:
                metadata['top_features'] = dict(list(top_features.items())[:10])

            # Save detailed feature analysis (best effort: failure only warns)
            try:
                feature_analysis_data = {
                    'top_features': top_features,
                    'feature_metadata': feature_metadata,
                    'model_version': new_version,
                    'timestamp': datetime.now().isoformat(),
                    'feature_type': feature_type
                }
                # Serialise first so a bad value cannot leave a truncated file
                analysis_text = json.dumps(feature_analysis_data, indent=2, default=str)
                with open(self.feature_analysis_log_path, 'w') as f:
                    f.write(analysis_text)
                logger.info(f"Feature analysis saved to {self.feature_analysis_log_path}")
            except Exception as e:
                logger.warning(f"Could not save feature analysis: {e}")

        # Add holdout evaluation results
        if holdout_results:
            metadata.update({
                'test_accuracy': holdout_results.get('accuracy', 'Unknown'),
                'test_f1': holdout_results.get('f1', 'Unknown'),
                'test_precision': holdout_results.get('precision', 'Unknown'),
                'test_recall': holdout_results.get('recall', 'Unknown'),
                'test_roc_auc': holdout_results.get('roc_auc', 'Unknown')
            })

        # Add comprehensive CV results
        if cv_results and 'test_scores' in cv_results:
            metadata['cross_validation'] = {
                'n_splits': cv_results.get('n_splits', self.cv_folds),
                'test_scores': cv_results['test_scores'],
                'train_scores': cv_results.get('train_scores', {}),
                'overfitting_score': cv_results.get('overfitting_score', 'Unknown'),
                'stability_score': cv_results.get('stability_score', 'Unknown'),
                'individual_fold_results': cv_results.get('fold_results', []),
                'feature_engineering_type': cv_results.get('feature_engineering_type', feature_type)
            }

            # Add CV summary statistics
            if 'f1' in cv_results['test_scores']:
                metadata.update({
                    'cv_f1_mean': cv_results['test_scores']['f1']['mean'],
                    'cv_f1_std': cv_results['test_scores']['f1']['std'],
                    'cv_f1_min': cv_results['test_scores']['f1']['min'],
                    'cv_f1_max': cv_results['test_scores']['f1']['max']
                })

        # Add enhanced model comparison results
        promotion_decision = comparison_results.get('promotion_decision', {})
        metadata['promotion_validation'] = {
            'decision_confidence': promotion_decision.get('confidence', 'Unknown'),
            'promotion_reason': promotion_decision.get('reason', 'Unknown'),
            'comparison_method': 'enhanced_cv_statistical_tests',
            'feature_engineering_factor': promotion_decision.get('feature_engineering_factor', False),
            'feature_upgrade_details': promotion_decision.get('feature_upgrade_details', {})
        }

        # Add enhanced statistical test results
        metric_comparisons = comparison_results.get('metric_comparisons', {})
        if metric_comparisons:
            metadata['statistical_validation'] = {}
            for metric, comparison in metric_comparisons.items():
                if isinstance(comparison, dict):
                    metadata['statistical_validation'][metric] = {
                        'improvement': comparison.get('improvement', 0),
                        'significant_improvement': comparison.get('significant_improvement', False),
                        'effect_size': comparison.get('effect_size', 0),
                        'tests': comparison.get('tests', {})
                    }

        # Save updated metadata. The document is serialised completely before
        # the file is opened: production artifacts are already replaced at
        # this point, so a half-written metadata file must be avoided.
        # default=str stringifies values json cannot encode instead of raising.
        metadata_text = json.dumps(metadata, indent=2, default=str)
        with open(self.metadata_path, 'w') as f:
            f.write(metadata_text)

        # Log promotion summary
        feature_info = ""
        if feature_type == 'enhanced':
            total_features = feature_analysis.get('total_features', 0)
            feature_info = f" with {total_features} enhanced features"

        logger.info(f"Model promoted successfully to {new_version}{feature_info}")
        logger.info(f"Promotion reason: {promotion_decision.get('reason', 'Enhanced CV validation passed')}")
        return True
    except Exception as e:
        logger.error(f"Enhanced model promotion failed: {str(e)}")
        return False
def log_retraining_session(self, results: Dict):
    """Append a retraining session record to the JSON logs on disk.

    Two files are maintained: the retraining log (last 100 entries) always
    receives an entry; the comparison log (last 50 entries) receives one only
    when ``results`` contains 'comparison_results'. Any failure is logged and
    swallowed so that logging can never abort a retraining run.

    Args:
        results: Session summary to record. 'comparison_results', when
            present, is expected to be a dict.
    """

    def _load_entries(path) -> list:
        """Return the JSON list stored at *path*, or [] if missing or unreadable."""
        if not path.exists():
            return []
        try:
            with open(path, 'r') as f:
                loaded = json.load(f)
        except (OSError, ValueError):  # ValueError covers JSONDecodeError
            return []
        # A non-list document cannot be appended to; start a fresh history
        return loaded if isinstance(loaded, list) else []

    def _store_entries(path, entries) -> None:
        """Write *entries* to *path*, serialising before the file is opened."""
        # Serialising first means a non-JSON value (stringified by default=str)
        # or any encoder error cannot leave a truncated, unreadable log behind.
        text = json.dumps(entries, indent=2, default=str)
        with open(path, 'w') as f:
            f.write(text)

    try:
        log_entry = {
            'timestamp': datetime.now().isoformat(),
            'results': results,
            # Short id used to correlate the two log files; not security related
            'session_id': hashlib.md5(str(datetime.now()).encode()).hexdigest()[:8],
            'retraining_type': 'enhanced_cv_features',
            'enhanced_features_used': self.use_enhanced_features,
            'enhanced_features_available': ENHANCED_FEATURES_AVAILABLE
        }

        # Append to the retraining log, keeping only the last 100 entries
        logs = _load_entries(self.retraining_log_path)
        logs.append(log_entry)
        _store_entries(self.retraining_log_path, logs[-100:])

        # Also save detailed comparison results
        if 'comparison_results' in results:
            comparison_logs = _load_entries(self.comparison_log_path)
            comparison_logs.append({
                'timestamp': datetime.now().isoformat(),
                'session_id': log_entry['session_id'],
                'comparison_details': results['comparison_results'],
                'enhanced_features_info': {
                    'used': self.use_enhanced_features,
                    'available': ENHANCED_FEATURES_AVAILABLE,
                    'feature_comparison': results['comparison_results'].get('feature_engineering_comparison', {})
                }
            })
            # Keep only the last 50 comparison entries
            _store_entries(self.comparison_log_path, comparison_logs[-50:])
    except Exception as e:
        logger.error(f"Failed to log enhanced retraining session: {str(e)}")
def retrain_model(self) -> Tuple[bool, str]:
    """Run one retraining cycle: train a candidate, compare it, maybe promote it.

    Flow: load the production model (falling back to initial training when
    there is none), load new data, train a candidate, compare both models
    with cross-validation, log the session, then promote the candidate only
    if the comparison's promotion decision says so.

    Returns:
        ``(success, message)``. ``success`` is True when the cycle finished
        with a decision (promoted, or current model kept) or initial training
        ran; it is False on missing/insufficient data, candidate training
        failure, a failed comparison, a failed promotion or any exception.
    """

    def _as_float(value, default: float = 0.0) -> float:
        """Coerce *value* for numeric formatting, falling back to *default*."""
        # A non-numeric field in the decision dict must not turn an already
        # completed promotion into a reported failure via a format error.
        try:
            return float(value)
        except (TypeError, ValueError):
            return default

    try:
        logger.info("Starting enhanced model retraining with advanced feature engineering...")

        # Load production model
        prod_success, prod_model, prod_msg = self.load_production_model()
        if not prod_success:
            logger.warning(f"No production model found: {prod_msg}")
            # Fall back to initial training
            try:
                from model.train import main as train_main
                train_main()
                return True, "Initial enhanced training completed"
            except ImportError:
                return False, "No production model and cannot import training module"

        # Load new data
        data_success, df, data_msg = self.load_new_data()
        if not data_success:
            return False, data_msg

        # Check if we have enough new data
        if len(df) < self.min_new_samples:
            return False, f"Insufficient new data: {len(df)} < {self.min_new_samples}"

        # Determine optimal feature engineering strategy
        prod_feature_type = self.detect_production_feature_type()
        candidate_feature_type = 'enhanced' if self.use_enhanced_features else 'standard'
        logger.info(f"Retraining strategy: {prod_feature_type} -> {candidate_feature_type}")

        # Train candidate model with enhanced features
        candidate_success, candidate_model, candidate_metrics = self.train_candidate_model(df)
        if not candidate_success:
            return False, f"Enhanced candidate training failed: {candidate_metrics.get('error', 'Unknown error')}"

        # Prepare data for model comparison
        X = df['text'].values
        y = df['label'].values

        # Comprehensive model comparison with enhanced CV
        comparison_results = self.compare_models_with_enhanced_cv_validation(
            prod_model, candidate_model, X, y
        )

        # Log results with enhanced information
        session_results = {
            'candidate_metrics': candidate_metrics,
            'comparison_results': comparison_results,
            'data_size': len(df),
            'cv_folds': self.cv_folds,
            'retraining_method': 'enhanced_cv_features',
            'feature_engineering': {
                'production_type': prod_feature_type,
                'candidate_type': candidate_feature_type,
                'feature_upgrade': comparison_results.get('feature_engineering_comparison', {})
            }
        }
        self.log_retraining_session(session_results)

        # A failed comparison carries an 'error' key and no promotion
        # decision; report it as a failure instead of "keeping current model".
        if 'error' in comparison_results:
            error_msg = f"Enhanced model comparison failed: {comparison_results['error']}"
            logger.error(error_msg)
            return False, error_msg

        # Enhanced decision based on CV comparison
        promotion_decision = comparison_results.get('promotion_decision', {})
        should_promote = promotion_decision.get('promote_candidate', False)
        confidence = _as_float(promotion_decision.get('confidence', 0))

        if not should_promote:
            # Keep current model
            reason = promotion_decision.get('reason', 'No significant improvement detected')
            keep_msg = (
                f"Keeping current model based on enhanced CV analysis. "
                f"Reason: {reason}, "
                f"Confidence: {confidence:.2f}"
            )
            logger.info(keep_msg)
            return True, keep_msg

        # Promote candidate model
        promotion_success = self.promote_candidate_model(
            candidate_model, candidate_metrics, comparison_results
        )
        if not promotion_success:
            return False, "Enhanced model promotion failed"

        # Extract improvement information
        f1_comp = comparison_results.get('metric_comparisons', {}).get('f1', {})
        improvement = _as_float(f1_comp.get('improvement', 0))
        feature_upgrade = promotion_decision.get('feature_engineering_factor', False)

        feature_info = ""
        if feature_upgrade:
            feature_info = " with enhanced feature engineering upgrade"
        elif candidate_feature_type == 'enhanced':
            feature_info = " using enhanced features"

        success_msg = (
            f"Enhanced model promoted successfully{feature_info}! "
            f"F1 improvement: {improvement:.4f}, "
            f"Confidence: {confidence:.2f}, "
            f"Reason: {promotion_decision.get('reason', 'Enhanced CV validation passed')}"
        )
        logger.info(success_msg)
        return True, success_msg
    except Exception as e:
        error_msg = f"Enhanced model retraining failed: {str(e)}"
        logger.error(error_msg)
        return False, error_msg
def automated_retrain_with_validation(self) -> Tuple[bool, str]:
    """Entry point used by automation: run ``retrain_model`` and relabel its outcome.

    Returns:
        ``(success, message)`` where the message is the one produced by
        ``retrain_model`` with an automation-specific prefix. Exceptions are
        logged and reported as a failure rather than propagated.
    """
    try:
        logger.info("Starting automated enhanced retraining with validation...")

        # Delegate to the main enhanced retraining method
        succeeded, detail = self.retrain_model()

        if not succeeded:
            logger.error(f"Automated enhanced retraining failed: {detail}")
            return False, f"Enhanced automated retraining failed: {detail}"

        logger.info("Automated enhanced retraining completed successfully")
        return True, f"Enhanced automated retraining: {detail}"
    except Exception as exc:
        logger.error(f"Automated enhanced retraining failed: {exc}")
        return False, f"Automated enhanced retraining failed: {str(exc)}"
class AutomatedRetrainingManager:
    """Manages automated retraining triggers and scheduling with enhanced features.

    The manager periodically asks the drift monitor whether retraining is
    needed, keeps a JSON-backed queue of retraining jobs, executes them with
    ``EnhancedModelRetrainer`` and records every step in a JSON event log.
    All state files live under ``<base_dir>/automation``.
    """

    # Number of executions a single queued job gets before it is marked failed
    MAX_JOB_ATTEMPTS = 3

    def __init__(self, base_dir: Path = None):
        """Create the automation directory, load config and build the drift monitor.

        Args:
            base_dir: Root directory for automation state; defaults to /tmp.
        """
        self.base_dir = base_dir or Path("/tmp")
        self.setup_automation_paths()
        self.setup_automation_config()

        self.drift_monitor = AdvancedDriftMonitor()
        self.retraining_active = False
        self.automation_thread = None
        self.last_check_time = None

        # Enhanced feature settings
        self.enhanced_features_available = ENHANCED_FEATURES_AVAILABLE

        # In-memory counters; they are not persisted across process restarts
        self.automation_status = {
            'enabled': True,
            'last_automated_training': None,
            'total_automated_trainings': 0,
            'failed_attempts': 0,
            'enhanced_features_used': self.enhanced_features_available
        }

        logger.info(f"Automated retraining manager initialized with enhanced features: {self.enhanced_features_available}")

    @staticmethod
    def _write_json(path: Path, payload) -> None:
        """Write *payload* to *path* as indented JSON.

        The document is serialised completely before the file is opened, so a
        serialisation error cannot leave a truncated file behind. Values json
        cannot encode (e.g. ``timedelta``) are stringified via ``default=str``.
        """
        text = json.dumps(payload, indent=2, default=str)
        with open(path, 'w') as f:
            f.write(text)

    def setup_automation_paths(self):
        """Setup automation-specific paths (creates the automation directory)."""
        self.automation_dir = self.base_dir / "automation"
        self.automation_dir.mkdir(parents=True, exist_ok=True)

        self.automation_log_path = self.automation_dir / "automation_log.json"
        self.retraining_queue_path = self.automation_dir / "retraining_queue.json"
        self.automation_config_path = self.automation_dir / "automation_config.json"

    def setup_automation_config(self):
        """Install the default configuration, then overlay any saved configuration."""
        self.automation_config = {
            'monitoring_schedule': {
                'check_interval_minutes': 360,  # 6 hours
                'force_check_interval_hours': 24,
                'max_daily_retrainings': 3,
                'cooldown_hours_after_training': 6
            },
            'retraining_conditions': {
                'require_data_quality_check': True,
                'min_time_between_trainings': timedelta(hours=6),
                'max_consecutive_failures': 3,
                'emergency_override': True,
                'prefer_enhanced_features': True  # New setting
            },
            'notification_settings': {
                'notify_on_trigger': True,
                'notify_on_completion': True,
                'notify_on_failure': True,
                'notify_on_feature_upgrade': True  # New setting
            }
        }

        self.load_automation_config()

    def load_automation_config(self):
        """Overlay settings from the saved config file onto the current config.

        Sections that are dicts on both sides are merged key by key, so a
        saved file that lists only some keys of a section keeps the defaults
        for the rest. Failures are logged and leave the current config in use.
        """
        try:
            if not self.automation_config_path.exists():
                return

            with open(self.automation_config_path, 'r') as f:
                saved_config = json.load(f)

            if not isinstance(saved_config, dict):
                logger.warning("Failed to load automation config: expected a JSON object")
                return

            # NOTE: values json could not encode natively (the timedelta under
            # 'min_time_between_trainings') were saved as strings and are
            # loaded back as strings.
            for section, saved_value in saved_config.items():
                current_value = self.automation_config.get(section)
                if isinstance(current_value, dict) and isinstance(saved_value, dict):
                    current_value.update(saved_value)
                else:
                    self.automation_config[section] = saved_value

            logger.info("Loaded enhanced automation configuration")
        except Exception as e:
            logger.warning(f"Failed to load automation config: {e}")

    def save_automation_config(self):
        """Save automation configuration to file (timedelta values become strings)."""
        try:
            self._write_json(self.automation_config_path, self.automation_config)
        except Exception as e:
            logger.error(f"Failed to save automation config: {e}")

    def start_automated_monitoring(self):
        """Register the scheduled checks and start the background scheduler thread.

        Does nothing (beyond a warning) when monitoring is already active.
        """
        if self.retraining_active:
            logger.warning("Automated monitoring already active")
            return

        self.retraining_active = True

        # Schedule periodic checks
        check_interval = self.automation_config['monitoring_schedule']['check_interval_minutes']
        schedule.every(check_interval).minutes.do(self.perform_scheduled_check)

        # Schedule daily forced check
        schedule.every().day.at("02:00").do(self.perform_forced_check)

        # Start background thread (daemon: it must not keep the process alive)
        self.automation_thread = threading.Thread(target=self.automation_loop, daemon=True)
        self.automation_thread.start()

        logger.info("Enhanced automated retraining monitoring started")
        self.log_automation_event("monitoring_started", "Enhanced automated monitoring system started")

    def stop_automated_monitoring(self):
        """Stop the automated monitoring system and clear the scheduled jobs."""
        self.retraining_active = False
        schedule.clear()

        if self.automation_thread:
            # The loop sleeps between polls, so the join may time out; the
            # thread then exits on its own at its next wake-up.
            self.automation_thread.join(timeout=10)

        logger.info("Enhanced automated retraining monitoring stopped")
        self.log_automation_event("monitoring_stopped", "Enhanced automated monitoring system stopped")

    def automation_loop(self):
        """Background loop: run due scheduled jobs until monitoring is stopped."""
        while self.retraining_active:
            try:
                schedule.run_pending()
                time_module.sleep(60)  # Check every minute
            except Exception as e:
                logger.error(f"Enhanced automation loop error: {e}")
                time_module.sleep(300)  # Wait 5 minutes on error

    def perform_scheduled_check(self):
        """Periodic check: evaluate retraining triggers unless throttled.

        Skipped while in the post-training cooldown or once the daily
        retraining limit is reached. Queues a job when the drift monitor
        reports ``should_retrain``.
        """
        try:
            logger.info("Performing scheduled enhanced retraining trigger check")

            # Check if we're in cooldown period
            if self.is_in_cooldown_period():
                logger.info("Skipping check - in cooldown period after recent training")
                return

            # Check daily limit
            if self.exceeded_daily_retraining_limit():
                logger.info("Skipping check - daily retraining limit exceeded")
                return

            # Perform trigger evaluation
            trigger_results = self.drift_monitor.check_retraining_triggers()
            self.last_check_time = datetime.now()

            # Log the check
            self.log_automation_event("scheduled_check", "Performed scheduled enhanced trigger check", {
                'trigger_results': trigger_results,
                'should_retrain': trigger_results.get('should_retrain', False),
                'enhanced_features_available': self.enhanced_features_available
            })

            # Trigger retraining if needed
            if trigger_results.get('should_retrain', False):
                self.queue_retraining(trigger_results)
        except Exception as e:
            logger.error(f"Scheduled enhanced check failed: {e}")
            self.log_automation_event("check_failed", f"Scheduled enhanced check failed: {str(e)}")

    def perform_forced_check(self):
        """Daily check that ignores cooldown and daily limit.

        Retraining is queued (and executed immediately) only when the drift
        monitor reports 'critical' or 'high' urgency.
        """
        try:
            logger.info("Performing forced daily enhanced retraining check")

            # Always perform drift monitoring
            trigger_results = self.drift_monitor.check_retraining_triggers()

            self.log_automation_event("forced_check", "Performed forced daily enhanced check", {
                'trigger_results': trigger_results,
                'enhanced_features_available': self.enhanced_features_available
            })

            # Only trigger retraining for urgent cases during forced check
            if trigger_results.get('urgency') in ['critical', 'high']:
                self.queue_retraining(trigger_results, forced=True)
        except Exception as e:
            logger.error(f"Forced enhanced check failed: {e}")
            self.log_automation_event("forced_check_failed", f"Forced enhanced check failed: {str(e)}")

    def queue_retraining(self, trigger_results: Dict, forced: bool = False):
        """Append a retraining job to the persistent queue.

        Args:
            trigger_results: Trigger evaluation that caused the job. Its
                optional 'enhanced_features_requested' entry overrides the
                configured 'prefer_enhanced_features' for this job.
            forced: When True (or when urgency is 'critical') the queue is
                executed immediately after the job is stored.
        """
        try:
            # An explicit per-request choice (set by trigger_manual_retraining)
            # wins over the configured default.
            configured_preference = self.automation_config['retraining_conditions'].get('prefer_enhanced_features', True)
            requested = trigger_results.get('enhanced_features_requested')
            prefer_enhanced = configured_preference if requested is None else bool(requested)

            retraining_job = {
                'queued_at': datetime.now().isoformat(),
                'trigger_results': trigger_results,
                'urgency': trigger_results.get('urgency', 'medium'),
                'forced': forced,
                'status': 'queued',
                'attempts': 0,
                'enhanced_features_available': self.enhanced_features_available,
                'prefer_enhanced_features': prefer_enhanced
            }

            # Load existing queue
            queue = self.load_retraining_queue()
            queue.append(retraining_job)

            # Save queue
            self.save_retraining_queue(queue)

            logger.info(f"Enhanced retraining queued with urgency: {trigger_results.get('urgency', 'medium')}")
            self.log_automation_event("retraining_queued", "Enhanced retraining job queued", retraining_job)

            # Execute immediately for critical cases
            if trigger_results.get('urgency') == 'critical' or forced:
                self.execute_queued_retraining()
        except Exception as e:
            logger.error(f"Failed to queue enhanced retraining: {e}")
            self.log_automation_event("queue_failed", f"Failed to queue enhanced retraining: {str(e)}")

    def execute_queued_retraining(self):
        """Run queued jobs in urgency order until one succeeds.

        A job that fails is put back in the queue for a later retry until it
        has used ``MAX_JOB_ATTEMPTS`` attempts, after which it is dropped as
        failed. Completed and failed jobs are removed from the stored queue.

        Returns:
            True when a job completed successfully, otherwise False.
        """
        try:
            queue = self.load_retraining_queue()

            # Sort by urgency (critical > high > medium > low)
            urgency_order = {'critical': 0, 'high': 1, 'medium': 2, 'low': 3}
            queue.sort(key=lambda x: urgency_order.get(x.get('urgency', 'medium'), 2))

            executed_jobs = []
            for job in queue:
                if job.get('status') != 'queued':
                    continue

                if self.execute_single_retraining(job):
                    job['status'] = 'completed'
                    job['completed_at'] = datetime.now().isoformat()
                    self.automation_status['last_automated_training'] = datetime.now().isoformat()
                    self.automation_status['total_automated_trainings'] += 1
                    executed_jobs.append(job)
                    break  # Only execute one job at a time

                job['attempts'] = job.get('attempts', 0) + 1
                if job['attempts'] >= self.MAX_JOB_ATTEMPTS:
                    job['status'] = 'failed'
                    job['failed_at'] = datetime.now().isoformat()
                    self.automation_status['failed_attempts'] += 1
                else:
                    # execute_single_retraining marked the job 'running';
                    # restore 'queued' so it survives the filter below and
                    # can actually be retried.
                    job['status'] = 'queued'

            # Update queue
            remaining_queue = [job for job in queue if job.get('status') == 'queued']
            self.save_retraining_queue(remaining_queue)

            return len(executed_jobs) > 0
        except Exception as e:
            logger.error(f"Failed to execute queued enhanced retraining: {e}")
            return False

    def execute_single_retraining(self, job: Dict) -> bool:
        """Execute one retraining job with a fresh ``EnhancedModelRetrainer``.

        Marks the job as 'running' (mutating ``job``) and logs the outcome.

        Returns:
            True when retraining reported success, False otherwise.
        """
        try:
            logger.info(f"Starting automated enhanced retraining with urgency: {job.get('urgency', 'medium')}")

            job['started_at'] = datetime.now().isoformat()
            job['status'] = 'running'

            # Create enhanced retraining manager
            retrainer = EnhancedModelRetrainer()

            # Configure enhanced features based on job preferences
            prefer_enhanced = job.get('prefer_enhanced_features', True)
            retrainer.use_enhanced_features = prefer_enhanced and ENHANCED_FEATURES_AVAILABLE

            # Perform enhanced retraining with validation
            success, result = retrainer.automated_retrain_with_validation()

            if success:
                logger.info("Automated enhanced retraining completed successfully")
                self.log_automation_event("retraining_success", "Automated enhanced retraining completed", {
                    'job': job,
                    'result': result,
                    'enhanced_features_used': retrainer.use_enhanced_features
                })
                return True

            logger.error(f"Automated enhanced retraining failed: {result}")
            self.log_automation_event("retraining_failed", f"Automated enhanced retraining failed: {result}", {
                'job': job,
                'enhanced_features_used': retrainer.use_enhanced_features
            })
            return False
        except Exception as e:
            logger.error(f"Enhanced retraining execution failed: {e}")
            self.log_automation_event("retraining_error", f"Enhanced retraining execution error: {str(e)}", {'job': job})
            return False

    def is_in_cooldown_period(self) -> bool:
        """Check if we're in cooldown period after recent training.

        Returns:
            True while less than 'cooldown_hours_after_training' hours have
            passed since the last automated training; False otherwise, or
            when the check itself fails.
        """
        try:
            if not self.automation_status['last_automated_training']:
                return False

            last_training = datetime.fromisoformat(self.automation_status['last_automated_training'])

            # The setting is defined under 'monitoring_schedule'; also accept
            # it under 'retraining_conditions' in case a saved config put it there.
            schedule_settings = self.automation_config.get('monitoring_schedule', {})
            condition_settings = self.automation_config.get('retraining_conditions', {})
            cooldown_hours = schedule_settings.get(
                'cooldown_hours_after_training',
                condition_settings.get('cooldown_hours_after_training', 6)
            )

            return datetime.now() - last_training < timedelta(hours=cooldown_hours)
        except Exception as e:
            logger.warning(f"Failed to check cooldown period: {e}")
            return False

    def exceeded_daily_retraining_limit(self) -> bool:
        """Check if daily retraining limit has been exceeded.

        Counts 'retraining_success' events logged in the last 24 hours and
        compares the count with 'max_daily_retrainings'.
        """
        try:
            if not self.automation_status['last_automated_training']:
                return False

            # Count trainings in last 24 hours
            training_logs = self.get_recent_automation_logs(hours=24)
            training_count = sum(1 for log in training_logs if log.get('event') == 'retraining_success')

            max_daily = self.automation_config['monitoring_schedule']['max_daily_retrainings']
            return training_count >= max_daily
        except Exception as e:
            logger.warning(f"Failed to check daily limit: {e}")
            return False

    def load_retraining_queue(self) -> List[Dict]:
        """Load retraining queue from file; returns [] when missing or unreadable."""
        try:
            if self.retraining_queue_path.exists():
                with open(self.retraining_queue_path, 'r') as f:
                    return json.load(f)
            return []
        except Exception as e:
            logger.error(f"Failed to load retraining queue: {e}")
            return []

    def save_retraining_queue(self, queue: List[Dict]):
        """Save retraining queue to file (errors are logged, not raised)."""
        try:
            self._write_json(self.retraining_queue_path, queue)
        except Exception as e:
            logger.error(f"Failed to save retraining queue: {e}")

    def log_automation_event(self, event: str, message: str, details: Dict = None):
        """Append an event to the automation log, keeping the last 1000 entries.

        Args:
            event: Short machine-readable event name.
            message: Human-readable description.
            details: Optional extra payload stored with the entry.
        """
        try:
            log_entry = {
                'timestamp': datetime.now().isoformat(),
                'event': event,
                'message': message,
                'details': details or {},
                'enhanced_features_available': self.enhanced_features_available
            }

            # Load existing logs; an unreadable or malformed file restarts the history
            logs = []
            if self.automation_log_path.exists():
                try:
                    with open(self.automation_log_path, 'r') as f:
                        logs = json.load(f)
                except (OSError, ValueError):  # ValueError covers JSONDecodeError
                    logs = []
                if not isinstance(logs, list):
                    logs = []

            logs.append(log_entry)

            # Keep only last 1000 entries
            if len(logs) > 1000:
                logs = logs[-1000:]

            # Save logs
            self._write_json(self.automation_log_path, logs)
        except Exception as e:
            logger.error(f"Failed to log automation event: {e}")

    def get_recent_automation_logs(self, hours: int = 24) -> List[Dict]:
        """Return the log entries whose timestamp falls within the last *hours* hours."""
        try:
            if not self.automation_log_path.exists():
                return []

            with open(self.automation_log_path, 'r') as f:
                logs = json.load(f)

            cutoff_time = datetime.now() - timedelta(hours=hours)
            recent_logs = [
                log for log in logs
                if datetime.fromisoformat(log['timestamp']) > cutoff_time
            ]
            return recent_logs
        except Exception as e:
            logger.error(f"Failed to get recent logs: {e}")
            return []

    def get_automation_status(self) -> Dict:
        """Get current automation status with enhanced feature information.

        Returns:
            The in-memory counters plus live information (monitoring flag,
            cooldown / daily-limit state, queue length, last 6 hours of log
            entries), or ``{'error': ...}`` if gathering it failed.
        """
        try:
            status = {
                **self.automation_status,
                'monitoring_active': self.retraining_active,
                'last_check_time': self.last_check_time.isoformat() if self.last_check_time else None,
                'in_cooldown': self.is_in_cooldown_period(),
                'daily_limit_exceeded': self.exceeded_daily_retraining_limit(),
                'queued_jobs': len(self.load_retraining_queue()),
                'recent_logs': self.get_recent_automation_logs(hours=6),
                'enhanced_features_status': {
                    'available': self.enhanced_features_available,
                    'preference': self.automation_config['retraining_conditions'].get('prefer_enhanced_features', True)
                }
            }
            return status
        except Exception as e:
            logger.error(f"Failed to get automation status: {e}")
            return {'error': str(e)}

    def trigger_manual_retraining(self, reason: str = "manual_trigger", use_enhanced: bool = None) -> Dict:
        """Manually queue (and immediately run) a high-urgency retraining job.

        Args:
            reason: Free-text reason recorded with the trigger.
            use_enhanced: Whether the job should prefer enhanced features;
                None means "use them if they are available".

        Returns:
            ``{'success': True, 'message': ..., 'enhanced_features': ...}`` or
            ``{'success': False, 'error': ...}``.
        """
        try:
            # Use enhanced features by default if available
            if use_enhanced is None:
                use_enhanced = self.enhanced_features_available

            # Create manual trigger results
            trigger_results = {
                'should_retrain': True,
                'urgency': 'high',
                'trigger_reason': reason,
                'triggers_detected': [{
                    'type': 'manual_trigger',
                    'severity': 'high',
                    'message': f'Manual enhanced retraining triggered: {reason}'
                }],
                'recommendations': ['Manual enhanced retraining requested'],
                'enhanced_features_requested': use_enhanced
            }

            # Queue the retraining
            self.queue_retraining(trigger_results, forced=True)

            feature_info = " with enhanced features" if use_enhanced else " with standard features"
            return {
                'success': True,
                'message': f'Manual enhanced retraining queued{feature_info}',
                'enhanced_features': use_enhanced
            }
        except Exception as e:
            logger.error(f"Manual enhanced retraining trigger failed: {e}")
            return {'success': False, 'error': str(e)}
def start_automation_system():
    """Build an ``AutomatedRetrainingManager`` and start its monitoring.

    Returns:
        The running manager, or None when construction or start-up raised
        (the error is logged).
    """
    try:
        manager = AutomatedRetrainingManager()
        manager.start_automated_monitoring()
    except Exception as exc:
        logger.error(f"Failed to start enhanced automation system: {exc}")
        return None
    else:
        return manager
def get_automation_manager() -> Optional[AutomatedRetrainingManager]:
    """Return the module-wide automation manager, creating it on first use.

    The instance is stored in the module global ``_automation_manager``; the
    global does not exist until this function has been called once.
    """
    global _automation_manager
    try:
        # Succeeds on every call after the first one
        return _automation_manager
    except NameError:
        # First call: the global is not defined yet, so create it now
        _automation_manager = AutomatedRetrainingManager()
        return _automation_manager
def main():
    """Run one retraining cycle from the command line.

    Prints the outcome message and terminates with exit status 1 when
    retraining reported failure.

    Raises:
        SystemExit: With code 1 when ``retrain_model`` returns failure.
    """
    retrainer = EnhancedModelRetrainer()
    success, message = retrainer.retrain_model()

    if success:
        print(f"✅ {message}")
    else:
        print(f"❌ {message}")
        # The bare `exit` helper is injected by the `site` module for
        # interactive sessions and is not guaranteed to exist (e.g. with
        # `python -S`); raising SystemExit is the dependable equivalent.
        raise SystemExit(1)


if __name__ == "__main__":
    main()