"""
bilibili_api.cheese

APIs related to bilibili courses ("cheese").

Note: course videos share almost no API with ordinary videos, so CheeseVideo
cannot be replaced by the Video class. (CheeseVideo keeps every API that is
common to both.)

To obtain download links use bilibili_api.cheese.get_download_url;
video.get_download_url does not apply.

Also, the season_id and ep_id of a course are unrelated to those of bangumi.
Do not mix them up.
"""

import json
import datetime
from typing import Any, List, Union

from . import settings
from .utils.utils import get_api
from .utils.danmaku import Danmaku
from .utils.credential import Credential
from .utils.BytesReader import BytesReader
from .exceptions.ArgsException import ArgsException
from .utils.network import Api, get_session
from .exceptions import NetworkException, ResponseException, DanmakuClosedException

API = get_api("cheese")
API_video = get_api("video")


# Module-level cache: episode id -> episode metadata (augmented with "ssid").
# Filled by CheeseList.get_list and read by CheeseVideo.__init__ so that videos
# created from a list do not need another metadata request.
cheese_video_meta_cache = {}


class CheeseList:
    """
    A course, i.e. a list of course videos.

    Attributes:
        credential (Credential): Credential used for requests.
    """

    def __init__(
        self,
        season_id: int = -1,
        ep_id: int = -1,
        credential: Union[Credential, None] = None,
    ):
        """
        Args:
            season_id  (int)       : Season id (ssid) of the course.

            ep_id      (int)       : ep_id of a single episode of the course.

            credential (Credential): Credential used for requests.

        Note: only one of season_id and ep_id is required. If both are given,
        season_id takes precedence (ep_id is only used for the lookup when
        season_id is -1).

        Raises:
            ValueError: Neither season_id nor ep_id was given.
        """
        if (season_id == -1) and (ep_id == -1):
            raise ValueError("season id 和 ep id 必须选一个")
        self.__season_id = season_id
        self.__ep_id = ep_id
        self.credential = credential if credential else Credential()
        if self.__season_id == -1:
            # Only the ep_id is known: resolve the season id with a
            # synchronous metadata request.
            api = API["info"]["meta"]
            params = {"season_id": self.__season_id, "ep_id": self.__ep_id}
            meta = (
                Api(**api, credential=self.credential)
                .update_params(**params)
                .result_sync
            )
            self.__season_id = int(meta["season_id"])

    def set_season_id(self, season_id: int) -> None:
        """
        Re-initialise this object for another season id.

        The current credential is kept.

        Args:
            season_id (int): New season id.
        """
        self.__init__(season_id=season_id, credential=self.credential)

    def set_ep_id(self, ep_id: int) -> None:
        """
        Re-initialise this object from the ep_id of one of its episodes.

        The current credential is kept.

        Args:
            ep_id (int): ep_id of an episode.
        """
        self.__init__(ep_id=ep_id, credential=self.credential)

    def get_season_id(self) -> int:
        """
        Returns:
            int: Season id of the course.
        """
        return self.__season_id

    async def get_meta(self) -> dict:
        """
        Get the course metadata.

        Returns:
            dict: Result returned by the API.
        """
        api = API["info"]["meta"]
        params = {"season_id": self.__season_id, "ep_id": self.__ep_id}
        return (
            await Api(**api, credential=self.credential).update_params(**params).result
        )

    async def get_list_raw(self):
        """
        Get all videos of the course (raw data).

        Returns:
            dict: Result returned by the API.
        """
        api = API["info"]["list"]
        # A single request with pn=1, ps=1000 is made; no further pages are fetched.
        params = {"season_id": self.__season_id, "pn": 1, "ps": 1000}
        return (
            await Api(**api, credential=self.credential).update_params(**params).result
        )

    async def get_list(self) -> List["CheeseVideo"]:
        """
        Get all videos of the course.

        Each item's metadata is stored in the module-level cache (with the
        season id added under "ssid") before the CheeseVideo is built, so the
        CheeseVideo constructor does not issue a metadata request.

        Returns:
            List[CheeseVideo]: Videos of the course.
        """
        lists = await self.get_list_raw()
        cheese_videos = []
        for c in lists["items"]:
            c["ssid"] = self.get_season_id()
            cheese_video_meta_cache[c["id"]] = c
            cheese_videos.append(CheeseVideo(c["id"], self.credential))
        return cheese_videos


class CheeseVideo:
    """
    A course video.

    Course videos are not interchangeable with other videos, so this is a
    separate class without inheritance.

    Attributes:
        credential (Credential): Credential used for requests.

        cheese     (CheeseList): The course this video belongs to.
    """

    def __init__(self, epid, credential: Union[Credential, None] = None):
        """
        Args:
            epid       (int)       : ep_id of the episode.

            credential (Credential): Credential used for requests.

        Raises:
            ArgsException: The course metadata contains no episode with this epid.
        """
        self.__epid = epid
        self.credential = credential if credential else Credential()
        meta = cheese_video_meta_cache.get(epid)
        if meta is None:
            # Not cached: resolve the course from the ep_id, then look the
            # episode up in the course metadata (synchronous requests).
            self.cheese = CheeseList(ep_id=self.__epid, credential=self.credential)
            api = API["info"]["meta"]
            params = {"season_id": self.cheese.get_season_id(), "ep_id": self.__epid}
            metadata = (
                Api(**api, credential=self.credential)
                .update_params(**params)
                .result_sync
            )
            for v in metadata["episodes"]:
                if v["id"] == epid:
                    meta = v
                    break
            else:
                # Fail here instead of leaving aid/cid unset, which would only
                # surface later as an AttributeError in get_aid()/get_cid().
                raise ArgsException(f"未在课程中找到 ep_id 为 {epid} 的视频。")
        else:
            self.cheese = CheeseList(
                season_id=meta["ssid"], credential=self.credential
            )
        self.__meta = meta
        self.__aid = meta["aid"]
        self.__cid = meta["cid"]

    def get_aid(self) -> int:
        """
        Returns:
            int: aid taken from the episode metadata.
        """
        return self.__aid

    def get_cid(self) -> int:
        """
        Returns:
            int: cid taken from the episode metadata.
        """
        return self.__cid

    def get_meta(self) -> dict:
        """
        Get the episode metadata.

        Returns:
            dict: Episode metadata.
        """
        return self.__meta

    def get_cheese(self) -> "CheeseList":
        """
        Get the course this video belongs to.

        Returns:
            CheeseList: The owning course.
        """
        return self.cheese

    def set_epid(self, epid: int) -> None:
        """
        Re-initialise this object for another episode, keeping the credential.

        Args:
            epid (int): New ep_id.
        """
        self.__init__(epid, self.credential)

    def get_epid(self) -> int:
        """
        Returns:
            int: ep_id of the episode.
        """
        return self.__epid

    async def get_download_url(self) -> dict:
        """
        Get the download links.

        Returns:
            dict: Result returned by the API.
        """
        api = API["info"]["playurl"]
        params = {
            "avid": self.get_aid(),
            "ep_id": self.get_epid(),
            "cid": self.get_cid(),
            "qn": 127,
            "fnval": 4048,
            "fourk": 1,
        }
        return (
            await Api(**api, credential=self.credential).update_params(**params).result
        )

    async def get_stat(self) -> dict:
        """
        Get the video statistics (play count, like count, ...).

        Returns:
            dict: Result returned by the API.
        """
        api = API_video["info"]["stat"]
        params = {"aid": self.get_aid()}
        return (
            await Api(**api, credential=self.credential).update_params(**params).result
        )

    async def get_pages(self) -> dict:
        """
        Get the page (part) information.

        Returns:
            dict: Result returned by the API.
        """
        api = API_video["info"]["pages"]
        params = {"aid": self.get_aid()}
        return (
            await Api(**api, credential=self.credential).update_params(**params).result
        )

    async def get_danmaku_view(self) -> dict:
        """
        Get danmaku settings, special danmakus, danmaku count, segment info, etc.

        The response body is decoded by hand, field by field, with BytesReader.

        Returns:
            dict: Decoded response.

        Raises:
            NetworkException: The request failed.
            ResponseException: The image-danmaku section has an unexpected layout.
        """
        cid = self.get_cid()
        session = get_session()
        api = API_video["danmaku"]["view"]

        config = {}
        config["url"] = api["url"]
        config["params"] = {"type": 1, "oid": cid, "pid": self.get_aid()}
        config["cookies"] = self.credential.get_cookies()
        config["headers"] = {
            "Referer": "https://www.bilibili.com",
            "User-Agent": "Mozilla/5.0",
        }

        try:
            resp = await session.get(**config)
        except Exception as e:
            raise NetworkException(-1, str(e)) from e

        resp_data = resp.read()
        json_data = {}
        reader = BytesReader(resp_data)

        # Decode the binary stream.
        # NOTE(review): in every reader below an unrecognised field number hits
        # `continue` without consuming that field's payload, so the payload
        # bytes are then read as the next tag. Preserved as-is; confirm this is
        # acceptable for the fields the server actually sends.

        def read_dm_seg(stream: bytes):
            # Segment info: page size and total number of segments.
            reader_ = BytesReader(stream)
            data = {}
            while not reader_.has_end():
                t = reader_.varint() >> 3
                if t == 1:
                    data["page_size"] = reader_.varint()
                elif t == 2:
                    data["total"] = reader_.varint()
                else:
                    continue
            return data

        def read_flag(stream: bytes):
            # "flag" sub-message.
            reader_ = BytesReader(stream)
            data = {}
            while not reader_.has_end():
                t = reader_.varint() >> 3
                if t == 1:
                    data["rec_flag"] = reader_.varint()
                elif t == 2:
                    data["rec_text"] = reader_.string()
                elif t == 3:
                    data["rec_switch"] = reader_.varint()
                else:
                    continue
            return data

        def read_command_danmakus(stream: bytes):
            # One command danmaku; "extra" is a JSON string and is parsed.
            reader_ = BytesReader(stream)
            data = {}
            while not reader_.has_end():
                t = reader_.varint() >> 3
                if t == 1:
                    data["id"] = reader_.varint()
                elif t == 2:
                    data["oid"] = reader_.varint()
                elif t == 3:
                    data["mid"] = reader_.varint()
                elif t == 4:
                    data["commend"] = reader_.string()
                elif t == 5:
                    data["content"] = reader_.string()
                elif t == 6:
                    data["progress"] = reader_.varint()
                elif t == 7:
                    data["ctime"] = reader_.string()
                elif t == 8:
                    data["mtime"] = reader_.string()
                elif t == 9:
                    data["extra"] = json.loads(reader_.string())
                elif t == 10:
                    data["id_str"] = reader_.string()
                else:
                    continue
            return data

        def read_settings(stream: bytes):
            # Danmaku display settings.
            reader_ = BytesReader(stream)
            data = {}
            while not reader_.has_end():
                t = reader_.varint() >> 3
                if t == 1:
                    data["dm_switch"] = reader_.bool()
                elif t == 2:
                    data["ai_switch"] = reader_.bool()
                elif t == 3:
                    data["ai_level"] = reader_.varint()
                elif t == 4:
                    data["enable_top"] = reader_.bool()
                elif t == 5:
                    data["enable_scroll"] = reader_.bool()
                elif t == 6:
                    data["enable_bottom"] = reader_.bool()
                elif t == 7:
                    data["enable_color"] = reader_.bool()
                elif t == 8:
                    data["enable_special"] = reader_.bool()
                elif t == 9:
                    data["prevent_shade"] = reader_.bool()
                elif t == 10:
                    data["dmask"] = reader_.bool()
                elif t == 11:
                    data["opacity"] = reader_.float(True)
                elif t == 12:
                    data["dm_area"] = reader_.varint()
                elif t == 13:
                    data["speed_plus"] = reader_.float(True)
                elif t == 14:
                    data["font_size"] = reader_.float(True)
                elif t == 15:
                    data["screen_sync"] = reader_.bool()
                elif t == 16:
                    data["speed_sync"] = reader_.bool()
                elif t == 17:
                    data["font_family"] = reader_.string()
                elif t == 18:
                    data["bold"] = reader_.bool()
                elif t == 19:
                    data["font_border"] = reader_.varint()
                elif t == 20:
                    data["draw_type"] = reader_.string()
                else:
                    continue
            return data

        def read_image_danmakus(string: bytes):
            # List of image danmakus; each entry has "texts", and optionally
            # "image" and "id". Unexpected field numbers at the top level or in
            # the id sub-message raise ResponseException.
            image_list = []
            reader_ = BytesReader(string)
            while not reader_.has_end():
                type_ = reader_.varint() >> 3
                if type_ == 1:
                    details_dict = {}
                    details_dict["texts"] = []
                    img_details = reader_.bytes_string()
                    reader_details = BytesReader(img_details)
                    while not reader_details.has_end():
                        type_details = reader_details.varint() >> 3
                        if type_details == 1:
                            details_dict["texts"].append(reader_details.string())
                        elif type_details == 2:
                            details_dict["image"] = reader_details.string()
                        elif type_details == 3:
                            id_string = reader_details.bytes_string()
                            id_reader = BytesReader(id_string)
                            while not id_reader.has_end():
                                type_id = id_reader.varint() >> 3
                                if type_id == 2:
                                    details_dict["id"] = id_reader.varint()
                                else:
                                    raise ResponseException("解析响应数据错误")
                    image_list.append(details_dict)
                else:
                    raise ResponseException("解析响应数据错误")
            return image_list

        while not reader.has_end():
            type_ = reader.varint() >> 3

            if type_ == 1:
                json_data["state"] = reader.varint()
            elif type_ == 2:
                json_data["text"] = reader.string()
            elif type_ == 3:
                json_data["text_side"] = reader.string()
            elif type_ == 4:
                json_data["dm_seg"] = read_dm_seg(reader.bytes_string())
            elif type_ == 5:
                json_data["flag"] = read_flag(reader.bytes_string())
            elif type_ == 6:
                # Repeated field: collected into a list.
                if "special_dms" not in json_data:
                    json_data["special_dms"] = []
                json_data["special_dms"].append(reader.string())
            elif type_ == 7:
                json_data["check_box"] = reader.bool()
            elif type_ == 8:
                json_data["count"] = reader.varint()
            elif type_ == 9:
                # Repeated field: collected into a list.
                if "command_dms" not in json_data:
                    json_data["command_dms"] = []
                json_data["command_dms"].append(
                    read_command_danmakus(reader.bytes_string())
                )
            elif type_ == 10:
                json_data["dm_setting"] = read_settings(reader.bytes_string())
            elif type_ == 12:
                json_data["image_dms"] = read_image_danmakus(reader.bytes_string())
            else:
                continue
        return json_data

    async def get_danmakus(self, date: Union[datetime.date, None] = None):
        """
        Get danmakus.

        Args:
            date (datetime.date | None, optional): When given, history danmakus
                of that day are fetched (precision: year-month-day); this
                requires SESSDATA. Defaults to None.

        Returns:
            List[Danmaku]: List of Danmaku objects.

        Raises:
            NetworkException: A request failed.
            ResponseException: Unexpected content type or unparsable data.
            DanmakuClosedException: Danmakus are closed for this video.
        """
        if date is not None:
            self.credential.raise_for_no_sessdata()

        session = get_session()
        aid = self.get_aid()
        params: dict[str, Any] = {"oid": self.get_cid(), "type": 1, "pid": aid}
        if date is not None:
            # History danmakus: one request for the given day.
            api = API_video["danmaku"]["get_history_danmaku"]
            params["date"] = date.strftime("%Y-%m-%d")
            all_seg = 1
        else:
            # Current danmakus: one request per segment reported by the view API.
            api = API_video["danmaku"]["get_danmaku"]
            view = await self.get_danmaku_view()
            all_seg = view["dm_seg"]["total"]

        danmakus = []

        for seg in range(all_seg):
            if date is None:
                # Only needed when fetching current danmakus (1-based index).
                params["segment_index"] = seg + 1

            config = {}
            config["url"] = api["url"]
            config["params"] = params
            config["headers"] = {
                "Referer": "https://www.bilibili.com",
                "User-Agent": "Mozilla/5.0",
            }
            config["cookies"] = self.credential.get_cookies()

            try:
                req = await session.get(**config)
            except Exception as e:
                raise NetworkException(-1, str(e)) from e

            # A response without a content type ends the fetch loop.
            if "content-type" not in req.headers.keys():
                break
            else:
                content_type = req.headers["content-type"]
                if content_type != "application/octet-stream":
                    raise ResponseException("返回数据类型错误:")

            # Decode the binary stream.
            data = req.read()
            if data == b"\x10\x01":
                # Danmakus are closed for this video.
                raise DanmakuClosedException()

            reader = BytesReader(data)
            while not reader.has_end():
                type_ = reader.varint() >> 3
                if type_ != 1:
                    # NOTE(review): after skipping field 4 or 5, control falls
                    # through and the following bytes_string is decoded as a
                    # danmaku without reading a new tag. Preserved from the
                    # original; confirm this is intended.
                    if type_ == 4:
                        # Unknown payload (original author reports that protoc
                        # decodes it as garbage); skipped.
                        reader.bytes_string()
                    elif type_ == 5:
                        # Premium-member exclusive color; skipped.
                        reader.varint()
                        reader.varint()
                        reader.varint()
                        reader.bytes_string()
                    elif type_ == 13:
                        # Unknown field; its payload is not consumed here.
                        continue
                    else:
                        raise ResponseException("解析响应数据错误")

                dm = Danmaku("")
                dm_pack_data = reader.bytes_string()
                dm_reader = BytesReader(dm_pack_data)

                while not dm_reader.has_end():
                    data_type = dm_reader.varint() >> 3

                    if data_type == 1:
                        dm.id_ = dm_reader.varint()
                    elif data_type == 2:
                        dm.dm_time = dm_reader.varint() / 1000
                    elif data_type == 3:
                        dm.mode = dm_reader.varint()
                    elif data_type == 4:
                        dm.font_size = dm_reader.varint()
                    elif data_type == 5:
                        # Stored as a hex string without the "0x" prefix.
                        dm.color = hex(dm_reader.varint())[2:]
                    elif data_type == 6:
                        dm.crc32_id = dm_reader.string()
                    elif data_type == 7:
                        dm.text = dm_reader.string()
                    elif data_type == 8:
                        dm.send_time = dm_reader.varint()
                    elif data_type == 9:
                        dm.weight = dm_reader.varint()
                    elif data_type == 10:
                        dm.action = str(dm_reader.varint())
                    elif data_type == 11:
                        dm.pool = dm_reader.varint()
                    elif data_type == 12:
                        dm.id_str = dm_reader.string()
                    elif data_type == 13:
                        dm.attr = dm_reader.varint()
                    elif data_type == 14:
                        dm.uid = dm_reader.varint()
                    elif data_type == 15:
                        dm_reader.varint()
                    elif data_type == 20:
                        dm_reader.bytes_string()
                    elif data_type == 21:
                        dm_reader.bytes_string()
                    else:
                        # Unknown field: stop decoding this danmaku and keep
                        # whatever has been read so far.
                        break
                danmakus.append(dm)
        return danmakus

    async def get_pbp(self):
        """
        Get the "pbp" data of the video (described upstream as the
        high-energy progress bar).

        Returns:
            Parsed JSON body returned by the API.
        """
        cid = self.get_cid()
        api = API_video["info"]["pbp"]
        params = {"cid": cid}
        session = get_session()
        resp = await session.get(
            api["url"], params=params, cookies=self.credential.get_cookies()
        )
        return json.loads(resp.text)

    async def send_danmaku(self, danmaku: Union[Danmaku, None] = None):
        """
        Send a danmaku.

        Args:
            danmaku (Danmaku | None): Danmaku to send. When None (or falsy),
                a Danmaku with empty text is used. Defaults to None.

        Returns:
            dict: Result returned by the API.
        """
        danmaku = danmaku if danmaku else Danmaku("")

        self.credential.raise_for_no_sessdata()
        self.credential.raise_for_no_bili_jct()

        api = API_video["danmaku"]["send_danmaku"]

        pool = 1 if danmaku.is_sub else 0
        data = {
            "type": 1,
            "oid": self.get_cid(),
            "msg": danmaku.text,
            "aid": self.get_aid(),
            # dm_time is converted to an integer number of thousandths.
            "progress": int(danmaku.dm_time * 1000),
            # Danmaku.color is a hex string; the API receives the integer value.
            "color": int(danmaku.color, 16),
            "fontsize": danmaku.font_size,
            "pool": pool,
            "mode": danmaku.mode,
            "plat": 1,
        }
        return await Api(**api, credential=self.credential).update_data(**data).result

    async def has_liked(self):
        """
        Whether the video has been liked.

        Returns:
            bool: True if the API result equals 1.
        """
        self.credential.raise_for_no_sessdata()

        api = API_video["info"]["has_liked"]
        params = {"aid": self.get_aid()}
        return (
            await Api(**api, credential=self.credential).update_params(**params).result
            == 1
        )

    async def get_pay_coins(self):
        """
        Get the number of coins already given to the video.

        Returns:
            int: The "multiply" field of the API result.
        """
        self.credential.raise_for_no_sessdata()

        api = API_video["info"]["get_pay_coins"]
        params = {"aid": self.get_aid()}
        return (
            await Api(**api, credential=self.credential).update_params(**params).result
        )["multiply"]

    async def has_favoured(self):
        """
        Whether the video has been added to favorites.

        Returns:
            bool: The "favoured" field of the API result.
        """
        self.credential.raise_for_no_sessdata()

        api = API_video["info"]["has_favoured"]
        params = {"aid": self.get_aid()}
        return (
            await Api(**api, credential=self.credential).update_params(**params).result
        )["favoured"]

    async def like(self, status: bool = True):
        """
        Like the video.

        Args:
            status (bool, optional): Like state; sent as 1 when True and 2
                when False. Defaults to True.

        Returns:
            dict: Result returned by the API.
        """
        self.credential.raise_for_no_sessdata()
        self.credential.raise_for_no_bili_jct()

        api = API_video["operate"]["like"]
        data = {"aid": self.get_aid(), "like": 1 if status else 2}
        return await Api(**api, credential=self.credential).update_data(**data).result

    async def pay_coin(self, num: int = 1, like: bool = False):
        """
        Give coins to the video.

        Args:
            num  (int, optional) : Number of coins, 1 or 2. Defaults to 1.

            like (bool, optional): Whether to like the video as well. Defaults to False.

        Returns:
            dict: Result returned by the API.

        Raises:
            ArgsException: num is neither 1 nor 2.
        """
        self.credential.raise_for_no_sessdata()
        self.credential.raise_for_no_bili_jct()

        if num not in (1, 2):
            raise ArgsException("投币数量只能是 1 ~ 2 个。")

        api = API_video["operate"]["coin"]
        data = {
            "aid": self.get_aid(),
            "multiply": num,
            "like": 1 if like else 0,
        }
        return await Api(**api, credential=self.credential).update_data(**data).result

    async def set_favorite(
        self,
        add_media_ids: Union[List[int], None] = None,
        del_media_ids: Union[List[int], None] = None,
    ):
        """
        Change which favorite folders contain the video.

        Args:
            add_media_ids (List[int], optional): Ids of favorite folders to add
                the video to. Defaults to None (treated as empty).

            del_media_ids (List[int], optional): Ids of favorite folders to
                remove the video from. Defaults to None (treated as empty).

        Returns:
            dict: Result returned by the API.

        Raises:
            ArgsException: Both lists are empty.
        """
        # None defaults avoid sharing one mutable list object across calls.
        add_media_ids = [] if add_media_ids is None else add_media_ids
        del_media_ids = [] if del_media_ids is None else del_media_ids

        if len(add_media_ids) + len(del_media_ids) == 0:
            raise ArgsException("对收藏夹无修改。请至少提供 add_media_ids 和 del_media_ids 中的其中一个。")

        self.credential.raise_for_no_sessdata()
        self.credential.raise_for_no_bili_jct()

        api = API_video["operate"]["favorite"]
        data = {
            "rid": self.get_aid(),
            "type": 2,
            "add_media_ids": ",".join(map(str, add_media_ids)),
            "del_media_ids": ",".join(map(str, del_media_ids)),
        }
        return await Api(**api, credential=self.credential).update_data(**data).result

    async def get_danmaku_xml(self):
        """
        Get the danmakus as XML source.

        Returns:
            str: XML source, decoded as UTF-8.
        """
        url = f"https://comment.bilibili.com/{self.get_cid()}.xml"
        sess = get_session()
        config: dict[str, Any] = {"url": url}
        # Proxy: must be a mapping of URL pattern -> proxy URL. The original
        # built a set literal ({"all://", proxy}) by mistake.
        # NOTE(review): confirm that the session's get() accepts a "proxies"
        # keyword; if it does not, the proxy must be configured on the session.
        if settings.proxy:
            config["proxies"] = {"all://": settings.proxy}
        resp = await sess.get(**config)
        return resp.content.decode("utf-8")