File size: 5,195 Bytes
3960366
1278b3f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
450bb0a
 
1278b3f
 
 
 
 
 
 
 
 
 
 
450bb0a
1278b3f
 
 
 
 
 
 
 
 
3960366
1278b3f
 
 
 
 
 
 
 
 
 
 
3960366
 
1278b3f
450bb0a
1278b3f
 
 
 
 
 
3960366
1278b3f
 
 
 
3960366
 
1278b3f
 
 
 
 
3960366
 
1278b3f
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
import asyncio
import base64
from datetime import datetime, timezone, timedelta
import jwt
import threading
import time
from typing import List
import requests
from config import APP_ID, APP_PRIVATE_KEY


installation_tokens = {}
token_lock = threading.Lock()


def generate_jwt():
    """Generate a JWT signed with GitHub App private key."""
    now = int(time.time())
    payload = {
        "iat": now,
        "exp": now + (10 * 60),
        "iss": APP_ID,
    }
    encoded_jwt = jwt.encode(payload, APP_PRIVATE_KEY, algorithm="RS256")
    return encoded_jwt


def github_request(method, url, headers=None, **kwargs):
    if headers is None:
        jwt_token = generate_jwt()
        headers = {
            "Authorization": f"Bearer {jwt_token}",
            "Accept": "application/vnd.github.v3+json",
        }
    while True:
        response = requests.request(method, url, headers=headers, **kwargs)
        
        remaining = response.headers.get("X-RateLimit-Remaining")
        reset_time = response.headers.get("X-RateLimit-Reset")

        if remaining is None or reset_time is None:
            return response
        
        remaining = int(remaining)
        reset_time = int(reset_time)

        print(f"[GitHub] Remaining: {remaining}, Reset: {reset_time}")

        if response.status_code == 403 and "rate limit" in response.text.lower():
            wait = reset_time - int(time.time()) + 5
            print(f"Hit rate limit. Sleeping for {wait} seconds.")
            time.sleep(max(wait, 0))
            continue
        if remaining <= 2:
            wait = reset_time - int(time.time()) + 5
            print(f"Approaching rate limit ({remaining} left). Sleeping for {wait} seconds.")
            time.sleep(max(wait, 0))
            continue

        return response

    
def get_installation_id(owner, repo):
    """Fetch the installation ID for the app on a repo."""
    url = f"https://api.github.com/repos/{owner}/{repo}/installation"
    response = github_request("GET", url)
    if response.status_code == 200:
        data = response.json()
        return data["id"]
    else:
        raise Exception(f"Failed to get installation ID for {owner}/{repo}: {response.status_code}. Please [install the GitHub App](https://github.com/apps/opensorus) on this repository before using the agent.")


def get_installation_token(installation_id):
    """Return a valid installation token, fetch new if expired or missing."""
    with token_lock:
        token_info = installation_tokens.get(installation_id)
        if token_info and token_info["expires_at"] > datetime.now(timezone.utc) + timedelta(seconds=30):
            return token_info["token"]

        url = f"https://api.github.com/app/installations/{installation_id}/access_tokens"
        response = github_request("POST", url)
        if response.status_code != 201:
            raise Exception(f"Failed to fetch installation token: {response.status_code}. Please [install the GitHub App](https://github.com/apps/opensorus) on this repository before using the agent.")

        token_data = response.json()
        token = token_data["token"]
        expires_at = datetime.strptime(token_data["expires_at"], "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc)

        installation_tokens[installation_id] = {"token": token, "expires_at": expires_at}
        return token


async def fetch_repo_files(owner: str, repo: str, ref: str = "main") -> List[str]:
    """
    Lists all files in the repository by recursively fetching the Git tree from GitHub API.
    Returns a list of file paths.
    """
    installation_id = get_installation_id(owner, repo)
    token = get_installation_token(installation_id)
    url = f"https://api.github.com/repos/{owner}/{repo}/git/trees/{ref}?recursive=1"
    headers = {
        "Authorization": f"Bearer {token}",
        "Accept": "application/vnd.github.v3+json"
    }

    response = await asyncio.to_thread(github_request, "GET", url, headers=headers)
    if response.status_code != 200:
        raise Exception(f"Failed to fetch repository files: {response.status_code}. Please ensure the branch name is correct and files exist in this branch.")

    tree = response.json().get("tree", [])
    file_paths = [item["path"] for item in tree if item["type"] == "blob"]
    return file_paths


async def fetch_file_content(owner: str, repo: str, path: str, ref: str = "main") -> str:
    """
    Fetches the content of a file from the GitHub repository.
    """
    installation_id = get_installation_id(owner, repo)
    token = await asyncio.to_thread(get_installation_token, installation_id)

    url = f"https://api.github.com/repos/{owner}/{repo}/contents/{path}?ref={ref}"
    headers = {
        "Authorization": f"Bearer {token}",
        "Accept": "application/vnd.github.v3+json"
    }

    response = await asyncio.to_thread(github_request, "GET", url, headers=headers)
    if response.status_code != 200:
        raise Exception(f"Failed to fetch file content {path}: {response.status_code} {response.text}")

    content_json = response.json()
    content = base64.b64decode(content_json["content"]).decode("utf-8", errors="ignore")
    return content