File size: 5,856 Bytes
0aee47a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
"""

from bilibili_api import Credential



凭据操作类

"""

import re
import time
import uuid
import binascii
from typing import Union

from Cryptodome.Hash import SHA256
from Cryptodome.PublicKey import RSA
from Cryptodome.Cipher import PKCS1_OAEP

from .credential import Credential as _Credential
from .network import Api, get_api, get_session, HEADERS

# RSA public key used by getCorrespondPath() to encrypt the "refresh_<ts>" payload.
# The backslash after the opening quotes makes the literal start directly at the
# "-----BEGIN" marker; keep the PEM text free of leading or embedded blank lines
# so the key importer recognises it as PEM.
key = RSA.importKey(
    """\
-----BEGIN PUBLIC KEY-----
MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDLgd2OAkcGVtoE3ThUREbio0Eg
Uc/prcajMKXvkCKFCWhJYJcLkcM2DKKcSeFpD/j6Boy538YXnR6VhcuUJOhH2x71
nzPjfdTcqMz7djHum0qSZA0AyCBDABUqCrfNgCiJ00Ra7GmRj+YCK1NJEuewlb40
JNrRuoEUXpabUzGB8QIDAQAB
-----END PUBLIC KEY-----"""
)

# Endpoint definitions of the "credential" API group, indexed below as
# API[<section>][<endpoint name>].
API = get_api("credential")


class Credential(_Credential):
    """
    Credential class used for all kinds of request operations.

    Extends the base credential with helpers to check whether the cookies
    need refreshing, to refresh them, to check their validity, and to build
    a credential from a cookies mapping.
    """

    async def check_refresh(self) -> bool:
        """
        Check whether the cookies need to be refreshed.

        Returns:
            bool: Whether the cookies need to be refreshed.
        """
        return await check_cookies(self)

    async def refresh(self) -> None:
        """
        Refresh the cookies in place.

        Obtains a new credential via ``refresh_cookies`` and copies its
        ``sessdata``, ``bili_jct``, ``dedeuserid`` and ``ac_time_value``
        onto this instance.
        """
        new_cred: Credential = await refresh_cookies(self)
        self.sessdata = new_cred.sessdata
        self.bili_jct = new_cred.bili_jct
        self.dedeuserid = new_cred.dedeuserid
        self.ac_time_value = new_cred.ac_time_value

    async def check_valid(self) -> bool:
        """
        Check whether the cookies are valid.

        Returns:
            bool: The ``isLogin`` field of the validity endpoint's response.
        """
        data = await Api(
            credential=self, **get_api("credential")["info"]["valid"]
        ).result
        return data["isLogin"]

    @staticmethod
    def from_cookies(cookies: Union[dict, None] = None) -> "Credential":
        """
        Create a Credential from a cookies mapping.

        Reads the ``SESSDATA``, ``bili_jct``, ``buvid3``, ``DedeUserID`` and
        ``ac_time_value`` entries; entries that are absent become ``None``.

        Args:
            cookies (dict, optional): Cookies. Defaults to None, which is
                treated as an empty dict.

        Returns:
            Credential: The new credential.
        """
        # A None sentinel replaces the former mutable ``{}`` default so no
        # dict object is shared between calls.
        if cookies is None:
            cookies = {}
        c = Credential()
        c.sessdata = cookies.get("SESSDATA")
        c.bili_jct = cookies.get("bili_jct")
        c.buvid3 = cookies.get("buvid3")
        c.dedeuserid = cookies.get("DedeUserID")
        c.ac_time_value = cookies.get("ac_time_value")
        return c


"""

Cookies 刷新相关



感谢 bilibili-API-collect 提供的刷新 Cookies 的思路



https://socialsisteryi.github.io/bilibili-API-collect/docs/login/cookie_refresh.html

"""


async def check_cookies(credential: Credential) -> bool:
    """
    Check whether the cookies need to be refreshed.

    Args:
        credential (Credential): User credential.

    Return:
        bool: The ``refresh`` field of the endpoint's response, i.e. whether
        the cookies need to be refreshed.
    """
    endpoint = API["info"]["check_cookies"]
    response = await Api(**endpoint, credential=credential).result
    return response["refresh"]


def getCorrespondPath() -> str:
    """
    Build a CorrespondPath from the current time.

    The text ``refresh_<millisecond timestamp>`` is encrypted with the
    module-level RSA public key using PKCS#1 OAEP with SHA-256, and the
    ciphertext is returned as a hexadecimal string.

    Return:
        str: CorrespondPath
    """
    millis = round(time.time() * 1000)
    plaintext = f"refresh_{millis}".encode()
    ciphertext = PKCS1_OAEP.new(key, SHA256).encrypt(plaintext)
    return ciphertext.hex()


async def get_refresh_csrf(credential: Credential) -> str:
    """
    Fetch the csrf token needed to refresh the cookies.

    Requests the page addressed by a freshly generated CorrespondPath and
    extracts the token from its ``<div id="1-name">`` element.

    Args:
        credential (Credential): User credential whose cookies are sent with
            the request.

    Return:
        str: csrf

    Raises:
        Exception: If the server answers 404 (CorrespondPath expired or
            wrong), answers with any other non-200 status, or the page does
            not contain the csrf element.
    """
    correspond_path = getCorrespondPath()
    api = API["operate"]["get_refresh_csrf"]
    cookies = credential.get_cookies()
    # The request is sent with a newly generated uuid1-based buvid3 value.
    cookies["buvid3"] = str(uuid.uuid1())
    cookies["Domain"] = ".bilibili.com"
    resp = await get_session().request(
        "GET",
        api["url"].replace("{correspondPath}", correspond_path),
        cookies=cookies,
        headers=HEADERS.copy(),
    )
    if resp.status_code == 404:
        raise Exception("correspondPath 过期或错误。")
    if resp.status_code != 200:
        raise Exception("获取刷新 Cookies 的 csrf 失败。")
    found = re.search('<div id="1-name">(.+?)</div>', resp.text)
    if found is None:
        # A 200 page without the csrf element is also a failure to obtain the
        # csrf; report it explicitly instead of failing with an IndexError.
        raise Exception("获取刷新 Cookies 的 csrf 失败。")
    return found.group(1)


async def refresh_cookies(credential: Credential) -> Credential:
    """
    Refresh the cookies.

    Obtains a refresh csrf, posts the refresh request, builds a new
    credential from the response cookies and the returned refresh token, and
    then confirms the refresh via ``confirm_refresh``.

    Args:
        credential (Credential): User credential. Its ``bili_jct`` and
            ``ac_time_value`` are required.

    Return:
        Credential: The new user credential.

    Raises:
        Exception: If the response status is not 200 or the response ``code``
            is not 0. The ``raise_for_no_*`` checks and
            ``get_refresh_csrf`` may raise as well.
    """
    api = API["operate"]["refresh_cookies"]
    credential.raise_for_no_bili_jct()
    credential.raise_for_no_ac_time_value()
    refresh_csrf = await get_refresh_csrf(credential)
    data = {
        "csrf": credential.bili_jct,
        "refresh_csrf": refresh_csrf,
        "refresh_token": credential.ac_time_value,
        "source": "main_web",
    }
    cookies = credential.get_cookies()
    # The request is sent with a newly generated uuid1-based buvid3 value.
    cookies["buvid3"] = str(uuid.uuid1())
    cookies["Domain"] = ".bilibili.com"
    resp = await get_session().request(
        "POST", api["url"], cookies=cookies, data=data, headers=HEADERS.copy()
    )
    if resp.status_code != 200:
        raise Exception("刷新 Cookies 失败")
    # Parse the body once, and only after the status check has passed.
    payload = resp.json()
    if payload["code"] != 0:
        raise Exception("刷新 Cookies 失败")
    new_credential = Credential(
        sessdata=resp.cookies["SESSDATA"],
        bili_jct=resp.cookies["bili_jct"],
        dedeuserid=resp.cookies["DedeUserID"],
        ac_time_value=payload["data"]["refresh_token"],
    )
    await confirm_refresh(credential, new_credential)
    return new_credential


async def confirm_refresh(
    old_credential: Credential, new_credential: Credential
) -> None:
    """
    Invalidate the cookie tied to the old refresh_token.

    The request is made with the new credential; its ``bili_jct`` is sent as
    ``csrf`` and the old credential's ``ac_time_value`` as ``refresh_token``.

    Args:
        old_credential (Credential): The old user credential.

        new_credential (Credential): The new user credential.
    """
    endpoint = API["operate"]["confirm_refresh"]
    request = Api(**endpoint, credential=new_credential).update_data(
        csrf=new_credential.bili_jct,
        refresh_token=old_credential.ac_time_value,
    )
    await request.result