Spaces:
Sleeping
Sleeping
| import asyncio | |
| import json | |
| from datetime import datetime | |
| from services.stream_client import SSEClient | |
| from services.storage import storage | |
| from models.schema import MetricPoint, RepoStatus | |
| from utils.time import exponential_backoff | |
| class RepoConnection: | |
| def __init__(self, repo_id: str, namespace: str, repo: str): | |
| self.repo_id = repo_id | |
| self.namespace = namespace | |
| self.repo_name = repo | |
| self.tasks = [] | |
| self.is_running = False | |
| async def _listen_events(self): | |
| attempt = 0 | |
| url = f"https://huggingface.co/api/spaces/{self.namespace}/{self.repo_name}/events" | |
| print(url) | |
| while True: | |
| try: | |
| async for data in SSEClient(url).connect(): | |
| attempt = 0 | |
| payload = json.loads(data) | |
| stage = payload.get("compute", {}).get("status", {}).get("stage", "Unknown") | |
| storage.set_status(self.repo_id, RepoStatus( | |
| state="CONNECTED", stage=stage, last_updated=datetime.utcnow() | |
| )) | |
| if stage == "Running" and not self.is_running: | |
| self.is_running = True | |
| self.tasks.append(asyncio.create_task(self._listen_metrics())) | |
| elif stage != "Running": | |
| self.is_running = False | |
| except Exception: | |
| storage.set_status(self.repo_id, RepoStatus( | |
| state="ERROR", | |
| stage="Disconnected", | |
| last_updated=datetime.utcnow() | |
| )) | |
| attempt += 1 | |
| await exponential_backoff(attempt) | |
| async def _listen_metrics(self): | |
| url = f"https://huggingface.co/api/spaces/{self.namespace}/{self.repo_name}/metrics" | |
| while self.is_running: | |
| try: | |
| async for data in SSEClient(url).connect(): | |
| payload = json.loads(data) | |
| storage.add_metric(self.repo_id, MetricPoint( | |
| timestamp=datetime.utcnow(), | |
| cpu_usage_pct=payload.get("cpu_usage_pct", 0), | |
| memory_used_bytes=payload.get("memory_used_bytes", 0), | |
| memory_total_bytes=payload.get("memory_total_bytes", 1) | |
| )) | |
| except Exception: | |
| await asyncio.sleep(1) | |
| def start(self): | |
| self.tasks.append(asyncio.create_task(self._listen_events())) | |
| def stop(self): | |
| for t in self.tasks: t.cancel() |