a2a-sin-code-backend / app /github_ops.py
SIN-Deploy-Bot
Deploy via automated script [skip ci]
9d4bc2d
Raw
History Blame Contribute Delete
5.42 kB
"""
GitHub operations module for cloud coder agents.
Handles commits, branches, and PR creation using GitHub CLI.
"""
import subprocess
from pathlib import Path
from typing import Optional
REPO_DIR = Path("/workspace/OpenSIN-Code")
if not REPO_DIR.exists():
REPO_DIR = Path("/home/user/project/OpenSIN-Code")
if not REPO_DIR.exists():
REPO_DIR = Path.cwd().parent / "OpenSIN-Code"
class GitHubOps:
"""Handles all GitHub-related operations."""
def __init__(self, repo_dir: Optional[Path] = None):
self.repo_dir = repo_dir or REPO_DIR
def _run_git(self, args: list[str], capture: bool = True) -> subprocess.CompletedProcess:
"""Run a git command in the repository."""
cmd = ["git"] + args
return subprocess.run(
cmd, cwd=str(self.repo_dir), capture_output=capture, text=True, check=False
)
def _run_gh(self, args: list[str], capture: bool = True) -> subprocess.CompletedProcess:
"""Run a gh CLI command."""
cmd = ["gh"] + args
return subprocess.run(cmd, capture_output=capture, text=True, check=False)
def ensure_branch(self, branch_name: str, base: str = "main") -> bool:
"""Ensure branch exists, create from base if not."""
# Check if branch exists locally
result = self._run_git(["branch", "--list", branch_name])
if branch_name in result.stdout:
# Checkout existing branch
checkout = self._run_git(["checkout", branch_name])
return checkout.returncode == 0
# Create new branch from base
checkout = self._run_git(["checkout", base])
if checkout.returncode != 0:
return False
pull = self._run_git(["pull", "origin", base])
if pull.returncode != 0:
return False
create = self._run_git(["checkout", "-b", branch_name])
return create.returncode == 0
def commit_changes(self, message: str) -> Optional[str]:
"""Stage and commit all changes."""
# Add all changes
add = self._run_git(["add", "."])
if add.returncode != 0:
raise RuntimeError(f"git add failed: {add.stderr}")
# Commit
commit = self._run_git(["commit", "-m", message])
if commit.returncode != 0:
if "nothing to commit" in commit.stderr.lower():
return None
raise RuntimeError(f"git commit failed: {commit.stderr}")
# Get commit SHA
sha = commit.stdout.strip().split()[-1] if commit.stdout else "unknown"
return sha
def push_branch(self, branch_name: str, force: bool = False) -> bool:
"""Push branch to origin."""
args = ["push", "origin", branch_name]
if force:
args.append("--force")
push = self._run_git(args)
return push.returncode == 0
def create_pr(
self, title: str, body: str, base: str = "main", head: str = None
) -> Optional[str]:
"""Create a pull request using gh CLI."""
if not head:
# Get current branch
branch_result = self._run_git(["branch", "--show-current"])
head = branch_result.stdout.strip()
args = ["pr", "create", "--title", title, "--body", body, "--base", base, "--head", head]
result = self._run_gh(args)
if result.returncode == 0:
# Extract PR URL from output
lines = result.stdout.strip().split("\n")
for line in lines:
if "https://github.com" in line:
return line.strip()
return result.stdout
else:
if "already exists" in result.stderr.lower():
# PR already exists, return existing URL
list_result = self._run_gh(
["pr", "list", "--head", head, "--json", "url", "--jq", ".[0].url"]
)
if list_result.returncode == 0 and list_result.stdout.strip():
return list_result.stdout.strip()
return None
def get_issue(self, issue_number: int) -> Optional[dict]:
"""Fetch issue details using gh CLI."""
result = self._run_gh(
["issue", "view", str(issue_number), "--json", "number,title,body,labels,state"]
)
if result.returncode == 0:
try:
return json.loads(result.stdout)
except json.JSONDecodeError:
return None
return None
def get_current_branch(self) -> str:
"""Get current branch name."""
result = self._run_git(["branch", "--show-current"])
return result.stdout.strip() if result.returncode == 0 else "main"
def branch_exists_on_remote(self, branch_name: str) -> bool:
"""Check if branch exists on remote."""
result = self._run_git(["ls-remote", "--heads", "origin", branch_name])
return result.returncode == 0 and bool(result.stdout.strip())
def rebase_main(self) -> bool:
"""Rebase current branch on main."""
current = self.get_current_branch()
if current == "main":
return True
# Fetch latest
fetch = self._run_git(["fetch", "origin", "main"])
if fetch.returncode != 0:
return False
# Rebase
rebase = self._run_git(["rebase", "origin/main"])
return rebase.returncode == 0