| |
| """ |
| Agent Zero Fleet Manager |
| Manages 4 autonomous Agent Zero instances with workspace isolation, |
| self-healing, config persistence, inter-agent delegation, and persistent storage. |
| |
| Usage: |
| python fleet_manager.py --check # Check all 4 agents |
| python fleet_manager.py --provision-all # Provision entire fleet |
| python fleet_manager.py --set-hw agent-zero-main zero-a10g |
| python fleet_manager.py --enable-storage agent-zero-main |
| python fleet_manager.py --deploy-prompts agent-zero-main |
| python fleet_manager.py --setup-pipeline # Configure inter-agent task queue |
| """ |
|
|
import argparse
import json
import os
import time
from datetime import datetime, timezone
from typing import Dict, List, Optional

from huggingface_hub import HfApi, SpaceHardware
|
|
|
|
| |
|
|
# Registry of the four managed Agent Zero Spaces, keyed by fleet-local name.
# Per-agent fields:
#   repo               - Hub repo id of the Space ("owner/name")
#   role               - "sovereign" (orchestrator) or "specialist"
#   description        - human-readable summary of the agent's duty
#   models             - model keys the agent is expected to serve
#   hardware_target    - Space hardware flavor to request on provisioning
#   persistent_storage - whether a /data volume should be enabled
FLEET = {
    "agent-zero-main": {
        "repo": "ScottzillaSystems/agentzerov2",
        "role": "sovereign",
        "description": "Main orchestrator β receives all tasks, delegates to specialists",
        "models": ["qwen3.5-27b", "cydonia-24b", "fallen-command"],
        "hardware_target": "zero-a10g",
        "persistent_storage": True,
    },
    "agent-zero-pentesting": {
        "repo": "ScottzillaSystems/agent-zero-pentesting",
        "role": "specialist",
        "description": "Security specialist β writes findings to shared task queue",
        "models": ["qwen3.5-9b"],
        "hardware_target": "zero-a10g",
        "persistent_storage": True,
    },
    "agent-zero-finops": {
        "repo": "ScottzillaSystems/agent-zero-finops",
        "role": "specialist",
        "description": "Cost optimization specialist β monitors billing, suggests cuts",
        "models": ["chatgpt5"],
        # CPU-only: finops workloads are API/billing queries, no inference needed.
        "hardware_target": "cpu-basic",
        "persistent_storage": False,
    },
    "agent-zero-creative": {
        "repo": "ScottzillaSystems/agent-zero-adult-entertainment",
        "role": "specialist",
        "description": "Creative specialist β content generation, image workflows",
        "models": ["qwen3.5-9b"],
        "hardware_target": "zero-a10g",
        "persistent_storage": True,
    },
}
|
|
|
|
| |
|
|
# Markdown prompt uploaded to each Space as prompts/self_diagnostic.md by
# deploy_prompts(). Instructs the agent to self-test GPU, model loading,
# inference, file system, network reachability, and the shared task queue
# on startup, then report everything as one JSON block.
# NOTE(review): the "β" runs below look like mojibake for arrows/emoji —
# confirm intended characters before changing; the text is delivered verbatim.
SELF_DIAGNOSTIC_PROMPT = """# Agent Zero Self-Diagnostic Protocol

Run this diagnostic immediately on startup and report results:

## 1. System Health
- [ ] GPU available: `torch.cuda.is_available()` β report True/False
- [ ] GPU memory: `torch.cuda.get_device_properties(0).total_memory / 1e9` β report GB
- [ ] Current model loaded: report model_key
- [ ] Model cache status: list all keys in _model_cache

## 2. Model Loading Test
For each model in your catalog:
1. Call `load_model(model_key)`
2. Report: success/failure, load time, GPU memory after load
3. Call `unload_model(model_key)`
4. Report: memory freed

## 3. Inference Test
Send: "What is 2+2?"
Expected: Response contains "4"
Report: latency_ms, tokens_generated, tokens/sec

## 4. File System Test
- [ ] Write test file to `/workspace/projects/test.txt`
- [ ] Read it back
- [ ] Report: persistent storage working (survives restart?)

## 5. Network Test
- [ ] Can reach huggingface.co?
- [ ] Can reach your model repos?

## 6. Inter-Agent Test (if applicable)
- [ ] Can write to `/workspace/shared/task_queue/`?
- [ ] Can read from `/workspace/shared/task_queue/`?

Report all results in a single JSON block.
"""
|
|
# Markdown prompt uploaded to each Space as prompts/autonomous_loop.md by
# deploy_prompts(). Defines the 60-second scan/execute/report loop over the
# shared task-queue directories, the task/result JSON schemas, and the
# self-healing rules agents follow when models or hardware misbehave.
# Delivered verbatim; the text itself is agent-facing documentation.
AUTONOMOUS_LOOP_PROMPT = """# Agent Zero Autonomous Operation Loop

You are now running autonomously. Follow this loop indefinitely:

## Loop Cycle (every 60 seconds)

1. **SCAN** `/workspace/shared/task_queue/` for new task files
2. **READ** each task file (JSON format)
3. **EXECUTE** the task using your loaded model
4. **WRITE** results to `/workspace/shared/results/{task_id}.json`
5. **ARCHIVE** completed task to `/workspace/shared/completed/`
6. **SELF-HEAL** if model fails:
   - Unload current model
   - Try fallback model from catalog
   - If all fail, pause and alert
7. **REPORT** status to stdout every cycle

## Task File Format
```json
{
  "task_id": "uuid",
  "source_agent": "agent-zero-pentesting",
  "priority": 1-5,
  "task_type": "analysis|generation|review|action",
  "prompt": "...",
  "context": {},
  "deadline": "ISO8601",
  "created_at": "ISO8601"
}
```

## Result File Format
```json
{
  "task_id": "uuid",
  "completed_by": "agent-zero-main",
  "completed_at": "ISO8601",
  "status": "success|failure|partial",
  "output": "...",
  "model_used": "cydonia-24b",
  "tokens_used": 1234,
  "latency_ms": 5600
}
```

## Self-Healing Rules
- If OOM: unload all models, wait 10s, reload smallest model
- If model download fails: retry 3x with exponential backoff
- If GPU not available: switch to CPU mode (slower but functional)
- If task queue full (>100): process highest priority first

Begin autonomous operation now.
"""
|
|
|
|
| |
|
|
class AgentZeroFleetManager:
    """Provision, monitor, and heal the Agent Zero fleet defined in FLEET.

    Thin orchestration layer over the Hugging Face Hub API: checks Space
    runtimes, requests hardware, toggles persistent storage, restarts
    Spaces, and uploads the operating prompts / pipeline bootstrap script.
    """

    def __init__(self, token: Optional[str] = None):
        """Create a Hub client using *token*, falling back to $HF_TOKEN."""
        self.api = HfApi(token=token or os.getenv("HF_TOKEN"))
        # In-memory history of reports produced by check_fleet().
        self.status_log: List[Dict] = []

    @staticmethod
    def _require_agent(agent_name: str) -> Dict:
        """Return the FLEET config for *agent_name*.

        Raises:
            ValueError: if *agent_name* is not a known fleet member.
        """
        config = FLEET.get(agent_name)
        if config is None:
            raise ValueError(f"Unknown agent: {agent_name}")
        return config

    def check_fleet(self) -> Dict:
        """Check status of all 4 agent instances.

        Returns a report dict with ``checked_at`` (UTC ISO-8601),
        ``agents`` (one entry per fleet member) and ``issues``
        (human-readable problems). The report is also appended to
        ``self.status_log``.
        """
        report = {
            # timezone-aware UTC; datetime.utcnow() is deprecated since 3.12
            "checked_at": datetime.now(timezone.utc).isoformat(),
            "agents": [],
            "issues": [],
        }

        for name, config in FLEET.items():
            repo_id = config["repo"]
            try:
                runtime = self.api.get_space_runtime(repo_id)
                agent_report = {
                    "name": name,
                    "repo": repo_id,
                    "stage": runtime.stage,
                    "hardware": runtime.hardware,
                    "requested_hardware": runtime.requested_hardware,
                    "role": config["role"],
                    "healthy": runtime.stage == "RUNNING",
                }
                report["agents"].append(agent_report)

                if runtime.stage != "RUNNING":
                    report["issues"].append(f"{name}: {runtime.stage} (expected RUNNING)")
                if runtime.hardware != config["hardware_target"]:
                    report["issues"].append(
                        f"{name}: hardware={runtime.hardware} (target={config['hardware_target']})"
                    )

            except Exception as e:
                # One broken Space must not abort the whole fleet scan:
                # record the failure and keep going.
                report["agents"].append({
                    "name": name,
                    "repo": repo_id,
                    "stage": "ERROR",
                    "error": str(e),
                })
                report["issues"].append(f"{name}: API error - {e}")

        self.status_log.append(report)
        return report

    def set_hardware(self, agent_name: str, hardware: str):
        """Request Space hardware (e.g. "zero-a10g") for *agent_name*.

        Raises:
            ValueError: for an unknown agent or unknown hardware flavor.
        """
        config = self._require_agent(agent_name)

        # Map "zero-a10g" -> SpaceHardware.ZERO_A10G; surface typos as
        # ValueError instead of a raw AttributeError.
        enum_name = hardware.upper().replace("-", "_")
        try:
            hardware_enum = getattr(SpaceHardware, enum_name)
        except AttributeError:
            raise ValueError(f"Unknown hardware: {hardware}") from None

        self.api.request_space_hardware(config["repo"], hardware=hardware_enum)
        print(f"[Fleet] Set {agent_name} -> {hardware}")

    def enable_persistent_storage(self, agent_name: str):
        """Enable the /data volume on *agent_name*'s Space.

        NOTE(review): uses a raw PUT against the Spaces volumes endpoint —
        confirm this endpoint is still current for the deployed
        huggingface_hub version.
        """
        config = self._require_agent(agent_name)
        repo_id = config["repo"]

        import requests  # local import: only needed for this one raw API call
        resp = requests.put(
            f"https://huggingface.co/api/spaces/{repo_id}/volumes",
            headers={"Authorization": f"Bearer {self.api.token}"},
            json={"data": True},
            timeout=30,  # never hang the CLI on a stuck connection
        )
        if resp.ok:  # accept any 2xx success, not just 200
            print(f"[Fleet] Persistent storage enabled for {agent_name}")
        else:
            print(f"[Fleet] Failed to enable storage for {agent_name}: {resp.status_code}")

    def restart_agent(self, agent_name: str):
        """Restart *agent_name*'s Space via the Hub API."""
        config = self._require_agent(agent_name)
        self.api.restart_space(config["repo"])
        print(f"[Fleet] Restarted {agent_name}")

    def print_report(self, report: Dict):
        """Pretty-print a report produced by check_fleet() to stdout."""
        print("\n" + "=" * 70)
        print(f"π€ AGENT ZERO FLEET REPORT β {report['checked_at']}")
        print("=" * 70)

        for agent in report["agents"]:
            emoji = "π’" if agent.get("healthy") else "π΄"
            print(f"\n{emoji} {agent['name']} ({agent['role']})")
            print(f"   Repo: {agent['repo']}")
            print(f"   Stage: {agent.get('stage', 'unknown')}")
            print(f"   Hardware: {agent.get('hardware', 'none')}")
            if "error" in agent:
                print(f"   β Error: {agent['error']}")

        if report["issues"]:
            print(f"\nβ οΈ ISSUES ({len(report['issues'])}):")
            for issue in report["issues"]:
                print(f"   - {issue}")
        else:
            # Rejoined: the original source had this literal split across two
            # lines (garbled emoji), which was a syntax error.
            print("\nβ All agents healthy")

        print("=" * 70 + "\n")

    def deploy_prompts(self, agent_name: str):
        """Upload the diagnostic and autonomous-loop prompts to an agent's Space."""
        config = self._require_agent(agent_name)
        repo_id = config["repo"]

        self.api.upload_file(
            path_or_fileobj=SELF_DIAGNOSTIC_PROMPT.encode(),
            path_in_repo="prompts/self_diagnostic.md",
            repo_id=repo_id,
            repo_type="space",
        )

        self.api.upload_file(
            path_or_fileobj=AUTONOMOUS_LOOP_PROMPT.encode(),
            path_in_repo="prompts/autonomous_loop.md",
            repo_id=repo_id,
            repo_type="space",
        )

        print(f"[Fleet] Prompts deployed to {agent_name}")

    def setup_inter_agent_pipeline(self):
        """Upload the shared-directory bootstrap script to every fleet Space."""
        shared_setup = """#!/bin/bash
mkdir -p /workspace/shared/task_queue
mkdir -p /workspace/shared/results
mkdir -p /workspace/shared/completed
mkdir -p /workspace/projects
chmod 777 /workspace/shared/task_queue
chmod 777 /workspace/shared/results
chmod 777 /workspace/shared/completed
chmod 777 /workspace/projects
echo "Shared directories ready"
"""
        for name, config in FLEET.items():
            self.api.upload_file(
                path_or_fileobj=shared_setup.encode(),
                path_in_repo="setup_shared.sh",
                repo_id=config["repo"],
                repo_type="space",
            )

        print("[Fleet] Inter-agent pipeline configured")
|
|
|
def main():
    """CLI entry point: parse flags and run exactly one fleet action."""
    parser = argparse.ArgumentParser(description="Agent Zero Fleet Manager")
    parser.add_argument("--check", action="store_true", help="Check fleet status")
    parser.add_argument("--set-hw", nargs=2, metavar=("AGENT", "HW"), help="Set hardware")
    parser.add_argument("--enable-storage", metavar="AGENT", help="Enable persistent storage")
    parser.add_argument("--restart", metavar="AGENT", help="Restart agent")
    parser.add_argument("--deploy-prompts", metavar="AGENT", help="Deploy prompts")
    parser.add_argument("--setup-pipeline", action="store_true", help="Setup inter-agent pipeline")
    parser.add_argument("--provision-all", action="store_true", help="Provision entire fleet")
    opts = parser.parse_args()

    manager = AgentZeroFleetManager()

    if opts.check:
        manager.print_report(manager.check_fleet())
    elif opts.set_hw:
        agent, hardware = opts.set_hw
        manager.set_hardware(agent, hardware)
    elif opts.enable_storage:
        manager.enable_persistent_storage(opts.enable_storage)
    elif opts.restart:
        manager.restart_agent(opts.restart)
    elif opts.deploy_prompts:
        manager.deploy_prompts(opts.deploy_prompts)
    elif opts.setup_pipeline:
        manager.setup_inter_agent_pipeline()
    elif opts.provision_all:
        print("[Fleet] Provisioning entire fleet...")
        for agent_name, cfg in FLEET.items():
            print(f"\n--- {agent_name} ---")
            try:
                # Hardware first, then storage (if configured), then restart
                # so the Space comes back up on the new configuration.
                manager.set_hardware(agent_name, cfg["hardware_target"])
                if cfg["persistent_storage"]:
                    manager.enable_persistent_storage(agent_name)
                manager.restart_agent(agent_name)
            except Exception as exc:
                print(f"Failed: {exc}")
        print("\n[Fleet] Provisioning complete. Waiting for builds...")
    else:
        parser.print_help()


if __name__ == "__main__":
    main()
|
|