# SpaceProbe1 — services/connection_manager.py
# (Hugging Face file-viewer header: uploaded by a9 in "Upload 61 files", commit 9582187.)
import asyncio
import json
from datetime import datetime
from services.stream_client import SSEClient
from services.storage import storage
from models.schema import MetricPoint, RepoStatus
from utils.time import exponential_backoff
class RepoConnection:
    """Follow one Hugging Face Space through its SSE event and metrics streams.

    ``start()`` launches a task that listens to the Space's ``/events`` stream
    and writes a ``RepoStatus`` to ``storage`` for every event received. While
    the reported compute stage is ``"Running"``, a second task listens to the
    ``/metrics`` stream and records ``MetricPoint`` samples. That task is
    cancelled as soon as the stage leaves ``"Running"``. ``stop()`` cancels
    every task this connection has started.

    Attributes:
        repo_id: Key under which status and metrics are written to storage.
        namespace: Space owner (first path segment of the API URL).
        repo_name: Space name (second path segment of the API URL).
        tasks: asyncio tasks started by this connection that may still be live.
        is_running: True while a metrics listener is active.
    """

    def __init__(self, repo_id: str, namespace: str, repo: str):
        self.repo_id = repo_id
        self.namespace = namespace
        self.repo_name = repo
        self.tasks = []  # tasks started here; cancelled metrics tasks are pruned
        self.is_running = False  # True while a metrics listener is active
        self._metrics_task = None  # handle to the live metrics listener, if any

    def _space_url(self, endpoint: str) -> str:
        """Return the Hugging Face API URL of this Space's *endpoint* stream."""
        return f"https://huggingface.co/api/spaces/{self.namespace}/{self.repo_name}/{endpoint}"

    async def _listen_events(self):
        """Consume the Space's event stream forever, reconnecting with backoff.

        Each event's ``compute.status.stage`` is stored as a CONNECTED status.
        The stage also decides whether the metrics listener should run. A
        connection failure is stored as an ERROR / "Disconnected" status.

        After the stream ends for any reason (error or clean close), the loop
        awaits ``exponential_backoff`` before reconnecting. That helper is
        presumably a sleep that grows with ``attempt``; confirm in
        utils.time. Backing off on every ending means a server that keeps
        closing the stream cannot drive a tight reconnect loop.
        """
        attempt = 0
        url = self._space_url("events")
        while True:
            try:
                async for data in SSEClient(url).connect():
                    attempt = 0  # a delivered event proves the connection is healthy
                    payload = json.loads(data)
                    # `or {}` also tolerates explicit nulls, not just missing keys.
                    compute = payload.get("compute") or {}
                    status = compute.get("status") or {}
                    stage = status.get("stage", "Unknown")
                    storage.set_status(self.repo_id, RepoStatus(
                        state="CONNECTED", stage=stage, last_updated=datetime.utcnow()
                    ))
                    if stage == "Running":
                        self._start_metrics()
                    else:
                        self._stop_metrics()
            except Exception:
                # asyncio.CancelledError is not an Exception subclass (3.8+), so
                # cancellation from stop() still ends this loop.
                storage.set_status(self.repo_id, RepoStatus(
                    state="ERROR",
                    stage="Disconnected",
                    last_updated=datetime.utcnow()
                ))
            # Reached after an error and after a clean close alike.
            attempt += 1
            await exponential_backoff(attempt)

    def _start_metrics(self):
        """Launch the metrics listener unless one is already active."""
        if self.is_running:
            return
        self.is_running = True
        self._metrics_task = asyncio.create_task(self._listen_metrics())
        self.tasks.append(self._metrics_task)

    def _stop_metrics(self):
        """Cancel the metrics listener, if any, and forget its task handle.

        Cancelling is required because ``_listen_metrics`` only re-checks
        ``is_running`` between connections. Without it, the old listener
        keeps streaming, and a later restart would record every sample twice.
        """
        self.is_running = False
        task, self._metrics_task = self._metrics_task, None
        if task is None:
            return
        task.cancel()
        # Drop the cancelled handle and any finished tasks, so `tasks` does not
        # grow with every Running -> not Running -> Running cycle.
        self.tasks[:] = [t for t in self.tasks if t is not task and not t.done()]

    async def _listen_metrics(self):
        """Record CPU/memory samples from the metrics stream while the Space runs.

        Stream errors are swallowed on purpose, because metrics are best
        effort. The loop pauses one second after every disconnect (including
        a clean close) before reconnecting. It exits when its task is
        cancelled, or when ``is_running`` is False at a reconnect point.
        """
        url = self._space_url("metrics")
        while self.is_running:
            try:
                async for data in SSEClient(url).connect():
                    payload = json.loads(data)
                    storage.add_metric(self.repo_id, MetricPoint(
                        timestamp=datetime.utcnow(),
                        cpu_usage_pct=payload.get("cpu_usage_pct", 0),
                        memory_used_bytes=payload.get("memory_used_bytes", 0),
                        # NOTE(review): default 1 presumably avoids a divide-by-zero
                        # in usage ratios downstream; confirm.
                        memory_total_bytes=payload.get("memory_total_bytes", 1)
                    ))
            except Exception:
                pass  # best effort: retry after the pause below
            await asyncio.sleep(1)

    def start(self):
        """Begin listening to the Space's event stream in a background task."""
        self.tasks.append(asyncio.create_task(self._listen_events()))

    def stop(self):
        """Cancel every task started by this connection.

        ``is_running`` is reset so that a later ``start()`` launches a fresh
        metrics listener the next time the Space is reported as Running.
        """
        self.is_running = False
        self._metrics_task = None
        for task in self.tasks:
            task.cancel()