Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import json | |
| import os | |
| from pathlib import Path | |
| from typing import Any | |
| from pydantic import BaseModel | |
| from slop_farmer.config import AnalysisOptions | |
| from slop_farmer.data.parquet_io import read_json | |
| from slop_farmer.reports.analysis import LLM_PROVIDER_ENV_VARS, run_analysis | |
| from slop_farmer.reports.canonical_duplicate_pr import ( | |
| SnapshotBundle, | |
| load_snapshot_bundle, | |
| select_ranked_duplicate_pr_cluster, | |
| select_ranked_duplicate_pr_clusters, | |
| ) | |
# Model spec used as the default `model` argument by the duplicate-PR helpers in this module.
DEFAULT_DUPLICATE_PR_MODEL = "gpt-5.4-mini?service_tier=flex"
# File name of the hybrid analysis report that is looked up in, and written to, the snapshot directory.
HYBRID_REPORT_FILENAME = "analysis-report-hybrid.json"
class DuplicatePrClusterMergeabilityResponse(BaseModel):
    """Structured verdict requested from the duplicate-PR mergeability gate."""

    # True when the gate accepts the cluster; callers in this module skip or
    # reject clusters for which this is False.
    accept: bool
    # Confidence reported by the gate; callers round it to three decimals
    # before exposing it as "mergeability_confidence".
    confidence: float
    # Free-text justification; surfaced as "mergeability_reason" and quoted in
    # the rejection error message.
    reason: str
def ensure_hybrid_report(
    *,
    report_path: Path | None,
    snapshot_dir: Path | None,
    model: str = DEFAULT_DUPLICATE_PR_MODEL,
) -> Path:
    """Return the path of an analysis report that carries LLM enrichment.

    Resolution order:

    1. the explicitly supplied report, when it already has LLM enrichment;
    2. the cached hybrid report inside the snapshot directory, when present
       and enriched;
    3. a freshly generated hybrid report, written next to the snapshot data.

    Args:
        report_path: Existing report to reuse; mutually exclusive with
            ``snapshot_dir``.
        snapshot_dir: Snapshot directory to analyse; mutually exclusive with
            ``report_path``.
        model: Model spec forwarded to the hybrid analysis run.

    Raises:
        ValueError: If not exactly one of the two inputs is given.
        RuntimeError: If prerequisites are missing, or the generated report
            still has no LLM enrichment.
    """
    explicit_report, snapshot_root = _resolve_duplicate_pr_inputs(
        report_path=report_path,
        snapshot_dir=snapshot_dir,
    )

    # A caller-supplied report that is already enriched is used untouched.
    if explicit_report is not None and _report_has_llm_enrichment(explicit_report):
        return explicit_report

    # Otherwise fall back to the hybrid report cached inside the snapshot directory.
    cache_path = snapshot_root / HYBRID_REPORT_FILENAME
    if cache_path.exists() and _report_has_llm_enrichment(cache_path):
        return cache_path.resolve()

    # Nothing reusable: a new hybrid analysis is required, so fail early when
    # the optional dependency or the API key is missing.
    assert_hybrid_analysis_prerequisites()
    options = AnalysisOptions(
        snapshot_dir=snapshot_root,
        output_dir=snapshot_root.parent,
        output=cache_path.resolve(),
        hf_repo_id=None,
        hf_revision=None,
        hf_materialize_dir=None,
        ranking_backend="hybrid",
        model=model,
        max_clusters=10,
    )
    fresh_report = run_analysis(options).resolve()

    if _report_has_llm_enrichment(fresh_report):
        return fresh_report
    raise RuntimeError(
        f"Hybrid analysis for {snapshot_root} completed without LLM enrichment. "
        "Install the optional fast-agent dependency, configure a provider API key, and retry."
    )
def assert_hybrid_analysis_prerequisites() -> None:
    """Verify that hybrid (LLM-backed) duplicate-PR analysis can run.

    Two things are checked: the ``fast_agent`` package must be importable, and
    at least one environment variable named in ``LLM_PROVIDER_ENV_VARS`` must
    be set to a non-empty value.

    Raises:
        RuntimeError: Listing every missing prerequisite in a single message.
    """
    missing: list[str] = []

    try:
        import fast_agent  # noqa: F401
    except Exception:
        # Any import-time failure is reported as "not installed".
        missing.append(
            "Install `slop-farmer[llm]` or `fast-agent-mcp` so hybrid duplicate-PR gating can run."
        )

    has_provider_key = False
    for env_name in LLM_PROVIDER_ENV_VARS:
        if os.environ.get(env_name):
            has_provider_key = True
            break
    if not has_provider_key:
        missing.append(
            "Set one of OPENAI_API_KEY, ANTHROPIC_API_KEY, GOOGLE_API_KEY, or DEEPSEEK_API_KEY."
        )

    if not missing:
        return
    raise RuntimeError(
        "Hybrid duplicate-PR analysis prerequisites are missing. " + " ".join(missing)
    )
def load_duplicate_pr_bundle(
    *,
    report_path: Path | None,
    snapshot_dir: Path | None,
    model: str = DEFAULT_DUPLICATE_PR_MODEL,
) -> SnapshotBundle:
    """Load the snapshot bundle that belongs to an LLM-enriched hybrid report.

    The report is located (or generated) by ``ensure_hybrid_report`` using the
    same arguments, then handed to ``load_snapshot_bundle``.
    """
    return load_snapshot_bundle(
        ensure_hybrid_report(
            report_path=report_path,
            snapshot_dir=snapshot_dir,
            model=model,
        )
    )
def list_mergeable_duplicate_pr_clusters(
    *,
    report_path: Path | None,
    snapshot_dir: Path | None,
    limit: int | None,
    model: str = DEFAULT_DUPLICATE_PR_MODEL,
) -> list[dict[str, Any]]:
    """List the ranked duplicate-PR clusters that pass the mergeability gate.

    Clusters are visited in ranked order and each one is sent through the
    gate; accepted clusters are returned with the bundle's repo, snapshot id
    and report path plus the gate's rounded confidence and reason.

    Args:
        report_path: Existing report to use; mutually exclusive with
            ``snapshot_dir``.
        snapshot_dir: Snapshot directory to analyse; mutually exclusive with
            ``report_path``.
        limit: Stop after this many accepted clusters; ``None`` means no cap.
        model: Model spec used for analysis and for the gate.

    Raises:
        ValueError: If ``limit`` is given and smaller than 1.
    """
    if limit is not None and limit < 1:
        raise ValueError("--limit must be at least 1")

    bundle = load_duplicate_pr_bundle(
        report_path=report_path,
        snapshot_dir=snapshot_dir,
        model=model,
    )
    # Loading may have reused a cached report without checking prerequisites,
    # but the gate below always needs them.
    assert_hybrid_analysis_prerequisites()

    accepted: list[dict[str, Any]] = []
    for cluster in select_ranked_duplicate_pr_clusters(bundle):
        verdict = assess_duplicate_pr_cluster_mergeability(bundle, cluster, model=model)
        if verdict.accept:
            entry = dict(cluster)
            entry.update(
                {
                    "repo": bundle.repo,
                    "snapshot_id": bundle.snapshot_id,
                    "report_path": str(bundle.report_path),
                    "mergeability_confidence": round(float(verdict.confidence), 3),
                    "mergeability_reason": verdict.reason,
                }
            )
            accepted.append(entry)
            if limit is not None and len(accepted) >= limit:
                break
    return accepted
def select_mergeable_duplicate_pr_cluster(
    bundle: SnapshotBundle,
    *,
    cluster_id: str | None,
    model: str = DEFAULT_DUPLICATE_PR_MODEL,
) -> dict[str, Any]:
    """Pick one duplicate-PR cluster that passes the mergeability gate.

    With ``cluster_id`` set, exactly that cluster is gated. Without it, the
    ranked clusters are gated in order and the first accepted one wins.

    Returns:
        The cluster dict extended with ``mergeability_confidence`` (rounded to
        three decimals) and ``mergeability_reason``.

    Raises:
        RuntimeError: If the hybrid analysis prerequisites are missing.
        ValueError: If the requested cluster is rejected, or no ranked cluster
            is accepted.
    """
    assert_hybrid_analysis_prerequisites()

    def _with_verdict(cluster: dict[str, Any], verdict: Any) -> dict[str, Any]:
        # Shared shape of the returned record for both selection modes.
        return {
            **cluster,
            "mergeability_confidence": round(float(verdict.confidence), 3),
            "mergeability_reason": verdict.reason,
        }

    if cluster_id is None:
        for cluster in select_ranked_duplicate_pr_clusters(bundle):
            verdict = assess_duplicate_pr_cluster_mergeability(bundle, cluster, model=model)
            if verdict.accept:
                return _with_verdict(cluster, verdict)
        raise ValueError("No duplicate PR cluster passed the mergeability gate.")

    requested = select_ranked_duplicate_pr_cluster(bundle, cluster_id=cluster_id)
    verdict = assess_duplicate_pr_cluster_mergeability(bundle, requested, model=model)
    if verdict.accept:
        return _with_verdict(requested, verdict)
    raise ValueError(
        f"Cluster {cluster_id} did not pass the mergeability gate: {verdict.reason}"
    )
def assess_duplicate_pr_cluster_mergeability(
    bundle: SnapshotBundle,
    candidate: dict[str, Any],
    *,
    model: str = DEFAULT_DUPLICATE_PR_MODEL,
) -> DuplicatePrClusterMergeabilityResponse:
    """Run the mergeability gate for one candidate cluster.

    Builds the evidence packet for ``candidate`` and submits it to the gate.

    Raises:
        RuntimeError: If the gate produced no result.
    """
    verdict = _run_duplicate_pr_cluster_gate(
        _duplicate_pr_cluster_packet(bundle, candidate),
        model=model,
    )
    if verdict is not None:
        return verdict
    raise RuntimeError("Hybrid duplicate-PR mergeability gate failed to return a result.")
def _resolve_duplicate_pr_inputs(
    *,
    report_path: Path | None,
    snapshot_dir: Path | None,
) -> tuple[Path | None, Path]:
    """Normalise the mutually exclusive report / snapshot-directory inputs.

    Returns:
        ``(resolved_report, resolved_snapshot_dir)``. When a report is given,
        its parent directory is used as the snapshot directory; when only a
        snapshot directory is given, the report element is ``None``.

    Raises:
        ValueError: If both inputs or neither input is provided.
    """
    has_report = report_path is not None
    has_snapshot = snapshot_dir is not None
    if has_report == has_snapshot:
        raise ValueError("Provide exactly one of --report or --snapshot-dir.")

    if report_path is None:
        # Guaranteed by the exclusivity check above; kept for type narrowing.
        assert snapshot_dir is not None
        return None, snapshot_dir.resolve()

    absolute_report = report_path.resolve()
    return absolute_report, absolute_report.parent.resolve()
def _report_has_llm_enrichment(report_path: Path) -> bool:
    """Tell whether the report at ``report_path`` carries LLM enrichment.

    This is a best-effort probe: a missing file, an unreadable or unparsable
    file, and a payload that is not a JSON object all count as "no
    enrichment" instead of raising.

    Returns:
        ``True`` only when the payload is a JSON object whose
        ``"llm_enrichment"`` entry is truthy.
    """
    if not report_path.exists():
        return False
    try:
        payload = read_json(report_path)
    except Exception:
        # Deliberately broad: any read or parse failure means "not usable".
        return False
    # A syntactically valid report may still decode to a list, string or null;
    # calling .get on that would raise instead of reporting "no enrichment".
    if not isinstance(payload, dict):
        return False
    return bool(payload.get("llm_enrichment"))
def _duplicate_pr_cluster_packet(
    bundle: SnapshotBundle, candidate: dict[str, Any]
) -> dict[str, Any]:
    """Assemble the compact evidence packet for one candidate cluster.

    For every source PR of ``candidate`` that exists in the bundle, the packet
    holds the title, a body excerpt (600 chars), up to 20 sorted file names, a
    diff preview (900 chars) and the first two discussion comments, reviews
    and review comments (180-char excerpts). When the candidate's target issue
    exists in the bundle, its title, body excerpt (500 chars) and first two
    comments are included as well.

    Each snapshot table is scanned once and indexed by PR number, restricted
    to the cluster's PRs, rather than being rescanned for every PR.

    Args:
        bundle: Snapshot data exposing ``pull_requests``, ``issues``,
            ``pr_files``, ``pr_diffs``, ``comments``, ``reviews`` and
            ``review_comments`` row collections plus ``repo`` and
            ``snapshot_id``.
        candidate: Cluster record; must contain ``cluster_id`` and
            ``source_pr_numbers``.

    Returns:
        A dict that the caller serialises to JSON for the mergeability gate.
    """
    pr_rows = {
        int(row["number"]): row for row in bundle.pull_requests if row.get("number") is not None
    }
    issue_rows = {int(row["number"]): row for row in bundle.issues if row.get("number") is not None}

    source_pr_numbers = [int(number) for number in candidate["source_pr_numbers"]]
    # PRs absent from the snapshot get no packet, so their rows need no indexing.
    wanted = {number for number in source_pr_numbers if number in pr_rows}

    target_issue_number = _coerce_int(candidate.get("target_issue_number"))
    has_target_issue = target_issue_number is not None and target_issue_number in issue_rows

    def _rows_by_pr(rows: Any) -> dict[int, list[dict[str, Any]]]:
        """Group rows by ``pull_request_number``, keeping source order per PR."""
        grouped: dict[int, list[dict[str, Any]]] = {}
        for row in rows:
            number = _coerce_int(row.get("pull_request_number"))
            if number in wanted:
                grouped.setdefault(number, []).append(row)
        return grouped

    files_by_pr = _rows_by_pr(bundle.pr_files)
    reviews_by_pr = _rows_by_pr(bundle.reviews)
    review_comments_by_pr = _rows_by_pr(bundle.review_comments)

    # Only the first diff row of each PR is ever used.
    first_diff_by_pr: dict[int, dict[str, Any]] = {}
    for row in bundle.pr_diffs:
        number = _coerce_int(row.get("pull_request_number"))
        if number in wanted:
            first_diff_by_pr.setdefault(number, row)

    # Discussion comments hang off either a pull request or an issue; collect
    # both kinds in the same pass.
    comments_by_pr: dict[int, list[dict[str, Any]]] = {}
    issue_comments: list[dict[str, Any]] = []
    for row in bundle.comments:
        parent_kind = row.get("parent_kind")
        if parent_kind == "pull_request":
            number = _coerce_int(row.get("parent_number"))
            if number in wanted:
                comments_by_pr.setdefault(number, []).append(row)
        elif parent_kind == "issue" and has_target_issue:
            if _coerce_int(row.get("parent_number")) == target_issue_number:
                issue_comments.append(row)

    pull_request_packets: list[dict[str, Any]] = []
    for number in source_pr_numbers:
        pull_request = pr_rows.get(number)
        if pull_request is None:
            continue
        diff_row = first_diff_by_pr.get(number)
        pull_request_packets.append(
            {
                "number": number,
                "title": pull_request.get("title"),
                "body_excerpt": _excerpt(pull_request.get("body"), 600),
                "filenames": sorted(
                    {
                        str(row.get("filename"))
                        for row in files_by_pr.get(number, [])
                        if row.get("filename")
                    }
                )[:20],
                "diff_preview": _excerpt((diff_row or {}).get("diff"), 900),
                "discussion_comments": [
                    _excerpt(row.get("body"), 180)
                    for row in comments_by_pr.get(number, [])[:2]
                    if row.get("body")
                ],
                "reviews": [
                    {
                        "state": row.get("state"),
                        "body_excerpt": _excerpt(row.get("body"), 180),
                    }
                    for row in reviews_by_pr.get(number, [])[:2]
                ],
                "review_comments": [
                    {
                        "path": row.get("path"),
                        "body_excerpt": _excerpt(row.get("body"), 180),
                    }
                    for row in review_comments_by_pr.get(number, [])[:2]
                ],
            }
        )

    target_issue_packet: dict[str, Any] | None = None
    if has_target_issue:
        issue = issue_rows[target_issue_number]
        target_issue_packet = {
            "number": target_issue_number,
            "title": issue.get("title"),
            "body_excerpt": _excerpt(issue.get("body"), 500),
            "comments": [
                _excerpt(row.get("body"), 180) for row in issue_comments[:2] if row.get("body")
            ],
        }

    return {
        "repo": bundle.repo,
        "snapshot_id": bundle.snapshot_id,
        "cluster_id": candidate["cluster_id"],
        "summary": candidate.get("summary"),
        "canonical_issue_number": _coerce_int(candidate.get("canonical_issue_number")),
        "canonical_pr_number": _coerce_int(candidate.get("canonical_pr_number")),
        "target_issue": target_issue_packet,
        "source_pr_numbers": candidate["source_pr_numbers"],
        "pull_requests": pull_request_packets,
    }
def _run_duplicate_pr_cluster_gate(
    packet: dict[str, Any],
    *,
    model: str,
) -> DuplicatePrClusterMergeabilityResponse | None:
    """Ask the LLM gate whether the cluster in ``packet`` should be merged.

    The packet is serialised to JSON and sent to a fast-agent agent named
    ``mergeability_gate``, which is asked for a structured
    ``DuplicatePrClusterMergeabilityResponse``.

    Args:
        packet: JSON-serialisable evidence packet for one cluster.
        model: Model spec the gate agent is registered with.

    Returns:
        The structured verdict, or ``None`` when fast-agent cannot be imported
        or the agent run fails for any reason (best effort; the caller turns
        ``None`` into an error).
    """
    try:
        from fast_agent import FastAgent
    except Exception:
        return None

    instruction = (
        "You decide whether a cluster of open GitHub pull requests should be synthesized into one "
        "canonical pull request. Accept only when the PRs appear to implement the same concrete "
        "code-path fix and one small patch could replace them. Reject when the root cause, scope, "
        "or implementation strategy diverges, or when the overlap is only docs/tests/chatter."
    )
    fast = FastAgent("slop-farmer-duplicate-pr-mergeability")

    # Register the agent under the name looked up below. Without this
    # registration `agent.mergeability_gate` does not exist, and the
    # instruction and model would never reach the LLM.
    @fast.agent(name="mergeability_gate", instruction=instruction, model=model)
    async def mergeability_gate_stub() -> None:
        return None

    prompt = json.dumps(packet, indent=2, sort_keys=True)
    try:
        import asyncio

        async def _run() -> DuplicatePrClusterMergeabilityResponse | None:
            async with fast.run() as agent:
                # structured() yields (parsed_model, raw_message); only the
                # parsed model is needed here.
                result, _ = await agent.mergeability_gate.structured(
                    prompt,
                    DuplicatePrClusterMergeabilityResponse,
                )
                return result

        return asyncio.run(_run())
    except Exception:
        # Best effort: provider, network and parsing failures all map to None.
        return None
def _excerpt(value: Any, limit: int) -> str | None:
    """Return ``value`` as stripped text, shortened to at most ``limit`` characters.

    Falsy values and whitespace-only text give ``None``. Text longer than
    ``limit`` is cut to ``limit - 1`` characters, stripped of trailing
    whitespace and suffixed with an ellipsis character.
    """
    cleaned = str(value if value else "").strip()
    if cleaned == "":
        return None
    if len(cleaned) > limit:
        # Reserve one character for the ellipsis marker.
        return cleaned[: limit - 1].rstrip() + "…"
    return cleaned
def _coerce_int(value: Any) -> int | None:
    """Convert ``value`` with ``int()``, giving ``None`` when that is not possible.

    ``None`` stays ``None``; values rejected by ``int()`` with ``TypeError``
    or ``ValueError`` also give ``None``.
    """
    if value is not None:
        try:
            return int(value)
        except (TypeError, ValueError):
            pass
    return None