File size: 4,258 Bytes
d80253f
 
 
 
 
b9b07ad
 
 
 
 
 
d80253f
 
 
b9b07ad
 
 
d80253f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3823d44
 
 
 
2e4b438
 
 
 
 
 
 
 
 
 
3823d44
 
d80253f
 
5c5d1c5
d80253f
 
 
 
 
 
 
5c5d1c5
d80253f
 
 
 
 
b9b07ad
 
d80253f
 
 
b9b07ad
 
 
 
 
d80253f
 
 
 
 
 
 
 
 
 
 
 
 
 
5c5d1c5
2e4b438
 
 
5c5d1c5
 
 
d80253f
 
 
 
 
 
 
 
 
 
 
 
 
 
b9b07ad
 
 
 
 
 
d80253f
b9b07ad
 
 
 
 
 
d80253f
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
from dataclasses import dataclass

import requests


@dataclass
class User:
    """Author of a commit, as filled in by ``get_commits``."""
    # GitHub login of the author, or the placeholder '<NOT FOUND>' when the
    # API returns no user for the commit's author.
    name: str
    # Names of the organizations returned for the user; empty for the
    # '<NOT FOUND>' placeholder.
    organizations: list[str]


@dataclass
class Commit:
    """One entry of a repository's commit history, as filled in by ``get_commits``."""
    # First line of the commit message (text before the first newline).
    message: str
    # Author of the commit.
    user: User
    # Value of the ``additions`` field returned by the API for this commit.
    additions: int
    # Value of the ``deletions`` field returned by the API for this commit.
    deletions: int


def call_with_query(query, token, timeout=30):
    """POST a GraphQL query to the GitHub API and return the decoded JSON body.

    Args:
        query: GraphQL document text, sent as the ``query`` field of the
            JSON request body.
        token: Credential sent in the ``Authorization: Bearer`` header.
        timeout: Seconds passed to ``requests.post`` as its ``timeout``.
            Without one, a stalled connection would block the caller
            indefinitely.

    Returns:
        The response body decoded by ``Response.json()``.
    """
    url = 'https://api.github.com/graphql'
    headers = {'Authorization': f'Bearer {token}'}
    # The HTTP status is not checked here: the callers in this module inspect
    # the decoded payload for 'data' / 'errors' themselves.
    response = requests.post(url, json={'query': query}, headers=headers, timeout=timeout)
    return response.json()


def get_tag_commit_date(token, repository, tag):
    """Return the commit date of the commit that ``tag`` resolves to.

    Args:
        token: GitHub API token, forwarded to ``call_with_query``.
        repository: Repository in ``owner/name`` form.
        tag: Git expression (normally a tag name) resolved inside the
            repository.

    Returns:
        The ``committedDate`` value reported by the API for the resolved
        commit.

    Raises:
        ValueError: If ``repository`` does not split into exactly an owner
            and a name, if the API reports an error, if the tag cannot be
            resolved, or if the response carries no usable payload.
    """
    owner, name = repository.split('/')
    # NOTE(review): owner, name and tag are interpolated into the query text
    # unescaped; a value containing a double quote would break the query.
    query = f"""
    query GetTagCommit {{
        repository(owner: "{owner}", name: "{name}"){{
            object(expression: "{tag}") {{
                ... on Commit {{
                    oid
                    message
                    committedDate
                    author {{
                        user {{
                            login
                        }}
                    }}
                }}
            }}
        }}
    }}
    """

    response = call_with_query(query, token)
    invalid_token = 'Invalid token. Does your token have the valid permissions?'

    # Prefer the API's own explanation whenever it supplies one; it is more
    # precise than any message guessed from the shape of the payload.
    try:
        api_message = response['errors'][0]['message']
    except (KeyError, IndexError, TypeError):
        api_message = None

    try:
        repo = response['data']['repository']
        git_object = None if repo is None else repo['object']
    except (KeyError, TypeError) as err:
        # No usable 'data' payload at all (for example rejected credentials).
        raise ValueError(api_message or invalid_token) from err

    if repo is None:
        # Repository could not be resolved; previously this surfaced as a
        # TypeError that was reported as an invalid token even when the API
        # had explained the real problem.
        raise ValueError(api_message or invalid_token)

    if git_object is None:
        raise ValueError(api_message or 'Invalid tag. Does this tag exist?')

    try:
        return git_object['committedDate']
    except (KeyError, TypeError) as err:
        # The expression resolved to an object that did not match the
        # ``... on Commit`` fragment, so no commit date was returned.
        # NOTE(review): presumably this is what an annotated tag object
        # looks like here - confirm against the API.
        raise ValueError('Invalid tag. Does this tag point to a commit?') from err


def get_commits(token, repository, branch, since):
    """Collect the commits reachable from ``branch`` that were made since ``since``.

    The history is fetched 100 commits per request and followed page by page
    until the API reports that no further page exists.

    Args:
        token: GitHub API token, forwarded to ``call_with_query``.
        repository: Repository in ``owner/name`` form.
        branch: Git expression (normally a branch name) whose history is read.
        since: Timestamp interpolated into the ``since:`` argument of the
            history query.

    Returns:
        A list of ``Commit`` objects in the order the API returned them. A
        commit whose author has no associated user gets the placeholder
        user ``'<NOT FOUND>'`` with no organizations.

    Raises:
        ValueError: If ``repository`` does not split into exactly an owner
            and a name, if the API reports an error, or if ``branch`` does
            not resolve to a commit.
    """
    owner, name = repository.split('/')

    def fetch_page(after=''):
        """Fetch one page of history and return ``(nodes, next_cursor)``.

        ``next_cursor`` is ``None`` when the API reports no further page.
        """
        query = f"""
        query GetCommits {{
            repository(owner: "{owner}", name: "{name}"){{
                nameWithOwner
                object(expression: "{branch}") {{
                    ... on Commit {{
                        oid
                        history(first: 100, since: "{since}"{after}) {{
                            nodes {{
                                message
                                deletions
                                additions
                                author {{
                                    user {{
                                        login
                                        organizations(first: 100) {{
                                            nodes {{
                                                name
                                            }}
                                        }}
                                    }}
                                }}
                            }}
                            pageInfo {{
                                endCursor
                                hasNextPage
                            }}
                        }}
                    }}
                }}
            }}
        }}
        """
        result = call_with_query(query, token)

        # The API's own error text, when it supplied one.
        try:
            api_message = result['errors'][0]['message']
        except (KeyError, IndexError, TypeError):
            api_message = None

        data = result.get('data') if isinstance(result, dict) else None
        if data is None:
            # A payload with neither 'data' nor 'errors' (for example
            # rejected credentials) used to escape as a KeyError.
            raise ValueError(
                api_message
                or 'Invalid token. Does your token have the valid permissions?'
            )

        repo = data.get('repository')
        if repo is None:
            raise ValueError(api_message or f'Could not find repository "{repository}".')

        git_object = repo.get('object')
        # An object that did not match the ``... on Commit`` fragment comes
        # back without 'history'; treat it like an unresolved expression.
        history = git_object.get('history') if git_object else None
        if history is None:
            raise ValueError("Either the tag or the branch were incorrect.")

        page_info = history['pageInfo']
        # Trust hasNextPage rather than waiting for a null endCursor, which
        # would cost one extra, empty request after the last page.
        next_cursor = page_info['endCursor'] if page_info['hasNextPage'] else None
        return history['nodes'], next_cursor

    def to_commit(node):
        """Convert one history node into a ``Commit``."""
        gh_user = (node.get('author') or {}).get('user')
        if gh_user is None:
            author = User(name='<NOT FOUND>', organizations=[])
        else:
            author = User(
                name=gh_user['login'],
                organizations=[org['name'] for org in gh_user['organizations']['nodes']],
            )
        return Commit(
            # Only the first line of the message is kept.
            message=node['message'].split('\n')[0],
            user=author,
            additions=node.get('additions'),
            deletions=node.get('deletions'),
        )

    nodes, cursor = fetch_page()
    while cursor is not None:
        more_nodes, cursor = fetch_page(f' after:"{cursor}"')
        nodes.extend(more_nodes)

    return [to_commit(node) for node in nodes]