"""
GitHub operations module for cloud coder agents.

Handles commits, branches, and PR creation using the git and GitHub CLIs.
"""

import json
import subprocess
from pathlib import Path
from typing import Optional


def _default_repo_dir() -> Path:
    """Return the first existing well-known checkout, else a cwd sibling."""
    for candidate in (
        Path("/workspace/OpenSIN-Code"),
        Path("/home/user/project/OpenSIN-Code"),
    ):
        if candidate.exists():
            return candidate
    # Last resort: a sibling of the current working directory (not checked
    # for existence, matching the historical fallback).
    return Path.cwd().parent / "OpenSIN-Code"


REPO_DIR = _default_repo_dir()


class GitHubOps:
    """Handles all GitHub-related operations.

    Every command is executed with ``repo_dir`` as the working directory.
    """

    def __init__(self, repo_dir: Optional[Path] = None):
        """Bind the helper to ``repo_dir`` (defaults to module ``REPO_DIR``)."""
        self.repo_dir = repo_dir or REPO_DIR

    def _run_git(
        self, args: list[str], capture: bool = True
    ) -> subprocess.CompletedProcess:
        """Run ``git <args>`` in the repository.

        Never raises on a non-zero exit status; callers inspect
        ``returncode`` themselves.
        """
        return subprocess.run(
            ["git", *args],
            cwd=str(self.repo_dir),
            capture_output=capture,
            text=True,
            check=False,
        )

    def _run_gh(
        self, args: list[str], capture: bool = True
    ) -> subprocess.CompletedProcess:
        """Run ``gh <args>`` in the repository.

        The working directory is set to ``repo_dir`` (like ``_run_git``) so
        that ``gh`` resolves the target repository from that checkout rather
        than from wherever the process happens to have been started.
        """
        return subprocess.run(
            ["gh", *args],
            cwd=str(self.repo_dir),
            capture_output=capture,
            text=True,
            check=False,
        )

    def ensure_branch(self, branch_name: str, base: str = "main") -> bool:
        """Check out ``branch_name``, creating it from ``base`` if missing.

        Returns:
            True when the branch is checked out afterwards, False when any
            of the underlying git commands failed.
        """
        listing = self._run_git(["branch", "--list", branch_name])
        # Each output line is a branch name, optionally prefixed with "* "
        # (current branch) or "+ " (checked out in another worktree).
        # Compare names exactly instead of substring-matching the raw output.
        local_names = {
            line.lstrip("*+").strip() for line in listing.stdout.splitlines()
        }
        if branch_name in local_names:
            return self._run_git(["checkout", branch_name]).returncode == 0

        # Branch is missing: refresh the base first, then branch off it.
        if self._run_git(["checkout", base]).returncode != 0:
            return False
        if self._run_git(["pull", "origin", base]).returncode != 0:
            return False
        return self._run_git(["checkout", "-b", branch_name]).returncode == 0

    def commit_changes(self, message: str) -> Optional[str]:
        """Stage everything under the repository and commit it.

        Returns:
            The SHA of the new commit (``"unknown"`` if it could not be
            resolved), or None when there was nothing to commit.

        Raises:
            RuntimeError: if ``git add`` fails, or ``git commit`` fails for
                a reason other than an empty change set.
        """
        add = self._run_git(["add", "."])
        if add.returncode != 0:
            raise RuntimeError(f"git add failed: {add.stderr}")

        commit = self._run_git(["commit", "-m", message])
        if commit.returncode != 0:
            # git reports "nothing to commit" on stdout, so look at both
            # streams rather than stderr alone.
            output = f"{commit.stdout or ''}\n{commit.stderr or ''}".lower()
            if "nothing to commit" in output:
                return None
            raise RuntimeError(f"git commit failed: {commit.stderr}")

        # The commit summary ends with diffstat text, not the SHA, so ask
        # git for the commit HEAD now points at.
        head = self._run_git(["rev-parse", "HEAD"])
        sha = head.stdout.strip() if head.returncode == 0 else ""
        return sha or "unknown"

    def push_branch(self, branch_name: str, force: bool = False) -> bool:
        """Push ``branch_name`` to ``origin``; ``force`` adds ``--force``.

        Returns:
            True when the push exited successfully.
        """
        args = ["push", "origin", branch_name]
        if force:
            args.append("--force")
        return self._run_git(args).returncode == 0

    def create_pr(
        self, title: str, body: str, base: str = "main", head: Optional[str] = None
    ) -> Optional[str]:
        """Create a pull request using the gh CLI.

        Args:
            title: Pull request title.
            body: Pull request description.
            base: Branch the pull request targets.
            head: Source branch; defaults to the currently checked-out one.

        Returns:
            The pull request URL (or gh's raw output if no URL line was
            found); the URL of the already-open pull request when gh reports
            that one exists; None on any other failure.
        """
        if not head:
            head = self._run_git(["branch", "--show-current"]).stdout.strip()
        if not head:
            # Detached HEAD or git failure: there is no branch to open a
            # pull request from.
            return None

        result = self._run_gh(
            [
                "pr", "create",
                "--title", title,
                "--body", body,
                "--base", base,
                "--head", head,
            ]
        )
        if result.returncode == 0:
            # gh prints the new pull request's URL on its own line.
            for line in result.stdout.strip().split("\n"):
                if "https://github.com" in line:
                    return line.strip()
            return result.stdout

        if "already exists" in result.stderr.lower():
            # A pull request for this head is already open; look up its URL.
            existing = self._run_gh(
                ["pr", "list", "--head", head, "--json", "url", "--jq", ".[0].url"]
            )
            url = existing.stdout.strip()
            if existing.returncode == 0 and url:
                return url
        return None

    def get_issue(self, issue_number: int) -> Optional[dict]:
        """Fetch issue details using the gh CLI.

        Returns:
            The decoded JSON object with the ``number``, ``title``, ``body``,
            ``labels`` and ``state`` fields requested from gh, or None when
            gh fails or its output is not valid JSON.
        """
        result = self._run_gh(
            [
                "issue", "view", str(issue_number),
                "--json", "number,title,body,labels,state",
            ]
        )
        if result.returncode != 0:
            return None
        try:
            return json.loads(result.stdout)
        except json.JSONDecodeError:
            return None

    def get_current_branch(self) -> str:
        """Return the current branch name, or ``"main"`` if git fails."""
        result = self._run_git(["branch", "--show-current"])
        return result.stdout.strip() if result.returncode == 0 else "main"

    def branch_exists_on_remote(self, branch_name: str) -> bool:
        """Return True when ``origin`` lists a head matching ``branch_name``."""
        result = self._run_git(["ls-remote", "--heads", "origin", branch_name])
        return result.returncode == 0 and bool(result.stdout.strip())

    def rebase_main(self) -> bool:
        """Rebase the current branch onto ``origin/main``.

        Returns:
            True when already on ``main`` or when the rebase succeeded;
            False when the fetch or the rebase failed.
        """
        if self.get_current_branch() == "main":
            return True

        # Make sure origin/main is up to date before rebasing onto it.
        if self._run_git(["fetch", "origin", "main"]).returncode != 0:
            return False
        return self._run_git(["rebase", "origin/main"]).returncode == 0