from __future__ import annotations

import json
import shutil
import subprocess
import tempfile
from datetime import UTC, datetime
from pathlib import Path, PurePosixPath
from typing import Any

from slop_farmer.config import RepoRef
from slop_farmer.data.parquet_io import read_json, write_json, write_text
from slop_farmer.reports.canonical_duplicate_pr import prepare_publish_artifacts, stage_run_bundle
from slop_farmer.reports.duplicate_prs import (
    DEFAULT_DUPLICATE_PR_MODEL,
    load_duplicate_pr_bundle,
    select_mergeable_duplicate_pr_cluster,
)

# Navigation:
# - run_duplicate_pr_merge(): end-to-end orchestration entrypoint
# - validate_codex_result(): structured result checks
# - git/gh/codex helpers
# - file-policy helpers

DEFAULT_RUNS_DIR = Path("runs/duplicate_prs")
DEFAULT_FILE_POLICY = "pure-loc"
FILE_POLICY_CHOICES = ("pure-loc", "allow-docs", "allow-any")
CODE_FILE_SUFFIXES = {
    ".c",
    ".cc",
    ".cpp",
    ".go",
    ".h",
    ".hpp",
    ".java",
    ".js",
    ".jsx",
    ".kt",
    ".m",
    ".mm",
    ".php",
    ".py",
    ".rb",
    ".rs",
    ".scala",
    ".sh",
    ".swift",
    ".ts",
    ".tsx",
}
DOC_FILE_SUFFIXES = {".md", ".mdx", ".rst", ".txt"}
DOC_DIRECTORY_NAMES = {"doc", "docs"}
DOC_FILE_PREFIXES = ("changelog", "readme", "news")
TEST_DIRECTORY_NAMES = {"test", "tests"}


# Merge orchestration
def run_duplicate_pr_merge(
    *,
    report_path: Path | None,
    snapshot_dir: Path | None,
    repo_dir: Path,
    cluster_id: str | None,
    fork_owner: str | None,
    fork_repo: str | None = None,
    upstream_repo: str | None = None,
    upstream_remote: str = "origin",
    fork_remote: str = "fork",
    file_policy: str = DEFAULT_FILE_POLICY,
    model: str = DEFAULT_DUPLICATE_PR_MODEL,
    runs_dir: Path = DEFAULT_RUNS_DIR,
) -> dict[str, Any]:
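    """Merge one duplicate-PR cluster into a single canonical pull request.

    End-to-end flow: load the duplicate-PR report bundle, select a mergeable
    cluster, stage a run directory, have Codex synthesize a single-commit
    branch in a dedicated git worktree, validate the result against the file
    policy, push the branch to a fork, and open a PR against the upstream
    default branch. Returns a summary dict including the PR URL and changed
    paths.
    """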
    if file_policy not in FILE_POLICY_CHOICES:
        raise ValueError(
            f"Unsupported file policy {file_policy!r}. "
            f"Expected one of: {', '.join(FILE_POLICY_CHOICES)}."
        )
    bundle = load_duplicate_pr_bundle(
        report_path=report_path,
        snapshot_dir=snapshot_dir,
        model=model,
    )
    effective_upstream_repo = _normalize_repo_slug(upstream_repo or bundle.repo)
    selected_cluster = select_mergeable_duplicate_pr_cluster(
        bundle,
        cluster_id=cluster_id,
        model=model,
    )
    _require_command("git")
    _require_command("gh")
    _require_command("codex")
    authenticated_user = _resolve_authenticated_github_user()
    fork_target = _resolve_fork_target(
        upstream_repo=effective_upstream_repo,
        fork_repo=fork_repo,
        fork_owner=fork_owner,
        authenticated_user=authenticated_user,
    )
    effective_fork_owner = fork_target.owner
    effective_fork_repo = fork_target.slug
    resolved_repo_dir = repo_dir.resolve()
    _validate_repo_checkout(
        resolved_repo_dir,
        expected_repo=effective_upstream_repo,
        remote_name=upstream_remote,
    )
    default_branch = _resolve_default_branch(effective_upstream_repo)
    run_dir = _create_run_dir(runs_dir)
    manifest = stage_run_bundle(
        bundle.report_path,
        run_dir,
        selected_cluster=selected_cluster,
        max_clusters=1,
        prompt_repo=effective_upstream_repo,
        prompt_default_branch=default_branch,
        prompt_file_policy_instruction=_file_policy_instruction(file_policy),
    )
    manifest_path = run_dir / "run-manifest.json"
    run_stamp = _utc_stamp()
    branch_name = f"codex/{selected_cluster['cluster_id']}-{run_stamp}"
    worktree_dir = run_dir / "worktree"
    _create_worktree(
        repo_dir=resolved_repo_dir,
        worktree_dir=worktree_dir,
        branch_name=branch_name,
        default_branch=default_branch,
        upstream_remote=upstream_remote,
    )
    _update_manifest(
        manifest_path,
        {
            "upstream_repo": effective_upstream_repo,
            "upstream_remote": upstream_remote,
            "default_branch": default_branch,
            "branch_name": branch_name,
            "worktree_dir": str(worktree_dir.resolve()),
            "fork_owner": effective_fork_owner,
            "fork_repo": effective_fork_repo,
            "fork_remote": fork_remote,
            "file_policy": file_policy,
        },
    )
    artifacts = manifest["artifacts"]
    result_path = Path(artifacts["result_path"])
    _run_codex_exec(
        worktree_dir=worktree_dir,
        run_dir=run_dir,
        prompt_path=Path(artifacts["prompt_path"]),
        schema_path=Path(artifacts["schema_path"]),
        result_path=result_path,
    )
    result = validate_codex_result(manifest_path, result_path)
    changed_paths = _validate_synthesized_branch(
        worktree_dir=worktree_dir,
        upstream_remote=upstream_remote,
        default_branch=default_branch,
        commit_message=result["commit_message"],
        file_policy=file_policy,
    )
    publish_metadata = prepare_publish_artifacts(manifest_path, result_path)
    fork_repo = _ensure_fork_repo(
        upstream_repo=effective_upstream_repo,
        fork_repo=effective_fork_repo,
        authenticated_user=authenticated_user,
    )
    _ensure_fork_remote(worktree_dir=worktree_dir, fork_repo=fork_repo, remote_name=fork_remote)
    _push_branch(worktree_dir=worktree_dir, branch_name=branch_name, remote_name=fork_remote)
    pr_url = _create_pull_request(
        upstream_repo=effective_upstream_repo,
        default_branch=default_branch,
        fork_owner=effective_fork_owner,
        branch_name=branch_name,
        title=publish_metadata["pr_title"],
        body_path=Path(publish_metadata["pr_body_path"]),
    )
    pr_url_path = Path(artifacts["pr_url_path"])
    write_text(pr_url.rstrip() + "\n", pr_url_path)
    _update_manifest(
        manifest_path,
        {
            "changed_paths": changed_paths,
            "pr_url": pr_url,
        },
    )
    publish_metadata_path = Path(artifacts["publish_metadata_path"])
    publish_metadata["pr_url"] = pr_url
    publish_metadata["changed_paths"] = changed_paths
    write_json(publish_metadata, publish_metadata_path)
    return {
        "cluster_id": selected_cluster["cluster_id"],
        "repo": effective_upstream_repo,
        "report_path": str(bundle.report_path),
        "run_dir": str(run_dir.resolve()),
        "worktree_dir": str(worktree_dir.resolve()),
        "branch_name": branch_name,
        "fork_repo": fork_repo,
        "fork_remote": fork_remote,
        "upstream_remote": upstream_remote,
        "file_policy": file_policy,
        "pr_url": pr_url,
        "changed_paths": changed_paths,
    }
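

# Example invocation (illustrative sketch; the report path and repo checkout
# location below are hypothetical, not part of this module):
#
#     run_duplicate_pr_merge(
#         report_path=Path("reports/duplicate-prs.json"),
#         snapshot_dir=None,
#         repo_dir=Path("checkouts/upstream"),
#         cluster_id=None,
#         fork_owner=None,
#     )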


def validate_codex_result(manifest_path: Path, result_path: Path) -> dict[str, Any]:
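    """Validate the structured Codex result against the staged manifest.

    Requires a "success" status, a matching cluster_id, source PR numbers
    drawn from the selected cluster, at least one executed validation
    command, and non-empty commit_message, pr_title, and summary fields.
    Returns the result with source_pr_numbers and tests_run normalized.
    """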
    manifest = read_json(manifest_path.resolve())
    result = json.loads(result_path.resolve().read_text(encoding="utf-8"))
    selected_cluster = manifest["selected_cluster"]
    if result.get("status") != "success":
        summary = str(result.get("summary") or "").strip()
        raise ValueError(
            "Codex did not synthesize a valid canonical PR." + (f" {summary}" if summary else "")
        )
    if result.get("cluster_id") != selected_cluster["cluster_id"]:
        raise ValueError("Codex result cluster_id does not match the selected cluster.")
    expected_source_pr_numbers = _ordered_ints(selected_cluster["source_pr_numbers"])
    actual_source_pr_numbers = _normalize_result_source_pr_numbers(
        expected_source_pr_numbers=expected_source_pr_numbers,
        raw_source_pr_numbers=result.get("source_pr_numbers"),
    )
    tests_run = [
        str(value).strip() for value in result.get("tests_run") or [] if str(value).strip()
    ]
    if not tests_run:
        raise ValueError("Codex result did not include any executed validation commands.")
    for field in ("commit_message", "pr_title", "summary"):
        if not str(result.get(field) or "").strip():
            raise ValueError(f"Codex result did not provide a {field.replace('_', ' ')}.")
    normalized = dict(result)
    normalized["source_pr_numbers"] = actual_source_pr_numbers
    normalized["tests_run"] = tests_run
    return normalized


# GitHub / git / Codex helpers
def _require_command(command_name: str) -> None:
    if shutil.which(command_name):
        return
    raise RuntimeError(f"Missing required command: {command_name}")


def _resolve_authenticated_github_user() -> str:
    try:
        _run_checked(["gh", "auth", "status"])
    except RuntimeError as exc:
        raise RuntimeError(
            "GitHub CLI authentication is invalid. Run `gh auth login` and retry."
        ) from exc
    login = _run_stdout(["gh", "api", "user", "--jq", ".login"]).strip()
    if not login:
        raise RuntimeError("Could not resolve the authenticated GitHub user from `gh api user`.")
    return login


def _normalize_repo_slug(raw: str) -> str:
    return RepoRef.parse(raw).slug


def _resolve_fork_target(
    *,
    upstream_repo: str,
    fork_repo: str | None,
    fork_owner: str | None,
    authenticated_user: str,
) -> RepoRef:
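    """Pick the fork target: an explicit fork_repo wins; otherwise combine
    fork_owner (or the authenticated `gh` user) with the upstream repo name."""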
    if fork_repo is not None:
        return RepoRef.parse(fork_repo.strip())
    owner = (fork_owner or authenticated_user).strip()
    if not owner:
        raise RuntimeError("Could not resolve the GitHub fork owner.")
    upstream = RepoRef.parse(upstream_repo)
    return RepoRef(owner=owner, name=upstream.name)


def _validate_repo_checkout(repo_dir: Path, *, expected_repo: str, remote_name: str) -> None:
    if not repo_dir.exists():
        raise RuntimeError(f"Missing repo checkout: {repo_dir}")
    remote_url = _run_stdout(["git", "-C", str(repo_dir), "remote", "get-url", remote_name]).strip()
    actual_repo = _repo_slug_from_remote_url(remote_url)
    if actual_repo != expected_repo:
        raise RuntimeError(
            f"`--repo-dir` remote {remote_name!r} must point at {expected_repo}, "
            f"but resolves to {actual_repo or remote_url!r}."
        )


def _resolve_default_branch(repo: str) -> str:
    default_branch = _run_stdout(
        [
            "gh",
            "repo",
            "view",
            repo,
            "--json",
            "defaultBranchRef",
            "--jq",
            ".defaultBranchRef.name",
        ]
    ).strip()
    if not default_branch:
        raise RuntimeError(f"Could not resolve the default branch for {repo}.")
    return default_branch


def _create_run_dir(runs_dir: Path) -> Path:
    base_dir = runs_dir.resolve()
    base_dir.mkdir(parents=True, exist_ok=True)
    return Path(tempfile.mkdtemp(prefix=f"{_utc_stamp()}.", dir=base_dir))


def _create_worktree(
    *,
    repo_dir: Path,
    worktree_dir: Path,
    branch_name: str,
    default_branch: str,
    upstream_remote: str,
) -> None:
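    """Fetch the upstream default branch and add a worktree on a new branch at
    that commit; `-B` creates the branch or resets it if it already exists."""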
    _run_checked(["git", "-C", str(repo_dir), "fetch", upstream_remote, default_branch])
    _run_checked(
        [
            "git",
            "-C",
            str(repo_dir),
            "worktree",
            "add",
            "-B",
            branch_name,
            str(worktree_dir),
            f"{upstream_remote}/{default_branch}",
        ]
    )


def _run_codex_exec(
    *,
    worktree_dir: Path,
    run_dir: Path,
    prompt_path: Path,
    schema_path: Path,
    result_path: Path,
) -> None:
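    """Run `codex exec` non-interactively inside the worktree, feeding the
    staged prompt on stdin and requesting a JSON result file that conforms to
    the staged schema."""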
    prompt_text = prompt_path.read_text(encoding="utf-8")
    _run_checked(
        [
            "codex",
            "exec",
            "-C",
            str(worktree_dir),
            "--add-dir",
            str(run_dir),
            "--full-auto",
            "--output-schema",
            str(schema_path),
            "-o",
            str(result_path),
            "-",
        ],
        input_text=prompt_text,
    )
    if not result_path.exists():
        raise RuntimeError("Codex did not write a structured result.")


def _validate_synthesized_branch(
    *,
    worktree_dir: Path,
    upstream_remote: str,
    default_branch: str,
    commit_message: str,
    file_policy: str,
) -> list[str]:
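    """Check the synthesized branch: exactly one commit ahead of the upstream
    default branch, HEAD subject matching the reported commit message, a clean
    worktree, and changed paths that satisfy the file policy. Returns the
    changed paths."""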
    ahead_count = int(
        _run_stdout(
            [
                "git",
                "-C",
                str(worktree_dir),
                "rev-list",
                "--count",
                f"{upstream_remote}/{default_branch}..HEAD",
            ]
        )
    )
    if ahead_count != 1:
        raise RuntimeError(
            f"Synthesized branch must contain exactly one commit on top of "
            f"{upstream_remote}/{default_branch}; found {ahead_count}."
        )
    head_subject = _run_stdout(["git", "-C", str(worktree_dir), "log", "-1", "--pretty=%s"]).strip()
    if head_subject != commit_message:
        raise RuntimeError(
            f"Codex commit message {commit_message!r} does not match HEAD subject {head_subject!r}."
        )
    status_output = _run_stdout(["git", "-C", str(worktree_dir), "status", "--porcelain"])
    if status_output.strip():
        raise RuntimeError("Codex left uncommitted changes in the synthesis worktree.")
    changed_paths = [
        line.strip()
        for line in _run_stdout(
            [
                "git",
                "-C",
                str(worktree_dir),
                "diff",
                "--name-only",
                f"{upstream_remote}/{default_branch}..HEAD",
            ]
        ).splitlines()
        if line.strip()
    ]
    if not changed_paths:
        raise RuntimeError("The synthesized branch does not modify any files.")
    if file_policy == "pure-loc":
        disallowed_paths = [path for path in changed_paths if _is_doc_path(path)]
        if disallowed_paths:
            raise RuntimeError(
                "The synthesized branch touched non-LOC documentation paths: "
                + ", ".join(disallowed_paths)
            )
        unsupported_paths = [
            path for path in changed_paths if not _is_allowed_path(path, allow_docs=False)
        ]
        if unsupported_paths:
            raise RuntimeError(
                "The synthesized branch touched files outside implementation/test code paths: "
                + ", ".join(unsupported_paths)
            )
    elif file_policy == "allow-docs":
        unsupported_paths = [
            path for path in changed_paths if not _is_allowed_path(path, allow_docs=True)
        ]
        if unsupported_paths:
            raise RuntimeError(
                "The synthesized branch touched files outside implementation/test/documentation "
                "paths: " + ", ".join(unsupported_paths)
            )
    elif file_policy != "allow-any":
        raise RuntimeError(f"Unsupported file policy: {file_policy}")
    return changed_paths


def _ensure_fork_repo(
    *,
    upstream_repo: str,
    fork_repo: str,
    authenticated_user: str,
) -> str:
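    """Ensure the fork exists on GitHub: if `gh repo view` fails, fork the
    upstream into the target owner/name without cloning or adding a remote."""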
    fork_target = RepoRef.parse(fork_repo)
    try:
        _run_checked(["gh", "repo", "view", fork_repo, "--json", "nameWithOwner"])
    except RuntimeError:
        fork_command = [
            "gh",
            "repo",
            "fork",
            upstream_repo,
            "--clone=false",
            "--remote=false",
            "--fork-name",
            fork_target.name,
        ]
        if fork_target.owner != authenticated_user:
            fork_command.extend(["--org", fork_target.owner])
        _run_checked(fork_command)
    return fork_target.slug


def _ensure_fork_remote(*, worktree_dir: Path, fork_repo: str, remote_name: str) -> None:
    fork_url = f"https://github.com/{fork_repo}.git"
    try:
        existing_url = _run_stdout(
            ["git", "-C", str(worktree_dir), "remote", "get-url", remote_name]
        ).strip()
    except RuntimeError:
        _run_checked(
            [
                "git",
                "-C",
                str(worktree_dir),
                "remote",
                "add",
                remote_name,
                fork_url,
            ]
        )
        return
    if existing_url != fork_url:
        raise RuntimeError(
            f"Existing `{remote_name}` remote points to {existing_url}, expected {fork_url}."
        )


def _push_branch(*, worktree_dir: Path, branch_name: str, remote_name: str) -> None:
    _run_checked(["git", "-C", str(worktree_dir), "push", "-u", remote_name, branch_name])


def _create_pull_request(
    *,
    upstream_repo: str,
    default_branch: str,
    fork_owner: str,
    branch_name: str,
    title: str,
    body_path: Path,
) -> str:
    return _run_stdout(
        [
            "gh",
            "pr",
            "create",
            "--repo",
            upstream_repo,
            "--base",
            default_branch,
            "--head",
            f"{fork_owner}:{branch_name}",
            "--title",
            title,
            "--body-file",
            str(body_path),
        ]
    ).strip()


def _repo_slug_from_remote_url(url: str) -> str:
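    """Reduce a GitHub remote URL (https/http/ssh/git/scp-style) to an
    "owner/name" slug, e.g. "git@github.com:owner/repo.git" -> "owner/repo"."""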
    normalized = url.strip()
    if not normalized:
        return ""
    for prefix in (
        "https://github.com/",
        "http://github.com/",
        "ssh://git@github.com/",
        "git://github.com/",
    ):
        if normalized.startswith(prefix):
            normalized = normalized[len(prefix) :]
            break
    if normalized.startswith("git@github.com:"):
        normalized = normalized.split(":", 1)[1]
    normalized = normalized.rstrip("/")
    if normalized.endswith(".git"):
        normalized = normalized[:-4]
    return normalized


# File-policy helpers
def _is_doc_path(path: str) -> bool:
    pure_path = PurePosixPath(path)
    lowered_parts = [part.lower() for part in pure_path.parts]
    lowered_name = pure_path.name.lower()
    if pure_path.suffix.lower() in DOC_FILE_SUFFIXES:
        return True
    if any(part in DOC_DIRECTORY_NAMES for part in lowered_parts):
        return True
    return lowered_name.startswith(DOC_FILE_PREFIXES)


def _is_allowed_path(path: str, *, allow_docs: bool) -> bool:
    pure_path = PurePosixPath(path)
    if _is_test_path(pure_path):
        return True
    if allow_docs and _is_doc_path(path):
        return True
    return pure_path.suffix.lower() in CODE_FILE_SUFFIXES


def _file_policy_instruction(file_policy: str) -> str:
    if file_policy == "pure-loc":
        return (
            "Do not touch README files, changelogs, markdown docs, prose-only files, "
            "or commentary artifacts. Fail instead of submitting a noisy branch."
        )
    if file_policy == "allow-docs":
        return (
            "Documentation and markdown changes are allowed only when they are necessary "
            "for the same fix. Keep them minimal and subordinate to the code patch."
        )
    if file_policy == "allow-any":
        return (
            "Non-code file changes are allowed when they are required for the same fix, "
            "but keep the patch as small and focused as possible."
        )
    raise ValueError(f"Unsupported file policy: {file_policy}")


def _is_test_path(path: PurePosixPath) -> bool:
    lowered_parts = [part.lower() for part in path.parts]
    lowered_name = path.name.lower()
    lowered_stem = path.stem.lower()
    if any(part in TEST_DIRECTORY_NAMES for part in lowered_parts):
        return True
    return lowered_name.startswith("test_") or lowered_stem.endswith("_test")


def _run_checked(
    args: list[str],
    *,
    input_text: str | None = None,
) -> subprocess.CompletedProcess[str]:
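    """Run a subprocess with captured output, raising RuntimeError (with the
    command line and any stderr/stdout detail) on a non-zero exit."""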
    try:
        return subprocess.run(
            args,
            input=input_text,
            text=True,
            capture_output=True,
            check=True,
        )
    except subprocess.CalledProcessError as exc:
        detail = (exc.stderr or exc.stdout or "").strip()
        message = f"Command failed: {' '.join(args)}"
        if detail:
            message = f"{message}: {detail}"
        raise RuntimeError(message) from exc


def _run_stdout(args: list[str]) -> str:
    return _run_checked(args).stdout


def _update_manifest(manifest_path: Path, updates: dict[str, Any]) -> None:
    manifest = read_json(manifest_path)
    manifest.update(updates)
    write_json(manifest, manifest_path)


def _utc_stamp() -> str:
    return datetime.now(UTC).strftime("%Y%m%dT%H%M%SZ")


def _ordered_ints(values: Any) -> list[int]:
    ordered: list[int] = []
    seen: set[int] = set()
    for value in values or []:
        number = _coerce_int(value)
        if number is None or number in seen:
            continue
        ordered.append(number)
        seen.add(number)
    return ordered


def _normalize_result_source_pr_numbers(
    *,
    expected_source_pr_numbers: list[int],
    raw_source_pr_numbers: Any,
) -> list[int]:
    actual_source_pr_numbers = _ordered_ints(raw_source_pr_numbers)
    if len(actual_source_pr_numbers) < 2:
        raise ValueError(
            "Codex result must reference at least two open source PRs from the selected cluster."
        )
    expected_source_pr_set = set(expected_source_pr_numbers)
    unknown_source_pr_numbers = [
        number for number in actual_source_pr_numbers if number not in expected_source_pr_set
    ]
    if unknown_source_pr_numbers:
        raise ValueError(
            "Codex result source_pr_numbers included PRs outside the selected open PR set: "
            + ", ".join(str(number) for number in unknown_source_pr_numbers)
        )
    actual_source_pr_set = set(actual_source_pr_numbers)
    return [number for number in expected_source_pr_numbers if number in actual_source_pr_set]


def _coerce_int(value: Any) -> int | None:
    if value is None:
        return None
    try:
        return int(value)
    except (TypeError, ValueError):
        return None