"""
Autonomous Self-Healing System for Hugging Face Spaces
Monitors, diagnoses, fixes errors, and minimizes costs automatically.

Usage:
    python healer.py --daemon          # Run continuously
    python healer.py --once            # Single check cycle
    python healer.py --space <id>      # Check specific space
    python healer.py --report          # Generate cost report
"""
|
|
| import os |
| import sys |
| import time |
| import json |
| import argparse |
| import traceback |
| from datetime import datetime, timedelta |
| from dataclasses import dataclass, asdict |
| from typing import List, Dict, Optional, Tuple |
| from collections import defaultdict |
|
|
| from huggingface_hub import HfApi, SpaceHardware, SpaceRuntime |
|
|
|
|
| |
|
|
# Hourly price per hardware flavor, keyed by the flavor name reported for a
# space. Presumably USD (reports print the values with a "$" prefix); flavors
# missing from this table are treated as free by the lookups that use it.
_COST_PER_HOUR = {
    "cpu-basic": 0.0,
    "cpu-upgrade": 0.03,
    "t4-small": 0.40,
    "t4-medium": 0.60,
    "a10g-small": 1.00,
    "a10g-large": 1.50,
    "a10g-largex2": 3.00,
    "a100-large": 2.50,
    "l4x1": 0.80,
    "l40sx1": 1.80,
}

# Switches for the cost-cutting pass. "night_hours" are compared against the
# UTC hour of day.
_COST_CUTTING = {
    "pause_broken_spaces": True,
    "downgrade_oom_to_cpu": True,
    "set_auto_sleep_on_paid_hw": True,
    "pause_during_night_hours": False,
    "night_hours": {"start": 2, "end": 8},
}

# Global tuning knobs for the healer.
HEALER_CONFIG = {
    # Seconds the daemon sleeps between monitoring cycles.
    "poll_interval_seconds": 60,
    # Restart budget: once a crashing space reaches this many restarts within
    # an hour it is paused instead of restarted again.
    "max_restarts_per_hour": 5,
    # NOTE(review): not read by the code in this file; the OOM downgrade
    # target is hard-coded where the downgrade is requested.
    "oom_downgrade_hw": "cpu-basic",
    # Sleep timeout (seconds) applied to paid-hardware spaces that have none.
    "default_sleep_seconds": 300,
    "cost_per_hour": _COST_PER_HOUR,
    # When True, problems that need a human also pause the space.
    "auto_pause_on_error": True,
    # NOTE(review): the next two keys are not read by the code in this file.
    "auto_sleep_idle": True,
    "idle_sleep_threshold_seconds": 300,
    "cost_cutting": _COST_CUTTING,
}


# Static list of space ids; returned as the fallback when auto-discovery fails.
MONITORED_SPACES: List[str] = []
|
|
| |
# Maps a diagnosis label to the remediation the healer applies.
#   action       - name of the branch the healer dispatches on
#   description  - human-readable summary stored with each logged action
#   cost_action  - informational tag for the cost impact; NOTE(review): not
#                  read by the code in this file
# The description separators were mis-encoded in the original text and are
# restored here as em dashes.
FIX_PLAYBOOK = {
    "BUILD_ERROR": {
        "action": "restart",
        "description": "Build failed — restart to retry",
        "cost_action": "none",
    },
    "RUNTIME_ERROR": {
        "action": "restart_then_pause_if_repeated",
        "description": "App crashed — restart, pause if keeps failing",
        "cost_action": "pause_after_3_failures",
    },
    "OOM": {
        "action": "downgrade_and_restart",
        "description": "Out of memory — downgrade to CPU, restart",
        "cost_action": "downgrade_to_cpu",
    },
    "PAUSED": {
        "action": "restart_if_should_be_active",
        "description": "Space paused — restart if in active hours",
        "cost_action": "none",
    },
    "SLEEPING": {
        "action": "restart_on_demand_only",
        "description": "Space sleeping — let visitors wake it",
        "cost_action": "none",
    },
    "NO_APP_FILE": {
        "action": "alert_human",
        "description": "Missing app file — requires code fix",
        "cost_action": "pause",
    },
    "HARDWARE_PENDING": {
        "action": "wait",
        "description": "Hardware change pending — wait for provisioning",
        "cost_action": "none",
    },
}
|
|
|
|
| |
|
|
@dataclass
class SpaceState:
    """Snapshot of one space taken during a single health check."""

    repo_id: str                       # space identifier the check was run for
    stage: str                         # runtime stage; "UNKNOWN" when the fetch failed
    hardware: Optional[str]            # hardware flavor currently in use, if reported
    requested_hardware: Optional[str]  # hardware flavor that has been requested
    sleep_time: Optional[int]          # configured sleep timeout; None when unset
    last_checked: str                  # ISO-8601 timestamp of the check (naive UTC)
    error_message: Optional[str] = None  # error text for the space, if any
    restart_count_1h: int = 0          # restarts counted within the last hour
    total_uptime_minutes: float = 0.0  # NOTE(review): never written in this file
    estimated_cost_today: float = 0.0  # hourly rate projected over 24 hours
|
|
@dataclass
class HealAction:
    """Record of one remediation decision made for a space."""

    action: str       # playbook action name that was selected
    description: str  # human-readable summary copied from the playbook entry
    executed: bool    # whether the healer reports having acted
    result: str       # short outcome tag, e.g. "restarted" or "left_sleeping"
    timestamp: str    # ISO-8601 timestamp of the decision (naive UTC)
|
|
|
|
| |
|
|
class SpaceHealer:
    """Monitor Hugging Face Spaces, apply playbook fixes, and trim hardware cost.

    Attributes:
        api: ``HfApi`` client, authenticated with the given token or the
            ``HF_TOKEN`` environment variable.
        history: Per-space list of ``{"event", "time"}`` dicts. A "restart"
            event is appended each time this healer successfully issues a
            restart; entries older than one hour are pruned on every check.
        state_cache: Last successfully fetched ``SpaceState`` per space.
        fix_log: One dict per heal decision (repo id, diagnosis and the
            ``HealAction`` fields).
    """

    # Stages in which a space is not running and is assumed to accrue no
    # hardware charges. NOTE(review): confirm against Hugging Face billing.
    _NON_BILLED_STAGES = ("PAUSED", "SLEEPING", "STOPPED")

    # Stages treated as "broken" by the cost-cutting pass.
    _BROKEN_STAGES = ("RUNTIME_ERROR", "BUILD_ERROR", "NO_APP_FILE")

    # Repo-id substrings that are never paused overnight.
    _NIGHT_PAUSE_EXEMPT = (
        "Cydonia-24B-Chat",
        "Qwen3.5-27B-Claude-4.6-Opus-Reasoning-Distilled",
    )

    def __init__(self, token: Optional[str] = None):
        """Create the API client and empty bookkeeping structures.

        Args:
            token: Hugging Face access token; falls back to ``HF_TOKEN``.
        """
        self.api = HfApi(token=token or os.getenv("HF_TOKEN"))
        self.history: Dict[str, List[Dict]] = defaultdict(list)
        self.state_cache: Dict[str, SpaceState] = {}
        self.fix_log: List[Dict] = []
        # repo_id -> reason, for spaces paused by this healer. Lets the PAUSED
        # handler tell "we paused it on purpose" from "should be running".
        self._paused_by_healer: Dict[str, str] = {}

    # ------------------------------------------------------------------
    # Small helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _is_oom(message: Optional[str]) -> bool:
        """Return True when *message* looks like an out-of-memory failure.

        "oom" must be a standalone word so that words such as "bloom" or
        "zoom" in an unrelated error do not trigger a hardware downgrade.
        """
        if not message:
            return False
        import re  # local import: ``re`` is not among the file-level imports

        return re.search(r"\boom\b|out of memory|killed", message.lower()) is not None

    @staticmethod
    def _is_night_hour(hour: int, start: int, end: int) -> bool:
        """Return True when *hour* lies in the half-open window [start, end).

        Windows that wrap past midnight (start > end, e.g. 22 -> 6) are
        supported; an empty window (start == end) never matches.
        """
        if start == end:
            return False
        if start < end:
            return start <= hour < end
        return hour >= start or hour < end

    def _night_pause_active(self) -> bool:
        """Return True when night pausing is enabled and it is night (UTC)."""
        cc = HEALER_CONFIG["cost_cutting"]
        if not cc["pause_during_night_hours"]:
            return False
        window = cc["night_hours"]
        return self._is_night_hour(datetime.utcnow().hour, window["start"], window["end"])

    @staticmethod
    def _attempt(call, ok_result: str, fail_prefix: str) -> Tuple[bool, str]:
        """Run *call* and return ``(executed, result)`` for the action log.

        A failure is reported as ``"<fail_prefix>: <error>"`` with
        ``executed=False`` so the log never claims an action that did not
        happen.
        """
        try:
            call()
        except Exception as e:
            return False, f"{fail_prefix}: {str(e)[:80]}"
        return True, ok_result

    def _restart(self, repo_id: str, when: str) -> None:
        """Restart a space and record the restart for the hourly budget."""
        self.api.restart_space(repo_id)
        self.history[repo_id].append({"event": "restart", "time": when})
        self._paused_by_healer.pop(repo_id, None)

    def _pause(self, repo_id: str, reason: str) -> None:
        """Pause a space and remember why, so it is not blindly restarted."""
        self.api.pause_space(repo_id)
        self._paused_by_healer[repo_id] = reason

    # ------------------------------------------------------------------
    # Discovery
    # ------------------------------------------------------------------

    def discover_spaces(self, author: str = "ScottzillaSystems") -> List[str]:
        """Auto-discover all Spaces under a namespace.

        Uses the authenticated ``HfApi`` client (which handles pagination)
        instead of a hand-built HTTP request.

        Args:
            author: User or organization whose spaces are listed.

        Returns:
            The space ids found, or a copy of ``MONITORED_SPACES`` when the
            listing fails.
        """
        try:
            return [space.id for space in self.api.list_spaces(author=author)]
        except Exception as e:
            print(f"[Healer] Discovery failed: {e}")
            return list(MONITORED_SPACES)

    # ------------------------------------------------------------------
    # Check / diagnose / heal
    # ------------------------------------------------------------------

    def check_space(self, repo_id: str) -> "Tuple[SpaceState, Optional[HealAction]]":
        """Check a single space and return its state plus any action taken.

        When the runtime cannot be fetched, an "UNKNOWN" state carrying the
        error text is returned, nothing is cached and no action is taken.
        """
        checked_at = datetime.utcnow()
        now = checked_at.isoformat()

        try:
            runtime = self.api.get_space_runtime(repo_id)
        except Exception as e:
            return SpaceState(
                repo_id=repo_id, stage="UNKNOWN", hardware=None,
                requested_hardware=None, sleep_time=None,
                last_checked=now, error_message=str(e),
            ), None

        # The error text is part of the raw API payload, not a named
        # attribute of the runtime object; the attribute is kept as fallback.
        raw = getattr(runtime, "raw", None)
        error_message = raw.get("errorMessage") if isinstance(raw, dict) else None
        if error_message is None:
            error_message = getattr(runtime, "errorMessage", None)

        state = SpaceState(
            repo_id=repo_id,
            stage=runtime.stage,
            hardware=runtime.hardware,
            requested_hardware=runtime.requested_hardware,
            sleep_time=runtime.sleep_time,
            last_checked=now,
            error_message=error_message,
        )

        # Project the hourly rate over a full day, but only for spaces that
        # are in a stage assumed to be billed.
        hw = (runtime.hardware or "cpu-basic").lower()
        cost_rate = HEALER_CONFIG["cost_per_hour"].get(hw, 0.0)
        if state.stage not in self._NON_BILLED_STAGES:
            state.estimated_cost_today = cost_rate * 24

        # A space that is no longer paused was resumed by someone; forget any
        # pause this healer had recorded for it.
        if state.stage != "PAUSED":
            self._paused_by_healer.pop(repo_id, None)

        # Keep one hour of history (also bounds memory in daemon mode) and
        # count the restarts this healer issued in that window.
        cutoff = (checked_at - timedelta(hours=1)).isoformat()
        recent = [h for h in self.history[repo_id] if h["time"] > cutoff]
        self.history[repo_id] = recent
        state.restart_count_1h = sum(1 for h in recent if h["event"] == "restart")

        self.state_cache[repo_id] = state

        diagnosis = self._diagnose(state)
        if diagnosis:
            return state, self._heal(repo_id, state, diagnosis)
        return state, None

    def _diagnose(self, state: "SpaceState") -> Optional[str]:
        """Classify the problem.

        Returns:
            A ``FIX_PLAYBOOK`` key, or None when the space needs no attention.
        """
        stage = state.stage

        if stage == "RUNTIME_ERROR":
            return "OOM" if self._is_oom(state.error_message) else "RUNTIME_ERROR"
        for label in ("BUILD_ERROR", "PAUSED", "SLEEPING", "NO_APP_FILE"):
            if stage == label:
                return label
        if state.requested_hardware and state.requested_hardware != state.hardware:
            return "HARDWARE_PENDING"
        return None

    def _heal(self, repo_id: str, state: "SpaceState", diagnosis: str) -> "HealAction":
        """Execute the playbook fix for *diagnosis* and log the outcome.

        ``executed`` in the returned action is True only when the underlying
        API call succeeded (or, for "alerted_human", when the decision itself
        is the action).
        """
        now = datetime.utcnow().isoformat()
        playbook = FIX_PLAYBOOK.get(
            diagnosis,
            {"action": "alert_human", "description": "Unknown issue", "cost_action": "none"},
        )
        action_name = playbook["action"]
        result = "skipped"
        executed = False

        # An OOM on the smallest tier has nothing to downgrade to; handle it
        # like an ordinary crash so the restart budget can pause it.
        effective = action_name
        if (effective == "downgrade_and_restart"
                and (state.hardware or "cpu-basic").lower() == "cpu-basic"):
            effective = "restart_then_pause_if_repeated"

        try:
            if effective == "restart":
                self._restart(repo_id, now)
                result = "restarted"
                executed = True

            elif effective == "restart_then_pause_if_repeated":
                restarts = state.restart_count_1h
                if restarts >= HEALER_CONFIG["max_restarts_per_hour"]:
                    executed, result = self._attempt(
                        lambda: self._pause(repo_id, "repeated_failures"),
                        f"paused_after_{restarts}_restarts",
                        "pause_failed",
                    )
                else:
                    executed, result = self._attempt(
                        lambda: self._restart(repo_id, now),
                        "restarted",
                        "restart_failed",
                    )

            elif effective == "downgrade_and_restart":
                def downgrade():
                    self.api.request_space_hardware(repo_id, hardware=SpaceHardware.CPU_BASIC)
                    time.sleep(3)  # brief pause between the hardware request and the restart
                    self._restart(repo_id, now)

                executed, result = self._attempt(
                    downgrade, "downgraded_to_cpu_and_restarted", "downgrade_failed"
                )

            elif effective == "restart_if_should_be_active":
                pause_reason = self._paused_by_healer.get(repo_id)
                if pause_reason not in (None, "night"):
                    # Paused by this healer because it is broken: restarting
                    # would only recreate the failure it was paused for.
                    result = f"left_paused_{pause_reason}"
                elif self._night_pause_active():
                    result = "left_paused_night_hours"
                else:
                    executed, result = self._attempt(
                        lambda: self._restart(repo_id, now),
                        "restarted",
                        "restart_failed",
                    )

            elif effective == "restart_on_demand_only":
                result = "left_sleeping"

            elif effective == "alert_human":
                if HEALER_CONFIG["auto_pause_on_error"]:
                    executed, result = self._attempt(
                        lambda: self._pause(repo_id, "human_review"),
                        "paused_for_human_review",
                        "pause_failed",
                    )
                else:
                    result = "alerted_human"
                    executed = True

            elif effective == "wait":
                result = "waiting_for_provisioning"

        except Exception as e:
            result = f"error: {str(e)[:100]}"
            executed = False

        action = HealAction(
            action=action_name,
            description=playbook["description"],
            executed=executed,
            result=result,
            timestamp=now,
        )

        self.fix_log.append({
            "repo_id": repo_id,
            "diagnosis": diagnosis,
            **asdict(action),
        })

        return action

    # ------------------------------------------------------------------
    # Cost optimization
    # ------------------------------------------------------------------

    def optimize_costs(self, repo_id: str):
        """Apply aggressive cost-saving measures to a previously checked space.

        Works from the cached state; does nothing when the space has never
        been checked successfully. Failures are logged and do not raise.
        """
        state = self.state_cache.get(repo_id)
        if not state:
            return

        hw = (state.hardware or "cpu-basic").lower()
        cost_rate = HEALER_CONFIG["cost_per_hour"].get(hw, 0.0)
        cc = HEALER_CONFIG["cost_cutting"]
        saved = []

        def attempt(label, call, note):
            # Best effort: report the failure instead of silently dropping it.
            try:
                call()
            except Exception as e:
                print(f"[Healer] ⚠️ {repo_id}: {label} failed: {str(e)[:80]}")
            else:
                saved.append(note)

        # The cached stage predates any fix applied in this cycle. If the
        # healer has just restarted the space, give that restart a chance
        # instead of pausing it straight away.
        just_restarted = any(
            h["event"] == "restart" and h["time"] >= state.last_checked
            for h in self.history.get(repo_id, ())
        )
        already_paused = repo_id in self._paused_by_healer

        if (cc["pause_broken_spaces"] and cost_rate > 0
                and state.stage in self._BROKEN_STAGES
                and not just_restarted and not already_paused):
            attempt(
                "pause broken space",
                lambda: self._pause(repo_id, "broken"),
                f"paused broken space (${cost_rate}/hr)",
            )

        # Skip the downgrade when the space is already on, or already moving
        # to, the CPU tier.
        cpu_already = (
            hw == "cpu-basic"
            or (state.requested_hardware or "").lower() == "cpu-basic"
        )
        if (cc["downgrade_oom_to_cpu"] and state.stage == "RUNTIME_ERROR"
                and self._is_oom(state.error_message) and not cpu_already):
            attempt(
                "OOM downgrade",
                lambda: self.api.request_space_hardware(repo_id, hardware=SpaceHardware.CPU_BASIC),
                "downgraded OOM to CPU",
            )

        if cc["set_auto_sleep_on_paid_hw"] and cost_rate > 0 and state.sleep_time is None:
            sleep_seconds = HEALER_CONFIG["default_sleep_seconds"]
            attempt(
                "auto-sleep",
                lambda: self.api.set_space_sleep_time(repo_id, sleep_time=sleep_seconds),
                f"auto-sleep {sleep_seconds}s",
            )

        if (self._night_pause_active() and cost_rate > 0 and state.stage == "RUNNING"
                and not any(p in repo_id for p in self._NIGHT_PAUSE_EXEMPT)):
            attempt("night-pause", lambda: self._pause(repo_id, "night"), "night-pause")

        if saved:
            print(f"[Healer] 💰 {repo_id}: {', '.join(saved)}")

    # ------------------------------------------------------------------
    # Reporting
    # ------------------------------------------------------------------

    def generate_report(self) -> Dict:
        """Generate health and cost report.

        Returns:
            Dict with ``generated_at``, ``spaces`` (cached states as dicts),
            ``total_estimated_daily_cost``, ``actions_today`` (log entries
            stamped with the current UTC date) and ``fix_log`` (last 50).
        """
        generated = datetime.utcnow()
        today = generated.date().isoformat()
        states = list(self.state_cache.values())

        return {
            "generated_at": generated.isoformat(),
            "spaces": [asdict(s) for s in states],
            "total_estimated_daily_cost": sum((s.estimated_cost_today for s in states), 0.0),
            "actions_today": sum(
                1 for entry in self.fix_log
                if str(entry.get("timestamp", "")).startswith(today)
            ),
            "fix_log": self.fix_log[-50:],
        }

    def print_report(self):
        """Print formatted report to console."""
        report = self.generate_report()
        print("\n" + "=" * 70)
        print(f"🩺 SPACE HEALER REPORT — {report['generated_at']}")
        print("=" * 70)
        print(f"\n💰 Total estimated daily cost: ${report['total_estimated_daily_cost']:.2f}")
        print(f"🔧 Auto-heal actions today: {report['actions_today']}\n")

        for s in report["spaces"]:
            if s["stage"] == "RUNNING":
                status_emoji = "🟢"
            elif s["stage"] in ("RUNTIME_ERROR", "BUILD_ERROR"):
                status_emoji = "🔴"
            else:
                status_emoji = "🟡"
            print(f"{status_emoji} {s['repo_id']}")
            print(f" Stage: {s['stage']} | HW: {s['hardware']} | Restarts/hr: {s['restart_count_1h']}")
            print(f" Est. daily cost: ${s['estimated_cost_today']:.2f}")
            if s["error_message"]:
                print(f" Error: {s['error_message'][:120]}...")
            print()

        if report["fix_log"]:
            print("Recent heal actions:")
            for a in report["fix_log"][-5:]:
                emoji = "✅" if a["executed"] else "⏸️"
                print(f" {emoji} [{a['timestamp'][:19]}] {a['repo_id']}: {a['action']} → {a['result']}")
        print("=" * 70 + "\n")

    # ------------------------------------------------------------------
    # Main loop
    # ------------------------------------------------------------------

    def run_cycle(self, spaces: Optional[List[str]] = None):
        """Run one monitoring/healing cycle.

        Args:
            spaces: Space ids to check; discovered automatically when None.
        """
        if spaces is None:
            spaces = self.discover_spaces()

        print(f"[Healer] 🔍 Checking {len(spaces)} spaces at {datetime.utcnow().isoformat()}")

        for repo_id in spaces:
            state, action = self.check_space(repo_id)

            if state.stage == "UNKNOWN":
                # The runtime fetch failed: the cache may hold a stale state,
                # so do not run cost actions against it.
                print(f"[Healer] ⚠️ {repo_id}: {state.stage} — {(state.error_message or '')[:120]}")
                continue

            if action and action.executed:
                print(f"[Healer] 🔧 {repo_id}: {action.action} → {action.result}")
            elif action:
                print(f"[Healer] ⏸️ {repo_id}: {action.action} → {action.result}")
            else:
                print(f"[Healer] 🟢 {repo_id}: {state.stage}")

            self.optimize_costs(repo_id)

    def run_daemon(self, spaces: Optional[List[str]] = None):
        """Run continuous monitoring loop.

        A failing cycle is logged with its traceback and the loop carries on
        after the configured poll interval.
        """
        print("[Healer] 🤖 Autonomous self-healing daemon started")
        print(f"[Healer] Poll interval: {HEALER_CONFIG['poll_interval_seconds']}s")

        while True:
            try:
                self.run_cycle(spaces)
                self.print_report()
            except Exception as e:
                print(f"[Healer] ❌ Cycle error: {e}")
                traceback.print_exc()

            time.sleep(HEALER_CONFIG["poll_interval_seconds"])
|
|
|
|
| |
|
|
def main():
    """Parse command-line options and run the selected healer mode.

    Precedence when several options are given: ``--space``, then
    ``--report``, then ``--once``, then ``--daemon``. With none of them the
    usage help is printed.
    """
    cli = argparse.ArgumentParser(description="Autonomous Space Self-Healer")
    cli.add_argument("--daemon", action="store_true", help="Run continuous monitoring")
    cli.add_argument("--once", action="store_true", help="Single check cycle")
    cli.add_argument("--space", help="Check specific space only")
    cli.add_argument("--report", action="store_true", help="Generate report")
    cli.add_argument("--discover", default="ScottzillaSystems", help="Namespace to discover")
    opts = cli.parse_args()

    healer = SpaceHealer()

    # Single-space mode: check (and heal) just that space, then report.
    if opts.space:
        healer.check_space(opts.space)
        healer.print_report()
        return

    # Report-only mode prints whatever the healer currently has cached.
    if opts.report:
        healer.print_report()
        return

    if not (opts.once or opts.daemon):
        cli.print_help()
        return

    # Both remaining modes start from the spaces found under the namespace.
    targets = healer.discover_spaces(opts.discover)
    if opts.once:
        healer.run_cycle(targets)
        healer.print_report()
    else:
        healer.run_daemon(targets)


if __name__ == "__main__":
    main()
|
|