Spaces:
Sleeping
Sleeping
"""
Hugging Face Spaces Cluster - Controller
=========================================

Coordinates worker Spaces and distributes tasks to them.

Deployment:
1. Upload this file to a Hugging Face Space
2. Upload requirements.txt
3. The Space starts automatically
"""
| import os | |
| import time | |
| import json | |
| import uuid | |
| import threading | |
| import numpy as np | |
| from collections import defaultdict | |
| from datetime import datetime | |
| import gradio as gr | |
# Hugging Face configuration — all read from the Space's environment variables.
HF_TOKEN = os.getenv("HF_TOKEN", "")  # API token; not used in this file — TODO confirm whether workers need it
CONTROLLER_ID = os.getenv("CONTROLLER_ID", "controller")  # logical name of this controller instance
SPACE_NAME = os.getenv("SPACE_NAME", "")  # name of the hosting Space (informational, printed at startup)
| # ============================================ | |
| # Cluster Management | |
| # ============================================ | |
class ClusterController:
    """Coordinates worker Spaces: registers workers, assigns tasks, collects results.

    All shared state (``workers`` and ``tasks``) is guarded by a single
    non-reentrant ``threading.Lock``.  Methods therefore never call other
    lock-taking methods while holding the lock themselves — in particular
    ``submit_task`` releases the lock before distributing the task, since
    ``_distribute_task`` -> ``get_available_worker`` re-acquires it (calling
    it under the lock would self-deadlock).
    """

    def __init__(self):
        # worker_id -> {"status", "last_seen", "tasks_completed"}
        self.workers = {}
        # task_id -> {"type", "data", "status", "created", "worker_id", "result"}
        self.tasks = {}
        # Kept for interface compatibility; results are stored on the task entries.
        self.results = {}
        self.lock = threading.Lock()

    def register_worker(self, worker_id):
        """Register (or re-register) a worker and mark it ready.

        Re-registering resets the worker's completed-task counter.
        Returns ``{"status": "ok"}``.
        """
        with self.lock:
            self.workers[worker_id] = {
                "status": "ready",
                "last_seen": datetime.now(),
                "tasks_completed": 0,
            }
        print(f"✅ Worker registriert: {worker_id}")
        return {"status": "ok"}

    def get_available_worker(self):
        """Return the id of a ready worker, atomically marking it busy.

        Returns ``None`` when no worker is currently ready.
        """
        with self.lock:
            for worker_id, info in self.workers.items():
                if info["status"] == "ready":
                    # Claim the worker while we still hold the lock.
                    info["status"] = "busy"
                    return worker_id
        return None

    def submit_task(self, task_type, data):
        """Create a new task and try to hand it to an available worker.

        Returns the new task id.  The task stays ``"pending"`` if no worker
        is free right now; ``process_batch`` retries distribution later.
        """
        task_id = str(uuid.uuid4())
        with self.lock:
            self.tasks[task_id] = {
                "type": task_type,
                "data": data,
                "status": "pending",
                "created": datetime.now(),
                "worker_id": None,
                "result": None,
            }
        # Distribute OUTSIDE the lock: _distribute_task acquires it itself.
        self._distribute_task(task_id)
        return task_id

    def _distribute_task(self, task_id):
        """Assign a task to an available worker.

        Returns the chosen worker id, or ``None`` when no worker is free
        (the task then remains pending).
        """
        worker_id = self.get_available_worker()
        if worker_id is None:
            # No worker available; task stays pending.
            return None
        with self.lock:
            task = self.tasks.get(task_id)
            if task is None:
                # Task id unknown — release the worker we just claimed.
                self.workers[worker_id]["status"] = "ready"
                return None
            task["worker_id"] = worker_id
            task["status"] = "assigned"
        print(f"📤 Task {task_id[:8]} → Worker {worker_id}")
        return worker_id

    def submit_result(self, worker_id, task_id, result):
        """Store a worker's result and mark the worker ready again.

        Unknown task ids are ignored (the worker is still freed).
        Returns ``{"status": "ok"}``.
        """
        with self.lock:
            if task_id in self.tasks:
                self.tasks[task_id]["result"] = result
                self.tasks[task_id]["status"] = "completed"
            if worker_id in self.workers:
                self.workers[worker_id]["status"] = "ready"
                self.workers[worker_id]["tasks_completed"] += 1
        print(f"✅ Task {task_id[:8]} abgeschlossen von {worker_id}")
        return {"status": "ok"}

    def get_task_status(self, task_id):
        """Return a status dict for one task, or ``{"status": "not_found"}``."""
        with self.lock:
            if task_id in self.tasks:
                task = self.tasks[task_id]
                return {
                    "task_id": task_id,
                    "status": task["status"],
                    "result": task["result"],
                    "worker_id": task["worker_id"],
                }
        return {"status": "not_found"}

    def get_cluster_status(self):
        """Return an aggregate overview of workers and tasks."""
        with self.lock:
            total_workers = len(self.workers)
            ready_workers = sum(1 for w in self.workers.values() if w["status"] == "ready")
            busy_workers = sum(1 for w in self.workers.values() if w["status"] == "busy")
            total_tasks = len(self.tasks)
            pending_tasks = sum(1 for t in self.tasks.values() if t["status"] == "pending")
            completed_tasks = sum(1 for t in self.tasks.values() if t["status"] == "completed")
            return {
                "workers": {
                    "total": total_workers,
                    "ready": ready_workers,
                    "busy": busy_workers,
                },
                "tasks": {
                    "total": total_tasks,
                    "pending": pending_tasks,
                    "completed": completed_tasks,
                },
                "worker_list": [
                    {"id": wid, "status": info["status"], "tasks": info["tasks_completed"]}
                    for wid, info in self.workers.items()
                ],
            }

    def process_batch(self, task_type, data_chunks, timeout=60.0):
        """Process a batch of data chunks in parallel and wait for the results.

        One task per chunk is submitted; results are collected in submission
        order.  Tasks not completed within ``timeout`` seconds (overall, not
        per task) yield ``{"error": "timeout"}`` entries instead.

        ``timeout`` defaults to the original hard-coded 60 seconds.
        """
        # Create one task per chunk.
        task_ids = [self.submit_task(task_type, chunk) for chunk in data_chunks]

        results = []
        deadline = time.time() + timeout
        for task_id in task_ids:
            if time.time() >= deadline:
                results.append({"error": "timeout"})
                continue
            while True:
                status = self.get_task_status(task_id)
                if status["status"] == "completed":
                    results.append(status["result"])
                    break
                if status["status"] == "pending":
                    # Retry distribution — a worker may have become free.
                    self._distribute_task(task_id)
                time.sleep(0.5)
                if time.time() >= deadline:
                    results.append({"error": "timeout"})
                    break
        return results
# Global controller instance shared by the Gradio UI callbacks and the FastAPI endpoints.
controller = ClusterController()
| # ============================================ | |
| # Gradio Interface | |
| # ============================================ | |
def ui_submit_task(task_type, data_str):
    """UI callback: parse a JSON payload and submit it as a task.

    JSON lists are converted to numpy arrays before submission; any parse
    or submission error is rendered back to the UI instead of raised.
    Returns a Markdown string with the short task id or the error.
    """
    # Fix: dropped the redundant function-level `import numpy as np` —
    # numpy is already imported as `np` at module level.
    try:
        data = json.loads(data_str)
        if isinstance(data, list):
            data = np.array(data)
        task_id = controller.submit_task(task_type, data)
        return f"✅ Task submitted: `{task_id[:8]}`"
    except Exception as e:
        # Broad catch is intentional at this UI boundary: report, don't crash.
        return f"❌ Error: {e}"
def ui_get_status():
    """UI callback: render the current cluster overview as Markdown."""
    snapshot = controller.get_cluster_status()

    worker_lines = []
    for entry in snapshot["worker_list"]:
        dot = '🟢' if entry['status'] == 'ready' else '🔴'
        worker_lines.append(f" • {entry['id']}: {dot} ({entry['tasks']} Tasks)")
    workers_html = "<br>".join(worker_lines) or " Keine Worker registriert"

    workers = snapshot['workers']
    tasks = snapshot['tasks']
    return f"""
## Cluster Status
### Workers
- Gesamt: {workers['total']}
- Bereit: {workers['ready']} 🟢
- Beschäftigt: {workers['busy']} 🔴
### Tasks
- Gesamt: {tasks['total']}
- Pending: {tasks['pending']}
- Abgeschlossen: {tasks['completed']}
### Worker Liste
{workers_html}
"""
def ui_check_task(task_id):
    """UI callback: look up one task and pretty-print its status as JSON."""
    # default=str stringifies non-JSON values (e.g. numpy results, datetimes).
    task_info = controller.get_task_status(task_id)
    return json.dumps(task_info, indent=2, default=str)
def ui_process_batch(num_chunks):
    """UI callback: demo of parallel batch processing.

    Splits a random 10k-element vector into ``num_chunks`` chunks, submits
    them as "sum" tasks and aggregates the numeric results that arrive
    before the controller's timeout.  Returns a Markdown summary.
    """
    # Fix: dropped the redundant function-level `import numpy as np` —
    # numpy is already imported as `np` at module level.
    data = np.random.random(10000)
    chunks = np.array_split(data, int(num_chunks))
    # Blocks until all results arrived or the batch timed out.
    results = controller.process_batch("sum", chunks)
    # Error entries (e.g. {"error": "timeout"}) are dicts and filtered out here.
    valid_results = [r for r in results if isinstance(r, (int, float))]
    total = sum(valid_results)
    return f"""
### Batch-Ergebnis
- Chunks: {len(chunks)}
- Ergebnisse: {len(valid_results)}/{len(results)}
- Summe: {total:.4f}
- Durchschnitte: {[f'{r:.4f}' for r in valid_results[:5]]}{'...' if len(valid_results) > 5 else ''}
"""
# Gradio UI — three interactive tabs plus a static API description.
with gr.Blocks(title="Cluster Controller") as demo:
    gr.Markdown("# 🤗 Hugging Face Spaces Cluster Controller")
    with gr.Tabs():
        with gr.Tab("Cluster Status"):
            status_btn = gr.Button("Status aktualisieren")
            # Rendered once at build time, then refreshed by the button and demo.load below.
            status_output = gr.Markdown(ui_get_status())
            status_btn.click(ui_get_status, outputs=status_output)
        with gr.Tab("Task Submit"):
            task_type = gr.Dropdown(
                choices=["sum", "mean", "matrix_multiply", "inference"],
                value="sum",
                label="Task Typ"
            )
            data_input = gr.Textbox(
                label="Daten (JSON)",
                placeholder="[1, 2, 3, 4, 5]",
                value="[1, 2, 3, 4, 5]"
            )
            submit_btn = gr.Button("Task absenden")
            task_result = gr.Textbox(label="Ergebnis")
            submit_btn.click(ui_submit_task, inputs=[task_type, data_input], outputs=task_result)
        with gr.Tab("Batch Processing"):
            num_chunks = gr.Slider(1, 10, value=3, step=1, label="Anzahl Chunks")
            batch_btn = gr.Button("Batch starten")
            batch_output = gr.Markdown()
            batch_btn.click(ui_process_batch, inputs=num_chunks, outputs=batch_output)
        with gr.Tab("API Info"):
            gr.Markdown("""
## API Endpoints
```
POST /api/register
{"worker_id": "worker-1"}
GET /api/get_task?worker_id=worker-1
POST /api/submit_result
{"worker_id": "worker-1", "task_id": "...", "result": 42}
GET /api/task_status?task_id=...
GET /api/cluster_status
```
""")
    # Auto-refresh the status panel every 5 seconds.
    # NOTE(review): `every=` on load() was deprecated in newer Gradio releases
    # (replaced by gr.Timer) — confirm against the pinned gradio version.
    demo.load(ui_get_status, outputs=status_output, every=5)
# ============================================
# FastAPI Backend (for worker communication)
# ============================================
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse

app = FastAPI()


# Fix: these handlers were defined but never registered as routes, while the
# UI's "API Info" tab documents exactly these endpoints.  The decorators below
# wire them up to the paths that tab advertises.
@app.post("/api/register")
async def register_worker(request: Request):
    """Register a worker.  Body: {"worker_id": "..."}."""
    data = await request.json()
    return controller.register_worker(data["worker_id"])


@app.get("/api/get_task")
async def get_task(worker_id: str):
    """Hand the next pending task to a polling worker, or {} when none is pending.

    Simple linear scan under the controller lock — in production a proper
    queue would be better.
    NOTE(review): task["data"] may be a numpy array (see ui_submit_task),
    which FastAPI cannot JSON-serialize directly — confirm workers exchange
    plain lists.
    """
    with controller.lock:
        for task_id, task in controller.tasks.items():
            if task["status"] == "pending":
                task["status"] = "assigned"
                task["worker_id"] = worker_id
                return {"id": task_id, "type": task["type"], "data": task["data"]}
    return {}


@app.post("/api/submit_result")
async def submit_result(request: Request):
    """Store a worker's result.  Body: {"worker_id", "task_id", "result"}."""
    data = await request.json()
    return controller.submit_result(
        data["worker_id"],
        data["task_id"],
        data["result"]
    )


@app.get("/api/task_status")
async def task_status(task_id: str):
    """Return status/result for a single task id."""
    return controller.get_task_status(task_id)


@app.get("/api/cluster_status")
async def cluster_status():
    """Return the aggregate cluster overview."""
    return controller.get_cluster_status()
# ============================================
# Main
# ============================================
if __name__ == "__main__":
    # NOTE(review): uvicorn is imported but never used here — demo.launch()
    # serves only the Gradio app, and the FastAPI `app` defined above is never
    # mounted or run.  Consider gr.mount_gradio_app(app, demo, path="/") plus
    # uvicorn.run(app, ...) if the worker API endpoints must actually be served.
    import uvicorn
    print(f"🚀 Starte Cluster Controller: {CONTROLLER_ID}")
    print(f"   Space: {SPACE_NAME}")
    # Start Gradio: bind on all interfaces on the standard Spaces port 7860.
    demo.launch(server_name="0.0.0.0", server_port=7860)