"""
AETHER Safety Sandbox.

Constrained self-modification with validation, audit logging,
and human-in-the-loop oversight.
"""

import hashlib
import logging
import numbers
import time
from contextlib import contextmanager
from dataclasses import fields
from typing import Dict, List, Any, Optional, Callable

logger = logging.getLogger("AETHER.Safety")


class SafetySandbox:
    """
    Sandboxed evaluation environment for self-modification.
    Inspired by AlphaEvolve's validation and GEA's admission requirements.

    Three in-memory records are kept on the instance:

    * ``audit_log`` - one context dict per ``sandbox()`` run.
    * ``modification_history`` - one entry per ``validate_architecture`` call.
    * ``pending_approvals`` - one entry per ``request_human_approval`` call.
    """

    # Substrings rejected by validate_code() when no custom list is supplied.
    DEFAULT_FORBIDDEN_PATTERNS = (
        "os.system", "subprocess", "socket", "eval", "exec",
        "compile", "__import__", "importlib.import_module",
    )

    # Inclusive (min, max) bounds enforced by validate_architecture().
    ARCHITECTURE_BOUNDS = {
        "population_size": (2, 64),
        "mutation_rate": (0.0, 0.5),
        "learning_rate": (1e-6, 1e-3),
        "num_agents": (1, 32),
        "macro_policy_dim": (32, 1024),
        "micro_policy_dim": (16, 512),
    }

    # Ceiling for the rough memory estimate, in the same unit the estimate
    # is computed in (bytes / 1e6).
    MAX_ESTIMATED_MEMORY_MB = 10000

    # Thresholds above which request_human_approval() rejects outright.
    MAX_AUTO_MUTATION_RATE = 0.3
    MAX_AUTO_NUM_AGENTS = 10

    def __init__(self, timeout: float = 30.0,
                 max_code_size: int = 100000,
                 forbidden_modules: Optional[List[str]] = None):
        """
        Args:
            timeout: Duration in seconds after which a ``sandbox()`` run is
                reported as timed out.
            max_code_size: Maximum ``len(code)`` accepted by
                ``validate_code``.
            forbidden_modules: Substrings that make ``validate_code`` reject
                a snippet. ``None`` or an empty list selects
                ``DEFAULT_FORBIDDEN_PATTERNS``.
        """
        self.timeout = timeout
        self.max_code_size = max_code_size
        # Copy so that later mutation of the caller's list cannot silently
        # change what this sandbox filters.
        self.forbidden_modules = list(
            forbidden_modules or self.DEFAULT_FORBIDDEN_PATTERNS
        )

        self.audit_log: List[Dict[str, Any]] = []
        self.modification_history: List[Dict[str, Any]] = []
        self.pending_approvals: List[Dict[str, Any]] = []

    @staticmethod
    def _is_number(value) -> bool:
        """Return True when *value* is a real number (int, float, ...)."""
        return isinstance(value, numbers.Real)

    @staticmethod
    def _config_value(config, name: str):
        """Read *name* from a dict or attribute-style config; None if absent."""
        if isinstance(config, dict):
            return config.get(name)
        return getattr(config, name, None)

    @contextmanager
    def sandbox(self):
        """
        Context manager that records one run in ``audit_log``.

        Yields a context dict with the keys ``start_time``,
        ``modifications_attempted``, ``modifications_approved`` and
        ``errors``. On exit ``elapsed_time`` and ``timed_out`` are added and
        the dict is appended to ``audit_log``, whether or not the body raised.

        The timeout is only *reported*: the body is never interrupted, the
        elapsed time is compared with ``self.timeout`` after it finishes.

        Raises:
            Exception: anything raised by the body is recorded in
                ``errors`` and re-raised unchanged.
        """
        wall_start = time.time()
        # Monotonic clock for the duration so system clock adjustments
        # cannot produce negative or inflated elapsed times.
        tick_start = time.monotonic()
        context = {
            "start_time": wall_start,
            "modifications_attempted": [],
            "modifications_approved": [],
            "errors": [],
        }

        try:
            yield context
        except Exception as e:
            context["errors"].append(str(e))
            logger.warning(f"Sandbox caught exception: {e}")
            raise
        finally:
            elapsed = time.monotonic() - tick_start
            context["elapsed_time"] = elapsed
            context["timed_out"] = elapsed > self.timeout

            if context["timed_out"]:
                logger.warning(f"Sandbox timeout: {elapsed:.2f}s > {self.timeout}s")

            self.audit_log.append(context)

    def validate_architecture(self, config) -> bool:
        """
        Check a proposed configuration against ``ARCHITECTURE_BOUNDS``.

        *config* may be an object with attributes or a plain dict. Fields
        that are absent (or ``None``) are skipped; present fields must be
        real numbers inside their inclusive bounds. When the relevant fields
        are present, ``micro_policy_dim`` must not exceed
        ``macro_policy_dim`` and a rough memory estimate
        (``macro * micro * num_agents * 4 / 1e6``) must stay within
        ``MAX_ESTIMATED_MEMORY_MB``.

        Every call appends an entry to ``modification_history``.

        Returns:
            True when no violation was found, False otherwise.
        """
        violations = []

        if config is None:
            # Fail closed: nothing to validate is not the same as valid.
            violations.append("config is None")

        for field_name, (min_val, max_val) in self.ARCHITECTURE_BOUNDS.items():
            val = self._config_value(config, field_name)
            if val is None:
                continue
            if not self._is_number(val):
                violations.append(f"{field_name}={val!r} is not a number")
            elif not (min_val <= val <= max_val):
                violations.append(f"{field_name}={val} outside [{min_val}, {max_val}]")

        micro = self._config_value(config, "micro_policy_dim")
        macro = self._config_value(config, "macro_policy_dim")
        agents = self._config_value(config, "num_agents")

        # Cross-field checks only run when their inputs are usable numbers;
        # missing or malformed fields were already handled above.
        if self._is_number(micro) and self._is_number(macro):
            if micro > macro:
                violations.append("micro_policy_dim > macro_policy_dim")

            if self._is_number(agents):
                estimated_memory = (macro * micro * agents * 4) / 1e6
                if estimated_memory > self.MAX_ESTIMATED_MEMORY_MB:
                    violations.append(f"Estimated memory {estimated_memory:.1f}MB exceeds limit")

        if violations:
            logger.warning(f"Architecture validation failed: {violations}")
            self._log_modification(config, approved=False, reason="; ".join(violations))
            return False

        self._log_modification(config, approved=True)
        return True

    def validate_code(self, code: str) -> bool:
        """
        Screen a code snippet by size and forbidden substrings.

        This is a plain substring scan over the text: it also rejects
        harmless identifiers that merely contain a pattern (for example a
        name containing ``eval``), and it is not a substitute for real
        isolation of untrusted code.

        Returns:
            False when *code* is not a ``str``, is longer than
            ``max_code_size`` characters, or contains any entry of
            ``forbidden_modules``; True otherwise.
        """
        if not isinstance(code, str):
            logger.warning(f"Code must be str, got {type(code).__name__}")
            return False

        if len(code) > self.max_code_size:
            logger.warning(f"Code size {len(code)} exceeds limit {self.max_code_size}")
            return False

        for forbidden in self.forbidden_modules:
            if forbidden in code:
                logger.warning(f"Forbidden pattern '{forbidden}' found in code")
                return False

        return True

    def _log_modification(self, config, approved: bool, reason: str = ""):
        """
        Append a validation outcome to ``modification_history``.

        The entry holds a timestamp, the verdict, a 16-hex-digit SHA-256
        prefix identifying the config, and the rejection reason (or
        ``"passed all checks"`` when approved).
        """
        try:
            state = vars(config)
        except TypeError:
            # dicts, slotted objects, None, ... have no __dict__; hash their
            # own string form instead of crashing the audit trail.
            state = config

        entry = {
            "timestamp": time.time(),
            "approved": approved,
            "config_hash": hashlib.sha256(str(state).encode()).hexdigest()[:16],
            "reason": reason if not approved else "passed all checks",
        }
        self.modification_history.append(entry)

    def _auto_review(self, modification):
        """Apply the automatic policy; return ``(approved, message)``."""
        if not isinstance(modification, dict):
            return False, "Auto-rejected malformed modification"

        limits = (
            ("mutation_rate", self.MAX_AUTO_MUTATION_RATE,
             "Auto-rejected high mutation rate modification"),
            ("num_agents", self.MAX_AUTO_NUM_AGENTS,
             "Auto-rejected large agent pool modification"),
        )
        for key, limit, message in limits:
            value = modification.get(key, 0)
            if not self._is_number(value):
                return False, f"Auto-rejected non-numeric {key} modification"
            # Written as "not <=" rather than ">" so that NaN, which fails
            # every comparison, is rejected instead of slipping through.
            if not (value <= limit):
                return False, message

        return True, "Auto-approved conservative modification"

    def request_human_approval(self, modification: Dict[str, Any]) -> bool:
        """
        Queue *modification* in ``pending_approvals`` and decide on it.

        No human is consulted here: the decision comes from a fixed policy
        that rejects a ``mutation_rate`` above ``MAX_AUTO_MUTATION_RATE``, a
        ``num_agents`` above ``MAX_AUTO_NUM_AGENTS``, and any malformed or
        non-numeric value for those keys. The queued entry therefore records
        ``auto_decision=True`` together with the outcome and its reason.

        Returns:
            True when the modification was auto-approved, False otherwise.
        """
        approved, message = self._auto_review(modification)

        self.pending_approvals.append({
            "timestamp": time.time(),
            "modification": modification,
            "auto_decision": True,
            "approved": approved,
            "reason": message,
        })

        logger.info(message)
        return approved

    def get_audit_summary(self) -> Dict[str, Any]:
        """
        Summarise the recorded activity.

        Returns:
            A dict with validation counts (total / approved / rejected), the
            number of queued approval requests, copies of the last ten
            ``modification_history`` entries, and the number of
            ``audit_log`` entries.
        """
        total = len(self.modification_history)
        approved = sum(1 for m in self.modification_history if m["approved"])

        return {
            "total_modifications_attempted": total,
            "approved": approved,
            "rejected": total - approved,
            "pending_human_approval": len(self.pending_approvals),
            # Copies, so callers cannot edit the stored history through the
            # summary.
            "recent_modifications": [dict(m) for m in self.modification_history[-10:]],
            "audit_log_entries": len(self.audit_log),
        }