# diffusers-pr-api / src/slop_farmer/data/snapshot_paths.py
# Deployed via "Deploy Diffusers PR API" by evalstate (HF Staff), commit dbf7313 (verified).
from __future__ import annotations
import re
from dataclasses import dataclass
from pathlib import Path, PurePosixPath
from typing import Any
from slop_farmer.data.parquet_io import read_json
# Parquet tables that make up a raw data snapshot (one file per entity type).
RAW_TABLE_FILENAMES: tuple[str, ...] = (
    "issues.parquet",
    "pull_requests.parquet",
    "comments.parquet",
    "reviews.parquet",
    "review_comments.parquet",
    "pr_files.parquet",
    "pr_diffs.parquet",
    "links.parquet",
    "events.parquet",
)
# Comment tables split per surface (issues vs. PRs), presumably for the
# dataset viewer — confirm against the writer side.
VIEWER_SPLIT_FILENAMES: tuple[str, ...] = (
    "issue_comments.parquet",
    "pr_comments.parquet",
)
# Well-known repo-relative file locations.
ROOT_MANIFEST_FILENAME = "manifest.json"
README_FILENAME = "README.md"
STATE_WATERMARK_PATH = "state/watermark.json"
SNAPSHOTS_LATEST_PATH = "snapshots/latest.json"
PR_SCOPE_CLUSTERS_FILENAME = "pr-scope-clusters.json"
# New-contributor report artifacts (parquet data + JSON + Markdown renderings).
NEW_CONTRIBUTORS_PARQUET_FILENAME = "new_contributors.parquet"
NEW_CONTRIBUTORS_REPORT_JSON_FILENAME = "new-contributors-report.json"
NEW_CONTRIBUTORS_REPORT_MARKDOWN_FILENAME = "new-contributors-report.md"
CONTRIBUTOR_ARTIFACT_FILENAMES: tuple[str, ...] = (
    NEW_CONTRIBUTORS_PARQUET_FILENAME,
    NEW_CONTRIBUTORS_REPORT_JSON_FILENAME,
    NEW_CONTRIBUTORS_REPORT_MARKDOWN_FILENAME,
)
# Canonical analysis-report filename per analysis variant.
ANALYSIS_REPORT_FILENAME_BY_VARIANT: dict[str, str] = {
    "deterministic": "analysis-report.json",
    "hybrid": "analysis-report-hybrid.json",
}
# Raw LLM reviews backing the hybrid analysis report.
HYBRID_ANALYSIS_REVIEWS_FILENAME = "analysis-report-hybrid.llm-reviews.json"
# Analysis files stored directly at the snapshot root (the layout still read
# by resolve_snapshot_local_analysis_report).
LEGACY_ANALYSIS_FILENAMES: tuple[str, ...] = (
    ANALYSIS_REPORT_FILENAME_BY_VARIANT["deterministic"],
    ANALYSIS_REPORT_FILENAME_BY_VARIANT["hybrid"],
    HYBRID_ANALYSIS_REVIEWS_FILENAME,
)
# Repo-relative (POSIX) directory of the currently published analysis, and the
# manifest inside it.
CURRENT_ANALYSIS_DIR = PurePosixPath("analysis/current")
CURRENT_ANALYSIS_MANIFEST_PATH = str(CURRENT_ANALYSIS_DIR / ROOT_MANIFEST_FILENAME)
# Schema version enforced by _validate_analysis_manifest.
ANALYSIS_MANIFEST_SCHEMA_VERSION = 1
@dataclass(frozen=True, slots=True)
class ResolvedAnalysisReportPath:
    """Result of resolving an analysis report file for a snapshot."""

    # Local filesystem path of the resolved report JSON.
    path: Path
    # Analysis variant of the resolved report ("hybrid" or "deterministic").
    variant: str
    # Where the report was found: "current" (published manifest) or "snapshot"
    # (file at the snapshot root).
    source: str
    # Populated only when resolved via the current-analysis manifest.
    snapshot_id: str | None = None
    analysis_id: str | None = None
def default_hf_materialize_dir(output_dir: Path, repo_id: str, revision: str | None) -> Path:
    """Build the default local directory for materializing a HF repo snapshot.

    The leaf directory name encodes the repo id (and optional revision) with
    every "/" replaced by "--", e.g. ``hf-org--repo--main``.
    """
    name_parts = [repo_id.replace("/", "--")]
    if revision:
        name_parts.append(revision.replace("/", "--"))
    leaf_name = "hf-" + "--".join(name_parts)
    return output_dir.resolve() / "snapshots" / leaf_name
def repo_relative_path_to_local(base_dir: Path, repo_relative_path: str) -> Path:
    """Map a POSIX-style repo-relative path onto *base_dir* using local separators."""
    local_path = base_dir
    for part in PurePosixPath(repo_relative_path).parts:
        local_path = local_path / part
    return local_path
def snapshot_artifact_path(snapshot_id: str, filename: str) -> str:
    """Return the repo-relative POSIX path of *filename* inside an archived snapshot."""
    artifact = PurePosixPath("snapshots", snapshot_id, filename)
    return str(artifact)
def archived_snapshot_manifest_path(snapshot_id: str) -> str:
    """Return the repo-relative path of an archived snapshot's manifest.json."""
    manifest_name = ROOT_MANIFEST_FILENAME
    return snapshot_artifact_path(snapshot_id, manifest_name)
def analysis_run_artifact_path(snapshot_id: str, analysis_id: str, filename: str) -> str:
    """Return the repo-relative path of *filename* for one analysis run of a snapshot."""
    run_dir = PurePosixPath("snapshots", snapshot_id, "analysis-runs", analysis_id)
    return str(run_dir / filename)
def analysis_run_manifest_path(snapshot_id: str, analysis_id: str) -> str:
    """Return the repo-relative path of one analysis run's manifest.json."""
    manifest_name = ROOT_MANIFEST_FILENAME
    return analysis_run_artifact_path(snapshot_id, analysis_id, manifest_name)
def current_analysis_artifact_path(filename: str) -> str:
    """Return the repo-relative path of *filename* under analysis/current."""
    return str(CURRENT_ANALYSIS_DIR.joinpath(filename))
def repo_key(repo_slug: str) -> str:
    """Return a lowercase, dash-separated path key derived from *repo_slug*."""
    return _path_key(repo_slug)
def model_key(model: str) -> str:
    """Return a lowercase, dash-separated path key derived from *model*."""
    return _path_key(model)
def build_current_analysis_manifest(
    *,
    repo: str,
    snapshot_id: str,
    analysis_id: str,
    variant: str,
    channel: str,
    model: str | None,
    published_at: str,
    include_hybrid_reviews: bool,
) -> dict[str, Any]:
    """Assemble and validate the manifest published under analysis/current.

    Records both the "current" artifact locations and the archived copies
    under the snapshot's analysis-runs directory. The hybrid report is always
    present; the LLM-reviews artifact is added only when
    *include_hybrid_reviews* is true.

    Raises:
        ValueError: if the assembled payload fails manifest validation.
    """
    # (artifact key, filename) pairs, in publication order.
    artifact_names = [("hybrid", ANALYSIS_REPORT_FILENAME_BY_VARIANT["hybrid"])]
    if include_hybrid_reviews:
        artifact_names.append(("hybrid_reviews", HYBRID_ANALYSIS_REVIEWS_FILENAME))
    artifacts = {
        key: current_analysis_artifact_path(filename) for key, filename in artifact_names
    }
    archived_artifacts = {
        key: analysis_run_artifact_path(snapshot_id, analysis_id, filename)
        for key, filename in artifact_names
    }
    payload = {
        "schema_version": ANALYSIS_MANIFEST_SCHEMA_VERSION,
        "repo": repo,
        "snapshot_id": snapshot_id,
        "analysis_id": analysis_id,
        "variant": variant,
        "channel": channel,
        "model": model,
        "published_at": published_at,
        "artifacts": artifacts,
        "archived_artifacts": archived_artifacts,
    }
    return validate_current_analysis_manifest(payload)
def build_archived_analysis_run_manifest(
    *,
    repo: str,
    snapshot_id: str,
    analysis_id: str,
    variant: str,
    channel: str,
    model: str | None,
    published_at: str,
    include_hybrid_reviews: bool,
) -> dict[str, Any]:
    """Assemble and validate the manifest archived with one analysis run.

    All artifact paths point under snapshots/<snapshot_id>/analysis-runs/
    <analysis_id>/. The hybrid report is always listed; the LLM-reviews
    artifact only when *include_hybrid_reviews* is true.

    Raises:
        ValueError: if the assembled payload fails manifest validation.
    """
    artifact_names = [("hybrid", ANALYSIS_REPORT_FILENAME_BY_VARIANT["hybrid"])]
    if include_hybrid_reviews:
        artifact_names.append(("hybrid_reviews", HYBRID_ANALYSIS_REVIEWS_FILENAME))
    artifacts = {
        key: analysis_run_artifact_path(snapshot_id, analysis_id, filename)
        for key, filename in artifact_names
    }
    payload = {
        "schema_version": ANALYSIS_MANIFEST_SCHEMA_VERSION,
        "repo": repo,
        "snapshot_id": snapshot_id,
        "analysis_id": analysis_id,
        "variant": variant,
        "channel": channel,
        "model": model,
        "published_at": published_at,
        "artifacts": artifacts,
    }
    return validate_archived_analysis_run_manifest(payload)
def load_current_analysis_manifest(path: Path) -> dict[str, Any]:
    """Read and validate the current-analysis manifest JSON at *path*.

    Raises:
        ValueError: if the file is not a JSON object or fails validation.
    """
    payload = read_json(path)
    if isinstance(payload, dict):
        return validate_current_analysis_manifest(payload)
    raise ValueError(f"Current analysis manifest at {path} must contain a JSON object.")
def load_archived_analysis_run_manifest(path: Path) -> dict[str, Any]:
    """Read and validate an archived analysis-run manifest JSON at *path*.

    Raises:
        ValueError: if the file is not a JSON object or fails validation.
    """
    payload = read_json(path)
    if isinstance(payload, dict):
        return validate_archived_analysis_run_manifest(payload)
    raise ValueError(f"Archived analysis manifest at {path} must contain a JSON object.")
def resolve_default_dashboard_analysis_report(
    snapshot_dir: Path,
) -> ResolvedAnalysisReportPath | None:
    """Pick the analysis report to show for *snapshot_dir*.

    Prefers the published "current" report when it matches this snapshot;
    otherwise falls back to a report stored at the snapshot root.
    """
    published = resolve_current_analysis_report(snapshot_dir)
    if published is None or not _analysis_matches_snapshot(snapshot_dir, published):
        return resolve_snapshot_local_analysis_report(snapshot_dir, variant="auto")
    return published
def resolve_current_analysis_report(
    snapshot_dir: Path,
    *,
    variant: str = "auto",
) -> ResolvedAnalysisReportPath | None:
    """Resolve the published "current" analysis report for a materialized snapshot.

    Returns None when no current-analysis manifest is present under
    *snapshot_dir*.

    Raises:
        ValueError: if *variant* is unsupported, the manifest omits the
            requested artifact, or the referenced artifact file is missing.
    """
    normalized = _normalize_analysis_variant(variant)
    manifest_path = repo_relative_path_to_local(snapshot_dir, CURRENT_ANALYSIS_MANIFEST_PATH)
    if not manifest_path.exists():
        return None
    manifest = load_current_analysis_manifest(manifest_path)
    artifact_key = _analysis_artifact_key_for_variant(normalized, manifest_kind="current")
    artifact_path = manifest.get("artifacts", {}).get(artifact_key)
    if not (isinstance(artifact_path, str) and artifact_path):
        if normalized == "auto":
            message = "Published current analysis manifest does not provide the canonical hybrid artifact."
        else:
            message = f"Published current analysis manifest does not provide the {normalized} artifact."
        raise ValueError(message)
    report_path = repo_relative_path_to_local(snapshot_dir, artifact_path)
    if not report_path.exists():
        raise ValueError(
            f"Published current analysis artifact {artifact_path!r} is missing from the materialized snapshot."
        )
    resolved_variant = "hybrid" if artifact_key == "hybrid" else normalized
    return ResolvedAnalysisReportPath(
        path=report_path,
        variant=resolved_variant,
        source="current",
        snapshot_id=str(manifest["snapshot_id"]),
        analysis_id=str(manifest["analysis_id"]),
    )
def resolve_snapshot_local_analysis_report(
    snapshot_dir: Path,
    *,
    variant: str = "auto",
) -> ResolvedAnalysisReportPath | None:
    """Find an analysis report stored directly inside *snapshot_dir*.

    With variant="auto" the hybrid report takes precedence over the
    deterministic one. Returns None when no matching report file exists.

    Raises:
        ValueError: if *variant* is not auto, hybrid, or deterministic.
    """
    normalized = _normalize_analysis_variant(variant)
    # "auto" tries hybrid first, then deterministic; a concrete variant is
    # looked up directly.
    candidates = ["hybrid", "deterministic"] if normalized == "auto" else [normalized]
    for candidate in candidates:
        report_path = snapshot_dir / ANALYSIS_REPORT_FILENAME_BY_VARIANT[candidate]
        if report_path.exists():
            return ResolvedAnalysisReportPath(
                path=report_path,
                variant=candidate,
                source="snapshot",
            )
    return None
def validate_current_analysis_manifest(payload: dict[str, Any]) -> dict[str, Any]:
    """Validate a manifest published under analysis/current.

    Beyond the shared manifest checks, requires that archived_artifacts live
    under the snapshot's analysis-runs directory and mirror the artifacts
    mapping key-for-key.

    Raises:
        ValueError: on any schema violation.
    """
    validated = _validate_analysis_manifest(payload, require_archived_artifacts=True)
    archive_prefix = analysis_run_artifact_path(
        str(validated["snapshot_id"]),
        str(validated["analysis_id"]),
        "",
    )
    archived_artifacts = _validate_artifacts(
        dict(validated["archived_artifacts"]),
        expected_prefix=archive_prefix,
    )
    if set(validated["artifacts"]) != set(archived_artifacts):
        raise ValueError("Current analysis manifest artifacts and archived_artifacts must match.")
    validated["archived_artifacts"] = archived_artifacts
    return validated
def validate_archived_analysis_run_manifest(payload: dict[str, Any]) -> dict[str, Any]:
    """Validate a manifest archived with an analysis run (no archived_artifacts)."""
    return _validate_analysis_manifest(payload, require_archived_artifacts=False)
def load_latest_snapshot_pointer(snapshots_root: Path) -> Path | None:
    """Resolve the snapshot directory referenced by snapshots/latest.json.

    Returns None when the pointer file does not exist or does not carry a
    non-empty "snapshot_dir" string. Relative pointers are resolved against
    the parent of *snapshots_root* (the output directory).

    Raises:
        ValueError: if latest.json does not contain a JSON object.
    """
    resolved_snapshots_root = snapshots_root.resolve()
    latest_path = resolved_snapshots_root / "latest.json"
    if not latest_path.exists():
        return None
    payload = read_json(latest_path)
    # Consistent with the manifest loaders in this module: fail with a clear
    # ValueError on a malformed pointer file instead of an AttributeError
    # from payload.get below.
    if not isinstance(payload, dict):
        raise ValueError(f"Latest snapshot pointer at {latest_path} must contain a JSON object.")
    snapshot_dir = payload.get("snapshot_dir")
    if isinstance(snapshot_dir, str) and snapshot_dir:
        path = Path(snapshot_dir)
        if path.is_absolute():
            return path.resolve()
        return (resolved_snapshots_root.parent / path).resolve()
    return None
def resolve_snapshot_dir_from_output(output_dir: Path, snapshot_dir: Path | None) -> Path:
    """Resolve a snapshot directory via the snapshots/ root under *output_dir*."""
    snapshots_root = output_dir.resolve() / "snapshots"
    return resolve_snapshot_dir_from_snapshots_root(snapshots_root, snapshot_dir)
def resolve_snapshot_dir_from_snapshots_root(
    snapshots_root: Path,
    snapshot_dir: Path | None,
) -> Path:
    """Pick the snapshot directory to use.

    Precedence: an explicit *snapshot_dir* argument, then the latest.json
    pointer, then the lexicographically last subdirectory of *snapshots_root*.

    Raises:
        FileNotFoundError: when no snapshot directory can be determined.
    """
    if snapshot_dir is not None:
        return snapshot_dir.resolve()
    root = snapshots_root.resolve()
    latest_path = root / "latest.json"
    pointed = load_latest_snapshot_pointer(root)
    if pointed is not None:
        return pointed
    candidates = sorted(entry for entry in root.glob("*") if entry.is_dir())
    if candidates:
        return candidates[-1].resolve()
    raise FileNotFoundError(f"Could not resolve a snapshot directory from {latest_path}")
def _validate_analysis_manifest(
    payload: dict[str, Any],
    *,
    require_archived_artifacts: bool,
) -> dict[str, Any]:
    """Shared validation for current and archived analysis manifests.

    Returns a shallow copy of *payload* with required string fields stripped
    and the artifacts mapping normalized. When *require_archived_artifacts*
    is true (current manifests), also requires an archived_artifacts object
    and expects artifact paths under analysis/current; otherwise artifact
    paths must live under the run's analysis-runs directory.

    Raises:
        ValueError: on any schema violation.
    """
    validated = {str(key): value for key, value in payload.items()}
    schema_version = validated.get("schema_version")
    if schema_version != ANALYSIS_MANIFEST_SCHEMA_VERSION:
        raise ValueError(
            f"Unsupported analysis manifest schema version: {schema_version!r}"
        )
    required_strings = ("repo", "snapshot_id", "analysis_id", "variant", "channel", "published_at")
    for field in required_strings:
        value = validated.get(field)
        if not isinstance(value, str) or not value.strip():
            raise ValueError(f"Analysis manifest field {field!r} must be a non-empty string.")
        validated[field] = value.strip()
    model = validated.get("model")
    if not (model is None or isinstance(model, str)):
        raise ValueError("Analysis manifest field 'model' must be a string when present.")
    artifacts = validated.get("artifacts")
    if not isinstance(artifacts, dict):
        raise ValueError("Analysis manifest field 'artifacts' must be an object.")
    if require_archived_artifacts:
        expected_prefix = current_analysis_artifact_path("")
    else:
        expected_prefix = analysis_run_artifact_path(
            str(validated["snapshot_id"]),
            str(validated["analysis_id"]),
            "",
        )
    validated["artifacts"] = _validate_artifacts(dict(artifacts), expected_prefix=expected_prefix)
    if require_archived_artifacts:
        archived_artifacts = validated.get("archived_artifacts")
        if not isinstance(archived_artifacts, dict):
            raise ValueError(
                "Current analysis manifest field 'archived_artifacts' must be an object."
            )
        validated["archived_artifacts"] = {
            str(key): value for key, value in archived_artifacts.items()
        }
    return validated
def _validate_artifacts(artifacts: dict[str, Any], *, expected_prefix: str) -> dict[str, str]:
normalized = {str(key): value for key, value in artifacts.items()}
hybrid_path = normalized.get("hybrid")
if not isinstance(hybrid_path, str) or not hybrid_path:
raise ValueError("Analysis manifest must include artifacts.hybrid.")
validated = {"hybrid": hybrid_path}
hybrid_reviews_path = normalized.get("hybrid_reviews")
if hybrid_reviews_path is not None:
if not isinstance(hybrid_reviews_path, str) or not hybrid_reviews_path:
raise ValueError(
"Analysis manifest artifacts.hybrid_reviews must be a non-empty string."
)
validated["hybrid_reviews"] = hybrid_reviews_path
for key, value in validated.items():
if not value.startswith(expected_prefix):
raise ValueError(
f"Analysis manifest artifact {key!r} must live under {expected_prefix!r}, got {value!r}."
)
return validated
def _path_key(value: str) -> str:
normalized = re.sub(r"[^a-z0-9]+", "-", value.strip().lower())
normalized = re.sub(r"-+", "-", normalized).strip("-")
if not normalized:
raise ValueError("Expected a non-empty path key value.")
return normalized
def _analysis_matches_snapshot(
    snapshot_dir: Path,
    analysis_path: ResolvedAnalysisReportPath,
) -> bool:
    """Check whether a resolved analysis report belongs to *snapshot_dir*.

    Compares against the snapshot manifest's snapshot_id when one is present,
    falling back to the snapshot directory name otherwise.
    """
    manifest_file = snapshot_dir / ROOT_MANIFEST_FILENAME
    if manifest_file.exists():
        manifest = read_json(manifest_file)
        manifest_snapshot_id = manifest.get("snapshot_id")
        if manifest_snapshot_id is not None:
            return str(manifest_snapshot_id) == str(analysis_path.snapshot_id)
    return snapshot_dir.name == str(analysis_path.snapshot_id)
def _normalize_analysis_variant(variant: str) -> str:
normalized = variant.strip().lower()
if normalized not in {"auto", "deterministic", "hybrid"}:
raise ValueError(
f"Unsupported analysis variant {variant!r}; expected auto, hybrid, or deterministic."
)
return normalized
def _analysis_artifact_key_for_variant(variant: str, *, manifest_kind: str) -> str:
if variant in {"auto", "hybrid"}:
return "hybrid"
raise ValueError(
f"Published {manifest_kind} analysis only serves canonical hybrid artifacts; requested {variant!r}."
)