Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import json | |
| from collections.abc import Callable, Iterable | |
| from dataclasses import dataclass | |
| from datetime import UTC, datetime | |
| from pathlib import Path | |
| from typing import Any, Protocol, cast | |
| from huggingface_hub import CommitOperationAdd, HfApi, hf_hub_download | |
| from slop_farmer.app.save_cache import _save_analysis_cache_api | |
| from slop_farmer.config import PublishAnalysisArtifactsOptions | |
| from slop_farmer.data.parquet_io import read_json | |
| from slop_farmer.data.snapshot_paths import ( | |
| ANALYSIS_REPORT_FILENAME_BY_VARIANT, | |
| HYBRID_ANALYSIS_REVIEWS_FILENAME, | |
| ROOT_MANIFEST_FILENAME, | |
| analysis_run_artifact_path, | |
| analysis_run_manifest_path, | |
| archived_snapshot_manifest_path, | |
| build_archived_analysis_run_manifest, | |
| build_current_analysis_manifest, | |
| current_analysis_artifact_path, | |
| resolve_snapshot_dir_from_output, | |
| ) | |
class HubApiLike(Protocol):
    """Structural interface for the subset of ``huggingface_hub.HfApi`` this module uses.

    Declared as a :class:`typing.Protocol` so tests can supply a fake Hub
    client without subclassing ``HfApi`` itself.
    """

    def create_repo(
        self,
        repo_id: str,
        *,
        repo_type: str,
        private: bool,
        exist_ok: bool,
    ) -> None:
        """Create (or idempotently ensure) a repo of *repo_type* named *repo_id*."""
        ...

    def create_commit(
        self,
        repo_id: str,
        operations: Iterable[CommitOperationAdd],
        *,
        commit_message: str,
        repo_type: str,
    ) -> Any:
        """Apply *operations* to *repo_id* as a single commit."""
        ...

    def upload_folder(
        self,
        *,
        repo_id: str,
        folder_path: Path,
        path_in_repo: str,
        repo_type: str,
        commit_message: str,
    ) -> None:
        """Upload the contents of *folder_path* under *path_in_repo* in *repo_id*."""
        ...
@dataclass
class PublishableAnalysisArtifacts:
    """Local analysis artifacts discovered for one snapshot, ready to publish.

    Bug fix: the ``@dataclass`` decorator was missing, so the annotations
    produced no ``__init__`` and the keyword-argument construction in
    ``_discover_publishable_analysis`` would raise ``TypeError``.
    """

    # Source repository identifier, as recorded in the snapshot manifest.
    repo: str
    # Snapshot identifier (manifest value, falling back to the directory name).
    snapshot_id: str
    # Model name from the hybrid report, or None when the report omits it.
    model: str | None
    # Path to the hybrid analysis report JSON file.
    report_path: Path
    # Path to the sibling "<stem>.llm-reviews.json" file, or None if absent.
    reviews_path: Path | None
    # Parsed report contents with keys normalized to str.
    report_payload: dict[str, Any]
def run_publish_analysis_artifacts(options: PublishAnalysisArtifactsOptions) -> dict[str, Any]:
    """Publish analysis artifacts for the snapshot referenced by *options*.

    Resolves the snapshot directory from the configured output/snapshot paths,
    then delegates to :func:`publish_analysis_artifacts`.
    """
    return publish_analysis_artifacts(
        snapshot_dir=resolve_snapshot_dir_from_output(options.output_dir, options.snapshot_dir),
        analysis_input=options.analysis_input,
        hf_repo_id=options.hf_repo_id,
        analysis_id=options.analysis_id,
        canonical=options.canonical,
        save_cache=options.save_cache,
        private=options.private_hf_repo,
    )
def publish_analysis_artifacts(
    *,
    snapshot_dir: Path,
    analysis_input: Path | None,
    hf_repo_id: str,
    analysis_id: str,
    canonical: bool,
    private: bool,
    save_cache: bool = False,
    log: Callable[[str], None] | None = None,
) -> dict[str, Any]:
    """Publish the hybrid analysis for *snapshot_dir* using a real Hub client.

    Thin entry point: constructs an ``HfApi`` client and hands everything to
    :func:`_publish_analysis_artifacts_api` (kept separate so tests can
    inject a fake :class:`HubApiLike`).
    """
    hub_client = cast("HubApiLike", HfApi())
    return _publish_analysis_artifacts_api(
        hub_client,
        snapshot_dir=snapshot_dir,
        analysis_input=analysis_input,
        hf_repo_id=hf_repo_id,
        analysis_id=analysis_id,
        canonical=canonical,
        private=private,
        save_cache=save_cache,
        log=log,
    )
def _publish_analysis_artifacts_api(
    api: HubApiLike,
    *,
    snapshot_dir: Path,
    analysis_input: Path | None = None,
    hf_repo_id: str,
    analysis_id: str,
    canonical: bool,
    private: bool,
    save_cache: bool = False,
    log: Callable[[str], None] | None = None,
) -> dict[str, Any]:
    """Publish the hybrid analysis run for *snapshot_dir* through *api*.

    Discovers the local artifacts, builds the archived run manifest (plus the
    "current" manifest when *canonical*), merges the run into the snapshot
    manifest, uploads everything as one commit, and optionally saves the
    analysis cache. Returns a summary dict describing what was published.
    """
    discovered = _discover_publishable_analysis(snapshot_dir, analysis_input=analysis_input)
    timestamp = _iso_now()
    channel = "canonical" if canonical else "comparison"
    has_reviews = discovered.reviews_path is not None

    archived_manifest = build_archived_analysis_run_manifest(
        repo=discovered.repo,
        snapshot_id=discovered.snapshot_id,
        analysis_id=analysis_id,
        variant="hybrid",
        channel=channel,
        model=discovered.model,
        published_at=timestamp,
        include_hybrid_reviews=has_reviews,
    )
    current_manifest = None
    if canonical:
        # Canonical runs also refresh the "current" pointer artifacts.
        current_manifest = build_current_analysis_manifest(
            repo=discovered.repo,
            snapshot_id=discovered.snapshot_id,
            analysis_id=analysis_id,
            variant="hybrid",
            channel=channel,
            model=discovered.model,
            published_at=timestamp,
            include_hybrid_reviews=has_reviews,
        )
    snapshot_manifest = _updated_snapshot_manifest(
        snapshot_dir=snapshot_dir,
        hf_repo_id=hf_repo_id,
        snapshot_id=discovered.snapshot_id,
        analysis_id=analysis_id,
        archived_manifest=archived_manifest,
        canonical=canonical,
    )
    operations = _commit_operations(
        artifacts=discovered,
        analysis_id=analysis_id,
        archived_manifest=archived_manifest,
        current_manifest=current_manifest,
        snapshot_manifest=snapshot_manifest,
    )

    if log is not None:
        log(f"Ensuring Hub dataset repo exists: {hf_repo_id}")
    api.create_repo(hf_repo_id, repo_type="dataset", private=private, exist_ok=True)
    if log is not None:
        log(f"Publishing analysis {analysis_id} for snapshot {discovered.snapshot_id}")
    api.create_commit(
        hf_repo_id,
        operations,
        commit_message=f"Publish analysis {analysis_id} for snapshot {discovered.snapshot_id}",
        repo_type="dataset",
    )

    cache_result = None
    if save_cache:
        cache_result = _save_analysis_cache_api(
            api,
            snapshot_dir=snapshot_dir,
            hf_repo_id=hf_repo_id,
            private=private,
            log=log,
        )

    summary: dict[str, Any] = {
        "repo": discovered.repo,
        "dataset_id": hf_repo_id,
        "snapshot_id": discovered.snapshot_id,
        "analysis_id": analysis_id,
        "canonical": canonical,
        "save_cache": save_cache,
        "published_at": timestamp,
        "artifact_paths": [op.path_in_repo for op in operations],
    }
    if cache_result is not None:
        summary["cache"] = cache_result
    if log is not None:
        log(f"Published analysis artifacts to {hf_repo_id}")
    return summary
def _discover_publishable_analysis(
    snapshot_dir: Path, *, analysis_input: Path | None
) -> PublishableAnalysisArtifacts:
    """Locate and validate the local hybrid analysis artifacts for *snapshot_dir*.

    Reads the snapshot manifest and the hybrid report (either *analysis_input*
    or the default report in *snapshot_dir*), cross-checks ``snapshot_id`` and
    ``repo`` between them, and raises on any missing file or mismatch.
    """
    manifest_path = snapshot_dir / ROOT_MANIFEST_FILENAME
    if not manifest_path.exists():
        raise FileNotFoundError(f"Snapshot manifest is missing: {manifest_path}")
    manifest = read_json(manifest_path)
    if not isinstance(manifest, dict):
        raise ValueError(f"Snapshot manifest at {manifest_path} must contain a JSON object.")

    # Fall back to the directory name when the manifest omits snapshot_id.
    snapshot_id = str(manifest.get("snapshot_id") or snapshot_dir.name).strip()
    repo = str(manifest.get("repo") or "").strip()
    if not repo:
        raise ValueError(f"Snapshot manifest at {manifest_path} does not define repo.")

    if analysis_input is not None:
        report_path = analysis_input.resolve()
    else:
        report_path = snapshot_dir / ANALYSIS_REPORT_FILENAME_BY_VARIANT["hybrid"]
    if not report_path.exists():
        raise FileNotFoundError(f"Hybrid analysis report is missing: {report_path}")
    report_payload = read_json(report_path)
    if not isinstance(report_payload, dict):
        raise ValueError(f"Hybrid analysis report at {report_path} must contain a JSON object.")

    report_snapshot_id = str(report_payload.get("snapshot_id") or snapshot_id).strip()
    if report_snapshot_id != snapshot_id:
        raise ValueError(
            f"Hybrid analysis report snapshot_id {report_snapshot_id!r} does not match manifest snapshot_id {snapshot_id!r}."
        )
    report_repo = str(report_payload.get("repo") or repo).strip()
    if report_repo != repo:
        raise ValueError(
            f"Hybrid analysis report repo {report_repo!r} does not match manifest repo {repo!r}."
        )

    raw_model = report_payload.get("model")
    model = str(raw_model) if raw_model is not None else None

    # Reviews live next to the report as "<report-stem>.llm-reviews.json".
    reviews_candidate = report_path.with_name(f"{report_path.stem}.llm-reviews.json")
    return PublishableAnalysisArtifacts(
        repo=repo,
        snapshot_id=snapshot_id,
        model=model,
        report_path=report_path,
        reviews_path=reviews_candidate if reviews_candidate.exists() else None,
        report_payload={str(key): value for key, value in report_payload.items()},
    )
def _updated_snapshot_manifest(
    *,
    snapshot_dir: Path,
    hf_repo_id: str,
    snapshot_id: str,
    analysis_id: str,
    archived_manifest: dict[str, Any],
    canonical: bool,
) -> dict[str, Any]:
    """Merge this analysis run into the snapshot manifest's ``published_analysis``.

    Prefers the manifest already archived on the Hub (so previously published
    runs are preserved); a missing or empty remote manifest falls back to the
    local snapshot manifest on disk.
    """
    remote = _load_remote_snapshot_manifest(hf_repo_id, snapshot_id)
    # Truthiness on purpose: an empty remote dict also triggers the fallback.
    base = remote or read_json(snapshot_dir / ROOT_MANIFEST_FILENAME)
    if not isinstance(base, dict):
        raise ValueError("Archived snapshot manifest must contain a JSON object.")
    updated = {str(key): value for key, value in base.items()}

    published = updated.get("published_analysis")
    if not isinstance(published, dict):
        published = {"schema_version": 1, "runs": {}}
    runs = published.get("runs")
    if not isinstance(runs, dict):
        runs = {}
    runs[analysis_id] = {
        "analysis_id": analysis_id,
        "variant": archived_manifest["variant"],
        "channel": archived_manifest["channel"],
        "model": archived_manifest.get("model"),
        "published_at": archived_manifest["published_at"],
        "manifest_path": analysis_run_manifest_path(snapshot_id, analysis_id),
        "artifacts": archived_manifest["artifacts"],
    }
    published["schema_version"] = 1
    published["runs"] = runs
    if canonical:
        published["canonical_analysis_id"] = analysis_id
    updated["published_analysis"] = published
    return updated
def _load_remote_snapshot_manifest(hf_repo_id: str, snapshot_id: str) -> dict[str, Any] | None:
    """Fetch the archived snapshot manifest from the Hub, or ``None`` on failure.

    Best-effort by design: any download error (missing file, network, auth)
    makes the caller fall back to the local manifest. A payload that is not a
    JSON object is likewise treated as absent.
    """
    try:
        local_file = hf_hub_download(
            repo_id=hf_repo_id,
            repo_type="dataset",
            filename=archived_snapshot_manifest_path(snapshot_id),
        )
    except Exception:
        return None
    parsed = json.loads(Path(local_file).read_text(encoding="utf-8"))
    if isinstance(parsed, dict):
        return parsed
    return None
def _commit_operations(
    *,
    artifacts: PublishableAnalysisArtifacts,
    analysis_id: str,
    archived_manifest: dict[str, Any],
    current_manifest: dict[str, Any] | None,
    snapshot_manifest: dict[str, Any],
) -> list[CommitOperationAdd]:
    """Assemble the ordered Hub commit operations for one analysis run.

    Always writes the archived report, the run manifest, and the updated
    snapshot manifest; appends the reviews file when present; and, when
    *current_manifest* is given (canonical runs), mirrors the report,
    manifest, and reviews into the "current" location.
    """
    report_filename = ANALYSIS_REPORT_FILENAME_BY_VARIANT["hybrid"]
    snapshot_id = artifacts.snapshot_id

    ops: list[CommitOperationAdd] = [
        CommitOperationAdd(
            path_in_repo=analysis_run_artifact_path(snapshot_id, analysis_id, report_filename),
            path_or_fileobj=artifacts.report_path,
        ),
        CommitOperationAdd(
            path_in_repo=analysis_run_manifest_path(snapshot_id, analysis_id),
            path_or_fileobj=_json_bytes(archived_manifest),
        ),
        CommitOperationAdd(
            path_in_repo=archived_snapshot_manifest_path(snapshot_id),
            path_or_fileobj=_json_bytes(snapshot_manifest),
        ),
    ]
    if artifacts.reviews_path is not None:
        ops.append(
            CommitOperationAdd(
                path_in_repo=analysis_run_artifact_path(
                    snapshot_id,
                    analysis_id,
                    HYBRID_ANALYSIS_REVIEWS_FILENAME,
                ),
                path_or_fileobj=artifacts.reviews_path,
            )
        )
    if current_manifest is not None:
        ops.append(
            CommitOperationAdd(
                path_in_repo=current_analysis_artifact_path(report_filename),
                path_or_fileobj=artifacts.report_path,
            )
        )
        ops.append(
            CommitOperationAdd(
                path_in_repo=current_analysis_artifact_path(ROOT_MANIFEST_FILENAME),
                path_or_fileobj=_json_bytes(current_manifest),
            )
        )
        if artifacts.reviews_path is not None:
            ops.append(
                CommitOperationAdd(
                    path_in_repo=current_analysis_artifact_path(HYBRID_ANALYSIS_REVIEWS_FILENAME),
                    path_or_fileobj=artifacts.reviews_path,
                )
            )
    return ops
| def _json_bytes(payload: dict[str, Any]) -> bytes: | |
| return (json.dumps(payload, indent=2, sort_keys=True) + "\n").encode("utf-8") | |
| def _iso_now() -> str: | |
| return datetime.now(tz=UTC).replace(microsecond=0).isoformat().replace("+00:00", "Z") | |