# diffusers-pr-api / src/slop_farmer/app/dataset_status.py
# Author: evalstate (HF Staff)
# Commit dbf7313 (verified): "Deploy Diffusers PR API"
from __future__ import annotations
import tempfile
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
from huggingface_hub import HfApi
from slop_farmer.config import DatasetStatusOptions
from slop_farmer.data.hf_dataset_repo import (
list_remote_paths,
load_remote_file,
load_remote_json_file,
stable_snapshot_candidates,
)
from slop_farmer.data.parquet_io import read_json
from slop_farmer.data.snapshot_paths import (
CONTRIBUTOR_ARTIFACT_FILENAMES,
CURRENT_ANALYSIS_MANIFEST_PATH,
PR_SCOPE_CLUSTERS_FILENAME,
SNAPSHOTS_LATEST_PATH,
load_current_analysis_manifest,
repo_relative_path_to_local,
)
def _coerce_datetime(value: Any) -> datetime | None:
if not isinstance(value, str) or not value:
return None
try:
return datetime.fromisoformat(value.replace("Z", "+00:00"))
except ValueError:
return None
def _age_summary(value: str | None) -> dict[str, Any]:
    """Describe how long ago the ISO-8601 timestamp *value* was.

    Returns a dict with three keys:

    - ``seconds``: elapsed whole seconds, clamped at 0 so that future
      timestamps read as brand-new.
    - ``summary``: a compact label counted in minutes (``"Nm"``), hours
      (``"Nh"``) or days (``"Nd"``).
    - ``staleness``: ``"fresh"`` up to 6 hours, ``"aging"`` up to 24 hours,
      ``"stale"`` beyond that.

    Missing or unparseable input yields ``None`` seconds and ``"unknown"``
    labels.
    """
    moment = _coerce_datetime(value)
    if moment is None:
        return {"seconds": None, "summary": "unknown", "staleness": "unknown"}

    hour = 3600
    day = 24 * hour
    elapsed = int((datetime.now(tz=UTC) - moment).total_seconds())
    elapsed = elapsed if elapsed > 0 else 0

    # The first threshold that the age fits under wins; otherwise it is stale.
    staleness = "stale"
    for limit, label in ((6 * hour, "fresh"), (day, "aging")):
        if elapsed <= limit:
            staleness = label
            break

    if elapsed < hour:
        summary = f"{elapsed // 60}m"
    elif elapsed < day:
        summary = f"{elapsed // hour}h"
    else:
        summary = f"{elapsed // day}d"

    return {"seconds": elapsed, "summary": summary, "staleness": staleness}
def _local_status(output_dir: Path) -> dict[str, Any] | None:
    """Summarize the locally materialized snapshot state under *output_dir*.

    Returns ``None`` when ``<output_dir>/snapshots/latest.json`` does not
    exist. Otherwise returns a dict with:

    - ``latest_path``: the pointer file's path.
    - ``latest_pointer``: the parsed pointer payload.
    - ``snapshot_dir``: the raw ``snapshot_dir`` value from the pointer.
    - ``snapshot_id``: taken from the snapshot's own ``manifest.json`` when
      present and truthy, else from the pointer's ``latest_snapshot_id``.
    - ``current_analysis``: the local current-analysis summary.
    """
    pointer_path = output_dir.resolve() / "snapshots" / "latest.json"
    if not pointer_path.exists():
        return None

    pointer = read_json(pointer_path)
    raw_dir = pointer.get("snapshot_dir")
    # NOTE(review): a relative snapshot_dir is resolved against the process
    # CWD, not output_dir -- confirm the pointer always stores absolute paths.
    resolved_dir = Path(raw_dir).resolve() if isinstance(raw_dir, str) and raw_dir else None

    snapshot_manifest: dict[str, Any] = {}
    if resolved_dir is not None:
        manifest_file = resolved_dir / "manifest.json"
        if manifest_file.exists():
            snapshot_manifest = read_json(manifest_file)

    return {
        "latest_path": str(pointer_path),
        "latest_pointer": pointer,
        "snapshot_dir": raw_dir,
        "snapshot_id": snapshot_manifest.get("snapshot_id") or pointer.get("latest_snapshot_id"),
        "current_analysis": _local_current_analysis(resolved_dir),
    }
def _local_current_analysis(snapshot_dir: Path | None) -> dict[str, Any]:
    """Describe the current-analysis manifest stored inside a local snapshot.

    Returns one of three shapes:

    - ``{"present": False}`` when there is no snapshot directory or the
      manifest file does not exist.
    - ``{"present": True, "valid": False, "detail": ...}`` when
      ``load_current_analysis_manifest`` rejects the file with ``ValueError``.
    - Otherwise ``present``/``valid`` set to ``True`` plus the manifest's
      ``snapshot_id``, ``analysis_id``, ``variant`` and ``published_at``.
    """
    manifest_path = (
        None
        if snapshot_dir is None
        else repo_relative_path_to_local(snapshot_dir, CURRENT_ANALYSIS_MANIFEST_PATH)
    )
    if manifest_path is None or not manifest_path.exists():
        return {"present": False}

    try:
        manifest = load_current_analysis_manifest(manifest_path)
    except ValueError as exc:
        return {"present": True, "valid": False, "detail": str(exc)}

    summary: dict[str, Any] = {"present": True, "valid": True}
    for key in ("snapshot_id", "analysis_id", "variant", "published_at"):
        summary[key] = manifest[key]
    return summary
def _remote_status(repo_id: str, revision: str | None) -> dict[str, Any]:
    """Collect status information for the published dataset repo on the Hub.

    The following files are downloaded into a temporary directory, which is
    removed before the summary is assembled:

    - the latest-snapshot pointer (``SNAPSHOTS_LATEST_PATH``);
    - ``state/watermark.json``;
    - the latest snapshot's ``manifest.json``;
    - the current-analysis manifest.

    Args:
        repo_id: Dataset repository id on the Hugging Face Hub.
        revision: Optional revision, forwarded unchanged to every Hub helper.

    Returns:
        A dict with keys ``dataset_id``, ``revision``, ``latest_pointer``,
        ``watermark``, ``manifest``, ``cheap_artifacts``,
        ``current_analysis``, ``archived_analysis_runs``,
        ``remote_path_count`` and ``age``.
    """
    api = HfApi()
    with tempfile.TemporaryDirectory(prefix="slop-farmer-dataset-status-") as tmp:
        root = Path(tmp)
        remote_paths = list_remote_paths(api, repo_id, revision=revision)
        latest_pointer = load_remote_json_file(
            api, repo_id, SNAPSHOTS_LATEST_PATH, root, revision=revision
        )
        watermark = load_remote_json_file(
            api, repo_id, "state/watermark.json", root, revision=revision
        )
        manifest = _load_latest_remote_manifest(
            api, repo_id, root, revision=revision, latest_pointer=latest_pointer
        )
        current_analysis = _remote_current_analysis(
            api,
            repo_id,
            root,
            revision=revision,
            remote_paths=remote_paths,
            latest_pointer=latest_pointer,
        )

    # Everything below works only on values that were already parsed in memory.
    raw_latest_id = (
        latest_pointer.get("latest_snapshot_id") if isinstance(latest_pointer, dict) else None
    )
    # Only stringify a real id: str(None) would yield the truthy literal "None".
    latest_snapshot_id = None if raw_latest_id is None else str(raw_latest_id)
    extracted_at = manifest.get("extracted_at") if manifest else None

    return {
        "dataset_id": repo_id,
        "revision": revision,
        "latest_pointer": latest_pointer,
        "watermark": watermark,
        "manifest": manifest,
        "cheap_artifacts": {
            "pr_scope_clusters": _remote_has_latest_artifact(
                remote_paths,
                latest_pointer,
                PR_SCOPE_CLUSTERS_FILENAME,
            ),
            "contributors": all(
                _remote_has_latest_artifact(remote_paths, latest_pointer, filename)
                for filename in CONTRIBUTOR_ARTIFACT_FILENAMES
            ),
        },
        "current_analysis": current_analysis,
        "archived_analysis_runs": _count_archived_analysis_runs(remote_paths, latest_snapshot_id),
        "remote_path_count": len(remote_paths),
        "age": _age_summary(extracted_at),
    }


def _load_latest_remote_manifest(
    api: HfApi,
    repo_id: str,
    root: Path,
    *,
    revision: str | None,
    latest_pointer: dict[str, Any] | None,
) -> Any:
    """Download and parse the first available ``manifest.json`` for the latest snapshot.

    Returns ``None`` when there is no pointer or none of the candidate paths
    could be downloaded.
    """
    if latest_pointer is None:
        return None
    for candidate in stable_snapshot_candidates(latest_pointer, "manifest.json"):
        downloaded = load_remote_file(api, repo_id, candidate, root, revision=revision)
        if downloaded is not None:
            return read_json(downloaded)
    return None


def _count_archived_analysis_runs(
    remote_paths: set[str],
    latest_snapshot_id: str | None,
) -> dict[str, int]:
    """Count archived analysis-run manifests, overall and for the latest snapshot."""
    run_manifests = [
        path
        for path in remote_paths
        if path.startswith("snapshots/")
        and "/analysis-runs/" in path
        and path.endswith("/manifest.json")
    ]
    current_count = 0
    if latest_snapshot_id:
        current_prefix = f"snapshots/{latest_snapshot_id}/analysis-runs/"
        current_count = sum(1 for path in run_manifests if path.startswith(current_prefix))
    return {"count": len(run_manifests), "current_snapshot_count": current_count}
def _remote_current_analysis(
    api: HfApi,
    repo_id: str,
    root: Path,
    *,
    revision: str | None,
    remote_paths: set[str],
    latest_pointer: dict[str, Any] | None,
) -> dict[str, Any]:
    """Summarize the published current-analysis manifest on the Hub.

    Args:
        api: Hub client used for the download.
        repo_id: Dataset repository id.
        root: Local directory the manifest is downloaded into.
        revision: Optional revision forwarded to the download helper.
        remote_paths: Paths already listed for the repo. If the manifest path
            is absent, the download is skipped.
        latest_pointer: Parsed latest-snapshot pointer, or ``None``.

    Returns:
        One of three shapes:

        - ``{"present": False}`` when the manifest is not listed or cannot
          be downloaded.
        - ``{"present": True, "valid": False, "detail": ...}`` when
          ``load_current_analysis_manifest`` raises ``ValueError``.
        - Otherwise the manifest's identifying fields, plus
          ``matches_latest_snapshot`` (whether its ``snapshot_id`` equals the
          pointer's ``latest_snapshot_id``) and ``artifact_count``.
    """
    if CURRENT_ANALYSIS_MANIFEST_PATH not in remote_paths:
        return {"present": False}
    downloaded = load_remote_file(
        api,
        repo_id,
        CURRENT_ANALYSIS_MANIFEST_PATH,
        root,
        revision=revision,
    )
    if downloaded is None:
        return {"present": False}
    try:
        manifest = load_current_analysis_manifest(downloaded)
    except ValueError as exc:
        return {"present": True, "valid": False, "detail": str(exc)}

    raw_latest_id = (
        latest_pointer.get("latest_snapshot_id") if isinstance(latest_pointer, dict) else None
    )
    # Only stringify a real id: str(None) would yield the literal "None".
    latest_snapshot_id = None if raw_latest_id is None else str(raw_latest_id)
    return {
        "present": True,
        "valid": True,
        "snapshot_id": manifest["snapshot_id"],
        "analysis_id": manifest["analysis_id"],
        "variant": manifest["variant"],
        "published_at": manifest["published_at"],
        "matches_latest_snapshot": (
            latest_snapshot_id is not None and manifest["snapshot_id"] == latest_snapshot_id
        ),
        "artifact_count": len(manifest["artifacts"]),
    }
def _remote_has_latest_artifact(
    remote_paths: set[str],
    latest_pointer: dict[str, Any] | None,
    filename: str,
) -> bool:
    """Report whether *filename* exists for the latest snapshot in the remote listing.

    ``stable_snapshot_candidates`` supplies the repo paths where the file may
    live. The check succeeds as soon as any one of them appears in
    *remote_paths*.
    """
    for path in stable_snapshot_candidates(latest_pointer, filename):
        if path in remote_paths:
            return True
    return False
def get_dataset_status(options: DatasetStatusOptions) -> dict[str, Any]:
    """Gather remote (Hub) and local snapshot status for the configured dataset.

    The remote section is only fetched when ``options.hf_repo_id`` is set.
    The ``repo`` field is resolved from the first available source, in order:

    1. ``options.repo``;
    2. the remote manifest's ``repo``;
    3. the local latest pointer's ``repo``.
    """
    remote = None
    if options.hf_repo_id:
        remote = _remote_status(options.hf_repo_id, options.hf_revision)
    local = _local_status(options.output_dir)

    repo = options.repo
    if repo is None:
        remote_manifest = remote.get("manifest") if remote else None
        if remote_manifest:
            repo = remote_manifest.get("repo")
    if repo is None and local:
        pointer = local.get("latest_pointer")
        if isinstance(pointer, dict):
            repo = pointer.get("repo")

    return {
        "repo": repo,
        "dataset_id": options.hf_repo_id,
        "remote": remote,
        "local": local,
    }
def format_dataset_status(status: dict[str, Any]) -> str:
    """Render a dataset-status dict as a human-readable, line-oriented report.

    Args:
        status: Mapping with optional ``repo``, ``dataset_id``, ``remote`` and
            ``local`` entries. ``remote`` and ``local`` may be ``None`` or
            empty, in which case their sections are reduced or omitted.

    Returns:
        The report as ``Label: value`` lines joined by ``"\\n"``.
    """
    lines = [
        f"Repo: {status.get('repo') or '?'}",
        f"Dataset: {status.get('dataset_id') or 'not configured'}",
    ]
    lines.extend(_format_remote_status_lines(status.get("remote") or {}))
    lines.extend(_format_local_status_lines(status.get("local") or {}))
    return "\n".join(lines)


def _format_remote_status_lines(remote: dict[str, Any]) -> list[str]:
    """Return report lines for the remote (Hub) section; empty when there is no remote."""
    if not remote:
        return []
    manifest = remote.get("manifest") or {}
    watermark = remote.get("watermark") or {}
    latest_pointer = remote.get("latest_pointer") or {}
    age = remote.get("age") or {}
    cheap_artifacts = remote.get("cheap_artifacts") or {}
    archived_runs = remote.get("archived_analysis_runs") or {}

    latest_snapshot = manifest.get("snapshot_id") or latest_pointer.get("latest_snapshot_id") or "?"
    next_since = watermark.get("next_since") or latest_pointer.get("next_since") or "?"
    lines = [
        f"Remote latest snapshot: {latest_snapshot}",
        f"Remote extracted at: {manifest.get('extracted_at') or '?'}",
        f"Remote next_since: {next_since}",
        f"PR scope artifact: {'yes' if cheap_artifacts.get('pr_scope_clusters') else 'no'}",
        f"Contributor artifacts: {'yes' if cheap_artifacts.get('contributors') else 'no'}",
    ]
    lines.extend(
        _format_current_analysis_lines(
            "Current analysis",
            remote.get("current_analysis") or {},
            include_match=True,
        )
    )
    lines.append(
        "Archived analysis runs: "
        f"{archived_runs.get('count', 0)} total, "
        f"{archived_runs.get('current_snapshot_count', 0)} for latest snapshot"
    )
    lines.append(
        f"Freshness: {age.get('summary') or 'unknown'} ({age.get('staleness') or 'unknown'})"
    )
    return lines


def _format_local_status_lines(local: dict[str, Any]) -> list[str]:
    """Return report lines for the local snapshot section."""
    if not local:
        return ["Local latest pointer: none"]
    lines = [
        f"Local latest pointer: {local.get('latest_path')}",
        f"Local snapshot id: {local.get('snapshot_id') or '?'}",
    ]
    # Local summaries can also be {"present": True, "valid": False, ...}; render
    # them the same way as the remote section instead of as snapshot=None.
    lines.extend(
        _format_current_analysis_lines(
            "Local current analysis",
            local.get("current_analysis") or {},
            include_match=False,
        )
    )
    return lines


def _format_current_analysis_lines(
    label: str,
    analysis: dict[str, Any],
    *,
    include_match: bool,
) -> list[str]:
    """Render a current-analysis summary (absent, invalid or valid) under *label*."""
    if not analysis.get("present"):
        return [f"{label}: none"]
    if analysis.get("valid") is False:
        return [f"{label}: invalid ({analysis.get('detail')})"]
    lines = [
        f"{label}: "
        f"snapshot={analysis.get('snapshot_id')} "
        f"analysis_id={analysis.get('analysis_id')}"
    ]
    if include_match:
        matches = "yes" if analysis.get("matches_latest_snapshot") else "no"
        lines.append(f"{label} matches latest snapshot: {matches}")
    return lines