# diffusers-pr-api: src/slop_farmer/app/publish_analysis.py
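"""Publish hybrid analysis artifacts for a local snapshot to a Hugging Face dataset repo.

Given a snapshot directory containing a root manifest and a hybrid analysis
report, the publish flow uploads the report (plus optional LLM reviews), an
archived per-run manifest, and an updated snapshot manifest in a single
commit. Canonical runs additionally refresh the "current" analysis artifacts
and record themselves as the snapshot's canonical analysis.
"""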
from __future__ import annotations
import json
from collections.abc import Callable, Iterable
from dataclasses import dataclass
from datetime import UTC, datetime
from pathlib import Path
from typing import Any, Protocol, cast
from huggingface_hub import CommitOperationAdd, HfApi, hf_hub_download
from slop_farmer.app.save_cache import _save_analysis_cache_api
from slop_farmer.config import PublishAnalysisArtifactsOptions
from slop_farmer.data.parquet_io import read_json
from slop_farmer.data.snapshot_paths import (
ANALYSIS_REPORT_FILENAME_BY_VARIANT,
HYBRID_ANALYSIS_REVIEWS_FILENAME,
ROOT_MANIFEST_FILENAME,
analysis_run_artifact_path,
analysis_run_manifest_path,
archived_snapshot_manifest_path,
build_archived_analysis_run_manifest,
build_current_analysis_manifest,
current_analysis_artifact_path,
resolve_snapshot_dir_from_output,
)
class HubApiLike(Protocol):
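    """Minimal slice of the ``huggingface_hub.HfApi`` surface this module relies on.

    Declared as a Protocol so the publish flow can accept any structurally
    compatible client (for example, a lightweight fake in tests) instead of a
    real ``HfApi`` instance.
    """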
def create_repo(
self,
repo_id: str,
*,
repo_type: str,
private: bool,
exist_ok: bool,
) -> None: ...
def create_commit(
self,
repo_id: str,
operations: Iterable[CommitOperationAdd],
*,
commit_message: str,
repo_type: str,
) -> Any: ...
def upload_folder(
self,
*,
repo_id: str,
folder_path: Path,
path_in_repo: str,
repo_type: str,
commit_message: str,
) -> None: ...
@dataclass(frozen=True, slots=True)
class PublishableAnalysisArtifacts:
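    """Locally discovered analysis artifacts for one snapshot, ready to upload."""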
repo: str
snapshot_id: str
model: str | None
report_path: Path
reviews_path: Path | None
report_payload: dict[str, Any]
def run_publish_analysis_artifacts(options: PublishAnalysisArtifactsOptions) -> dict[str, Any]:
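    """Resolve the snapshot directory from ``options`` and publish its analysis artifacts."""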
snapshot_dir = resolve_snapshot_dir_from_output(options.output_dir, options.snapshot_dir)
return publish_analysis_artifacts(
snapshot_dir=snapshot_dir,
analysis_input=options.analysis_input,
hf_repo_id=options.hf_repo_id,
analysis_id=options.analysis_id,
canonical=options.canonical,
save_cache=options.save_cache,
private=options.private_hf_repo,
)
def publish_analysis_artifacts(
*,
snapshot_dir: Path,
analysis_input: Path | None,
hf_repo_id: str,
analysis_id: str,
canonical: bool,
private: bool,
save_cache: bool = False,
log: Callable[[str], None] | None = None,
) -> dict[str, Any]:
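    """Publish the hybrid analysis artifacts under ``snapshot_dir`` to ``hf_repo_id``.

    Thin wrapper that builds a real ``HfApi`` client and delegates to
    ``_publish_analysis_artifacts_api``. A minimal sketch of a call, with an
    illustrative repo id and path (not project defaults)::

        publish_analysis_artifacts(
            snapshot_dir=Path("output/snapshots/2024-06-01"),
            analysis_input=None,
            hf_repo_id="my-org/diffusers-pr-snapshots",
            analysis_id="analysis-0001",
            canonical=True,
            private=False,
        )
    """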
return _publish_analysis_artifacts_api(
cast("HubApiLike", HfApi()),
snapshot_dir=snapshot_dir,
analysis_input=analysis_input,
hf_repo_id=hf_repo_id,
analysis_id=analysis_id,
canonical=canonical,
private=private,
save_cache=save_cache,
log=log,
)
def _publish_analysis_artifacts_api(
api: HubApiLike,
*,
snapshot_dir: Path,
analysis_input: Path | None = None,
hf_repo_id: str,
analysis_id: str,
canonical: bool,
private: bool,
save_cache: bool = False,
log: Callable[[str], None] | None = None,
) -> dict[str, Any]:
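    """Core publish flow against an injected ``HubApiLike`` client.

    Discovers the local report, builds the archived/current/snapshot manifests,
    ensures the dataset repo exists, uploads everything in one commit, and
    optionally saves the analysis cache. Returns a summary dict, including the
    ``artifact_paths`` written in the commit.
    """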
artifacts = _discover_publishable_analysis(snapshot_dir, analysis_input=analysis_input)
published_at = _iso_now()
channel = "canonical" if canonical else "comparison"
archived_manifest = build_archived_analysis_run_manifest(
repo=artifacts.repo,
snapshot_id=artifacts.snapshot_id,
analysis_id=analysis_id,
variant="hybrid",
channel=channel,
model=artifacts.model,
published_at=published_at,
include_hybrid_reviews=artifacts.reviews_path is not None,
)
current_manifest = (
build_current_analysis_manifest(
repo=artifacts.repo,
snapshot_id=artifacts.snapshot_id,
analysis_id=analysis_id,
variant="hybrid",
channel=channel,
model=artifacts.model,
published_at=published_at,
include_hybrid_reviews=artifacts.reviews_path is not None,
)
if canonical
else None
)
snapshot_manifest = _updated_snapshot_manifest(
snapshot_dir=snapshot_dir,
hf_repo_id=hf_repo_id,
snapshot_id=artifacts.snapshot_id,
analysis_id=analysis_id,
archived_manifest=archived_manifest,
canonical=canonical,
)
operations = _commit_operations(
artifacts=artifacts,
analysis_id=analysis_id,
archived_manifest=archived_manifest,
current_manifest=current_manifest,
snapshot_manifest=snapshot_manifest,
)
if log:
log(f"Ensuring Hub dataset repo exists: {hf_repo_id}")
api.create_repo(hf_repo_id, repo_type="dataset", private=private, exist_ok=True)
if log:
log(f"Publishing analysis {analysis_id} for snapshot {artifacts.snapshot_id}")
api.create_commit(
hf_repo_id,
operations,
commit_message=f"Publish analysis {analysis_id} for snapshot {artifacts.snapshot_id}",
repo_type="dataset",
)
cache_result = (
_save_analysis_cache_api(
api,
snapshot_dir=snapshot_dir,
hf_repo_id=hf_repo_id,
private=private,
log=log,
)
if save_cache
else None
)
result: dict[str, Any] = {
"repo": artifacts.repo,
"dataset_id": hf_repo_id,
"snapshot_id": artifacts.snapshot_id,
"analysis_id": analysis_id,
"canonical": canonical,
"save_cache": save_cache,
"published_at": published_at,
"artifact_paths": [operation.path_in_repo for operation in operations],
}
if cache_result is not None:
result["cache"] = cache_result
if log:
log(f"Published analysis artifacts to {hf_repo_id}")
return result
def _discover_publishable_analysis(
snapshot_dir: Path, *, analysis_input: Path | None
) -> PublishableAnalysisArtifacts:
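    """Locate and validate the snapshot manifest and hybrid analysis report on disk.

    ``analysis_input`` overrides the default report location inside
    ``snapshot_dir``. Raises ``FileNotFoundError`` or ``ValueError`` when a file
    is missing, is not a JSON object, or disagrees with the manifest on
    ``repo``/``snapshot_id``. A sibling ``<report stem>.llm-reviews.json`` file
    is attached when it exists.
    """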
manifest_path = snapshot_dir / ROOT_MANIFEST_FILENAME
if not manifest_path.exists():
raise FileNotFoundError(f"Snapshot manifest is missing: {manifest_path}")
manifest = read_json(manifest_path)
if not isinstance(manifest, dict):
raise ValueError(f"Snapshot manifest at {manifest_path} must contain a JSON object.")
snapshot_id = str(manifest.get("snapshot_id") or snapshot_dir.name).strip()
repo = str(manifest.get("repo") or "").strip()
if not repo:
raise ValueError(f"Snapshot manifest at {manifest_path} does not define repo.")
report_path = (
analysis_input.resolve()
if analysis_input is not None
else snapshot_dir / ANALYSIS_REPORT_FILENAME_BY_VARIANT["hybrid"]
)
if not report_path.exists():
raise FileNotFoundError(f"Hybrid analysis report is missing: {report_path}")
report_payload = read_json(report_path)
if not isinstance(report_payload, dict):
raise ValueError(f"Hybrid analysis report at {report_path} must contain a JSON object.")
report_snapshot_id = str(report_payload.get("snapshot_id") or snapshot_id).strip()
if report_snapshot_id != snapshot_id:
raise ValueError(
f"Hybrid analysis report snapshot_id {report_snapshot_id!r} does not match manifest snapshot_id {snapshot_id!r}."
)
report_repo = str(report_payload.get("repo") or repo).strip()
if report_repo != repo:
raise ValueError(
f"Hybrid analysis report repo {report_repo!r} does not match manifest repo {repo!r}."
)
model = report_payload.get("model")
if model is not None:
model = str(model)
reviews_path = report_path.with_name(f"{report_path.stem}.llm-reviews.json")
return PublishableAnalysisArtifacts(
repo=repo,
snapshot_id=snapshot_id,
model=model,
report_path=report_path,
reviews_path=reviews_path if reviews_path.exists() else None,
report_payload={str(key): value for key, value in report_payload.items()},
)
def _updated_snapshot_manifest(
*,
snapshot_dir: Path,
hf_repo_id: str,
snapshot_id: str,
analysis_id: str,
archived_manifest: dict[str, Any],
canonical: bool,
) -> dict[str, Any]:
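    """Merge this run into the snapshot manifest's ``published_analysis`` section.

    Prefers the manifest already archived on the Hub, so previously published
    runs are preserved, and falls back to the local copy. The run is recorded
    under ``runs[analysis_id]``; canonical runs also update
    ``canonical_analysis_id``.
    """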
manifest = _load_remote_snapshot_manifest(hf_repo_id, snapshot_id) or read_json(
snapshot_dir / ROOT_MANIFEST_FILENAME
)
if not isinstance(manifest, dict):
raise ValueError("Archived snapshot manifest must contain a JSON object.")
updated = {str(key): value for key, value in manifest.items()}
published_analysis: dict[str, Any] | Any = updated.get("published_analysis")
if not isinstance(published_analysis, dict):
published_analysis = {"schema_version": 1, "runs": {}}
runs: dict[str, Any] | Any = published_analysis.get("runs")
if not isinstance(runs, dict):
runs = {}
runs[analysis_id] = {
"analysis_id": analysis_id,
"variant": archived_manifest["variant"],
"channel": archived_manifest["channel"],
"model": archived_manifest.get("model"),
"published_at": archived_manifest["published_at"],
"manifest_path": analysis_run_manifest_path(snapshot_id, analysis_id),
"artifacts": archived_manifest["artifacts"],
}
published_analysis["schema_version"] = 1
published_analysis["runs"] = runs
if canonical:
published_analysis["canonical_analysis_id"] = analysis_id
updated["published_analysis"] = published_analysis
return updated
def _load_remote_snapshot_manifest(hf_repo_id: str, snapshot_id: str) -> dict[str, Any] | None:
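    """Fetch the archived snapshot manifest from the Hub, or ``None`` if unavailable.

    Download failures (for example, when the manifest has not been published
    yet) are swallowed so the caller can fall back to the local manifest.
    """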
try:
downloaded = hf_hub_download(
repo_id=hf_repo_id,
repo_type="dataset",
filename=archived_snapshot_manifest_path(snapshot_id),
)
except Exception:
return None
payload = json.loads(Path(downloaded).read_text(encoding="utf-8"))
return payload if isinstance(payload, dict) else None
def _commit_operations(
*,
artifacts: PublishableAnalysisArtifacts,
analysis_id: str,
archived_manifest: dict[str, Any],
current_manifest: dict[str, Any] | None,
snapshot_manifest: dict[str, Any],
) -> list[CommitOperationAdd]:
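    """Build the ``CommitOperationAdd`` list for a single publish commit.

    Always includes the hybrid report, the archived run manifest, and the
    updated snapshot manifest. The LLM reviews file is added when present, and
    canonical runs (``current_manifest`` is not ``None``) also mirror the
    report, manifest, and reviews into the "current" analysis location.
    """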
report_filename = ANALYSIS_REPORT_FILENAME_BY_VARIANT["hybrid"]
operations = [
CommitOperationAdd(
path_in_repo=analysis_run_artifact_path(
artifacts.snapshot_id,
analysis_id,
report_filename,
),
path_or_fileobj=artifacts.report_path,
),
CommitOperationAdd(
path_in_repo=analysis_run_manifest_path(artifacts.snapshot_id, analysis_id),
path_or_fileobj=_json_bytes(archived_manifest),
),
CommitOperationAdd(
path_in_repo=archived_snapshot_manifest_path(artifacts.snapshot_id),
path_or_fileobj=_json_bytes(snapshot_manifest),
),
]
if artifacts.reviews_path is not None:
operations.append(
CommitOperationAdd(
path_in_repo=analysis_run_artifact_path(
artifacts.snapshot_id,
analysis_id,
HYBRID_ANALYSIS_REVIEWS_FILENAME,
),
path_or_fileobj=artifacts.reviews_path,
)
)
if current_manifest is not None:
operations.extend(
[
CommitOperationAdd(
path_in_repo=current_analysis_artifact_path(report_filename),
path_or_fileobj=artifacts.report_path,
),
CommitOperationAdd(
path_in_repo=current_analysis_artifact_path(ROOT_MANIFEST_FILENAME),
path_or_fileobj=_json_bytes(current_manifest),
),
]
)
if artifacts.reviews_path is not None:
operations.append(
CommitOperationAdd(
path_in_repo=current_analysis_artifact_path(HYBRID_ANALYSIS_REVIEWS_FILENAME),
path_or_fileobj=artifacts.reviews_path,
)
)
return operations
def _json_bytes(payload: dict[str, Any]) -> bytes:
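    """Serialize ``payload`` as deterministic JSON bytes (sorted keys, trailing newline)."""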
return (json.dumps(payload, indent=2, sort_keys=True) + "\n").encode("utf-8")
def _iso_now() -> str:
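    """Current UTC time as a second-precision ISO-8601 string with a ``Z`` suffix."""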
return datetime.now(tz=UTC).replace(microsecond=0).isoformat().replace("+00:00", "Z")