ContriBot / services /github_service.py
GitHub Actions
sync: backend from GitHub Actions 2026-03-21 15:55:43
3a507e4
import asyncio
import hmac
import hashlib
import httpx
import logging
from typing import Optional, List, Dict, Any
from github import Github, GithubException
from fastapi import HTTPException
from config import settings
logger = logging.getLogger(__name__)
class GitHubService:
def __init__(self):
# Initialize PyGithub with the ContriBot token
self.client = Github(settings.GITHUB_TOKEN)
logger.info("[GITHUB] Initialized GitHubService")
async def _run_async(self, func, *args, **kwargs):
"""Helper to run blocking PyGithub calls in a thread pool."""
loop = asyncio.get_running_loop()
try:
return await loop.run_in_executor(None, lambda: func(*args, **kwargs))
except GithubException as e:
if isinstance(e.data, dict):
detail = e.data.get("message") or str(e)
else:
detail = str(e.data) if e.data else str(e)
# Log rate limit specifically
if e.status == 403 and "rate limit" in detail.lower():
logger.warning(f"[GITHUB] Rate limit exceeded: {detail}")
elif e.status != 404: # Don't log 404s as errors if they are expected
logger.error(f"[GITHUB] API Error ({e.status}): {detail}")
raise HTTPException(status_code=e.status, detail=detail)
except Exception as e:
logger.error(f"[GITHUB] Unexpected error: {e}")
raise HTTPException(status_code=500, detail=str(e))
# --- Repository Operations ---
async def get_repo(self, full_name: str) -> dict:
logger.debug(f"[GITHUB][REPO: {full_name}] Fetching repo data")
repo = await self._run_async(self.client.get_repo, full_name)
return repo.raw_data
async def get_repo_tree(self, full_name: str) -> dict:
logger.debug(f"[GITHUB][REPO: {full_name}] Fetching recursive tree")
def _get_tree():
repo = self.client.get_repo(full_name)
branch = repo.get_branch(repo.default_branch)
tree = repo.get_git_tree(branch.commit.sha, recursive=True)
return {"tree": [{"path": t.path, "type": t.type, "sha": t.sha} for t in tree.tree]}
return await self._run_async(_get_tree)
async def get_file_content(self, full_name: str, path: str) -> str:
logger.debug(f"[GITHUB][REPO: {full_name}] Fetching file content: {path}")
def _get_content():
repo = self.client.get_repo(full_name)
contents = repo.get_contents(path)
# Handle case where path is a directory
if isinstance(contents, list):
raise Exception(f"Path {path} is a directory, not a file.")
return contents.decoded_content.decode("utf-8")
return await self._run_async(_get_content)
async def file_exists(self, full_name: str, path: str, branch: str = None) -> bool:
logger.debug(f"[GITHUB][REPO: {full_name}] Checking if file exists: {path} (branch: {branch})")
def _check_exists():
repo = self.client.get_repo(full_name)
try:
repo.get_contents(path, ref=branch) if branch else repo.get_contents(path)
return True
except GithubException as e:
if e.status == 404:
return False
raise e
return await self._run_async(_check_exists)
async def create_or_update_file(self, full_name: str, path: str, content: str, message: str, branch: str) -> dict:
logger.info(f"[GITHUB][REPO: {full_name}] Committing file {path} to branch {branch}")
def _commit_file():
repo = self.client.get_repo(full_name)
try:
contents = repo.get_contents(path, ref=branch)
logger.debug(f"[GITHUB][REPO: {full_name}] Updating existing file {path}")
res = repo.update_file(contents.path, message, content, contents.sha, branch=branch)
return res["commit"].raw_data
except GithubException as e:
if e.status == 404:
logger.debug(f"[GITHUB][REPO: {full_name}] Creating new file {path}")
res = repo.create_file(path, message, content, branch=branch)
return res["commit"].raw_data
raise e
return await self._run_async(_commit_file)
async def create_branch(self, full_name: str, branch_name: str, from_branch: Optional[str] = None) -> dict:
logger.info(f"[GITHUB][REPO: {full_name}] Creating branch {branch_name} from {from_branch or 'default'}")
def _create_branch():
repo = self.client.get_repo(full_name)
base = from_branch or repo.default_branch
source_branch = repo.get_branch(base)
try:
ref = repo.create_git_ref(ref=f"refs/heads/{branch_name}", sha=source_branch.commit.sha)
logger.info(f"[GITHUB][REPO: {full_name}] Branch {branch_name} created")
return {"ref": ref.ref, "url": ref.url}
except GithubException as e:
if e.status == 422 and "Reference already exists" in str(e.data):
logger.info(f"[GITHUB][REPO: {full_name}] Branch {branch_name} already exists")
ref = repo.get_git_ref(f"heads/{branch_name}")
return {"ref": ref.ref, "url": ref.url}
raise e
return await self._run_async(_create_branch)
async def delete_branch(self, full_name: str, branch_name: str) -> dict:
def _delete_branch():
repo = self.client.get_repo(full_name)
ref = repo.get_git_ref(f"heads/{branch_name}")
ref.delete()
return {"status": "deleted", "branch": branch_name}
return await self._run_async(_delete_branch)
# --- Issue Operations ---
async def create_issue(self, full_name: str, title: str, body: str, labels: list[str] = []) -> int:
logger.info(f"[GITHUB][REPO: {full_name}] Creating issue: {title}")
def _create_issue():
repo = self.client.get_repo(full_name)
issue = repo.create_issue(title=title, body=body, labels=labels)
logger.info(f"[GITHUB][REPO: {full_name}] Issue #{issue.number} created")
return issue.number
return await self._run_async(_create_issue)
async def close_issue(self, full_name: str, issue_number: int, comment: str = None) -> dict:
logger.info(f"[GITHUB][REPO: {full_name}] Closing issue #{issue_number}")
def _close_issue():
repo = self.client.get_repo(full_name)
issue = repo.get_issue(number=issue_number)
if comment:
logger.debug(f"[GITHUB][REPO: {full_name}] Adding closing comment to #{issue_number}")
issue.create_comment(comment)
issue.edit(state="closed")
return issue.raw_data
return await self._run_async(_close_issue)
async def get_issue(self, full_name: str, issue_number: int) -> dict:
logger.debug(f"[GITHUB][REPO: {full_name}] Fetching issue #{issue_number}")
def _get_issue():
repo = self.client.get_repo(full_name)
issue = repo.get_issue(number=issue_number)
return issue.raw_data
return await self._run_async(_get_issue)
async def add_issue_comment(self, full_name: str, issue_number: int, body: str) -> dict:
logger.info(f"[GITHUB][REPO: {full_name}] Adding comment to issue #{issue_number}")
def _add_comment():
repo = self.client.get_repo(full_name)
issue = repo.get_issue(number=issue_number)
comment = issue.create_comment(body)
return comment.raw_data
return await self._run_async(_add_comment)
async def get_issue_comments(self, full_name: str, issue_number: int) -> list[dict]:
logger.debug(f"[GITHUB][REPO: {full_name}] Fetching comments for issue #{issue_number}")
def _get_comments():
repo = self.client.get_repo(full_name)
issue = repo.get_issue(number=issue_number)
return [c.raw_data for c in issue.get_comments()]
return await self._run_async(_get_comments)
async def list_open_issues(self, full_name: str) -> list[dict]:
logger.debug(f"[GITHUB][REPO: {full_name}] Listing open issues")
def _list_issues():
repo = self.client.get_repo(full_name)
return [i.raw_data for i in repo.get_issues(state="open")]
return await self._run_async(_list_issues)
# --- Pull Request Operations ---
async def create_pull_request(self, full_name: str, title: str, body: str, head: str, base: Optional[str] = None) -> int:
logger.info(f"[GITHUB][REPO: {full_name}] Creating Pull Request: {title} (head: {head})")
def _create_pr():
repo = self.client.get_repo(full_name)
target_base = base or repo.default_branch
try:
pr = repo.create_pull(title=title, body=body, head=head, base=target_base)
logger.info(f"[GITHUB][REPO: {full_name}] PR #{pr.number} created")
return pr.number
except GithubException as e:
if e.status == 422 and "A pull request already exists" in str(e.data):
# Find the existing PR
logger.info(f"[GITHUB][REPO: {full_name}] PR already exists for {head}, finding it...")
prs = repo.get_pulls(state="open", head=f"{repo.owner.login}:{head}", base=target_base)
for pr in prs:
logger.info(f"[GITHUB][REPO: {full_name}] Found existing PR #{pr.number}")
return pr.number
raise e
return await self._run_async(_create_pr)
async def get_pull_request(self, full_name: str, pr_number: int) -> dict:
logger.debug(f"[GITHUB][REPO: {full_name}] Fetching PR #{pr_number}")
def _get_pr():
repo = self.client.get_repo(full_name)
pr = repo.get_pull(pr_number)
return pr.raw_data
return await self._run_async(_get_pr)
async def add_pr_review(self, full_name: str, pr_number: int, body: str, event: str = "COMMENT") -> dict:
logger.info(f"[GITHUB][REPO: {full_name}] Adding PR review to #{pr_number} (event: {event})")
def _add_review():
repo = self.client.get_repo(full_name)
pr = repo.get_pull(pr_number)
review = pr.create_review(body=body, event=event)
return review.raw_data
return await self._run_async(_add_review)
async def list_pr_files(self, full_name: str, pr_number: int) -> list[dict]:
logger.debug(f"[GITHUB][REPO: {full_name}] Listing files for PR #{pr_number}")
def _list_files():
repo = self.client.get_repo(full_name)
pr = repo.get_pull(pr_number)
return [{"filename": f.filename, "status": f.status, "patch": f.patch, "additions": f.additions, "deletions": f.deletions} for f in pr.get_files()]
return await self._run_async(_list_files)
async def get_pr_diff(self, full_name: str, pr_number: int) -> str:
logger.debug(f"[GITHUB][REPO: {full_name}] Fetching diff for PR #{pr_number}")
# Using httpx to get the raw diff string directly from GitHub API
url = f"https://api.github.com/repos/{full_name}/pulls/{pr_number}"
headers = {
"Authorization": f"Bearer {settings.GITHUB_TOKEN}",
"Accept": "application/vnd.github.v3.diff"
}
async with httpx.AsyncClient() as client:
resp = await client.get(url, headers=headers)
if resp.status_code != 200:
logger.error(f"[GITHUB][REPO: {full_name}] Failed to fetch PR diff for #{pr_number}: {resp.status_code}")
raise HTTPException(status_code=resp.status_code, detail="Failed to fetch PR diff")
return resp.text
async def get_check_run_logs(self, full_name: str, check_run_id: int) -> str:
logger.debug(f"[GITHUB][REPO: {full_name}] Fetching check run logs for {check_run_id}")
url = f"https://api.github.com/repos/{full_name}/check-runs/{check_run_id}"
headers = {
"Authorization": f"Bearer {settings.GITHUB_TOKEN}",
"Accept": "application/vnd.github.v3+json"
}
async with httpx.AsyncClient() as client:
resp = await client.get(url, headers=headers)
if resp.status_code != 200:
logger.error(f"[GITHUB][REPO: {full_name}] Failed to fetch check run details for {check_run_id}: {resp.status_code}")
return "Failed to fetch check run details."
data = resp.json()
output = data.get("output", {})
title = output.get("title", "No Title")
summary = output.get("summary", "No Summary")
text = output.get("text", "No Text")
return f"Title: {title}\nSummary: {summary}\nText: {text}"
# --- Webhook Operations ---
async def register_webhook(self, full_name: str, webhook_url: str, secret: str) -> int:
logger.info(f"[GITHUB][REPO: {full_name}] Registering webhook to {webhook_url}")
def _register_hook():
repo = self.client.get_repo(full_name)
config = {
"url": webhook_url,
"content_type": "json",
"secret": secret
}
events = ["issues", "pull_request", "issue_comment", "push", "release", "check_run"]
hook = repo.create_hook("web", config, events, active=True)
logger.info(f"[GITHUB][REPO: {full_name}] Webhook registered with ID {hook.id}")
return hook.id
return await self._run_async(_register_hook)
async def delete_webhook(self, full_name: str, hook_id: int) -> dict:
logger.info(f"[GITHUB][REPO: {full_name}] Deleting webhook {hook_id}")
def _delete_hook():
repo = self.client.get_repo(full_name)
hook = repo.get_hook(hook_id)
hook.delete()
return {"status": "deleted", "hook_id": hook_id}
return await self._run_async(_delete_hook)
def verify_webhook_signature(self, payload: bytes, signature: str, secret: str) -> bool:
if not signature or not secret:
logger.warning("[GITHUB] Missing signature or secret for webhook verification")
return False
expected_signature = "sha256=" + hmac.new(secret.encode(), payload, hashlib.sha256).hexdigest()
result = hmac.compare_digest(expected_signature, signature)
if not result:
logger.warning("[GITHUB] Webhook signature mismatch")
return result
# --- Release Operations ---
async def create_release(self, full_name: str, tag: str, name: str, body: str, draft: bool = False) -> str:
logger.info(f"[GITHUB][REPO: {full_name}] Creating release {tag}: {name}")
def _create_release():
repo = self.client.get_repo(full_name)
release = repo.create_git_release(tag=tag, name=name, message=body, draft=draft)
logger.info(f"[GITHUB][REPO: {full_name}] Release created: {release.html_url}")
return release.html_url
return await self._run_async(_create_release)
async def get_latest_release(self, full_name: str) -> dict | None:
logger.debug(f"[GITHUB][REPO: {full_name}] Fetching latest release")
def _get_latest():
repo = self.client.get_repo(full_name)
try:
release = repo.get_latest_release()
return release.raw_data
except GithubException as e:
if e.status == 404:
logger.debug(f"[GITHUB][REPO: {full_name}] No latest release found (404)")
return None
raise e
return await self._run_async(_get_latest)
async def get_all_tags(self, full_name: str) -> list[str]:
logger.debug(f"[GITHUB][REPO: {full_name}] Listing all tags")
def _get_tags():
repo = self.client.get_repo(full_name)
return [tag.name for tag in repo.get_tags()]
return await self._run_async(_get_tags)
github_svc = GitHubService()