"""
SOFIA Self-Improving Learning System

Automatically monitors performance and improves embeddings through
continuous learning.
"""
|
|
| import torch |
| import torch.nn as nn |
| import torch.optim as optim |
| from torch.utils.data import DataLoader, TensorDataset |
| import numpy as np |
| import json |
| import os |
| from datetime import datetime, timedelta |
| from typing import Dict, List, Tuple, Optional, Any |
| import logging |
| from collections import deque |
| import threading |
| import time |
|
|
# Configure root logging at INFO level when this module is imported, then
# create the module-level logger shared by every class in this file.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
|
|
class PerformanceMonitor:
    """Record SOFIA's per-task metrics and classify how they evolve.

    Records are kept in a bounded ``deque`` (oldest entries are dropped once
    ``history_size`` is reached).  Each record stores the metrics together
    with their relative improvement over ``baseline_metrics``.
    """

    # Metrics for which a *smaller* value means better performance.
    _LOWER_IS_BETTER = frozenset({'response_time'})
    # Relative change (as a fraction) beyond which a trend stops being "stable".
    _TREND_THRESHOLD = 0.05

    def __init__(self, history_size: int = 1000):
        """Create an empty monitor holding at most ``history_size`` records."""
        self.history_size = history_size
        self.performance_history = deque(maxlen=history_size)
        self.current_metrics = {}
        self.baseline_metrics = {
            'mteb_score': 58.2,
            'similarity_accuracy': 0.85,
            'retrieval_precision': 0.75,
            'response_time': 0.5
        }

    def record_performance(self, task: str, metrics: Dict[str, float],
                           timestamp: Optional[datetime] = None):
        """Append a performance record for ``task``.

        Args:
            task: Name of the task the metrics belong to.
            metrics: Mapping of metric name to measured value.
            timestamp: When the measurement was taken; defaults to
                ``datetime.now()``.
        """
        if timestamp is None:
            timestamp = datetime.now()

        # Store a copy so later mutation of the caller's dict cannot rewrite
        # the recorded history.
        snapshot = dict(metrics)

        record = {
            'timestamp': timestamp.isoformat(),
            'task': task,
            'metrics': snapshot,
            'improvement': self._calculate_improvement(snapshot)
        }

        self.performance_history.append(record)
        self.current_metrics[task] = snapshot

        logger.info(f"Recorded performance for {task}: {metrics}")

    def _calculate_improvement(self, metrics: Dict[str, float]) -> Dict[str, float]:
        """Return each metric's relative improvement over its baseline.

        Metrics without a baseline (or whose baseline is zero, where a
        relative change is undefined) are reported as ``0.0``.  For
        lower-is-better metrics the sign is flipped so that a positive number
        always means "better than baseline".
        """
        improvements = {}
        for metric, value in metrics.items():
            baseline = self.baseline_metrics.get(metric)
            if not baseline:
                improvements[metric] = 0.0
            elif metric in self._LOWER_IS_BETTER:
                improvements[metric] = (baseline - value) / baseline
            else:
                improvements[metric] = (value - baseline) / baseline

        return improvements

    def get_recent_performance(self, hours: int = 24) -> List[Dict]:
        """Return the records whose timestamp lies within the last ``hours`` hours.

        Records are returned in the order they were stored.
        """
        cutoff = datetime.now() - timedelta(hours=hours)
        return [
            record for record in self.performance_history
            if datetime.fromisoformat(record['timestamp']) >= cutoff
        ]

    def get_performance_trend(self, metric: str, hours: int = 24) -> Dict[str, Any]:
        """Classify the recent trend of ``metric``.

        The trend compares the chronologically first and last values recorded
        for ``metric`` in the last ``hours`` hours.

        Returns:
            ``{'trend': 'insufficient_data', 'change': 0.0}`` when fewer than
            two values are available; otherwise a dict with ``trend``
            (``'improving'``, ``'stable'`` or ``'degrading'``), ``change``
            (relative change from first to last value; ``0`` when the first
            value is zero), ``start_value``, ``end_value`` and ``data_points``.
        """
        samples = [
            (datetime.fromisoformat(record['timestamp']), record['metrics'][metric])
            for record in self.get_recent_performance(hours)
            if metric in record['metrics']
        ]

        if len(samples) < 2:
            return {'trend': 'insufficient_data', 'change': 0.0}

        # Callers may supply explicit timestamps, so insertion order is not
        # guaranteed to be chronological.  The sort is stable: records with
        # equal timestamps keep their insertion order.
        samples.sort(key=lambda sample: sample[0])
        values = [value for _, value in samples]

        start_value = values[0]
        end_value = values[-1]
        change = (end_value - start_value) / start_value if start_value != 0 else 0

        # Orient the change so that "positive" always means "better"; a change
        # of exactly +/- the threshold counts as stable rather than falling
        # through to "degrading".
        directed = -change if metric in self._LOWER_IS_BETTER else change
        if directed > self._TREND_THRESHOLD:
            trend = 'improving'
        elif directed < -self._TREND_THRESHOLD:
            trend = 'degrading'
        else:
            trend = 'stable'

        return {
            'trend': trend,
            'change': change,
            'start_value': start_value,
            'end_value': end_value,
            'data_points': len(values)
        }

    def should_trigger_improvement(self, task: str) -> bool:
        """Return True when the tracked metric is degrading or has plateaued.

        ``'mteb_score'`` is inspected for the ``'similarity'`` task and
        ``'similarity_accuracy'`` for every other task.  A plateau is a
        ``'stable'`` trend backed by more than 10 data points.
        """
        metric = 'mteb_score' if task == 'similarity' else 'similarity_accuracy'
        trend = self.get_performance_trend(metric)

        if trend['trend'] == 'degrading':
            return True
        # 'insufficient_data' results carry no 'data_points' key.
        return trend['trend'] == 'stable' and trend.get('data_points', 0) > 10
|
|
class AdaptiveParameterTuner:
    """Nudge learning rate and batch size in response to performance feedback.

    The tuner remembers the best-performing parameters seen so far.  Each call
    to :meth:`tune_parameters` proposes a step away from those parameters:
    larger when the latest score beats the best score, smaller otherwise.
    """

    def __init__(self, model, learning_rate_range: Tuple[float, float] = (1e-6, 1e-3),
                 batch_size_range: Tuple[int, int] = (8, 64)):
        """Store the model and the inclusive ``(low, high)`` parameter ranges."""
        self.model = model
        self.lr_range = learning_rate_range
        self.batch_size_range = batch_size_range

        self.parameter_history = []
        self.best_parameters = {
            'learning_rate': 2e-5,
            'batch_size': 32,
            'performance': 0.0
        }

    @staticmethod
    def _clamp(value, bounds):
        """Return ``value`` limited to the inclusive ``(low, high)`` bounds."""
        low, high = bounds
        return max(low, min(value, high))

    def tune_parameters(self, performance_metrics: Dict[str, float]) -> Dict[str, Any]:
        """Propose new parameters from the latest ``'mteb_score'``.

        Args:
            performance_metrics: Latest metrics; only ``'mteb_score'`` is read
                (treated as ``0.0`` when absent).

        Returns:
            A dict with ``'learning_rate'`` and ``'batch_size'``, both kept
            inside the configured ranges.  The proposal is also appended to
            ``parameter_history``; ``best_parameters`` is updated only when
            the score improved on the best score so far.
        """
        current_performance = performance_metrics.get('mteb_score', 0.0)
        best = self.best_parameters
        improved = current_performance > best['performance']

        if improved:
            # Performance went up: take a bolder step.
            learning_rate = best['learning_rate'] * 1.2
            batch_size = best['batch_size'] + 4
        else:
            # No gain: back off to more conservative settings.
            learning_rate = best['learning_rate'] * 0.8
            batch_size = best['batch_size'] - 4

        # Clamp on BOTH sides: with a custom range the starting point itself
        # may lie outside the bounds, so a one-sided limit is not enough.
        new_params = {
            'learning_rate': self._clamp(learning_rate, self.lr_range),
            'batch_size': self._clamp(batch_size, self.batch_size_range),
        }

        if improved:
            best.update({
                'learning_rate': new_params['learning_rate'],
                'batch_size': new_params['batch_size'],
                'performance': current_performance
            })

        self.parameter_history.append({
            'timestamp': datetime.now().isoformat(),
            'parameters': new_params.copy(),
            'performance': current_performance
        })

        return new_params
|
|
class ContinuousLearner:
    """Run periodic learning updates for SOFIA on a background thread.

    New training examples are buffered in a bounded ``deque`` (at most 1000
    items; older items are dropped first).  A daemon thread wakes every
    ``learning_interval`` seconds and performs an update when enough data has
    accumulated and the performance monitor asks for improvement.
    """

    def __init__(self, model, performance_monitor: PerformanceMonitor):
        """Bind the learner to ``model`` and ``performance_monitor``."""
        self.model = model
        self.monitor = performance_monitor
        self.adaptive_tuner = AdaptiveParameterTuner(model)

        self.is_learning = False
        self.learning_thread = None
        self.new_data_buffer = deque(maxlen=1000)
        # Signalled by stop_continuous_learning() so a sleeping loop wakes up
        # immediately instead of finishing its full sleep interval.
        self._stop_event = threading.Event()

        self.learning_interval = 3600  # seconds between learning checks
        self.min_data_points = 50      # minimum buffered examples per update

    def add_training_data(self, text_pairs: List[Tuple[str, str]],
                          labels: Optional[List[float]] = None):
        """Append ``(text1, text2)`` pairs and their labels to the buffer.

        Args:
            text_pairs: Pairs of texts to learn from.
            labels: One label per pair; every pair gets ``1.0`` when omitted.
                If the lengths differ, only the pairs that have a matching
                label are buffered and a warning is logged.
        """
        if labels is None:
            labels = [1.0] * len(text_pairs)
        elif len(labels) != len(text_pairs):
            logger.warning(
                f"Got {len(text_pairs)} text pairs but {len(labels)} labels; "
                "unmatched entries are ignored"
            )

        added = 0
        for (text1, text2), label in zip(text_pairs, labels):
            self.new_data_buffer.append({
                'text1': text1,
                'text2': text2,
                'label': label,
                'timestamp': datetime.now().isoformat()
            })
            added += 1

        logger.info(f"Added {added} new training examples")

    def start_continuous_learning(self):
        """Start the background learning thread (no-op if already running)."""
        if self.is_learning:
            logger.warning("Continuous learning already running")
            return

        # Use a fresh event per run: a previous thread that is still shutting
        # down keeps its own (already set) event and cannot be revived.
        self._stop_event = threading.Event()
        self.is_learning = True
        self.learning_thread = threading.Thread(target=self._continuous_learning_loop)
        self.learning_thread.daemon = True
        self.learning_thread.start()

        logger.info("Continuous learning started")

    def stop_continuous_learning(self):
        """Ask the background thread to stop and wait up to 5 seconds for it."""
        self.is_learning = False
        self._stop_event.set()
        if self.learning_thread:
            self.learning_thread.join(timeout=5)
            if self.learning_thread.is_alive():
                logger.warning("Learning thread is still finishing its current update")
        logger.info("Continuous learning stopped")

    def _continuous_learning_loop(self):
        """Check for and run learning updates until asked to stop.

        Waits ``learning_interval`` seconds between checks, or 60 seconds
        after an unexpected error.
        """
        stop_event = self._stop_event
        while self.is_learning and not stop_event.is_set():
            try:
                if self._should_trigger_learning():
                    self._perform_learning_update()
                delay = self.learning_interval
            except Exception as e:
                logger.error(f"Error in continuous learning loop: {e}")
                delay = 60

            # Interruptible sleep: returns early once stop is requested.
            stop_event.wait(delay)

    def _should_trigger_learning(self) -> bool:
        """Return True when enough data is buffered and the monitor requests it."""
        if len(self.new_data_buffer) < self.min_data_points:
            return False

        return self.monitor.should_trigger_improvement('similarity')

    def _drain_buffer(self) -> List[Dict]:
        """Remove and return all currently buffered examples, oldest first.

        Items are popped one at a time, so examples appended concurrently by
        another thread are either included or left in the buffer, never lost.
        """
        drained = []
        while True:
            try:
                drained.append(self.new_data_buffer.popleft())
            except IndexError:
                return drained

    def _perform_learning_update(self):
        """Consume the buffered data, retune parameters and fine-tune.

        Failures are logged rather than raised.  If fine-tuning did not
        complete, the consumed examples are put back at the front of the
        buffer (subject to the buffer's size limit) so they can be retried.
        """
        logger.info("Performing learning update...")

        training_data = self._drain_buffer()
        fine_tuned = False

        try:
            recent_perf = self.monitor.get_recent_performance(1)
            if recent_perf:
                latest_metrics = recent_perf[-1]['metrics']
                new_params = self.adaptive_tuner.tune_parameters(latest_metrics)
                logger.info(f"Adapted parameters: {new_params}")

            self._fine_tune_on_new_data(training_data)
            fine_tuned = True

            self.monitor.record_performance(
                'continuous_learning',
                {'data_points_used': len(training_data)},
                datetime.now()
            )

            logger.info("Learning update completed successfully")

        except Exception as e:
            if not fine_tuned:
                # extendleft reverses its input, so reverse first to keep the
                # original oldest-first order.
                self.new_data_buffer.extendleft(reversed(training_data))
            logger.error(f"Learning update failed: {e}")

    def _fine_tune_on_new_data(self, training_data: List[Dict]):
        """Fine-tune the model on new data (simplified implementation).

        This is a placeholder: it only logs and sleeps for two seconds to
        simulate training; the model is not modified.
        """
        logger.info(f"Fine-tuning on {len(training_data)} examples")

        # Stand-in for the real training work.
        time.sleep(2)

        logger.info("Fine-tuning simulation completed")
|
|
class SelfImprovingSOFIA:
    """Main interface for SOFIA's self-improving capabilities.

    Wires a :class:`PerformanceMonitor` to a :class:`ContinuousLearner` and
    persists their history to a JSON state file in the working directory.
    """

    def __init__(self, model):
        """Create the monitor and learner for ``model`` and load saved state."""
        self.model = model
        self.performance_monitor = PerformanceMonitor()
        self.continuous_learner = ContinuousLearner(model, self.performance_monitor)

        self.state_file = "sofia_self_improvement_state.json"
        self.load_state()

    def start_self_improvement(self):
        """Start all self-improvement processes."""
        self.continuous_learner.start_continuous_learning()
        logger.info("SOFIA self-improvement system activated")

    def stop_self_improvement(self):
        """Stop all self-improvement processes and persist the state."""
        self.continuous_learner.stop_continuous_learning()
        self.save_state()
        logger.info("SOFIA self-improvement system deactivated")

    def record_task_performance(self, task: str, metrics: Dict[str, float]):
        """Record performance metrics for ``task`` with the monitor."""
        self.performance_monitor.record_performance(task, metrics)

    def add_feedback_data(self, text_pairs: List[Tuple[str, str]], quality_scores: List[float]):
        """Queue user feedback (text pairs plus quality scores) for learning."""
        self.continuous_learner.add_training_data(text_pairs, quality_scores)

    def get_system_status(self) -> Dict[str, Any]:
        """Return a snapshot of the learner state and recent performance.

        The dict contains ``is_learning``, ``buffered_data`` (number of
        buffered examples), ``recent_performance`` (records from the last
        hour) and ``performance_trends`` for ``mteb_score`` and
        ``similarity_accuracy``.
        """
        monitor = self.performance_monitor
        return {
            'is_learning': self.continuous_learner.is_learning,
            'buffered_data': len(self.continuous_learner.new_data_buffer),
            'recent_performance': monitor.get_recent_performance(1),
            'performance_trends': {
                'mteb_score': monitor.get_performance_trend('mteb_score'),
                'similarity_accuracy': monitor.get_performance_trend('similarity_accuracy')
            }
        }

    def save_state(self):
        """Write performance and parameter history to ``state_file`` as JSON.

        The data is written to a temporary sibling file and then moved into
        place, so a failure part-way through (for example a value that is not
        JSON-serializable) leaves any previous state file intact.

        Raises:
            Whatever ``open``/``json.dump``/``os.replace`` raise; the
            temporary file is removed before the error propagates.
        """
        tuner = self.continuous_learner.adaptive_tuner
        state = {
            'performance_history': list(self.performance_monitor.performance_history),
            'parameter_history': tuner.parameter_history,
            'best_parameters': tuner.best_parameters,
            'saved_at': datetime.now().isoformat()
        }

        tmp_path = f"{self.state_file}.tmp"
        try:
            with open(tmp_path, 'w', encoding='utf-8') as f:
                json.dump(state, f, indent=2)
            os.replace(tmp_path, self.state_file)
        except Exception:
            if os.path.exists(tmp_path):
                os.remove(tmp_path)
            raise

        logger.info(f"Self-improvement state saved to {self.state_file}")

    def load_state(self):
        """Restore history and tuner parameters from ``state_file`` if present.

        A missing file is silently ignored.  Any error while reading or
        interpreting the file is logged and the in-memory defaults are kept.
        """
        if not os.path.exists(self.state_file):
            return

        try:
            with open(self.state_file, 'r', encoding='utf-8') as f:
                state = json.load(f)

            if not isinstance(state, dict):
                raise ValueError("state file does not contain a JSON object")

            # Keep only well-formed records; the monitor indexes these keys
            # unconditionally when computing trends.
            history = state.get('performance_history', [])
            self.performance_monitor.performance_history.extend(
                record for record in history
                if isinstance(record, dict) and 'timestamp' in record and 'metrics' in record
            )

            tuner = self.continuous_learner.adaptive_tuner
            parameter_history = state.get('parameter_history', [])
            tuner.parameter_history = parameter_history if isinstance(parameter_history, list) else []

            # Merge over the defaults so a partial or older state file cannot
            # drop keys the tuner relies on.
            saved_best = state.get('best_parameters')
            if isinstance(saved_best, dict):
                tuner.best_parameters = {**tuner.best_parameters, **saved_best}

            logger.info(f"Self-improvement state loaded from {self.state_file}")

        except Exception as e:
            logger.error(f"Failed to load state: {e}")
|
|
|
|
| |
if __name__ == "__main__":
    # Running the module directly only prints a short description; the
    # classes above are meant to be imported.
    print("SOFIA Self-Improving System")
    print("This module provides continuous learning and performance monitoring capabilities")

    # Example usage:
    #
    #     from sofia_model import SOFIAModel
    #     from sofia_self_improving import SelfImprovingSOFIA
    #
    #     # Initialize SOFIA
    #     sofia_model = SOFIAModel()
    #
    #     # Add self-improvement capabilities
    #     self_improving_sofia = SelfImprovingSOFIA(sofia_model)
    #
    #     # Start self-improvement
    #     self_improving_sofia.start_self_improvement()
    #
    #     # Record performance after tasks
    #     self_improving_sofia.record_task_performance('similarity', {
    #         'mteb_score': 65.1,
    #         'similarity_accuracy': 0.92
    #     })
    #
    #     # Add user feedback for continuous learning
    #     feedback_pairs = [("hello world", "hi there"), ("machine learning", "AI algorithms")]
    #     quality_scores = [0.9, 0.8]
    #     self_improving_sofia.add_feedback_data(feedback_pairs, quality_scores)
    #
    #     # Get system status
    #     status = self_improving_sofia.get_system_status()
    #     print("System status:", status)
|
|