""" bilibili_api.video 视频相关操作 注意,同时存在 page_index 和 cid 的参数,两者至少提供一个。 """ import re import os import json import struct import asyncio import logging import datetime from enum import Enum from inspect import isfunction from functools import cmp_to_key from dataclasses import dataclass from typing import Any, List, Union, Optional import httpx import aiohttp from . import settings from .utils.aid_bvid_transformer import bvid2aid, aid2bvid from .utils.utils import get_api from .utils.AsyncEvent import AsyncEvent from .utils.credential import Credential from .utils.BytesReader import BytesReader from .utils.danmaku import Danmaku, SpecialDanmaku from .utils.network import get_aiohttp_session, Api, get_session from .exceptions import ( ArgsException, NetworkException, ResponseException, DanmakuClosedException, ) API = get_api("video") def get_cid_info_sync(cid: int): """ 获取 cid 信息 (对应的视频,具体分 P 序号,up 等) Returns: dict: 调用 https://hd.biliplus.com 的 API 返回的结果 """ api = API["info"]["cid_info"] params = {"cid": cid} return Api(**api).update_params(**params).result_sync async def get_cid_info(cid: int): """ 获取 cid 信息 (对应的视频,具体分 P 序号,up 等) Returns: dict: 调用 https://hd.biliplus.com 的 API 返回的结果 """ api = API["info"]["cid_info"] params = {"cid": cid} return await Api(**api).update_params(**params).result class DanmakuOperatorType(Enum): """ 弹幕操作枚举 + DELETE - 删除弹幕 + PROTECT - 保护弹幕 + UNPROTECT - 取消保护弹幕 """ DELETE = 1 PROTECT = 2 UNPROTECT = 3 class VideoAppealReasonType: """ 视频投诉原因枚举 注意: 每一项均为函数,部分项有参数,没有参数的函数无需调用函数,直接传入即可,有参数的函数请调用结果之后传入。 - ILLEGAL(): 违法违禁 - PRON(): 色情 - VULGAR(): 低俗 - GAMBLED_SCAMS(): 赌博诈骗 - VIOLENT(): 血腥暴力 - PERSONAL_ATTACK(): 人身攻击 - PLAGIARISM(bvid: str): 与站内其他视频撞车 - BAD_FOR_YOUNGS(): 青少年不良信息 - CLICKBAIT(): 不良封面/标题 - POLITICAL_RUMORS(): 涉政谣言 - SOCIAL_RUMORS(): 涉社会事件谣言 - COVID_RUMORS(): 疫情谣言 - UNREAL_EVENT(): 虚假不实消息 - OTHER(): 有其他问题 - LEAD_WAR(): 引战 - CANNOT_CHARGE(): 不能参加充电 - UNREAL_COPYRIGHT(source: str): 转载/自制类型错误 """ ILLEGAL = lambda: 2 PRON = lambda: 3 VULGAR = lambda: 4 GAMBLED_SCAMS = lambda: 5 VIOLENT = lambda: 6 PERSONAL_ATTACK = lambda: 7 BAD_FOR_YOUNGS = lambda: 10000 CLICKBAIT = lambda: 10013 POLITICAL_RUMORS = lambda: 10014 SOCIAL_RUMORS = lambda: 10015 COVID_RUMORS = lambda: 10016 UNREAL_EVENT = lambda: 10017 OTHER = lambda: 1 LEAD_WAR = lambda: 9 CANNOT_CHARGE = lambda: 10 @staticmethod def PLAGIARISM(bvid: str): """ 与站内其他视频撞车 Args: bvid (str): 撞车对象 """ return {"tid": 8, "撞车对象": bvid} @staticmethod def UNREAL_COPYRIGHT(source: str): """ 转载/自制类型错误 Args: source (str): 原创视频出处 """ return {"tid": 52, "出处": source} class Video: """ 视频类,各种对视频的操作均在里面。 """ def __init__( self, bvid: Union[None, str] = None, aid: Union[None, int] = None, credential: Union[None, Credential] = None, ): """ Args: bvid (str | None, optional) : BV 号. bvid 和 aid 必须提供其中之一。 aid (int | None, optional) : AV 号. bvid 和 aid 必须提供其中之一。 credential (Credential | None, optional): Credential 类. Defaults to None. """ # ID 检查 if bvid is not None: self.set_bvid(bvid) elif aid is not None: self.set_aid(aid) else: # 未提供任一 ID raise ArgsException("请至少提供 bvid 和 aid 中的其中一个参数。") # 未提供 credential 时初始化该类 self.credential: Credential = Credential() if credential is None else credential # 用于存储视频信息,避免接口依赖视频信息时重复调用 self.__info: Union[dict, None] = None def set_bvid(self, bvid: str) -> None: """ 设置 bvid。 Args: bvid (str): 要设置的 bvid。 """ # 检查 bvid 是否有效 if not re.search("^BV[a-zA-Z0-9]{10}$", bvid): raise ArgsException("bvid 提供错误,必须是以 BV 开头的纯字母和数字组成的 12 位字符串(大小写敏感)。") self.__bvid = bvid self.__aid = bvid2aid(bvid) def get_bvid(self) -> str: """ 获取 BVID。 Returns: str: BVID。 """ return self.__bvid def set_aid(self, aid: int) -> None: """ 设置 aid。 Args: aid (int): AV 号。 """ if aid <= 0: raise ArgsException("aid 不能小于或等于 0。") self.__aid = aid self.__bvid = aid2bvid(aid) def get_aid(self) -> int: """ 获取 AID。 Returns: int: aid。 """ return self.__aid async def get_info(self) -> dict: """ 获取视频信息。 Returns: dict: 调用 API 返回的结果。 """ api = API["info"]["info"] params = {"bvid": self.get_bvid(), "aid": self.get_aid()} resp = ( await Api(**api, credential=self.credential).update_params(**params).result ) # 存入 self.__info 中以备后续调用 self.__info = resp return resp async def get_detail(self) -> dict: """ 获取视频详细信息 Returns: dict: 调用 API 返回的结果。 """ api = API["info"]["detail"] params = { "bvid": self.get_bvid(), "aid": self.get_aid(), "need_operation_card": 0, "need_elec": 0, } return ( await Api(**api, credential=self.credential).update_params(**params).result ) async def __get_info_cached(self) -> dict: """ 获取视频信息,如果已获取过则使用之前获取的信息,没有则重新获取。 Returns: dict: 调用 API 返回的结果。 """ if self.__info is None: return await self.get_info() return self.__info # get_stat 403/404 https://github.com/SocialSisterYi/bilibili-API-collect/issues/797 等恢复 # async def get_stat(self) -> dict: # """ # 获取视频统计数据(播放量,点赞数等)。 # Returns: # dict: 调用 API 返回的结果。 # """ # api = API["info"]["stat"] # params = {"bvid": self.get_bvid(), "aid": self.get_aid()} # return await Api(**api, credential=self.credential).update_params(**params).result async def get_up_mid(self) -> int: """ 获取视频 up 主的 mid。 Returns: int: up_mid """ info = await self.__get_info_cached() return info["owner"]["mid"] async def get_tags( self, page_index: Union[int, None] = 0, cid: Union[int, None] = None ) -> List[dict]: """ 获取视频标签。 Args: page_index (int | None): 分 P 序号. Defaults to 0. cid (int | None): 分 P 编码. Defaults to None. Returns: List[dict]: 调用 API 返回的结果。 """ if cid == None: if page_index == None: raise ArgsException("page_index 和 cid 至少提供一个。") cid = await self.get_cid(page_index=page_index) api = API["info"]["tags"] params = {"bvid": self.get_bvid(), "aid": self.get_aid(), "cid": cid} return ( await Api(**api, credential=self.credential).update_params(**params).result ) async def get_chargers(self) -> dict: """ 获取视频充电用户。 Returns: dict: 调用 API 返回的结果。 """ info = await self.__get_info_cached() mid = info["owner"]["mid"] api = API["info"]["chargers"] params = {"aid": self.get_aid(), "bvid": self.get_bvid(), "mid": mid} return ( await Api(**api, credential=self.credential).update_params(**params).result ) async def get_pages(self) -> List[dict]: """ 获取分 P 信息。 Returns: dict: 调用 API 返回的结果。 """ api = API["info"]["pages"] params = {"aid": self.get_aid(), "bvid": self.get_bvid()} return ( await Api(**api, credential=self.credential).update_params(**params).result ) async def __get_cid_by_index(self, page_index: int) -> int: """ 根据分 p 号获取 cid。 Args: page_index (int): 分 P 号,从 0 开始。 Returns: int: cid 分 P 的唯一 ID。 """ if page_index < 0: raise ArgsException("分 p 号必须大于或等于 0。") info = await self.__get_info_cached() pages = info["pages"] if len(pages) <= page_index: raise ArgsException("不存在该分 p。") page = pages[page_index] cid = page["cid"] return cid async def get_video_snapshot( self, cid: Union[int, None] = None, json_index: bool = False, pvideo: bool = True, ) -> dict: """ 获取视频快照(视频各个时间段的截图拼图) Args: cid(int): 分 P CID(可选) json_index(bool): json 数组截取时间表 True 为需要,False 不需要 pvideo(bool): 是否只获取预览 Returns: dict: 调用 API 返回的结果,数据中 Url 没有 http 头 """ params: dict[str, Any] = {"aid": self.get_aid()} if pvideo: api = API["info"]["video_snapshot_pvideo"] else: params["bvid"] = self.get_bvid() if json_index: params["index"] = 1 if cid: params["cid"] = cid api = API["info"]["video_snapshot"] return ( await Api(**api, credential=self.credential).update_params(**params).result ) async def get_cid(self, page_index: int) -> int: """ 获取稿件 cid Args: page_index(int): 分 P Returns: int: cid """ return await self.__get_cid_by_index(page_index) async def get_download_url( self, page_index: Union[int, None] = None, cid: Union[int, None] = None, html5: bool = False, ) -> dict: """ 获取视频下载信息。 返回结果可以传入 `VideoDownloadURLDataDetecter` 进行解析。 page_index 和 cid 至少提供其中一个,其中 cid 优先级最高 Args: page_index (int | None, optional) : 分 P 号,从 0 开始。Defaults to None cid (int | None, optional) : 分 P 的 ID。Defaults to None html5 (bool, optional): 是否以 html5 平台访问,这样子能直接在网页中播放,但是链接少。 Returns: dict: 调用 API 返回的结果。 """ if cid is None: if page_index is None: raise ArgsException("page_index 和 cid 至少提供一个。") cid = await self.__get_cid_by_index(page_index) api = API["info"]["playurl"] if html5: params = { "avid": self.get_aid(), "cid": cid, "qn": "127", "otype": "json", "fnval": 4048, "fourk": 1, "platform": "html5", "high_quality": "1", } else: params = { "avid": self.get_aid(), "cid": cid, "qn": "127", "otype": "json", "fnval": 4048, "fourk": 1, } result = ( await Api(**api, credential=self.credential).update_params(**params).result ) result.update({"is_html5": True} if html5 else {}) return result async def get_related(self) -> dict: """ 获取相关视频信息。 Returns: dict: 调用 API 返回的结果。 """ api = API["info"]["related"] params = {"aid": self.get_aid(), "bvid": self.get_bvid()} return ( await Api(**api, credential=self.credential).update_params(**params).result ) async def get_relation(self) -> dict: """ 获取用户与视频的关系 Returns: dict: 调用 API 返回的结果。 """ self.credential.raise_for_no_sessdata() api = API["info"]["relation"] params = {"aid": self.get_aid(), "bvid": self.get_bvid()} return ( await Api(**api, credential=self.credential).update_params(**params).result ) async def has_liked(self) -> bool: """ 视频是否点赞过。 Returns: bool: 视频是否点赞过。 """ self.credential.raise_for_no_sessdata() api = API["info"]["has_liked"] params = {"bvid": self.get_bvid(), "aid": self.get_aid()} return ( await Api(**api, credential=self.credential).update_params(**params).result ) async def get_pay_coins(self) -> int: """ 获取视频已投币数量。 Returns: int: 视频已投币数量。 """ self.credential.raise_for_no_sessdata() api = API["info"]["get_pay_coins"] params = {"bvid": self.get_bvid(), "aid": self.get_aid()} return ( await Api(**api, credential=self.credential).update_params(**params).result )["multiply"] async def has_favoured(self) -> bool: """ 是否已收藏。 Returns: bool: 视频是否已收藏。 """ self.credential.raise_for_no_sessdata() api = API["info"]["has_favoured"] params = {"bvid": self.get_bvid(), "aid": self.get_aid()} return ( await Api(**api, credential=self.credential).update_params(**params).result )["favoured"] async def is_forbid_note(self) -> bool: """ 是否禁止笔记。 Returns: bool: 是否禁止笔记。 """ api = API["info"]["is_forbid"] params = {"aid": self.get_aid()} return ( await Api(**api, credential=self.credential).update_params(**params).result )["forbid_note_entrance"] async def get_private_notes_list(self) -> list: """ 获取稿件私有笔记列表。 Returns: list: note_Ids。 """ self.credential.raise_for_no_sessdata() api = API["info"]["private_notes"] params = {"oid": self.get_aid(), "oid_type": 0} return ( await Api(**api, credential=self.credential).update_params(**params).result )["noteIds"] async def get_public_notes_list(self, pn: int, ps: int) -> dict: """ 获取稿件公开笔记列表。 Args: pn (int): 页码 ps (int): 每页项数 Returns: dict: 调用 API 返回的结果。 """ api = API["info"]["public_notes"] params = {"oid": self.get_aid(), "oid_type": 0, "pn": pn, "ps": ps} return ( await Api(**api, credential=self.credential).update_params(**params).result ) async def get_ai_conclusion(self, cid: Optional[int] = None, page_index: Optional[int] = None, up_mid: Optional[int] = None) -> dict: """ 获取稿件 AI 总结结果。 cid 和 page_index 至少提供其中一个,其中 cid 优先级最高 Args: cid (Optional, int): 分 P 的 cid。 page_index (Optional, int): 分 P 号,从 0 开始。 up_mid (Optional, int): up 主的 mid。 Returns: dict: 调用 API 返回的结果。 """ if cid is None: if page_index is None: raise ArgsException("page_index 和 cid 至少提供一个。") cid = await self.__get_cid_by_index(page_index) api = API["info"]["ai_conclusion"] params = {"aid": self.get_aid(), "bvid": self.get_bvid(), "cid": cid, "up_mid": await self.get_up_mid() if up_mid is None else up_mid} return ( await Api(**api, credential=self.credential).update_params(**params).result ) async def get_danmaku_view( self, page_index: Union[int, None] = None, cid: Union[int, None] = None ) -> dict: """ 获取弹幕设置、特殊弹幕、弹幕数量、弹幕分段等信息。 Args: page_index (int, optional): 分 P 号,从 0 开始。Defaults to None cid (int, optional): 分 P 的 ID。Defaults to None Returns: dict: 调用 API 返回的结果。 """ if cid is None: if page_index is None: raise ArgsException("page_index 和 cid 至少提供一个。") cid = await self.__get_cid_by_index(page_index) session = get_session() api = API["danmaku"]["view"] config = {} config["url"] = api["url"] config["params"] = {"type": 1, "oid": cid, "pid": self.get_aid()} config["cookies"] = self.credential.get_cookies() config["headers"] = { "Referer": "https://www.bilibili.com", "User-Agent": "Mozilla/5.0", } try: resp = await session.get(**config) except Exception as e: raise NetworkException(-1, str(e)) resp_data = resp.read() json_data = {} reader = BytesReader(resp_data) # 解析二进制数据流 def read_dm_seg(stream: bytes): reader_ = BytesReader(stream) data = {} while not reader_.has_end(): t = reader_.varint() >> 3 if t == 1: data["page_size"] = reader_.varint() elif t == 2: data["total"] = reader_.varint() else: continue return data def read_flag(stream: bytes): reader_ = BytesReader(stream) data = {} while not reader_.has_end(): t = reader_.varint() >> 3 if t == 1: data["rec_flag"] = reader_.varint() elif t == 2: data["rec_text"] = reader_.string() elif t == 3: data["rec_switch"] = reader_.varint() else: continue return data def read_command_danmakus(stream: bytes): reader_ = BytesReader(stream) data = {} while not reader_.has_end(): t = reader_.varint() >> 3 if t == 1: data["id"] = reader_.varint() elif t == 2: data["oid"] = reader_.varint() elif t == 3: data["mid"] = reader_.varint() elif t == 4: data["commend"] = reader_.string() elif t == 5: data["content"] = reader_.string() elif t == 6: data["progress"] = reader_.varint() elif t == 7: data["ctime"] = reader_.string() elif t == 8: data["mtime"] = reader_.string() elif t == 9: data["extra"] = json.loads(reader_.string()) elif t == 10: data["id_str"] = reader_.string() else: continue return data def read_settings(stream: bytes): reader_ = BytesReader(stream) data = {} while not reader_.has_end(): t = reader_.varint() >> 3 if t == 1: data["dm_switch"] = reader_.bool() elif t == 2: data["ai_switch"] = reader_.bool() elif t == 3: data["ai_level"] = reader_.varint() elif t == 4: data["enable_top"] = reader_.bool() elif t == 5: data["enable_scroll"] = reader_.bool() elif t == 6: data["enable_bottom"] = reader_.bool() elif t == 7: data["enable_color"] = reader_.bool() elif t == 8: data["enable_special"] = reader_.bool() elif t == 9: data["prevent_shade"] = reader_.bool() elif t == 10: data["dmask"] = reader_.bool() elif t == 11: data["opacity"] = reader_.float(True) elif t == 12: data["dm_area"] = reader_.varint() elif t == 13: data["speed_plus"] = reader_.float(True) elif t == 14: data["font_size"] = reader_.float(True) elif t == 15: data["screen_sync"] = reader_.bool() elif t == 16: data["speed_sync"] = reader_.bool() elif t == 17: data["font_family"] = reader_.string() elif t == 18: data["bold"] = reader_.bool() elif t == 19: data["font_border"] = reader_.varint() elif t == 20: data["draw_type"] = reader_.string() else: continue return data def read_image_danmakus(string: bytes): image_list = [] reader_ = BytesReader(string) while not reader_.has_end(): type_ = reader_.varint() >> 3 if type_ == 1: details_dict: dict[Any, Any] = {"texts": []} img_details = reader_.bytes_string() reader_details = BytesReader(img_details) while not reader_details.has_end(): type_details = reader_details.varint() >> 3 if type_details == 1: details_dict["texts"].append(reader_details.string()) elif type_details == 2: details_dict["image"] = reader_details.string() elif type_details == 3: id_string = reader_details.bytes_string() id_reader = BytesReader(id_string) while not id_reader.has_end(): type_id = id_reader.varint() >> 3 if type_id == 2: details_dict["id"] = id_reader.varint() else: raise ResponseException("解析响应数据错误") image_list.append(details_dict) else: raise ResponseException("解析响应数据错误") return image_list while not reader.has_end(): type_ = reader.varint() >> 3 if type_ == 1: json_data["state"] = reader.varint() elif type_ == 2: json_data["text"] = reader.string() elif type_ == 3: json_data["text_side"] = reader.string() elif type_ == 4: json_data["dm_seg"] = read_dm_seg(reader.bytes_string()) elif type_ == 5: json_data["flag"] = read_flag(reader.bytes_string()) elif type_ == 6: if "special_dms" not in json_data: json_data["special_dms"] = [] json_data["special_dms"].append(reader.string()) elif type_ == 7: json_data["check_box"] = reader.bool() elif type_ == 8: json_data["count"] = reader.varint() elif type_ == 9: if "command_dms" not in json_data: json_data["command_dms"] = [] json_data["command_dms"].append( read_command_danmakus(reader.bytes_string()) ) elif type_ == 10: json_data["dm_setting"] = read_settings(reader.bytes_string()) elif type_ == 12: json_data["image_dms"] = read_image_danmakus(reader.bytes_string()) else: continue return json_data async def get_danmakus( self, page_index: int = 0, date: Union[datetime.date, None] = None, cid: Union[int, None] = None, from_seg: Union[int, None] = None, to_seg: Union[int, None] = None, ) -> List[Danmaku]: """ 获取弹幕。 Args: page_index (int, optional): 分 P 号,从 0 开始。Defaults to None date (datetime.Date | None, optional): 指定日期后为获取历史弹幕,精确到年月日。Defaults to None. cid (int | None, optional): 分 P 的 ID。Defaults to None from_seg (int, optional): 从第几段开始(0 开始编号,None 为从第一段开始,一段 6 分钟). Defaults to None. to_seg (int, optional): 到第几段结束(0 开始编号,None 为到最后一段,包含编号的段,一段 6 分钟). Defaults to None. 注意: - 1. 段数可以使用 `get_danmaku_view()["dm_seg"]["total"]` 查询。 - 2. `from_seg` 和 `to_seg` 仅对 `date == None` 的时候有效果。 - 3. 例:取前 `12` 分钟的弹幕:`from_seg=0, to_seg=1` Returns: List[Danmaku]: Danmaku 类的列表。 """ if date is not None: self.credential.raise_for_no_sessdata() if cid is None: if page_index is None: raise ArgsException("page_index 和 cid 至少提供一个。") cid = await self.__get_cid_by_index(page_index) session = get_session() aid = self.get_aid() params: dict[str, Any] = {"oid": cid, "type": 1, "pid": aid} if date is not None: # 获取历史弹幕 api = API["danmaku"]["get_history_danmaku"] params["date"] = date.strftime("%Y-%m-%d") params["type"] = 1 from_seg = to_seg = 0 else: api = API["danmaku"]["get_danmaku"] if from_seg == None: from_seg = 0 if to_seg == None: view = await self.get_danmaku_view(cid=cid) to_seg = view["dm_seg"]["total"] - 1 danmakus = [] for seg in range(from_seg, to_seg + 1): if date is None: # 仅当获取当前弹幕时需要该参数 params["segment_index"] = seg + 1 config = {} config["url"] = api["url"] config["params"] = params config["headers"] = { "Referer": "https://www.bilibili.com", "User-Agent": "Mozilla/5.0", } config["cookies"] = self.credential.get_cookies() try: req = await session.get(**config) except Exception as e: raise NetworkException(-1, str(e)) if "content-type" not in req.headers.keys(): break else: content_type = req.headers["content-type"] if content_type != "application/octet-stream": raise ResponseException("返回数据类型错误:") # 解析二进制流数据 data = req.read() if data == b"\x10\x01": # 视频弹幕被关闭 raise DanmakuClosedException() reader = BytesReader(data) while not reader.has_end(): type_ = reader.varint() >> 3 if type_ != 1: if type_ == 4: reader.bytes_string() # 什么鬼?我用 protoc 解析出乱码! elif type_ == 5: # 大会员专属颜色 reader.varint() reader.varint() reader.varint() reader.bytes_string() elif type_ == 13: # ??? continue else: raise ResponseException("解析响应数据错误") dm = Danmaku("") dm_pack_data = reader.bytes_string() dm_reader = BytesReader(dm_pack_data) while not dm_reader.has_end(): data_type = dm_reader.varint() >> 3 if data_type == 1: dm.id_ = dm_reader.varint() elif data_type == 2: dm.dm_time = dm_reader.varint() / 1000 elif data_type == 3: dm.mode = dm_reader.varint() elif data_type == 4: dm.font_size = dm_reader.varint() elif data_type == 5: color = dm_reader.varint() if color != 60001: color = hex(color)[2:] else: color = "special" dm.color = color elif data_type == 6: dm.crc32_id = dm_reader.string() elif data_type == 7: dm.text = dm_reader.string() elif data_type == 8: dm.send_time = dm_reader.varint() elif data_type == 9: dm.weight = dm_reader.varint() elif data_type == 10: dm.action = str(dm_reader.string()) elif data_type == 11: dm.pool = dm_reader.varint() elif data_type == 12: dm.id_str = dm_reader.string() elif data_type == 13: dm.attr = dm_reader.varint() elif data_type == 14: dm.uid = dm_reader.varint() elif data_type == 15: dm_reader.varint() elif data_type == 20: dm_reader.bytes_string() elif data_type == 21: dm_reader.bytes_string() elif data_type == 22: dm_reader.bytes_string() elif data_type == 25: dm_reader.varint() elif data_type == 26: dm_reader.varint() else: break danmakus.append(dm) return danmakus async def get_special_dms( self, page_index: int = 0, cid: Union[int, None] = None ) -> List[SpecialDanmaku]: """ 获取特殊弹幕 Args: page_index (int, optional) : 分 P 号. Defaults to 0. cid (int | None, optional): 分 P id. Defaults to None. Returns: List[SpecialDanmaku]: 调用接口解析后的结果 """ if cid is None: if page_index is None: raise ArgsException("page_index 和 cid 至少提供一个。") cid = await self.__get_cid_by_index(page_index) view = await self.get_danmaku_view(cid=cid) special_dms = view["special_dms"][0] if settings.proxy != "": sess = httpx.AsyncClient(proxies={"all://": settings.proxy}) else: sess = httpx.AsyncClient() dm_content = await sess.get(special_dms, cookies=self.credential.get_cookies()) dm_content.raise_for_status() reader = BytesReader(dm_content.content) dms: List[SpecialDanmaku] = [] while not reader.has_end(): spec_dm = SpecialDanmaku("") type_ = reader.varint() >> 3 if type_ == 1: reader_ = BytesReader(reader.bytes_string()) while not reader_.has_end(): type__ = reader_.varint() >> 3 if type__ == 1: spec_dm.id_ = reader_.varint() elif type__ == 3: spec_dm.mode = reader_.varint() elif type__ == 4: reader_.varint() elif type__ == 5: reader_.varint() elif type__ == 6: reader_.string() elif type__ == 7: spec_dm.content = reader_.string() elif type__ == 8: reader_.varint() elif type__ == 11: spec_dm.pool = reader_.varint() elif type__ == 12: spec_dm.id_str = reader_.string() else: continue else: continue dms.append(spec_dm) return dms async def get_history_danmaku_index( self, page_index: Union[int, None] = None, date: Union[datetime.date, None] = None, cid: Union[int, None] = None, ) -> Union[None, List[str]]: """ 获取特定月份存在历史弹幕的日期。 Args: page_index (int | None, optional): 分 P 号,从 0 开始。Defaults to None date (datetime.date | None): 精确到年月. Defaults to None。 cid (int | None, optional): 分 P 的 ID。Defaults to None Returns: None | List[str]: 调用 API 返回的结果。不存在时为 None。 """ if date is None: raise ArgsException("请提供 date 参数") self.credential.raise_for_no_sessdata() if cid is None: if page_index is None: raise ArgsException("page_index 和 cid 至少提供一个。") cid = await self.__get_cid_by_index(page_index) api = API["danmaku"]["get_history_danmaku_index"] params = {"oid": cid, "month": date.strftime("%Y-%m"), "type": 1} return ( await Api(**api, credential=self.credential).update_params(**params).result ) async def has_liked_danmakus( self, page_index: Union[int, None] = None, ids: Union[List[int], None] = None, cid: Union[int, None] = None, ) -> dict: """ 是否已点赞弹幕。 Args: page_index (int | None, optional): 分 P 号,从 0 开始。Defaults to None ids (List[int] | None): 要查询的弹幕 ID 列表。 cid (int | None, optional): 分 P 的 ID。Defaults to None Returns: dict: 调用 API 返回的结果。 """ if ids is None or len(ids) == 0: raise ArgsException("请提供 ids 参数并至少有一个元素") self.credential.raise_for_no_sessdata() if cid is None: if page_index is None: raise ArgsException("page_index 和 cid 至少提供一个。") cid = await self.__get_cid_by_index(page_index) api = API["danmaku"]["has_liked_danmaku"] params = {"oid": cid, "ids": ",".join(ids)} # type: ignore return ( await Api(**api, credential=self.credential).update_params(**params).result ) async def send_danmaku( self, page_index: Union[int, None] = None, danmaku: Union[Danmaku, None] = None, cid: Union[int, None] = None, ) -> dict: """ 发送弹幕。 Args: page_index (int | None, optional): 分 P 号,从 0 开始。Defaults to None danmaku (Danmaku | None) : Danmaku 类。 cid (int | None, optional): 分 P 的 ID。Defaults to None Returns: dict: 调用 API 返回的结果。 """ if danmaku is None: raise ArgsException("请提供 danmaku 参数") self.credential.raise_for_no_sessdata() self.credential.raise_for_no_bili_jct() if cid is None: if page_index is None: raise ArgsException("page_index 和 cid 至少提供一个。") cid = await self.__get_cid_by_index(page_index) api = API["danmaku"]["send_danmaku"] if danmaku.is_sub: pool = 1 else: pool = 0 data = { "type": 1, "oid": cid, "msg": danmaku.text, "aid": self.get_aid(), "bvid": self.get_bvid(), "progress": int(danmaku.dm_time * 1000), "color": int(danmaku.color, 16), "fontsize": danmaku.font_size, "pool": pool, "mode": danmaku.mode, "plat": 1, } return await Api(**api, credential=self.credential).update_data(**data).result async def get_danmaku_xml( self, page_index: Union[int, None] = None, cid: Union[int, None] = None ) -> str: """ 获取所有弹幕的 xml 源文件(非装填) Args: page_index (int, optional) : 分 P 序号. Defaults to 0. cid (int | None, optional): cid. Defaults to None. Return: xml 文件源 """ if cid is None: if page_index is None: raise ArgsException("page_index 和 cid 至少提供一个。") cid = await self.__get_cid_by_index(page_index) url = f"https://comment.bilibili.com/{cid}.xml" sess = get_session() config: dict[Any, Any] = {"url": url} # 代理 if settings.proxy: config["proxies"] = {"all://", settings.proxy} resp = await sess.get(**config) return resp.content.decode("utf-8") async def like_danmaku( self, page_index: Union[int, None] = None, dmid: Union[int, None] = None, status: Union[bool, None] = True, cid: Union[int, None] = None, ) -> dict: """ 点赞弹幕。 Args: page_index (int | None, optional) : 分 P 号,从 0 开始。Defaults to None dmid (int | None) : 弹幕 ID。 status (bool | None, optional): 点赞状态。Defaults to True cid (int | None, optional) : 分 P 的 ID。Defaults to None Returns: dict: 调用 API 返回的结果。 """ if dmid is None: raise ArgsException("请提供 dmid 参数") self.credential.raise_for_no_sessdata() self.credential.raise_for_no_bili_jct() if cid is None: if page_index is None: raise ArgsException("page_index 和 cid 至少提供一个。") cid = await self.__get_cid_by_index(page_index) api = API["danmaku"]["like_danmaku"] data = { "dmid": dmid, "oid": cid, "op": 1 if status else 2, "platform": "web_player", } return await Api(**api, credential=self.credential).update_data(**data).result async def get_online(self, cid: Optional[int] = None, page_index: Optional[int] = 0) -> dict: """ 获取实时在线人数 Returns: dict: 调用 API 返回的结果。 """ api = API["info"]["online"] params = {"aid": self.get_aid(), "bvid": self.get_bvid(), "cid": cid if cid is not None else await self.get_cid(page_index=page_index)} return ( await Api(**api, credential=self.credential).update_params(**params).result ) async def operate_danmaku( self, page_index: Union[int, None] = None, dmids: Union[List[int], None] = None, cid: Union[int, None] = None, type_: Union[DanmakuOperatorType, None] = None, ) -> dict: """ 操作弹幕 Args: page_index (int | None, optional) : 分 P 号,从 0 开始。Defaults to None dmids (List[int] | None) : 弹幕 ID 列表。 cid (int | None, optional) : 分 P 的 ID。Defaults to None type_ (DanmakuOperatorType | None): 操作类型 Returns: dict: 调用 API 返回的结果。 """ if dmids is None or len(dmids) == 0: raise ArgsException("请提供 dmid 参数") if type_ is None: raise ArgsException("请提供 type_ 参数") self.credential.raise_for_no_sessdata() self.credential.raise_for_no_bili_jct() if cid is None: if page_index is None: raise ArgsException("page_index 和 cid 至少提供一个。") cid = await self.__get_cid_by_index(page_index) api = API["danmaku"]["edit_danmaku"] data = { "type": 1, "dmids": ",".join(map(lambda x: str(x), dmids)), "oid": cid, "state": type_.value, } return await Api(**api, credential=self.credential).update_data(**data).result async def like(self, status: bool = True) -> dict: """ 点赞视频。 Args: status (bool, optional): 点赞状态。Defaults to True. Returns: dict: 调用 API 返回的结果。 """ self.credential.raise_for_no_sessdata() self.credential.raise_for_no_bili_jct() api = API["operate"]["like"] data = {"aid": self.get_aid(), "like": 1 if status else 2} return await Api(**api, credential=self.credential).update_data(**data).result async def pay_coin(self, num: int = 1, like: bool = False) -> dict: """ 投币。 Args: num (int, optional) : 硬币数量,为 1 ~ 2 个。Defaults to 1. like (bool, optional): 是否同时点赞。Defaults to False. Returns: dict: 调用 API 返回的结果。 """ self.credential.raise_for_no_sessdata() self.credential.raise_for_no_bili_jct() if num not in (1, 2): raise ArgsException("投币数量只能是 1 ~ 2 个。") api = API["operate"]["coin"] data = { "aid": self.get_aid(), "bvid": self.get_bvid(), "multiply": num, "like": 1 if like else 0, } return await Api(**api, credential=self.credential).update_data(**data).result async def share(self) -> int: """ 分享视频 Returns: int: 当前分享数 """ api = API["operate"]["share"] data = { "bvid": self.get_bvid(), "aid": self.get_aid(), "csrf": self.credential.bili_jct, } return await Api(**api, credential=self.credential).update_data(**data).result async def triple(self) -> dict: """ 给阿婆主送上一键三连 Returns: dict: 调用 API 返回的结果 """ api = API["operate"]["yjsl"] data = {"bvid": self.get_bvid(), "aid": self.get_aid()} return await Api(**api, credential=self.credential).update_data(**data).result async def add_tag(self, name: str) -> dict: """ 添加标签。 Args: name (str): 标签名字。 Returns: dict: 调用 API 返回的结果。会返回标签 ID。 """ self.credential.raise_for_no_sessdata() self.credential.raise_for_no_bili_jct() api = API["operate"]["add_tag"] data = {"aid": self.get_aid(), "bvid": self.get_bvid(), "tag_name": name} return await Api(**api, credential=self.credential).update_data(**data).result async def delete_tag(self, tag_id: int) -> dict: """ 删除标签。 Args: tag_id (int): 标签 ID。 Returns: dict: 调用 API 返回的结果。 """ self.credential.raise_for_no_sessdata() self.credential.raise_for_no_bili_jct() api = API["operate"]["del_tag"] data = {"tag_id": tag_id, "aid": self.get_aid(), "bvid": self.get_bvid()} return await Api(**api, credential=self.credential).update_data(**data).result async def appeal(self, reason: Any, detail: str): """ 投诉稿件 Args: reason (Any): 投诉类型。传入 VideoAppealReasonType 中的项目即可。 detail (str): 详情信息。 Returns: dict: 调用 API 返回的结果 """ api = API["operate"]["appeal"] data = {"aid": self.get_aid(), "desc": detail} if isfunction(reason): reason = reason() if isinstance(reason, int): reason = {"tid": reason} data.update(reason) # XXX: 暂不支持上传附件 return await Api(**api, credential=self.credential).update_data(**data).result async def set_favorite( self, add_media_ids: List[int] = [], del_media_ids: List[int] = [] ) -> dict: """ 设置视频收藏状况。 Args: add_media_ids (List[int], optional): 要添加到的收藏夹 ID. Defaults to []. del_media_ids (List[int], optional): 要移出的收藏夹 ID. Defaults to []. Returns: dict: 调用 API 返回结果。 """ if len(add_media_ids) + len(del_media_ids) == 0: raise ArgsException("对收藏夹无修改。请至少提供 add_media_ids 和 del_media_ids 中的其中一个。") self.credential.raise_for_no_sessdata() self.credential.raise_for_no_bili_jct() api = API["operate"]["favorite"] data = { "rid": self.get_aid(), "type": 2, "add_media_ids": ",".join(map(lambda x: str(x), add_media_ids)), "del_media_ids": ",".join(map(lambda x: str(x), del_media_ids)), } return await Api(**api, credential=self.credential).update_data(**data).result async def get_subtitle( self, cid: Union[int, None] = None, ) -> dict: """ 获取视频上一次播放的记录,字幕和地区信息。需要分集的 cid, 返回数据中含有json字幕的链接 Args: cid (int | None): 分 P ID,从视频信息中获取 Returns: 调用 API 返回的结果 """ if cid is None: raise ArgsException("需要 cid") return (await self.get_player_info(cid=cid)).get("subtitle") async def get_player_info( self, cid: Union[int, None] = None, epid: Union[int, None] = None, ) -> dict: """ 获取字幕信息 Args: cid (int | None): 分 P ID,从视频信息中获取 epid (int | None): 番剧分集 ID,从番剧信息中获取 Returns: 调用 API 返回的结果 """ if cid is None: raise ArgsException("需要 cid") api = API["info"]["get_player_info"] params = { "aid": self.get_aid(), "cid": cid, } if epid: params["epid"] = epid return ( await Api(**api, credential=self.credential).update_params(**params).result ) async def submit_subtitle( self, lan: str, data: dict, submit: bool, sign: bool, page_index: Union[int, None] = None, cid: Union[int, None] = None, ) -> dict: """ 上传字幕 字幕数据 data 参考: ```json { "font_size": "float: 字体大小,默认 0.4", "font_color": "str: 字体颜色,默认 \"#FFFFFF\"", "background_alpha": "float: 背景不透明度,默认 0.5", "background_color": "str: 背景颜色,默认 \"#9C27B0\"", "Stroke": "str: 描边,目前作用未知,默认为 \"none\"", "body": [ { "from": "int: 字幕开始时间(秒)", "to": "int: 字幕结束时间(秒)", "location": "int: 字幕位置,默认为 2", "content": "str: 字幕内容" } ] } ``` Args: lan (str) : 字幕语言代码,参考 https://s1.hdslb.com/bfs/subtitle/subtitle_lan.json data (dict) : 字幕数据 submit (bool) : 是否提交,不提交为草稿 sign (bool) : 是否署名 page_index (int | None, optional): 分 P 索引. Defaults to None. cid (int | None, optional): 分 P id. Defaults to None. Returns: dict: API 调用返回结果 """ if cid is None: if page_index is None: raise ArgsException("page_index 和 cid 至少提供一个。") cid = await self.__get_cid_by_index(page_index) self.credential.raise_for_no_sessdata() self.credential.raise_for_no_bili_jct() api = API["operate"]["submit_subtitle"] # lan check,应该是这里面的语言代码 with open( os.path.join(os.path.dirname(__file__), "data/subtitle_lan.json"), encoding="utf-8", ) as f: subtitle_lans = json.load(f) for lan in subtitle_lans: if lan["lan"] == lan: break else: raise ArgsException("lan 参数错误,请参见 https://s1.hdslb.com/bfs/subtitle/subtitle_lan.json") payload = { "type": 1, "oid": cid, "lan": lan, "data": json.dumps(data), "submit": submit, "sign": sign, "bvid": self.get_bvid(), } return await Api(**api, credential=self.credential).update_data(**payload).result async def get_danmaku_snapshot(self) -> dict: """ 获取弹幕快照 Returns: 调用 API 返回的结果 """ api = API["danmaku"]["snapshot"] params = {"aid": self.get_aid()} return ( await Api(**api, credential=self.credential).update_params(**params).result ) async def recall_danmaku( self, page_index: Union[int, None] = None, dmid: int = 0, cid: Union[int, None] = None, ) -> dict: """ 撤回弹幕 Args: page_index(int | None, optional): 分 P 号 dmid(int) : 弹幕 id cid(int | None, optional) : 分 P 编码 Returns: 调用 API 返回的结果 """ if cid is None: if page_index is None: raise ArgsException("page_index 和 cid 至少提供一个。") cid = await self.__get_cid_by_index(page_index) self.credential.raise_for_no_sessdata() self.credential.raise_for_no_bili_jct() api = API["danmaku"]["recall"] data = {"dmid": dmid, "cid": cid} return await Api(**api, credential=self.credential).update_data(**data).result async def get_pbp( self, page_index: Union[int, None] = None, cid: Union[int, None] = None ) -> dict: """ 获取高能进度条 Args: page_index(int | None): 分 P 号 cid(int | None) : 分 P 编码 Returns: 调用 API 返回的结果 """ if cid is None: if page_index is None: raise ArgsException("page_index 和 cid 至少提供一个。") cid = await self.__get_cid_by_index(page_index) api = API["info"]["pbp"] params = {"cid": cid} session = get_session() return json.loads( ( await session.get( api["url"], params=params, cookies=self.credential.get_cookies() ) ).text ) async def add_to_toview(self) -> dict: """ 添加视频至稍后再看列表 Returns: 调用 API 返回的结果 """ self.credential.raise_for_no_sessdata() self.credential.raise_for_no_bili_jct() api = get_api("toview")["operate"]["add"] datas = { "aid": self.get_aid(), } return await Api(**api, credential=self.credential).update_data(**datas).result async def delete_from_toview(self) -> dict: """ 从稍后再看列表删除视频 Returns: 调用 API 返回的结果 """ self.credential.raise_for_no_sessdata() self.credential.raise_for_no_bili_jct() api = get_api("toview")["operate"]["del"] datas = {"viewed": "false", "aid": self.get_aid()} return await Api(**api, credential=self.credential).update_data(**datas).result class VideoOnlineMonitor(AsyncEvent): """ 视频在线人数实时监测。 示例代码: ```python import asyncio from bilibili_api import video # 实例化 r = video.VideoOnlineMonitor("BV1Bf4y1Q7QP") # 装饰器方法注册事件监听器 @r.on("ONLINE") async def handler(data): print(data) # 函数方法注册事件监听器 async def handler2(data): print(data) r.add_event_listener("ONLINE", handler2) asyncio.get_event_loop().run_until_complete(r.connect()) ``` Extends: AsyncEvent Events: ONLINE: 在线人数更新。 CallbackData: dict。 DANMAKU: 收到实时弹幕。 CallbackData: Danmaku。 DISCONNECTED: 正常断开连接。 CallbackData: None。 ERROR: 发生错误。 CallbackData: aiohttp.ClientWebSocketResponse。 CONNECTED: 成功连接。 CallbackData: None。 """ class Datapack(Enum): """ 数据包类型枚举。 + CLIENT_VERIFY : 客户端发送验证信息。 + SERVER_VERIFY : 服务端响应验证信息。 + CLIENT_HEARTBEAT: 客户端发送心跳包。 + SERVER_HEARTBEAT: 服务端响应心跳包。 + DANMAKU : 实时弹幕更新。 """ CLIENT_VERIFY = 0x7 SERVER_VERIFY = 0x8 CLIENT_HEARTBEAT = 0x2 SERVER_HEARTBEAT = 0x3 DANMAKU = 0x3E8 def __init__( self, bvid: Union[str, None] = None, aid: Union[int, None] = None, page_index: int = 0, credential: Union[Credential, None] = None, debug: bool = False, ): """ Args: bvid (str | None, optional) : BVID. Defaults to None. aid (int | None, optional) : AID. Defaults to None. page_index (int, optional) : 分 P 序号. Defaults to 0. credential (Credential | None, optional): Credential 类. Defaults to None. debug (bool, optional) : 调试模式,将输出更详细信息. Defaults to False. """ super().__init__() self.credential = credential self.__video = Video(bvid, aid, credential=credential) # 智能选择在 log 中展示的 ID。 id_showed = None if bvid is not None: id_showed = bvid else: id_showed = aid # logger 初始化 self.logger = logging.getLogger(f"VideoOnlineMonitor-{id_showed}") if not self.logger.handlers: handler = logging.StreamHandler() handler.setFormatter( logging.Formatter( "[" + str(id_showed) + "][%(asctime)s][%(levelname)s] %(message)s" ) ) self.logger.addHandler(handler) self.logger.setLevel(logging.INFO if not debug else logging.DEBUG) self.__page_index = page_index self.__tasks = [] async def connect(self): """ 连接服务器 """ await self.__main() async def disconnect(self): """ 断开服务器 """ self.logger.info("主动断开连接。") self.dispatch("DISCONNECTED") await self.__cancel_all_tasks() await self.__ws.close() async def __main(self): """ 入口。 """ # 获取分 P id pages = await self.__video.get_pages() if self.__page_index >= len(pages): raise ArgsException("不存在该分 P。") cid = pages[self.__page_index]["cid"] # 获取服务器信息 self.logger.debug(f"准备连接:{self.__video.get_bvid()}") self.logger.debug(f"获取服务器信息中...") api = API["info"]["video_online_broadcast_servers"] resp = await Api(**api, credential=self.credential).result uri = f"wss://{resp['domain']}:{resp['wss_port']}/sub" self.__heartbeat_interval = resp["heartbeat"] self.logger.debug(f"服务器信息获取成功,URI:{uri}") # 连接服务器 self.logger.debug("准备连接服务器...") session = get_aiohttp_session() async with session.ws_connect(uri) as ws: self.__ws = ws # 发送认证信息 self.logger.debug("服务器连接成功,准备发送认证信息...") verify_info = { "room_id": f"video://{self.__video.get_aid()}/{cid}", "platform": "web", "accepts": [1000, 1015], } verify_info = json.dumps(verify_info, separators=(",", ":")) await ws.send_bytes( self.__pack( VideoOnlineMonitor.Datapack.CLIENT_VERIFY, 1, verify_info.encode() ) ) # 循环接收消息 async for msg in ws: if msg.type == aiohttp.WSMsgType.BINARY: data = self.__unpack(msg.data) self.logger.debug(f"收到消息:{data}") await self.__handle_data(data) # type: ignore elif msg.type == aiohttp.WSMsgType.ERROR: self.logger.warning("连接被异常断开") await self.__cancel_all_tasks() self.dispatch("ERROR", msg) break async def __handle_data(self, data: List[dict]): """ 处理数据。 Args: data (List[dict]): 收到的数据(已解析好)。 """ for d in data: if d["type"] == VideoOnlineMonitor.Datapack.SERVER_VERIFY.value: # 服务器认证反馈。 if d["data"]["code"] == 0: # 创建心跳 Task heartbeat = asyncio.create_task(self.__heartbeat_task()) self.__tasks.append(heartbeat) self.logger.info("连接服务器并验证成功") elif d["type"] == VideoOnlineMonitor.Datapack.SERVER_HEARTBEAT.value: # 心跳包反馈,同时包含在线人数。 self.logger.debug(f'收到服务器心跳包反馈,编号:{d["number"]}') self.logger.info(f'实时观看人数:{d["data"]["data"]["room"]["online"]}') self.dispatch("ONLINE", d["data"]) elif d["type"] == VideoOnlineMonitor.Datapack.DANMAKU.value: # 实时弹幕。 info = d["data"][0].split(",") text = d["data"][1] if info[5] == "0": is_sub = False else: is_sub = True dm = Danmaku( dm_time=float(info[0]), send_time=int(info[4]), crc32_id=info[6], color=info[3], mode=info[1], font_size=info[2], is_sub=is_sub, text=text, ) self.logger.info(f"收到实时弹幕:{dm.text}") self.dispatch("DANMAKU", dm) else: # 未知类型数据包 self.logger.warning("收到未知的数据包类型,无法解析:" + json.dumps(d)) async def __heartbeat_task(self): """ 心跳 Task。 """ index = 2 while True: self.logger.debug(f"发送心跳包,编号:{index}") await self.__ws.send_bytes( self.__pack( VideoOnlineMonitor.Datapack.CLIENT_HEARTBEAT, index, b"[object Object]", ) ) index += 1 await asyncio.sleep(self.__heartbeat_interval) async def __cancel_all_tasks(self): """ 取消所有 Task。 """ for task in self.__tasks: task.cancel() @staticmethod def __pack(data_type: Datapack, number: int, data: bytes): """ 打包数据。 # 数据包格式: 16B 头部: | offset(bytes) | length(bytes) | type | description | | ------------- | ------------- | ---- | ------------------- | | 0 | 4 | I | 数据包长度 | | 4 | 4 | I | 固定 0x00120001 | | 8 | 4 | I | 数据包类型 | | 12 | 4 | I | 递增数据包编号 | | 16 | 2 | H | 固定 0x0000 | 之后是有效载荷。 # 数据包类型表: + 0x7 客户端发送认证信息 + 0x8 服务端回应认证结果 + 0x2 客户端发送心跳包,有效载荷:'[object Object]' + 0x3 服务端回应心跳包,会带上在线人数等信息,返回 JSON + 0x3e8 实时弹幕更新,返回列表,[0]弹幕信息,[1]弹幕文本 Args: data_type (VideoOnlineMonitor.DataType): 数据包类型枚举。 Returns: bytes: 打包好的数据。 """ packed_data = bytearray() packed_data += struct.pack(">I", 0x00120001) packed_data += struct.pack(">I", data_type.value) packed_data += struct.pack(">I", number) packed_data += struct.pack(">H", 0) packed_data += data packed_data = struct.pack(">I", len(packed_data) + 4) + packed_data return bytes(packed_data) @staticmethod def __unpack(data: bytes): """ 解包数据。 Args: data (bytes): 原始数据。 Returns: tuple(dict): 解包后的数据。 """ offset = 0 real_data = [] while offset < len(data): region_header = struct.unpack(">IIII", data[:16]) region_data = data[offset: offset + region_header[0]] real_data.append( { "type": region_header[2], "number": region_header[3], "data": json.loads( region_data[offset + 18: offset + 18 + (region_header[0] - 16)] ), } ) offset += region_header[0] return tuple(real_data) class VideoQuality(Enum): """ 视频的视频流分辨率枚举 - _360P: 流畅 360P - _480P: 清晰 480P - _720P: 高清 720P60 - _1080P: 高清 1080P - _1080P_PLUS: 高清 1080P 高码率 - _1080P_60: 高清 1080P 60 帧码率 - _4K: 超清 4K - HDR: 真彩 HDR - DOLBY: 杜比视界 - _8K: 超高清 8K """ _360P = 16 _480P = 32 _720P = 64 _1080P = 80 _1080P_PLUS = 112 _1080P_60 = 116 _4K = 120 HDR = 125 DOLBY = 126 _8K = 127 class VideoCodecs(Enum): """ 视频的视频流编码枚举 - HEV: HEVC(H.265) - AVC: AVC(H.264) - AV1: AV1 """ HEV = "hev" AVC = "avc" AV1 = "av01" class AudioQuality(Enum): """ 视频的音频流清晰度枚举 - _64K: 64K - _132K: 132K - _192K: 192K - HI_RES: Hi-Res 无损 - DOLBY: 杜比全景声 """ _64K = 30216 _132K = 30232 DOLBY = 30250 HI_RES = 30251 _192K = 30280 @dataclass class VideoStreamDownloadURL: """ (@dataclass) 视频流 URL 类 Attributes: url (str) : 视频流 url video_quality (VideoQuality): 视频流清晰度 video_codecs (VideoCodecs) : 视频流编码 """ url: str video_quality: VideoQuality video_codecs: VideoCodecs @dataclass class AudioStreamDownloadURL: """ (@dataclass) 音频流 URL 类 Attributes: url (str) : 音频流 url audio_quality (AudioQuality): 音频流清晰度 """ url: str audio_quality: AudioQuality @dataclass class FLVStreamDownloadURL: """ (@dataclass) FLV 视频流 Attributes: url (str): FLV 流 url """ url: str @dataclass class HTML5MP4DownloadURL: """ (@dataclass) 可供 HTML5 播放的 mp4 视频流 Attributes: url (str): HTML5 mp4 视频流 """ url: str @dataclass class EpisodeTryMP4DownloadURL: """ (@dataclass) 番剧/课程试看的 mp4 播放流 Attributes: url (str): 番剧试看的 mp4 播放流 """ url: str class VideoDownloadURLDataDetecter: """ `Video.get_download_url` 返回结果解析类。 在调用 `Video.get_download_url` 之后可以将代入 `VideoDownloadURLDataDetecter`,此类将一键解析。 目前支持: - 视频清晰度: 360P, 480P, 720P, 1080P, 1080P 高码率, 1080P 60 帧, 4K, HDR, 杜比视界, 8K - 视频编码: HEVC(H.265), AVC(H.264), AV1 - 音频清晰度: 64K, 132K, Hi-Res 无损音效, 杜比全景声, 192K - FLV 视频流 - 番剧/课程试看视频流 """ def __init__(self, data: dict): """ Args: data (dict): `Video.get_download_url` 返回的结果 """ self.__data = data def check_video_and_audio_stream(self) -> bool: """ 判断是否为音视频分离流 Returns: bool: 是否为音视频分离流 """ if "dash" in self.__data.keys(): return True return False def check_flv_stream(self) -> bool: """ 判断是否为 FLV 视频流 Returns: bool: 是否为 FLV 视频流 """ if "durl" in self.__data.keys(): if self.__data["format"].startswith("flv"): return True return False def check_html5_mp4_stream(self) -> bool: """ 判断是否为 HTML5 可播放的 mp4 视频流 Returns: bool: 是否为 HTML5 可播放的 mp4 视频流 """ if "durl" in self.__data.keys(): if self.__data["format"].startswith("mp4"): if self.__data.get("is_html5") == True: return True return False def check_episode_try_mp4_stream(self): """ 判断是否为番剧/课程试看的 mp4 视频流 Returns: bool: 是否为番剧试看的 mp4 视频流 """ if "durl" in self.__data.keys(): if self.__data["format"].startswith("mp4"): if self.__data.get("is_html5") != True: return True return False def detect_all(self): """ 解析并返回所有数据 Returns: List[VideoStreamDownloadURL | AudioStreamDownloadURL | FLVStreamDownloadURL | HTML5MP4DownloadURL | EpisodeTryMP4DownloadURL]: 所有的视频/音频流 """ return self.detect() def detect( self, video_max_quality: VideoQuality = VideoQuality._8K, audio_max_quality: AudioQuality = AudioQuality._192K, video_min_quality: VideoQuality = VideoQuality._360P, audio_min_quality: AudioQuality = AudioQuality._64K, video_accepted_qualities: List[VideoQuality] = [ item for _, item in VideoQuality.__dict__.items() if isinstance(item, VideoQuality) ], audio_accepted_qualities: List[AudioQuality] = [ item for _, item in AudioQuality.__dict__.items() if isinstance(item, AudioQuality) ], codecs: List[VideoCodecs] = [VideoCodecs.AV1, VideoCodecs.AVC, VideoCodecs.HEV], no_dolby_video: bool = False, no_dolby_audio: bool = False, no_hdr: bool = False, no_hires: bool = False, ) -> List[ Union[ VideoStreamDownloadURL, AudioStreamDownloadURL, FLVStreamDownloadURL, HTML5MP4DownloadURL, EpisodeTryMP4DownloadURL, ] ]: """ 解析数据 Args: **以下参数仅能在音视频流分离的情况下产生作用,flv / mp4 试看流 / html5 mp4 流下以下参数均没有作用** video_max_quality (VideoQuality, optional) : 设置提取的视频流清晰度最大值,设置此参数绝对不会禁止 HDR/杜比. Defaults to VideoQuality._8K. audio_max_quality (AudioQuality, optional) : 设置提取的音频流清晰度最大值. 设置此参数绝对不会禁止 Hi-Res/杜比. Defaults to AudioQuality._192K. video_min_quality (VideoQuality, optional) : 设置提取的视频流清晰度最小值,设置此参数绝对不会禁止 HDR/杜比. Defaults to VideoQuality._360P. audio_min_quality (AudioQuality, optional) : 设置提取的音频流清晰度最小值. 设置此参数绝对不会禁止 Hi-Res/杜比. Defaults to AudioQuality._64K. video_accepted_qualities(List[VideoQuality], optional): 设置允许的所有视频流清晰度. Defaults to ALL. audio_accepted_qualities(List[AudioQuality], optional): 设置允许的所有音频清晰度. Defaults to ALL. codecs (List[VideoCodecs], optional) : 设置所有允许提取出来的视频编码. 此项不会忽略 HDR/杜比. Defaults to ALL codecs. no_dolby_video (bool, optional) : 是否禁止提取杜比视界视频流. Defaults to False. no_dolby_audio (bool, optional) : 是否禁止提取杜比全景声音频流. Defaults to False. no_hdr (bool, optional) : 是否禁止提取 HDR 视频流. Defaults to False. no_hires (bool, optional) : 是否禁止提取 Hi-Res 音频流. Defaults to False. Returns: List[VideoStreamDownloadURL | AudioStreamDownloadURL | FLVStreamDownloadURL | HTML5MP4DownloadURL | EpisodeTryMP4DownloadURL]: 提取出来的视频/音频流 """ if "durl" in self.__data.keys(): if self.__data["format"].startswith("flv"): # FLV 视频流 return [FLVStreamDownloadURL(url=self.__data["durl"][0]["url"])] else: if self.check_html5_mp4_stream(): # HTML5 MP4 视频流 return [HTML5MP4DownloadURL(url=self.__data["durl"][0]["url"])] else: # 会员番剧试看 MP4 流 return [EpisodeTryMP4DownloadURL(url=self.__data["durl"][0]["url"])] else: # 正常情况 streams = [] videos_data = self.__data["dash"]["video"] audios_data = self.__data["dash"]["audio"] flac_data = self.__data["dash"]["flac"] dolby_data = self.__data["dash"]["dolby"] for video_data in videos_data: video_stream_url = video_data["baseUrl"] video_stream_quality = VideoQuality(video_data["id"]) if video_stream_quality == VideoQuality.HDR and no_hdr: continue if video_stream_quality == VideoQuality.DOLBY and no_dolby_video: continue if ( video_stream_quality != VideoQuality.DOLBY and video_stream_quality != VideoQuality.HDR and video_stream_quality.value > video_max_quality.value ): continue if ( video_stream_quality != VideoQuality.DOLBY and video_stream_quality != VideoQuality.HDR and video_stream_quality.value < video_min_quality.value ): continue if ( video_stream_quality != VideoQuality.DOLBY and video_stream_quality != VideoQuality.HDR and (not video_stream_quality in video_accepted_qualities) ): continue video_stream_codecs = None for val in VideoCodecs: if val.value in video_data["codecs"]: video_stream_codecs = val if (not video_stream_codecs in codecs) and ( video_stream_codecs != None ): continue video_stream = VideoStreamDownloadURL( url=video_stream_url, video_quality=video_stream_quality, video_codecs=video_stream_codecs, # type: ignore ) streams.append(video_stream) if audios_data: for audio_data in audios_data: audio_stream_url = audio_data["baseUrl"] audio_stream_quality = AudioQuality(audio_data["id"]) if audio_stream_quality.value > audio_max_quality.value: continue if audio_stream_quality.value < audio_min_quality.value: continue if not audio_stream_quality in audio_accepted_qualities: continue audio_stream = AudioStreamDownloadURL( url=audio_stream_url, audio_quality=audio_stream_quality ) streams.append(audio_stream) if flac_data and (not no_hires): if flac_data["audio"]: flac_stream_url = flac_data["audio"]["baseUrl"] flac_stream_quality = AudioQuality(flac_data["audio"]["id"]) flac_stream = AudioStreamDownloadURL( url=flac_stream_url, audio_quality=flac_stream_quality ) streams.append(flac_stream) if dolby_data and (not no_dolby_audio): if dolby_data["audio"]: dolby_stream_data = dolby_data["audio"][0] dolby_stream_url = dolby_stream_data["baseUrl"] dolby_stream_quality = AudioQuality(dolby_stream_data["id"]) dolby_stream = AudioStreamDownloadURL( url=dolby_stream_url, audio_quality=dolby_stream_quality ) streams.append(dolby_stream) return streams def detect_best_streams( self, video_max_quality: VideoQuality = VideoQuality._8K, audio_max_quality: AudioQuality = AudioQuality._192K, video_min_quality: VideoQuality = VideoQuality._360P, audio_min_quality: AudioQuality = AudioQuality._64K, video_accepted_qualities: List[VideoQuality] = [ item for _, item in VideoQuality.__dict__.items() if isinstance(item, VideoQuality) ], audio_accepted_qualities: List[AudioQuality] = [ item for _, item in AudioQuality.__dict__.items() if isinstance(item, AudioQuality) ], codecs: List[VideoCodecs] = [VideoCodecs.AV1, VideoCodecs.AVC, VideoCodecs.HEV], no_dolby_video: bool = False, no_dolby_audio: bool = False, no_hdr: bool = False, no_hires: bool = False, ) -> Union[ List[FLVStreamDownloadURL], List[HTML5MP4DownloadURL], List[EpisodeTryMP4DownloadURL], List[Union[VideoStreamDownloadURL, AudioStreamDownloadURL, None]], ]: """ 提取出分辨率、音质等信息最好的音视频流。 Args: **以下参数仅能在音视频流分离的情况下产生作用,flv / mp4 试看流 / html5 mp4 流下以下参数均没有作用** video_max_quality (VideoQuality) : 设置提取的视频流清晰度最大值,设置此参数绝对不会禁止 HDR/杜比. Defaults to VideoQuality._8K. audio_max_quality (AudioQuality) : 设置提取的音频流清晰度最大值. 设置此参数绝对不会禁止 Hi-Res/杜比. Defaults to AudioQuality._192K. video_min_quality (VideoQuality, optional) : 设置提取的视频流清晰度最小值,设置此参数绝对不会禁止 HDR/杜比. Defaults to VideoQuality._360P. audio_min_quality (AudioQuality, optional) : 设置提取的音频流清晰度最小值. 设置此参数绝对不会禁止 Hi-Res/杜比. Defaults to AudioQuality._64K. video_accepted_qualities(List[VideoQuality], optional): 设置允许的所有视频流清晰度. Defaults to ALL. audio_accepted_qualities(List[AudioQuality], optional): 设置允许的所有音频清晰度. Defaults to ALL. codecs (List[VideoCodecs]) : 设置所有允许提取出来的视频编码. 在数组中越靠前的编码选择优先级越高. 此项不会忽略 HDR/杜比. Defaults to [VideoCodecs.AV1, VideoCodecs.AVC, VideoCodecs.HEV]. no_dolby_video (bool) : 是否禁止提取杜比视界视频流. Defaults to False. no_dolby_audio (bool) : 是否禁止提取杜比全景声音频流. Defaults to False. no_hdr (bool) : 是否禁止提取 HDR 视频流. Defaults to False. no_hires (bool) : 是否禁止提取 Hi-Res 音频流. Defaults to False. Returns: List[VideoStreamDownloadURL | AudioStreamDownloadURL | FLVStreamDownloadURL | HTML5MP4DownloadURL | None]: FLV 视频流 / HTML5 MP4 视频流 / 番剧或课程试看 MP4 视频流返回 `[FLVStreamDownloadURL | HTML5MP4StreamDownloadURL | EpisodeTryMP4DownloadURL]`, 否则为 `[VideoStreamDownloadURL, AudioStreamDownloadURL]`, 如果未匹配上任何合适的流则对应的位置位 `None` """ if self.check_flv_stream(): return self.detect_all() # type: ignore elif self.check_html5_mp4_stream(): return self.detect_all() # type: ignore elif self.check_episode_try_mp4_stream(): return self.detect_all() # type: ignore else: data = self.detect( video_max_quality=video_max_quality, audio_max_quality=audio_max_quality, video_min_quality=video_min_quality, audio_min_quality=audio_min_quality, video_accepted_qualities=video_accepted_qualities, audio_accepted_qualities=audio_accepted_qualities, codecs=codecs, ) video_streams = [] audio_streams = [] for stream in data: if isinstance(stream, VideoStreamDownloadURL): video_streams.append(stream) if isinstance(stream, AudioStreamDownloadURL): audio_streams.append(stream) def video_stream_cmp( s1: VideoStreamDownloadURL, s2: VideoStreamDownloadURL ): # 杜比/HDR 优先 if s1.video_quality == VideoQuality.DOLBY and (not no_dolby_video): return 1 elif s2.video_quality == VideoQuality.DOLBY and (not no_dolby_video): return -1 elif s1.video_quality == VideoQuality.HDR and (not no_hdr): return 1 elif s2.video_quality == VideoQuality.HDR and (not no_hdr): return -1 if s1.video_quality.value != s2.video_quality.value: return s1.video_quality.value - s2.video_quality.value # Detect the high quality stream to the end. elif s1.video_codecs.value != s2.video_codecs.value: return codecs.index(s2.video_codecs) - codecs.index(s1.video_codecs) return -1 def audio_stream_cmp( s1: AudioStreamDownloadURL, s2: AudioStreamDownloadURL ): # 杜比/Hi-Res 优先 if s1.audio_quality == AudioQuality.DOLBY and (not no_dolby_audio): return 1 if s2.audio_quality == AudioQuality.DOLBY and (not no_dolby_audio): return -1 if s1.audio_quality == AudioQuality.HI_RES and (not no_hires): return 1 if s2.audio_quality == AudioQuality.HI_RES and (not no_hires): return -1 return s1.audio_quality.value - s2.audio_quality.value video_streams.sort(key=cmp_to_key(video_stream_cmp), reverse=True) audio_streams.sort(key=cmp_to_key(audio_stream_cmp), reverse=True) if len(video_streams) == 0: video_streams = [None] if len(audio_streams) == 0: audio_streams = [None] return [video_streams[0], audio_streams[0]]