# conversation-memory / fleet_manager.py
# Uploaded by ScottzillaSystems — commit 3c03ca6 (verified)
#!/usr/bin/env python3
"""
Agent Zero Fleet Manager
Manages 4 autonomous Agent Zero instances with workspace isolation,
self-healing, config persistence, inter-agent delegation, and persistent storage.
Usage:
python fleet_manager.py --check # Check all 4 agents
python fleet_manager.py --provision-all # Provision entire fleet
python fleet_manager.py --set-hw agent-zero-main zero-a10g
python fleet_manager.py --enable-storage agent-zero-main
python fleet_manager.py --deploy-prompts agent-zero-main
python fleet_manager.py --setup-pipeline # Configure inter-agent task queue
"""
import argparse
import json
import os
import time
from datetime import datetime, timezone
from typing import Dict, List, Optional

from huggingface_hub import HfApi, SpaceHardware
# ─── Fleet Configuration ─────────────────────────────────────────────────────
# Static registry of the four managed Agent Zero Spaces. Keys are the fleet
# names used on the CLI; each entry records:
#   repo               – Hugging Face Space repo id
#   role               – "sovereign" (orchestrator) or "specialist"
#   description        – human-readable summary shown in docs/reports
#   models             – model keys the agent is expected to serve
#   hardware_target    – Space hardware the fleet should converge to
#   persistent_storage – whether the /data volume should be enabled
FLEET = {
    "agent-zero-main": {
        "repo": "ScottzillaSystems/agentzerov2",
        "role": "sovereign",
        "description": "Main orchestrator — receives all tasks, delegates to specialists",
        "models": ["qwen3.5-27b", "cydonia-24b", "fallen-command"],
        "hardware_target": "zero-a10g",
        "persistent_storage": True,
    },
    "agent-zero-pentesting": {
        "repo": "ScottzillaSystems/agent-zero-pentesting",
        "role": "specialist",
        "description": "Security specialist — writes findings to shared task queue",
        "models": ["qwen3.5-9b"],
        "hardware_target": "zero-a10g",
        "persistent_storage": True,
    },
    "agent-zero-finops": {
        "repo": "ScottzillaSystems/agent-zero-finops",
        "role": "specialist",
        "description": "Cost optimization specialist — monitors billing, suggests cuts",
        "models": ["chatgpt5"],
        # CPU-only: this agent does no local inference heavy lifting.
        "hardware_target": "cpu-basic",
        "persistent_storage": False,
    },
    "agent-zero-creative": {
        "repo": "ScottzillaSystems/agent-zero-adult-entertainment",
        "role": "specialist",
        "description": "Creative specialist — content generation, image workflows",
        "models": ["qwen3.5-9b"],
        "hardware_target": "zero-a10g",
        "persistent_storage": True,
    },
}
# ─── Prompts ─────────────────────────────────────────────────────────────────
# Startup diagnostic checklist deployed to each Space as
# prompts/self_diagnostic.md (see deploy_prompts). The string body is runtime
# data uploaded verbatim; mojibake arrows ("β†’") fixed to proper "→".
SELF_DIAGNOSTIC_PROMPT = """# Agent Zero Self-Diagnostic Protocol
Run this diagnostic immediately on startup and report results:
## 1. System Health
- [ ] GPU available: `torch.cuda.is_available()` → report True/False
- [ ] GPU memory: `torch.cuda.get_device_properties(0).total_memory / 1e9` → report GB
- [ ] Current model loaded: report model_key
- [ ] Model cache status: list all keys in _model_cache
## 2. Model Loading Test
For each model in your catalog:
1. Call `load_model(model_key)`
2. Report: success/failure, load time, GPU memory after load
3. Call `unload_model(model_key)`
4. Report: memory freed
## 3. Inference Test
Send: "What is 2+2?"
Expected: Response contains "4"
Report: latency_ms, tokens_generated, tokens/sec
## 4. File System Test
- [ ] Write test file to `/workspace/projects/test.txt`
- [ ] Read it back
- [ ] Report: persistent storage working (survives restart?)
## 5. Network Test
- [ ] Can reach huggingface.co?
- [ ] Can reach your model repos?
## 6. Inter-Agent Test (if applicable)
- [ ] Can write to `/workspace/shared/task_queue/`?
- [ ] Can read from `/workspace/shared/task_queue/`?
Report all results in a single JSON block.
"""
# Bootstrap prompt for autonomous mode, deployed to each Space as
# prompts/autonomous_loop.md (see deploy_prompts). Describes a 60-second
# poll loop over /workspace/shared/task_queue/, the JSON task/result
# schemas, and self-healing rules. The string body is runtime data that
# is uploaded verbatim — do not reflow or reformat it.
AUTONOMOUS_LOOP_PROMPT = """# Agent Zero Autonomous Operation Loop
You are now running autonomously. Follow this loop indefinitely:
## Loop Cycle (every 60 seconds)
1. **SCAN** `/workspace/shared/task_queue/` for new task files
2. **READ** each task file (JSON format)
3. **EXECUTE** the task using your loaded model
4. **WRITE** results to `/workspace/shared/results/{task_id}.json`
5. **ARCHIVE** completed task to `/workspace/shared/completed/`
6. **SELF-HEAL** if model fails:
- Unload current model
- Try fallback model from catalog
- If all fail, pause and alert
7. **REPORT** status to stdout every cycle
## Task File Format
```json
{
"task_id": "uuid",
"source_agent": "agent-zero-pentesting",
"priority": 1-5,
"task_type": "analysis|generation|review|action",
"prompt": "...",
"context": {},
"deadline": "ISO8601",
"created_at": "ISO8601"
}
```
## Result File Format
```json
{
"task_id": "uuid",
"completed_by": "agent-zero-main",
"completed_at": "ISO8601",
"status": "success|failure|partial",
"output": "...",
"model_used": "cydonia-24b",
"tokens_used": 1234,
"latency_ms": 5600
}
```
## Self-Healing Rules
- If OOM: unload all models, wait 10s, reload smallest model
- If model download fails: retry 3x with exponential backoff
- If GPU not available: switch to CPU mode (slower but functional)
- If task queue full (>100): process highest priority first
Begin autonomous operation now.
"""
# ─── Fleet Manager ───────────────────────────────────────────────────────────
class AgentZeroFleetManager:
    """Manage the Agent Zero Space fleet via the Hugging Face Hub API.

    Responsibilities: status sweeps, hardware requests, persistent-storage
    toggling, restarts, prompt deployment, and configuring the shared
    inter-agent task-queue pipeline. All agents are declared in FLEET.
    """

    def __init__(self, token: Optional[str] = None):
        """Create a manager.

        token: HF access token; falls back to the HF_TOKEN env var when
               omitted (HfApi then uses its own default resolution).
        """
        self.api = HfApi(token=token or os.getenv("HF_TOKEN"))
        # Every check_fleet() report is appended here so a long-running
        # session can be audited later.
        self.status_log: List[Dict] = []

    def _config_for(self, agent_name: str) -> Dict:
        """Return the FLEET entry for agent_name, raising ValueError if unknown."""
        config = FLEET.get(agent_name)
        if not config:
            raise ValueError(f"Unknown agent: {agent_name}")
        return config

    def check_fleet(self) -> Dict:
        """Check status of all agent instances declared in FLEET.

        Returns a report dict with keys ``checked_at`` (UTC ISO timestamp),
        ``agents`` (one status dict per agent) and ``issues`` (human-readable
        problem strings). A per-agent API failure is recorded as an ERROR
        entry instead of aborting the whole sweep. The report is also
        appended to self.status_log.
        """
        report = {
            # Timezone-aware timestamp; datetime.utcnow() is deprecated.
            "checked_at": datetime.now(timezone.utc).isoformat(),
            "agents": [],
            "issues": [],
        }
        for name, config in FLEET.items():
            repo_id = config["repo"]
            try:
                runtime = self.api.get_space_runtime(repo_id)
                report["agents"].append({
                    "name": name,
                    "repo": repo_id,
                    "stage": runtime.stage,
                    "hardware": runtime.hardware,
                    "requested_hardware": runtime.requested_hardware,
                    "role": config["role"],
                    "healthy": runtime.stage == "RUNNING",
                })
                if runtime.stage != "RUNNING":
                    report["issues"].append(f"{name}: {runtime.stage} (expected RUNNING)")
                if runtime.hardware != config["hardware_target"]:
                    report["issues"].append(
                        f"{name}: hardware={runtime.hardware} (target={config['hardware_target']})"
                    )
            except Exception as e:
                # Boundary handler: record the failure and keep sweeping the
                # remaining agents rather than aborting the whole check.
                report["agents"].append({
                    "name": name,
                    "repo": repo_id,
                    "stage": "ERROR",
                    "error": str(e),
                })
                report["issues"].append(f"{name}: API error - {e}")
        self.status_log.append(report)
        return report

    def set_hardware(self, agent_name: str, hardware: str):
        """Request hardware (e.g. 'zero-a10g') for an agent's Space.

        Raises ValueError for an unknown agent or unknown hardware name.
        """
        repo_id = self._config_for(agent_name)["repo"]
        # Map CLI spelling ('zero-a10g') to the enum member name (ZERO_A10G).
        enum_name = hardware.upper().replace("-", "_")
        try:
            hardware_enum = getattr(SpaceHardware, enum_name)
        except AttributeError:
            # Mirror the unknown-agent path: raise ValueError instead of
            # leaking an AttributeError about enum internals.
            raise ValueError(f"Unknown hardware: {hardware}") from None
        self.api.request_space_hardware(repo_id, hardware=hardware_enum)
        print(f"[Fleet] Set {agent_name} -> {hardware}")

    def enable_persistent_storage(self, agent_name: str):
        """Enable the /data persistent volume on an agent's Space.

        Best-effort: failures are reported on stdout, not raised.
        """
        repo_id = self._config_for(agent_name)["repo"]
        import requests  # local import: only needed for this raw REST call
        resp = requests.put(
            f"https://huggingface.co/api/spaces/{repo_id}/volumes",
            headers={"Authorization": f"Bearer {self.api.token}"},
            json={"data": True},
            timeout=30,  # never hang indefinitely on a stalled connection
        )
        if resp.status_code == 200:
            print(f"[Fleet] Persistent storage enabled for {agent_name}")
        else:
            print(f"[Fleet] Failed to enable storage for {agent_name}: {resp.status_code}")

    def restart_agent(self, agent_name: str):
        """Restart an agent's Space (triggers a rebuild on the Hub)."""
        self.api.restart_space(self._config_for(agent_name)["repo"])
        print(f"[Fleet] Restarted {agent_name}")

    def print_report(self, report: Dict):
        """Pretty-print a report dict produced by check_fleet()."""
        print("\n" + "=" * 70)
        print(f"🤖 AGENT ZERO FLEET REPORT — {report['checked_at']}")
        print("=" * 70)
        for agent in report["agents"]:
            marker = "🟢" if agent.get("healthy") else "🔴"
            print(f"\n{marker} {agent['name']} ({agent['role']})")
            print(f" Repo: {agent['repo']}")
            print(f" Stage: {agent.get('stage', 'unknown')}")
            print(f" Hardware: {agent.get('hardware', 'none')}")
            if "error" in agent:
                print(f" ❌ Error: {agent['error']}")
        if report["issues"]:
            print(f"\n⚠️ ISSUES ({len(report['issues'])}):")
            for issue in report["issues"]:
                print(f" - {issue}")
        else:
            print("\n✅ All agents healthy")
        print("=" * 70 + "\n")

    def deploy_prompts(self, agent_name: str):
        """Upload the diagnostic and autonomous-loop prompts to an agent's Space."""
        repo_id = self._config_for(agent_name)["repo"]
        # upload_file accepts raw bytes for path_or_fileobj.
        self.api.upload_file(
            path_or_fileobj=SELF_DIAGNOSTIC_PROMPT.encode(),
            path_in_repo="prompts/self_diagnostic.md",
            repo_id=repo_id,
            repo_type="space",
        )
        self.api.upload_file(
            path_or_fileobj=AUTONOMOUS_LOOP_PROMPT.encode(),
            path_in_repo="prompts/autonomous_loop.md",
            repo_id=repo_id,
            repo_type="space",
        )
        print(f"[Fleet] Prompts deployed to {agent_name}")

    def setup_inter_agent_pipeline(self):
        """Upload the shared-directory setup script to every fleet Space.

        Each Space is expected to run setup_shared.sh on boot to create the
        world-writable task-queue directories used for delegation.
        """
        shared_setup = """#!/bin/bash
mkdir -p /workspace/shared/task_queue
mkdir -p /workspace/shared/results
mkdir -p /workspace/shared/completed
mkdir -p /workspace/projects
chmod 777 /workspace/shared/task_queue
chmod 777 /workspace/shared/results
chmod 777 /workspace/shared/completed
chmod 777 /workspace/projects
echo "Shared directories ready"
"""
        for name, config in FLEET.items():
            self.api.upload_file(
                path_or_fileobj=shared_setup.encode(),
                path_in_repo="setup_shared.sh",
                repo_id=config["repo"],
                repo_type="space",
            )
        print("[Fleet] Inter-agent pipeline configured")
def main():
    """CLI entry point: parse exactly one action flag and dispatch it.

    The manager (and thus the HF API client / token lookup) is only built
    after an action is requested, so plain ``--help`` or a bare invocation
    never touches credentials.
    """
    parser = argparse.ArgumentParser(description="Agent Zero Fleet Manager")
    parser.add_argument("--check", action="store_true", help="Check fleet status")
    parser.add_argument("--set-hw", nargs=2, metavar=("AGENT", "HW"), help="Set hardware")
    parser.add_argument("--enable-storage", metavar="AGENT", help="Enable persistent storage")
    parser.add_argument("--restart", metavar="AGENT", help="Restart agent")
    parser.add_argument("--deploy-prompts", metavar="AGENT", help="Deploy prompts")
    parser.add_argument("--setup-pipeline", action="store_true", help="Setup inter-agent pipeline")
    parser.add_argument("--provision-all", action="store_true", help="Provision entire fleet")
    args = parser.parse_args()

    # No action requested: show help without constructing an API client.
    if not any([args.check, args.set_hw, args.enable_storage, args.restart,
                args.deploy_prompts, args.setup_pipeline, args.provision_all]):
        parser.print_help()
        return

    manager = AgentZeroFleetManager()
    if args.check:
        manager.print_report(manager.check_fleet())
    elif args.set_hw:
        agent, hardware = args.set_hw
        manager.set_hardware(agent, hardware)
    elif args.enable_storage:
        manager.enable_persistent_storage(args.enable_storage)
    elif args.restart:
        manager.restart_agent(args.restart)
    elif args.deploy_prompts:
        manager.deploy_prompts(args.deploy_prompts)
    elif args.setup_pipeline:
        manager.setup_inter_agent_pipeline()
    elif args.provision_all:
        print("[Fleet] Provisioning entire fleet...")
        for name, config in FLEET.items():
            print(f"\n--- {name} ---")
            try:
                manager.set_hardware(name, config["hardware_target"])
                if config["persistent_storage"]:
                    manager.enable_persistent_storage(name)
                manager.restart_agent(name)
            except Exception as e:
                # Best-effort: report and keep provisioning the rest.
                print(f"Failed: {e}")
        print("\n[Fleet] Provisioning complete. Waiting for builds...")


if __name__ == "__main__":
    main()