Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import re | |
| from dataclasses import dataclass | |
| from pathlib import Path, PurePosixPath | |
| from typing import Any | |
| from slop_farmer.data.parquet_io import read_json | |
# File names of the raw parquet tables.
RAW_TABLE_FILENAMES: tuple[str, ...] = (
    "issues.parquet",
    "pull_requests.parquet",
    "comments.parquet",
    "reviews.parquet",
    "review_comments.parquet",
    "pr_files.parquet",
    "pr_diffs.parquet",
    "links.parquet",
    "events.parquet",
)

# File names of the viewer split tables.
VIEWER_SPLIT_FILENAMES: tuple[str, ...] = (
    "issue_comments.parquet",
    "pr_comments.parquet",
)

# Well-known file names and repo-relative paths.
ROOT_MANIFEST_FILENAME = "manifest.json"
README_FILENAME = "README.md"
STATE_WATERMARK_PATH = "state/watermark.json"
SNAPSHOTS_LATEST_PATH = "snapshots/latest.json"
PR_SCOPE_CLUSTERS_FILENAME = "pr-scope-clusters.json"

# New-contributor artifacts, individually and as a group.
NEW_CONTRIBUTORS_PARQUET_FILENAME = "new_contributors.parquet"
NEW_CONTRIBUTORS_REPORT_JSON_FILENAME = "new-contributors-report.json"
NEW_CONTRIBUTORS_REPORT_MARKDOWN_FILENAME = "new-contributors-report.md"
CONTRIBUTOR_ARTIFACT_FILENAMES: tuple[str, ...] = (
    NEW_CONTRIBUTORS_PARQUET_FILENAME,
    NEW_CONTRIBUTORS_REPORT_JSON_FILENAME,
    NEW_CONTRIBUTORS_REPORT_MARKDOWN_FILENAME,
)

# Analysis report file name for each supported variant.
ANALYSIS_REPORT_FILENAME_BY_VARIANT: dict[str, str] = {
    "deterministic": "analysis-report.json",
    "hybrid": "analysis-report-hybrid.json",
}
HYBRID_ANALYSIS_REVIEWS_FILENAME = "analysis-report-hybrid.llm-reviews.json"
LEGACY_ANALYSIS_FILENAMES: tuple[str, ...] = (
    ANALYSIS_REPORT_FILENAME_BY_VARIANT["deterministic"],
    ANALYSIS_REPORT_FILENAME_BY_VARIANT["hybrid"],
    HYBRID_ANALYSIS_REVIEWS_FILENAME,
)

# Repo-relative location of the "current" analysis and its manifest.
CURRENT_ANALYSIS_DIR = PurePosixPath("analysis/current")
CURRENT_ANALYSIS_MANIFEST_PATH = (CURRENT_ANALYSIS_DIR / ROOT_MANIFEST_FILENAME).as_posix()

# Only manifests carrying this schema version pass validation.
ANALYSIS_MANIFEST_SCHEMA_VERSION = 1
| class ResolvedAnalysisReportPath: | |
| path: Path | |
| variant: str | |
| source: str | |
| snapshot_id: str | None = None | |
| analysis_id: str | None = None | |
| def default_hf_materialize_dir(output_dir: Path, repo_id: str, revision: str | None) -> Path: | |
| suffix = repo_id.replace("/", "--") | |
| if revision: | |
| suffix = f"{suffix}--{revision.replace('/', '--')}" | |
| return output_dir.resolve() / "snapshots" / f"hf-{suffix}" | |
def repo_relative_path_to_local(base_dir: Path, repo_relative_path: str) -> Path:
    """Join a POSIX-style repo-relative path onto ``base_dir``.

    The path is split into its POSIX components, which are appended to
    ``base_dir`` one by one using the local platform's path rules.
    """
    segments = PurePosixPath(repo_relative_path).parts
    return base_dir.joinpath(*segments)
def snapshot_artifact_path(snapshot_id: str, filename: str) -> str:
    """Return the repo-relative POSIX path ``snapshots/<snapshot_id>/<filename>``."""
    return PurePosixPath("snapshots", snapshot_id, filename).as_posix()
def archived_snapshot_manifest_path(snapshot_id: str) -> str:
    """Return the repo-relative path of the root manifest inside a snapshot."""
    manifest_path = snapshot_artifact_path(snapshot_id, ROOT_MANIFEST_FILENAME)
    return manifest_path
def analysis_run_artifact_path(snapshot_id: str, analysis_id: str, filename: str) -> str:
    """Return ``snapshots/<snapshot_id>/analysis-runs/<analysis_id>/<filename>``.

    The result is a repo-relative POSIX path string. An empty ``filename``
    yields the run directory itself, without a trailing slash.
    """
    run_dir = PurePosixPath("snapshots", snapshot_id, "analysis-runs", analysis_id)
    return (run_dir / filename).as_posix()
def analysis_run_manifest_path(snapshot_id: str, analysis_id: str) -> str:
    """Return the repo-relative path of the manifest for one archived analysis run."""
    manifest_path = analysis_run_artifact_path(snapshot_id, analysis_id, ROOT_MANIFEST_FILENAME)
    return manifest_path
def current_analysis_artifact_path(filename: str) -> str:
    """Return the repo-relative POSIX path of ``filename`` under the current-analysis directory."""
    return (CURRENT_ANALYSIS_DIR / filename).as_posix()
def repo_key(repo_slug: str) -> str:
    """Return the normalized path key for a repository slug (see ``_path_key``)."""
    key = _path_key(repo_slug)
    return key
def model_key(model: str) -> str:
    """Return the normalized path key for a model name (see ``_path_key``)."""
    key = _path_key(model)
    return key
def build_current_analysis_manifest(
    *,
    repo: str,
    snapshot_id: str,
    analysis_id: str,
    variant: str,
    channel: str,
    model: str | None,
    published_at: str,
    include_hybrid_reviews: bool,
) -> dict[str, Any]:
    """Build and validate the manifest for the published "current" analysis.

    The manifest lists each artifact twice: under ``artifacts`` at its
    current-analysis path, and under ``archived_artifacts`` at its path inside
    the archived analysis run. The hybrid report is always listed; the hybrid
    reviews file is listed only when ``include_hybrid_reviews`` is true.

    Returns:
        The manifest as returned by ``validate_current_analysis_manifest``.

    Raises:
        ValueError: If the assembled manifest fails validation.
    """
    # Artifact key -> file name; both path mappings are derived from this.
    artifact_filenames = {"hybrid": ANALYSIS_REPORT_FILENAME_BY_VARIANT["hybrid"]}
    if include_hybrid_reviews:
        artifact_filenames["hybrid_reviews"] = HYBRID_ANALYSIS_REVIEWS_FILENAME

    current_paths = {
        key: current_analysis_artifact_path(name) for key, name in artifact_filenames.items()
    }
    archived_paths = {
        key: analysis_run_artifact_path(snapshot_id, analysis_id, name)
        for key, name in artifact_filenames.items()
    }

    manifest = {
        "schema_version": ANALYSIS_MANIFEST_SCHEMA_VERSION,
        "repo": repo,
        "snapshot_id": snapshot_id,
        "analysis_id": analysis_id,
        "variant": variant,
        "channel": channel,
        "model": model,
        "published_at": published_at,
        "artifacts": current_paths,
        "archived_artifacts": archived_paths,
    }
    return validate_current_analysis_manifest(manifest)
def build_archived_analysis_run_manifest(
    *,
    repo: str,
    snapshot_id: str,
    analysis_id: str,
    variant: str,
    channel: str,
    model: str | None,
    published_at: str,
    include_hybrid_reviews: bool,
) -> dict[str, Any]:
    """Build and validate the manifest stored with an archived analysis run.

    ``artifacts`` maps each artifact key to its path inside the run directory.
    The hybrid report is always listed; the hybrid reviews file is listed only
    when ``include_hybrid_reviews`` is true.

    Returns:
        The manifest as returned by ``validate_archived_analysis_run_manifest``.

    Raises:
        ValueError: If the assembled manifest fails validation.
    """
    artifact_filenames = {"hybrid": ANALYSIS_REPORT_FILENAME_BY_VARIANT["hybrid"]}
    if include_hybrid_reviews:
        artifact_filenames["hybrid_reviews"] = HYBRID_ANALYSIS_REVIEWS_FILENAME

    run_paths = {
        key: analysis_run_artifact_path(snapshot_id, analysis_id, name)
        for key, name in artifact_filenames.items()
    }

    manifest = {
        "schema_version": ANALYSIS_MANIFEST_SCHEMA_VERSION,
        "repo": repo,
        "snapshot_id": snapshot_id,
        "analysis_id": analysis_id,
        "variant": variant,
        "channel": channel,
        "model": model,
        "published_at": published_at,
        "artifacts": run_paths,
    }
    return validate_archived_analysis_run_manifest(manifest)
def load_current_analysis_manifest(path: Path) -> dict[str, Any]:
    """Read the current-analysis manifest at ``path`` and validate it.

    Raises:
        ValueError: If the file does not hold a JSON object, or if the object
            fails ``validate_current_analysis_manifest``.
    """
    raw = read_json(path)
    if isinstance(raw, dict):
        return validate_current_analysis_manifest(raw)
    raise ValueError(f"Current analysis manifest at {path} must contain a JSON object.")
def load_archived_analysis_run_manifest(path: Path) -> dict[str, Any]:
    """Read the archived analysis-run manifest at ``path`` and validate it.

    Raises:
        ValueError: If the file does not hold a JSON object, or if the object
            fails ``validate_archived_analysis_run_manifest``.
    """
    raw = read_json(path)
    if isinstance(raw, dict):
        return validate_archived_analysis_run_manifest(raw)
    raise ValueError(f"Archived analysis manifest at {path} must contain a JSON object.")
def resolve_default_dashboard_analysis_report(
    snapshot_dir: Path,
) -> ResolvedAnalysisReportPath | None:
    """Pick the analysis report to use by default for ``snapshot_dir``.

    The published current analysis is used when it exists and its snapshot id
    matches this snapshot. Otherwise the result is whatever report is found
    directly in the snapshot directory with ``variant="auto"``, which may be
    ``None``.
    """
    published = resolve_current_analysis_report(snapshot_dir)
    if published is None or not _analysis_matches_snapshot(snapshot_dir, published):
        return resolve_snapshot_local_analysis_report(snapshot_dir, variant="auto")
    return published
def resolve_current_analysis_report(
    snapshot_dir: Path,
    *,
    variant: str = "auto",
) -> ResolvedAnalysisReportPath | None:
    """Resolve the report referenced by the published current-analysis manifest.

    Args:
        snapshot_dir: Directory under which the manifest and its artifacts
            are looked up.
        variant: ``"auto"``, ``"hybrid"`` or ``"deterministic"`` (case and
            surrounding whitespace are ignored).

    Returns:
        The resolved report with ``source="current"``, or ``None`` when no
        current-analysis manifest exists under ``snapshot_dir``.

    Raises:
        ValueError: If the variant is unsupported or not served by the
            published manifest, the manifest is invalid, the manifest lacks
            the artifact entry, or the artifact file is missing on disk.
    """
    requested = _normalize_analysis_variant(variant)

    manifest_file = repo_relative_path_to_local(snapshot_dir, CURRENT_ANALYSIS_MANIFEST_PATH)
    if not manifest_file.exists():
        return None
    manifest = load_current_analysis_manifest(manifest_file)

    key = _analysis_artifact_key_for_variant(requested, manifest_kind="current")
    relative_report = manifest.get("artifacts", {}).get(key)
    if not (isinstance(relative_report, str) and relative_report):
        if requested != "auto":
            raise ValueError(
                f"Published current analysis manifest does not provide the {requested} artifact."
            )
        raise ValueError(
            "Published current analysis manifest does not provide the canonical hybrid artifact."
        )

    local_report = repo_relative_path_to_local(snapshot_dir, relative_report)
    if not local_report.exists():
        raise ValueError(
            f"Published current analysis artifact {relative_report!r} is missing from the materialized snapshot."
        )

    resolved_variant = "hybrid" if key == "hybrid" else requested
    return ResolvedAnalysisReportPath(
        path=local_report,
        variant=resolved_variant,
        source="current",
        snapshot_id=str(manifest["snapshot_id"]),
        analysis_id=str(manifest["analysis_id"]),
    )
def resolve_snapshot_local_analysis_report(
    snapshot_dir: Path,
    *,
    variant: str = "auto",
) -> ResolvedAnalysisReportPath | None:
    """Look for an analysis report file directly inside ``snapshot_dir``.

    With ``variant="auto"`` the hybrid report is tried first, then the
    deterministic one. With an explicit variant only that report is tried.

    Returns:
        The first existing report with ``source="snapshot"``, or ``None``
        when no candidate file exists.

    Raises:
        ValueError: If ``variant`` is not a supported value.
    """
    requested = _normalize_analysis_variant(variant)
    candidates = ("hybrid", "deterministic") if requested == "auto" else (requested,)
    for candidate in candidates:
        candidate_path = snapshot_dir / ANALYSIS_REPORT_FILENAME_BY_VARIANT[candidate]
        if candidate_path.exists():
            return ResolvedAnalysisReportPath(
                path=candidate_path,
                variant=candidate,
                source="snapshot",
            )
    return None
def validate_current_analysis_manifest(payload: dict[str, Any]) -> dict[str, Any]:
    """Validate a current-analysis manifest and return a normalized copy.

    After the shared manifest checks, ``archived_artifacts`` is validated
    against the archived run directory derived from the manifest's
    ``snapshot_id`` and ``analysis_id``. It must contain the same artifact
    keys as ``artifacts``.

    Raises:
        ValueError: If any check fails.
    """
    manifest = _validate_analysis_manifest(payload, require_archived_artifacts=True)
    archive_prefix = analysis_run_artifact_path(
        str(manifest["snapshot_id"]),
        str(manifest["analysis_id"]),
        "",
    )
    archived = _validate_artifacts(
        dict(manifest["archived_artifacts"]),
        expected_prefix=archive_prefix,
    )
    # Dict key views compare like sets, so key order does not matter here.
    if archived.keys() != manifest["artifacts"].keys():
        raise ValueError("Current analysis manifest artifacts and archived_artifacts must match.")
    manifest["archived_artifacts"] = archived
    return manifest
def validate_archived_analysis_run_manifest(payload: dict[str, Any]) -> dict[str, Any]:
    """Validate an archived analysis-run manifest and return a normalized copy.

    Archived manifests need no ``archived_artifacts`` section.

    Raises:
        ValueError: If the shared manifest checks fail.
    """
    manifest = _validate_analysis_manifest(payload, require_archived_artifacts=False)
    return manifest
def load_latest_snapshot_pointer(snapshots_root: Path) -> Path | None:
    """Return the snapshot directory named by ``<snapshots_root>/latest.json``.

    The pointer file is expected to hold a JSON object with a non-empty string
    ``snapshot_dir``. An absolute value is used as is; a relative value is
    interpreted relative to the parent of the resolved ``snapshots_root``.

    Returns:
        The resolved snapshot directory, or ``None`` when the pointer file is
        missing or does not provide a usable ``snapshot_dir``.
    """
    root = snapshots_root.resolve()
    pointer_file = root / "latest.json"
    if not pointer_file.exists():
        return None

    payload = read_json(pointer_file)
    # A pointer that is not a JSON object (e.g. a list or string) used to
    # crash with AttributeError on ``.get``. Treat it like any other unusable
    # pointer so callers can fall back.
    if not isinstance(payload, dict):
        return None

    target = payload.get("snapshot_dir")
    if not isinstance(target, str) or not target:
        return None

    candidate = Path(target)
    if candidate.is_absolute():
        return candidate.resolve()
    return (root.parent / candidate).resolve()
def resolve_snapshot_dir_from_output(output_dir: Path, snapshot_dir: Path | None) -> Path:
    """Resolve a snapshot directory using ``<output_dir>/snapshots`` as the snapshots root."""
    snapshots_root = output_dir.resolve() / "snapshots"
    return resolve_snapshot_dir_from_snapshots_root(snapshots_root, snapshot_dir)
| def resolve_snapshot_dir_from_snapshots_root( | |
| snapshots_root: Path, | |
| snapshot_dir: Path | None, | |
| ) -> Path: | |
| if snapshot_dir is not None: | |
| return snapshot_dir.resolve() | |
| resolved_snapshots_root = snapshots_root.resolve() | |
| latest_path = resolved_snapshots_root / "latest.json" | |
| latest_snapshot_dir = load_latest_snapshot_pointer(resolved_snapshots_root) | |
| if latest_snapshot_dir is not None: | |
| return latest_snapshot_dir | |
| snapshot_dirs = sorted(path for path in resolved_snapshots_root.glob("*") if path.is_dir()) | |
| if snapshot_dirs: | |
| return snapshot_dirs[-1].resolve() | |
| raise FileNotFoundError(f"Could not resolve a snapshot directory from {latest_path}") | |
def _validate_analysis_manifest(
    payload: dict[str, Any],
    *,
    require_archived_artifacts: bool,
) -> dict[str, Any]:
    """Run the checks shared by current and archived analysis manifests.

    Args:
        payload: Raw manifest mapping; it is copied, not modified.
        require_archived_artifacts: ``True`` for a current-analysis manifest.
            In that case ``artifacts`` must live under the current-analysis
            directory and ``archived_artifacts`` must be an object. ``False``
            for an archived-run manifest, whose ``artifacts`` must live under
            the run directory.

    Returns:
        A copy with string keys, stripped text fields and validated
        ``artifacts``.

    Raises:
        ValueError: On an unsupported schema version or an invalid field.
    """
    manifest = {str(key): value for key, value in payload.items()}

    version = manifest.get("schema_version")
    if version != ANALYSIS_MANIFEST_SCHEMA_VERSION:
        raise ValueError(f"Unsupported analysis manifest schema version: {version!r}")

    required_text_fields = (
        "repo",
        "snapshot_id",
        "analysis_id",
        "variant",
        "channel",
        "published_at",
    )
    for field in required_text_fields:
        raw = manifest.get(field)
        if not isinstance(raw, str) or not raw.strip():
            raise ValueError(f"Analysis manifest field {field!r} must be a non-empty string.")
        manifest[field] = raw.strip()

    model = manifest.get("model")
    if not (model is None or isinstance(model, str)):
        raise ValueError("Analysis manifest field 'model' must be a string when present.")

    artifacts = manifest.get("artifacts")
    if not isinstance(artifacts, dict):
        raise ValueError("Analysis manifest field 'artifacts' must be an object.")

    if require_archived_artifacts:
        expected_prefix = current_analysis_artifact_path("")
    else:
        expected_prefix = analysis_run_artifact_path(
            manifest["snapshot_id"],
            manifest["analysis_id"],
            "",
        )
    manifest["artifacts"] = _validate_artifacts(dict(artifacts), expected_prefix=expected_prefix)

    if require_archived_artifacts:
        archived = manifest.get("archived_artifacts")
        if not isinstance(archived, dict):
            raise ValueError(
                "Current analysis manifest field 'archived_artifacts' must be an object."
            )
        # Only the keys are normalized here; the caller validates the paths.
        manifest["archived_artifacts"] = {str(key): value for key, value in archived.items()}

    return manifest
| def _validate_artifacts(artifacts: dict[str, Any], *, expected_prefix: str) -> dict[str, str]: | |
| normalized = {str(key): value for key, value in artifacts.items()} | |
| hybrid_path = normalized.get("hybrid") | |
| if not isinstance(hybrid_path, str) or not hybrid_path: | |
| raise ValueError("Analysis manifest must include artifacts.hybrid.") | |
| validated = {"hybrid": hybrid_path} | |
| hybrid_reviews_path = normalized.get("hybrid_reviews") | |
| if hybrid_reviews_path is not None: | |
| if not isinstance(hybrid_reviews_path, str) or not hybrid_reviews_path: | |
| raise ValueError( | |
| "Analysis manifest artifacts.hybrid_reviews must be a non-empty string." | |
| ) | |
| validated["hybrid_reviews"] = hybrid_reviews_path | |
| for key, value in validated.items(): | |
| if not value.startswith(expected_prefix): | |
| raise ValueError( | |
| f"Analysis manifest artifact {key!r} must live under {expected_prefix!r}, got {value!r}." | |
| ) | |
| return validated | |
| def _path_key(value: str) -> str: | |
| normalized = re.sub(r"[^a-z0-9]+", "-", value.strip().lower()) | |
| normalized = re.sub(r"-+", "-", normalized).strip("-") | |
| if not normalized: | |
| raise ValueError("Expected a non-empty path key value.") | |
| return normalized | |
def _analysis_matches_snapshot(
    snapshot_dir: Path,
    analysis_path: ResolvedAnalysisReportPath,
) -> bool:
    """Tell whether ``analysis_path`` belongs to the snapshot in ``snapshot_dir``.

    The analysis's ``snapshot_id`` is compared with the ``snapshot_id`` from
    the snapshot's root manifest when that file exists and provides one.
    Otherwise it is compared with the directory name.
    """
    expected_id = str(analysis_path.snapshot_id)
    manifest_file = snapshot_dir / ROOT_MANIFEST_FILENAME
    if manifest_file.exists():
        declared_id = read_json(manifest_file).get("snapshot_id")
        if declared_id is not None:
            return str(declared_id) == expected_id
    return snapshot_dir.name == expected_id
| def _normalize_analysis_variant(variant: str) -> str: | |
| normalized = variant.strip().lower() | |
| if normalized not in {"auto", "deterministic", "hybrid"}: | |
| raise ValueError( | |
| f"Unsupported analysis variant {variant!r}; expected auto, hybrid, or deterministic." | |
| ) | |
| return normalized | |
| def _analysis_artifact_key_for_variant(variant: str, *, manifest_kind: str) -> str: | |
| if variant in {"auto", "hybrid"}: | |
| return "hybrid" | |
| raise ValueError( | |
| f"Published {manifest_kind} analysis only serves canonical hybrid artifacts; requested {variant!r}." | |
| ) | |