File size: 5,195 Bytes
3960366 1278b3f 450bb0a 1278b3f 450bb0a 1278b3f 3960366 1278b3f 3960366 1278b3f 450bb0a 1278b3f 3960366 1278b3f 3960366 1278b3f 3960366 1278b3f |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 |
import asyncio
import base64
import threading
import time
from datetime import datetime, timezone, timedelta
from typing import List
from urllib.parse import quote

import jwt
import requests

from config import APP_ID, APP_PRIVATE_KEY
# In-memory cache of installation access tokens, keyed by installation ID.
# Each value is a dict with "token" and "expires_at" entries, written by
# get_installation_token().
installation_tokens = {}
# Lock acquired by get_installation_token() around its use of the cache.
token_lock = threading.Lock()
def generate_jwt():
    """Generate a short-lived JWT signed with the GitHub App private key.

    The token is signed with RS256 using ``APP_PRIVATE_KEY`` and identifies
    the app through the ``iss`` claim.

    Returns:
        The encoded JWT as returned by ``jwt.encode``.
    """
    now = int(time.time())
    payload = {
        # Backdate the issue time by a minute so that modest clock drift
        # between this host and GitHub does not make the token "not yet valid".
        "iat": now - 60,
        # GitHub refuses tokens that expire more than 10 minutes ahead; stay
        # a minute under the cap so a fast local clock cannot push us over it.
        "exp": now + (9 * 60),
        # Recent PyJWT releases reject a non-string issuer, and APP_ID may be
        # configured as an integer.
        "iss": str(APP_ID),
    }
    return jwt.encode(payload, APP_PRIVATE_KEY, algorithm="RS256")
def github_request(method, url, headers=None, **kwargs):
    """Send a request to the GitHub API, waiting out rate limits.

    Args:
        method: HTTP method name passed to ``requests.request``.
        url: Absolute URL to call.
        headers: Optional request headers. When omitted, the request is
            authenticated as the GitHub App with a freshly generated JWT.
        **kwargs: Extra keyword arguments forwarded to ``requests.request``.
            A ``timeout`` of 30 seconds is applied unless the caller sets one.

    Returns:
        The ``requests.Response`` of the last attempt. Responses that are not
        rate-limit rejections are returned as-is, whatever their status code.
    """
    authenticate_as_app = headers is None
    # Without a timeout a stalled connection would block the caller forever.
    kwargs.setdefault("timeout", 30)

    while True:
        if authenticate_as_app:
            # Rebuild the JWT on every attempt: it is short-lived and may
            # expire while we sleep waiting for the rate limit to reset.
            headers = {
                "Authorization": f"Bearer {generate_jwt()}",
                "Accept": "application/vnd.github.v3+json",
            }

        response = requests.request(method, url, headers=headers, **kwargs)

        remaining = response.headers.get("X-RateLimit-Remaining")
        reset_time = response.headers.get("X-RateLimit-Reset")
        if remaining is None or reset_time is None:
            # No rate-limit information to act on.
            return response

        remaining = int(remaining)
        reset_time = int(reset_time)
        print(f"[GitHub] Remaining: {remaining}, Reset: {reset_time}")

        # Seconds until the window resets, plus a small safety margin.
        wait = max(reset_time - int(time.time()) + 5, 0)

        rate_limited = (
            response.status_code in (403, 429)
            and "rate limit" in response.text.lower()
        )
        if rate_limited:
            # Secondary rate limits announce their own delay via Retry-After.
            retry_after = response.headers.get("Retry-After")
            if retry_after is not None and retry_after.isdigit():
                wait = int(retry_after)
            print(f"Hit rate limit. Sleeping for {wait} seconds.")
            time.sleep(wait)
            continue

        if remaining <= 2:
            # This request already went through; pause so the *next* call has
            # quota, but do not re-send it (that would repeat POST side
            # effects and throw away a valid response).
            print(f"Approaching rate limit ({remaining} left). Sleeping for {wait} seconds.")
            time.sleep(wait)

        return response
def get_installation_id(owner, repo):
    """Look up the ID of the app's installation on ``owner/repo``.

    Returns:
        The ``id`` field of GitHub's installation response.

    Raises:
        Exception: If GitHub answers with any status other than 200.
    """
    endpoint = f"https://api.github.com/repos/{owner}/{repo}/installation"
    resp = github_request("GET", endpoint)
    # Guard clause: anything but 200 means the installation was not returned.
    if resp.status_code != 200:
        raise Exception(f"Failed to get installation ID for {owner}/{repo}: {resp.status_code}. Please [install the GitHub App](https://github.com/apps/opensorus) on this repository before using the agent.")
    return resp.json()["id"]
def get_installation_token(installation_id):
    """Return an access token for the installation, refreshing it when needed.

    A cached token is reused while it remains valid for more than 30 seconds;
    otherwise a new one is requested from GitHub and stored in the cache.

    Raises:
        Exception: If GitHub does not answer the token request with 201.
    """
    with token_lock:
        cached = installation_tokens.get(installation_id)
        if cached:
            # Only reuse tokens with at least 30 seconds of validity left.
            earliest_acceptable = datetime.now(timezone.utc) + timedelta(seconds=30)
            if cached["expires_at"] > earliest_acceptable:
                return cached["token"]

        resp = github_request(
            "POST",
            f"https://api.github.com/app/installations/{installation_id}/access_tokens",
        )
        if resp.status_code != 201:
            raise Exception(f"Failed to fetch installation token: {resp.status_code}. Please [install the GitHub App](https://github.com/apps/opensorus) on this repository before using the agent.")

        payload = resp.json()
        fresh_token = payload["token"]
        # The timestamp carries a literal "Z"; attach UTC so it can be
        # compared with the timezone-aware "now" used above.
        parsed_expiry = datetime.strptime(payload["expires_at"], "%Y-%m-%dT%H:%M:%SZ")
        expiry = parsed_expiry.replace(tzinfo=timezone.utc)
        installation_tokens[installation_id] = {"token": fresh_token, "expires_at": expiry}
        return fresh_token
async def fetch_repo_files(owner: str, repo: str, ref: str = "main") -> List[str]:
    """List every file path in a repository at the given ref.

    Fetches the Git tree recursively from the GitHub API and keeps only the
    entries whose type is ``blob``.

    Args:
        owner: Repository owner (user or organisation).
        repo: Repository name.
        ref: Branch name or other tree reference to list.

    Returns:
        The paths of all blob entries in the tree.

    Raises:
        Exception: If the installation lookup, the token request, or the tree
            request does not succeed.
    """
    # The auth helpers perform blocking HTTP calls (and may sleep on rate
    # limits), so run them in a worker thread to keep the event loop free.
    installation_id = await asyncio.to_thread(get_installation_id, owner, repo)
    token = await asyncio.to_thread(get_installation_token, installation_id)

    url = f"https://api.github.com/repos/{owner}/{repo}/git/trees/{ref}?recursive=1"
    headers = {
        "Authorization": f"Bearer {token}",
        "Accept": "application/vnd.github.v3+json",
    }
    response = await asyncio.to_thread(github_request, "GET", url, headers=headers)
    if response.status_code != 200:
        raise Exception(f"Failed to fetch repository files: {response.status_code}. Please ensure the branch name is correct and files exist in this branch.")

    data = response.json()
    if data.get("truncated"):
        # GitHub caps recursive tree listings; make the partial result visible
        # instead of silently returning an incomplete file list.
        print(f"[GitHub] Tree listing for {owner}/{repo}@{ref} was truncated; file list is incomplete.")

    return [item["path"] for item in data.get("tree", []) if item["type"] == "blob"]
async def fetch_file_content(owner: str, repo: str, path: str, ref: str = "main") -> str:
    """Fetch the decoded text content of one file from a GitHub repository.

    Args:
        owner: Repository owner (user or organisation).
        repo: Repository name.
        path: Path of the file inside the repository.
        ref: Branch, tag or commit to read the file from.

    Returns:
        The file content decoded as UTF-8; undecodable bytes are dropped.

    Raises:
        Exception: If authentication fails, the request does not return 200,
            or the response does not carry file content (for example when
            ``path`` points at a directory).
    """
    # The auth helpers perform blocking HTTP calls, so keep them off the
    # event loop thread.
    installation_id = await asyncio.to_thread(get_installation_id, owner, repo)
    token = await asyncio.to_thread(get_installation_token, installation_id)

    # Percent-encode the path (keeping "/") so characters such as "#", "?",
    # "%" or spaces cannot corrupt the URL; the ref goes through ``params``
    # so it is encoded as a proper query value.
    url = f"https://api.github.com/repos/{owner}/{repo}/contents/{quote(path)}"
    headers = {
        "Authorization": f"Bearer {token}",
        "Accept": "application/vnd.github.v3+json",
    }
    response = await asyncio.to_thread(
        github_request, "GET", url, headers=headers, params={"ref": ref}
    )
    if response.status_code != 200:
        raise Exception(f"Failed to fetch file content {path}: {response.status_code} {response.text}")

    content_json = response.json()
    # A directory yields a JSON list and some entry types carry no "content";
    # fail with a clear message rather than an opaque TypeError/KeyError.
    if not isinstance(content_json, dict) or "content" not in content_json:
        raise Exception(f"Failed to fetch file content {path}: the response did not contain file content.")

    return base64.b64decode(content_json["content"]).decode("utf-8", errors="ignore")
|