# diffusers-pr-api — src/slop_farmer/app/duplicate_prs.py
# Deployed to Hugging Face by evalstate (HF Staff); commit dbf7313 (verified).
from __future__ import annotations
import json
import shutil
import subprocess
import tempfile
from datetime import UTC, datetime
from pathlib import Path, PurePosixPath
from typing import Any
from slop_farmer.config import RepoRef
from slop_farmer.data.parquet_io import read_json, write_json, write_text
from slop_farmer.reports.canonical_duplicate_pr import prepare_publish_artifacts, stage_run_bundle
from slop_farmer.reports.duplicate_prs import (
DEFAULT_DUPLICATE_PR_MODEL,
load_duplicate_pr_bundle,
select_mergeable_duplicate_pr_cluster,
)
# Navigation:
# - run_duplicate_pr_merge(): end-to-end orchestration entrypoint
# - validate_codex_result(): structured result checks
# - git/gh/codex helpers
# - file-policy helpers
# Default location for per-run artifacts (manifest, worktree, Codex result).
DEFAULT_RUNS_DIR = Path("runs/duplicate_prs")
# File policies: "pure-loc" = implementation/test code only,
# "allow-docs" = docs permitted alongside code, "allow-any" = no restrictions.
DEFAULT_FILE_POLICY = "pure-loc"
FILE_POLICY_CHOICES = ("pure-loc", "allow-docs", "allow-any")
# Suffixes treated as implementation code by the file-policy checks.
CODE_FILE_SUFFIXES = {
    ".c",
    ".cc",
    ".cpp",
    ".go",
    ".h",
    ".hpp",
    ".java",
    ".js",
    ".jsx",
    ".kt",
    ".m",
    ".mm",
    ".php",
    ".py",
    ".rb",
    ".rs",
    ".scala",
    ".sh",
    ".swift",
    ".ts",
    ".tsx",
}
# Markers that classify a path as documentation (suffix, directory, or filename prefix).
DOC_FILE_SUFFIXES = {".md", ".mdx", ".rst", ".txt"}
DOC_DIRECTORY_NAMES = {"doc", "docs"}
DOC_FILE_PREFIXES = ("changelog", "readme", "news")
# Directory names that classify a path as test code.
TEST_DIRECTORY_NAMES = {"test", "tests"}
# Merge orchestration
def run_duplicate_pr_merge(
    *,
    report_path: Path | None,
    snapshot_dir: Path | None,
    repo_dir: Path,
    cluster_id: str | None,
    fork_owner: str | None,
    fork_repo: str | None = None,
    upstream_repo: str | None = None,
    upstream_remote: str = "origin",
    fork_remote: str = "fork",
    file_policy: str = DEFAULT_FILE_POLICY,
    model: str = DEFAULT_DUPLICATE_PR_MODEL,
    runs_dir: Path = DEFAULT_RUNS_DIR,
) -> dict[str, Any]:
    """Run the end-to-end duplicate-PR merge pipeline.

    Loads a duplicate-PR report bundle, selects a mergeable cluster, stages a
    run directory, lets Codex synthesize a single canonical commit in a fresh
    git worktree, validates the branch against ``file_policy``, pushes it to a
    fork, and opens a pull request against the upstream repository.

    Requires the ``git``, ``gh``, and ``codex`` commands on PATH and an
    authenticated GitHub CLI session.

    Returns a summary dict (cluster id, run/worktree dirs, branch name, fork
    slug, PR URL, changed paths). Raises ValueError for an unsupported
    ``file_policy``; helper functions raise RuntimeError when an external
    step fails.
    """
    # Fail fast on an unknown policy before touching any external tools.
    if file_policy not in FILE_POLICY_CHOICES:
        raise ValueError(
            f"Unsupported file policy {file_policy!r}. "
            f"Expected one of: {', '.join(FILE_POLICY_CHOICES)}."
        )
    # Load the report bundle and pick the cluster Codex should merge.
    bundle = load_duplicate_pr_bundle(
        report_path=report_path,
        snapshot_dir=snapshot_dir,
        model=model,
    )
    effective_upstream_repo = _normalize_repo_slug(upstream_repo or bundle.repo)
    selected_cluster = select_mergeable_duplicate_pr_cluster(
        bundle,
        cluster_id=cluster_id,
        model=model,
    )
    # Verify every external dependency up front.
    _require_command("git")
    _require_command("gh")
    _require_command("codex")
    authenticated_user = _resolve_authenticated_github_user()
    fork_target = _resolve_fork_target(
        upstream_repo=effective_upstream_repo,
        fork_repo=fork_repo,
        fork_owner=fork_owner,
        authenticated_user=authenticated_user,
    )
    effective_fork_owner = fork_target.owner
    effective_fork_repo = fork_target.slug
    resolved_repo_dir = repo_dir.resolve()
    # The local checkout's upstream remote must match the resolved repo slug.
    _validate_repo_checkout(
        resolved_repo_dir,
        expected_repo=effective_upstream_repo,
        remote_name=upstream_remote,
    )
    default_branch = _resolve_default_branch(effective_upstream_repo)
    # Stage the run directory with prompt/schema/result artifacts.
    run_dir = _create_run_dir(runs_dir)
    manifest = stage_run_bundle(
        bundle.report_path,
        run_dir,
        selected_cluster=selected_cluster,
        max_clusters=1,
        prompt_repo=effective_upstream_repo,
        prompt_default_branch=default_branch,
        prompt_file_policy_instruction=_file_policy_instruction(file_policy),
    )
    manifest_path = run_dir / "run-manifest.json"
    run_stamp = _utc_stamp()
    branch_name = f"codex/{selected_cluster['cluster_id']}-{run_stamp}"
    worktree_dir = run_dir / "worktree"
    # Synthesis happens in an isolated worktree branched from the upstream tip.
    _create_worktree(
        repo_dir=resolved_repo_dir,
        worktree_dir=worktree_dir,
        branch_name=branch_name,
        default_branch=default_branch,
        upstream_remote=upstream_remote,
    )
    _update_manifest(
        manifest_path,
        {
            "upstream_repo": effective_upstream_repo,
            "upstream_remote": upstream_remote,
            "default_branch": default_branch,
            "branch_name": branch_name,
            "worktree_dir": str(worktree_dir.resolve()),
            "fork_owner": effective_fork_owner,
            "fork_repo": effective_fork_repo,
            "fork_remote": fork_remote,
            "file_policy": file_policy,
        },
    )
    artifacts = manifest["artifacts"]
    result_path = Path(artifacts["result_path"])
    # Drive Codex against the staged prompt; it must emit a structured result.
    _run_codex_exec(
        worktree_dir=worktree_dir,
        run_dir=run_dir,
        prompt_path=Path(artifacts["prompt_path"]),
        schema_path=Path(artifacts["schema_path"]),
        result_path=result_path,
    )
    result = validate_codex_result(manifest_path, result_path)
    # Enforce the one-clean-commit contract and the file policy on the branch.
    changed_paths = _validate_synthesized_branch(
        worktree_dir=worktree_dir,
        upstream_remote=upstream_remote,
        default_branch=default_branch,
        commit_message=result["commit_message"],
        file_policy=file_policy,
    )
    publish_metadata = prepare_publish_artifacts(manifest_path, result_path)
    # NOTE(review): this rebinds the `fork_repo` parameter to the resolved slug;
    # from here on `fork_repo` no longer holds the caller-supplied value.
    fork_repo = _ensure_fork_repo(
        upstream_repo=effective_upstream_repo,
        fork_repo=effective_fork_repo,
        authenticated_user=authenticated_user,
    )
    _ensure_fork_remote(worktree_dir=worktree_dir, fork_repo=fork_repo, remote_name=fork_remote)
    _push_branch(worktree_dir=worktree_dir, branch_name=branch_name, remote_name=fork_remote)
    pr_url = _create_pull_request(
        upstream_repo=effective_upstream_repo,
        default_branch=default_branch,
        fork_owner=effective_fork_owner,
        branch_name=branch_name,
        title=publish_metadata["pr_title"],
        body_path=Path(publish_metadata["pr_body_path"]),
    )
    # Persist the PR URL and final run metadata alongside the other artifacts.
    pr_url_path = Path(artifacts["pr_url_path"])
    write_text(pr_url.rstrip() + "\n", pr_url_path)
    _update_manifest(
        manifest_path,
        {
            "changed_paths": changed_paths,
            "pr_url": pr_url,
        },
    )
    publish_metadata_path = Path(artifacts["publish_metadata_path"])
    publish_metadata["pr_url"] = pr_url
    publish_metadata["changed_paths"] = changed_paths
    write_json(publish_metadata, publish_metadata_path)
    return {
        "cluster_id": selected_cluster["cluster_id"],
        "repo": effective_upstream_repo,
        "report_path": str(bundle.report_path),
        "run_dir": str(run_dir.resolve()),
        "worktree_dir": str(worktree_dir.resolve()),
        "branch_name": branch_name,
        "fork_repo": fork_repo,
        "fork_remote": fork_remote,
        "upstream_remote": upstream_remote,
        "file_policy": file_policy,
        "pr_url": pr_url,
        "changed_paths": changed_paths,
    }
def validate_codex_result(manifest_path: Path, result_path: Path) -> dict[str, Any]:
    """Check Codex's structured result against the staged run manifest.

    Verifies success status, cluster id, source PR numbers, executed
    validation commands, and required text fields, then returns a normalized
    copy of the result. Raises ValueError on any violation.
    """
    manifest = read_json(manifest_path.resolve())
    payload = json.loads(result_path.resolve().read_text(encoding="utf-8"))
    cluster = manifest["selected_cluster"]

    if payload.get("status") != "success":
        summary = str(payload.get("summary") or "").strip()
        suffix = f" {summary}" if summary else ""
        raise ValueError("Codex did not synthesize a valid canonical PR." + suffix)
    if payload.get("cluster_id") != cluster["cluster_id"]:
        raise ValueError("Codex result cluster_id does not match the selected cluster.")

    expected_numbers = _ordered_ints(cluster["source_pr_numbers"])
    actual_numbers = _normalize_result_source_pr_numbers(
        expected_source_pr_numbers=expected_numbers,
        raw_source_pr_numbers=payload.get("source_pr_numbers"),
    )

    # At least one validation command must have been executed and recorded.
    commands = []
    for entry in payload.get("tests_run") or []:
        text = str(entry).strip()
        if text:
            commands.append(text)
    if not commands:
        raise ValueError("Codex result did not include any executed validation commands.")

    for field in ("commit_message", "pr_title", "summary"):
        if not str(payload.get(field) or "").strip():
            raise ValueError(f"Codex result did not provide a {field.replace('_', ' ')}.")

    normalized = dict(payload)
    normalized["source_pr_numbers"] = actual_numbers
    normalized["tests_run"] = commands
    return normalized
# GitHub / git / Codex helpers
def _require_command(command_name: str) -> None:
if shutil.which(command_name):
return
raise RuntimeError(f"Missing required command: {command_name}")
def _resolve_authenticated_github_user() -> str:
    """Return the login name of the user the GitHub CLI is authenticated as."""
    try:
        _run_checked(["gh", "auth", "status"])
    except RuntimeError as exc:
        raise RuntimeError(
            "GitHub CLI authentication is invalid. Run `gh auth login` and retry."
        ) from exc
    login = _run_stdout(["gh", "api", "user", "--jq", ".login"]).strip()
    if login:
        return login
    raise RuntimeError("Could not resolve the authenticated GitHub user from `gh api user`.")
def _normalize_repo_slug(raw: str) -> str:
    """Canonicalize *raw* into an ``owner/name`` slug via RepoRef."""
    ref = RepoRef.parse(raw)
    return ref.slug
def _resolve_fork_target(
    *,
    upstream_repo: str,
    fork_repo: str | None,
    fork_owner: str | None,
    authenticated_user: str,
) -> RepoRef:
    """Decide which fork repository the synthesized branch is pushed to.

    An explicit ``fork_repo`` wins outright; otherwise the fork keeps the
    upstream repo name under ``fork_owner`` (falling back to the
    authenticated user). Raises RuntimeError when no owner can be resolved.
    """
    if fork_repo is not None:
        return RepoRef.parse(fork_repo.strip())
    owner = (fork_owner or authenticated_user).strip()
    if not owner:
        raise RuntimeError("Could not resolve the GitHub fork owner.")
    return RepoRef(owner=owner, name=RepoRef.parse(upstream_repo).name)
def _validate_repo_checkout(repo_dir: Path, *, expected_repo: str, remote_name: str) -> None:
    """Ensure *repo_dir* exists and its *remote_name* remote targets *expected_repo*."""
    if not repo_dir.exists():
        raise RuntimeError(f"Missing repo checkout: {repo_dir}")
    remote_url = _run_stdout(["git", "-C", str(repo_dir), "remote", "get-url", remote_name]).strip()
    actual_repo = _repo_slug_from_remote_url(remote_url)
    if actual_repo == expected_repo:
        return
    raise RuntimeError(
        f"`--repo-dir` remote {remote_name!r} must point at {expected_repo}, "
        f"but resolves to {actual_repo or remote_url!r}."
    )
def _resolve_default_branch(repo: str) -> str:
    """Ask `gh` for *repo*'s default branch name; raise if it cannot be resolved."""
    command = [
        "gh",
        "repo",
        "view",
        repo,
        "--json",
        "defaultBranchRef",
        "--jq",
        ".defaultBranchRef.name",
    ]
    branch = _run_stdout(command).strip()
    if not branch:
        raise RuntimeError(f"Could not resolve the default branch for {repo}.")
    return branch
def _create_run_dir(runs_dir: Path) -> Path:
    """Create and return a unique timestamp-prefixed run directory under *runs_dir*."""
    base = runs_dir.resolve()
    base.mkdir(parents=True, exist_ok=True)
    # mkdtemp guarantees uniqueness even when two runs share a timestamp.
    new_dir = tempfile.mkdtemp(prefix=f"{_utc_stamp()}.", dir=base)
    return Path(new_dir)
def _create_worktree(
    *,
    repo_dir: Path,
    worktree_dir: Path,
    branch_name: str,
    default_branch: str,
    upstream_remote: str,
) -> None:
    """Fetch the upstream default branch and attach a fresh worktree on *branch_name*."""
    git = ["git", "-C", str(repo_dir)]
    _run_checked(git + ["fetch", upstream_remote, default_branch])
    # `-B` (re)creates the branch at the freshly fetched upstream tip.
    _run_checked(
        git
        + [
            "worktree",
            "add",
            "-B",
            branch_name,
            str(worktree_dir),
            f"{upstream_remote}/{default_branch}",
        ]
    )
def _run_codex_exec(
    *,
    worktree_dir: Path,
    run_dir: Path,
    prompt_path: Path,
    schema_path: Path,
    result_path: Path,
) -> None:
    """Feed the staged prompt to `codex exec` and require a structured result file."""
    command = [
        "codex",
        "exec",
        "-C",
        str(worktree_dir),
        "--add-dir",
        str(run_dir),
        "--full-auto",
        "--output-schema",
        str(schema_path),
        "-o",
        str(result_path),
        "-",  # read the prompt from stdin
    ]
    _run_checked(command, input_text=prompt_path.read_text(encoding="utf-8"))
    if not result_path.exists():
        raise RuntimeError("Codex did not write a structured result.")
def _validate_synthesized_branch(
    *,
    worktree_dir: Path,
    upstream_remote: str,
    default_branch: str,
    commit_message: str,
    file_policy: str,
) -> list[str]:
    """Validate the worktree branch Codex produced and return its changed paths.

    Enforces: exactly one commit on top of the upstream default branch, a
    HEAD subject matching *commit_message*, a clean working tree, at least
    one changed file, and the path restrictions implied by *file_policy*.
    Raises RuntimeError on any violation.
    """
    # The branch must be exactly one commit ahead of upstream/default.
    ahead_count = int(
        _run_stdout(
            [
                "git",
                "-C",
                str(worktree_dir),
                "rev-list",
                "--count",
                f"{upstream_remote}/{default_branch}..HEAD",
            ]
        )
    )
    if ahead_count != 1:
        raise RuntimeError(
            f"Synthesized branch must contain exactly one commit on top of "
            f"{upstream_remote}/{default_branch}; found {ahead_count}."
        )
    # HEAD's subject line must equal the commit message Codex reported.
    head_subject = _run_stdout(["git", "-C", str(worktree_dir), "log", "-1", "--pretty=%s"]).strip()
    if head_subject != commit_message:
        raise RuntimeError(
            f"Codex commit message {commit_message!r} does not match HEAD subject {head_subject!r}."
        )
    # The worktree must be clean: everything committed, nothing stray.
    status_output = _run_stdout(["git", "-C", str(worktree_dir), "status", "--porcelain"])
    if status_output.strip():
        raise RuntimeError("Codex left uncommitted changes in the synthesis worktree.")
    changed_paths = [
        line.strip()
        for line in _run_stdout(
            [
                "git",
                "-C",
                str(worktree_dir),
                "diff",
                "--name-only",
                f"{upstream_remote}/{default_branch}..HEAD",
            ]
        ).splitlines()
        if line.strip()
    ]
    if not changed_paths:
        raise RuntimeError("The synthesized branch does not modify any files.")
    # Apply the file policy to the set of touched paths.
    if file_policy == "pure-loc":
        # Strictest policy: no documentation paths at all...
        disallowed_paths = [path for path in changed_paths if _is_doc_path(path)]
        if disallowed_paths:
            raise RuntimeError(
                "The synthesized branch touched non-LOC documentation paths: "
                + ", ".join(disallowed_paths)
            )
        # ...and everything else must be implementation or test code.
        unsupported_paths = [
            path for path in changed_paths if not _is_allowed_path(path, allow_docs=False)
        ]
        if unsupported_paths:
            raise RuntimeError(
                "The synthesized branch touched files outside implementation/test code paths: "
                + ", ".join(unsupported_paths)
            )
    elif file_policy == "allow-docs":
        # Docs are tolerated here, but only alongside code/test paths.
        unsupported_paths = [
            path for path in changed_paths if not _is_allowed_path(path, allow_docs=True)
        ]
        if unsupported_paths:
            raise RuntimeError(
                "The synthesized branch touched files outside implementation/test/documentation "
                "paths: " + ", ".join(unsupported_paths)
            )
    elif file_policy != "allow-any":
        # "allow-any" imposes no path restrictions; anything else is a bug upstream.
        raise RuntimeError(f"Unsupported file policy: {file_policy}")
    return changed_paths
def _ensure_fork_repo(
    *,
    upstream_repo: str,
    fork_repo: str,
    authenticated_user: str,
) -> str:
    """Create *fork_repo* as a fork of *upstream_repo* when it does not exist.

    Returns the fork's ``owner/name`` slug either way.
    """
    target = RepoRef.parse(fork_repo)
    try:
        _run_checked(["gh", "repo", "view", fork_repo, "--json", "nameWithOwner"])
        return target.slug
    except RuntimeError:
        pass  # Repo not visible: fork it below.
    command = [
        "gh",
        "repo",
        "fork",
        upstream_repo,
        "--clone=false",
        "--remote=false",
        "--fork-name",
        target.name,
    ]
    # Forking into an organization requires an explicit --org flag.
    if target.owner != authenticated_user:
        command.extend(["--org", target.owner])
    _run_checked(command)
    return target.slug
def _ensure_fork_remote(*, worktree_dir: Path, fork_repo: str, remote_name: str) -> None:
    """Add (or verify) a git remote named *remote_name* that points at *fork_repo*."""
    expected_url = f"https://github.com/{fork_repo}.git"
    try:
        current_url = _run_stdout(
            ["git", "-C", str(worktree_dir), "remote", "get-url", remote_name]
        ).strip()
    except RuntimeError:
        # Remote does not exist yet: create it and we are done.
        _run_checked(
            ["git", "-C", str(worktree_dir), "remote", "add", remote_name, expected_url]
        )
    else:
        if current_url != expected_url:
            raise RuntimeError(
                f"Existing `{remote_name}` remote points to {current_url}, expected {expected_url}."
            )
def _push_branch(*, worktree_dir: Path, branch_name: str, remote_name: str) -> None:
    """Push *branch_name* to *remote_name*, setting it as the upstream branch."""
    command = ["git", "-C", str(worktree_dir), "push", "-u", remote_name, branch_name]
    _run_checked(command)
def _create_pull_request(
    *,
    upstream_repo: str,
    default_branch: str,
    fork_owner: str,
    branch_name: str,
    title: str,
    body_path: Path,
) -> str:
    """Open a PR from the fork branch against upstream and return its URL."""
    command = [
        "gh",
        "pr",
        "create",
        "--repo",
        upstream_repo,
        "--base",
        default_branch,
        "--head",
        f"{fork_owner}:{branch_name}",  # cross-fork head reference
        "--title",
        title,
        "--body-file",
        str(body_path),
    ]
    return _run_stdout(command).strip()
def _repo_slug_from_remote_url(url: str) -> str:
normalized = url.strip()
if not normalized:
return ""
for prefix in (
"https://github.com/",
"http://github.com/",
"ssh://git@github.com/",
"git://github.com/",
):
if normalized.startswith(prefix):
normalized = normalized[len(prefix) :]
break
if normalized.startswith("git@github.com:"):
normalized = normalized.split(":", 1)[1]
normalized = normalized.rstrip("/")
if normalized.endswith(".git"):
normalized = normalized[:-4]
return normalized
# File-policy helpers
def _is_doc_path(path: str) -> bool:
    """Return True when *path* looks like documentation rather than code."""
    candidate = PurePosixPath(path)
    if candidate.suffix.lower() in DOC_FILE_SUFFIXES:
        return True
    # Any path component named like a docs directory marks the whole path.
    lowered = {part.lower() for part in candidate.parts}
    if lowered & DOC_DIRECTORY_NAMES:
        return True
    return candidate.name.lower().startswith(DOC_FILE_PREFIXES)
def _is_allowed_path(path: str, *, allow_docs: bool) -> bool:
    """Return True when *path* is test code, (optionally) docs, or implementation code."""
    candidate = PurePosixPath(path)
    if _is_test_path(candidate):
        return True
    if allow_docs and _is_doc_path(path):
        return True
    # Fall back to the implementation-code suffix allowlist.
    return candidate.suffix.lower() in CODE_FILE_SUFFIXES
def _file_policy_instruction(file_policy: str) -> str:
if file_policy == "pure-loc":
return (
"Do not touch README files, changelogs, markdown docs, prose-only files, "
"or commentary artifacts. Fail instead of submitting a noisy branch."
)
if file_policy == "allow-docs":
return (
"Documentation and markdown changes are allowed only when they are necessary "
"for the same fix. Keep them minimal and subordinate to the code patch."
)
if file_policy == "allow-any":
return (
"Non-code file changes are allowed when they are required for the same fix, "
"but keep the patch as small and focused as possible."
)
raise ValueError(f"Unsupported file policy: {file_policy}")
def _is_test_path(path: PurePosixPath) -> bool:
    """Return True when *path* lies in a test directory or follows test naming."""
    if any(part.lower() in TEST_DIRECTORY_NAMES for part in path.parts):
        return True
    # Conventional pytest-style names: test_foo.py or foo_test.py.
    name = path.name.lower()
    return name.startswith("test_") or path.stem.lower().endswith("_test")
def _run_checked(
args: list[str],
*,
input_text: str | None = None,
) -> subprocess.CompletedProcess[str]:
try:
return subprocess.run(
args,
input=input_text,
text=True,
capture_output=True,
check=True,
)
except subprocess.CalledProcessError as exc:
detail = (exc.stderr or exc.stdout or "").strip()
message = f"Command failed: {' '.join(args)}"
if detail:
message = f"{message}: {detail}"
raise RuntimeError(message) from exc
def _run_stdout(args: list[str]) -> str:
    """Run *args* via _run_checked and return the captured stdout text."""
    completed = _run_checked(args)
    return completed.stdout
def _update_manifest(manifest_path: Path, updates: dict[str, Any]) -> None:
    """Merge *updates* into the JSON manifest at *manifest_path*, then rewrite it."""
    manifest = read_json(manifest_path)
    write_json({**manifest, **updates}, manifest_path)
def _utc_stamp() -> str:
return datetime.now(UTC).strftime("%Y%m%dT%H%M%SZ")
def _ordered_ints(values: Any) -> list[int]:
    """Coerce *values* to ints, dropping non-ints and duplicates, preserving order."""
    result: list[int] = []
    seen: set[int] = set()
    for raw in values or []:
        number = _coerce_int(raw)
        if number is not None and number not in seen:
            seen.add(number)
            result.append(number)
    return result
def _normalize_result_source_pr_numbers(
    *,
    expected_source_pr_numbers: list[int],
    raw_source_pr_numbers: Any,
) -> list[int]:
    """Validate Codex-reported source PR numbers against the selected cluster.

    Requires at least two numbers, rejects any number outside the expected
    set, and returns the matched numbers in the cluster's original ordering.
    Raises ValueError on either violation.
    """
    reported = _ordered_ints(raw_source_pr_numbers)
    if len(reported) < 2:
        raise ValueError(
            "Codex result must reference at least two open source PRs from the selected cluster."
        )
    allowed = set(expected_source_pr_numbers)
    unknown = [number for number in reported if number not in allowed]
    if unknown:
        raise ValueError(
            "Codex result source_pr_numbers included PRs outside the selected open PR set: "
            + ", ".join(str(number) for number in unknown)
        )
    # Re-emit in the expected ordering so downstream output is deterministic.
    reported_set = set(reported)
    return [number for number in expected_source_pr_numbers if number in reported_set]
def _coerce_int(value: Any) -> int | None:
if value is None:
return None
try:
return int(value)
except (TypeError, ValueError):
return None