# aether-core — aether/safety.py  (upload 315c661)
"""
AETHER Safety Sandbox.
Constrained self-modification with validation, audit logging,
and human-in-the-loop oversight.
"""
import hashlib
import time
import logging
from typing import Dict, List, Any, Optional, Callable
from contextlib import contextmanager
from dataclasses import fields
logger = logging.getLogger("AETHER.Safety")
class SafetySandbox:
    """
    Sandboxed evaluation environment for self-modification.
    Inspired by AlphaEvolve's validation and GEA's admission requirements.

    Responsibilities:
      * time and audit-log a unit of work (``sandbox`` context manager),
      * bounds-check architecture configs (``validate_architecture``),
      * deny-list scan candidate code (``validate_code``),
      * triage modification requests with an audit trail
        (``request_human_approval``),
      * report aggregate statistics (``get_audit_summary``).
    """

    # Bound on the class so the block is self-contained; same named channel
    # as the module-level logger.
    _log = logging.getLogger("AETHER.Safety")

    def __init__(self, timeout: float = 30.0,
                 max_code_size: int = 100000,
                 forbidden_modules: Optional[List[str]] = None):
        """
        Args:
            timeout: Soft time budget (seconds) for one sandboxed run.
                Exceeding it is logged, never enforced (advisory only).
            max_code_size: Maximum accepted candidate-code length, in chars.
            forbidden_modules: Substring deny-list applied to candidate code;
                a conservative default list is used when None/empty.
        """
        self.timeout = timeout
        self.max_code_size = max_code_size
        self.forbidden_modules = forbidden_modules or [
            "os.system", "subprocess", "socket", "eval", "exec",
            "compile", "__import__", "importlib.import_module",
        ]
        # One entry per sandbox() run (the yielded context dict).
        self.audit_log: List[Dict[str, Any]] = []
        # One entry per validate_architecture() decision.
        self.modification_history: List[Dict[str, Any]] = []
        # One entry per request_human_approval() call.
        self.pending_approvals: List[Dict[str, Any]] = []

    @contextmanager
    def sandbox(self):
        """Time a unit of work and append its context to the audit log.

        Yields a mutable context dict. Exceptions raised inside the block
        are recorded in ``context["errors"]`` and re-raised; elapsed time is
        always recorded, with a warning when it exceeds ``self.timeout``
        (the timeout is advisory — execution is not interrupted).
        """
        start_time = time.time()
        context = {
            "start_time": start_time,
            "modifications_attempted": [],
            "modifications_approved": [],
            "errors": [],
        }
        try:
            yield context
        except Exception as e:
            context["errors"].append(str(e))
            self._log.warning(f"Sandbox caught exception: {e}")
            raise
        finally:
            elapsed = time.time() - start_time
            context["elapsed_time"] = elapsed
            if elapsed > self.timeout:
                self._log.warning(f"Sandbox timeout: {elapsed:.2f}s > {self.timeout}s")
            self.audit_log.append(context)

    def validate_architecture(self, config) -> bool:
        """Check an architecture config against hard safety bounds.

        Attributes missing from ``config`` are skipped, not treated as
        violations. The decision is appended to ``modification_history``.

        Returns:
            True when every present attribute is within bounds, else False.
        """
        checks = {
            "population_size": (2, 64),
            "mutation_rate": (0.0, 0.5),
            "learning_rate": (1e-6, 1e-3),
            "num_agents": (1, 32),
            "macro_policy_dim": (32, 1024),
            "micro_policy_dim": (16, 512),
        }
        violations = []
        for field_name, (min_val, max_val) in checks.items():
            val = getattr(config, field_name, None)
            if val is not None and not (min_val <= val <= max_val):
                violations.append(f"{field_name}={val} outside [{min_val}, {max_val}]")
        macro = getattr(config, "macro_policy_dim", None)
        micro = getattr(config, "micro_policy_dim", None)
        agents = getattr(config, "num_agents", None)
        if macro is not None and micro is not None:
            if micro > macro:
                violations.append("micro_policy_dim > macro_policy_dim")
            # Bug fix: the original read config.num_agents unconditionally
            # here and raised AttributeError on configs without it.
            if agents is not None:
                # Rough float32 footprint in MB (4 bytes per parameter).
                estimated_memory = (macro * micro * agents * 4) / 1e6
                if estimated_memory > 10000:
                    violations.append(f"Estimated memory {estimated_memory:.1f}MB exceeds limit")
        if violations:
            self._log.warning(f"Architecture validation failed: {violations}")
            self._log_modification(config, approved=False, reason="; ".join(violations))
            return False
        self._log_modification(config, approved=True)
        return True

    def validate_code(self, code: str) -> bool:
        """Reject candidate code that is oversized or matches the deny-list.

        NOTE: this is a plain substring scan, so it can false-positive
        (e.g. 'eval' inside 'evaluate'). It is a cheap first gate, not a
        substitute for AST-level analysis.
        """
        if len(code) > self.max_code_size:
            self._log.warning(f"Code size {len(code)} exceeds limit {self.max_code_size}")
            return False
        for forbidden in self.forbidden_modules:
            if forbidden in code:
                self._log.warning(f"Forbidden pattern '{forbidden}' found in code")
                return False
        return True

    def _log_modification(self, config, approved: bool, reason: str = ""):
        """Append one decision record to ``modification_history``.

        Only a short hash of the config state is stored, not the config
        itself, keeping the history cheap to retain.
        """
        # Robustness fix: configs using __slots__ have no __dict__; fall
        # back to repr() instead of raising AttributeError.
        state = getattr(config, "__dict__", None)
        fingerprint = str(state) if state is not None else repr(config)
        entry = {
            "timestamp": time.time(),
            "approved": approved,
            "config_hash": hashlib.sha256(fingerprint.encode()).hexdigest()[:16],
            "reason": reason if not approved else "passed all checks",
        }
        self.modification_history.append(entry)

    def request_human_approval(self, modification: Dict[str, Any]) -> bool:
        """Queue a modification for oversight and return an automatic triage.

        Every request is appended to ``pending_approvals``. Conservative
        modifications are auto-approved; aggressive ones (high mutation
        rate, large agent pools) are auto-rejected.

        Returns:
            True when the modification is auto-approved, False otherwise.
        """
        entry = {
            "timestamp": time.time(),
            "modification": modification,
            "auto_decision": False,
        }
        self.pending_approvals.append(entry)
        # Bug fix: the original left auto_decision=False (and never stored
        # the outcome) even though every call below decides automatically.
        if modification.get("mutation_rate", 0) > 0.3:
            entry["auto_decision"] = True
            entry["approved"] = False
            self._log.info("Auto-rejected high mutation rate modification")
            return False
        if modification.get("num_agents", 0) > 10:
            entry["auto_decision"] = True
            entry["approved"] = False
            self._log.info("Auto-rejected large agent pool modification")
            return False
        entry["auto_decision"] = True
        entry["approved"] = True
        self._log.info("Auto-approved conservative modification")
        return True

    def get_audit_summary(self) -> Dict[str, Any]:
        """Return aggregate counts plus the ten most recent decisions."""
        total = len(self.modification_history)
        approved = sum(1 for m in self.modification_history if m["approved"])
        return {
            "total_modifications_attempted": total,
            "approved": approved,
            "rejected": total - approved,
            "pending_human_approval": len(self.pending_approvals),
            "recent_modifications": self.modification_history[-10:],
            "audit_log_entries": len(self.audit_log),
        }