Spaces:
Sleeping
Sleeping
| """ | |
| GitHub App Authentication and API Module for Codesage | |
| Handles JWT generation, installation token exchange, and GitHub API calls | |
| """ | |
| import jwt | |
| import time | |
| import requests | |
| from typing import Dict, List, Optional | |
| from datetime import datetime, timedelta | |
| import os | |
| class GitHubAppAuth: | |
| """ | |
| Enterprise-grade GitHub App authentication | |
| No PATs, no user passwords - just secure app-level access | |
| """ | |
| def __init__(self, app_id: str, private_key: str): | |
| """ | |
| Initialize GitHub App authentication | |
| Args: | |
| app_id: Your GitHub App ID | |
| private_key: Contents of your .pem private key file | |
| """ | |
| self.app_id = app_id | |
| self.private_key = private_key | |
| self.base_url = "https://api.github.com" | |
| def generate_jwt(self) -> str: | |
| """ | |
| Generate JWT for GitHub App authentication | |
| Following official GitHub documentation | |
| JWT Rules: | |
| - Algorithm: RS256 | |
| - Expiry: 10 minutes maximum | |
| - Issued at: Current time | |
| Returns: | |
| JWT token string | |
| """ | |
| # Match official GitHub documentation exactly | |
| # Using 5 minutes instead of 10 to account for clock drift | |
| payload = { | |
| # Issued at time | |
| 'iat': int(time.time()), | |
| # JWT expiration time (5 minutes to avoid clock drift issues) | |
| 'exp': int(time.time()) + 300, | |
| # GitHub App's client ID (App ID) | |
| 'iss': self.app_id | |
| } | |
| # Generate JWT using RS256 algorithm | |
| # Private key is already read as string, which PyJWT handles correctly | |
| jwt_token = jwt.encode(payload, self.private_key, algorithm="RS256") | |
| return jwt_token | |
| def get_installation_token(self, installation_id: int) -> Dict: | |
| """ | |
| Exchange JWT for Installation Access Token | |
| This token: | |
| - Can access private repos | |
| - Is scoped to permissions you selected | |
| - Expires automatically (usually 1 hour) | |
| Args: | |
| installation_id: The installation ID from org install | |
| Returns: | |
| Dict with 'token', 'expires_at', and other metadata | |
| """ | |
| jwt_token = self.generate_jwt() | |
| url = f"{self.base_url}/app/installations/{installation_id}/access_tokens" | |
| headers = { | |
| "Authorization": f"Bearer {jwt_token}", | |
| "Accept": "application/vnd.github+json", | |
| "X-GitHub-Api-Version": "2022-11-28" | |
| } | |
| response = requests.post(url, headers=headers) | |
| response.raise_for_status() | |
| return response.json() | |
| def get_installation_repositories(self, installation_token: str) -> List[Dict]: | |
| """ | |
| List all repositories accessible to this installation | |
| Args: | |
| installation_token: Active installation access token | |
| Returns: | |
| List of repository objects | |
| """ | |
| url = f"{self.base_url}/installation/repositories" | |
| headers = { | |
| "Authorization": f"Bearer {installation_token}", | |
| "Accept": "application/vnd.github+json", | |
| "X-GitHub-Api-Version": "2022-11-28" | |
| } | |
| response = requests.get(url, headers=headers) | |
| response.raise_for_status() | |
| return response.json().get("repositories", []) | |
| class GitHubInsights: | |
| """ | |
| Fetch powerful insights from GitHub repositories | |
| All read-only, zero-risk operations | |
| """ | |
| def __init__(self, installation_token: str): | |
| """ | |
| Initialize with an active installation token | |
| Args: | |
| installation_token: Valid installation access token | |
| """ | |
| self.token = installation_token | |
| self.base_url = "https://api.github.com" | |
| self.headers = { | |
| "Authorization": f"Bearer {installation_token}", | |
| "Accept": "application/vnd.github+json", | |
| "X-GitHub-Api-Version": "2022-11-28" | |
| } | |
| def get_commits(self, org: str, repo: str, per_page: int = 30) -> List[Dict]: | |
| """ | |
| Fetch recent commits from a repository | |
| Args: | |
| org: Organization name | |
| repo: Repository name | |
| per_page: Number of commits to fetch (default 30) | |
| Returns: | |
| List of commit objects | |
| """ | |
| url = f"{self.base_url}/repos/{org}/{repo}/commits" | |
| params = {"per_page": per_page} | |
| response = requests.get(url, headers=self.headers, params=params) | |
| if response.status_code == 409: | |
| # Empty repo or missing default branch | |
| return [] | |
| response.raise_for_status() | |
| return response.json() | |
| def get_pull_requests(self, org: str, repo: str, state: str = "all") -> List[Dict]: | |
| """ | |
| Fetch pull requests from a repository | |
| Args: | |
| org: Organization name | |
| repo: Repository name | |
| state: "open", "closed", or "all" (default "all") | |
| Returns: | |
| List of pull request objects | |
| """ | |
| url = f"{self.base_url}/repos/{org}/{repo}/pulls" | |
| params = {"state": state, "per_page": 100} | |
| response = requests.get(url, headers=self.headers, params=params) | |
| response.raise_for_status() | |
| return response.json() | |
| def get_issues(self, org: str, repo: str, state: str = "all") -> List[Dict]: | |
| """ | |
| Fetch issues from a repository | |
| Args: | |
| org: Organization name | |
| repo: Repository name | |
| state: "open", "closed", or "all" (default "all") | |
| Returns: | |
| List of issue objects | |
| """ | |
| url = f"{self.base_url}/repos/{org}/{repo}/issues" | |
| params = {"state": state, "per_page": 100} | |
| response = requests.get(url, headers=self.headers, params=params) | |
| response.raise_for_status() | |
| return response.json() | |
| def get_contributors(self, org: str, repo: str) -> List[Dict]: | |
| """ | |
| Fetch contributors and their stats | |
| Args: | |
| org: Organization name | |
| repo: Repository name | |
| Returns: | |
| List of contributor objects with stats | |
| """ | |
| url = f"{self.base_url}/repos/{org}/{repo}/contributors" | |
| params = {"per_page": 100} | |
| response = requests.get(url, headers=self.headers, params=params) | |
| if response.status_code == 204 or not response.text: | |
| return [] | |
| response.raise_for_status() | |
| return response.json() | |
| def get_code_frequency(self, org: str, repo: str) -> List[List[int]]: | |
| """ | |
| Get code frequency stats (additions/deletions per week) | |
| Args: | |
| org: Organization name | |
| repo: Repository name | |
| Returns: | |
| List of [timestamp, additions, deletions] arrays | |
| """ | |
| url = f"{self.base_url}/repos/{org}/{repo}/stats/code_frequency" | |
| response = requests.get(url, headers=self.headers) | |
| if response.status_code == 202 or response.status_code == 204 or not response.text: | |
| return [] | |
| response.raise_for_status() | |
| return response.json() | |
| def get_commit_activity(self, org: str, repo: str) -> List[Dict]: | |
| """ | |
| Get commit activity by week for the last year | |
| Args: | |
| org: Organization name | |
| repo: Repository name | |
| Returns: | |
| List of activity objects with days and total | |
| """ | |
| url = f"{self.base_url}/repos/{org}/{repo}/stats/commit_activity" | |
| response = requests.get(url, headers=self.headers) | |
| if response.status_code == 202 or response.status_code == 204 or not response.text: | |
| return [] | |
| response.raise_for_status() | |
| return response.json() | |
| def get_repository_info(self, org: str, repo: str) -> Dict: | |
| """ | |
| Get detailed repository information | |
| Args: | |
| org: Organization name | |
| repo: Repository name | |
| Returns: | |
| Repository object with metadata | |
| """ | |
| url = f"{self.base_url}/repos/{org}/{repo}" | |
| response = requests.get(url, headers=self.headers) | |
| response.raise_for_status() | |
| return response.json() | |
| def get_languages(self, org: str, repo: str) -> Dict[str, int]: | |
| """ | |
| Get programming languages used in the repository | |
| Args: | |
| org: Organization name | |
| repo: Repository name | |
| Returns: | |
| Dict mapping language name to bytes of code | |
| """ | |
| url = f"{self.base_url}/repos/{org}/{repo}/languages" | |
| response = requests.get(url, headers=self.headers) | |
| if response.status_code == 204 or not response.text: | |
| return {} | |
| response.raise_for_status() | |
| return response.json() | |
| def get_workflow_runs(self, org: str, repo: str) -> List[Dict]: | |
| """ | |
| Get GitHub Actions workflow runs (CI/CD insights) | |
| Args: | |
| org: Organization name | |
| repo: Repository name | |
| Returns: | |
| List of workflow run objects | |
| """ | |
| url = f"{self.base_url}/repos/{org}/{repo}/actions/runs" | |
| params = {"per_page": 50} | |
| response = requests.get(url, headers=self.headers, params=params) | |
| response.raise_for_status() | |
| return response.json().get("workflow_runs", []) | |
| def get_dependabot_alerts(self, org: str, repo: str) -> List[Dict]: | |
| """ | |
| Get Dependabot security alerts | |
| Args: | |
| org: Organization name | |
| repo: Repository name | |
| Returns: | |
| List of Dependabot alert objects | |
| """ | |
| url = f"{self.base_url}/repos/{org}/{repo}/dependabot/alerts" | |
| response = requests.get(url, headers=self.headers) | |
| # Not all repos have this enabled | |
| if response.status_code == 404: | |
| return [] | |
| response.raise_for_status() | |
| return response.json() | |
| class TokenManager: | |
| """ | |
| Manages installation tokens and automatic refresh | |
| Tokens expire after ~1 hour, this handles recreation | |
| """ | |
| def __init__(self, github_auth: GitHubAppAuth): | |
| self.github_auth = github_auth | |
| self.token_cache: Dict[int, Dict] = {} | |
| def get_token(self, installation_id: int) -> str: | |
| """ | |
| Get valid installation token, refreshing if needed | |
| Args: | |
| installation_id: The installation ID | |
| Returns: | |
| Valid installation access token | |
| """ | |
| cached = self.token_cache.get(installation_id) | |
| # Check if we have a cached token that's still valid | |
| if cached: | |
| expires_at = datetime.fromisoformat(cached["expires_at"].replace("Z", "+00:00")) | |
| # Refresh if less than 5 minutes remaining | |
| if datetime.now(expires_at.tzinfo) < expires_at - timedelta(minutes=5): | |
| return cached["token"] | |
| # Get fresh token | |
| token_data = self.github_auth.get_installation_token(installation_id) | |
| self.token_cache[installation_id] = token_data | |
| return token_data["token"] | |