# diffusers-pr-api/src/slop_farmer/data/ghreplica_api.py
# Commit dbf7313 (verified) by evalstate (HF Staff): "Deploy Diffusers PR API"
from __future__ import annotations
import json
import urllib.error
import urllib.request
from collections.abc import Callable, Iterable
from typing import Any
from slop_farmer.data.http import urlopen_with_retry
class GhReplicaApiRequestError(RuntimeError):
    """Error for an HTTP failure response from ghreplica that is not recoverable.

    The HTTP status code, the request path, and the response detail text are
    kept as attributes and are also folded into the exception message.
    """

    def __init__(self, status_code: int, path: str, detail: str):
        message = f"ghreplica API request failed: {status_code} {path} {detail}"
        super().__init__(message)
        # status_code: HTTP status of the failed response.
        # path: request path the failure belongs to.
        # detail: text supplied by the caller describing the failure.
        self.status_code, self.path, self.detail = status_code, path, detail
class GhReplicaProbeUnavailableError(RuntimeError):
    """Raised when ghreplica cannot yet serve a live probe payload.

    Args:
        detail: Human-readable explanation; it becomes the exception message.
        status_code: Status code attached to the error (keyword-only,
            defaults to 503).
    """

    def __init__(self, detail: str, *, status_code: int = 503):
        self.status_code = status_code
        # Exposed alongside status_code so handlers can read the same
        # attribute names that GhReplicaApiRequestError provides.
        self.detail = detail
        super().__init__(detail)
class GhrProbeClient:
    """Small JSON client for the ghreplica HTTP API.

    Each call builds a body-less ``urllib.request.Request`` for
    ``base_url`` + path and sends it through ``urlopen_with_retry``.
    """

    provider = "ghreplica"

    def __init__(
        self,
        *,
        base_url: str,
        timeout: int = 180,
        max_retries: int = 5,
        log: Callable[[str], None] | None = None,
    ):
        """Store the connection settings.

        Args:
            base_url: Root URL of the service. Trailing slashes are stripped
                so that request paths (which begin with "/") can be appended
                directly.
            timeout: Forwarded to ``urlopen_with_retry`` as ``timeout``
                (presumably seconds -- confirm against that helper).
            max_retries: Forwarded to ``urlopen_with_retry`` as ``max_retries``.
            log: Optional callable forwarded to ``urlopen_with_retry`` as ``log``.
        """
        self.base_url = base_url.rstrip("/")
        self.timeout = timeout
        self.max_retries = max_retries
        self.log = log

    def _request_json(self, path: str) -> Any:
        """Request ``path`` and return the response body parsed as JSON.

        Raises:
            GhReplicaApiRequestError: If ``urllib.error.HTTPError`` propagates
                out of ``urlopen_with_retry``; it carries the status code, the
                path, and the decoded error body.
        """
        request = urllib.request.Request(f"{self.base_url}{path}")
        request.add_header("Accept", "application/json")
        try:
            with urlopen_with_retry(
                request,
                timeout=self.timeout,
                max_retries=self.max_retries,
                log=self.log,
                label=path,
            ) as response:
                payload = response.read().decode("utf-8")
        except urllib.error.HTTPError as exc:
            # Error bodies may not be valid UTF-8; never fail while reporting.
            detail = exc.read().decode("utf-8", errors="replace")
            raise GhReplicaApiRequestError(exc.code, path, detail) from exc
        return json.loads(payload)

    def _request_json_or_none(self, path: str) -> Any | None:
        """Like ``_request_json`` but map an HTTP 404 to ``None``.

        Any other ``GhReplicaApiRequestError`` is re-raised unchanged.
        """
        try:
            return self._request_json(path)
        except GhReplicaApiRequestError as exc:
            if exc.status_code == 404:
                return None
            raise

    def get_pull_request(self, owner: str, repo: str, number: int) -> dict[str, Any]:
        """Return the pull request payload for ``owner/repo#number``.

        Raises:
            GhReplicaProbeUnavailableError: With ``status_code=404`` when the
                API answers 404 for the pull request.
            GhReplicaApiRequestError: For any other HTTP error.
            RuntimeError: If the decoded payload is not a dict.
        """
        try:
            payload = self._request_json(f"/v1/github/repos/{owner}/{repo}/pulls/{number}")
        except GhReplicaApiRequestError as exc:
            if exc.status_code == 404:
                raise GhReplicaProbeUnavailableError(
                    f"PR #{number} was not found in ghreplica.",
                    status_code=404,
                ) from exc
            raise
        if not isinstance(payload, dict):
            raise RuntimeError(f"Expected dict payload for pull request, got {type(payload)!r}")
        return payload

    def iter_pull_files(self, owner: str, repo: str, number: int) -> Iterable[dict[str, Any]]:
        """Yield one normalised dict per changed file of a pull request.

        This is a generator: the HTTP request and every error below happen
        when iteration starts, not when the method is called.

        The payload may be either a JSON list of rows or a JSON object whose
        ``"files"`` entry is that list. Rows that are not dicts are skipped.

        Raises:
            GhReplicaProbeUnavailableError: When the files endpoint answers
                404. If the status endpoint returns a dict the error carries
                ``status_code=503`` and a summary of selected status fields;
                otherwise it carries ``status_code=404``.
            GhReplicaApiRequestError: For any other HTTP error.
            RuntimeError: If the payload does not contain a list of rows.
        """
        try:
            payload = self._request_json(f"/v1/changes/repos/{owner}/{repo}/pulls/{number}/files")
        except GhReplicaApiRequestError as exc:
            if exc.status_code != 404:
                raise
            # A 404 here is ambiguous, so consult the status endpoint to tell
            # "known but not ready" apart from "not found".
            status = self.get_pull_request_status(owner, repo, number)
            if isinstance(status, dict):
                detail_bits = [
                    f"{key}={status[key]}"
                    for key in (
                        "indexed",
                        "backfill_in_progress",
                        "changed_files",
                        "indexed_file_count",
                    )
                    if key in status
                ]
                suffix = f" ({', '.join(detail_bits)})" if detail_bits else ""
                raise GhReplicaProbeUnavailableError(
                    f"PR #{number} is not available in ghreplica yet{suffix}.",
                    status_code=503,
                ) from exc
            raise GhReplicaProbeUnavailableError(
                f"PR #{number} was not found in ghreplica changed-file replica.",
                status_code=404,
            ) from exc

        # Only dicts have .get(); any other non-list payload (null, string,
        # number) must fall through to the RuntimeError below instead of
        # failing with AttributeError.
        if isinstance(payload, list):
            rows = payload
        elif isinstance(payload, dict):
            rows = payload.get("files")
        else:
            rows = None
        if not isinstance(rows, list):
            raise RuntimeError(
                f"Expected list payload for pull request files, got {type(payload)!r}"
            )
        for row in rows:
            if not isinstance(row, dict):
                continue
            yield self._normalize_file_row(row)

    @staticmethod
    def _normalize_file_row(row: dict[str, Any]) -> dict[str, Any]:
        """Map one raw file row onto the fixed set of keys this client emits."""
        # Missing or falsy counts are treated as zero.
        additions = int(row.get("additions") or 0)
        deletions = int(row.get("deletions") or 0)
        return {
            "sha": row.get("sha"),
            # Fall back to "path" when the row has no (truthy) "filename".
            "filename": row.get("filename") or row.get("path"),
            "status": row.get("status"),
            "additions": additions,
            "deletions": deletions,
            # Derive the total when the row gives no (truthy) "changes".
            "changes": row.get("changes") or additions + deletions,
            "blob_url": row.get("blob_url"),
            "raw_url": row.get("raw_url"),
            "contents_url": row.get("contents_url"),
            "previous_filename": row.get("previous_filename"),
            "patch": row.get("patch"),
        }

    def get_pull_request_status(self, owner: str, repo: str, number: int) -> dict[str, Any] | None:
        """Return the status payload for a pull request, or ``None`` on 404.

        Raises:
            GhReplicaApiRequestError: For HTTP errors other than 404.
            RuntimeError: If the decoded payload is not a dict.
        """
        payload = self._request_json_or_none(
            f"/v1/changes/repos/{owner}/{repo}/pulls/{number}/status"
        )
        if payload is None:
            return None
        if not isinstance(payload, dict):
            raise RuntimeError(
                f"Expected dict payload for pull request status, got {type(payload)!r}"
            )
        return payload